kafka/cmd/kafka/main.go
Franz Kafka 41b80a939a feat: add YouTube engine with config file and env support
YouTube Data API v3 engine:
- Add YouTubeConfig to EnginesConfig with api_key field
- Add YOUTUBE_API_KEY env override
- Thread *config.Config through search service to factory
- Factory falls back to env vars if config fields are empty
- Update config.example.toml with youtube section

Also update default local_ported to include google and youtube.
2026-03-22 01:57:13 +00:00

113 lines
3.5 KiB
Go

package main
import (
	"flag"
	"fmt"
	"io/fs"
	"log"
	"log/slog"
	"net/http"
	"os"
	"time"

	"github.com/metamorphosis-dev/kafka/internal/autocomplete"
	"github.com/metamorphosis-dev/kafka/internal/cache"
	"github.com/metamorphosis-dev/kafka/internal/config"
	"github.com/metamorphosis-dev/kafka/internal/httpapi"
	"github.com/metamorphosis-dev/kafka/internal/middleware"
	"github.com/metamorphosis-dev/kafka/internal/search"
	"github.com/metamorphosis-dev/kafka/internal/views"
)
// main is the entry point of the kafka server binary. It parses the -config
// flag, loads the configuration, builds the cache, search and autocomplete
// services, registers the HTTP routes, wraps them in the CORS and rate-limit
// middleware, and then serves on the configured port until the server fails.
func main() {
	configPath := flag.String("config", "config.toml", "path to config.toml")
	flag.Parse()

	// Initialize structured logging.
	logger := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelInfo}))
	slog.SetDefault(logger)

	cfg, err := config.Load(*configPath)
	if err != nil {
		log.Fatalf("failed to load config: %v", err)
	}

	// Initialize Valkey cache.
	searchCache := cache.New(cache.Config{
		Address:    cfg.Cache.Address,
		Password:   cfg.Cache.Password,
		DB:         cfg.Cache.DB,
		DefaultTTL: cfg.CacheTTL(),
	}, logger)
	// Covers normal returns and panics. The log.Fatal paths below exit via
	// os.Exit, which skips deferred calls, so they close the cache explicitly.
	defer searchCache.Close()

	// Seed env vars from config so existing engine/factory/planner code
	// picks them up without changes. A failed Setenv is reported but does not
	// abort startup.
	setenv := func(key, value string) {
		if err := os.Setenv(key, value); err != nil {
			logger.Warn("failed to seed env var from config", "key", key, "error", err)
		}
	}
	if len(cfg.Engines.LocalPorted) > 0 {
		setenv("LOCAL_PORTED_ENGINES", cfg.LocalPortedCSV())
	}
	if cfg.Engines.Brave.APIKey != "" {
		setenv("BRAVE_API_KEY", cfg.Engines.Brave.APIKey)
	}
	if cfg.Engines.Brave.AccessToken != "" {
		setenv("BRAVE_ACCESS_TOKEN", cfg.Engines.Brave.AccessToken)
	}

	svc := search.NewService(search.ServiceConfig{
		UpstreamURL:   cfg.Upstream.URL,
		HTTPTimeout:   cfg.HTTPTimeout(),
		Cache:         searchCache,
		EnginesConfig: cfg,
	})
	acSvc := autocomplete.NewService(cfg.Upstream.URL, cfg.HTTPTimeout())
	h := httpapi.NewHandler(svc, acSvc.Suggestions)

	mux := http.NewServeMux()
	mux.HandleFunc("/", h.Index)
	mux.HandleFunc("/healthz", h.Healthz)
	mux.HandleFunc("/search", h.Search)
	mux.HandleFunc("/autocompleter", h.Autocompleter)
	mux.HandleFunc("/opensearch.xml", h.OpenSearch(cfg.Server.BaseURL))

	// Serve embedded static files (CSS, JS, images).
	staticFS, err := views.StaticFS()
	if err != nil {
		searchCache.Close()
		log.Fatalf("failed to load static files: %v", err)
	}
	var subFS fs.FS = staticFS
	mux.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.FS(subFS))))

	// Apply middleware. Each wrapper encloses the previous handler, so the one
	// applied last runs first on an incoming request:
	// burst rate limit → global rate limit → per-IP rate limit → CORS → mux.
	var handler http.Handler = mux
	handler = middleware.CORS(middleware.CORSConfig{
		AllowedOrigins: cfg.CORS.AllowedOrigins,
		AllowedMethods: cfg.CORS.AllowedMethods,
		AllowedHeaders: cfg.CORS.AllowedHeaders,
		ExposedHeaders: cfg.CORS.ExposedHeaders,
		MaxAge:         cfg.CORS.MaxAge,
	})(handler)
	handler = middleware.RateLimit(middleware.RateLimitConfig{
		Requests:        cfg.RateLimit.Requests,
		Window:          cfg.RateLimitWindow(),
		CleanupInterval: cfg.RateLimitCleanupInterval(),
	}, logger)(handler)
	handler = middleware.GlobalRateLimit(middleware.GlobalRateLimitConfig{
		Requests: cfg.GlobalRateLimit.Requests,
		Window:   cfg.GlobalRateLimitWindow(),
	}, logger)(handler)
	handler = middleware.BurstRateLimit(middleware.BurstRateLimitConfig{
		Burst:           cfg.BurstRateLimit.Burst,
		BurstWindow:     cfg.BurstWindow(),
		Sustained:       cfg.BurstRateLimit.Sustained,
		SustainedWindow: cfg.SustainedWindow(),
	}, logger)(handler)

	addr := fmt.Sprintf(":%d", cfg.Server.Port)
	logger.Info("kafka starting",
		"addr", addr,
		"cache", searchCache.Enabled(),
		"rate_limit", cfg.RateLimit.Requests > 0,
	)

	// Use an explicit http.Server so that slow or idle clients cannot hold
	// connections open forever. WriteTimeout is deliberately left unset because
	// handler duration depends on upstream search latency.
	srv := &http.Server{
		Addr:              addr,
		Handler:           handler,
		ReadHeaderTimeout: 10 * time.Second,
		IdleTimeout:       120 * time.Second,
	}
	// ListenAndServe always returns a non-nil error.
	err = srv.ListenAndServe()
	searchCache.Close()
	log.Fatal(err)
}