diff --git a/config.yaml b/config.yaml new file mode 100644 index 0000000..9025674 --- /dev/null +++ b/config.yaml @@ -0,0 +1,2 @@ +port: 8080 +upstream_url: "https://api.z.ai/api/anthropic" diff --git a/converter.go b/converter.go new file mode 100644 index 0000000..c0366f1 --- /dev/null +++ b/converter.go @@ -0,0 +1,227 @@ +package main + +import ( + "encoding/json" + "strings" +) + +// ConvertOpenAIRequest converts an OpenAI ChatCompletionRequest to Anthropic format +func ConvertOpenAIRequest(req *ChatCompletionRequest) *AnthropicRequest { + system, remainingMessages := extractSystemMessage(req.Messages) + + anthropicReq := &AnthropicRequest{ + Model: req.Model, + Messages: convertMessages(remainingMessages), + System: system, + MaxTokens: 32000, + } + + if req.MaxTokens != nil { + anthropicReq.MaxTokens = *req.MaxTokens + } + if req.Stream != nil { + anthropicReq.Stream = *req.Stream + } + if req.Temperature != nil { + anthropicReq.Temperature = req.Temperature + } + if req.TopP != nil { + anthropicReq.TopP = req.TopP + } + if len(req.Stop) > 0 { + anthropicReq.StopSequences = req.Stop + } + if len(req.Tools) > 0 { + anthropicReq.Tools = convertTools(req.Tools) + } + if req.ToolChoices != nil { + anthropicReq.ToolChoice = convertToolChoice(req.ToolChoices) + } + + return anthropicReq +} + +// extractSystemMessage pulls role="system" messages and joins them +func extractSystemMessage(messages []Message) (string, []Message) { + var systemParts []string + var rest []Message + + for _, msg := range messages { + if msg.Role == "system" { + if content, ok := msg.Content.(string); ok { + systemParts = append(systemParts, content) + } + } else { + rest = append(rest, msg) + } + } + + return strings.Join(systemParts, "\n"), rest +} + +// convertMessages converts OpenAI messages to Anthropic content blocks +func convertMessages(messages []Message) []AnthropicMessage { + var result []AnthropicMessage + + for _, msg := range messages { + anthropicMsg := AnthropicMessage{ + Role: msg.Role, + } + + switch content := msg.Content.(type) { + case string: + anthropicMsg.Content = content + case []interface{}: + var blocks []ContentBlock + for _, part := range content { + partMap, ok := part.(map[string]interface{}) + if !ok { + continue + } + partType, _ := partMap["type"].(string) + if partType == "text" { + text, _ := partMap["text"].(string) + blocks = append(blocks, ContentBlock{Type: "text", Text: text}) + } + // Image parts: skip for now, Anthropic uses different format + } + anthropicMsg.Content = blocks + } + + result = append(result, anthropicMsg) + } + + return result +} + +// convertTools converts OpenAI function tools to Anthropic tool format +func convertTools(tools []Tool) []AnthropicTool { + var result []AnthropicTool + for _, tool := range tools { + anthropicTool := AnthropicTool{ + Name: tool.Function.Name, + Description: tool.Function.Description, + InputSchema: tool.Function.Parameters, + } + result = append(result, anthropicTool) + } + return result +} + +// convertToolChoice converts OpenAI tool_choice to Anthropic format +func convertToolChoice(tc *ToolChoice) *AnthropicToolChoice { + if tc == nil { + return nil + } + result := &AnthropicToolChoice{} + if tc.Type != "" { + if tc.Type == "required" { + result.Type = "any" + } else { + result.Type = tc.Type + } + } + if tc.Function != nil { + result.Name = tc.Function.Name + } + return result +} + +// ConvertAnthropicResponse converts an Anthropic response to OpenAI format +func ConvertAnthropicResponse(resp *AnthropicResponse, model string) *ChatCompletionResponse { + response := &ChatCompletionResponse{ + ID: resp.Id, + Object: "chat.completion", + Created: 1234567890, + Model: model, + Choices: make([]Choice, 0), + Usage: Usage{ + PromptTokens: resp.Usage.InputTokens, + CompletionTokens: resp.Usage.OutputTokens, + TotalTokens: resp.Usage.InputTokens + resp.Usage.OutputTokens, + }, + } + + if len(resp.Content) == 0 { + response.Choices = append(response.Choices, Choice{ + Index: 0, + Message: Message{Role: "assistant", Content: ""}, + FinishReason: mapStopReason(resp.StopReason), + }) + return response + } + + // First pass: collect text and tool calls + var textContent string + var toolCalls []ToolCall + + for _, block := range resp.Content { + if block.Type == "text" { + textContent += block.Text + } else if block.Type == "tool_use" { + // Serialize the input back to JSON + inputJSON, _ := json.Marshal(block.Input) + toolCalls = append(toolCalls, ToolCall{ + ID: block.Id, + Type: "function", + Function: FunctionCall{ + Name: block.Name, + Arguments: string(inputJSON), + }, + }) + } + } + + if len(toolCalls) > 0 { + response.Choices = append(response.Choices, Choice{ + Index: 0, + Message: Message{ + Role: "assistant", + Content: textContent, + ToolCalls: toolCalls, + }, + FinishReason: mapStopReason(resp.StopReason), + }) + } else { + response.Choices = append(response.Choices, Choice{ + Index: 0, + Message: Message{Role: "assistant", Content: textContent}, + FinishReason: mapStopReason(resp.StopReason), + }) + } + + return response +} + +// mapStopReason maps Anthropic stop reasons to OpenAI finish reasons +func mapStopReason(reason string) string { + switch reason { + case "end_turn": + return "stop" + case "tool_use": + return "tool_calls" + case "max_tokens": + return "length" + default: + return "stop" + } +} + +// buildToolCalls builds OpenAI ToolCall slice from Anthropic ContentBlocks +func buildToolCalls(content []ContentBlock) []ToolCall { + var toolCalls []ToolCall + for _, block := range content { + if block.Type == "tool_use" { + inputJSON, _ := json.Marshal(block.Input) + toolCalls = append(toolCalls, ToolCall{ + ID: block.Id, + Type: "function", + Function: FunctionCall{ + Name: block.Name, + Arguments: string(inputJSON), + }, + }) + } + } + return toolCalls +} diff --git a/docs/superpowers/PLAN.md b/docs/superpowers/PLAN.md new file mode 100644 index 0000000..7ae7ed1 --- /dev/null +++ b/docs/superpowers/PLAN.md @@ -0,0 +1,68 @@ +# proxx Implementation Plan + +## Goal +A working Go binary that proxies OpenAI-format requests to an Anthropic endpoint, with full streaming and non-streaming support. + +## Approach +Straightforward per the spec. Go 1.21+ with only `gopkg.in/yaml.v3` as external dependency. Standard library `net/http` for everything HTTP-related. + +## Steps + +### Phase 1: Project Scaffolding +- [ ] Initialize `go mod init github.com/penal-colony/proxx` +- [ ] Create directory structure: `main.go`, `handler.go`, `converter.go`, `types.go`, `streaming.go`, `config.yaml` +- [ ] Add `gopkg.in/yaml.v3` dependency + +### Phase 2: Types (`types.go`) +- [ ] OpenAI request/response structs (`ChatCompletionRequest`, `ChatCompletionResponse`, `Message`, `Choice`, `Usage`) +- [ ] Anthropic request/response structs (`AnthropicRequest`, `AnthropicResponse`, `ContentBlock`, `Message`, `Usage`) +- [ ] Tool/function conversion types +- [ ] Streaming event structs (Anthropic SSE + OpenAI SSE) + +### Phase 3: Converter (`converter.go`) +- [ ] `convertRequest()` — map OpenAI req to Anthropic req (model, messages, system, tools, temperature, top_p, max_tokens, stop_sequences) +- [ ] `extractSystemMessage()` — pull role=system out of messages into top-level system field +- [ ] `convertTools()` — map OpenAI function tools to Anthropic tool format +- [ ] `convertResponse()` — map Anthropic non-streaming response to OpenAI format +- [ ] `mapStopReason()` — end_turn→stop, tool_use→tool_calls, max_tokens→length + +### Phase 4: Streaming (`streaming.go`) +- [ ] `StreamConverter` struct holding accumulated state +- [ ] `HandleAnthropicStream()` — parse SSE events, convert to OpenAI SSE format +- [ ] Handle: `message_start`, `content_block_start`, `content_block_delta`, `content_block_stop`, `message_delta`, `message_stop` +- [ ] Map: role, content deltas, tool_use deltas, finish_reason +- [ ] Send `data: [DONE]` on stream end + +### Phase 5: HTTP Handlers (`handler.go`) +- [ ] `handleModels()` — static list of Claude models in OpenAI format +- [ ] `handleChatCompletions()` — routing: detect streaming vs non-streaming, call upstream, return converted response +- [ ] Extract `Authorization: Bearer` → forward as `x-api-key` +- [ ] Set Claude-Code mimicry headers on upstream requests +- [ ] Error handling: 400 for bad requests, 502 for upstream failures + +### Phase 6: Entry Point (`main.go`) +- [ ] Load `config.yaml` via `gopkg.in/yaml.v3` +- [ ] Register handlers on `/v1/chat/completions` and `/v1/models` +- [ ] Start HTTP server on configured port +- [ ] Generate random `X-Claude-Code-Session-Id` UUID at startup + +### Phase 7: Config +- [ ] Create `config.yaml` with defaults (port 8080, upstream_url) + +### Phase 8: Testing +- [ ] Unit tests for `converter.go` (pure logic, no HTTP) +- [ ] Unit tests for `streaming.go` (test SSE event conversion) +- [ ] Integration test with mock upstream + +## Risks +- **SSE parsing**: Anthropic uses SSE format, need to handle `data:` lines correctly. Risk: low, well-documented format. +- **Tool calling conversion**: Complex nested structure mapping. Risk: medium — need to verify edge cases. +- **Streaming state machine**: Accumulating partial tool_use blocks across multiple events. Risk: medium — test with actual stream. + +## Definition of Done +- [ ] `go build` produces a binary with no errors +- [ ] Unit tests pass for converter and streaming logic +- [ ] Binary starts, loads config, listens on port +- [ ] `/v1/models` returns Claude model list in OpenAI format +- [ ] Non-streaming `/v1/chat/completions` round-trips correctly through Anthropic upstream +- [ ] Streaming `/v1/chat/completions` produces valid OpenAI SSE output diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..342101b --- /dev/null +++ b/go.mod @@ -0,0 +1,8 @@ +module github.com/penal-colony/proxx + +go 1.24.1 + +require ( + github.com/google/uuid v1.6.0 + gopkg.in/yaml.v3 v3.0.1 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..b4c5744 --- /dev/null +++ b/go.sum @@ -0,0 +1,6 @@ +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/handler.go b/handler.go new file mode 100644 index 0000000..d172249 --- /dev/null +++ b/handler.go @@ -0,0 +1,322 @@ +package main + +import ( + "bufio" + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "math/rand" + "net/http" + "strings" + "time" +) + +// Config holds the application configuration +type Config struct { + Port int `yaml:"port"` + UpstreamURL string `yaml:"upstream_url"` +} + +var config *Config + +// ClaudeCodeHeaders returns the headers to mimic claude-code CLI +func ClaudeCodeHeaders(apiKey, sessionID string) map[string]string { + return map[string]string{ + "User-Agent": "claude-cli/1.0.18 (pro, cli)", + "x-api-key": apiKey, + "x-app": "cli", + "anthropic-version": "2023-06-01", + "anthropic-beta": "interleaved-thinking-2025-05-14,prompt-caching-scope-2026-01-05,context-management-2025-06-27", + "X-Claude-Code-Session-Id": sessionID, + "content-type": "application/json", + } +} + +func handleModels(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + writeError(w, http.StatusMethodNotAllowed, "Method not allowed", "invalid_request_error", "method_not_allowed") + return + } + + models := []map[string]interface{}{ + {"id": "claude-opus-4-5", "object": "model", "created": 1234567890, "owned_by": "anthropic"}, + {"id": "claude-sonnet-4-20250514", "object": "model", "created": 1234567890, "owned_by": "anthropic"}, + {"id": "claude-3-5-sonnet-20241022", "object": "model", "created": 1234567890, "owned_by": "anthropic"}, + {"id": "claude-3-opus-20240229", "object": "model", "created": 1234567890, "owned_by": "anthropic"}, + {"id": "claude-3-sonnet-20240229", "object": "model", "created": 1234567890, "owned_by": "anthropic"}, + {"id": "claude-3-haiku-20240307", "object": "model", "created": 1234567890, "owned_by": "anthropic"}, + } + + response := map[string]interface{}{ + "object": "list", + "data": models, + } + + w.Header().Set("content-type", "application/json") + json.NewEncoder(w).Encode(response) +} + +func handleChatCompletions(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodPost { + writeError(w, http.StatusMethodNotAllowed, "Method not allowed", "invalid_request_error", "method_not_allowed") + return + } + + // Extract Bearer token + authHeader := r.Header.Get("Authorization") + if authHeader == "" || !strings.HasPrefix(authHeader, "Bearer ") { + writeError(w, http.StatusUnauthorized, "Missing or invalid Authorization header", "authentication_error", "missing_authorization") + return + } + apiKey := strings.TrimPrefix(authHeader, "Bearer ") + + // Read body + body, err := io.ReadAll(r.Body) + if err != nil { + writeError(w, http.StatusBadRequest, "Failed to read request body", "invalid_request_error", "body_read_error") + return + } + + // Decode request + var req ChatCompletionRequest + if err := json.Unmarshal(body, &req); err != nil { + writeError(w, http.StatusBadRequest, "Invalid JSON in request body", "invalid_request_error", "json_decode_error") + return + } + + // Get session ID from context (set by main) + sessionID := r.Context().Value(sessionIDKey).(string) + + // Convert to Anthropic format + anthropicReq := ConvertOpenAIRequest(&req) + + // Check streaming + isStream := req.Stream != nil && *req.Stream + + // Proxy to upstream + resp, err := proxyToUpstream(anthropicReq, apiKey, sessionID, isStream) + if err != nil { + writeError(w, http.StatusBadGateway, fmt.Sprintf("Upstream request failed: %v", err), "upstream_error", "proxy_error") + return + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + respBody, _ := io.ReadAll(resp.Body) + writeError(w, http.StatusBadGateway, fmt.Sprintf("Upstream returned error: %s", string(respBody)), "upstream_error", fmt.Sprintf("status_%d", resp.StatusCode)) + return + } + + if isStream { + // Handle streaming response + w.Header().Set("content-type", "text/event-stream") + w.Header().Set("cache-control", "no-cache") + w.Header().Set("connection", "keep-alive") + + // Convert SSE stream on the fly + reader := io.NopCloser(resp.Body) + scanner := bufio.NewScanner(reader) + // Use a larger buffer for potential large JSON lines + buf := make([]byte, 0, 64*1024) + scanner.Buffer(buf, 1024*1024) + + created := int64(time.Now().Unix()) + chunkID := "chatcmpl-" + randomString(8) + + for scanner.Scan() { + line := scanner.Text() + if !strings.HasPrefix(line, "data: ") { + continue + } + dataStr := strings.TrimPrefix(line, "data: ") + if dataStr == "[DONE]" { + fmt.Fprintf(w, "data: [DONE]\n\n") + w.(http.Flusher).Flush() + continue + } + + var event struct { + Type string `json:"type"` + } + if err := json.Unmarshal([]byte(dataStr), &event); err != nil { + continue + } + + // Process event and write OpenAI SSE + openAIData := processAnthropicEvent(dataStr, chunkID, req.Model, created) + if openAIData != "" { + fmt.Fprintf(w, "data: %s\n\n", openAIData) + w.(http.Flusher).Flush() + } + } + fmt.Fprintf(w, "data: [DONE]\n\n") + w.(http.Flusher).Flush() + } else { + // Non-streaming response + respBody, err := io.ReadAll(resp.Body) + if err != nil { + writeError(w, http.StatusBadGateway, "Failed to read upstream response", "upstream_error", "body_read_error") + return + } + + var anthropicResp AnthropicResponse + if err := json.Unmarshal(respBody, &anthropicResp); err != nil { + writeError(w, http.StatusBadGateway, "Failed to parse upstream response", "upstream_error", "json_decode_error") + return + } + + openAIResp := ConvertAnthropicResponse(&anthropicResp, req.Model) + + w.Header().Set("content-type", "application/json") + json.NewEncoder(w).Encode(openAIResp) + } +} + +func processAnthropicEvent(dataStr, chunkID, model string, created int64) string { + var event struct { + Type string `json:"type"` + } + if err := json.Unmarshal([]byte(dataStr), &event); err != nil { + return "" + } + + switch event.Type { + case "message_start": + return "" + + case "content_block_start": + return "" + + case "content_block_delta": + var delta struct { + Index int `json:"index"` + Delta struct { + Text string `json:"text,omitempty"` + InputJSONDelta string `json:"input_json_delta,omitempty"` + } `json:"delta"` + } + json.Unmarshal([]byte(dataStr), &delta) + + if delta.Delta.Text != "" { + chunk := StreamChunk{ + ID: chunkID, + Object: "chat.completion.chunk", + Created: created, + Model: model, + Choices: []StreamChoice{ + { + Index: delta.Index, + Delta: Delta{ + Content: delta.Delta.Text, + }, + }, + }, + } + data, _ := json.Marshal(chunk) + return string(data) + } + if delta.Delta.InputJSONDelta != "" { + chunk := StreamChunk{ + ID: chunkID, + Object: "chat.completion.chunk", + Created: created, + Model: model, + Choices: []StreamChoice{ + { + Index: delta.Index, + Delta: Delta{ + ToolCalls: []StreamToolCall{ + { + Index: delta.Index, + Function: StreamFunction{ + Arguments: delta.Delta.InputJSONDelta, + }, + }, + }, + }, + }, + }, + } + data, _ := json.Marshal(chunk) + return string(data) + } + + case "message_delta": + var msgDelta struct { + StopReason string `json:"stop_reason,omitempty"` + } + json.Unmarshal([]byte(dataStr), &msgDelta) + chunk := StreamChunk{ + ID: chunkID, + Object: "chat.completion.chunk", + Created: created, + Model: model, + Choices: []StreamChoice{ + { + Index: 0, + Delta: Delta{}, + FinishReason: mapStopReason(msgDelta.StopReason), + }, + }, + } + data, _ := json.Marshal(chunk) + return string(data) + } + + return "" +} + +func proxyToUpstream(req *AnthropicRequest, apiKey, sessionID string, stream bool) (*http.Response, error) { + bodyBytes, err := json.Marshal(req) + if err != nil { + return nil, fmt.Errorf("failed to marshal request: %w", err) + } + + upstreamURL := config.UpstreamURL + "/v1/messages" + httpReq, err := http.NewRequest(http.MethodPost, upstreamURL, bytes.NewReader(bodyBytes)) + if err != nil { + return nil, fmt.Errorf("failed to create request: %w", err) + } + + headers := ClaudeCodeHeaders(apiKey, sessionID) + for k, v := range headers { + httpReq.Header.Set(k, v) + } + + if stream { + httpReq.Header.Set("anthropic-sse-beta", "output-2025-05-14") + } + + ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) + defer cancel() + + httpReq = httpReq.WithContext(ctx) + + client := &http.Client{} + return client.Do(httpReq) +} + +func writeError(w http.ResponseWriter, code int, message, errType, errCode string) { + w.Header().Set("content-type", "application/json") + w.WriteHeader(code) + resp := map[string]interface{}{ + "error": map[string]string{ + "message": message, + "type": errType, + "code": errCode, + }, + } + json.NewEncoder(w).Encode(resp) +} + +func randomString(n int) string { + const letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789" + b := make([]byte, n) + r := rand.New(rand.NewSource(time.Now().UnixNano())) + for i := range b { + b[i] = letters[r.Intn(len(letters))] + } + return string(b) +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..69f422c --- /dev/null +++ b/main.go @@ -0,0 +1,60 @@ +package main + +import ( + "context" + "fmt" + "log" + "net/http" + "os" + + "github.com/google/uuid" + "gopkg.in/yaml.v3" +) + +func main() { + // Load config.yaml + data, err := os.ReadFile("config.yaml") + if err != nil { + log.Fatalf("Failed to read config.yaml: %v", err) + } + cfg := &Config{} + if err := yaml.Unmarshal(data, cfg); err != nil { + log.Fatalf("Failed to parse config.yaml: %v", err) + } + config = cfg + + // Generate session ID (persist across requests) + sessionID := uuid.New().String() + + // Register routes + http.HandleFunc("/v1/chat/completions", func(w http.ResponseWriter, r *http.Request) { + r = r.WithContext(contextWithSessionIDInContext(r.Context(), sessionID)) + handleChatCompletions(w, r) + }) + http.HandleFunc("/v1/models", handleModels) + http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + fmt.Fprint(w, "OK") + }) + + addr := fmt.Sprintf(":%d", config.Port) + log.Printf("Starting proxx on %s, upstream: %s", addr, config.UpstreamURL) + if err := http.ListenAndServe(addr, nil); err != nil { + log.Fatalf("Server failed: %v", err) + } +} + +// contextKey is a custom type for context keys +type contextKey string + +const sessionIDKey contextKey = "sessionID" + +// contextWithSessionID creates a context with the session ID +func contextWithSessionID(sessionID string) context.Context { + return context.WithValue(nil, sessionIDKey, sessionID) +} + +// contextWithSessionIDInContext creates a new context with session ID in existing context +func contextWithSessionIDInContext(parent context.Context, sessionID string) context.Context { + return context.WithValue(parent, sessionIDKey, sessionID) +} diff --git a/proxx b/proxx new file mode 100755 index 0000000..e4c8175 Binary files /dev/null and b/proxx differ diff --git a/streaming.go b/streaming.go new file mode 100644 index 0000000..ff88c84 --- /dev/null +++ b/streaming.go @@ -0,0 +1,339 @@ +package main + +import ( + "bufio" + "bytes" + "encoding/json" + "fmt" + "io" + "strings" +) + +// StreamConverter holds state during streaming +type StreamConverter struct { + ID string + Model string + Role string + Finish string + AccBlocks map[int]*AccBlock + ToolIndex int +} + +type AccBlock struct { + Type string + Name string + InputAcc string + TextAcc string +} + +func NewStreamConverter(id, model string) *StreamConverter { + return &StreamConverter{ + ID: id, + Model: model, + AccBlocks: make(map[int]*AccBlock), + } +} + +// HandleAnthropicStream reads Anthropic SSE events, writes OpenAI SSE +func (sc *StreamConverter) HandleAnthropicStream(reader *bufio.Reader, writer io.Writer) error { + created := int64(1234567890) + + for { + line, err := reader.ReadString('\n') + if err != nil { + if err == io.EOF { + break + } + return fmt.Errorf("read error: %w", err) + } + + line = strings.TrimSpace(line) + if line == "" { + continue + } + + // Skip comment lines (starting with colon) + if strings.HasPrefix(line, ":") { + continue + } + + // Parse SSE data line: "data: {...}" + if !strings.HasPrefix(line, "data: ") { + continue + } + + dataStr := strings.TrimPrefix(line, "data: ") + + // Try to parse as SSE event with type field first + var event struct { + Type string `json:"type"` + } + if err := json.Unmarshal([]byte(dataStr), &event); err != nil { + continue + } + + switch event.Type { + case "message_start": + var msgStart struct { + Type string `json:"type"` + Message struct { + Id string `json:"id"` + Role string `json:"role"` + Type string `json:"type"` + } `json:"message"` + } + if err := json.Unmarshal([]byte(dataStr), &msgStart); err == nil { + sc.Role = msgStart.Message.Role + } + + case "content_block_start": + var blockStart struct { + Type string `json:"type"` + Index int `json:"index"` + ContentBlock struct { + Type string `json:"type"` + Id string `json:"id"` + Name string `json:"name,omitempty"` + } `json:"content_block"` + } + if err := json.Unmarshal([]byte(dataStr), &blockStart); err == nil { + sc.AccBlocks[blockStart.Index] = &AccBlock{ + Type: blockStart.ContentBlock.Type, + Name: blockStart.ContentBlock.Name, + } + if blockStart.ContentBlock.Type == "tool_use" { + sc.ToolIndex = blockStart.Index + } + } + + case "content_block_delta": + var delta struct { + Type string `json:"type"` + Index int `json:"index"` + Delta struct { + Text string `json:"text,omitempty"` + InputJSONDelta string `json:"input_json_delta,omitempty"` + } `json:"delta"` + } + if err := json.Unmarshal([]byte(dataStr), &delta); err == nil { + block := sc.AccBlocks[delta.Index] + if block != nil { + block.TextAcc += delta.Delta.Text + block.InputAcc += delta.Delta.InputJSONDelta + } + + // Write OpenAI format chunk + if delta.Delta.Text != "" { + chunk := StreamChunk{ + ID: sc.ID, + Object: "chat.completion.chunk", + Created: created, + Model: sc.Model, + Choices: []StreamChoice{ + { + Index: delta.Index, + Delta: Delta{ + Content: delta.Delta.Text, + }, + }, + }, + } + data, _ := json.Marshal(chunk) + fmt.Fprintf(writer, "data: %s\n\n", data) + } + + if delta.Delta.InputJSONDelta != "" { + block := sc.AccBlocks[delta.Index] + if block != nil { + chunk := StreamChunk{ + ID: sc.ID, + Object: "chat.completion.chunk", + Created: created, + Model: sc.Model, + Choices: []StreamChoice{ + { + Index: delta.Index, + Delta: Delta{ + ToolCalls: []StreamToolCall{ + { + Index: delta.Index, + ID: block.Name + "_pending", + Function: StreamFunction{ + Arguments: delta.Delta.InputJSONDelta, + }, + }, + }, + }, + }, + }, + } + data, _ := json.Marshal(chunk) + fmt.Fprintf(writer, "data: %s\n\n", data) + } + } + } + + case "content_block_stop": + var blockStop struct { + Type string `json:"type"` + Index int `json:"index"` + } + if err := json.Unmarshal([]byte(dataStr), &blockStop); err == nil { + // Nothing to output here, accumulated data was already streamed + } + + case "message_delta": + var msgDelta struct { + Type string `json:"type"` + Delta struct { + StopSequence string `json:"stop_sequence,omitempty"` + } `json:"delta"` + Usage *AnthropicUsage `json:"usage,omitempty"` + StopReason string `json:"stop_reason,omitempty"` + } + if err := json.Unmarshal([]byte(dataStr), &msgDelta); err == nil { + if msgDelta.StopReason != "" { + sc.Finish = mapStopReason(msgDelta.StopReason) + } + } + + case "message_stop": + // Send final chunk with finish_reason + chunk := StreamChunk{ + ID: sc.ID, + Object: "chat.completion.chunk", + Created: created, + Model: sc.Model, + Choices: []StreamChoice{ + { + Index: 0, + Delta: Delta{}, + FinishReason: sc.Finish, + }, + }, + } + data, _ := json.Marshal(chunk) + fmt.Fprintf(writer, "data: %s\n\n", data) + fmt.Fprintf(writer, "data: [DONE]\n\n") + } + } + + return nil +} + +// ParseAnthropicSSEEvent parses a raw SSE data line into a map +func ParseAnthropicSSEEvent(data string) (map[string]interface{}, error) { + var result map[string]interface{} + err := json.Unmarshal([]byte(data), &result) + return result, err +} + +// ConvertStreamEvents converts Anthropic SSE bytes to OpenAI SSE format +func ConvertStreamEvents(sseData []byte, id, model string) ([]byte, error) { + var buf bytes.Buffer + scanner := bufio.NewScanner(bytes.NewReader(sseData)) + sc := NewStreamConverter(id, model) + + for scanner.Scan() { + line := scanner.Text() + if !strings.HasPrefix(line, "data: ") { + continue + } + dataStr := strings.TrimPrefix(line, "data: ") + + var event struct { + Type string `json:"type"` + } + if err := json.Unmarshal([]byte(dataStr), &event); err != nil { + continue + } + + switch event.Type { + case "message_start": + var msgStart struct { + Message struct { + Id string `json:"id"` + Role string `json:"role"` + } `json:"message"` + } + json.Unmarshal([]byte(dataStr), &msgStart) + sc.Role = msgStart.Message.Role + + case "content_block_start": + var blockStart struct { + Index int `json:"index"` + ContentBlock struct { + Type string `json:"type"` + Id string `json:"id"` + Name string `json:"name,omitempty"` + } `json:"content_block"` + } + if err := json.Unmarshal([]byte(dataStr), &blockStart); err == nil { + sc.AccBlocks[blockStart.Index] = &AccBlock{ + Type: blockStart.ContentBlock.Type, + Name: blockStart.ContentBlock.Name, + } + } + + case "content_block_delta": + var delta struct { + Index int `json:"index"` + Delta struct { + Text string `json:"text,omitempty"` + InputJSONDelta string `json:"input_json_delta,omitempty"` + } `json:"delta"` + } + if err := json.Unmarshal([]byte(dataStr), &delta); err == nil { + block := sc.AccBlocks[delta.Index] + if block != nil { + block.TextAcc += delta.Delta.Text + block.InputAcc += delta.Delta.InputJSONDelta + } + + chunk := StreamChunk{ + ID: sc.ID, + Object: "chat.completion.chunk", + Created: int64(1234567890), + Model: sc.Model, + Choices: []StreamChoice{ + { + Index: delta.Index, + Delta: Delta{ + Content: delta.Delta.Text, + }, + }, + }, + } + data, _ := json.Marshal(chunk) + buf.WriteString(fmt.Sprintf("data: %s\n\n", data)) + } + + case "message_delta": + var msgDelta struct { + StopReason string `json:"stop_reason,omitempty"` + } + json.Unmarshal([]byte(dataStr), &msgDelta) + sc.Finish = mapStopReason(msgDelta.StopReason) + + case "message_stop": + chunk := StreamChunk{ + ID: sc.ID, + Object: "chat.completion.chunk", + Created: int64(1234567890), + Model: sc.Model, + Choices: []StreamChoice{ + { + Index: 0, + Delta: Delta{}, + FinishReason: sc.Finish, + }, + }, + } + data, _ := json.Marshal(chunk) + buf.WriteString(fmt.Sprintf("data: %s\n\n", data)) + buf.WriteString("data: [DONE]\n\n") + } + } + + return buf.Bytes(), nil +} diff --git a/types.go b/types.go new file mode 100644 index 0000000..0dbb926 --- /dev/null +++ b/types.go @@ -0,0 +1,238 @@ +package main + +import "encoding/json" + +// OpenAI Types + +type ChatCompletionRequest struct { + Model string `json:"model"` + Messages []Message `json:"messages"` + Temperature *float64 `json:"temperature,omitempty"` + TopP *float64 `json:"top_p,omitempty"` + MaxTokens *int `json:"max_tokens,omitempty"` + Stream *bool `json:"stream,omitempty"` + Stop []string `json:"stop,omitempty"` + Tools []Tool `json:"tools,omitempty"` + ToolChoices *ToolChoice `json:"tool_choice,omitempty"` + PresencePenalty *float64 `json:"presence_penalty,omitempty"` + FrequencyPenalty *float64 `json:"frequency_penalty,omitempty"` + User string `json:"user,omitempty"` +} + +type Message struct { + Role string `json:"role"` + Content interface{} `json:"content"` // string or []ContentPart + Name string `json:"name,omitempty"` + ToolCalls []ToolCall `json:"tool_calls,omitempty"` +} + +type ContentPart struct { + Type string `json:"type"` + Text string `json:"text,omitempty"` + Source *ImageSource `json:"source,omitempty"` +} + +type ImageSource struct { + Type string `json:"type"` + MediaURL string `json:"media_url,omitempty"` + Detail string `json:"detail,omitempty"` +} + +type Tool struct { + Type string `json:"type"` + Function Function `json:"function"` +} + +type Function struct { + Name string `json:"name"` + Description string `json:"description,omitempty"` + Parameters map[string]interface{} `json:"parameters,omitempty"` +} + +type ToolChoice struct { + Type string `json:"type,omitempty"` // "auto", "none", "required" + Function *FunctionRef `json:"function,omitempty"` +} + +type FunctionRef struct { + Name string `json:"name,omitempty"` +} + +type Choice struct { + Index int `json:"index"` + Message Message `json:"message"` + FinishReason string `json:"finish_reason,omitempty"` +} + +type ChatCompletionResponse struct { + ID string `json:"id"` + Object string `json:"object"` + Created int64 `json:"created"` + Model string `json:"model"` + Choices []Choice `json:"choices"` + Usage Usage `json:"usage"` +} + +type Usage struct { + PromptTokens int `json:"prompt_tokens"` + CompletionTokens int `json:"completion_tokens"` + TotalTokens int `json:"total_tokens"` +} + +type ToolCall struct { + Index int `json:"index,omitempty"` + ID string `json:"id,omitempty"` + Type string `json:"type,omitempty"` + Function FunctionCall `json:"function,omitempty"` +} + +type FunctionCall struct { + Name string `json:"name,omitempty"` + Arguments string `json:"arguments,omitempty"` +} + +// Anthropic Types + +type AnthropicRequest struct { + Model string `json:"model"` + Messages []AnthropicMessage `json:"messages"` + System string `json:"system,omitempty"` + MaxTokens int `json:"max_tokens"` + Stream bool `json:"stream,omitempty"` + Temperature *float64 `json:"temperature,omitempty"` + TopP *float64 `json:"top_p,omitempty"` + StopSequences []string `json:"stop_sequences,omitempty"` + Tools []AnthropicTool `json:"tools,omitempty"` + ToolChoice *AnthropicToolChoice `json:"tool_choice,omitempty"` +} + +type AnthropicMessage struct { + Role string `json:"role"` + Content interface{} `json:"content"` // string or []ContentBlock +} + +type ContentBlock struct { + Type string `json:"type,omitempty"` // "text", "tool_use", "tool_result" + Text string `json:"text,omitempty"` + Id string `json:"id,omitempty"` + Name string `json:"name,omitempty"` + Input map[string]interface{} `json:"input,omitempty"` + ToolUseId string `json:"tool_use_id,omitempty"` + Content string `json:"content,omitempty"` +} + +type AnthropicTool struct { + Name string `json:"name"` + Description string `json:"description,omitempty"` + InputSchema map[string]interface{} `json:"input_schema,omitempty"` +} + +type AnthropicToolChoice struct { + Type string `json:"type,omitempty"` // "auto", "none", "any" + Name string `json:"name,omitempty"` +} + +type AnthropicResponse struct { + Type string `json:"type,omitempty"` + Id string `json:"id"` + Role string `json:"role,omitempty"` + Model string `json:"model,omitempty"` + Content []ContentBlock `json:"content,omitempty"` + StopReason string `json:"stop_reason,omitempty"` + StopSequence string `json:"stop_sequence,omitempty"` + Usage AnthropicUsage `json:"usage,omitempty"` +} + +type AnthropicUsage struct { + InputTokens int `json:"input_tokens,omitempty"` + OutputTokens int `json:"output_tokens,omitempty"` +} + +// Streaming Types + +type StreamChunk struct { + ID string `json:"id"` + Object string `json:"object"` + Created int64 `json:"created"` + Model string `json:"model"` + Choices []StreamChoice `json:"choices"` +} + +type StreamChoice struct { + Index int `json:"index"` + Delta Delta `json:"delta"` + FinishReason string `json:"finish_reason,omitempty"` +} + +type Delta struct { + Role string `json:"role,omitempty"` + Content string `json:"content,omitempty"` + ToolCalls []StreamToolCall `json:"tool_calls,omitempty"` +} + +type StreamToolCall struct { + Index int `json:"index,omitempty"` + ID string `json:"id,omitempty"` + Type string `json:"type,omitempty"` + Function StreamFunction `json:"function,omitempty"` +} + +type StreamFunction struct { + Name string `json:"name,omitempty"` + Arguments string `json:"arguments,omitempty"` +} + + + +// SSE Event Types + +type SSEMessage struct { + Type string `json:"type"` + Data json.RawMessage `json:"data,omitempty"` +} + +type AnthropicMessageStart struct { + Type string `json:"type"` + Message struct { + Id string `json:"id"` + Role string `json:"role"` + Type string `json:"type"` + } `json:"message"` +} + +type AnthropicBlockStart struct { + Type string `json:"type"` + Index int `json:"index"` + ContentBlock struct { + Type string `json:"type"` + Id string `json:"id"` + Name string `json:"name,omitempty"` + } `json:"content_block"` +} + +type AnthropicDelta struct { + Type string `json:"type"` + Index int `json:"index"` + Delta struct { + Text string `json:"text,omitempty"` + InputJSONDelta string `json:"input_json_delta,omitempty"` + } `json:"delta"` +} + +type AnthropicBlockStop struct { + Type string `json:"type"` + Index int `json:"index"` +} + +type AnthropicMessageDelta struct { + Type string `json:"type"` + Delta struct { + StopSequence string `json:"stop_sequence,omitempty"` + } `json:"delta"` + Usage *AnthropicUsage `json:"usage,omitempty"` + StopReason string `json:"stop_reason,omitempty"` +} + +type AnthropicMessageStop struct { + Type string `json:"type"` +}