Implement OpenAI-to-Anthropic proxy with streaming support
- Add request/response converters (OpenAI <-> Anthropic formats) - Implement SSE streaming conversion (Anthropic events -> OpenAI SSE) - Add /v1/models endpoint with Claude model list - Add /v1/chat/completions endpoint with streaming and non-streaming support - Fix context key type matching bug (sessionIDKey) - Configurable upstream URL via config.yaml - Mimic claude-code CLI headers for upstream requests
This commit is contained in:
parent
87a74edbf5
commit
8450d96e2e
10 changed files with 1270 additions and 0 deletions
339
streaming.go
Normal file
339
streaming.go
Normal file
|
|
@ -0,0 +1,339 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// StreamConverter holds state during streaming
|
||||
type StreamConverter struct {
|
||||
ID string
|
||||
Model string
|
||||
Role string
|
||||
Finish string
|
||||
AccBlocks map[int]*AccBlock
|
||||
ToolIndex int
|
||||
}
|
||||
|
||||
type AccBlock struct {
|
||||
Type string
|
||||
Name string
|
||||
InputAcc string
|
||||
TextAcc string
|
||||
}
|
||||
|
||||
func NewStreamConverter(id, model string) *StreamConverter {
|
||||
return &StreamConverter{
|
||||
ID: id,
|
||||
Model: model,
|
||||
AccBlocks: make(map[int]*AccBlock),
|
||||
}
|
||||
}
|
||||
|
||||
// HandleAnthropicStream reads Anthropic SSE events, writes OpenAI SSE
|
||||
func (sc *StreamConverter) HandleAnthropicStream(reader *bufio.Reader, writer io.Writer) error {
|
||||
created := int64(1234567890)
|
||||
|
||||
for {
|
||||
line, err := reader.ReadString('\n')
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
return fmt.Errorf("read error: %w", err)
|
||||
}
|
||||
|
||||
line = strings.TrimSpace(line)
|
||||
if line == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
// Skip comment lines (starting with colon)
|
||||
if strings.HasPrefix(line, ":") {
|
||||
continue
|
||||
}
|
||||
|
||||
// Parse SSE data line: "data: {...}"
|
||||
if !strings.HasPrefix(line, "data: ") {
|
||||
continue
|
||||
}
|
||||
|
||||
dataStr := strings.TrimPrefix(line, "data: ")
|
||||
|
||||
// Try to parse as SSE event with type field first
|
||||
var event struct {
|
||||
Type string `json:"type"`
|
||||
}
|
||||
if err := json.Unmarshal([]byte(dataStr), &event); err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
switch event.Type {
|
||||
case "message_start":
|
||||
var msgStart struct {
|
||||
Type string `json:"type"`
|
||||
Message struct {
|
||||
Id string `json:"id"`
|
||||
Role string `json:"role"`
|
||||
Type string `json:"type"`
|
||||
} `json:"message"`
|
||||
}
|
||||
if err := json.Unmarshal([]byte(dataStr), &msgStart); err == nil {
|
||||
sc.Role = msgStart.Message.Role
|
||||
}
|
||||
|
||||
case "content_block_start":
|
||||
var blockStart struct {
|
||||
Type string `json:"type"`
|
||||
Index int `json:"index"`
|
||||
ContentBlock struct {
|
||||
Type string `json:"type"`
|
||||
Id string `json:"id"`
|
||||
Name string `json:"name,omitempty"`
|
||||
} `json:"content_block"`
|
||||
}
|
||||
if err := json.Unmarshal([]byte(dataStr), &blockStart); err == nil {
|
||||
sc.AccBlocks[blockStart.Index] = &AccBlock{
|
||||
Type: blockStart.ContentBlock.Type,
|
||||
Name: blockStart.ContentBlock.Name,
|
||||
}
|
||||
if blockStart.ContentBlock.Type == "tool_use" {
|
||||
sc.ToolIndex = blockStart.Index
|
||||
}
|
||||
}
|
||||
|
||||
case "content_block_delta":
|
||||
var delta struct {
|
||||
Type string `json:"type"`
|
||||
Index int `json:"index"`
|
||||
Delta struct {
|
||||
Text string `json:"text,omitempty"`
|
||||
InputJSONDelta string `json:"input_json_delta,omitempty"`
|
||||
} `json:"delta"`
|
||||
}
|
||||
if err := json.Unmarshal([]byte(dataStr), &delta); err == nil {
|
||||
block := sc.AccBlocks[delta.Index]
|
||||
if block != nil {
|
||||
block.TextAcc += delta.Delta.Text
|
||||
block.InputAcc += delta.Delta.InputJSONDelta
|
||||
}
|
||||
|
||||
// Write OpenAI format chunk
|
||||
if delta.Delta.Text != "" {
|
||||
chunk := StreamChunk{
|
||||
ID: sc.ID,
|
||||
Object: "chat.completion.chunk",
|
||||
Created: created,
|
||||
Model: sc.Model,
|
||||
Choices: []StreamChoice{
|
||||
{
|
||||
Index: delta.Index,
|
||||
Delta: Delta{
|
||||
Content: delta.Delta.Text,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
data, _ := json.Marshal(chunk)
|
||||
fmt.Fprintf(writer, "data: %s\n\n", data)
|
||||
}
|
||||
|
||||
if delta.Delta.InputJSONDelta != "" {
|
||||
block := sc.AccBlocks[delta.Index]
|
||||
if block != nil {
|
||||
chunk := StreamChunk{
|
||||
ID: sc.ID,
|
||||
Object: "chat.completion.chunk",
|
||||
Created: created,
|
||||
Model: sc.Model,
|
||||
Choices: []StreamChoice{
|
||||
{
|
||||
Index: delta.Index,
|
||||
Delta: Delta{
|
||||
ToolCalls: []StreamToolCall{
|
||||
{
|
||||
Index: delta.Index,
|
||||
ID: block.Name + "_pending",
|
||||
Function: StreamFunction{
|
||||
Arguments: delta.Delta.InputJSONDelta,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
data, _ := json.Marshal(chunk)
|
||||
fmt.Fprintf(writer, "data: %s\n\n", data)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
case "content_block_stop":
|
||||
var blockStop struct {
|
||||
Type string `json:"type"`
|
||||
Index int `json:"index"`
|
||||
}
|
||||
if err := json.Unmarshal([]byte(dataStr), &blockStop); err == nil {
|
||||
// Nothing to output here, accumulated data was already streamed
|
||||
}
|
||||
|
||||
case "message_delta":
|
||||
var msgDelta struct {
|
||||
Type string `json:"type"`
|
||||
Delta struct {
|
||||
StopSequence string `json:"stop_sequence,omitempty"`
|
||||
} `json:"delta"`
|
||||
Usage *AnthropicUsage `json:"usage,omitempty"`
|
||||
StopReason string `json:"stop_reason,omitempty"`
|
||||
}
|
||||
if err := json.Unmarshal([]byte(dataStr), &msgDelta); err == nil {
|
||||
if msgDelta.StopReason != "" {
|
||||
sc.Finish = mapStopReason(msgDelta.StopReason)
|
||||
}
|
||||
}
|
||||
|
||||
case "message_stop":
|
||||
// Send final chunk with finish_reason
|
||||
chunk := StreamChunk{
|
||||
ID: sc.ID,
|
||||
Object: "chat.completion.chunk",
|
||||
Created: created,
|
||||
Model: sc.Model,
|
||||
Choices: []StreamChoice{
|
||||
{
|
||||
Index: 0,
|
||||
Delta: Delta{},
|
||||
FinishReason: sc.Finish,
|
||||
},
|
||||
},
|
||||
}
|
||||
data, _ := json.Marshal(chunk)
|
||||
fmt.Fprintf(writer, "data: %s\n\n", data)
|
||||
fmt.Fprintf(writer, "data: [DONE]\n\n")
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ParseAnthropicSSEEvent parses a raw SSE data line into a map
|
||||
func ParseAnthropicSSEEvent(data string) (map[string]interface{}, error) {
|
||||
var result map[string]interface{}
|
||||
err := json.Unmarshal([]byte(data), &result)
|
||||
return result, err
|
||||
}
|
||||
|
||||
// ConvertStreamEvents converts Anthropic SSE bytes to OpenAI SSE format
|
||||
func ConvertStreamEvents(sseData []byte, id, model string) ([]byte, error) {
|
||||
var buf bytes.Buffer
|
||||
scanner := bufio.NewScanner(bytes.NewReader(sseData))
|
||||
sc := NewStreamConverter(id, model)
|
||||
|
||||
for scanner.Scan() {
|
||||
line := scanner.Text()
|
||||
if !strings.HasPrefix(line, "data: ") {
|
||||
continue
|
||||
}
|
||||
dataStr := strings.TrimPrefix(line, "data: ")
|
||||
|
||||
var event struct {
|
||||
Type string `json:"type"`
|
||||
}
|
||||
if err := json.Unmarshal([]byte(dataStr), &event); err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
switch event.Type {
|
||||
case "message_start":
|
||||
var msgStart struct {
|
||||
Message struct {
|
||||
Id string `json:"id"`
|
||||
Role string `json:"role"`
|
||||
} `json:"message"`
|
||||
}
|
||||
json.Unmarshal([]byte(dataStr), &msgStart)
|
||||
sc.Role = msgStart.Message.Role
|
||||
|
||||
case "content_block_start":
|
||||
var blockStart struct {
|
||||
Index int `json:"index"`
|
||||
ContentBlock struct {
|
||||
Type string `json:"type"`
|
||||
Id string `json:"id"`
|
||||
Name string `json:"name,omitempty"`
|
||||
} `json:"content_block"`
|
||||
}
|
||||
if err := json.Unmarshal([]byte(dataStr), &blockStart); err == nil {
|
||||
sc.AccBlocks[blockStart.Index] = &AccBlock{
|
||||
Type: blockStart.ContentBlock.Type,
|
||||
Name: blockStart.ContentBlock.Name,
|
||||
}
|
||||
}
|
||||
|
||||
case "content_block_delta":
|
||||
var delta struct {
|
||||
Index int `json:"index"`
|
||||
Delta struct {
|
||||
Text string `json:"text,omitempty"`
|
||||
InputJSONDelta string `json:"input_json_delta,omitempty"`
|
||||
} `json:"delta"`
|
||||
}
|
||||
if err := json.Unmarshal([]byte(dataStr), &delta); err == nil {
|
||||
block := sc.AccBlocks[delta.Index]
|
||||
if block != nil {
|
||||
block.TextAcc += delta.Delta.Text
|
||||
block.InputAcc += delta.Delta.InputJSONDelta
|
||||
}
|
||||
|
||||
chunk := StreamChunk{
|
||||
ID: sc.ID,
|
||||
Object: "chat.completion.chunk",
|
||||
Created: int64(1234567890),
|
||||
Model: sc.Model,
|
||||
Choices: []StreamChoice{
|
||||
{
|
||||
Index: delta.Index,
|
||||
Delta: Delta{
|
||||
Content: delta.Delta.Text,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
data, _ := json.Marshal(chunk)
|
||||
buf.WriteString(fmt.Sprintf("data: %s\n\n", data))
|
||||
}
|
||||
|
||||
case "message_delta":
|
||||
var msgDelta struct {
|
||||
StopReason string `json:"stop_reason,omitempty"`
|
||||
}
|
||||
json.Unmarshal([]byte(dataStr), &msgDelta)
|
||||
sc.Finish = mapStopReason(msgDelta.StopReason)
|
||||
|
||||
case "message_stop":
|
||||
chunk := StreamChunk{
|
||||
ID: sc.ID,
|
||||
Object: "chat.completion.chunk",
|
||||
Created: int64(1234567890),
|
||||
Model: sc.Model,
|
||||
Choices: []StreamChoice{
|
||||
{
|
||||
Index: 0,
|
||||
Delta: Delta{},
|
||||
FinishReason: sc.Finish,
|
||||
},
|
||||
},
|
||||
}
|
||||
data, _ := json.Marshal(chunk)
|
||||
buf.WriteString(fmt.Sprintf("data: %s\n\n", data))
|
||||
buf.WriteString("data: [DONE]\n\n")
|
||||
}
|
||||
}
|
||||
|
||||
return buf.Bytes(), nil
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue