kafka/internal/config/config.go
Franz Kafka a7f594b7fa feat: add YouTube engine with config file and env support
YouTube Data API v3 engine:
- Add YouTubeConfig to EnginesConfig with api_key field
- Add YOUTUBE_API_KEY env override
- Thread *config.Config through search service to factory
- Factory falls back to env vars if config fields are empty
- Update config.example.toml with youtube section

Also update default local_ported to include google and youtube.
2026-03-22 01:57:13 +00:00

283 lines
8.5 KiB
Go

package config
import (
	"fmt"
	"os"
	"strconv"
	"strings"
	"time"

	"github.com/BurntSushi/toml"
)
// Config is the root configuration object of the kafka service. Every field
// corresponds to the TOML table named in its struct tag.
type Config struct {
	// Server and upstream endpoints.
	Server   ServerConfig   `toml:"server"`
	Upstream UpstreamConfig `toml:"upstream"`

	// Search engines and result caching.
	Engines EnginesConfig `toml:"engines"`
	Cache   CacheConfig   `toml:"cache"`

	// HTTP middleware: CORS and the three rate limiters.
	CORS            CORSConfig            `toml:"cors"`
	RateLimit       RateLimitConfig       `toml:"rate_limit"`
	GlobalRateLimit GlobalRateLimitConfig `toml:"global_rate_limit"`
	BurstRateLimit  BurstRateLimitConfig  `toml:"burst_rate_limit"`
}
// ServerConfig holds the settings of the `server` table.
type ServerConfig struct {
	// Port is the server port; defaultConfig sets it to 8080.
	Port int `toml:"port"`
	// HTTPTimeout is a duration string such as "10s" (the value set by
	// defaultConfig). Config.HTTPTimeout converts it to a time.Duration.
	HTTPTimeout string `toml:"http_timeout"`
	// BaseURL is the public URL used for OpenSearch XML
	// (e.g. "https://search.example.com").
	BaseURL string `toml:"base_url"`
}
// UpstreamConfig holds the settings of the `upstream` table.
type UpstreamConfig struct {
	// URL is the upstream address. It has no built-in default and can be set
	// through the UPSTREAM_SEARXNG_URL environment variable.
	URL string `toml:"url"`
}
// EnginesConfig holds the settings of the `engines` table: the list of
// locally ported engines plus one sub-table per engine that needs options.
type EnginesConfig struct {
	// LocalPorted lists engine names. The LOCAL_PORTED_ENGINES environment
	// variable (comma-separated) replaces it when it yields at least one name.
	LocalPorted []string `toml:"local_ported"`

	// Per-engine options.
	Brave   BraveConfig   `toml:"brave"`
	Qwant   QwantConfig   `toml:"qwant"`
	YouTube YouTubeConfig `toml:"youtube"`
}
// CacheConfig describes the connection to the Valkey/Redis cache and the
// lifetime of cached entries.
type CacheConfig struct {
	// Address is the Valkey server address, e.g. "localhost:6379".
	Address string `toml:"address"`
	// Password is the auth password; leave empty for none.
	Password string `toml:"password"`
	// DB is the database index (default 0).
	DB int `toml:"db"`
	// DefaultTTL is the cache TTL as a duration string, e.g. "5m"
	// (default "5m"). Config.CacheTTL converts it to a time.Duration.
	DefaultTTL string `toml:"default_ttl"`
}
// CORSConfig carries the options handed to the CORS middleware.
type CORSConfig struct {
	// AllowedOrigins can also be supplied as a comma-separated list through
	// the CORS_ALLOWED_ORIGINS environment variable.
	AllowedOrigins []string `toml:"allowed_origins"`
	AllowedMethods []string `toml:"allowed_methods"`
	AllowedHeaders []string `toml:"allowed_headers"`
	ExposedHeaders []string `toml:"exposed_headers"`
	// NOTE(review): MaxAge is presumably in seconds; confirm against the
	// middleware that consumes it.
	MaxAge int `toml:"max_age"`
}
// RateLimitConfig configures the per-IP rate limiter.
type RateLimitConfig struct {
	// Requests is the maximum number of requests per window (default: 30).
	// NOTE(review): defaultConfig leaves this at 0, so the default of 30 is
	// presumably applied by the consumer; confirm.
	Requests int `toml:"requests"`
	// Window is the time window as a duration string, e.g. "1m"
	// (default: "1m").
	Window string `toml:"window"`
	// CleanupInterval is how often stale entries are cleaned up, as a
	// duration string (default: "5m").
	CleanupInterval string `toml:"cleanup_interval"`
}
// GlobalRateLimitConfig configures the server-wide rate limiter.
type GlobalRateLimitConfig struct {
	// Requests is the maximum total number of requests per window across all
	// IPs; 0 disables the limiter.
	Requests int `toml:"requests"`
	// Window is the time window as a duration string, e.g. "1m"
	// (default: "1m").
	Window string `toml:"window"`
}
// BurstRateLimitConfig configures the per-IP burst rate limiter, which tracks
// a short burst window and a longer sustained window.
type BurstRateLimitConfig struct {
	// Burst is the maximum number of requests in the burst window;
	// 0 disables the limiter.
	Burst int `toml:"burst"`
	// BurstWindow is the burst window as a duration string (default: "5s").
	BurstWindow string `toml:"burst_window"`
	// Sustained is the maximum number of requests in the sustained window.
	Sustained int `toml:"sustained"`
	// SustainedWindow is the sustained window as a duration string
	// (default: "1m").
	SustainedWindow string `toml:"sustained_window"`
}
// BraveConfig holds the credentials of the `engines.brave` table.
type BraveConfig struct {
	// APIKey can also be supplied through the BRAVE_API_KEY environment
	// variable.
	APIKey string `toml:"api_key"`
	// AccessToken can also be supplied through the BRAVE_ACCESS_TOKEN
	// environment variable.
	AccessToken string `toml:"access_token"`
}
// QwantConfig holds the options of the `engines.qwant` table.
type QwantConfig struct {
	// Category is set to "web-lite" by defaultConfig.
	Category string `toml:"category"`
	// ResultsPerPage is set to 10 by defaultConfig.
	ResultsPerPage int `toml:"results_per_page"`
}
// YouTubeConfig holds the credentials of the `engines.youtube` table.
type YouTubeConfig struct {
	// APIKey can also be supplied through the YOUTUBE_API_KEY environment
	// variable.
	APIKey string `toml:"api_key"`
}
// Load builds the service configuration from three layers, applied in order:
// the built-in defaults, the TOML file at path (if it exists), and finally
// environment variables.
//
// A missing file is not an error; the defaults are used instead. Any other
// failure to stat the file (for example a permission error) and any TOML
// parse failure is returned, so a config file that exists but cannot be read
// is never silently replaced by defaults. Environment variables set to a
// non-empty value are applied last through applyEnvOverrides and therefore
// take precedence over values from the file.
func Load(path string) (*Config, error) {
	cfg := defaultConfig()

	_, statErr := os.Stat(path)
	switch {
	case statErr == nil:
		// cfg is already a pointer, so it is handed to the decoder directly.
		if _, err := toml.DecodeFile(path, cfg); err != nil {
			return nil, fmt.Errorf("parse config %s: %w", path, err)
		}
	case !os.IsNotExist(statErr):
		// The file may well exist but is inaccessible; surface that instead
		// of quietly running on defaults.
		return nil, fmt.Errorf("stat config %s: %w", path, statErr)
	}

	applyEnvOverrides(cfg)
	return cfg, nil
}
// defaultConfig returns a freshly allocated Config holding the built-in
// defaults. Fields not assigned here keep their zero value.
func defaultConfig() *Config {
	cfg := new(Config)

	// Server.
	cfg.Server.Port = 8080
	cfg.Server.HTTPTimeout = "10s"

	// Engines.
	cfg.Engines.LocalPorted = []string{"wikipedia", "arxiv", "crossref", "braveapi", "qwant", "duckduckgo", "github", "reddit", "bing", "google", "youtube"}
	cfg.Engines.Qwant.Category = "web-lite"
	cfg.Engines.Qwant.ResultsPerPage = 10

	// Cache (DB stays at index 0).
	cfg.Cache.DefaultTTL = "5m"

	// Per-IP rate limiting.
	cfg.RateLimit.Window = "1m"
	cfg.RateLimit.CleanupInterval = "5m"

	return cfg
}
// applyEnvOverrides overlays environment variables onto cfg.
//
// Every variable that is set to a non-empty value replaces the corresponding
// field, regardless of whether the current value came from the defaults or
// from the config file. This keeps deployments that are configured purely
// through environment variables working without a config file.
//
// Integer variables must hold a plain base-10 integer (surrounding whitespace
// is tolerated). A value that does not parse, including one with trailing
// garbage such as "8080abc", is ignored and the field keeps its current
// value. List variables are comma-separated and parsed by splitCSV.
func applyEnvOverrides(cfg *Config) {
	// setStr copies a non-empty environment variable into dst.
	setStr := func(name string, dst *string) {
		if v := os.Getenv(name); v != "" {
			*dst = v
		}
	}
	// setInt parses a non-empty environment variable strictly and stores it
	// in dst only when the whole value is a valid integer.
	setInt := func(name string, dst *int) {
		v := strings.TrimSpace(os.Getenv(name))
		if v == "" {
			return
		}
		if n, err := strconv.Atoi(v); err == nil {
			*dst = n
		}
	}

	// Server and upstream.
	setInt("PORT", &cfg.Server.Port)
	setStr("HTTP_TIMEOUT", &cfg.Server.HTTPTimeout)
	setStr("BASE_URL", &cfg.Server.BaseURL)
	setStr("UPSTREAM_SEARXNG_URL", &cfg.Upstream.URL)

	// Engines. The engine list is only replaced when the variable yields at
	// least one name, so a value like "," does not wipe the defaults.
	if v := os.Getenv("LOCAL_PORTED_ENGINES"); v != "" {
		if parts := splitCSV(v); len(parts) > 0 {
			cfg.Engines.LocalPorted = parts
		}
	}
	setStr("BRAVE_API_KEY", &cfg.Engines.Brave.APIKey)
	setStr("BRAVE_ACCESS_TOKEN", &cfg.Engines.Brave.AccessToken)
	setStr("YOUTUBE_API_KEY", &cfg.Engines.YouTube.APIKey)

	// Cache.
	setStr("VALKEY_ADDRESS", &cfg.Cache.Address)
	setStr("VALKEY_PASSWORD", &cfg.Cache.Password)
	setInt("VALKEY_DB", &cfg.Cache.DB)
	setStr("VALKEY_CACHE_TTL", &cfg.Cache.DefaultTTL)

	// CORS. Unlike the engine list, any non-empty value replaces the origins,
	// even if it contains no usable entries.
	if v := os.Getenv("CORS_ALLOWED_ORIGINS"); v != "" {
		cfg.CORS.AllowedOrigins = splitCSV(v)
	}

	// Per-IP rate limiting.
	setInt("RATE_LIMIT_REQUESTS", &cfg.RateLimit.Requests)
	setStr("RATE_LIMIT_WINDOW", &cfg.RateLimit.Window)
	setStr("RATE_LIMIT_CLEANUP_INTERVAL", &cfg.RateLimit.CleanupInterval)

	// Server-wide rate limiting.
	setInt("GLOBAL_RATE_LIMIT_REQUESTS", &cfg.GlobalRateLimit.Requests)
	setStr("GLOBAL_RATE_LIMIT_WINDOW", &cfg.GlobalRateLimit.Window)

	// Burst rate limiting.
	setInt("BURST_RATE_LIMIT_BURST", &cfg.BurstRateLimit.Burst)
	setStr("BURST_RATE_LIMIT_BURST_WINDOW", &cfg.BurstRateLimit.BurstWindow)
	setInt("BURST_RATE_LIMIT_SUSTAINED", &cfg.BurstRateLimit.Sustained)
	setStr("BURST_RATE_LIMIT_SUSTAINED_WINDOW", &cfg.BurstRateLimit.SustainedWindow)
}
// HTTPTimeout returns Server.HTTPTimeout as a time.Duration. It falls back to
// 10 seconds when the string does not parse or is not a positive duration.
func (c *Config) HTTPTimeout() time.Duration {
	timeout, err := time.ParseDuration(c.Server.HTTPTimeout)
	if err != nil || timeout <= 0 {
		return 10 * time.Second
	}
	return timeout
}
// LocalPortedCSV joins the configured local ported engine names with commas
// (no surrounding spaces) and returns the resulting string.
func (c *Config) LocalPortedCSV() string {
	engines := c.Engines.LocalPorted
	return strings.Join(engines, ",")
}
// CacheTTL returns Cache.DefaultTTL as a time.Duration. It falls back to
// 5 minutes when the string does not parse or is not a positive duration.
func (c *Config) CacheTTL() time.Duration {
	ttl, err := time.ParseDuration(c.Cache.DefaultTTL)
	if err != nil || ttl <= 0 {
		return 5 * time.Minute
	}
	return ttl
}
// RateLimitWindow returns RateLimit.Window as a time.Duration. It falls back
// to 1 minute when the string does not parse or is not a positive duration.
func (c *Config) RateLimitWindow() time.Duration {
	window, err := time.ParseDuration(c.RateLimit.Window)
	if err != nil || window <= 0 {
		return time.Minute
	}
	return window
}
// RateLimitCleanupInterval returns RateLimit.CleanupInterval as a
// time.Duration. It falls back to 5 minutes when the string does not parse or
// is not a positive duration.
func (c *Config) RateLimitCleanupInterval() time.Duration {
	interval, err := time.ParseDuration(c.RateLimit.CleanupInterval)
	if err != nil || interval <= 0 {
		return 5 * time.Minute
	}
	return interval
}
// GlobalRateLimitWindow returns GlobalRateLimit.Window as a time.Duration. It
// falls back to 1 minute when the string does not parse or is not a positive
// duration.
func (c *Config) GlobalRateLimitWindow() time.Duration {
	window, err := time.ParseDuration(c.GlobalRateLimit.Window)
	if err != nil || window <= 0 {
		return time.Minute
	}
	return window
}
// BurstWindow returns BurstRateLimit.BurstWindow as a time.Duration. It falls
// back to 5 seconds when the string does not parse or is not a positive
// duration.
func (c *Config) BurstWindow() time.Duration {
	window, err := time.ParseDuration(c.BurstRateLimit.BurstWindow)
	if err != nil || window <= 0 {
		return 5 * time.Second
	}
	return window
}
// SustainedWindow returns BurstRateLimit.SustainedWindow as a time.Duration.
// It falls back to 1 minute when the string does not parse or is not a
// positive duration.
func (c *Config) SustainedWindow() time.Duration {
	window, err := time.ParseDuration(c.BurstRateLimit.SustainedWindow)
	if err != nil || window <= 0 {
		return time.Minute
	}
	return window
}
// splitCSV breaks a comma-separated string into its entries, trimming
// surrounding whitespace from each and discarding entries that end up empty.
// An empty input yields nil; a non-empty input with no usable entries (such
// as ",") yields an empty, non-nil slice.
func splitCSV(s string) []string {
	if len(s) == 0 {
		return nil
	}

	fields := strings.Split(s, ",")
	kept := make([]string, 0, len(fields))
	for i := range fields {
		item := strings.TrimSpace(fields[i])
		if item == "" {
			continue
		}
		kept = append(kept, item)
	}
	return kept
}