fix: always stream from upstream, accumulate for non-streaming clients
This commit is contained in:
parent
f284f8dbde
commit
02989e59b5
1 changed file with 97 additions and 37 deletions
120
handler.go
120
handler.go
|
|
@ -103,12 +103,12 @@ func handleChatCompletions(w http.ResponseWriter, r *http.Request) {
|
|||
|
||||
// Convert to Anthropic format
|
||||
anthropicReq := ConvertOpenAIRequest(&req)
|
||||
anthropicReq.Stream = true // Always stream from upstream for reliability
|
||||
|
||||
// Check streaming
|
||||
isStream := req.Stream != nil && *req.Stream
|
||||
log.Printf("[debug] Sending to upstream %s, model=%s", config.UpstreamURL, req.Model)
|
||||
|
||||
// Proxy to upstream
|
||||
resp, err := proxyToUpstream(anthropicReq, apiKey, sessionID, isStream)
|
||||
// Proxy to upstream (always streaming)
|
||||
resp, err := proxyToUpstream(anthropicReq, apiKey, sessionID, true)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusBadGateway, fmt.Sprintf("Upstream request failed: %v", err), "upstream_error", "proxy_error")
|
||||
return
|
||||
|
|
@ -117,20 +117,20 @@ func handleChatCompletions(w http.ResponseWriter, r *http.Request) {
|
|||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
respBody, _ := io.ReadAll(resp.Body)
|
||||
log.Printf("[debug] Upstream error status %d: %s", resp.StatusCode, string(respBody))
|
||||
writeError(w, http.StatusBadGateway, fmt.Sprintf("Upstream returned error: %s", string(respBody)), "upstream_error", fmt.Sprintf("status_%d", resp.StatusCode))
|
||||
return
|
||||
}
|
||||
|
||||
isStream := req.Stream != nil && *req.Stream
|
||||
|
||||
if isStream {
|
||||
// Handle streaming response
|
||||
// Stream to client
|
||||
w.Header().Set("content-type", "text/event-stream")
|
||||
w.Header().Set("cache-control", "no-cache")
|
||||
w.Header().Set("connection", "keep-alive")
|
||||
|
||||
// Convert SSE stream on the fly
|
||||
reader := io.NopCloser(resp.Body)
|
||||
scanner := bufio.NewScanner(reader)
|
||||
// Use a larger buffer for potential large JSON lines
|
||||
scanner := bufio.NewScanner(resp.Body)
|
||||
buf := make([]byte, 0, 64*1024)
|
||||
scanner.Buffer(buf, 1024*1024)
|
||||
|
||||
|
|
@ -144,9 +144,7 @@ func handleChatCompletions(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
dataStr := strings.TrimPrefix(line, "data: ")
|
||||
if dataStr == "[DONE]" {
|
||||
fmt.Fprintf(w, "data: [DONE]\n\n")
|
||||
w.(http.Flusher).Flush()
|
||||
continue
|
||||
break
|
||||
}
|
||||
|
||||
var event struct {
|
||||
|
|
@ -156,7 +154,6 @@ func handleChatCompletions(w http.ResponseWriter, r *http.Request) {
|
|||
continue
|
||||
}
|
||||
|
||||
// Process event and write OpenAI SSE
|
||||
openAIData := processAnthropicEvent(dataStr, chunkID, req.Model, created)
|
||||
if openAIData != "" {
|
||||
fmt.Fprintf(w, "data: %s\n\n", openAIData)
|
||||
|
|
@ -166,20 +163,89 @@ func handleChatCompletions(w http.ResponseWriter, r *http.Request) {
|
|||
fmt.Fprintf(w, "data: [DONE]\n\n")
|
||||
w.(http.Flusher).Flush()
|
||||
} else {
|
||||
// Non-streaming response
|
||||
respBody, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
writeError(w, http.StatusBadGateway, "Failed to read upstream response", "upstream_error", "body_read_error")
|
||||
return
|
||||
// Non-streaming to client: accumulate upstream stream into a single response
|
||||
scanner := bufio.NewScanner(resp.Body)
|
||||
buf := make([]byte, 0, 64*1024)
|
||||
scanner.Buffer(buf, 1024*1024)
|
||||
|
||||
var textContent string
|
||||
var stopReason string
|
||||
var usage AnthropicUsage
|
||||
msgID := ""
|
||||
|
||||
for scanner.Scan() {
|
||||
line := scanner.Text()
|
||||
if !strings.HasPrefix(line, "data: ") {
|
||||
continue
|
||||
}
|
||||
dataStr := strings.TrimPrefix(line, "data: ")
|
||||
if dataStr == "[DONE]" {
|
||||
break
|
||||
}
|
||||
|
||||
var anthropicResp AnthropicResponse
|
||||
if err := json.Unmarshal(respBody, &anthropicResp); err != nil {
|
||||
writeError(w, http.StatusBadGateway, "Failed to parse upstream response", "upstream_error", "json_decode_error")
|
||||
return
|
||||
var event struct {
|
||||
Type string `json:"type"`
|
||||
}
|
||||
if err := json.Unmarshal([]byte(dataStr), &event); err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
openAIResp := ConvertAnthropicResponse(&anthropicResp, req.Model)
|
||||
switch event.Type {
|
||||
case "message_start":
|
||||
var msgStart struct {
|
||||
Message struct {
|
||||
Id string `json:"id"`
|
||||
Usage AnthropicUsage `json:"usage"`
|
||||
} `json:"message"`
|
||||
}
|
||||
if json.Unmarshal([]byte(dataStr), &msgStart) == nil {
|
||||
msgID = msgStart.Message.Id
|
||||
usage.InputTokens = msgStart.Message.Usage.InputTokens
|
||||
}
|
||||
|
||||
case "content_block_delta":
|
||||
var delta struct {
|
||||
Delta struct {
|
||||
Text string `json:"text,omitempty"`
|
||||
} `json:"delta"`
|
||||
}
|
||||
if json.Unmarshal([]byte(dataStr), &delta) == nil {
|
||||
textContent += delta.Delta.Text
|
||||
}
|
||||
|
||||
case "message_delta":
|
||||
var msgDelta struct {
|
||||
Delta struct{} `json:"delta"`
|
||||
Usage *AnthropicUsage `json:"usage,omitempty"`
|
||||
StopReason string `json:"stop_reason,omitempty"`
|
||||
}
|
||||
if json.Unmarshal([]byte(dataStr), &msgDelta) == nil {
|
||||
stopReason = msgDelta.StopReason
|
||||
if msgDelta.Usage != nil {
|
||||
usage.OutputTokens = msgDelta.Usage.OutputTokens
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
openAIResp := &ChatCompletionResponse{
|
||||
ID: msgID,
|
||||
Object: "chat.completion",
|
||||
Created: time.Now().Unix(),
|
||||
Model: req.Model,
|
||||
Choices: []Choice{
|
||||
{
|
||||
Index: 0,
|
||||
Message: Message{Role: "assistant", Content: textContent},
|
||||
FinishReason: mapStopReason(stopReason),
|
||||
},
|
||||
},
|
||||
Usage: Usage{
|
||||
PromptTokens: usage.InputTokens,
|
||||
CompletionTokens: usage.OutputTokens,
|
||||
TotalTokens: usage.InputTokens + usage.OutputTokens,
|
||||
},
|
||||
}
|
||||
|
||||
w.Header().Set("content-type", "application/json")
|
||||
json.NewEncoder(w).Encode(openAIResp)
|
||||
|
|
@ -195,12 +261,6 @@ func processAnthropicEvent(dataStr, chunkID, model string, created int64) string
|
|||
}
|
||||
|
||||
switch event.Type {
|
||||
case "message_start":
|
||||
return ""
|
||||
|
||||
case "content_block_start":
|
||||
return ""
|
||||
|
||||
case "content_block_delta":
|
||||
var delta struct {
|
||||
Index int `json:"index"`
|
||||
|
|
@ -301,7 +361,7 @@ func proxyToUpstream(req *AnthropicRequest, apiKey, sessionID string, stream boo
|
|||
httpReq.Header.Set("anthropic-sse-beta", "output-2025-05-14")
|
||||
}
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Second)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 300*time.Second)
|
||||
defer cancel()
|
||||
|
||||
httpReq = httpReq.WithContext(ctx)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue