kafka/internal/search/service.go
ashisgreat22 21b77f25bf
Some checks failed
Build and Push Docker Image / build-and-push (push) Failing after 8s
Mirror to GitHub / mirror (push) Failing after 3s
Tests / test (push) Successful in 38s
refactor: remove SearXNG references and rename binary to kafka
- Rename cmd/searxng-go to cmd/kafka
- Remove all SearXNG references from source comments while keeping
  "SearXNG-compatible API" in user-facing docs
- Update binary paths in README, CLAUDE.md, and Dockerfile
- Update log message to "kafka starting"

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-22 01:47:03 +01:00

209 lines
5.5 KiB
Go

package search
import (
"context"
"net/http"
"sync"
"time"
"github.com/metamorphosis-dev/kafka/internal/cache"
"github.com/metamorphosis-dev/kafka/internal/contracts"
"github.com/metamorphosis-dev/kafka/internal/engines"
"github.com/metamorphosis-dev/kafka/internal/upstream"
)
// ServiceConfig carries the settings NewService uses to build a Service.
type ServiceConfig struct {
// UpstreamURL is the address handed to upstream.NewClient. When it is
// empty, NewService does not create an upstream client.
UpstreamURL string
// HTTPTimeout is used for the local engines' HTTP client and is passed to
// the upstream client. Values <= 0 are replaced by 10 seconds in NewService.
HTTPTimeout time.Duration
// Cache is stored on the Service unchanged; a nil Cache disables the
// cache lookup and store in Search.
Cache *cache.Cache
}
// Service executes search requests against locally registered engines and,
// when an upstream client is present, against the upstream instance.
type Service struct {
// upstreamClient is nil when no upstream URL was configured or when
// constructing the client failed; upstream queries are skipped in that case.
upstreamClient *upstream.Client
// planner splits a request into local and upstream engine names.
planner *engines.Planner
// localEngines maps an engine name to its local implementation.
localEngines map[string]engines.Engine
// cache is optional; nil means responses are neither looked up nor stored.
cache *cache.Cache
}
// NewService builds a Service from cfg.
//
// A non-positive cfg.HTTPTimeout is replaced by a 10-second default. The
// upstream client is only created when cfg.UpstreamURL is non-empty; if its
// construction returns an error the error is discarded and the Service is
// left without an upstream client. The planner is created from the
// environment and the local engines share a single HTTP client that uses the
// resolved timeout.
func NewService(cfg ServiceConfig) *Service {
	timeout := cfg.HTTPTimeout
	if timeout <= 0 {
		timeout = 10 * time.Second
	}

	svc := &Service{cache: cfg.Cache}

	if cfg.UpstreamURL != "" {
		// Best effort: a failed construction leaves upstreamClient nil.
		if client, err := upstream.NewClient(cfg.UpstreamURL, timeout); err == nil {
			svc.upstreamClient = client
		}
	}

	svc.planner = engines.NewPlannerFromEnv()
	svc.localEngines = engines.NewDefaultPortedEngines(&http.Client{Timeout: timeout})
	return svc
}
// Search executes the request against local engines (in parallel) and
// optionally the upstream instance for unported engines.
//
// Individual engine failures are reported as unresponsive_engines rather
// than aborting the entire search.
//
// If a cache is configured and contains a response for this request, the
// cached result is returned without hitting any engines. Otherwise the
// freshly merged response is stored under the same key before it is returned.
// When executeSearch fails, a zero SearchResponse and the error are returned
// and nothing is cached.
func (s *Service) Search(ctx context.Context, req SearchRequest) (SearchResponse, error) {
	// Derive the cache key once so the lookup and the store always agree on
	// it and the key computation is not repeated on a miss.
	var cacheKey string
	if s.cache != nil {
		cacheKey = cache.Key(req)
		if cached, hit := s.cache.Get(ctx, cacheKey); hit {
			return cached, nil
		}
	}

	merged, err := s.executeSearch(ctx, req)
	if err != nil {
		return SearchResponse{}, err
	}

	if s.cache != nil {
		s.cache.Set(ctx, cacheKey, merged)
	}
	return merged, nil
}
// executeSearch runs the actual engine queries and merges results.
//
// The planner splits the request into local and upstream engine names. Every
// local engine is queried in its own goroutine; an engine that is not
// registered or that returns an error contributes an unresponsive entry
// instead of failing the search. Local responses are handed to MergeResponses
// in planner order, independent of which engine answers first.
//
// Engines for which shouldFallbackToUpstream reports true are added to the
// upstream list. The upstream query only happens when an upstream client
// exists; without one, upstream engine names are silently skipped. An
// upstream error is recorded as a single "upstream" unresponsive entry.
//
// The returned error is always nil in the current implementation.
func (s *Service) executeSearch(ctx context.Context, req SearchRequest) (SearchResponse, error) {
	localEngineNames, upstreamEngineNames, _ := s.planner.Plan(req)

	// One slot per planned engine. Each goroutine writes only its own slot,
	// so no mutex is needed, and wg.Wait() orders those writes before the
	// reads below.
	localResults := make([]contracts.SearchResponse, len(localEngineNames))
	var wg sync.WaitGroup
	for i, name := range localEngineNames {
		eng, ok := s.localEngines[name]
		if !ok {
			localResults[i] = unresponsiveResponse(req.Query, name, "engine_not_registered")
			continue
		}
		wg.Add(1)
		go func(i int, name string, eng engines.Engine) {
			defer wg.Done()
			r, err := eng.Search(ctx, req)
			if err != nil {
				r = unresponsiveResponse(req.Query, name, err.Error())
			}
			localResults[i] = r
		}(i, name, eng)
	}
	wg.Wait()

	// Collect local responses and determine upstream fallbacks.
	responses := make([]contracts.SearchResponse, 0, len(localResults)+1)
	upstreamSet := make(map[string]bool, len(upstreamEngineNames))
	for _, e := range upstreamEngineNames {
		upstreamSet[e] = true
	}
	for i, resp := range localResults {
		responses = append(responses, resp)
		// If a local engine returned nothing (e.g. qwant anti-bot), fall back
		// to upstream if available; the set prevents duplicate names.
		name := localEngineNames[i]
		if shouldFallbackToUpstream(name, resp) && !upstreamSet[name] {
			upstreamEngineNames = append(upstreamEngineNames, name)
			upstreamSet[name] = true
		}
	}

	// Upstream proxy for unported (or fallback) engines.
	if s.upstreamClient != nil && len(upstreamEngineNames) > 0 {
		r, err := s.upstreamClient.SearchJSON(ctx, req, upstreamEngineNames)
		if err != nil {
			// Upstream failure is treated as a single unresponsive engine entry.
			r = contracts.SearchResponse{
				Query:               req.Query,
				UnresponsiveEngines: [][2]string{{"upstream", err.Error()}},
			}
		}
		responses = append(responses, r)
	}

	if len(responses) == 0 {
		return emptyResponse(req.Query), nil
	}

	merged := MergeResponses(responses)
	if merged.Query == "" {
		// Make sure the caller always sees the query it asked for.
		merged.Query = req.Query
	}
	return merged, nil
}
// unresponsiveResponse builds a zero-result response for query whose only
// content is a single unresponsive-engine entry of the form {engine, reason}.
// All other collections are non-nil empty slices.
func unresponsiveResponse(query, engine, reason string) contracts.SearchResponse {
	failure := [2]string{engine, reason}

	var resp contracts.SearchResponse
	resp.Query = query
	resp.NumberOfResults = 0
	resp.Results = []contracts.MainResult{}
	resp.Answers = []map[string]any{}
	resp.Corrections = []string{}
	resp.Infoboxes = []map[string]any{}
	resp.Suggestions = []string{}
	resp.UnresponsiveEngines = [][2]string{failure}
	return resp
}
// emptyResponse builds a response for query that carries no results at all.
// Every collection field is set to a non-nil empty slice rather than left
// nil (presumably so encoders emit [] instead of null — confirm against the
// response contract).
func emptyResponse(query string) contracts.SearchResponse {
	resp := contracts.SearchResponse{Query: query}
	resp.NumberOfResults = 0
	resp.Results = []contracts.MainResult{}
	resp.Answers = []map[string]any{}
	resp.Corrections = []string{}
	resp.Infoboxes = []map[string]any{}
	resp.Suggestions = []string{}
	resp.UnresponsiveEngines = [][2]string{}
	return resp
}
// shouldFallbackToUpstream reports whether a local engine's response should
// be retried through the upstream instance. Only the "qwant" engine is
// eligible, and only when its response has no results, answers or infoboxes.
func shouldFallbackToUpstream(engineName string, r contracts.SearchResponse) bool {
	switch engineName {
	case "qwant":
		// Lengths are never negative, so a zero sum means all three are empty.
		return len(r.Results)+len(r.Answers)+len(r.Infoboxes) == 0
	default:
		return false
	}
}