search: wire per-engine cache with tier-aware TTLs

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
ashisgreat22 2026-03-24 01:14:31 +01:00
parent b710aec798
commit 26f8e4855b

View file

@ -18,14 +18,16 @@ package search
import (
"context"
"encoding/json"
"fmt"
"sync"
"time"
"github.com/metamorphosis-dev/samsa/internal/cache"
"github.com/metamorphosis-dev/samsa/internal/httpclient"
"github.com/metamorphosis-dev/samsa/internal/config"
"github.com/metamorphosis-dev/samsa/internal/contracts"
"github.com/metamorphosis-dev/samsa/internal/engines"
"github.com/metamorphosis-dev/samsa/internal/httpclient"
"github.com/metamorphosis-dev/samsa/internal/upstream"
)
@ -33,6 +35,7 @@ type ServiceConfig struct {
UpstreamURL string
HTTPTimeout time.Duration
Cache *cache.Cache
CacheTTLOverrides map[string]time.Duration
EnginesConfig *config.Config
}
@ -40,7 +43,7 @@ type Service struct {
upstreamClient *upstream.Client
planner *engines.Planner
localEngines map[string]engines.Engine
cache *cache.Cache
engineCache *cache.EngineCache
}
func NewService(cfg ServiceConfig) *Service {
@ -59,118 +62,177 @@ func NewService(cfg ServiceConfig) *Service {
}
}
var engineCache *cache.EngineCache
if cfg.Cache != nil {
engineCache = cache.NewEngineCache(cfg.Cache, cfg.CacheTTLOverrides)
}
return &Service{
upstreamClient: up,
planner: engines.NewPlannerFromEnv(),
localEngines: engines.NewDefaultPortedEngines(httpClient, cfg.EnginesConfig),
cache: cfg.Cache,
engineCache: engineCache,
}
}
// derefString returns the string value of a *string, or "" if nil.
func derefString(s *string) string {
if s == nil {
return ""
}
return *s
}
// Search executes the request against local engines (in parallel) and
// optionally the upstream instance for unported engines.
//
// Individual engine failures are reported as unresponsive_engines rather
// than aborting the entire search.
//
// If a Valkey cache is configured and contains a cached response for this
// request, the cached result is returned without hitting any engines.
func (s *Service) Search(ctx context.Context, req SearchRequest) (SearchResponse, error) {
// Check cache first.
if s.cache != nil {
cacheKey := cache.Key(req)
if cached, hit := s.cache.Get(ctx, cacheKey); hit {
return cached, nil
}
}
queryHash := cache.QueryHash(
req.Query,
int(req.Pageno),
int(req.Safesearch),
req.Language,
derefString(req.TimeRange),
)
merged, err := s.executeSearch(ctx, req)
if err != nil {
return SearchResponse{}, err
}
// Store in cache.
if s.cache != nil {
cacheKey := cache.Key(req)
s.cache.Set(ctx, cacheKey, merged)
}
return merged, nil
}
// executeSearch runs the actual engine queries and merges results.
func (s *Service) executeSearch(ctx context.Context, req SearchRequest) (SearchResponse, error) {
localEngineNames, upstreamEngineNames, _ := s.planner.Plan(req)
// Run all local engines concurrently.
type engineResult struct {
name string
resp contracts.SearchResponse
err error
// Phase 1: Parallel cache lookups — classify each engine as fresh/stale/miss
type cacheResult struct {
engine string
cached cache.CachedEngineResponse
hit bool
fresh *contracts.SearchResponse // nil if no fresh response
fetchErr error
unmarshalErr bool // true if hit but unmarshal failed (treat as miss)
}
localResults := make([]engineResult, 0, len(localEngineNames))
cacheResults := make([]cacheResult, len(localEngineNames))
var wg sync.WaitGroup
var mu sync.Mutex
var lookupWg sync.WaitGroup
for i, name := range localEngineNames {
lookupWg.Add(1)
go func(i int, name string) {
defer lookupWg.Done()
for _, name := range localEngineNames {
eng, ok := s.localEngines[name]
if !ok {
mu.Lock()
localResults = append(localResults, engineResult{
name: name,
resp: unresponsiveResponse(req.Query, name, "engine_not_registered"),
})
mu.Unlock()
result := cacheResult{engine: name}
if s.engineCache != nil {
cached, ok := s.engineCache.Get(ctx, name, queryHash)
if ok {
result.hit = true
result.cached = cached
if !s.engineCache.IsStale(cached, name) {
// Fresh cache hit — deserialize and use directly
var resp contracts.SearchResponse
if err := json.Unmarshal(cached.Response, &resp); err == nil {
result.fresh = &resp
} else {
// Unmarshal failed — treat as cache miss (will fetch fresh synchronously)
result.unmarshalErr = true
result.hit = false // treat as miss
}
}
// If stale: result.fresh stays zero, result.cached has stale data
}
}
cacheResults[i] = result
}(i, name)
}
lookupWg.Wait()
// Phase 2: Fetch fresh for misses and stale entries
var fetchWg sync.WaitGroup
for i, name := range localEngineNames {
cr := cacheResults[i]
// Fresh hit — nothing to do in phase 2
if cr.hit && cr.fresh != nil {
continue
}
wg.Add(1)
go func(name string, eng engines.Engine) {
defer wg.Done()
r, err := eng.Search(ctx, req)
mu.Lock()
defer mu.Unlock()
if err != nil {
localResults = append(localResults, engineResult{
name: name,
resp: unresponsiveResponse(req.Query, name, err.Error()),
})
// Stale hit — return stale immediately, refresh in background
if cr.hit && len(cr.cached.Response) > 0 && s.engineCache != nil && s.engineCache.IsStale(cr.cached, name) {
fetchWg.Add(1)
go func(name string) {
defer fetchWg.Done()
eng, ok := s.localEngines[name]
if !ok {
return
}
localResults = append(localResults, engineResult{name: name, resp: r})
}(name, eng)
freshResp, err := eng.Search(ctx, req)
if err != nil {
s.engineCache.Logger().Debug("background refresh failed", "engine", name, "error", err)
return
}
s.engineCache.Set(ctx, name, queryHash, freshResp)
}(name)
continue
}
wg.Wait()
// Cache miss — fetch fresh synchronously
if !cr.hit {
fetchWg.Add(1)
go func(i int, name string) {
defer fetchWg.Done()
// Collect successful responses and determine upstream fallbacks.
responses := make([]contracts.SearchResponse, 0, len(localResults)+1)
upstreamSet := map[string]bool{}
for _, e := range upstreamEngineNames {
upstreamSet[e] = true
eng, ok := s.localEngines[name]
if !ok {
cacheResults[i] = cacheResult{
engine: name,
fetchErr: fmt.Errorf("engine not registered: %s", name),
}
return
}
for _, lr := range localResults {
responses = append(responses, lr.resp)
freshResp, err := eng.Search(ctx, req)
if err != nil {
cacheResults[i] = cacheResult{
engine: name,
fetchErr: err,
}
return
}
// If a local engine returned nothing (e.g. qwant anti-bot), fall back
// to upstream if available.
if shouldFallbackToUpstream(lr.name, lr.resp) && !upstreamSet[lr.name] {
upstreamEngineNames = append(upstreamEngineNames, lr.name)
upstreamSet[lr.name] = true
// Cache the fresh response
if s.engineCache != nil {
s.engineCache.Set(ctx, name, queryHash, freshResp)
}
cacheResults[i] = cacheResult{
engine: name,
fresh: &freshResp,
hit: false,
}
}(i, name)
}
}
fetchWg.Wait()
// Phase 3: Collect responses for merge
responses := make([]contracts.SearchResponse, 0, len(cacheResults))
for _, cr := range cacheResults {
if cr.fetchErr != nil {
responses = append(responses, unresponsiveResponse(req.Query, cr.engine, cr.fetchErr.Error()))
continue
}
// Use fresh data if available (fresh hit or freshly fetched), otherwise use stale cached
if cr.fresh != nil {
responses = append(responses, *cr.fresh)
} else if cr.hit && len(cr.cached.Response) > 0 {
var resp contracts.SearchResponse
if err := json.Unmarshal(cr.cached.Response, &resp); err == nil {
responses = append(responses, resp)
}
}
}
// Upstream proxy for unported (or fallback) engines.
// ... rest of the existing code is UNCHANGED ...
if s.upstreamClient != nil && len(upstreamEngineNames) > 0 {
r, err := s.upstreamClient.SearchJSON(ctx, req, upstreamEngineNames)
if err != nil {
// Upstream failure is treated as a single unresponsive engine entry.
responses = append(responses, contracts.SearchResponse{
Query: req.Query,
UnresponsiveEngines: [][2]string{{"upstream", err.Error()}},