samsa/internal/search/service.go
Franz Kafka 9e95ce7b53 perf: shared http.Transport with tuned connection pooling
Add internal/httpclient package as a singleton RoundTripper used by
all outbound engine requests (search, engines, autocomplete, upstream).

Key Transport settings:
- MaxIdleConnsPerHost = 20  (up from Go default of 2)
- MaxIdleConns = 100
- IdleConnTimeout = 90s
- DialContext timeout = 5s

Previously, the default transport limited each host to 2 idle connections,
forcing a new TCP+TLS handshake on every search for each engine. With
12 engines hitting the same upstream hosts in parallel, connections
were constantly recycled. Now warm connections are reused across all
goroutines and requests.
2026-03-23 14:26:26 +00:00

227 lines
6.4 KiB
Go

// samsa — a privacy-respecting metasearch engine
// Copyright (C) 2026-present metamorphosis-dev
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.
package search
import (
	"context"
	"log"
	"sync"
	"time"

	"github.com/metamorphosis-dev/samsa/internal/cache"
	"github.com/metamorphosis-dev/samsa/internal/config"
	"github.com/metamorphosis-dev/samsa/internal/contracts"
	"github.com/metamorphosis-dev/samsa/internal/engines"
	"github.com/metamorphosis-dev/samsa/internal/httpclient"
	"github.com/metamorphosis-dev/samsa/internal/upstream"
)
// ServiceConfig carries the settings and dependencies consumed by NewService.
type ServiceConfig struct {
// UpstreamURL is handed to upstream.NewClient; when empty, no upstream
// client is created.
UpstreamURL string
// HTTPTimeout is the timeout given to the HTTP clients. NewService
// substitutes 10 seconds when it is zero or negative.
HTTPTimeout time.Duration
// Cache is the optional response cache; a nil value disables caching in
// Search.
Cache *cache.Cache
// EnginesConfig is passed through to engines.NewDefaultPortedEngines.
EnginesConfig *config.Config
}
// Service answers search requests by querying locally ported engines and,
// when configured, an upstream instance, with optional response caching.
type Service struct {
// upstreamClient sends requests to the upstream instance; it is nil when no
// upstream URL was configured or the client could not be built.
upstreamClient *upstream.Client
// planner splits a request's engines into a local list and an upstream list.
planner *engines.Planner
// localEngines maps an engine name to its local implementation.
localEngines map[string]engines.Engine
// cache is the optional response cache; nil disables caching.
cache *cache.Cache
}
// NewService builds a Service from cfg.
//
// A zero or negative cfg.HTTPTimeout is replaced by 10 seconds. The same
// timeout is used for the HTTP client given to the local engines and for the
// upstream client.
//
// An upstream client is only created when cfg.UpstreamURL is non-empty. If
// upstream.NewClient fails, the failure is logged and the returned Service
// runs without an upstream client (local engines only); construction itself
// never fails.
func NewService(cfg ServiceConfig) *Service {
	timeout := cfg.HTTPTimeout
	if timeout <= 0 {
		timeout = 10 * time.Second
	}

	var up *upstream.Client
	if cfg.UpstreamURL != "" {
		c, err := upstream.NewClient(cfg.UpstreamURL, timeout)
		if err != nil {
			// Best-effort: keep serving from local engines, but leave a trace
			// so a misconfigured upstream does not go unnoticed.
			log.Printf("search: upstream client disabled: %v", err)
		} else {
			up = c
		}
	}

	httpClient := httpclient.NewClient(timeout)
	return &Service{
		upstreamClient: up,
		planner:        engines.NewPlannerFromEnv(),
		localEngines:   engines.NewDefaultPortedEngines(httpClient, cfg.EnginesConfig),
		cache:          cfg.Cache,
	}
}
// Search executes the request against local engines (in parallel) and
// optionally the upstream instance for unported engines.
//
// Individual engine failures are reported as unresponsive_engines rather
// than aborting the entire search.
//
// If a Valkey cache is configured and contains a cached response for this
// request, the cached result is returned without hitting any engines.
// Otherwise the freshly merged response is stored under the same key before
// it is returned. When executeSearch reports an error, a zero SearchResponse
// and that error are returned and nothing is cached.
func (s *Service) Search(ctx context.Context, req SearchRequest) (SearchResponse, error) {
	// Without a cache there is nothing to look up or store.
	if s.cache == nil {
		merged, err := s.executeSearch(ctx, req)
		if err != nil {
			return SearchResponse{}, err
		}
		return merged, nil
	}

	// Derive the key once, before any engine has seen req, so the lookup and
	// the later store are guaranteed to address the same entry.
	cacheKey := cache.Key(req)
	if cached, hit := s.cache.Get(ctx, cacheKey); hit {
		return cached, nil
	}

	merged, err := s.executeSearch(ctx, req)
	if err != nil {
		return SearchResponse{}, err
	}
	s.cache.Set(ctx, cacheKey, merged)
	return merged, nil
}
// executeSearch runs the actual engine queries and merges results.
//
// The local engines chosen by the planner run concurrently, one goroutine
// each. Every engine writes only to its own pre-allocated slot, so no lock is
// needed and the responses reach MergeResponses in planner order no matter
// which engine finishes first. A name with no registered engine, or an engine
// whose Search returns an error, contributes an unresponsive entry instead of
// failing the whole request.
//
// Once all local engines have finished, every engine for which
// shouldFallbackToUpstream reports true is added (at most once) to the
// upstream list. If an upstream client is configured and that list is
// non-empty, one upstream request is made; an upstream error is recorded as an
// unresponsive "upstream" entry.
//
// The returned error is always nil.
func (s *Service) executeSearch(ctx context.Context, req SearchRequest) (SearchResponse, error) {
	// The planner's third result is not used here.
	localEngineNames, upstreamEngineNames, _ := s.planner.Plan(req)

	type engineResult struct {
		name string
		resp contracts.SearchResponse
	}

	// One slot per planned engine, filled in place.
	localResults := make([]engineResult, len(localEngineNames))

	var wg sync.WaitGroup
	for i, name := range localEngineNames {
		slot := &localResults[i]
		slot.name = name

		eng, ok := s.localEngines[name]
		if !ok {
			slot.resp = unresponsiveResponse(req.Query, name, "engine_not_registered")
			continue
		}

		wg.Add(1)
		// slot and eng are passed as arguments so each goroutine owns its own
		// copies regardless of the loop-variable semantics in effect.
		go func(slot *engineResult, eng engines.Engine) {
			defer wg.Done()
			r, err := eng.Search(ctx, req)
			if err != nil {
				slot.resp = unresponsiveResponse(req.Query, slot.name, err.Error())
				return
			}
			slot.resp = r
		}(slot, eng)
	}
	// Wait returns only after every Done, so all slot writes are visible below.
	wg.Wait()

	// Track which engines are already headed upstream so a fallback is not
	// requested twice.
	upstreamSet := make(map[string]bool, len(upstreamEngineNames))
	for _, e := range upstreamEngineNames {
		upstreamSet[e] = true
	}

	responses := make([]contracts.SearchResponse, 0, len(localResults)+1)
	for i := range localResults {
		lr := &localResults[i]
		responses = append(responses, lr.resp)
		// If a local engine returned nothing (e.g. qwant anti-bot), fall back
		// to upstream if available.
		if shouldFallbackToUpstream(lr.name, lr.resp) && !upstreamSet[lr.name] {
			upstreamEngineNames = append(upstreamEngineNames, lr.name)
			upstreamSet[lr.name] = true
		}
	}

	// Upstream proxy for unported (or fallback) engines.
	if s.upstreamClient != nil && len(upstreamEngineNames) > 0 {
		r, err := s.upstreamClient.SearchJSON(ctx, req, upstreamEngineNames)
		if err != nil {
			// Upstream failure is treated as a single unresponsive engine entry.
			responses = append(responses, contracts.SearchResponse{
				Query:               req.Query,
				UnresponsiveEngines: [][2]string{{"upstream", err.Error()}},
			})
		} else {
			responses = append(responses, r)
		}
	}

	if len(responses) == 0 {
		return emptyResponse(req.Query), nil
	}

	merged := MergeResponses(responses)
	if merged.Query == "" {
		merged.Query = req.Query
	}
	return merged, nil
}
// unresponsiveResponse builds a response for query that carries no results
// and a single UnresponsiveEngines entry pairing engine with reason. All
// other collection fields are set to non-nil empty values.
func unresponsiveResponse(query, engine, reason string) contracts.SearchResponse {
	var resp contracts.SearchResponse
	resp.Query = query
	resp.NumberOfResults = 0
	resp.Results = []contracts.MainResult{}
	resp.Answers = []map[string]any{}
	resp.Corrections = []string{}
	resp.Infoboxes = []map[string]any{}
	resp.Suggestions = []string{}
	resp.UnresponsiveEngines = [][2]string{{engine, reason}}
	return resp
}
// emptyResponse builds a valid response for query with zero results. Every
// collection field, including UnresponsiveEngines, is a non-nil empty value
// rather than nil (presumably so it serializes as an empty list, not null;
// confirm against the response contract).
func emptyResponse(query string) contracts.SearchResponse {
	var resp contracts.SearchResponse
	resp.Query = query
	resp.NumberOfResults = 0
	resp.Results = make([]contracts.MainResult, 0)
	resp.Answers = make([]map[string]any, 0)
	resp.Corrections = make([]string, 0)
	resp.Infoboxes = make([]map[string]any, 0)
	resp.Suggestions = make([]string, 0)
	resp.UnresponsiveEngines = make([][2]string, 0)
	return resp
}
// shouldFallbackToUpstream reports whether a local engine's response should be
// retried through the upstream instance. Only the "qwant" engine qualifies,
// and only when its response has no results, no answers and no infoboxes.
func shouldFallbackToUpstream(engineName string, r contracts.SearchResponse) bool {
	hasContent := len(r.Results) > 0 || len(r.Answers) > 0 || len(r.Infoboxes) > 0
	return engineName == "qwant" && !hasContent
}