package main import ( "bufio" "bytes" "encoding/json" "fmt" "io" "strings" ) // StreamConverter holds state during streaming type StreamConverter struct { ID string Model string Role string Finish string AccBlocks map[int]*AccBlock ToolIndex int } type AccBlock struct { Type string Name string InputAcc string TextAcc string } func NewStreamConverter(id, model string) *StreamConverter { return &StreamConverter{ ID: id, Model: model, AccBlocks: make(map[int]*AccBlock), } } // HandleAnthropicStream reads Anthropic SSE events, writes OpenAI SSE func (sc *StreamConverter) HandleAnthropicStream(reader *bufio.Reader, writer io.Writer) error { created := int64(1234567890) for { line, err := reader.ReadString('\n') if err != nil { if err == io.EOF { break } return fmt.Errorf("read error: %w", err) } line = strings.TrimSpace(line) if line == "" { continue } // Skip comment lines (starting with colon) if strings.HasPrefix(line, ":") { continue } // Parse SSE data line: "data: {...}" if !strings.HasPrefix(line, "data: ") { continue } dataStr := strings.TrimPrefix(line, "data: ") // Try to parse as SSE event with type field first var event struct { Type string `json:"type"` } if err := json.Unmarshal([]byte(dataStr), &event); err != nil { continue } switch event.Type { case "message_start": var msgStart struct { Type string `json:"type"` Message struct { Id string `json:"id"` Role string `json:"role"` Type string `json:"type"` } `json:"message"` } if err := json.Unmarshal([]byte(dataStr), &msgStart); err == nil { sc.Role = msgStart.Message.Role } case "content_block_start": var blockStart struct { Type string `json:"type"` Index int `json:"index"` ContentBlock struct { Type string `json:"type"` Id string `json:"id"` Name string `json:"name,omitempty"` } `json:"content_block"` } if err := json.Unmarshal([]byte(dataStr), &blockStart); err == nil { sc.AccBlocks[blockStart.Index] = &AccBlock{ Type: blockStart.ContentBlock.Type, Name: blockStart.ContentBlock.Name, } if blockStart.ContentBlock.Type == "tool_use" { sc.ToolIndex = blockStart.Index } } case "content_block_delta": var delta struct { Type string `json:"type"` Index int `json:"index"` Delta struct { Text string `json:"text,omitempty"` InputJSONDelta string `json:"input_json_delta,omitempty"` } `json:"delta"` } if err := json.Unmarshal([]byte(dataStr), &delta); err == nil { block := sc.AccBlocks[delta.Index] if block != nil { block.TextAcc += delta.Delta.Text block.InputAcc += delta.Delta.InputJSONDelta } // Write OpenAI format chunk if delta.Delta.Text != "" { chunk := StreamChunk{ ID: sc.ID, Object: "chat.completion.chunk", Created: created, Model: sc.Model, Choices: []StreamChoice{ { Index: delta.Index, Delta: Delta{ Content: delta.Delta.Text, }, }, }, } data, _ := json.Marshal(chunk) fmt.Fprintf(writer, "data: %s\n\n", data) } if delta.Delta.InputJSONDelta != "" { block := sc.AccBlocks[delta.Index] if block != nil { chunk := StreamChunk{ ID: sc.ID, Object: "chat.completion.chunk", Created: created, Model: sc.Model, Choices: []StreamChoice{ { Index: delta.Index, Delta: Delta{ ToolCalls: []StreamToolCall{ { Index: delta.Index, ID: block.Name + "_pending", Function: StreamFunction{ Arguments: delta.Delta.InputJSONDelta, }, }, }, }, }, }, } data, _ := json.Marshal(chunk) fmt.Fprintf(writer, "data: %s\n\n", data) } } } case "content_block_stop": var blockStop struct { Type string `json:"type"` Index int `json:"index"` } if err := json.Unmarshal([]byte(dataStr), &blockStop); err == nil { // Nothing to output here, accumulated data was already streamed } case "message_delta": var msgDelta struct { Type string `json:"type"` Delta struct { StopSequence string `json:"stop_sequence,omitempty"` } `json:"delta"` Usage *AnthropicUsage `json:"usage,omitempty"` StopReason string `json:"stop_reason,omitempty"` } if err := json.Unmarshal([]byte(dataStr), &msgDelta); err == nil { if msgDelta.StopReason != "" { sc.Finish = mapStopReason(msgDelta.StopReason) } } case "message_stop": // Send final chunk with finish_reason chunk := StreamChunk{ ID: sc.ID, Object: "chat.completion.chunk", Created: created, Model: sc.Model, Choices: []StreamChoice{ { Index: 0, Delta: Delta{}, FinishReason: sc.Finish, }, }, } data, _ := json.Marshal(chunk) fmt.Fprintf(writer, "data: %s\n\n", data) fmt.Fprintf(writer, "data: [DONE]\n\n") } } return nil } // ParseAnthropicSSEEvent parses a raw SSE data line into a map func ParseAnthropicSSEEvent(data string) (map[string]interface{}, error) { var result map[string]interface{} err := json.Unmarshal([]byte(data), &result) return result, err } // ConvertStreamEvents converts Anthropic SSE bytes to OpenAI SSE format func ConvertStreamEvents(sseData []byte, id, model string) ([]byte, error) { var buf bytes.Buffer scanner := bufio.NewScanner(bytes.NewReader(sseData)) sc := NewStreamConverter(id, model) for scanner.Scan() { line := scanner.Text() if !strings.HasPrefix(line, "data: ") { continue } dataStr := strings.TrimPrefix(line, "data: ") var event struct { Type string `json:"type"` } if err := json.Unmarshal([]byte(dataStr), &event); err != nil { continue } switch event.Type { case "message_start": var msgStart struct { Message struct { Id string `json:"id"` Role string `json:"role"` } `json:"message"` } json.Unmarshal([]byte(dataStr), &msgStart) sc.Role = msgStart.Message.Role case "content_block_start": var blockStart struct { Index int `json:"index"` ContentBlock struct { Type string `json:"type"` Id string `json:"id"` Name string `json:"name,omitempty"` } `json:"content_block"` } if err := json.Unmarshal([]byte(dataStr), &blockStart); err == nil { sc.AccBlocks[blockStart.Index] = &AccBlock{ Type: blockStart.ContentBlock.Type, Name: blockStart.ContentBlock.Name, } } case "content_block_delta": var delta struct { Index int `json:"index"` Delta struct { Text string `json:"text,omitempty"` InputJSONDelta string `json:"input_json_delta,omitempty"` } `json:"delta"` } if err := json.Unmarshal([]byte(dataStr), &delta); err == nil { block := sc.AccBlocks[delta.Index] if block != nil { block.TextAcc += delta.Delta.Text block.InputAcc += delta.Delta.InputJSONDelta } chunk := StreamChunk{ ID: sc.ID, Object: "chat.completion.chunk", Created: int64(1234567890), Model: sc.Model, Choices: []StreamChoice{ { Index: delta.Index, Delta: Delta{ Content: delta.Delta.Text, }, }, }, } data, _ := json.Marshal(chunk) buf.WriteString(fmt.Sprintf("data: %s\n\n", data)) } case "message_delta": var msgDelta struct { StopReason string `json:"stop_reason,omitempty"` } json.Unmarshal([]byte(dataStr), &msgDelta) sc.Finish = mapStopReason(msgDelta.StopReason) case "message_stop": chunk := StreamChunk{ ID: sc.ID, Object: "chat.completion.chunk", Created: int64(1234567890), Model: sc.Model, Choices: []StreamChoice{ { Index: 0, Delta: Delta{}, FinishReason: sc.Finish, }, }, } data, _ := json.Marshal(chunk) buf.WriteString(fmt.Sprintf("data: %s\n\n", data)) buf.WriteString("data: [DONE]\n\n") } } return buf.Bytes(), nil }