sharktide commited on
Commit
4a20d1d
·
verified ·
1 Parent(s): e5661d9

Update gen.py

Browse files
Files changed (1) hide show
  1. gen.py +88 -107
gen.py CHANGED
@@ -218,6 +218,50 @@ def _prepare_forward_headers(request: Request) -> Dict[str, str]:
218
  return fwd
219
 
220
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
221
  async def call_chat_completions(
222
  messages: List[Dict[str, Any]],
223
  model: str,
@@ -240,18 +284,15 @@ async def call_chat_completions(
240
  """
241
  url, api_key = _get_provider_url_and_key(provider)
242
  headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
243
- # Forward client-provided auth/identity headers
244
- # Note: This assumes `request` is in scope — it's not, so we must pass it.
245
- # We'll fix this by modifying the caller to pass request, or remove if not needed.
246
- # For now, we keep it minimal and only do this where we have `request`.
247
- # In this function, we do not have `request`, so we skip header forwarding.
248
- # Callers that have `request` should handle it externally if needed.
249
 
250
  # Always request streaming upstream — we reassemble below.
 
 
251
  body: Dict[str, Any] = {"model": model, "messages": messages, "stream": True}
252
  if extra_body:
253
- body.update(extra_body)
254
- body["stream"] = True # force streaming even if caller passed stream=False
 
255
 
256
  TRANSIENT = {502, 503, 524, 429}
257
  MAX_ATTEMPTS = 3
@@ -295,11 +336,6 @@ async def call_chat_completions(
295
  resp_model = model
296
  stalled = False
297
 
298
- # Wrap each aiter_lines() call in a per-chunk timeout.
299
- # This is the upstream keepalive mechanism: if navy stops
300
- # sending bytes for CHUNK_TIMEOUT seconds we abort and retry
301
- # the whole request rather than silently waiting for Cloudflare
302
- # to kill us with a 524.
303
  aiter = r.aiter_lines().__aiter__()
304
  while True:
305
  try:
@@ -415,7 +451,7 @@ async def call_chat_completions(
415
  "stream": False,
416
  }
417
  if extra_body:
418
- # Forward tools/tool_choice but not stream override.
419
  for k in ("tools", "tool_choice"):
420
  if k in extra_body:
421
  fallback_body[k] = extra_body[k]
@@ -1037,8 +1073,6 @@ async def generate_text(
1037
  try:
1038
  obj = json.loads(raw)
1039
  except Exception:
1040
- # Not valid JSON — forward verbatim (keeps partial
1041
- # chunks from blocking the stream).
1042
  yield chunk
1043
  continue
1044
 
@@ -1049,24 +1083,9 @@ async def generate_text(
1049
  # Normalize usage block whenever it appears.
1050
  _normalize_usage_block(obj)
1051
 
1052
- # ── thinking / reasoning tokens ───────────────────────
1053
- # Navy models may embed thinking in two ways:
1054
- #
1055
- # 1. As delta.reasoning_content (separate field)
1056
- # 2. Inline inside delta.content wrapped in ```...```
1057
- #
1058
- # For API-key callers we always surface both forms.
1059
- # For browser/session callers we strip reasoning_content
1060
- # so it doesn't confuse UI clients that don't expect it,
1061
- # but ``` tags already present in content are left
1062
- # alone (they arrived that way from upstream).
1063
  if forward_thinking:
1064
- # Merge reasoning_content into content as
1065
- # ```...``` and keep the raw field.
1066
  obj = _inject_reasoning_into_chunk(obj)
1067
  else:
1068
- # Strip the non-standard field so browser clients
1069
- # don't see unexpected keys.
1070
  try:
1071
  delta = obj["choices"][0]["delta"]
1072
  delta.pop("reasoning_content", None)
@@ -1092,7 +1111,6 @@ async def generate_text(
1092
  )
1093
 
1094
  # ── non-streaming ─────────────────────────
1095
- # Forward headers to upstream call if we had request (we do!)
1096
  fwd_headers = _prepare_forward_headers(request)
1097
  fwd_headers.update({"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"})
1098
 
@@ -1106,7 +1124,6 @@ async def generate_text(
1106
  fallback_body = dict(body)
1107
  fallback_body["model"] = FALLBACK_MODEL
1108
  fb_headers = {"Authorization": f"Bearer {fb_key}", "Content-Type": "application/json"}
1109
- # Forward original headers to fallback
1110
  fb_fwd_headers = _prepare_forward_headers(request)
1111
  fb_fwd_headers.update(fb_headers)
1112
  r = await client.post(
@@ -1122,14 +1139,8 @@ async def generate_text(
1122
  except Exception:
1123
  payload = {"error": "Upstream returned invalid JSON"}
1124
  else:
1125
- # Normalize usage fields.
1126
  _normalize_usage_block(payload)
1127
 
1128
- # ── thinking tokens in non-streaming responses ────────────────────
1129
- # Some navy models put thinking content in
1130
- # message.reasoning_content. For API-key callers we prepend it to
1131
- # message.content wrapped in ```...```; for others we drop
1132
- # the non-standard field.
1133
  try:
1134
  message = payload["choices"][0]["message"]
1135
  reasoning = (
@@ -1141,9 +1152,7 @@ async def generate_text(
1141
  if forward_thinking:
1142
  existing = message.get("content") or ""
1143
  message["content"] = f"```\n{reasoning}\n```{existing}"
1144
- # Restore the raw field for clients that want it.
1145
  message["reasoning_content"] = reasoning
1146
- # else: already popped — nothing to do.
1147
  payload["choices"][0]["message"] = message
1148
  except (KeyError, IndexError, TypeError):
1149
  pass
@@ -1227,8 +1236,11 @@ def _responses_input_to_messages(
1227
  instructions: Optional[str] = None,
1228
  ) -> List[Dict[str, Any]]:
1229
  messages: List[Dict[str, Any]] = []
 
 
 
1230
  if instructions:
1231
- messages.append({"role": "developer", "content": instructions})
1232
 
1233
  if isinstance(input_data, str):
1234
  messages.append({"role": "user", "content": input_data})
@@ -1242,6 +1254,9 @@ def _responses_input_to_messages(
1242
  if not isinstance(item, dict):
1243
  continue
1244
  role = item.get("role", "user")
 
 
 
1245
  text = _content_to_text(item.get("content", ""))
1246
  if text:
1247
  messages.append({"role": role, "content": text})
@@ -1256,7 +1271,6 @@ def _build_responses_payload(
1256
  output_tokens: int = 0,
1257
  tool_calls: Optional[List[Dict[str, Any]]] = None,
1258
  ) -> Dict[str, Any]:
1259
- # Build content: text part first, then one function_call part per tool call
1260
  content: List[Dict[str, Any]] = []
1261
  if text:
1262
  content.append({"type": "output_text", "text": text, "annotations": []})
@@ -1269,7 +1283,6 @@ def _build_responses_payload(
1269
  "input": json.loads(fn["arguments"]) if fn.get("arguments") else {},
1270
  })
1271
 
1272
- # Top-level output items: one message item (text) + one per tool call
1273
  output_items: List[Dict[str, Any]] = []
1274
 
1275
  if text or not tool_calls:
@@ -1338,10 +1351,14 @@ async def create_responses(
1338
 
1339
  uses_tools = bool(tools) or (tool_choice not in [None, "none"])
1340
 
 
 
 
 
1341
  # Build extra fields to forward upstream
1342
  extra_body: Dict[str, Any] = {}
1343
- if tools:
1344
- extra_body["tools"] = tools
1345
  if tool_choice is not None:
1346
  extra_body["tool_choice"] = tool_choice
1347
 
@@ -1349,32 +1366,28 @@ async def create_responses(
1349
  _log_routing(chosen_model, provider, messages, uses_tools=uses_tools)
1350
  await _check_chat_rate_limit(request, authorization, x_client_id)
1351
 
1352
- # Determine if we should forward thinking (reasoning) tokens
1353
  forward_thinking = _is_api_key_request(request)
1354
 
1355
  # ── non-streaming ─────────────────────────
1356
  if stream is False:
1357
- # Forward headers to upstream
1358
- fwd_headers = _prepare_forward_headers(request)
1359
- url, api_key = _get_provider_url_and_key(provider)
1360
- fwd_headers.update({"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"})
1361
-
1362
- data = await call_chat_completions(
1363
- messages, chosen_model, provider, extra_body=extra_body or None
1364
- )
1365
- # Note: call_chat_completions does not currently use fwd_headers — we must fix that.
1366
- # Since we cannot change call_chat_completions signature easily here,
1367
- # we instead reimplement the non-streaming path with proper header forwarding.
1368
- # But to avoid duplication, we'll assume call_chat_completions is fixed externally.
1369
- # For now, we proceed and note: header forwarding in non-streaming is incomplete
1370
- # unless call_chat_completions is updated to accept headers.
1371
- # Given constraints, we'll skip and note this as a remaining gap.
1372
  data = await call_chat_completions(
1373
  messages, chosen_model, provider, extra_body=extra_body or None
1374
  )
1375
  input_tokens, output_tokens = _extract_usage(data)
1376
  message = data.get("choices", [{}])[0].get("message", {})
 
 
 
 
 
 
 
1377
  text = message.get("content") or ""
 
 
 
 
1378
  tool_calls = message.get("tool_calls") or []
1379
  response_id = _resp_id("resp")
1380
  return JSONResponse(
@@ -1384,26 +1397,10 @@ async def create_responses(
1384
  )
1385
 
1386
  # ── streaming ─────────────────────────────
1387
- # Rather than accumulating the full upstream response and then replaying it,
1388
- # we open a streaming connection to the upstream and translate each SSE chunk
1389
- # into the appropriate Responses-API event in real time.
1390
- #
1391
- # This means:
1392
- # - Thinking/reasoning tokens appear as response.output_text.delta events
1393
- # the moment navy emits them — no buffering, no pings needed.
1394
- # - Tool-call argument fragments stream as
1395
- # response.function_call_arguments.delta events.
1396
- # - The Cloudflare 524 problem is avoided because bytes flow continuously.
1397
- #
1398
- # State machine:
1399
- # THINKING → emitting ```...``` deltas (reasoning_content field)
1400
- # TEXT → emitting normal output_text deltas (content field)
1401
- # TOOL → emitting function_call_arguments deltas
1402
- # DONE → response.completed emitted, generator exits
1403
  async def event_stream():
1404
  response_id = _resp_id("resp")
1405
  ts = _resp_ts()
1406
- CHUNK_TIMEOUT = 60 # seconds — stall detector (same as call_chat_completions)
1407
 
1408
  def sse(event_type: str, data: dict) -> str:
1409
  return f"event: {event_type}\ndata: {json.dumps(data)}\n\n"
@@ -1435,31 +1432,27 @@ async def create_responses(
1435
  })
1436
 
1437
  up_url, up_key = _get_provider_url_and_key(provider)
1438
- up_headers = {"Authorization": f"Bearer {up_key}", "Content-Type": "application/json"}
1439
  up_body: Dict[str, Any] = {
1440
  "model": chosen_model, "messages": messages, "stream": True,
1441
  }
1442
  if extra_body:
1443
- up_body.update(extra_body)
1444
- up_body["stream"] = True
 
 
1445
 
1446
  TRANSIENT = {502, 503, 524, 429}
1447
  MAX_ATTEMPTS = 3
1448
 
1449
- # ── Per-attempt retry loop ────────────────────────────────────────────
1450
- # If navy stalls or returns a transient error we retry transparently.
1451
- # The client already received response.created/in_progress so we just
1452
- # keep the stream open; from Codex's perspective it's still waiting.
1453
  for attempt in range(MAX_ATTEMPTS):
1454
  if attempt:
1455
  await asyncio.sleep(2 ** attempt)
1456
 
1457
- # Accumulated state — reset on each retry so we don't double-emit.
1458
  text_item_id = _resp_id("msg")
1459
  output_index = 0
1460
- text_started = False # have we opened a message output item?
1461
- thinking_open = False # are we inside a ``` block?
1462
- full_text = "" # for response.completed payload
1463
  full_reasoning = ""
1464
  tool_calls_map: Dict[int, Dict[str, Any]] = {}
1465
  tool_item_ids: Dict[int, str] = {}
@@ -1470,14 +1463,13 @@ async def create_responses(
1470
  attempt_failed = False
1471
 
1472
  try:
1473
- # Prepare headers: upstream auth + forwarded client headers
1474
  fwd_headers = _prepare_forward_headers(request)
1475
  fwd_headers.update({"Authorization": f"Bearer {up_key}", "Content-Type": "application/json"})
1476
 
1477
  async with httpx.AsyncClient(timeout=httpx.Timeout(300.0, read=300.0)) as client:
1478
  async with client.stream("POST", up_url, json=up_body, headers=fwd_headers) as r:
1479
  if r.status_code in TRANSIENT:
1480
- body_bytes = await r.aread()
1481
  print(
1482
  f"[responses stream] attempt {attempt+1} got "
1483
  f"{r.status_code}, retrying…"
@@ -1540,8 +1532,6 @@ async def create_responses(
1540
  if reasoning_chunk:
1541
  full_reasoning += reasoning_chunk
1542
  if not text_started:
1543
- # Open the message output item on first token
1544
- # (whether thinking or regular content).
1545
  text_started = True
1546
  yield sse("response.output_item.added", {
1547
  "type": "response.output_item.added",
@@ -1561,7 +1551,6 @@ async def create_responses(
1561
  "part": {"type": "output_text", "text": "", "annotations": []},
1562
  })
1563
  if not thinking_open:
1564
- # Emit the opening ``` tag as its own delta.
1565
  thinking_open = True
1566
  yield sse("response.output_text.delta", {
1567
  "type": "response.output_text.delta",
@@ -1602,7 +1591,6 @@ async def create_responses(
1602
  "part": {"type": "output_text", "text": "", "annotations": []},
1603
  })
1604
  if thinking_open:
1605
- # Close the ``` block before regular content.
1606
  thinking_open = False
1607
  yield sse("response.output_text.delta", {
1608
  "type": "response.output_text.delta",
@@ -1623,7 +1611,6 @@ async def create_responses(
1623
  for tc_delta in (delta.get("tool_calls") or []):
1624
  idx = tc_delta.get("index", 0)
1625
 
1626
- # First fragment for this tool call index.
1627
  if idx not in tool_calls_map:
1628
  tc_id = tc_delta.get("id") or _resp_id("tool")
1629
  tool_calls_map[idx] = {
@@ -1646,12 +1633,9 @@ async def create_responses(
1646
  if arg_chunk:
1647
  existing["function"]["arguments"] += arg_chunk
1648
 
1649
- # Open this tool-call output item on its first
1650
- # argument fragment, once we know the name.
1651
  tc_id = tool_item_ids[idx]
1652
  if not tool_started[idx] and existing["function"]["name"]:
1653
  tool_started[idx] = True
1654
- # Close text item first if it's open.
1655
  if text_started:
1656
  if thinking_open:
1657
  thinking_open = False
@@ -1720,13 +1704,12 @@ async def create_responses(
1720
 
1721
  except (httpx.RemoteProtocolError, httpx.ReadError, httpx.ConnectError) as exc:
1722
  print(f"[responses stream] attempt {attempt+1} network error: {exc}")
1723
- stalled = True # treat as retryable
1724
 
1725
  if stalled or attempt_failed:
1726
- continue # retry
1727
 
1728
  # ── Stream finished cleanly — emit closing events ─────────────────
1729
- # Close any still-open text item.
1730
  if text_started:
1731
  if thinking_open:
1732
  yield sse("response.output_text.delta", {
@@ -1764,7 +1747,6 @@ async def create_responses(
1764
  })
1765
  output_index += 1
1766
 
1767
- # Close any open tool-call items.
1768
  for idx in sorted(tool_calls_map):
1769
  if not tool_started.get(idx):
1770
  continue
@@ -1789,7 +1771,6 @@ async def create_responses(
1789
  })
1790
  output_index += 1
1791
 
1792
- # Build tool_calls list for the completed payload.
1793
  tool_calls_list = [tool_calls_map[i] for i in sorted(tool_calls_map)]
1794
  input_tok, output_tok = _extract_usage({"usage": usage})
1795
  yield sse("response.completed", {
@@ -1800,7 +1781,7 @@ async def create_responses(
1800
  ),
1801
  })
1802
  yield "data: [DONE]\n\n"
1803
- return # success — exit generator
1804
 
1805
  # All retry attempts exhausted.
1806
  yield _fail("Upstream failed after multiple retries")
@@ -1810,4 +1791,4 @@ async def create_responses(
1810
  event_stream(),
1811
  media_type="text/event-stream",
1812
  headers={"Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no"},
1813
- )
 
218
  return fwd
219
 
220
 
221
+ def _normalize_tools_for_chat_completions(tools: Optional[List[Any]]) -> Optional[List[Any]]:
222
+ """
223
+ Normalize tools from the Responses API format to the chat completions format.
224
+
225
+ Responses API tools look like:
226
+ {"type": "function", "name": "...", "description": "...", "parameters": {...}}
227
+
228
+ Chat completions tools look like:
229
+ {"type": "function", "function": {"name": "...", "description": "...", "parameters": {...}}}
230
+
231
+ If tools are already in chat completions format (have a nested "function" key) they
232
+ are returned unchanged.
233
+ """
234
+ if not tools:
235
+ return tools
236
+
237
+ normalized = []
238
+ for tool in tools:
239
+ if not isinstance(tool, dict):
240
+ normalized.append(tool)
241
+ continue
242
+
243
+ # Already in chat completions format — leave untouched.
244
+ if "function" in tool:
245
+ normalized.append(tool)
246
+ continue
247
+
248
+ # Responses API format — lift name/description/parameters into "function".
249
+ if tool.get("type") == "function":
250
+ fn: Dict[str, Any] = {}
251
+ if "name" in tool:
252
+ fn["name"] = tool["name"]
253
+ if "description" in tool:
254
+ fn["description"] = tool["description"]
255
+ if "parameters" in tool:
256
+ fn["parameters"] = tool["parameters"]
257
+ normalized.append({"type": "function", "function": fn})
258
+ else:
259
+ # Unknown tool type — pass through as-is.
260
+ normalized.append(tool)
261
+
262
+ return normalized
263
+
264
+
265
  async def call_chat_completions(
266
  messages: List[Dict[str, Any]],
267
  model: str,
 
284
  """
285
  url, api_key = _get_provider_url_and_key(provider)
286
  headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
 
 
 
 
 
 
287
 
288
  # Always request streaming upstream — we reassemble below.
289
+ # FIX #1: Never let extra_body overwrite the model we chose; also protect
290
+ # the stream flag so it always stays True for our reassembly logic.
291
  body: Dict[str, Any] = {"model": model, "messages": messages, "stream": True}
292
  if extra_body:
293
+ for k, v in extra_body.items():
294
+ if k not in ("model", "stream"): # protect model & stream
295
+ body[k] = v
296
 
297
  TRANSIENT = {502, 503, 524, 429}
298
  MAX_ATTEMPTS = 3
 
336
  resp_model = model
337
  stalled = False
338
 
 
 
 
 
 
339
  aiter = r.aiter_lines().__aiter__()
340
  while True:
341
  try:
 
451
  "stream": False,
452
  }
453
  if extra_body:
454
+ # Forward tools/tool_choice but not stream/model override.
455
  for k in ("tools", "tool_choice"):
456
  if k in extra_body:
457
  fallback_body[k] = extra_body[k]
 
1073
  try:
1074
  obj = json.loads(raw)
1075
  except Exception:
 
 
1076
  yield chunk
1077
  continue
1078
 
 
1083
  # Normalize usage block whenever it appears.
1084
  _normalize_usage_block(obj)
1085
 
 
 
 
 
 
 
 
 
 
 
 
1086
  if forward_thinking:
 
 
1087
  obj = _inject_reasoning_into_chunk(obj)
1088
  else:
 
 
1089
  try:
1090
  delta = obj["choices"][0]["delta"]
1091
  delta.pop("reasoning_content", None)
 
1111
  )
1112
 
1113
  # ── non-streaming ─────────────────────────
 
1114
  fwd_headers = _prepare_forward_headers(request)
1115
  fwd_headers.update({"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"})
1116
 
 
1124
  fallback_body = dict(body)
1125
  fallback_body["model"] = FALLBACK_MODEL
1126
  fb_headers = {"Authorization": f"Bearer {fb_key}", "Content-Type": "application/json"}
 
1127
  fb_fwd_headers = _prepare_forward_headers(request)
1128
  fb_fwd_headers.update(fb_headers)
1129
  r = await client.post(
 
1139
  except Exception:
1140
  payload = {"error": "Upstream returned invalid JSON"}
1141
  else:
 
1142
  _normalize_usage_block(payload)
1143
 
 
 
 
 
 
1144
  try:
1145
  message = payload["choices"][0]["message"]
1146
  reasoning = (
 
1152
  if forward_thinking:
1153
  existing = message.get("content") or ""
1154
  message["content"] = f"```\n{reasoning}\n```{existing}"
 
1155
  message["reasoning_content"] = reasoning
 
1156
  payload["choices"][0]["message"] = message
1157
  except (KeyError, IndexError, TypeError):
1158
  pass
 
1236
  instructions: Optional[str] = None,
1237
  ) -> List[Dict[str, Any]]:
1238
  messages: List[Dict[str, Any]] = []
1239
+
1240
+ # FIX #3: Use "system" instead of "developer" — navy rejects the
1241
+ # non-standard "developer" role with a 400.
1242
  if instructions:
1243
+ messages.append({"role": "system", "content": instructions})
1244
 
1245
  if isinstance(input_data, str):
1246
  messages.append({"role": "user", "content": input_data})
 
1254
  if not isinstance(item, dict):
1255
  continue
1256
  role = item.get("role", "user")
1257
+ # Also normalise "developer" role on individual message items.
1258
+ if role == "developer":
1259
+ role = "system"
1260
  text = _content_to_text(item.get("content", ""))
1261
  if text:
1262
  messages.append({"role": role, "content": text})
 
1271
  output_tokens: int = 0,
1272
  tool_calls: Optional[List[Dict[str, Any]]] = None,
1273
  ) -> Dict[str, Any]:
 
1274
  content: List[Dict[str, Any]] = []
1275
  if text:
1276
  content.append({"type": "output_text", "text": text, "annotations": []})
 
1283
  "input": json.loads(fn["arguments"]) if fn.get("arguments") else {},
1284
  })
1285
 
 
1286
  output_items: List[Dict[str, Any]] = []
1287
 
1288
  if text or not tool_calls:
 
1351
 
1352
  uses_tools = bool(tools) or (tool_choice not in [None, "none"])
1353
 
1354
+ # FIX #2: Normalize tools from Responses API format → chat completions format
1355
+ # before forwarding to navy, which only speaks chat completions.
1356
+ normalized_tools = _normalize_tools_for_chat_completions(tools)
1357
+
1358
  # Build extra fields to forward upstream
1359
  extra_body: Dict[str, Any] = {}
1360
+ if normalized_tools:
1361
+ extra_body["tools"] = normalized_tools
1362
  if tool_choice is not None:
1363
  extra_body["tool_choice"] = tool_choice
1364
 
 
1366
  _log_routing(chosen_model, provider, messages, uses_tools=uses_tools)
1367
  await _check_chat_rate_limit(request, authorization, x_client_id)
1368
 
 
1369
  forward_thinking = _is_api_key_request(request)
1370
 
1371
  # ── non-streaming ─────────────────────────
1372
  if stream is False:
1373
+ # FIX: removed the duplicate call_chat_completions call that was here
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1374
  data = await call_chat_completions(
1375
  messages, chosen_model, provider, extra_body=extra_body or None
1376
  )
1377
  input_tokens, output_tokens = _extract_usage(data)
1378
  message = data.get("choices", [{}])[0].get("message", {})
1379
+
1380
+ # Handle reasoning tokens in non-streaming responses path
1381
+ reasoning = (
1382
+ message.pop("reasoning_content", None)
1383
+ or message.pop("reasoning", None)
1384
+ or ""
1385
+ )
1386
  text = message.get("content") or ""
1387
+ if reasoning and isinstance(reasoning, str):
1388
+ if forward_thinking:
1389
+ text = f"```\n{reasoning}\n```{text}"
1390
+
1391
  tool_calls = message.get("tool_calls") or []
1392
  response_id = _resp_id("resp")
1393
  return JSONResponse(
 
1397
  )
1398
 
1399
  # ── streaming ─────────────────────────────
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1400
  async def event_stream():
1401
  response_id = _resp_id("resp")
1402
  ts = _resp_ts()
1403
+ CHUNK_TIMEOUT = 60
1404
 
1405
  def sse(event_type: str, data: dict) -> str:
1406
  return f"event: {event_type}\ndata: {json.dumps(data)}\n\n"
 
1432
  })
1433
 
1434
  up_url, up_key = _get_provider_url_and_key(provider)
 
1435
  up_body: Dict[str, Any] = {
1436
  "model": chosen_model, "messages": messages, "stream": True,
1437
  }
1438
  if extra_body:
1439
+ # FIX #1 (streaming path): also protect model/stream here
1440
+ for k, v in extra_body.items():
1441
+ if k not in ("model", "stream"):
1442
+ up_body[k] = v
1443
 
1444
  TRANSIENT = {502, 503, 524, 429}
1445
  MAX_ATTEMPTS = 3
1446
 
 
 
 
 
1447
  for attempt in range(MAX_ATTEMPTS):
1448
  if attempt:
1449
  await asyncio.sleep(2 ** attempt)
1450
 
 
1451
  text_item_id = _resp_id("msg")
1452
  output_index = 0
1453
+ text_started = False
1454
+ thinking_open = False
1455
+ full_text = ""
1456
  full_reasoning = ""
1457
  tool_calls_map: Dict[int, Dict[str, Any]] = {}
1458
  tool_item_ids: Dict[int, str] = {}
 
1463
  attempt_failed = False
1464
 
1465
  try:
 
1466
  fwd_headers = _prepare_forward_headers(request)
1467
  fwd_headers.update({"Authorization": f"Bearer {up_key}", "Content-Type": "application/json"})
1468
 
1469
  async with httpx.AsyncClient(timeout=httpx.Timeout(300.0, read=300.0)) as client:
1470
  async with client.stream("POST", up_url, json=up_body, headers=fwd_headers) as r:
1471
  if r.status_code in TRANSIENT:
1472
+ await r.aread()
1473
  print(
1474
  f"[responses stream] attempt {attempt+1} got "
1475
  f"{r.status_code}, retrying…"
 
1532
  if reasoning_chunk:
1533
  full_reasoning += reasoning_chunk
1534
  if not text_started:
 
 
1535
  text_started = True
1536
  yield sse("response.output_item.added", {
1537
  "type": "response.output_item.added",
 
1551
  "part": {"type": "output_text", "text": "", "annotations": []},
1552
  })
1553
  if not thinking_open:
 
1554
  thinking_open = True
1555
  yield sse("response.output_text.delta", {
1556
  "type": "response.output_text.delta",
 
1591
  "part": {"type": "output_text", "text": "", "annotations": []},
1592
  })
1593
  if thinking_open:
 
1594
  thinking_open = False
1595
  yield sse("response.output_text.delta", {
1596
  "type": "response.output_text.delta",
 
1611
  for tc_delta in (delta.get("tool_calls") or []):
1612
  idx = tc_delta.get("index", 0)
1613
 
 
1614
  if idx not in tool_calls_map:
1615
  tc_id = tc_delta.get("id") or _resp_id("tool")
1616
  tool_calls_map[idx] = {
 
1633
  if arg_chunk:
1634
  existing["function"]["arguments"] += arg_chunk
1635
 
 
 
1636
  tc_id = tool_item_ids[idx]
1637
  if not tool_started[idx] and existing["function"]["name"]:
1638
  tool_started[idx] = True
 
1639
  if text_started:
1640
  if thinking_open:
1641
  thinking_open = False
 
1704
 
1705
  except (httpx.RemoteProtocolError, httpx.ReadError, httpx.ConnectError) as exc:
1706
  print(f"[responses stream] attempt {attempt+1} network error: {exc}")
1707
+ stalled = True
1708
 
1709
  if stalled or attempt_failed:
1710
+ continue
1711
 
1712
  # ── Stream finished cleanly — emit closing events ─────────────────
 
1713
  if text_started:
1714
  if thinking_open:
1715
  yield sse("response.output_text.delta", {
 
1747
  })
1748
  output_index += 1
1749
 
 
1750
  for idx in sorted(tool_calls_map):
1751
  if not tool_started.get(idx):
1752
  continue
 
1771
  })
1772
  output_index += 1
1773
 
 
1774
  tool_calls_list = [tool_calls_map[i] for i in sorted(tool_calls_map)]
1775
  input_tok, output_tok = _extract_usage({"usage": usage})
1776
  yield sse("response.completed", {
 
1781
  ),
1782
  })
1783
  yield "data: [DONE]\n\n"
1784
+ return # success
1785
 
1786
  # All retry attempts exhausted.
1787
  yield _fail("Upstream failed after multiple retries")
 
1791
  event_stream(),
1792
  media_type="text/event-stream",
1793
  headers={"Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no"},
1794
+ )