- Add request/response converters (OpenAI <-> Anthropic formats) - Implement SSE streaming conversion (Anthropic events -> OpenAI SSE) - Add /v1/models endpoint with Claude model list - Add /v1/chat/completions endpoint with streaming and non-streaming support - Fix context key type matching bug (sessionIDKey) - Configurable upstream URL via config.yaml - Mimic claude-code CLI headers for upstream requests
339 lines
8.2 KiB
Go
339 lines
8.2 KiB
Go
package main
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"strings"
|
|
)
|
|
|
|
// StreamConverter holds state during streaming
|
|
type StreamConverter struct {
|
|
ID string
|
|
Model string
|
|
Role string
|
|
Finish string
|
|
AccBlocks map[int]*AccBlock
|
|
ToolIndex int
|
|
}
|
|
|
|
type AccBlock struct {
|
|
Type string
|
|
Name string
|
|
InputAcc string
|
|
TextAcc string
|
|
}
|
|
|
|
func NewStreamConverter(id, model string) *StreamConverter {
|
|
return &StreamConverter{
|
|
ID: id,
|
|
Model: model,
|
|
AccBlocks: make(map[int]*AccBlock),
|
|
}
|
|
}
|
|
|
|
// HandleAnthropicStream reads Anthropic SSE events, writes OpenAI SSE
|
|
func (sc *StreamConverter) HandleAnthropicStream(reader *bufio.Reader, writer io.Writer) error {
|
|
created := int64(1234567890)
|
|
|
|
for {
|
|
line, err := reader.ReadString('\n')
|
|
if err != nil {
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
return fmt.Errorf("read error: %w", err)
|
|
}
|
|
|
|
line = strings.TrimSpace(line)
|
|
if line == "" {
|
|
continue
|
|
}
|
|
|
|
// Skip comment lines (starting with colon)
|
|
if strings.HasPrefix(line, ":") {
|
|
continue
|
|
}
|
|
|
|
// Parse SSE data line: "data: {...}"
|
|
if !strings.HasPrefix(line, "data: ") {
|
|
continue
|
|
}
|
|
|
|
dataStr := strings.TrimPrefix(line, "data: ")
|
|
|
|
// Try to parse as SSE event with type field first
|
|
var event struct {
|
|
Type string `json:"type"`
|
|
}
|
|
if err := json.Unmarshal([]byte(dataStr), &event); err != nil {
|
|
continue
|
|
}
|
|
|
|
switch event.Type {
|
|
case "message_start":
|
|
var msgStart struct {
|
|
Type string `json:"type"`
|
|
Message struct {
|
|
Id string `json:"id"`
|
|
Role string `json:"role"`
|
|
Type string `json:"type"`
|
|
} `json:"message"`
|
|
}
|
|
if err := json.Unmarshal([]byte(dataStr), &msgStart); err == nil {
|
|
sc.Role = msgStart.Message.Role
|
|
}
|
|
|
|
case "content_block_start":
|
|
var blockStart struct {
|
|
Type string `json:"type"`
|
|
Index int `json:"index"`
|
|
ContentBlock struct {
|
|
Type string `json:"type"`
|
|
Id string `json:"id"`
|
|
Name string `json:"name,omitempty"`
|
|
} `json:"content_block"`
|
|
}
|
|
if err := json.Unmarshal([]byte(dataStr), &blockStart); err == nil {
|
|
sc.AccBlocks[blockStart.Index] = &AccBlock{
|
|
Type: blockStart.ContentBlock.Type,
|
|
Name: blockStart.ContentBlock.Name,
|
|
}
|
|
if blockStart.ContentBlock.Type == "tool_use" {
|
|
sc.ToolIndex = blockStart.Index
|
|
}
|
|
}
|
|
|
|
case "content_block_delta":
|
|
var delta struct {
|
|
Type string `json:"type"`
|
|
Index int `json:"index"`
|
|
Delta struct {
|
|
Text string `json:"text,omitempty"`
|
|
InputJSONDelta string `json:"input_json_delta,omitempty"`
|
|
} `json:"delta"`
|
|
}
|
|
if err := json.Unmarshal([]byte(dataStr), &delta); err == nil {
|
|
block := sc.AccBlocks[delta.Index]
|
|
if block != nil {
|
|
block.TextAcc += delta.Delta.Text
|
|
block.InputAcc += delta.Delta.InputJSONDelta
|
|
}
|
|
|
|
// Write OpenAI format chunk
|
|
if delta.Delta.Text != "" {
|
|
chunk := StreamChunk{
|
|
ID: sc.ID,
|
|
Object: "chat.completion.chunk",
|
|
Created: created,
|
|
Model: sc.Model,
|
|
Choices: []StreamChoice{
|
|
{
|
|
Index: delta.Index,
|
|
Delta: Delta{
|
|
Content: delta.Delta.Text,
|
|
},
|
|
},
|
|
},
|
|
}
|
|
data, _ := json.Marshal(chunk)
|
|
fmt.Fprintf(writer, "data: %s\n\n", data)
|
|
}
|
|
|
|
if delta.Delta.InputJSONDelta != "" {
|
|
block := sc.AccBlocks[delta.Index]
|
|
if block != nil {
|
|
chunk := StreamChunk{
|
|
ID: sc.ID,
|
|
Object: "chat.completion.chunk",
|
|
Created: created,
|
|
Model: sc.Model,
|
|
Choices: []StreamChoice{
|
|
{
|
|
Index: delta.Index,
|
|
Delta: Delta{
|
|
ToolCalls: []StreamToolCall{
|
|
{
|
|
Index: delta.Index,
|
|
ID: block.Name + "_pending",
|
|
Function: StreamFunction{
|
|
Arguments: delta.Delta.InputJSONDelta,
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
},
|
|
}
|
|
data, _ := json.Marshal(chunk)
|
|
fmt.Fprintf(writer, "data: %s\n\n", data)
|
|
}
|
|
}
|
|
}
|
|
|
|
case "content_block_stop":
|
|
var blockStop struct {
|
|
Type string `json:"type"`
|
|
Index int `json:"index"`
|
|
}
|
|
if err := json.Unmarshal([]byte(dataStr), &blockStop); err == nil {
|
|
// Nothing to output here, accumulated data was already streamed
|
|
}
|
|
|
|
case "message_delta":
|
|
var msgDelta struct {
|
|
Type string `json:"type"`
|
|
Delta struct {
|
|
StopSequence string `json:"stop_sequence,omitempty"`
|
|
} `json:"delta"`
|
|
Usage *AnthropicUsage `json:"usage,omitempty"`
|
|
StopReason string `json:"stop_reason,omitempty"`
|
|
}
|
|
if err := json.Unmarshal([]byte(dataStr), &msgDelta); err == nil {
|
|
if msgDelta.StopReason != "" {
|
|
sc.Finish = mapStopReason(msgDelta.StopReason)
|
|
}
|
|
}
|
|
|
|
case "message_stop":
|
|
// Send final chunk with finish_reason
|
|
chunk := StreamChunk{
|
|
ID: sc.ID,
|
|
Object: "chat.completion.chunk",
|
|
Created: created,
|
|
Model: sc.Model,
|
|
Choices: []StreamChoice{
|
|
{
|
|
Index: 0,
|
|
Delta: Delta{},
|
|
FinishReason: sc.Finish,
|
|
},
|
|
},
|
|
}
|
|
data, _ := json.Marshal(chunk)
|
|
fmt.Fprintf(writer, "data: %s\n\n", data)
|
|
fmt.Fprintf(writer, "data: [DONE]\n\n")
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// ParseAnthropicSSEEvent parses a raw SSE data line into a map
|
|
func ParseAnthropicSSEEvent(data string) (map[string]interface{}, error) {
|
|
var result map[string]interface{}
|
|
err := json.Unmarshal([]byte(data), &result)
|
|
return result, err
|
|
}
|
|
|
|
// ConvertStreamEvents converts Anthropic SSE bytes to OpenAI SSE format
|
|
func ConvertStreamEvents(sseData []byte, id, model string) ([]byte, error) {
|
|
var buf bytes.Buffer
|
|
scanner := bufio.NewScanner(bytes.NewReader(sseData))
|
|
sc := NewStreamConverter(id, model)
|
|
|
|
for scanner.Scan() {
|
|
line := scanner.Text()
|
|
if !strings.HasPrefix(line, "data: ") {
|
|
continue
|
|
}
|
|
dataStr := strings.TrimPrefix(line, "data: ")
|
|
|
|
var event struct {
|
|
Type string `json:"type"`
|
|
}
|
|
if err := json.Unmarshal([]byte(dataStr), &event); err != nil {
|
|
continue
|
|
}
|
|
|
|
switch event.Type {
|
|
case "message_start":
|
|
var msgStart struct {
|
|
Message struct {
|
|
Id string `json:"id"`
|
|
Role string `json:"role"`
|
|
} `json:"message"`
|
|
}
|
|
json.Unmarshal([]byte(dataStr), &msgStart)
|
|
sc.Role = msgStart.Message.Role
|
|
|
|
case "content_block_start":
|
|
var blockStart struct {
|
|
Index int `json:"index"`
|
|
ContentBlock struct {
|
|
Type string `json:"type"`
|
|
Id string `json:"id"`
|
|
Name string `json:"name,omitempty"`
|
|
} `json:"content_block"`
|
|
}
|
|
if err := json.Unmarshal([]byte(dataStr), &blockStart); err == nil {
|
|
sc.AccBlocks[blockStart.Index] = &AccBlock{
|
|
Type: blockStart.ContentBlock.Type,
|
|
Name: blockStart.ContentBlock.Name,
|
|
}
|
|
}
|
|
|
|
case "content_block_delta":
|
|
var delta struct {
|
|
Index int `json:"index"`
|
|
Delta struct {
|
|
Text string `json:"text,omitempty"`
|
|
InputJSONDelta string `json:"input_json_delta,omitempty"`
|
|
} `json:"delta"`
|
|
}
|
|
if err := json.Unmarshal([]byte(dataStr), &delta); err == nil {
|
|
block := sc.AccBlocks[delta.Index]
|
|
if block != nil {
|
|
block.TextAcc += delta.Delta.Text
|
|
block.InputAcc += delta.Delta.InputJSONDelta
|
|
}
|
|
|
|
chunk := StreamChunk{
|
|
ID: sc.ID,
|
|
Object: "chat.completion.chunk",
|
|
Created: int64(1234567890),
|
|
Model: sc.Model,
|
|
Choices: []StreamChoice{
|
|
{
|
|
Index: delta.Index,
|
|
Delta: Delta{
|
|
Content: delta.Delta.Text,
|
|
},
|
|
},
|
|
},
|
|
}
|
|
data, _ := json.Marshal(chunk)
|
|
buf.WriteString(fmt.Sprintf("data: %s\n\n", data))
|
|
}
|
|
|
|
case "message_delta":
|
|
var msgDelta struct {
|
|
StopReason string `json:"stop_reason,omitempty"`
|
|
}
|
|
json.Unmarshal([]byte(dataStr), &msgDelta)
|
|
sc.Finish = mapStopReason(msgDelta.StopReason)
|
|
|
|
case "message_stop":
|
|
chunk := StreamChunk{
|
|
ID: sc.ID,
|
|
Object: "chat.completion.chunk",
|
|
Created: int64(1234567890),
|
|
Model: sc.Model,
|
|
Choices: []StreamChoice{
|
|
{
|
|
Index: 0,
|
|
Delta: Delta{},
|
|
FinishReason: sc.Finish,
|
|
},
|
|
},
|
|
}
|
|
data, _ := json.Marshal(chunk)
|
|
buf.WriteString(fmt.Sprintf("data: %s\n\n", data))
|
|
buf.WriteString("data: [DONE]\n\n")
|
|
}
|
|
}
|
|
|
|
return buf.Bytes(), nil
|
|
}
|