From 02989e59b51bcf06ae389386f3e646acf1fb928c Mon Sep 17 00:00:00 2001 From: ashisgreat22 Date: Wed, 15 Apr 2026 07:54:50 +0000 Subject: [PATCH] fix: always stream from upstream, accumulate for non-streaming clients --- handler.go | 134 ++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 97 insertions(+), 37 deletions(-) diff --git a/handler.go b/handler.go index f1f2e43..6d2d95f 100644 --- a/handler.go +++ b/handler.go @@ -36,13 +36,13 @@ var blockedHeaders = map[string]bool{ // ClaudeCodeHeaders returns the headers to mimic claude-code CLI func ClaudeCodeHeaders(apiKey, sessionID string) map[string]string { return map[string]string{ - "User-Agent": "claude-cli/1.0.18 (pro, cli)", - "x-api-key": apiKey, - "x-app": "cli", - "anthropic-version": "2023-06-01", - "anthropic-beta": "interleaved-thinking-2025-05-14,prompt-caching-scope-2026-01-05,context-management-2025-06-27", + "User-Agent": "claude-cli/1.0.18 (pro, cli)", + "x-api-key": apiKey, + "x-app": "cli", + "anthropic-version": "2023-06-01", + "anthropic-beta": "interleaved-thinking-2025-05-14,prompt-caching-scope-2026-01-05,context-management-2025-06-27", "X-Claude-Code-Session-Id": sessionID, - "content-type": "application/json", + "content-type": "application/json", } } @@ -103,12 +103,12 @@ func handleChatCompletions(w http.ResponseWriter, r *http.Request) { // Convert to Anthropic format anthropicReq := ConvertOpenAIRequest(&req) + anthropicReq.Stream = true // Always stream from upstream for reliability - // Check streaming - isStream := req.Stream != nil && *req.Stream + log.Printf("[debug] Sending to upstream %s, model=%s", config.UpstreamURL, req.Model) - // Proxy to upstream - resp, err := proxyToUpstream(anthropicReq, apiKey, sessionID, isStream) + // Proxy to upstream (always streaming) + resp, err := proxyToUpstream(anthropicReq, apiKey, sessionID, true) if err != nil { writeError(w, http.StatusBadGateway, fmt.Sprintf("Upstream request failed: %v", err), "upstream_error", "proxy_error") return @@ -117,20 +117,20 @@ func handleChatCompletions(w http.ResponseWriter, r *http.Request) { if resp.StatusCode != http.StatusOK { respBody, _ := io.ReadAll(resp.Body) + log.Printf("[debug] Upstream error status %d: %s", resp.StatusCode, string(respBody)) writeError(w, http.StatusBadGateway, fmt.Sprintf("Upstream returned error: %s", string(respBody)), "upstream_error", fmt.Sprintf("status_%d", resp.StatusCode)) return } + isStream := req.Stream != nil && *req.Stream + if isStream { - // Handle streaming response + // Stream to client w.Header().Set("content-type", "text/event-stream") w.Header().Set("cache-control", "no-cache") w.Header().Set("connection", "keep-alive") - // Convert SSE stream on the fly - reader := io.NopCloser(resp.Body) - scanner := bufio.NewScanner(reader) - // Use a larger buffer for potential large JSON lines + scanner := bufio.NewScanner(resp.Body) buf := make([]byte, 0, 64*1024) scanner.Buffer(buf, 1024*1024) @@ -144,9 +144,7 @@ func handleChatCompletions(w http.ResponseWriter, r *http.Request) { } dataStr := strings.TrimPrefix(line, "data: ") if dataStr == "[DONE]" { - fmt.Fprintf(w, "data: [DONE]\n\n") - w.(http.Flusher).Flush() - continue + break } var event struct { @@ -156,7 +154,6 @@ func handleChatCompletions(w http.ResponseWriter, r *http.Request) { continue } - // Process event and write OpenAI SSE openAIData := processAnthropicEvent(dataStr, chunkID, req.Model, created) if openAIData != "" { fmt.Fprintf(w, "data: %s\n\n", openAIData) @@ -166,21 +163,90 @@ func handleChatCompletions(w http.ResponseWriter, r *http.Request) { fmt.Fprintf(w, "data: [DONE]\n\n") w.(http.Flusher).Flush() } else { - // Non-streaming response - respBody, err := io.ReadAll(resp.Body) - if err != nil { - writeError(w, http.StatusBadGateway, "Failed to read upstream response", "upstream_error", "body_read_error") - return + // Non-streaming to client: accumulate upstream stream into a single response + scanner := bufio.NewScanner(resp.Body) + buf := make([]byte, 0, 64*1024) + scanner.Buffer(buf, 1024*1024) + + var textContent string + var stopReason string + var usage AnthropicUsage + msgID := "" + + for scanner.Scan() { + line := scanner.Text() + if !strings.HasPrefix(line, "data: ") { + continue + } + dataStr := strings.TrimPrefix(line, "data: ") + if dataStr == "[DONE]" { + break + } + + var event struct { + Type string `json:"type"` + } + if err := json.Unmarshal([]byte(dataStr), &event); err != nil { + continue + } + + switch event.Type { + case "message_start": + var msgStart struct { + Message struct { + Id string `json:"id"` + Usage AnthropicUsage `json:"usage"` + } `json:"message"` + } + if json.Unmarshal([]byte(dataStr), &msgStart) == nil { + msgID = msgStart.Message.Id + usage.InputTokens = msgStart.Message.Usage.InputTokens + } + + case "content_block_delta": + var delta struct { + Delta struct { + Text string `json:"text,omitempty"` + } `json:"delta"` + } + if json.Unmarshal([]byte(dataStr), &delta) == nil { + textContent += delta.Delta.Text + } + + case "message_delta": + var msgDelta struct { + Delta struct{} `json:"delta"` + Usage *AnthropicUsage `json:"usage,omitempty"` + StopReason string `json:"stop_reason,omitempty"` + } + if json.Unmarshal([]byte(dataStr), &msgDelta) == nil { + stopReason = msgDelta.StopReason + if msgDelta.Usage != nil { + usage.OutputTokens = msgDelta.Usage.OutputTokens + } + } + } } - var anthropicResp AnthropicResponse - if err := json.Unmarshal(respBody, &anthropicResp); err != nil { - writeError(w, http.StatusBadGateway, "Failed to parse upstream response", "upstream_error", "json_decode_error") - return + openAIResp := &ChatCompletionResponse{ + ID: msgID, + Object: "chat.completion", + Created: time.Now().Unix(), + Model: req.Model, + Choices: []Choice{ + { + Index: 0, + Message: Message{Role: "assistant", Content: textContent}, + FinishReason: mapStopReason(stopReason), + }, + }, + Usage: Usage{ + PromptTokens: usage.InputTokens, + CompletionTokens: usage.OutputTokens, + TotalTokens: usage.InputTokens + usage.OutputTokens, + }, } - openAIResp := ConvertAnthropicResponse(&anthropicResp, req.Model) - w.Header().Set("content-type", "application/json") json.NewEncoder(w).Encode(openAIResp) } @@ -195,12 +261,6 @@ func processAnthropicEvent(dataStr, chunkID, model string, created int64) string } switch event.Type { - case "message_start": - return "" - - case "content_block_start": - return "" - case "content_block_delta": var delta struct { Index int `json:"index"` @@ -301,7 +361,7 @@ func proxyToUpstream(req *AnthropicRequest, apiKey, sessionID string, stream boo httpReq.Header.Set("anthropic-sse-beta", "output-2025-05-14") } - ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second) + ctx, cancel := context.WithTimeout(context.Background(), 300*time.Second) defer cancel() httpReq = httpReq.WithContext(ctx)