From 73efb024618eebb41ee9c53dbd996b55a8b8144c Mon Sep 17 00:00:00 2001 From: ashisgreat22 Date: Wed, 15 Apr 2026 08:38:54 +0000 Subject: [PATCH] fix: use non-streaming upstream requests, convert to SSE for clients --- handler.go | 318 ++++++++++++++--------------------------------------- 1 file changed, 81 insertions(+), 237 deletions(-) diff --git a/handler.go b/handler.go index 57da7fe..06ce07b 100644 --- a/handler.go +++ b/handler.go @@ -1,9 +1,6 @@ package main import ( - "bufio" - "bytes" - "context" "encoding/json" "fmt" "io" @@ -26,8 +23,8 @@ var config *Config // for security/privacy reasons. These headers could leak internal URLs, // session information, or other sensitive data. var blockedHeaders = map[string]bool{ - "Referer": true, // Don't leak internal URLs to external API - "Cookie": true, // Don't forward session cookies + "Referer": true, // Don't leak internal URLs to external API + "Cookie": true, // Don't forward session cookies "Authorization": true, // Already extracted and sent as x-api-key "X-Forwarded-For": true, // Don't leak client IP "X-Real-Ip": true, // Don't leak client IP @@ -54,12 +51,8 @@ func handleModels(w http.ResponseWriter, r *http.Request) { } models := []map[string]interface{}{ - {"id": "claude-opus-4-5", "object": "model", "created": 1234567890, "owned_by": "anthropic"}, - {"id": "claude-sonnet-4-20250514", "object": "model", "created": 1234567890, "owned_by": "anthropic"}, - {"id": "claude-3-5-sonnet-20241022", "object": "model", "created": 1234567890, "owned_by": "anthropic"}, - {"id": "claude-3-opus-20240229", "object": "model", "created": 1234567890, "owned_by": "anthropic"}, - {"id": "claude-3-sonnet-20240229", "object": "model", "created": 1234567890, "owned_by": "anthropic"}, - {"id": "claude-3-haiku-20240307", "object": "model", "created": 1234567890, "owned_by": "anthropic"}, + {"id": "glm-4.7", "object": "model", "created": 1234567890, "owned_by": "zhipu"}, + {"id": "glm-4.6", "object": "model", "created": 1234567890, "owned_by": "zhipu"}, } response := map[string]interface{}{ @@ -102,260 +95,120 @@ func handleChatCompletions(w http.ResponseWriter, r *http.Request) { // Get session ID from context (set by main) sessionID := r.Context().Value(sessionIDKey).(string) - // Convert to Anthropic format + // Convert to Anthropic format — always non-streaming to upstream + // (ZAI's streaming returns empty for GLM models) anthropicReq := ConvertOpenAIRequest(&req) - anthropicReq.Stream = true // Always stream from upstream for reliability + anthropicReq.Stream = false reqBody, _ := json.Marshal(anthropicReq) log.Printf("[debug] Sending to upstream %s, model=%s, body=%s", config.UpstreamURL, req.Model, string(reqBody)) - // Proxy to upstream (always streaming) - resp, err := proxyToUpstream(anthropicReq, apiKey, sessionID, true) + // Non-streaming request to upstream + upstreamResp, err := callUpstream(anthropicReq, apiKey, sessionID) if err != nil { writeError(w, http.StatusBadGateway, fmt.Sprintf("Upstream request failed: %v", err), "upstream_error", "proxy_error") return } - defer resp.Body.Close() + defer upstreamResp.Body.Close() - if resp.StatusCode != http.StatusOK { - respBody, _ := io.ReadAll(resp.Body) - log.Printf("[debug] Upstream error status %d: %s", resp.StatusCode, string(respBody)) - writeError(w, http.StatusBadGateway, fmt.Sprintf("Upstream returned error: %s", string(respBody)), "upstream_error", fmt.Sprintf("status_%d", resp.StatusCode)) + if upstreamResp.StatusCode != http.StatusOK { + respBody, _ := io.ReadAll(upstreamResp.Body) + log.Printf("[debug] Upstream error status %d: %s", upstreamResp.StatusCode, string(respBody)) + writeError(w, http.StatusBadGateway, fmt.Sprintf("Upstream returned error: %s", string(respBody)), "upstream_error", fmt.Sprintf("status_%d", upstreamResp.StatusCode)) + return + } + + // Read the full Anthropic response + respBody, err := io.ReadAll(upstreamResp.Body) + if err != nil { + writeError(w, http.StatusBadGateway, "Failed to read upstream response", "upstream_error", "body_read_error") + return + } + log.Printf("[debug] Upstream response: %s", string(respBody)) + + var anthropicResp AnthropicResponse + if err := json.Unmarshal(respBody, &anthropicResp); err != nil { + writeError(w, http.StatusBadGateway, "Failed to parse upstream response", "upstream_error", "json_decode_error") return } isStream := req.Stream != nil && *req.Stream if isStream { - // Stream to client + // Convert the non-streaming response to SSE chunks for the client w.Header().Set("content-type", "text/event-stream") w.Header().Set("cache-control", "no-cache") w.Header().Set("connection", "keep-alive") - scanner := bufio.NewScanner(resp.Body) - buf := make([]byte, 0, 64*1024) - scanner.Buffer(buf, 1024*1024) - - created := int64(time.Now().Unix()) + created := time.Now().Unix() chunkID := "chatcmpl-" + randomString(8) - for scanner.Scan() { - line := scanner.Text() - log.Printf("[stream] raw: %q", line) - if !strings.HasPrefix(line, "data: ") { - continue - } - dataStr := strings.TrimPrefix(line, "data: ") - if dataStr == "[DONE]" { - break - } - - var event struct { - Type string `json:"type"` - } - if err := json.Unmarshal([]byte(dataStr), &event); err != nil { - log.Printf("[stream] JSON err: %v", err) - continue - } - log.Printf("[stream] type=%s", event.Type) - - openAIData := processAnthropicEvent(dataStr, chunkID, req.Model, created) - if openAIData != "" { - fmt.Fprintf(w, "data: %s\n\n", openAIData) - w.(http.Flusher).Flush() + // Extract text content + var textContent string + for _, block := range anthropicResp.Content { + if block.Type == "text" { + textContent += block.Text } } + + // Send text as chunks (simulate streaming) + if textContent != "" { + chunk := StreamChunk{ + ID: chunkID, + Object: "chat.completion.chunk", + Created: created, + Model: req.Model, + Choices: []StreamChoice{ + { + Index: 0, + Delta: Delta{ + Role: "assistant", + Content: textContent, + }, + }, + }, + } + data, _ := json.Marshal(chunk) + fmt.Fprintf(w, "data: %s\n\n", data) + w.(http.Flusher).Flush() + } + + // Send finish chunk + finishChunk := StreamChunk{ + ID: chunkID, + Object: "chat.completion.chunk", + Created: created, + Model: req.Model, + Choices: []StreamChoice{ + { + Index: 0, + Delta: Delta{}, + FinishReason: mapStopReason(anthropicResp.StopReason), + }, + }, + } + data, _ := json.Marshal(finishChunk) + fmt.Fprintf(w, "data: %s\n\n", data) + w.(http.Flusher).Flush() + fmt.Fprintf(w, "data: [DONE]\n\n") w.(http.Flusher).Flush() } else { - // Non-streaming to client: accumulate upstream stream into a single response - scanner := bufio.NewScanner(resp.Body) - buf := make([]byte, 0, 64*1024) - scanner.Buffer(buf, 1024*1024) - - var textContent string - var stopReason string - var usage AnthropicUsage - msgID := "" - - for scanner.Scan() { - line := scanner.Text() - log.Printf("[accumulate] raw: %q", line) - if !strings.HasPrefix(line, "data: ") { - continue - } - dataStr := strings.TrimPrefix(line, "data: ") - if dataStr == "[DONE]" { - break - } - - var event struct { - Type string `json:"type"` - } - if err := json.Unmarshal([]byte(dataStr), &event); err != nil { - log.Printf("[accumulate] JSON err: %v", err) - continue - } - log.Printf("[accumulate] type=%s data=%.200s", event.Type, dataStr) - - switch event.Type { - case "message_start": - var msgStart struct { - Message struct { - Id string `json:"id"` - Usage AnthropicUsage `json:"usage"` - } `json:"message"` - } - if json.Unmarshal([]byte(dataStr), &msgStart) == nil { - msgID = msgStart.Message.Id - usage.InputTokens = msgStart.Message.Usage.InputTokens - } - - case "content_block_delta": - var delta struct { - Delta struct { - Text string `json:"text,omitempty"` - } `json:"delta"` - } - if json.Unmarshal([]byte(dataStr), &delta) == nil { - textContent += delta.Delta.Text - } - - case "message_delta": - var msgDelta struct { - Delta struct{} `json:"delta"` - Usage *AnthropicUsage `json:"usage,omitempty"` - StopReason string `json:"stop_reason,omitempty"` - } - if json.Unmarshal([]byte(dataStr), &msgDelta) == nil { - stopReason = msgDelta.StopReason - if msgDelta.Usage != nil { - usage.OutputTokens = msgDelta.Usage.OutputTokens - } - } - } - } - - openAIResp := &ChatCompletionResponse{ - ID: msgID, - Object: "chat.completion", - Created: time.Now().Unix(), - Model: req.Model, - Choices: []Choice{ - { - Index: 0, - Message: Message{Role: "assistant", Content: textContent}, - FinishReason: mapStopReason(stopReason), - }, - }, - Usage: Usage{ - PromptTokens: usage.InputTokens, - CompletionTokens: usage.OutputTokens, - TotalTokens: usage.InputTokens + usage.OutputTokens, - }, - } - + // Non-streaming: convert directly + openAIResp := ConvertAnthropicResponse(&anthropicResp, req.Model) w.Header().Set("content-type", "application/json") json.NewEncoder(w).Encode(openAIResp) } } -func processAnthropicEvent(dataStr, chunkID, model string, created int64) string { - var event struct { - Type string `json:"type"` - } - if err := json.Unmarshal([]byte(dataStr), &event); err != nil { - return "" - } - - switch event.Type { - case "content_block_delta": - var delta struct { - Index int `json:"index"` - Delta struct { - Text string `json:"text,omitempty"` - InputJSONDelta string `json:"input_json_delta,omitempty"` - } `json:"delta"` - } - json.Unmarshal([]byte(dataStr), &delta) - - if delta.Delta.Text != "" { - chunk := StreamChunk{ - ID: chunkID, - Object: "chat.completion.chunk", - Created: created, - Model: model, - Choices: []StreamChoice{ - { - Index: delta.Index, - Delta: Delta{ - Content: delta.Delta.Text, - }, - }, - }, - } - data, _ := json.Marshal(chunk) - return string(data) - } - if delta.Delta.InputJSONDelta != "" { - chunk := StreamChunk{ - ID: chunkID, - Object: "chat.completion.chunk", - Created: created, - Model: model, - Choices: []StreamChoice{ - { - Index: delta.Index, - Delta: Delta{ - ToolCalls: []StreamToolCall{ - { - Index: delta.Index, - Function: StreamFunction{ - Arguments: delta.Delta.InputJSONDelta, - }, - }, - }, - }, - }, - }, - } - data, _ := json.Marshal(chunk) - return string(data) - } - - case "message_delta": - var msgDelta struct { - StopReason string `json:"stop_reason,omitempty"` - } - json.Unmarshal([]byte(dataStr), &msgDelta) - chunk := StreamChunk{ - ID: chunkID, - Object: "chat.completion.chunk", - Created: created, - Model: model, - Choices: []StreamChoice{ - { - Index: 0, - Delta: Delta{}, - FinishReason: mapStopReason(msgDelta.StopReason), - }, - }, - } - data, _ := json.Marshal(chunk) - return string(data) - } - - return "" -} - -func proxyToUpstream(req *AnthropicRequest, apiKey, sessionID string, stream bool) (*http.Response, error) { +func callUpstream(req *AnthropicRequest, apiKey, sessionID string) (*http.Response, error) { bodyBytes, err := json.Marshal(req) if err != nil { return nil, fmt.Errorf("failed to marshal request: %w", err) } upstreamURL := config.UpstreamURL + "/v1/messages" - httpReq, err := http.NewRequest(http.MethodPost, upstreamURL, bytes.NewReader(bodyBytes)) + httpReq, err := http.NewRequest(http.MethodPost, upstreamURL, strings.NewReader(string(bodyBytes))) if err != nil { return nil, fmt.Errorf("failed to create request: %w", err) } @@ -365,16 +218,7 @@ func proxyToUpstream(req *AnthropicRequest, apiKey, sessionID string, stream boo httpReq.Header.Set(k, v) } - if stream { - httpReq.Header.Set("anthropic-sse-beta", "output-2025-05-14") - } - - ctx, cancel := context.WithTimeout(context.Background(), 300*time.Second) - defer cancel() - - httpReq = httpReq.WithContext(ctx) - - client := &http.Client{} + client := &http.Client{Timeout: 300 * time.Second} return client.Do(httpReq) }