// kafka — a privacy-respecting metasearch engine // Copyright (C) 2026-present metamorphosis-dev // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU Affero General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Affero General Public License for more details. // // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . package search import ( "context" "net/http" "sync" "time" "github.com/metamorphosis-dev/kafka/internal/cache" "github.com/metamorphosis-dev/kafka/internal/config" "github.com/metamorphosis-dev/kafka/internal/contracts" "github.com/metamorphosis-dev/kafka/internal/engines" "github.com/metamorphosis-dev/kafka/internal/upstream" ) type ServiceConfig struct { UpstreamURL string HTTPTimeout time.Duration Cache *cache.Cache EnginesConfig *config.Config } type Service struct { upstreamClient *upstream.Client planner *engines.Planner localEngines map[string]engines.Engine cache *cache.Cache } func NewService(cfg ServiceConfig) *Service { timeout := cfg.HTTPTimeout if timeout <= 0 { timeout = 10 * time.Second } httpClient := &http.Client{Timeout: timeout} var up *upstream.Client if cfg.UpstreamURL != "" { c, err := upstream.NewClient(cfg.UpstreamURL, timeout) if err == nil { up = c } } return &Service{ upstreamClient: up, planner: engines.NewPlannerFromEnv(), localEngines: engines.NewDefaultPortedEngines(httpClient, cfg.EnginesConfig), cache: cfg.Cache, } } // Search executes the request against local engines (in parallel) and // optionally the upstream instance for unported engines. // // Individual engine failures are reported as unresponsive_engines rather // than aborting the entire search. // // If a Valkey cache is configured and contains a cached response for this // request, the cached result is returned without hitting any engines. func (s *Service) Search(ctx context.Context, req SearchRequest) (SearchResponse, error) { // Check cache first. if s.cache != nil { cacheKey := cache.Key(req) if cached, hit := s.cache.Get(ctx, cacheKey); hit { return cached, nil } } merged, err := s.executeSearch(ctx, req) if err != nil { return SearchResponse{}, err } // Store in cache. if s.cache != nil { cacheKey := cache.Key(req) s.cache.Set(ctx, cacheKey, merged) } return merged, nil } // executeSearch runs the actual engine queries and merges results. func (s *Service) executeSearch(ctx context.Context, req SearchRequest) (SearchResponse, error) { localEngineNames, upstreamEngineNames, _ := s.planner.Plan(req) // Run all local engines concurrently. type engineResult struct { name string resp contracts.SearchResponse err error } localResults := make([]engineResult, 0, len(localEngineNames)) var wg sync.WaitGroup var mu sync.Mutex for _, name := range localEngineNames { eng, ok := s.localEngines[name] if !ok { mu.Lock() localResults = append(localResults, engineResult{ name: name, resp: unresponsiveResponse(req.Query, name, "engine_not_registered"), }) mu.Unlock() continue } wg.Add(1) go func(name string, eng engines.Engine) { defer wg.Done() r, err := eng.Search(ctx, req) mu.Lock() defer mu.Unlock() if err != nil { localResults = append(localResults, engineResult{ name: name, resp: unresponsiveResponse(req.Query, name, err.Error()), }) return } localResults = append(localResults, engineResult{name: name, resp: r}) }(name, eng) } wg.Wait() // Collect successful responses and determine upstream fallbacks. responses := make([]contracts.SearchResponse, 0, len(localResults)+1) upstreamSet := map[string]bool{} for _, e := range upstreamEngineNames { upstreamSet[e] = true } for _, lr := range localResults { responses = append(responses, lr.resp) // If a local engine returned nothing (e.g. qwant anti-bot), fall back // to upstream if available. if shouldFallbackToUpstream(lr.name, lr.resp) && !upstreamSet[lr.name] { upstreamEngineNames = append(upstreamEngineNames, lr.name) upstreamSet[lr.name] = true } } // Upstream proxy for unported (or fallback) engines. if s.upstreamClient != nil && len(upstreamEngineNames) > 0 { r, err := s.upstreamClient.SearchJSON(ctx, req, upstreamEngineNames) if err != nil { // Upstream failure is treated as a single unresponsive engine entry. responses = append(responses, contracts.SearchResponse{ Query: req.Query, UnresponsiveEngines: [][2]string{{"upstream", err.Error()}}, }) } else { responses = append(responses, r) } } if len(responses) == 0 { return emptyResponse(req.Query), nil } merged := MergeResponses(responses) if merged.Query == "" { merged.Query = req.Query } return merged, nil } // unresponsiveResponse returns a zero-result response marking the engine as unresponsive. func unresponsiveResponse(query, engine, reason string) contracts.SearchResponse { return contracts.SearchResponse{ Query: query, NumberOfResults: 0, Results: []contracts.MainResult{}, Answers: []map[string]any{}, Corrections: []string{}, Infoboxes: []map[string]any{}, Suggestions: []string{}, UnresponsiveEngines: [][2]string{{engine, reason}}, } } // emptyResponse returns a valid empty response with stable empty slices. func emptyResponse(query string) contracts.SearchResponse { return contracts.SearchResponse{ Query: query, NumberOfResults: 0, Results: []contracts.MainResult{}, Answers: []map[string]any{}, Corrections: []string{}, Infoboxes: []map[string]any{}, Suggestions: []string{}, UnresponsiveEngines: [][2]string{}, } } func shouldFallbackToUpstream(engineName string, r contracts.SearchResponse) bool { if engineName != "qwant" { return false } return len(r.Results) == 0 && len(r.Answers) == 0 && len(r.Infoboxes) == 0 }