fix: use non-streaming upstream requests, convert to SSE for clients

This commit is contained in:
ashisgreat22 2026-04-15 08:38:54 +00:00
parent 5989e9a390
commit 73efb02461

View file

@@ -1,9 +1,6 @@
package main package main
import ( import (
"bufio"
"bytes"
"context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io" "io"
@@ -54,12 +51,8 @@ func handleModels(w http.ResponseWriter, r *http.Request) {
} }
models := []map[string]interface{}{ models := []map[string]interface{}{
{"id": "claude-opus-4-5", "object": "model", "created": 1234567890, "owned_by": "anthropic"}, {"id": "glm-4.7", "object": "model", "created": 1234567890, "owned_by": "zhipu"},
{"id": "claude-sonnet-4-20250514", "object": "model", "created": 1234567890, "owned_by": "anthropic"}, {"id": "glm-4.6", "object": "model", "created": 1234567890, "owned_by": "zhipu"},
{"id": "claude-3-5-sonnet-20241022", "object": "model", "created": 1234567890, "owned_by": "anthropic"},
{"id": "claude-3-opus-20240229", "object": "model", "created": 1234567890, "owned_by": "anthropic"},
{"id": "claude-3-sonnet-20240229", "object": "model", "created": 1234567890, "owned_by": "anthropic"},
{"id": "claude-3-haiku-20240307", "object": "model", "created": 1234567890, "owned_by": "anthropic"},
} }
response := map[string]interface{}{ response := map[string]interface{}{
@@ -102,260 +95,120 @@ func handleChatCompletions(w http.ResponseWriter, r *http.Request) {
// Get session ID from context (set by main) // Get session ID from context (set by main)
sessionID := r.Context().Value(sessionIDKey).(string) sessionID := r.Context().Value(sessionIDKey).(string)
// Convert to Anthropic format // Convert to Anthropic format — always non-streaming to upstream
// (ZAI's streaming returns empty for GLM models)
anthropicReq := ConvertOpenAIRequest(&req) anthropicReq := ConvertOpenAIRequest(&req)
anthropicReq.Stream = true // Always stream from upstream for reliability anthropicReq.Stream = false
reqBody, _ := json.Marshal(anthropicReq) reqBody, _ := json.Marshal(anthropicReq)
log.Printf("[debug] Sending to upstream %s, model=%s, body=%s", config.UpstreamURL, req.Model, string(reqBody)) log.Printf("[debug] Sending to upstream %s, model=%s, body=%s", config.UpstreamURL, req.Model, string(reqBody))
// Proxy to upstream (always streaming) // Non-streaming request to upstream
resp, err := proxyToUpstream(anthropicReq, apiKey, sessionID, true) upstreamResp, err := callUpstream(anthropicReq, apiKey, sessionID)
if err != nil { if err != nil {
writeError(w, http.StatusBadGateway, fmt.Sprintf("Upstream request failed: %v", err), "upstream_error", "proxy_error") writeError(w, http.StatusBadGateway, fmt.Sprintf("Upstream request failed: %v", err), "upstream_error", "proxy_error")
return return
} }
defer resp.Body.Close() defer upstreamResp.Body.Close()
if resp.StatusCode != http.StatusOK { if upstreamResp.StatusCode != http.StatusOK {
respBody, _ := io.ReadAll(resp.Body) respBody, _ := io.ReadAll(upstreamResp.Body)
log.Printf("[debug] Upstream error status %d: %s", resp.StatusCode, string(respBody)) log.Printf("[debug] Upstream error status %d: %s", upstreamResp.StatusCode, string(respBody))
writeError(w, http.StatusBadGateway, fmt.Sprintf("Upstream returned error: %s", string(respBody)), "upstream_error", fmt.Sprintf("status_%d", resp.StatusCode)) writeError(w, http.StatusBadGateway, fmt.Sprintf("Upstream returned error: %s", string(respBody)), "upstream_error", fmt.Sprintf("status_%d", upstreamResp.StatusCode))
return
}
// Read the full Anthropic response
respBody, err := io.ReadAll(upstreamResp.Body)
if err != nil {
writeError(w, http.StatusBadGateway, "Failed to read upstream response", "upstream_error", "body_read_error")
return
}
log.Printf("[debug] Upstream response: %s", string(respBody))
var anthropicResp AnthropicResponse
if err := json.Unmarshal(respBody, &anthropicResp); err != nil {
writeError(w, http.StatusBadGateway, "Failed to parse upstream response", "upstream_error", "json_decode_error")
return return
} }
isStream := req.Stream != nil && *req.Stream isStream := req.Stream != nil && *req.Stream
if isStream { if isStream {
// Stream to client // Convert the non-streaming response to SSE chunks for the client
w.Header().Set("content-type", "text/event-stream") w.Header().Set("content-type", "text/event-stream")
w.Header().Set("cache-control", "no-cache") w.Header().Set("cache-control", "no-cache")
w.Header().Set("connection", "keep-alive") w.Header().Set("connection", "keep-alive")
scanner := bufio.NewScanner(resp.Body) created := time.Now().Unix()
buf := make([]byte, 0, 64*1024)
scanner.Buffer(buf, 1024*1024)
created := int64(time.Now().Unix())
chunkID := "chatcmpl-" + randomString(8) chunkID := "chatcmpl-" + randomString(8)
for scanner.Scan() { // Extract text content
line := scanner.Text() var textContent string
log.Printf("[stream] raw: %q", line) for _, block := range anthropicResp.Content {
if !strings.HasPrefix(line, "data: ") { if block.Type == "text" {
continue textContent += block.Text
} }
dataStr := strings.TrimPrefix(line, "data: ")
if dataStr == "[DONE]" {
break
} }
var event struct { // Send text as chunks (simulate streaming)
Type string `json:"type"` if textContent != "" {
chunk := StreamChunk{
ID: chunkID,
Object: "chat.completion.chunk",
Created: created,
Model: req.Model,
Choices: []StreamChoice{
{
Index: 0,
Delta: Delta{
Role: "assistant",
Content: textContent,
},
},
},
} }
if err := json.Unmarshal([]byte(dataStr), &event); err != nil { data, _ := json.Marshal(chunk)
log.Printf("[stream] JSON err: %v", err) fmt.Fprintf(w, "data: %s\n\n", data)
continue
}
log.Printf("[stream] type=%s", event.Type)
openAIData := processAnthropicEvent(dataStr, chunkID, req.Model, created)
if openAIData != "" {
fmt.Fprintf(w, "data: %s\n\n", openAIData)
w.(http.Flusher).Flush() w.(http.Flusher).Flush()
} }
// Send finish chunk
finishChunk := StreamChunk{
ID: chunkID,
Object: "chat.completion.chunk",
Created: created,
Model: req.Model,
Choices: []StreamChoice{
{
Index: 0,
Delta: Delta{},
FinishReason: mapStopReason(anthropicResp.StopReason),
},
},
} }
data, _ := json.Marshal(finishChunk)
fmt.Fprintf(w, "data: %s\n\n", data)
w.(http.Flusher).Flush()
fmt.Fprintf(w, "data: [DONE]\n\n") fmt.Fprintf(w, "data: [DONE]\n\n")
w.(http.Flusher).Flush() w.(http.Flusher).Flush()
} else { } else {
// Non-streaming to client: accumulate upstream stream into a single response // Non-streaming: convert directly
scanner := bufio.NewScanner(resp.Body) openAIResp := ConvertAnthropicResponse(&anthropicResp, req.Model)
buf := make([]byte, 0, 64*1024)
scanner.Buffer(buf, 1024*1024)
var textContent string
var stopReason string
var usage AnthropicUsage
msgID := ""
for scanner.Scan() {
line := scanner.Text()
log.Printf("[accumulate] raw: %q", line)
if !strings.HasPrefix(line, "data: ") {
continue
}
dataStr := strings.TrimPrefix(line, "data: ")
if dataStr == "[DONE]" {
break
}
var event struct {
Type string `json:"type"`
}
if err := json.Unmarshal([]byte(dataStr), &event); err != nil {
log.Printf("[accumulate] JSON err: %v", err)
continue
}
log.Printf("[accumulate] type=%s data=%.200s", event.Type, dataStr)
switch event.Type {
case "message_start":
var msgStart struct {
Message struct {
Id string `json:"id"`
Usage AnthropicUsage `json:"usage"`
} `json:"message"`
}
if json.Unmarshal([]byte(dataStr), &msgStart) == nil {
msgID = msgStart.Message.Id
usage.InputTokens = msgStart.Message.Usage.InputTokens
}
case "content_block_delta":
var delta struct {
Delta struct {
Text string `json:"text,omitempty"`
} `json:"delta"`
}
if json.Unmarshal([]byte(dataStr), &delta) == nil {
textContent += delta.Delta.Text
}
case "message_delta":
var msgDelta struct {
Delta struct{} `json:"delta"`
Usage *AnthropicUsage `json:"usage,omitempty"`
StopReason string `json:"stop_reason,omitempty"`
}
if json.Unmarshal([]byte(dataStr), &msgDelta) == nil {
stopReason = msgDelta.StopReason
if msgDelta.Usage != nil {
usage.OutputTokens = msgDelta.Usage.OutputTokens
}
}
}
}
openAIResp := &ChatCompletionResponse{
ID: msgID,
Object: "chat.completion",
Created: time.Now().Unix(),
Model: req.Model,
Choices: []Choice{
{
Index: 0,
Message: Message{Role: "assistant", Content: textContent},
FinishReason: mapStopReason(stopReason),
},
},
Usage: Usage{
PromptTokens: usage.InputTokens,
CompletionTokens: usage.OutputTokens,
TotalTokens: usage.InputTokens + usage.OutputTokens,
},
}
w.Header().Set("content-type", "application/json") w.Header().Set("content-type", "application/json")
json.NewEncoder(w).Encode(openAIResp) json.NewEncoder(w).Encode(openAIResp)
} }
} }
func processAnthropicEvent(dataStr, chunkID, model string, created int64) string { func callUpstream(req *AnthropicRequest, apiKey, sessionID string) (*http.Response, error) {
var event struct {
Type string `json:"type"`
}
if err := json.Unmarshal([]byte(dataStr), &event); err != nil {
return ""
}
switch event.Type {
case "content_block_delta":
var delta struct {
Index int `json:"index"`
Delta struct {
Text string `json:"text,omitempty"`
InputJSONDelta string `json:"input_json_delta,omitempty"`
} `json:"delta"`
}
json.Unmarshal([]byte(dataStr), &delta)
if delta.Delta.Text != "" {
chunk := StreamChunk{
ID: chunkID,
Object: "chat.completion.chunk",
Created: created,
Model: model,
Choices: []StreamChoice{
{
Index: delta.Index,
Delta: Delta{
Content: delta.Delta.Text,
},
},
},
}
data, _ := json.Marshal(chunk)
return string(data)
}
if delta.Delta.InputJSONDelta != "" {
chunk := StreamChunk{
ID: chunkID,
Object: "chat.completion.chunk",
Created: created,
Model: model,
Choices: []StreamChoice{
{
Index: delta.Index,
Delta: Delta{
ToolCalls: []StreamToolCall{
{
Index: delta.Index,
Function: StreamFunction{
Arguments: delta.Delta.InputJSONDelta,
},
},
},
},
},
},
}
data, _ := json.Marshal(chunk)
return string(data)
}
case "message_delta":
var msgDelta struct {
StopReason string `json:"stop_reason,omitempty"`
}
json.Unmarshal([]byte(dataStr), &msgDelta)
chunk := StreamChunk{
ID: chunkID,
Object: "chat.completion.chunk",
Created: created,
Model: model,
Choices: []StreamChoice{
{
Index: 0,
Delta: Delta{},
FinishReason: mapStopReason(msgDelta.StopReason),
},
},
}
data, _ := json.Marshal(chunk)
return string(data)
}
return ""
}
func proxyToUpstream(req *AnthropicRequest, apiKey, sessionID string, stream bool) (*http.Response, error) {
bodyBytes, err := json.Marshal(req) bodyBytes, err := json.Marshal(req)
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to marshal request: %w", err) return nil, fmt.Errorf("failed to marshal request: %w", err)
} }
upstreamURL := config.UpstreamURL + "/v1/messages" upstreamURL := config.UpstreamURL + "/v1/messages"
httpReq, err := http.NewRequest(http.MethodPost, upstreamURL, bytes.NewReader(bodyBytes)) httpReq, err := http.NewRequest(http.MethodPost, upstreamURL, strings.NewReader(string(bodyBytes)))
if err != nil { if err != nil {
return nil, fmt.Errorf("failed to create request: %w", err) return nil, fmt.Errorf("failed to create request: %w", err)
} }
@@ -365,16 +218,7 @@ func proxyToUpstream(req *AnthropicRequest, apiKey, sessionID string, stream bool) (*http.Response, error) {
httpReq.Header.Set(k, v) httpReq.Header.Set(k, v)
} }
if stream { client := &http.Client{Timeout: 300 * time.Second}
httpReq.Header.Set("anthropic-sse-beta", "output-2025-05-14")
}
ctx, cancel := context.WithTimeout(context.Background(), 300*time.Second)
defer cancel()
httpReq = httpReq.WithContext(ctx)
client := &http.Client{}
return client.Do(httpReq) return client.Do(httpReq)
} }