package upstream import ( "context" "encoding/json" "errors" "fmt" "io" "net/http" "net/url" "strings" "time" "github.com/metamorphosis-dev/kafka/internal/contracts" ) type Client struct { baseURL string http *http.Client } func NewClient(baseURL string, timeout time.Duration) (*Client, error) { if strings.TrimSpace(baseURL) == "" { return nil, errors.New("upstream base URL is empty") } u, err := url.Parse(baseURL) if err != nil { return nil, fmt.Errorf("invalid upstream base URL: %w", err) } // Normalize: trim trailing slash to make URL concatenation predictable. base := strings.TrimRight(u.String(), "/") if timeout <= 0 { timeout = 10 * time.Second } return &Client{ baseURL: base, http: &http.Client{ Timeout: timeout, }, }, nil } func (c *Client) SearchJSON(ctx context.Context, req contracts.SearchRequest, engines []string) (contracts.SearchResponse, error) { // Always request upstream JSON; the Go service will handle csv/rss later. form := url.Values{} form.Set("q", req.Query) form.Set("format", "json") form.Set("pageno", fmt.Sprintf("%d", req.Pageno)) form.Set("safesearch", fmt.Sprintf("%d", req.Safesearch)) form.Set("language", req.Language) if req.TimeRange != nil { form.Set("time_range", *req.TimeRange) } if req.TimeoutLimit != nil { form.Set("timeout_limit", formatFloat(*req.TimeoutLimit)) } if len(req.Categories) > 0 { form.Set("categories", strings.Join(req.Categories, ",")) } if len(engines) > 0 { form.Set("engines", strings.Join(engines, ",")) } for engineName, kv := range req.EngineData { for key, value := range kv { // Mirror the naming convention: `engine_data--=` form.Set(fmt.Sprintf("engine_data-%s-%s", engineName, key), value) } } endpoint := c.baseURL + "/search" httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, strings.NewReader(form.Encode())) if err != nil { return contracts.SearchResponse{}, err } httpReq.Header.Set("Content-Type", "application/x-www-form-urlencoded; charset=utf-8") resp, err := c.http.Do(httpReq) if err != nil { return contracts.SearchResponse{}, err } defer resp.Body.Close() body, err := io.ReadAll(io.LimitReader(resp.Body, 4*1024*1024)) if err != nil { return contracts.SearchResponse{}, err } if resp.StatusCode != http.StatusOK { return contracts.SearchResponse{}, fmt.Errorf("upstream search failed: status=%d body=%q", resp.StatusCode, string(body)) } // Decode upstream JSON into our contract types. var out contracts.SearchResponse dec := json.NewDecoder(strings.NewReader(string(body))) if err := dec.Decode(&out); err != nil { return contracts.SearchResponse{}, fmt.Errorf("decode upstream JSON: %w", err) } return out, nil } func formatFloat(f float64) string { // Keep stable formatting for upstream parsing. return strings.TrimRight(strings.TrimRight(fmt.Sprintf("%.6f", f), "0"), ".") }