Add internal/httpclient package as a singleton RoundTripper used by all outbound engine requests (search, engines, autocomplete, upstream). Key Transport settings: - MaxIdleConnsPerHost = 20 (up from Go default of 2) - MaxIdleConns = 100 - IdleConnTimeout = 90s - DialContext timeout = 5s Previously, the default transport limited each host to 2 idle connections, forcing a new TCP+TLS handshake on every search for each engine. With 12 engines hitting the same upstream hosts in parallel, connections were constantly recycled. Now warm connections are reused across all goroutines and requests.
227 lines
6.4 KiB
Go
227 lines
6.4 KiB
Go
// samsa — a privacy-respecting metasearch engine
|
|
// Copyright (C) 2026-present metamorphosis-dev
|
|
//
|
|
// This program is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Affero General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// This program is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Affero General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Affero General Public License
|
|
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
package search
|
|
|
|
import (
	"context"
	"sync"
	"time"

	"github.com/metamorphosis-dev/samsa/internal/cache"
	"github.com/metamorphosis-dev/samsa/internal/config"
	"github.com/metamorphosis-dev/samsa/internal/contracts"
	"github.com/metamorphosis-dev/samsa/internal/engines"
	"github.com/metamorphosis-dev/samsa/internal/httpclient"
	"github.com/metamorphosis-dev/samsa/internal/upstream"
)
|
|
|
|
// ServiceConfig carries the dependencies and tunables needed to build a
// search Service via NewService.
type ServiceConfig struct {
	// UpstreamURL is the base URL of the upstream instance used for
	// unported engines; empty disables upstream proxying.
	UpstreamURL string
	// HTTPTimeout bounds outbound HTTP requests; values <= 0 fall back
	// to a 10-second default in NewService.
	HTTPTimeout time.Duration
	// Cache is an optional response cache; nil disables caching.
	Cache *cache.Cache
	// EnginesConfig is passed through to the locally ported engines.
	EnginesConfig *config.Config
}
|
|
|
|
// Service executes search requests against locally ported engines and,
// optionally, an upstream instance for engines that are not ported.
type Service struct {
	// upstreamClient proxies queries for unported engines; nil when no
	// upstream URL was configured or the client could not be built.
	upstreamClient *upstream.Client
	// planner decides which engines run locally vs. upstream.
	planner *engines.Planner
	// localEngines maps engine name to its local implementation.
	localEngines map[string]engines.Engine
	// cache is an optional response cache; nil disables caching.
	cache *cache.Cache
}
|
|
|
|
// NewService builds a Service from cfg, applying a 10-second default
// HTTP timeout when cfg.HTTPTimeout is unset or non-positive.
//
// If cfg.UpstreamURL is empty, or the upstream client cannot be
// constructed, the Service runs with local engines only.
func NewService(cfg ServiceConfig) *Service {
	timeout := cfg.HTTPTimeout
	if timeout <= 0 {
		timeout = 10 * time.Second
	}

	// Single shared HTTP client so every local engine reuses the same
	// pooled transport (warm connections across goroutines).
	httpClient := httpclient.NewClient(timeout)

	var up *upstream.Client
	if cfg.UpstreamURL != "" {
		c, err := upstream.NewClient(cfg.UpstreamURL, timeout)
		if err == nil {
			up = c
		}
		// NOTE(review): a construction error is silently dropped here and
		// the service degrades to local-only; consider surfacing/logging it.
	}

	return &Service{
		upstreamClient: up,
		planner:        engines.NewPlannerFromEnv(),
		localEngines:   engines.NewDefaultPortedEngines(httpClient, cfg.EnginesConfig),
		cache:          cfg.Cache,
	}
}
|
|
|
|
// Search executes the request against local engines (in parallel) and
|
|
// optionally the upstream instance for unported engines.
|
|
//
|
|
// Individual engine failures are reported as unresponsive_engines rather
|
|
// than aborting the entire search.
|
|
//
|
|
// If a Valkey cache is configured and contains a cached response for this
|
|
// request, the cached result is returned without hitting any engines.
|
|
func (s *Service) Search(ctx context.Context, req SearchRequest) (SearchResponse, error) {
|
|
// Check cache first.
|
|
if s.cache != nil {
|
|
cacheKey := cache.Key(req)
|
|
if cached, hit := s.cache.Get(ctx, cacheKey); hit {
|
|
return cached, nil
|
|
}
|
|
}
|
|
|
|
merged, err := s.executeSearch(ctx, req)
|
|
if err != nil {
|
|
return SearchResponse{}, err
|
|
}
|
|
|
|
// Store in cache.
|
|
if s.cache != nil {
|
|
cacheKey := cache.Key(req)
|
|
s.cache.Set(ctx, cacheKey, merged)
|
|
}
|
|
|
|
return merged, nil
|
|
}
|
|
|
|
// executeSearch runs the actual engine queries and merges results.
//
// Local engines chosen by the planner are queried concurrently; each
// failure (or unregistered engine name) becomes an unresponsive-engine
// entry instead of an error. Local engines that return nothing may be
// retried through the upstream instance, alongside the engines the
// planner already routed upstream.
func (s *Service) executeSearch(ctx context.Context, req SearchRequest) (SearchResponse, error) {
	localEngineNames, upstreamEngineNames, _ := s.planner.Plan(req)

	// Run all local engines concurrently.
	type engineResult struct {
		name string
		resp contracts.SearchResponse
		// NOTE(review): err is never set or read; engine errors are
		// folded into resp via unresponsiveResponse instead.
		err error
	}

	localResults := make([]engineResult, 0, len(localEngineNames))

	var wg sync.WaitGroup
	// mu guards localResults, which is appended to both inline (below)
	// and from the per-engine goroutines.
	var mu sync.Mutex

	for _, name := range localEngineNames {
		eng, ok := s.localEngines[name]
		if !ok {
			// Planner selected an engine that was never registered; record
			// it as unresponsive. The lock is still required because
			// goroutines from earlier iterations may already be appending.
			mu.Lock()
			localResults = append(localResults, engineResult{
				name: name,
				resp: unresponsiveResponse(req.Query, name, "engine_not_registered"),
			})
			mu.Unlock()
			continue
		}

		wg.Add(1)
		// name and eng are passed as arguments so each goroutine gets its
		// own copies (safe on pre-1.22 toolchains too).
		go func(name string, eng engines.Engine) {
			defer wg.Done()

			r, err := eng.Search(ctx, req)

			mu.Lock()
			defer mu.Unlock()

			if err != nil {
				// Engine failure becomes an unresponsive entry, not an error.
				localResults = append(localResults, engineResult{
					name: name,
					resp: unresponsiveResponse(req.Query, name, err.Error()),
				})
				return
			}
			localResults = append(localResults, engineResult{name: name, resp: r})
		}(name, eng)
	}

	wg.Wait()

	// Collect successful responses and determine upstream fallbacks.
	responses := make([]contracts.SearchResponse, 0, len(localResults)+1)
	upstreamSet := map[string]bool{}
	for _, e := range upstreamEngineNames {
		upstreamSet[e] = true
	}

	for _, lr := range localResults {
		responses = append(responses, lr.resp)

		// If a local engine returned nothing (e.g. qwant anti-bot), fall back
		// to upstream if available.
		if shouldFallbackToUpstream(lr.name, lr.resp) && !upstreamSet[lr.name] {
			upstreamEngineNames = append(upstreamEngineNames, lr.name)
			upstreamSet[lr.name] = true
		}
	}

	// Upstream proxy for unported (or fallback) engines.
	if s.upstreamClient != nil && len(upstreamEngineNames) > 0 {
		r, err := s.upstreamClient.SearchJSON(ctx, req, upstreamEngineNames)
		if err != nil {
			// Upstream failure is treated as a single unresponsive engine entry.
			responses = append(responses, contracts.SearchResponse{
				Query:               req.Query,
				UnresponsiveEngines: [][2]string{{"upstream", err.Error()}},
			})
		} else {
			responses = append(responses, r)
		}
	}

	if len(responses) == 0 {
		return emptyResponse(req.Query), nil
	}

	merged := MergeResponses(responses)
	if merged.Query == "" {
		// Guarantee the echoed query is always populated.
		merged.Query = req.Query
	}
	return merged, nil
}
|
|
|
|
// unresponsiveResponse returns a zero-result response marking the engine as unresponsive.
|
|
func unresponsiveResponse(query, engine, reason string) contracts.SearchResponse {
|
|
return contracts.SearchResponse{
|
|
Query: query,
|
|
NumberOfResults: 0,
|
|
Results: []contracts.MainResult{},
|
|
Answers: []map[string]any{},
|
|
Corrections: []string{},
|
|
Infoboxes: []map[string]any{},
|
|
Suggestions: []string{},
|
|
UnresponsiveEngines: [][2]string{{engine, reason}},
|
|
}
|
|
}
|
|
|
|
// emptyResponse returns a valid empty response with stable empty slices.
|
|
func emptyResponse(query string) contracts.SearchResponse {
|
|
return contracts.SearchResponse{
|
|
Query: query,
|
|
NumberOfResults: 0,
|
|
Results: []contracts.MainResult{},
|
|
Answers: []map[string]any{},
|
|
Corrections: []string{},
|
|
Infoboxes: []map[string]any{},
|
|
Suggestions: []string{},
|
|
UnresponsiveEngines: [][2]string{},
|
|
}
|
|
}
|
|
|
|
func shouldFallbackToUpstream(engineName string, r contracts.SearchResponse) bool {
|
|
if engineName != "qwant" {
|
|
return false
|
|
}
|
|
return len(r.Results) == 0 && len(r.Answers) == 0 && len(r.Infoboxes) == 0
|
|
}
|