feat: concurrent engine execution with graceful degradation

- Run all local engines in parallel using goroutines + sync.WaitGroup
- Individual engine failures are captured as unresponsive_engines entries
  instead of aborting the entire search request
- Context cancellation is respected: cancelled engines report as unresponsive
- Upstream proxy failure is also gracefully handled (single unresponsive entry)
- Extract unresponsiveResponse() and emptyResponse() helpers for consistency
- Add comprehensive tests:
  - ConcurrentEngines: verifies parallelism (2x100ms engines complete in ~100ms)
  - GracefulDegradation: one engine fails, one succeeds, both represented
  - AllEnginesFail: no error returned, all engines in unresponsive_engines
  - ContextCancellation: engine respects context timeout, reports unresponsive
This commit is contained in:
Franz Kafka 2026-03-21 15:39:00 +00:00
parent 5181073a95
commit 385a7acab7
5 changed files with 355 additions and 35 deletions

View file

@ -3,10 +3,11 @@ package search
import (
"context"
"net/http"
"sync"
"time"
"github.com/ashie/gosearch/internal/engines"
"github.com/ashie/gosearch/internal/contracts"
"github.com/ashie/gosearch/internal/engines"
"github.com/ashie/gosearch/internal/upstream"
)
@ -44,55 +45,94 @@ func NewService(cfg ServiceConfig) *Service {
}
}
// Search executes the request against local engines (in parallel) and
// optionally upstream SearXNG for unported engines.
//
// Individual engine failures are reported as unresponsive_engines rather
// than aborting the entire search.
func (s *Service) Search(ctx context.Context, req SearchRequest) (SearchResponse, error) {
localEngines, upstreamEngines, _ := s.planner.Plan(req)
localEngineNames, upstreamEngineNames, _ := s.planner.Plan(req)
responses := make([]contracts.SearchResponse, 0, 2)
// Run all local engines concurrently.
type engineResult struct {
name string
resp contracts.SearchResponse
err error
}
localResults := make([]engineResult, 0, len(localEngineNames))
var wg sync.WaitGroup
var mu sync.Mutex
for _, name := range localEngineNames {
eng, ok := s.localEngines[name]
if !ok {
mu.Lock()
localResults = append(localResults, engineResult{
name: name,
resp: unresponsiveResponse(req.Query, name, "engine_not_registered"),
})
mu.Unlock()
continue
}
wg.Add(1)
go func(name string, eng engines.Engine) {
defer wg.Done()
r, err := eng.Search(ctx, req)
mu.Lock()
defer mu.Unlock()
if err != nil {
localResults = append(localResults, engineResult{
name: name,
resp: unresponsiveResponse(req.Query, name, err.Error()),
})
return
}
localResults = append(localResults, engineResult{name: name, resp: r})
}(name, eng)
}
wg.Wait()
// Collect successful responses and determine upstream fallbacks.
responses := make([]contracts.SearchResponse, 0, len(localResults)+1)
upstreamSet := map[string]bool{}
for _, e := range upstreamEngines {
for _, e := range upstreamEngineNames {
upstreamSet[e] = true
}
for _, engineName := range localEngines {
eng, ok := s.localEngines[engineName]
if !ok {
continue
}
r, err := eng.Search(ctx, req)
if err != nil {
// MVP: fail fast so the client sees a real error.
return SearchResponse{}, err
}
responses = append(responses, r)
for _, lr := range localResults {
responses = append(responses, lr.resp)
// Some engines (notably qwant due to anti-bot protections) can return
// zero local results depending on client/IP. If upstream SearXNG is
// configured, let it attempt the same engine as a fallback.
if shouldFallbackToUpstream(engineName, r) && !upstreamSet[engineName] {
upstreamEngines = append(upstreamEngines, engineName)
upstreamSet[engineName] = true
// If a local engine returned nothing (e.g. qwant anti-bot), fall back
// to upstream if available.
if shouldFallbackToUpstream(lr.name, lr.resp) && !upstreamSet[lr.name] {
upstreamEngineNames = append(upstreamEngineNames, lr.name)
upstreamSet[lr.name] = true
}
}
if s.upstreamClient != nil && len(upstreamEngines) > 0 {
r, err := s.upstreamClient.SearchJSON(ctx, req, upstreamEngines)
// Upstream proxy for unported (or fallback) engines.
if s.upstreamClient != nil && len(upstreamEngineNames) > 0 {
r, err := s.upstreamClient.SearchJSON(ctx, req, upstreamEngineNames)
if err != nil {
return SearchResponse{}, err
// Upstream failure is treated as a single unresponsive engine entry.
responses = append(responses, contracts.SearchResponse{
Query: req.Query,
UnresponsiveEngines: [][2]string{{"upstream", err.Error()}},
})
} else {
responses = append(responses, r)
}
responses = append(responses, r)
}
if len(responses) == 0 {
return SearchResponse{
Query: req.Query,
NumberOfResults: 0,
Results: []MainResult{},
Answers: []map[string]any{},
Corrections: []string{},
Infoboxes: []map[string]any{},
Suggestions: []string{},
UnresponsiveEngines: [][2]string{},
}, nil
return emptyResponse(req.Query), nil
}
merged := MergeResponses(responses)
@ -102,6 +142,34 @@ func (s *Service) Search(ctx context.Context, req SearchRequest) (SearchResponse
return merged, nil
}
// unresponsiveResponse builds a zero-result response whose only payload is a
// single unresponsive_engines entry marking the given engine with the reason
// it failed. All slice fields are non-nil so the response serializes with
// stable empty collections rather than nulls.
func unresponsiveResponse(query, engine, reason string) contracts.SearchResponse {
	resp := contracts.SearchResponse{
		Query:       query,
		Results:     []contracts.MainResult{},
		Answers:     []map[string]any{},
		Corrections: []string{},
		Infoboxes:   []map[string]any{},
		Suggestions: []string{},
	}
	// NumberOfResults stays at its zero value (0); the sole content is the
	// unresponsive entry below.
	resp.UnresponsiveEngines = [][2]string{{engine, reason}}
	return resp
}
// emptyResponse builds a well-formed response for query with zero results.
// Every collection field is initialized to a non-nil empty slice so callers
// (and JSON serialization) always see stable empty collections, never nil.
func emptyResponse(query string) contracts.SearchResponse {
	var resp contracts.SearchResponse
	resp.Query = query
	resp.NumberOfResults = 0
	resp.Results = []contracts.MainResult{}
	resp.Answers = []map[string]any{}
	resp.Corrections = []string{}
	resp.Infoboxes = []map[string]any{}
	resp.Suggestions = []string{}
	resp.UnresponsiveEngines = [][2]string{}
	return resp
}
func shouldFallbackToUpstream(engineName string, r contracts.SearchResponse) bool {
if engineName != "qwant" {
return false
@ -109,3 +177,4 @@ func shouldFallbackToUpstream(engineName string, r contracts.SearchResponse) boo
return len(r.Results) == 0 && len(r.Answers) == 0 && len(r.Infoboxes) == 0
}