samsa/internal/search/service.go
ashisgreat22 26f8e4855b search: wire per-engine cache with tier-aware TTLs
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-24 01:14:31 +01:00

289 lines
8 KiB
Go

// samsa — a privacy-respecting metasearch engine
// Copyright (C) 2026-present metamorphosis-dev
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
package search
import (
"context"
"encoding/json"
"fmt"
"sync"
"time"
"github.com/metamorphosis-dev/samsa/internal/cache"
"github.com/metamorphosis-dev/samsa/internal/config"
"github.com/metamorphosis-dev/samsa/internal/contracts"
"github.com/metamorphosis-dev/samsa/internal/engines"
"github.com/metamorphosis-dev/samsa/internal/httpclient"
"github.com/metamorphosis-dev/samsa/internal/upstream"
)
// ServiceConfig carries the settings NewService uses to assemble a Service.
type ServiceConfig struct {
// UpstreamURL is the address of the upstream instance. When empty, no
// upstream client is created.
UpstreamURL string
// HTTPTimeout is passed to both the engine HTTP client and the upstream
// client. Values <= 0 are replaced by a 10 second default.
HTTPTimeout time.Duration
// Cache is the backing cache. When nil, per-engine caching is disabled.
Cache *cache.Cache
// CacheTTLOverrides is forwarded to cache.NewEngineCache.
// NOTE(review): presumably keyed by engine name — confirm against the cache package.
CacheTTLOverrides map[string]time.Duration
// EnginesConfig is forwarded to engines.NewDefaultPortedEngines.
EnginesConfig *config.Config
}
// Service runs a search request against the local engines and, optionally,
// an upstream instance, and merges the per-engine responses.
type Service struct {
// upstreamClient is nil when no upstream URL was configured or the client
// could not be constructed.
upstreamClient *upstream.Client
// planner splits a request into local and upstream engine names.
planner *engines.Planner
// localEngines maps an engine name to its implementation.
localEngines map[string]engines.Engine
// engineCache is nil when no cache was configured.
engineCache *cache.EngineCache
}
// NewService assembles a Service from cfg.
//
// A non-positive cfg.HTTPTimeout falls back to 10 seconds. The upstream client
// is only created when cfg.UpstreamURL is non-empty, and the engine cache only
// when cfg.Cache is non-nil; otherwise the corresponding field stays nil.
func NewService(cfg ServiceConfig) *Service {
	const defaultHTTPTimeout = 10 * time.Second

	requestTimeout := cfg.HTTPTimeout
	if requestTimeout <= 0 {
		requestTimeout = defaultHTTPTimeout
	}
	engineHTTP := httpclient.NewClient(requestTimeout)

	svc := &Service{}

	if cfg.UpstreamURL != "" {
		// Best effort: if the upstream client cannot be constructed the
		// service simply runs without an upstream.
		if client, err := upstream.NewClient(cfg.UpstreamURL, requestTimeout); err == nil {
			svc.upstreamClient = client
		}
	}

	if cfg.Cache != nil {
		svc.engineCache = cache.NewEngineCache(cfg.Cache, cfg.CacheTTLOverrides)
	}

	svc.planner = engines.NewPlannerFromEnv()
	svc.localEngines = engines.NewDefaultPortedEngines(engineHTTP, cfg.EnginesConfig)
	return svc
}
// derefString unwraps an optional string: it yields the pointed-to value, or
// the empty string when the pointer is nil.
func derefString(s *string) string {
	if s != nil {
		return *s
	}
	return ""
}
// Search executes the request against the planned local engines in parallel
// and, when an upstream client is configured, proxies the remaining engines to
// the upstream instance. All per-engine responses are merged into one result.
//
// When an engine cache is configured, every local engine is served with
// stale-while-revalidate semantics:
//   - fresh hit: the cached response is used as-is;
//   - stale hit: the cached response is used and the entry is refreshed by a
//     detached goroutine that does not delay this call;
//   - miss, or an entry whose payload cannot be decoded: the engine is queried
//     synchronously and the result is written to the cache.
//
// Engine and upstream failures are reported through UnresponsiveEngines in the
// merged response; the returned error is always nil.
func (s *Service) Search(ctx context.Context, req SearchRequest) (SearchResponse, error) {
	// Upper bound for a detached stale-entry refresh, so a hung engine cannot
	// pin a goroutine indefinitely once the request context no longer applies.
	const backgroundRefreshTimeout = 30 * time.Second

	queryHash := cache.QueryHash(
		req.Query,
		int(req.Pageno),
		int(req.Safesearch),
		req.Language,
		derefString(req.TimeRange),
	)

	// NOTE(review): the third value returned by Plan is discarded here;
	// confirm that ignoring it is intended.
	localEngineNames, upstreamEngineNames, _ := s.planner.Plan(req)

	// engineResult is the per-engine outcome that feeds the merge. Entry i
	// belongs to localEngineNames[i].
	type engineResult struct {
		resp  *contracts.SearchResponse // response to merge; nil when none is available
		stale bool                      // resp was decoded from a stale cache entry
		err   error                     // synchronous fetch failure
	}
	results := make([]engineResult, len(localEngineNames))

	// Phase 1: parallel cache lookups. Each goroutine writes only its own
	// slot, so no further synchronisation is needed.
	if s.engineCache != nil {
		var lookupWg sync.WaitGroup
		for i, name := range localEngineNames {
			lookupWg.Add(1)
			go func(i int, name string) {
				defer lookupWg.Done()
				cached, ok := s.engineCache.Get(ctx, name, queryHash)
				if !ok {
					return
				}
				var resp contracts.SearchResponse
				if err := json.Unmarshal(cached.Response, &resp); err != nil {
					// An empty or corrupt payload is useless whether fresh
					// or stale: leave the slot empty so it is handled as a
					// miss and fetched synchronously.
					return
				}
				results[i] = engineResult{
					resp:  &resp,
					stale: s.engineCache.IsStale(cached, name),
				}
			}(i, name)
		}
		lookupWg.Wait()
	}

	// Phase 2: refresh stale entries in the background, fetch misses now.
	var fetchWg sync.WaitGroup
	for i, name := range localEngineNames {
		cached := results[i]

		if cached.resp != nil && !cached.stale {
			// Fresh hit — nothing to fetch.
			continue
		}

		if cached.resp != nil {
			// Stale hit — the stale response is already in the slot. The
			// refresh is deliberately NOT tracked by fetchWg, otherwise the
			// caller would wait for it and serving stale data would be
			// pointless. Stale results only exist when engineCache != nil.
			go func(name string) {
				eng, ok := s.localEngines[name]
				if !ok {
					return
				}
				// Detach from the request's cancellation (the request will
				// usually be finished before the refresh is) but keep its
				// values, and bound the refresh with its own timeout.
				refreshCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), backgroundRefreshTimeout)
				defer cancel()

				freshResp, err := eng.Search(refreshCtx, req)
				if err != nil {
					s.engineCache.Logger().Debug("background refresh failed", "engine", name, "error", err)
					return
				}
				s.engineCache.Set(refreshCtx, name, queryHash, freshResp)
			}(name)
			continue
		}

		// Cache miss — fetch synchronously (in parallel across engines).
		fetchWg.Add(1)
		go func(i int, name string) {
			defer fetchWg.Done()
			eng, ok := s.localEngines[name]
			if !ok {
				results[i].err = fmt.Errorf("engine not registered: %s", name)
				return
			}
			freshResp, err := eng.Search(ctx, req)
			if err != nil {
				results[i].err = err
				return
			}
			if s.engineCache != nil {
				s.engineCache.Set(ctx, name, queryHash, freshResp)
			}
			results[i].resp = &freshResp
		}(i, name)
	}
	fetchWg.Wait()

	// Phase 3: collect responses for the merge (+1 leaves room for upstream).
	responses := make([]contracts.SearchResponse, 0, len(results)+1)
	for i, r := range results {
		switch {
		case r.err != nil:
			responses = append(responses, unresponsiveResponse(req.Query, localEngineNames[i], r.err.Error()))
		case r.resp != nil:
			responses = append(responses, *r.resp)
		}
	}

	// Upstream proxy for unported (or fallback) engines.
	if s.upstreamClient != nil && len(upstreamEngineNames) > 0 {
		r, err := s.upstreamClient.SearchJSON(ctx, req, upstreamEngineNames)
		if err != nil {
			responses = append(responses, contracts.SearchResponse{
				Query:               req.Query,
				UnresponsiveEngines: [][2]string{{"upstream", err.Error()}},
			})
		} else {
			responses = append(responses, r)
		}
	}

	if len(responses) == 0 {
		return emptyResponse(req.Query), nil
	}

	merged := MergeResponses(responses)
	if merged.Query == "" {
		merged.Query = req.Query
	}
	return merged, nil
}
// unresponsiveResponse builds a result-less response for query whose only
// content is a single UnresponsiveEngines entry pairing engine with reason.
// All other collections are empty but non-nil, exactly as in emptyResponse.
func unresponsiveResponse(query, engine, reason string) contracts.SearchResponse {
	resp := emptyResponse(query)
	resp.UnresponsiveEngines = [][2]string{{engine, reason}}
	return resp
}
// emptyResponse builds a valid zero-result response for query. Every
// collection field is set to an empty, non-nil value rather than left nil.
func emptyResponse(query string) contracts.SearchResponse {
	var resp contracts.SearchResponse

	resp.Query = query
	resp.NumberOfResults = 0

	resp.Results = []contracts.MainResult{}
	resp.Answers = []map[string]any{}
	resp.Corrections = []string{}
	resp.Infoboxes = []map[string]any{}
	resp.Suggestions = []string{}
	resp.UnresponsiveEngines = [][2]string{}

	return resp
}
// shouldFallbackToUpstream reports whether the response r from engineName
// qualifies for an upstream fallback: the engine must be "qwant" and r must
// contain no results, no answers and no infoboxes.
func shouldFallbackToUpstream(engineName string, r contracts.SearchResponse) bool {
	hasNoContent := len(r.Results)+len(r.Answers)+len(r.Infoboxes) == 0
	return engineName == "qwant" && hasNoContent
}