diff --git a/go.mod b/go.mod index d525525..5346e09 100644 --- a/go.mod +++ b/go.mod @@ -10,4 +10,5 @@ require ( require ( github.com/andybalholm/cascadia v1.3.3 // indirect golang.org/x/net v0.52.0 // indirect + golang.org/x/sync v0.20.0 // indirect ) diff --git a/go.sum b/go.sum index ea7aba8..d6a1c83 100644 --- a/go.sum +++ b/go.sum @@ -35,6 +35,8 @@ golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4= +golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/internal/search/service.go b/internal/search/service.go index a2cbdd2..f7cf83b 100644 --- a/internal/search/service.go +++ b/internal/search/service.go @@ -3,10 +3,11 @@ package search import ( "context" "net/http" + "sync" "time" - "github.com/ashie/gosearch/internal/engines" "github.com/ashie/gosearch/internal/contracts" + "github.com/ashie/gosearch/internal/engines" "github.com/ashie/gosearch/internal/upstream" ) @@ -44,55 +45,94 @@ func NewService(cfg ServiceConfig) *Service { } } +// Search executes the request against local engines (in parallel) and +// optionally upstream SearXNG for unported engines. +// +// Individual engine failures are reported as unresponsive_engines rather +// than aborting the entire search. func (s *Service) Search(ctx context.Context, req SearchRequest) (SearchResponse, error) { - localEngines, upstreamEngines, _ := s.planner.Plan(req) + localEngineNames, upstreamEngineNames, _ := s.planner.Plan(req) - responses := make([]contracts.SearchResponse, 0, 2) + // Run all local engines concurrently. + type engineResult struct { + name string + resp contracts.SearchResponse + err error + } + + localResults := make([]engineResult, 0, len(localEngineNames)) + + var wg sync.WaitGroup + var mu sync.Mutex + + for _, name := range localEngineNames { + eng, ok := s.localEngines[name] + if !ok { + mu.Lock() + localResults = append(localResults, engineResult{ + name: name, + resp: unresponsiveResponse(req.Query, name, "engine_not_registered"), + }) + mu.Unlock() + continue + } + + wg.Add(1) + go func(name string, eng engines.Engine) { + defer wg.Done() + + r, err := eng.Search(ctx, req) + + mu.Lock() + defer mu.Unlock() + + if err != nil { + localResults = append(localResults, engineResult{ + name: name, + resp: unresponsiveResponse(req.Query, name, err.Error()), + }) + return + } + localResults = append(localResults, engineResult{name: name, resp: r}) + }(name, eng) + } + + wg.Wait() + + // Collect successful responses and determine upstream fallbacks. + responses := make([]contracts.SearchResponse, 0, len(localResults)+1) upstreamSet := map[string]bool{} - for _, e := range upstreamEngines { + for _, e := range upstreamEngineNames { upstreamSet[e] = true } - for _, engineName := range localEngines { - eng, ok := s.localEngines[engineName] - if !ok { - continue - } - r, err := eng.Search(ctx, req) - if err != nil { - // MVP: fail fast so the client sees a real error. - return SearchResponse{}, err - } - responses = append(responses, r) + for _, lr := range localResults { + responses = append(responses, lr.resp) - // Some engines (notably qwant due to anti-bot protections) can return - // zero local results depending on client/IP. If upstream SearXNG is - // configured, let it attempt the same engine as a fallback. - if shouldFallbackToUpstream(engineName, r) && !upstreamSet[engineName] { - upstreamEngines = append(upstreamEngines, engineName) - upstreamSet[engineName] = true + // If a local engine returned nothing (e.g. qwant anti-bot), fall back + // to upstream if available. + if shouldFallbackToUpstream(lr.name, lr.resp) && !upstreamSet[lr.name] { + upstreamEngineNames = append(upstreamEngineNames, lr.name) + upstreamSet[lr.name] = true } } - if s.upstreamClient != nil && len(upstreamEngines) > 0 { - r, err := s.upstreamClient.SearchJSON(ctx, req, upstreamEngines) + // Upstream proxy for unported (or fallback) engines. + if s.upstreamClient != nil && len(upstreamEngineNames) > 0 { + r, err := s.upstreamClient.SearchJSON(ctx, req, upstreamEngineNames) if err != nil { - return SearchResponse{}, err + // Upstream failure is treated as a single unresponsive engine entry. + responses = append(responses, contracts.SearchResponse{ + Query: req.Query, + UnresponsiveEngines: [][2]string{{"upstream", err.Error()}}, + }) + } else { + responses = append(responses, r) } - responses = append(responses, r) } if len(responses) == 0 { - return SearchResponse{ - Query: req.Query, - NumberOfResults: 0, - Results: []MainResult{}, - Answers: []map[string]any{}, - Corrections: []string{}, - Infoboxes: []map[string]any{}, - Suggestions: []string{}, - UnresponsiveEngines: [][2]string{}, - }, nil + return emptyResponse(req.Query), nil } merged := MergeResponses(responses) @@ -102,6 +142,34 @@ func (s *Service) Search(ctx context.Context, req SearchRequest) (SearchResponse return merged, nil } +// unresponsiveResponse returns a zero-result response marking the engine as unresponsive. +func unresponsiveResponse(query, engine, reason string) contracts.SearchResponse { + return contracts.SearchResponse{ + Query: query, + NumberOfResults: 0, + Results: []contracts.MainResult{}, + Answers: []map[string]any{}, + Corrections: []string{}, + Infoboxes: []map[string]any{}, + Suggestions: []string{}, + UnresponsiveEngines: [][2]string{{engine, reason}}, + } +} + +// emptyResponse returns a valid empty response with stable empty slices. +func emptyResponse(query string) contracts.SearchResponse { + return contracts.SearchResponse{ + Query: query, + NumberOfResults: 0, + Results: []contracts.MainResult{}, + Answers: []map[string]any{}, + Corrections: []string{}, + Infoboxes: []map[string]any{}, + Suggestions: []string{}, + UnresponsiveEngines: [][2]string{}, + } +} + func shouldFallbackToUpstream(engineName string, r contracts.SearchResponse) bool { if engineName != "qwant" { return false @@ -109,3 +177,4 @@ func shouldFallbackToUpstream(engineName string, r contracts.SearchResponse) boo return len(r.Results) == 0 && len(r.Answers) == 0 && len(r.Infoboxes) == 0 } + diff --git a/internal/search/service_test.go b/internal/search/service_test.go new file mode 100644 index 0000000..b47f2d8 --- /dev/null +++ b/internal/search/service_test.go @@ -0,0 +1,248 @@ +package search + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/ashie/gosearch/internal/contracts" + "github.com/ashie/gosearch/internal/engines" +) + +// mockEngine is a test engine that returns a predefined response or error. +type mockEngine struct { + name string + resp contracts.SearchResponse + err error + delay time.Duration +} + +func (m *mockEngine) Name() string { return m.name } +func (m *mockEngine) Search(ctx context.Context, req contracts.SearchRequest) (contracts.SearchResponse, error) { + if m.delay > 0 { + select { + case <-ctx.Done(): + return contracts.SearchResponse{}, ctx.Err() + case <-time.After(m.delay): + } + } + if m.err != nil { + return contracts.SearchResponse{}, m.err + } + return m.resp, nil +} + +func TestSearch_ConcurrentEngines(t *testing.T) { + // Two engines that each take 100ms. If sequential, total would be ~200ms. + // Concurrent should be ~100ms. + registry := map[string]engines.Engine{ + "fast-a": &mockEngine{ + name: "fast-a", + resp: contracts.SearchResponse{ + Query: "test", + NumberOfResults: 1, + Results: []contracts.MainResult{ + {Title: "Result A", Engine: "fast-a"}, + }, + Answers: []map[string]any{}, + Corrections: []string{}, + Infoboxes: []map[string]any{}, + Suggestions: []string{}, + UnresponsiveEngines: [][2]string{}, + }, + delay: 100 * time.Millisecond, + }, + "fast-b": &mockEngine{ + name: "fast-b", + resp: contracts.SearchResponse{ + Query: "test", + NumberOfResults: 1, + Results: []contracts.MainResult{ + {Title: "Result B", Engine: "fast-b"}, + }, + Answers: []map[string]any{}, + Corrections: []string{}, + Infoboxes: []map[string]any{}, + Suggestions: []string{}, + UnresponsiveEngines: [][2]string{}, + }, + delay: 100 * time.Millisecond, + }, + } + + svc := &Service{ + upstreamClient: nil, + planner: engines.NewPlanner([]string{"fast-a", "fast-b"}), + localEngines: registry, + } + + req := SearchRequest{ + Format: FormatJSON, + Query: "test", + Engines: []string{"fast-a", "fast-b"}, + } + + start := time.Now() + resp, err := svc.Search(context.Background(), req) + elapsed := time.Since(start) + + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(resp.Results) != 2 { + t.Errorf("expected 2 results, got %d", len(resp.Results)) + } + // Should complete in well under 200ms (sequential time). + if elapsed > 180*time.Millisecond { + t.Errorf("engines not running concurrently: took %v", elapsed) + } +} + +func TestSearch_GracefulDegradation(t *testing.T) { + // One engine succeeds, one fails. Both should be represented: + // the failing engine in unresponsive_engines, the succeeding one in results. + registry := map[string]engines.Engine{ + "good": &mockEngine{ + name: "good", + resp: contracts.SearchResponse{ + Query: "test", + NumberOfResults: 1, + Results: []contracts.MainResult{ + {Title: "Good Result", Engine: "good"}, + }, + Answers: []map[string]any{}, + Corrections: []string{}, + Infoboxes: []map[string]any{}, + Suggestions: []string{}, + UnresponsiveEngines: [][2]string{}, + }, + }, + "bad": &mockEngine{ + name: "bad", + err: errors.New("connection refused"), + }, + } + + svc := &Service{ + upstreamClient: nil, + planner: engines.NewPlanner([]string{"good", "bad"}), + localEngines: registry, + } + + req := SearchRequest{ + Format: FormatJSON, + Query: "test", + Engines: []string{"good", "bad"}, + } + + resp, err := svc.Search(context.Background(), req) + if err != nil { + t.Fatalf("search should not fail on individual engine error: %v", err) + } + + // Should still have the good result. + if len(resp.Results) != 1 { + t.Errorf("expected 1 result from good engine, got %d", len(resp.Results)) + } + + // The bad engine should appear in unresponsive_engines. + foundBad := false + for _, ue := range resp.UnresponsiveEngines { + if ue[0] == "bad" { + foundBad = true + } + } + if !foundBad { + t.Errorf("expected 'bad' in unresponsive_engines, got %v", resp.UnresponsiveEngines) + } +} + +func TestSearch_AllEnginesFail(t *testing.T) { + registry := map[string]engines.Engine{ + "failing": &mockEngine{ + name: "failing", + err: errors.New("timeout"), + }, + } + + svc := &Service{ + upstreamClient: nil, + planner: engines.NewPlanner([]string{"failing"}), + localEngines: registry, + } + + req := SearchRequest{ + Format: FormatJSON, + Query: "test", + Engines: []string{"failing"}, + } + + resp, err := svc.Search(context.Background(), req) + if err != nil { + t.Fatalf("should not return error even when all engines fail: %v", err) + } + + if len(resp.Results) != 0 { + t.Errorf("expected 0 results, got %d", len(resp.Results)) + } + if len(resp.UnresponsiveEngines) != 1 { + t.Errorf("expected 1 unresponsive engine, got %d", len(resp.UnresponsiveEngines)) + } +} + +func TestSearch_ContextCancellation(t *testing.T) { + slowEngine := &mockEngine{ + name: "slow", + delay: 5 * time.Second, + resp: contracts.SearchResponse{ + Query: "test", + Results: []contracts.MainResult{}, + Answers: []map[string]any{}, + Corrections: []string{}, + Infoboxes: []map[string]any{}, + Suggestions: []string{}, + UnresponsiveEngines: [][2]string{}, + }, + } + + svc := &Service{ + upstreamClient: nil, + planner: engines.NewPlanner([]string{"slow"}), + localEngines: map[string]engines.Engine{ + "slow": slowEngine, + }, + } + + ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond) + defer cancel() + + req := SearchRequest{ + Format: FormatJSON, + Query: "test", + Engines: []string{"slow"}, + } + + start := time.Now() + resp, err := svc.Search(ctx, req) + elapsed := time.Since(start) + + // Should not error — graceful degradation handles context cancellation. + if err != nil { + t.Fatalf("should not error on context cancel: %v", err) + } + + if len(resp.Results) != 0 { + t.Errorf("expected 0 results from cancelled engine, got %d", len(resp.Results)) + } + + // Should complete quickly, not wait for the 5s delay. + if elapsed > 200*time.Millisecond { + t.Errorf("context cancellation not respected: took %v", elapsed) + } + + // Engine should be marked unresponsive. + if len(resp.UnresponsiveEngines) != 1 { + t.Errorf("expected 1 unresponsive engine, got %d", len(resp.UnresponsiveEngines)) + } +} diff --git a/searxng-go b/searxng-go index 90b8bd0..e286897 100755 Binary files a/searxng-go and b/searxng-go differ