Skip to content

Commit 4635a96

Browse files
committed
streaming support
Signed-off-by: JaredforReal <w13431838023@gmail.com>
1 parent 877729f commit 4635a96

File tree

4 files changed

+118
-1
lines changed

4 files changed

+118
-1
lines changed

src/semantic-router/pkg/extproc/mapping_responses.go

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,3 +144,62 @@ func mapChatCompletionToResponses(chatCompletionJSON []byte) ([]byte, error) {
144144

145145
return json.Marshal(out)
146146
}
147+
148+
// translateSSEChunkToResponses converts a single OpenAI chat.completion.chunk SSE payload
// (the JSON after "data: ") into Responses-style SSE event payloads.
//
// Only the first entry of "choices" is inspected:
//   - a non-empty delta.content produces a "response.output_text.delta" event, which also
//     carries "created" when the chunk reports a positive value;
//   - a non-empty finish_reason produces a "response.completed" event whose "stop_reason"
//     is that value.
//
// When both are present the delta event is returned first. Each returned element is the
// JSON body of one event, without any "data: " framing.
//
// It returns (nil, false) when the input is not valid JSON, is not a chat.completion.chunk,
// yields no events, or an event cannot be encoded.
func translateSSEChunkToResponses(chunk []byte) ([][]byte, bool) {
	// Expected shape:
	// {"id":"...","object":"chat.completion.chunk","created":...,"model":"...",
	//  "choices":[{"index":0,"delta":{"role":"assistant","content":"..."},"finish_reason":null}]}
	var parsed map[string]interface{}
	if err := json.Unmarshal(chunk, &parsed); err != nil {
		return nil, false
	}
	if parsed["object"] != "chat.completion.chunk" {
		return nil, false
	}

	// Extract the content delta and finish_reason from the first choice. Failed type
	// assertions (missing keys, JSON null, unexpected types) simply leave the values empty.
	var deltaText, finish string
	if choices, ok := parsed["choices"].([]interface{}); ok && len(choices) > 0 {
		if first, ok := choices[0].(map[string]interface{}); ok {
			if fr, ok := first["finish_reason"].(string); ok {
				finish = fr
			}
			if delta, ok := first["delta"].(map[string]interface{}); ok {
				if content, ok := delta["content"].(string); ok {
					deltaText = content
				}
			}
		}
	}

	// At most two events are produced per chunk: one delta and one completion.
	events := make([][]byte, 0, 2)

	if deltaText != "" {
		ev := map[string]interface{}{
			"type":  "response.output_text.delta",
			"delta": deltaText,
		}
		// JSON numbers decode as float64 when the target is interface{}.
		if created, _ := parsed["created"].(float64); created > 0 {
			ev["created"] = int64(created)
		}
		b, err := json.Marshal(ev)
		if err != nil {
			// Never hand back a nil payload; it would be framed as an empty SSE event.
			return nil, false
		}
		events = append(events, b)
	}

	if finish != "" {
		ev := map[string]interface{}{
			"type":        "response.completed",
			"stop_reason": finish,
		}
		b, err := json.Marshal(ev)
		if err != nil {
			return nil, false
		}
		events = append(events, b)
	}

	if len(events) == 0 {
		return nil, false
	}
	return events, true
}

src/semantic-router/pkg/extproc/mapping_responses_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,3 +44,11 @@ func TestMapChatCompletionToResponses_Minimal(t *testing.T) {
4444
t.Fatalf("stop_reason missing")
4545
}
4646
}
47+
48+
func TestTranslateSSEChunkToResponses(t *testing.T) {
49+
chunk := []byte(`{"id":"c1","object":"chat.completion.chunk","created":1,"model":"m","choices":[{"index":0,"delta":{"role":"assistant","content":"Hi"},"finish_reason":null}]}`)
50+
evs, ok := translateSSEChunkToResponses(chunk)
51+
if !ok || len(evs) == 0 {
52+
t.Fatalf("expected events")
53+
}
54+
}

src/semantic-router/pkg/extproc/request_handler.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,9 @@ type RequestContext struct {
249249
TTFTRecorded bool
250250
TTFTSeconds float64
251251

252+
// Responses SSE translation state
253+
ResponsesStreamInit bool
254+
252255
// VSR decision tracking
253256
VSRSelectedCategory string // The category selected by VSR
254257
VSRReasoningMode string // "on" or "off" - whether reasoning mode was determined to be used

src/semantic-router/pkg/extproc/response_handler.go

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -198,7 +198,54 @@ func (r *OpenAIRouter) handleResponseBody(v *ext_proc.ProcessingRequest_Response
198198
}
199199
}
200200

201-
// For streaming chunks, just continue (no token parsing or cache update)
201+
// If Responses adapter is active for this request, translate SSE chunks
202+
if r.Config != nil && r.Config.EnableResponsesAdapter {
203+
if p, ok := ctx.Headers[":path"]; ok && strings.HasPrefix(p, "/v1/responses") {
204+
body := v.ResponseBody.Body
205+
// Envoy provides raw chunk bytes, typically like: "data: {json}\n\n" or "data: [DONE]\n\n"
206+
b := string(body)
207+
if strings.Contains(b, "[DONE]") {
208+
// Pass the [DONE] sentinel through unchanged; no response.completed event is synthesized here
209+
response := &ext_proc.ProcessingResponse{
210+
Response: &ext_proc.ProcessingResponse_ResponseBody{
211+
ResponseBody: &ext_proc.BodyResponse{
212+
Response: &ext_proc.CommonResponse{Status: ext_proc.CommonResponse_CONTINUE},
213+
},
214+
},
215+
}
216+
return response, nil
217+
}
218+
219+
// Extract JSON after "data: " prefix if present
220+
idx := strings.Index(b, "data:")
221+
var payload []byte
222+
if idx >= 0 {
223+
payload = []byte(strings.TrimSpace(b[idx+5:]))
224+
} else {
225+
payload = v.ResponseBody.Body
226+
}
227+
228+
if len(payload) > 0 && payload[0] == '{' {
229+
if !ctx.ResponsesStreamInit {
230+
// Record that the first JSON chunk of this stream has been seen
231+
ctx.ResponsesStreamInit = true
232+
// We don't inject a new chunk here; clients will see deltas below
233+
}
234+
events, ok := translateSSEChunkToResponses(payload)
235+
if ok && len(events) > 0 {
236+
// Rebuild body as multiple SSE events in Responses format
237+
var sb strings.Builder
238+
for _, ev := range events {
239+
sb.WriteString("data: ")
240+
sb.Write(ev)
241+
sb.WriteString("\n\n")
242+
}
243+
v.ResponseBody.Body = []byte(sb.String())
244+
}
245+
}
246+
}
247+
}
248+
202249
response := &ext_proc.ProcessingResponse{
203250
Response: &ext_proc.ProcessingResponse_ResponseBody{
204251
ResponseBody: &ext_proc.BodyResponse{

0 commit comments

Comments
 (0)