Skip to content

Commit f038706

Browse files
committed
fix: convert the non-streaming /chat/retrieve response to a stream when the upstream
/messages request is in streaming mode
1 parent 4328a73 commit f038706

File tree

2 files changed

+208
-18
lines changed

2 files changed

+208
-18
lines changed

internal/runtime/executor/openai_compat_executor.go

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ func (e *OpenAICompatExecutor) Execute(ctx context.Context, auth *cliproxyauth.A
6868
url = strings.TrimSuffix(baseURL, "/") + "/chat/completions"
6969
}
7070

71+
log.Debugf("OpenAICompatExecutor ExecuteStream: Request ID: %s, %s endpoint, model: %s, isWebSearch: %t", req.Model, url, req.Model, isWebSearch)
72+
log.Debugf("OpenAICompatExecutor ExecuteStream: payload: %s", string(translated))
73+
7174
httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(translated))
7275
if err != nil {
7376
return resp, err
@@ -112,10 +115,11 @@ func (e *OpenAICompatExecutor) Execute(ctx context.Context, auth *cliproxyauth.A
112115
}
113116
}()
114117
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
118+
log.Debugf("OpenAICompatExecutor Execute: HTTP Response status: %d, headers: %v", httpResp.StatusCode, httpResp.Header)
115119
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
116120
b, _ := io.ReadAll(httpResp.Body)
117121
appendAPIResponseChunk(ctx, e.cfg, b)
118-
log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
122+
log.Debugf("OpenAICompatExecutor Execute: request error, error status: %d, error body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
119123
err = statusErr{code: httpResp.StatusCode, msg: string(b)}
120124
return resp, err
121125
}
@@ -130,9 +134,12 @@ func (e *OpenAICompatExecutor) Execute(ctx context.Context, auth *cliproxyauth.A
130134
var out string
131135
var param any
132136
if isWebSearch {
137+
log.Debugf("OpenAICompatExecutor Execute: Web search response received, request model: %s, raw response: %s", req.Model, string(body))
133138
// For web search responses, we need to format them properly for Claude
134139
// The /chat/retrieve endpoint returns a different format than OpenAI
135-
out = sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, body, &param)
140+
translatedOut := sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, body, &param)
141+
log.Debugf("OpenAICompatExecutor Execute: Web search response translated to: %s", translatedOut)
142+
out = translatedOut
136143
} else {
137144
// Standard OpenAI response handling
138145
reporter.publish(ctx, parseOpenAIUsage(body))
@@ -141,6 +148,7 @@ func (e *OpenAICompatExecutor) Execute(ctx context.Context, auth *cliproxyauth.A
141148
// Translate response back to source format when needed
142149
out = sdktranslator.TranslateNonStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, body, &param)
143150
}
151+
log.Debugf("OpenAICompatExecutor Execute: Response translated to: %s", out)
144152

145153
resp = cliproxyexecutor.Response{Payload: []byte(out)}
146154
return resp, nil
@@ -218,10 +226,11 @@ func (e *OpenAICompatExecutor) ExecuteStream(ctx context.Context, auth *cliproxy
218226
return nil, err
219227
}
220228
recordAPIResponseMetadata(ctx, e.cfg, httpResp.StatusCode, httpResp.Header.Clone())
229+
log.Debugf("OpenAICompatExecutor ExecuteStream: HTTP Response status: %d, headers: %v", httpResp.StatusCode, httpResp.Header)
221230
if httpResp.StatusCode < 200 || httpResp.StatusCode >= 300 {
222231
b, _ := io.ReadAll(httpResp.Body)
223232
appendAPIResponseChunk(ctx, e.cfg, b)
224-
log.Debugf("request error, error status: %d, error body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
233+
log.Debugf("OpenAICompatExecutor ExecuteStream: request error, error status: %d, error body: %s", httpResp.StatusCode, summarizeErrorBody(httpResp.Header.Get("Content-Type"), b))
225234
if errClose := httpResp.Body.Close(); errClose != nil {
226235
log.Errorf("openai compat executor: close response body error: %v", errClose)
227236
}
@@ -250,13 +259,15 @@ func (e *OpenAICompatExecutor) ExecuteStream(ctx context.Context, auth *cliproxy
250259
return
251260
}
252261

262+
log.Debugf("OpenAICompatExecutor ExecuteStream: Web search response received, raw response: %s", string(body))
253263
appendAPIResponseChunk(ctx, e.cfg, body)
254264

255-
// Translate the single web search response
256-
// The response translator should handle web search response format
265+
// Translate the single web search response to SSE events
266+
// The response translator should handle web search response format and generate SSE events
257267
var param any
258268
chunks := sdktranslator.TranslateStream(ctx, to, from, req.Model, bytes.Clone(opts.OriginalRequest), translated, body, &param)
259269
for i := range chunks {
270+
log.Debugf("OpenAICompatExecutor ExecuteStream: Web search SSE event chunk: %s", chunks[i])
260271
out <- cliproxyexecutor.StreamChunk{Payload: []byte(chunks[i])}
261272
}
262273
} else {

internal/translator/openai/claude/openai_claude_response.go

Lines changed: 192 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -74,26 +74,49 @@ func ConvertOpenAIResponseToClaude(_ context.Context, modelName string, original
7474
FinishReason: "",
7575
ContentBlocksStopped: false,
7676
MessageDeltaSent: false,
77+
MessageStarted: false,
7778
}
7879
}
7980

80-
// Check if this is a web search response (non-streaming case)
81-
// When handling streaming, the web search response should come as a single chunk
82-
if isWebSearchResponse(rawJSON) || (bytes.HasPrefix(rawJSON, dataTag) && isWebSearchResponse(bytes.TrimSpace(rawJSON[5:]))) {
83-
// For web search responses, create a single Claude message chunk
84-
webSearchRaw := rawJSON
85-
if bytes.HasPrefix(rawJSON, dataTag) {
86-
webSearchRaw = bytes.TrimSpace(rawJSON[5:])
81+
// Check if this is a web search response
82+
// This can be either raw JSON (non-streaming context) or prefixed with dataTag (streaming context)
83+
root := gjson.ParseBytes(rawJSON)
84+
if root.Get("data").Exists() && root.Get("data").IsArray() {
85+
// This is a web search response - treat it as SSE events for streaming contexts
86+
return convertWebSearchResponseToClaudeSSE(rawJSON, modelName, (*param).(*ConvertOpenAIResponseToAnthropicParams))
87+
}
88+
89+
// Check if it's data-tag prefixed - if so, extract the JSON part
90+
if bytes.HasPrefix(rawJSON, dataTag) {
91+
jsonPart := bytes.TrimSpace(rawJSON[5:])
92+
// Check if the content part is a web search response
93+
if isWebSearchResponse(jsonPart) {
94+
return convertWebSearchResponseToClaudeSSE(jsonPart, modelName, (*param).(*ConvertOpenAIResponseToAnthropicParams))
8795
}
8896

89-
converted := convertWebSearchResponseToClaude(webSearchRaw, modelName)
90-
return []string{converted}
97+
rawJSON = jsonPart
98+
} else if bytes.HasPrefix(rawJSON, []byte("data: ")) {
99+
// Handle different data prefix pattern if needed
100+
jsonPart := bytes.TrimSpace(rawJSON[6:])
101+
if isWebSearchResponse(jsonPart) {
102+
return convertWebSearchResponseToClaudeSSE(jsonPart, modelName, (*param).(*ConvertOpenAIResponseToAnthropicParams))
103+
}
91104
}
92105

93-
if !bytes.HasPrefix(rawJSON, dataTag) {
94-
return []string{}
106+
// Non-web-search response handling continues normally
107+
if !bytes.HasPrefix(rawJSON, dataTag) && !bytes.HasPrefix(rawJSON, []byte("data: ")) {
108+
if bytes.HasPrefix(rawJSON, []byte("[DONE]")) || string(rawJSON) == "[DONE]" {
109+
return convertOpenAIDoneToAnthropic((*param).(*ConvertOpenAIResponseToAnthropicParams))
110+
}
111+
return convertOpenAINonStreamingToAnthropic(rawJSON)
112+
}
113+
114+
// Trim data tag if present
115+
if bytes.HasPrefix(rawJSON, dataTag) {
116+
rawJSON = bytes.TrimSpace(rawJSON[5:])
117+
} else if bytes.HasPrefix(rawJSON, []byte("data: ")) {
118+
rawJSON = bytes.TrimSpace(rawJSON[6:])
95119
}
96-
rawJSON = bytes.TrimSpace(rawJSON[5:])
97120

98121
// Check if this is the [DONE] marker
99122
rawStr := strings.TrimSpace(string(rawJSON))
@@ -671,8 +694,164 @@ func isWebSearchResponse(rawJSON []byte) bool {
671694
return root.Get("data").Exists() && root.Get("data").IsArray()
672695
}
673696

697+
// convertWebSearchResponseToClaudeSSE converts a web search response to Claude SSE events
698+
func convertWebSearchResponseToClaudeSSE(rawJSON []byte, originalModelName string, param *ConvertOpenAIResponseToAnthropicParams) []string {
699+
root := gjson.ParseBytes(rawJSON)
700+
701+
var results []string
702+
703+
// Initialize message ID and other fields if not already done
704+
if param.MessageID == "" {
705+
param.MessageID = generateMessageID()
706+
}
707+
if param.Model == "" {
708+
param.Model = originalModelName
709+
}
710+
711+
// Only send message_start if we haven't sent it yet
712+
if !param.MessageStarted {
713+
// Create message_start event
714+
messageStart := map[string]interface{}{
715+
"type": "message_start",
716+
"message": map[string]interface{}{
717+
"id": param.MessageID,
718+
"type": "message",
719+
"role": "assistant",
720+
"model": param.Model,
721+
"content": []interface{}{},
722+
"stop_reason": nil,
723+
"stop_sequence": nil,
724+
"usage": map[string]interface{}{
725+
"input_tokens": 0,
726+
"output_tokens": 0,
727+
},
728+
},
729+
}
730+
messageStartJSON, _ := json.Marshal(messageStart)
731+
results = append(results, "event: message_start\ndata: "+string(messageStartJSON)+"\n\n")
732+
param.MessageStarted = true
733+
}
734+
735+
// Get search results from "data" array
736+
contentText := ""
737+
dataResults := root.Get("data")
738+
if dataResults.Exists() && dataResults.IsArray() {
739+
resultsArray := dataResults.Array()
740+
if len(resultsArray) > 0 {
741+
// Create a summary content text with search results
742+
var searchSummary strings.Builder
743+
for i, result := range resultsArray {
744+
if i > 0 {
745+
searchSummary.WriteString("\n\n") // Separate results clearly
746+
}
747+
748+
title := result.Get("title").String()
749+
url := result.Get("url").String()
750+
abstract := result.Get("abstractInfo").String()
751+
752+
// Add the result to summary
753+
if title != "" {
754+
searchSummary.WriteString("## ")
755+
searchSummary.WriteString(title)
756+
searchSummary.WriteString("\n")
757+
}
758+
if url != "" {
759+
searchSummary.WriteString("URL: ")
760+
searchSummary.WriteString(url)
761+
searchSummary.WriteString("\n")
762+
}
763+
if abstract != "" {
764+
// Limit the abstract length to avoid very long responses
765+
cleanAbstract := strings.ReplaceAll(abstract, "<[^>]*>", "") // Remove HTML tags
766+
if len(cleanAbstract) > 500 {
767+
cleanAbstract = cleanAbstract[:500] + "..."
768+
}
769+
searchSummary.WriteString(cleanAbstract)
770+
}
771+
}
772+
contentText = searchSummary.String()
773+
}
774+
}
775+
776+
// Send content_block_start
777+
if contentText != "" {
778+
contentBlockStart := map[string]interface{}{
779+
"type": "content_block_start",
780+
"index": 0,
781+
"content_block": map[string]interface{}{
782+
"type": "text",
783+
"text": "",
784+
},
785+
}
786+
contentBlockStartJSON, _ := json.Marshal(contentBlockStart)
787+
results = append(results, "event: content_block_start\ndata: "+string(contentBlockStartJSON)+"\n\n")
788+
789+
// Send content_block_delta with the content in chunks (to simulate streaming)
790+
if len(contentText) > 0 {
791+
// Break content into smaller chunks to better simulate streaming
792+
// Break content into reasonable-sized chunks to simulate streaming (avoiding tiny chunks)
793+
charsPerChunk := 100
794+
for i := 0; i < len(contentText); i += charsPerChunk {
795+
end := i + charsPerChunk
796+
if end > len(contentText) {
797+
end = len(contentText)
798+
}
799+
chunk := contentText[i:end]
800+
801+
contentDelta := map[string]interface{}{
802+
"type": "content_block_delta",
803+
"index": 0,
804+
"delta": map[string]interface{}{
805+
"type": "text_delta",
806+
"text": chunk,
807+
},
808+
}
809+
contentDeltaJSON, _ := json.Marshal(contentDelta)
810+
results = append(results, "event: content_block_delta\ndata: "+string(contentDeltaJSON)+"\n\n")
811+
812+
// Accumulate content
813+
param.ContentAccumulator.WriteString(chunk)
814+
}
815+
}
816+
817+
// Send content_block_stop
818+
contentBlockStop := map[string]interface{}{
819+
"type": "content_block_stop",
820+
"index": 0,
821+
}
822+
contentBlockStopJSON, _ := json.Marshal(contentBlockStop)
823+
results = append(results, "event: content_block_stop\ndata: "+string(contentBlockStopJSON)+"\n\n")
824+
825+
param.ContentBlocksStopped = true
826+
}
827+
828+
// Send message_delta with stop reason and usage
829+
messageDelta := map[string]interface{}{
830+
"type": "message_delta",
831+
"delta": map[string]interface{}{
832+
"stop_reason": "end_turn",
833+
"stop_sequence": nil,
834+
},
835+
"usage": map[string]interface{}{
836+
"output_tokens": len(contentText) / 4, // Rough approximation of tokens (4 chars per token)
837+
},
838+
}
839+
messageDeltaJSON, _ := json.Marshal(messageDelta)
840+
results = append(results, "event: message_delta\ndata: "+string(messageDeltaJSON)+"\n\n")
841+
param.MessageDeltaSent = true
842+
843+
// Send message_stop event
844+
messageStop := map[string]interface{}{
845+
"type": "message_stop",
846+
}
847+
messageStopJSON, _ := json.Marshal(messageStop)
848+
results = append(results, "event: message_stop\ndata: "+string(messageStopJSON)+"\n\n")
849+
850+
return results
851+
}
852+
674853
// convertWebSearchResponseToClaude converts a web search response (like from /chat/retrieve)
675-
// to Claude-compatible message format
854+
// to Claude-compatible message format (the complete message, used in non-streaming contexts)
676855
func convertWebSearchResponseToClaude(rawJSON []byte, originalModelName string) string {
677856
root := gjson.ParseBytes(rawJSON)
678857

0 commit comments

Comments
 (0)