Update LICENSE file and add AGPL header to all source files. AGPLv3 ensures that if someone runs Kafka as a network service and modifies it, they must release their source code under the same license.
178 lines
4.8 KiB
Go
178 lines
4.8 KiB
Go
// kafka — a privacy-respecting metasearch engine
|
|
// Copyright (C) 2026-present metamorphosis-dev
|
|
//
|
|
// This program is free software: you can redistribute it and/or modify
|
|
// it under the terms of the GNU Affero General Public License as published by
|
|
// the Free Software Foundation, either version 3 of the License, or
|
|
// (at your option) any later version.
|
|
//
|
|
// This program is distributed in the hope that it will be useful,
|
|
// but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
// GNU Affero General Public License for more details.
|
|
//
|
|
// You should have received a copy of the GNU Affero General Public License
|
|
// along with this program. If not, see <https://www.gnu.org/licenses/>.
|
|
|
|
package cache
|
|
|
|
import (
|
|
"context"
|
|
"crypto/sha256"
|
|
"encoding/hex"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log/slog"
|
|
"time"
|
|
|
|
"github.com/metamorphosis-dev/kafka/internal/contracts"
|
|
"github.com/redis/go-redis/v9"
|
|
)
|
|
|
|
// Config describes how to reach the Valkey/Redis server backing the cache
// and how long cached search results are kept by default.
type Config struct {
	Address    string        // Valkey server address, e.g. "localhost:6379".
	Password   string        // Authentication password; empty means no auth.
	DB         int           // Database index (default 0).
	DefaultTTL time.Duration // Default TTL for cached search results.
}
|
|
|
|
// Cache provides a Valkey-backed cache for search responses.
|
|
// It is safe for concurrent use.
|
|
// If the Valkey connection is nil or fails, cache operations are no-ops.
|
|
type Cache struct {
|
|
client *redis.Client
|
|
ttl time.Duration
|
|
logger *slog.Logger
|
|
}
|
|
|
|
// New creates a new Cache. If cfg.Address is empty, returns a no-op cache.
|
|
func New(cfg Config, logger *slog.Logger) *Cache {
|
|
if logger == nil {
|
|
logger = slog.Default()
|
|
}
|
|
|
|
if cfg.Address == "" {
|
|
logger.Debug("cache disabled: no valkey address configured")
|
|
return &Cache{logger: logger}
|
|
}
|
|
|
|
ttl := cfg.DefaultTTL
|
|
if ttl <= 0 {
|
|
ttl = 5 * time.Minute
|
|
}
|
|
|
|
client := redis.NewClient(&redis.Options{
|
|
Addr: cfg.Address,
|
|
Password: cfg.Password,
|
|
DB: cfg.DB,
|
|
})
|
|
|
|
// Verify connectivity with a short timeout.
|
|
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
|
defer cancel()
|
|
|
|
if err := client.Ping(ctx).Err(); err != nil {
|
|
logger.Warn("cache disabled: valkey ping failed", "addr", cfg.Address, "error", err)
|
|
return &Cache{logger: logger}
|
|
}
|
|
|
|
logger.Info("cache connected", "addr", cfg.Address, "db", cfg.DB, "ttl", ttl)
|
|
return &Cache{client: client, ttl: ttl, logger: logger}
|
|
}
|
|
|
|
// Enabled returns true if the cache has a live Valkey connection.
|
|
func (c *Cache) Enabled() bool {
|
|
return c.client != nil
|
|
}
|
|
|
|
// Get retrieves a cached search response. Returns (response, true) on hit,
|
|
// (zero, false) on miss or error.
|
|
func (c *Cache) Get(ctx context.Context, key string) (contracts.SearchResponse, bool) {
|
|
if !c.Enabled() {
|
|
return contracts.SearchResponse{}, false
|
|
}
|
|
|
|
fullKey := "kafka:" + key
|
|
|
|
data, err := c.client.Get(ctx, fullKey).Bytes()
|
|
if err != nil {
|
|
if err != redis.Nil {
|
|
c.logger.Debug("cache miss (error)", "key", fullKey, "error", err)
|
|
}
|
|
return contracts.SearchResponse{}, false
|
|
}
|
|
|
|
var resp contracts.SearchResponse
|
|
if err := json.Unmarshal(data, &resp); err != nil {
|
|
c.logger.Warn("cache hit but unmarshal failed", "key", fullKey, "error", err)
|
|
return contracts.SearchResponse{}, false
|
|
}
|
|
|
|
c.logger.Debug("cache hit", "key", fullKey)
|
|
return resp, true
|
|
}
|
|
|
|
// Set stores a search response in the cache with the default TTL.
|
|
func (c *Cache) Set(ctx context.Context, key string, resp contracts.SearchResponse) {
|
|
if !c.Enabled() {
|
|
return
|
|
}
|
|
|
|
data, err := json.Marshal(resp)
|
|
if err != nil {
|
|
c.logger.Warn("cache set: marshal failed", "key", key, "error", err)
|
|
return
|
|
}
|
|
|
|
fullKey := "kafka:" + key
|
|
if err := c.client.Set(ctx, fullKey, data, c.ttl).Err(); err != nil {
|
|
c.logger.Warn("cache set failed", "key", fullKey, "error", err)
|
|
}
|
|
}
|
|
|
|
// Invalidate removes a specific key from the cache.
|
|
func (c *Cache) Invalidate(ctx context.Context, key string) {
|
|
if !c.Enabled() {
|
|
return
|
|
}
|
|
fullKey := "kafka:" + key
|
|
c.client.Del(ctx, fullKey)
|
|
}
|
|
|
|
// Close closes the Valkey connection.
|
|
func (c *Cache) Close() error {
|
|
if c.client == nil {
|
|
return nil
|
|
}
|
|
return c.client.Close()
|
|
}
|
|
|
|
// Key generates a deterministic cache key from search parameters.
|
|
// The key is a SHA-256 hash of the normalized parameters, prefixed for readability.
|
|
func Key(req contracts.SearchRequest) string {
|
|
h := sha256.New()
|
|
|
|
fmt.Fprintf(h, "q=%s|", req.Query)
|
|
fmt.Fprintf(h, "format=%s|", req.Format)
|
|
fmt.Fprintf(h, "pageno=%d|", req.Pageno)
|
|
fmt.Fprintf(h, "safesearch=%d|", req.Safesearch)
|
|
fmt.Fprintf(h, "lang=%s|", req.Language)
|
|
|
|
if req.TimeRange != nil {
|
|
fmt.Fprintf(h, "tr=%s|", *req.TimeRange)
|
|
}
|
|
|
|
for _, e := range req.Engines {
|
|
fmt.Fprintf(h, "e=%s|", e)
|
|
}
|
|
for _, cat := range req.Categories {
|
|
fmt.Fprintf(h, "c=%s|", cat)
|
|
}
|
|
|
|
return hex.EncodeToString(h.Sum(nil))[:32]
|
|
}
|