diff --git a/cmd/kafka/main.go b/cmd/kafka/main.go index ab29852..90c750d 100644 --- a/cmd/kafka/main.go +++ b/cmd/kafka/main.go @@ -53,9 +53,10 @@ func main() { } svc := search.NewService(search.ServiceConfig{ - UpstreamURL: cfg.Upstream.URL, - HTTPTimeout: cfg.HTTPTimeout(), - Cache: searchCache, + UpstreamURL: cfg.Upstream.URL, + HTTPTimeout: cfg.HTTPTimeout(), + Cache: searchCache, + EnginesConfig: cfg, }) acSvc := autocomplete.NewService(cfg.Upstream.URL, cfg.HTTPTimeout()) diff --git a/config.example.toml b/config.example.toml index 1e3b75c..34f60a6 100644 --- a/config.example.toml +++ b/config.example.toml @@ -22,7 +22,7 @@ url = "" [engines] # Comma-separated list of engines to execute locally in Go (env: LOCAL_PORTED_ENGINES) # Engines not listed here will be proxied to the upstream instance. -local_ported = ["wikipedia", "arxiv", "crossref", "braveapi", "qwant", "duckduckgo", "github", "reddit", "bing"] +local_ported = ["wikipedia", "arxiv", "crossref", "braveapi", "qwant", "duckduckgo", "github", "reddit", "bing", "google", "youtube"] [engines.brave] # Brave Search API key (env: BRAVE_API_KEY) @@ -35,6 +35,10 @@ access_token = "" category = "web-lite" results_per_page = 10 +[engines.youtube] +# YouTube Data API v3 key (env: YOUTUBE_API_KEY) +api_key = "" + [cache] # Valkey/Redis cache for search results. # Leave address empty to disable caching entirely. diff --git a/internal/config/config.go b/internal/config/config.go index 93b8d86..7f8b06a 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -35,6 +35,7 @@ type EnginesConfig struct { LocalPorted []string `toml:"local_ported"` Brave BraveConfig `toml:"brave"` Qwant QwantConfig `toml:"qwant"` + YouTube YouTubeConfig `toml:"youtube"` } // CacheConfig holds Valkey/Redis cache settings. @@ -85,6 +86,10 @@ type QwantConfig struct { ResultsPerPage int `toml:"results_per_page"` } +type YouTubeConfig struct { + APIKey string `toml:"api_key"` +} + // Load reads configuration from the given TOML file path. // If the file does not exist, it returns defaults (empty values where applicable). // Environment variables are used as fallbacks for any zero-value fields. @@ -109,7 +114,7 @@ func defaultConfig() *Config { }, Upstream: UpstreamConfig{}, Engines: EnginesConfig{ - LocalPorted: []string{"wikipedia", "arxiv", "crossref", "braveapi", "qwant", "duckduckgo", "github", "reddit", "bing"}, + LocalPorted: []string{"wikipedia", "arxiv", "crossref", "braveapi", "qwant", "duckduckgo", "github", "reddit", "bing", "google", "youtube"}, Qwant: QwantConfig{ Category: "web-lite", ResultsPerPage: 10, @@ -151,6 +156,9 @@ func applyEnvOverrides(cfg *Config) { if v := os.Getenv("BRAVE_ACCESS_TOKEN"); v != "" { cfg.Engines.Brave.AccessToken = v } + if v := os.Getenv("YOUTUBE_API_KEY"); v != "" { + cfg.Engines.YouTube.APIKey = v + } if v := os.Getenv("VALKEY_ADDRESS"); v != "" { cfg.Cache.Address = v } diff --git a/internal/engines/factory.go b/internal/engines/factory.go index 937225f..b7f3c00 100644 --- a/internal/engines/factory.go +++ b/internal/engines/factory.go @@ -4,23 +4,42 @@ import ( "net/http" "os" "time" + + "github.com/metamorphosis-dev/kafka/internal/config" ) // NewDefaultPortedEngines returns the starter set of Go-native engines. // The service can swap/extend this registry later as more engines are ported. -func NewDefaultPortedEngines(client *http.Client) map[string]Engine { +// If cfg is nil, falls back to reading API keys from environment variables. +func NewDefaultPortedEngines(client *http.Client, cfg *config.Config) map[string]Engine { if client == nil { client = &http.Client{Timeout: 10 * time.Second} } + var braveAPIKey, braveAccessToken, youtubeAPIKey string + if cfg != nil { + braveAPIKey = cfg.Engines.Brave.APIKey + braveAccessToken = cfg.Engines.Brave.AccessToken + youtubeAPIKey = cfg.Engines.YouTube.APIKey + } + if braveAPIKey == "" { + braveAPIKey = os.Getenv("BRAVE_API_KEY") + } + if braveAccessToken == "" { + braveAccessToken = os.Getenv("BRAVE_ACCESS_TOKEN") + } + if youtubeAPIKey == "" { + youtubeAPIKey = os.Getenv("YOUTUBE_API_KEY") + } + return map[string]Engine{ "wikipedia": &WikipediaEngine{client: client}, "arxiv": &ArxivEngine{client: client}, "crossref": &CrossrefEngine{client: client}, "braveapi": &BraveEngine{ client: client, - apiKey: os.Getenv("BRAVE_API_KEY"), - accessGateToken: os.Getenv("BRAVE_ACCESS_TOKEN"), + apiKey: braveAPIKey, + accessGateToken: braveAccessToken, resultsPerPage: 20, }, "qwant": &QwantEngine{ @@ -32,6 +51,11 @@ func NewDefaultPortedEngines(client *http.Client) map[string]Engine { "github": &GitHubEngine{client: client}, "reddit": &RedditEngine{client: client}, "bing": &BingEngine{client: client}, - "google": &GoogleEngine{client: client}, + "google": &GoogleEngine{client: client}, + "youtube": &YouTubeEngine{ + client: client, + apiKey: youtubeAPIKey, + baseURL: "https://www.googleapis.com", + }, } } diff --git a/internal/engines/planner.go b/internal/engines/planner.go index 24af031..b180f7e 100644 --- a/internal/engines/planner.go +++ b/internal/engines/planner.go @@ -7,7 +7,7 @@ import ( "github.com/metamorphosis-dev/kafka/internal/contracts" ) -var defaultPortedEngines = []string{"wikipedia", "arxiv", "crossref", "braveapi", "qwant", "duckduckgo", "github", "reddit", "bing"} +var defaultPortedEngines = []string{"wikipedia", "arxiv", "crossref", "braveapi", "qwant", "duckduckgo", "github", "reddit", "bing", "google", "youtube"} type Planner struct { PortedSet map[string]bool @@ -99,6 +99,8 @@ func inferFromCategories(categories []string) []string { set["github"] = true case "social media": set["reddit"] = true + case "videos": + set["youtube"] = true } } @@ -107,7 +109,7 @@ func inferFromCategories(categories []string) []string { out = append(out, e) } // stable order - order := map[string]int{"wikipedia": 0, "braveapi": 1, "qwant": 2, "duckduckgo": 3, "bing": 4, "google": 5, "arxiv": 6, "crossref": 7, "github": 8, "reddit": 9} + order := map[string]int{"wikipedia": 0, "braveapi": 1, "qwant": 2, "duckduckgo": 3, "bing": 4, "google": 5, "arxiv": 6, "crossref": 7, "github": 8, "reddit": 9, "youtube": 10} sortByOrder(out, order) return out } diff --git a/internal/engines/youtube.go b/internal/engines/youtube.go new file mode 100644 index 0000000..7580a09 --- /dev/null +++ b/internal/engines/youtube.go @@ -0,0 +1,182 @@ +package engines + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "os" + "strings" + "time" + + "github.com/metamorphosis-dev/kafka/internal/contracts" +) + +type YouTubeEngine struct { + client *http.Client + apiKey string + baseURL string +} + +func (e *YouTubeEngine) Name() string { return "youtube" } + +func (e *YouTubeEngine) Search(ctx context.Context, req contracts.SearchRequest) (contracts.SearchResponse, error) { + if strings.TrimSpace(req.Query) == "" { + return contracts.SearchResponse{Query: req.Query}, nil + } + + if e.apiKey == "" { + e.apiKey = os.Getenv("YOUTUBE_API_KEY") + } + + maxResults := 10 + if req.Pageno > 1 { + maxResults = 20 + } + + u := e.baseURL + "/youtube/v3/search?" + url.Values{ + "part": {"snippet"}, + "q": {req.Query}, + "type": {"video"}, + "maxResults": {fmt.Sprintf("%d", maxResults)}, + "key": {e.apiKey}, + }.Encode() + + if req.Language != "" && req.Language != "auto" { + lang := strings.Split(strings.ToLower(req.Language), "-")[0] + u += "&relevanceLanguage=" + lang + } + + httpReq, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil) + if err != nil { + return contracts.SearchResponse{}, err + } + + resp, err := e.client.Do(httpReq) + if err != nil { + return contracts.SearchResponse{}, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(io.LimitReader(resp.Body, 4096)) + return contracts.SearchResponse{}, fmt.Errorf("youtube api error: status=%d body=%q", resp.StatusCode, string(body)) + } + + var apiResp youtubeSearchResponse + if err := json.NewDecoder(resp.Body).Decode(&apiResp); err != nil { + return contracts.SearchResponse{}, err + } + + if apiResp.Error != nil { + return contracts.SearchResponse{}, fmt.Errorf("youtube api error: %s", apiResp.Error.Message) + } + + results := make([]contracts.MainResult, 0, len(apiResp.Items)) + for _, item := range apiResp.Items { + if item.ID.VideoID == "" { + continue + } + + videoURL := "https://www.youtube.com/watch?v=" + item.ID.VideoID + urlPtr := videoURL + + published := "" + if item.Snippet.PublishedAt != "" { + if t, err := time.Parse(time.RFC3339, item.Snippet.PublishedAt); err == nil { + published = t.Format("Jan 2, 2006") + } + } + + content := item.Snippet.Description + if len(content) > 300 { + content = content[:300] + "..." + } + if published != "" { + content = "Published " + published + " ยท " + content + } + + thumbnail := "" + if item.Snippet.Thumbnails.High.URL != "" { + thumbnail = item.Snippet.Thumbnails.High.URL + } else if item.Snippet.Thumbnails.Medium.URL != "" { + thumbnail = item.Snippet.Thumbnails.Medium.URL + } + + results = append(results, contracts.MainResult{ + Template: "videos.html", + Title: item.Snippet.Title, + URL: &urlPtr, + Content: content, + Thumbnail: thumbnail, + Engine: "youtube", + Score: 1.0, + Category: "videos", + Engines: []string{"youtube"}, + Metadata: map[string]any{ + "channel": item.Snippet.ChannelTitle, + "video_id": item.Snippet.ResourceID.VideoID, + }, + }) + } + + return contracts.SearchResponse{ + Query: req.Query, + NumberOfResults: len(results), + Results: results, + Answers: []map[string]any{}, + Corrections: []string{}, + Infoboxes: []map[string]any{}, + Suggestions: []string{}, + UnresponsiveEngines: [][2]string{}, + }, nil +} + +// YouTube API response types. + +type youtubeSearchResponse struct { + Items []youtubeSearchItem `json:"items"` + PageInfo struct { + TotalResults int `json:"totalResults"` + ResultsPerPage int `json:"resultsPerPage"` + } `json:"pageInfo"` + NextPageToken string `json:"nextPageToken"` + Error *struct { + Code int `json:"code"` + Message string `json:"message"` + Errors []struct { + Domain string `json:"domain"` + Reason string `json:"reason"` + Message string `json:"message"` + } `json:"errors"` + } `json:"error"` +} + +type youtubeSearchItem struct { + ID struct { + VideoID string `json:"videoId"` + } `json:"id"` + Snippet struct { + PublishedAt string `json:"publishedAt"` + ChannelID string `json:"channelId"` + ChannelTitle string `json:"channelTitle"` + Title string `json:"title"` + Description string `json:"description"` + Thumbnails struct { + Default struct { + URL string `json:"url"` + } `json:"default"` + Medium struct { + URL string `json:"url"` + } `json:"medium"` + High struct { + URL string `json:"url"` + } `json:"high"` + } `json:"thumbnails"` + ResourceID struct { + VideoID string `json:"videoId"` + } `json:"resourceId"` + } `json:"snippet"` +} diff --git a/internal/search/service.go b/internal/search/service.go index 62a9308..47d2895 100644 --- a/internal/search/service.go +++ b/internal/search/service.go @@ -7,15 +7,17 @@ import ( "time" "github.com/metamorphosis-dev/kafka/internal/cache" + "github.com/metamorphosis-dev/kafka/internal/config" "github.com/metamorphosis-dev/kafka/internal/contracts" "github.com/metamorphosis-dev/kafka/internal/engines" "github.com/metamorphosis-dev/kafka/internal/upstream" ) type ServiceConfig struct { - UpstreamURL string - HTTPTimeout time.Duration - Cache *cache.Cache + UpstreamURL string + HTTPTimeout time.Duration + Cache *cache.Cache + EnginesConfig *config.Config } type Service struct { @@ -44,7 +46,7 @@ func NewService(cfg ServiceConfig) *Service { return &Service{ upstreamClient: up, planner: engines.NewPlannerFromEnv(), - localEngines: engines.NewDefaultPortedEngines(httpClient), + localEngines: engines.NewDefaultPortedEngines(httpClient, cfg.EnginesConfig), cache: cfg.Cache, } }