diff --git a/internal/search/service.go b/internal/search/service.go
--- a/internal/search/service.go
+++ b/internal/search/service.go
@@ -18,29 +18,32 @@ package search
 
 import (
 	"context"
+	"encoding/json"
+	"errors"
 	"sync"
 	"time"
 
 	"github.com/metamorphosis-dev/samsa/internal/cache"
-	"github.com/metamorphosis-dev/samsa/internal/httpclient"
 	"github.com/metamorphosis-dev/samsa/internal/config"
 	"github.com/metamorphosis-dev/samsa/internal/contracts"
 	"github.com/metamorphosis-dev/samsa/internal/engines"
+	"github.com/metamorphosis-dev/samsa/internal/httpclient"
 	"github.com/metamorphosis-dev/samsa/internal/upstream"
 )
 
 type ServiceConfig struct {
-	UpstreamURL string
-	HTTPTimeout time.Duration
-	Cache *cache.Cache
-	EnginesConfig *config.Config
+	UpstreamURL       string
+	HTTPTimeout       time.Duration
+	Cache             *cache.Cache
+	CacheTTLOverrides map[string]time.Duration
+	EnginesConfig     *config.Config
 }
 
 type Service struct {
 	upstreamClient *upstream.Client
 	planner        *engines.Planner
 	localEngines   map[string]engines.Engine
-	cache *cache.Cache
+	engineCache    *cache.EngineCache
 }
 
 func NewService(cfg ServiceConfig) *Service {
@@ -59,117 +62,205 @@ func NewService(cfg ServiceConfig) *Service {
 		}
 	}
 
+	var engineCache *cache.EngineCache
+	if cfg.Cache != nil {
+		engineCache = cache.NewEngineCache(cfg.Cache, cfg.CacheTTLOverrides)
+	}
+
 	return &Service{
 		upstreamClient: up,
 		planner:        engines.NewPlannerFromEnv(),
-		localEngines: engines.NewDefaultPortedEngines(httpClient, cfg.EnginesConfig),
-		cache: cfg.Cache,
+		localEngines:   engines.NewDefaultPortedEngines(httpClient, cfg.EnginesConfig),
+		engineCache:    engineCache,
 	}
 }
 
+// derefString returns the string value of a *string, or "" if nil.
+func derefString(s *string) string {
+	if s == nil {
+		return ""
+	}
+	return *s
+}
+
 // Search executes the request against local engines (in parallel) and
 // optionally the upstream instance for unported engines.
 //
 // Individual engine failures are reported as unresponsive_engines rather
 // than aborting the entire search.
 //
-// If a Valkey cache is configured and contains a cached response for this
-// request, the cached result is returned without hitting any engines.
+// If an engine cache is configured, each local engine's response is cached
+// on its own: a fresh entry is served without querying the engine, and a
+// stale entry is only served when re-fetching it from the engine fails.
 func (s *Service) Search(ctx context.Context, req SearchRequest) (SearchResponse, error) {
-	// Check cache first.
-	if s.cache != nil {
-		cacheKey := cache.Key(req)
-		if cached, hit := s.cache.Get(ctx, cacheKey); hit {
-			return cached, nil
-		}
-	}
+	queryHash := cache.QueryHash(
+		req.Query,
+		int(req.Pageno),
+		int(req.Safesearch),
+		req.Language,
+		derefString(req.TimeRange),
+	)
 
-	merged, err := s.executeSearch(ctx, req)
-	if err != nil {
-		return SearchResponse{}, err
-	}
-
-	// Store in cache.
-	if s.cache != nil {
-		cacheKey := cache.Key(req)
-		s.cache.Set(ctx, cacheKey, merged)
-	}
-
-	return merged, nil
-}
-
-// executeSearch runs the actual engine queries and merges results.
-func (s *Service) executeSearch(ctx context.Context, req SearchRequest) (SearchResponse, error) {
 	localEngineNames, upstreamEngineNames, _ := s.planner.Plan(req)
 
-	// Run all local engines concurrently.
-	type engineResult struct {
-		name string
-		resp contracts.SearchResponse
-		err error
+	// Phase 1: Parallel cache lookups — classify each engine as fresh/stale/miss
+	type cacheResult struct {
+		engine       string
+		cached       cache.CachedEngineResponse
+		hit          bool
+		fresh        *contracts.SearchResponse // nil if no fresh response
+		fetchErr     error
+		unmarshalErr bool // true if hit but unmarshal failed (treat as miss)
 	}
-	localResults := make([]engineResult, 0, len(localEngineNames))
+	cacheResults := make([]cacheResult, len(localEngineNames))
 
-	var wg sync.WaitGroup
-	var mu sync.Mutex
+	var lookupWg sync.WaitGroup
+	for i, name := range localEngineNames {
+		lookupWg.Add(1)
+		go func(i int, name string) {
+			defer lookupWg.Done()
 
-	for _, name := range localEngineNames {
-		eng, ok := s.localEngines[name]
-		if !ok {
-			mu.Lock()
-			localResults = append(localResults, engineResult{
-				name: name,
-				resp: unresponsiveResponse(req.Query, name, "engine_not_registered"),
-			})
-			mu.Unlock()
+			result := cacheResult{engine: name}
+
+			if s.engineCache != nil {
+				cached, ok := s.engineCache.Get(ctx, name, queryHash)
+				if ok {
+					result.hit = true
+					result.cached = cached
+					if !s.engineCache.IsStale(cached, name) {
+						// Fresh cache hit — deserialize and use directly
+						var resp contracts.SearchResponse
+						if err := json.Unmarshal(cached.Response, &resp); err == nil {
+							result.fresh = &resp
+						} else {
+							// Unmarshal failed — treat as cache miss (will fetch fresh synchronously)
+							result.unmarshalErr = true
+							result.hit = false // treat as miss
+						}
+					}
+					// If stale: result.fresh stays nil, result.cached has stale data
+				}
+			}
+
+			cacheResults[i] = result
+		}(i, name)
+	}
+	lookupWg.Wait()
+
+	// Phase 2: Fetch fresh for misses and stale entries
+	var fetchWg sync.WaitGroup
+	for i, name := range localEngineNames {
+		cr := cacheResults[i]
+
+		// Fresh hit — nothing to do in phase 2
+		if cr.hit && cr.fresh != nil {
 			continue
 		}
 
-		wg.Add(1)
-		go func(name string, eng engines.Engine) {
-			defer wg.Done()
+		// Stale hit — refresh the entry now. Search waits on fetchWg before
+		// merging, so a successful refresh is served right away; the stale
+		// payload is only the fallback when the refresh fails (see phase 3).
+		// NOTE(review): refreshing after the response is sent would need a
+		// context that outlives the request; ctx is presumably cancelled with it.
+		if cr.hit {
+			fetchWg.Add(1)
+			go func(i int, name string) {
+				defer fetchWg.Done()
+				eng, ok := s.localEngines[name]
+				if !ok {
+					return
+				}
+				freshResp, err := eng.Search(ctx, req)
+				if err != nil {
+					s.engineCache.Logger().Debug("stale refresh failed", "engine", name, "error", err)
+					return
+				}
+				s.engineCache.Set(ctx, name, queryHash, freshResp)
+				cacheResults[i].fresh = &freshResp
+			}(i, name)
+			continue
+		}
 
-			r, err := eng.Search(ctx, req)
+		// Cache miss — fetch fresh synchronously
+		if !cr.hit {
+			fetchWg.Add(1)
+			go func(i int, name string) {
+				defer fetchWg.Done()
 
-			mu.Lock()
-			defer mu.Unlock()
+				eng, ok := s.localEngines[name]
+				if !ok {
+					cacheResults[i] = cacheResult{
+						engine:   name,
+						fetchErr: errors.New("engine_not_registered"),
+					}
+					return
+				}
 
-			if err != nil {
-				localResults = append(localResults, engineResult{
-					name: name,
-					resp: unresponsiveResponse(req.Query, name, err.Error()),
-				})
-				return
-			}
-			localResults = append(localResults, engineResult{name: name, resp: r})
-		}(name, eng)
+				freshResp, err := eng.Search(ctx, req)
+				if err != nil {
+					cacheResults[i] = cacheResult{
+						engine:   name,
+						fetchErr: err,
+					}
+					return
+				}
+
+				// Cache the fresh response
+				if s.engineCache != nil {
+					s.engineCache.Set(ctx, name, queryHash, freshResp)
+				}
+
+				cacheResults[i] = cacheResult{
+					engine: name,
+					fresh:  &freshResp,
+					hit:    false,
+				}
+			}(i, name)
+		}
 	}
 
-	wg.Wait()
+	fetchWg.Wait()
 
-	// Collect successful responses and determine upstream fallbacks.
-	responses := make([]contracts.SearchResponse, 0, len(localResults)+1)
+	// Phase 3: Collect responses for merge and determine upstream fallbacks.
+	responses := make([]contracts.SearchResponse, 0, len(cacheResults)+1)
 	upstreamSet := map[string]bool{}
 	for _, e := range upstreamEngineNames {
 		upstreamSet[e] = true
 	}
 
-	for _, lr := range localResults {
-		responses = append(responses, lr.resp)
+	for _, cr := range cacheResults {
+		var resp contracts.SearchResponse
+		switch {
+		case cr.fetchErr != nil:
+			resp = unresponsiveResponse(req.Query, cr.engine, cr.fetchErr.Error())
+		case cr.fresh != nil:
+			// Fresh hit, successful refresh, or freshly fetched miss.
+			resp = *cr.fresh
+		case cr.hit:
+			// Stale entry that was not refreshed: serve the stale payload, or
+			// report the engine instead of silently dropping it when the
+			// payload cannot be decoded.
+			if err := json.Unmarshal(cr.cached.Response, &resp); err != nil {
+				resp = unresponsiveResponse(req.Query, cr.engine, err.Error())
+			}
+		default:
+			continue
+		}
+		responses = append(responses, resp)
 
 		// If a local engine returned nothing (e.g. qwant anti-bot), fall back
 		// to upstream if available.
-		if shouldFallbackToUpstream(lr.name, lr.resp) && !upstreamSet[lr.name] {
-			upstreamEngineNames = append(upstreamEngineNames, lr.name)
-			upstreamSet[lr.name] = true
+		if shouldFallbackToUpstream(cr.engine, resp) && !upstreamSet[cr.engine] {
+			upstreamEngineNames = append(upstreamEngineNames, cr.engine)
+			upstreamSet[cr.engine] = true
 		}
 	}
 
 	// Upstream proxy for unported (or fallback) engines.
 	if s.upstreamClient != nil && len(upstreamEngineNames) > 0 {
 		r, err := s.upstreamClient.SearchJSON(ctx, req, upstreamEngineNames)
 		if err != nil {
 			// Upstream failure is treated as a single unresponsive engine entry.
 			responses = append(responses, contracts.SearchResponse{
 				Query:               req.Query,
 				UnresponsiveEngines: [][2]string{{"upstream", err.Error()}},
@@ -195,12 +286,12 @@ func (s *Service) executeSearch(ctx context.Context, req SearchRequest) (SearchR
 func unresponsiveResponse(query, engine, reason string) contracts.SearchResponse {
 	return contracts.SearchResponse{
 		Query:               query,
-		NumberOfResults: 0,
-		Results: []contracts.MainResult{},
-		Answers: []map[string]any{},
-		Corrections: []string{},
-		Infoboxes: []map[string]any{},
-		Suggestions: []string{},
+		NumberOfResults:     0,
+		Results:             []contracts.MainResult{},
+		Answers:             []map[string]any{},
+		Corrections:         []string{},
+		Infoboxes:           []map[string]any{},
+		Suggestions:         []string{},
 		UnresponsiveEngines: [][2]string{{engine, reason}},
 	}
 }
@@ -209,12 +300,12 @@ func unresponsiveResponse(query, engine, reason string) contracts.SearchResponse
 func emptyResponse(query string) contracts.SearchResponse {
 	return contracts.SearchResponse{
 		Query:               query,
-		NumberOfResults: 0,
-		Results: []contracts.MainResult{},
-		Answers: []map[string]any{},
-		Corrections: []string{},
-		Infoboxes: []map[string]any{},
-		Suggestions: []string{},
+		NumberOfResults:     0,
+		Results:             []contracts.MainResult{},
+		Answers:             []map[string]any{},
+		Corrections:         []string{},
+		Infoboxes:           []map[string]any{},
+		Suggestions:         []string{},
 		UnresponsiveEngines: [][2]string{},
 	}
 }