Files changed (1)
  1. gen.py +378 -47
gen.py CHANGED
@@ -66,6 +66,10 @@ MODEL_MAP = {
  FALLBACK_MODEL = "meta-llama/llama-4-scout-17b-16e-instruct"
  FALLBACK_PROVIDER = "groq"

+ # Header that API-key authenticated clients send so we know to stream
+ # thinking tokens back to them.
+ API_KEY_HEADER = "x-api-key"
+

  # ──────────────────────────────────────────────
  # CENTRAL ROUTING LOGIC
@@ -208,24 +212,204 @@ async def call_chat_completions(
  extra_body: Optional[Dict[str, Any]] = None,
  ) -> Dict[str, Any]:
  """
- Non-streaming chat-completions call.
-
- Returns the full upstream JSON payload.
- Raises HTTPException on upstream errors.
+ Resilient chat-completions call designed to survive Cloudflare 524 timeouts.
+
+ Strategy:
+ 1. Ask the upstream for a *streaming* response so bytes trickle in
+ continuously, preventing Cloudflare's idle timeout from firing.
+ 2. Each chunk from aiter_lines() has its own deadline (CHUNK_TIMEOUT).
+ If navy goes completely silent mid-stream (common during long tool-call
+ generation) we detect the stall quickly and retry rather than waiting
+ for Cloudflare to 524 us.
+ 3. Retry up to MAX_ATTEMPTS times on transient errors or stalls,
+ with exponential back-off between attempts.
+ 4. On exhausted retries fall through to the Groq fallback.
  """
  url, api_key = _get_provider_url_and_key(provider)
  headers = {"Authorization": f"Bearer {api_key}", "Content-Type": "application/json"}
- body = {"model": model, "messages": messages, "stream": False}
+
+ # Always request streaming upstream — we reassemble below.
+ body: Dict[str, Any] = {"model": model, "messages": messages, "stream": True}
  if extra_body:
  body.update(extra_body)
+ body["stream"] = True # force streaming even if caller passed stream=False

- async with httpx.AsyncClient(timeout=None) as client:
- r = await client.post(url, json=body, headers=headers)
+ TRANSIENT = {502, 503, 524, 429}
+ MAX_ATTEMPTS = 3
+ # How long to wait for the *next chunk* before declaring a stall.
+ # Must be comfortably below Cloudflare's ~100 s idle-connection limit.
+ CHUNK_TIMEOUT = 60 # seconds
+
+ last_exc: Optional[Exception] = None
+
+ for attempt in range(MAX_ATTEMPTS):
+ if attempt:
+ await asyncio.sleep(2 ** attempt) # 2 s, 4 s
+
+ try:
+ async with httpx.AsyncClient(timeout=httpx.Timeout(300.0, read=300.0)) as client:
+ async with client.stream("POST", url, json=body, headers=headers) as r:
+ # Transient upstream error — retry.
+ if r.status_code in TRANSIENT:
+ body_bytes = await r.aread()
+ last_exc = HTTPException(
+ status_code=r.status_code,
+ detail=body_bytes.decode("utf-8", errors="replace")[:500],
+ )
+ print(f"[call_chat_completions] attempt {attempt+1} got {r.status_code}, retrying…")
+ continue
+
+ if r.status_code != 200:
+ body_bytes = await r.aread()
+ raise HTTPException(
+ status_code=r.status_code,
+ detail=body_bytes.decode("utf-8", errors="replace")[:1000],
+ )
+
+ # ── Reassemble streaming SSE into a single response object ──
+ accumulated_content = ""
+ accumulated_reasoning = ""
+ tool_calls_map: Dict[int, Dict[str, Any]] = {}
+ usage: Dict[str, Any] = {}
+ finish_reason: Optional[str] = None
+ resp_id = ""
+ resp_model = model
+ stalled = False
+
+ # Wrap each aiter_lines() call in a per-chunk timeout.
+ # This is the upstream keepalive mechanism: if navy stops
+ # sending bytes for CHUNK_TIMEOUT seconds we abort and retry
+ # the whole request rather than silently waiting for Cloudflare
+ # to kill us with a 524.
+ aiter = r.aiter_lines().__aiter__()
+ while True:
+ try:
+ line = await asyncio.wait_for(
+ aiter.__anext__(), timeout=CHUNK_TIMEOUT
+ )
+ except asyncio.TimeoutError:
+ print(
+ f"[call_chat_completions] attempt {attempt+1} "
+ f"stalled >{CHUNK_TIMEOUT}s waiting for next chunk — retrying"
+ )
+ stalled = True
+ break
+ except StopAsyncIteration:
+ break
+
+ if not line or not line.startswith("data:"):
+ continue
+ raw = line[5:].strip()
+ if raw == "[DONE]":
+ break
+ try:
+ obj = json.loads(raw)
+ except Exception:
+ continue

- if r.status_code != 200:
- raise HTTPException(status_code=r.status_code, detail=r.text[:1000])
+ if not isinstance(obj, dict):
+ continue
+
+ resp_id = resp_id or obj.get("id", "")
+ resp_model = obj.get("model", resp_model)
+
+ if "usage" in obj and obj["usage"]:
+ usage = obj["usage"]
+
+ choices = obj.get("choices") or []
+ if not choices:
+ continue

- return r.json()
+ choice = choices[0]
+ finish_reason = choice.get("finish_reason") or finish_reason
+ delta = choice.get("delta") or {}
+
+ # Accumulate text content.
+ dc = delta.get("content")
+ if dc:
+ accumulated_content += dc
+
+ # Accumulate reasoning / thinking tokens.
+ dr = delta.get("reasoning_content") or delta.get("reasoning")
+ if dr:
+ accumulated_reasoning += dr
+
+ # Accumulate tool-call argument chunks (streamed as fragments).
+ for tc_delta in (delta.get("tool_calls") or []):
+ idx = tc_delta.get("index", 0)
+ if idx not in tool_calls_map:
+ tool_calls_map[idx] = {
+ "id": tc_delta.get("id", ""),
+ "type": tc_delta.get("type", "function"),
+ "function": {"name": "", "arguments": ""},
+ }
+ existing = tool_calls_map[idx]
+ if tc_delta.get("id"):
+ existing["id"] = tc_delta["id"]
+ fn_delta = tc_delta.get("function") or {}
+ if fn_delta.get("name"):
+ existing["function"]["name"] += fn_delta["name"]
+ if fn_delta.get("arguments"):
+ existing["function"]["arguments"] += fn_delta["arguments"]
+
+ if stalled:
+ last_exc = Exception(f"navy stalled >{CHUNK_TIMEOUT}s between chunks")
+ continue # → next retry attempt
+
+ # Reassemble into a standard non-streaming response shape.
+ tool_calls_list = [tool_calls_map[i] for i in sorted(tool_calls_map)]
+
+ message: Dict[str, Any] = {"role": "assistant", "content": accumulated_content}
+ if accumulated_reasoning:
+ message["reasoning_content"] = accumulated_reasoning
+ if tool_calls_list:
+ message["tool_calls"] = tool_calls_list
+
+ return {
+ "id": resp_id,
+ "object": "chat.completion",
+ "model": resp_model,
+ "choices": [
+ {
+ "index": 0,
+ "message": message,
+ "finish_reason": finish_reason or "stop",
+ }
+ ],
+ "usage": usage,
+ }
+
+ except HTTPException:
+ raise
+ except (httpx.RemoteProtocolError, httpx.ReadError, httpx.ConnectError) as exc:
+ last_exc = exc
+ print(f"[call_chat_completions] attempt {attempt+1} network error: {exc}, retrying…")
+ continue
+
+ # All attempts exhausted — fall back to Groq.
+ print(f"[call_chat_completions] all attempts failed ({last_exc}), falling back to Groq")
+ fb_url, fb_key = _get_provider_url_and_key(FALLBACK_PROVIDER)
+ fb_headers = {"Authorization": f"Bearer {fb_key}", "Content-Type": "application/json"}
+ fallback_body = {
+ "model": FALLBACK_MODEL,
+ "messages": messages,
+ "stream": False,
+ }
+ if extra_body:
+ # Forward tools/tool_choice but not stream override.
+ for k in ("tools", "tool_choice"):
+ if k in extra_body:
+ fallback_body[k] = extra_body[k]
+
+ async with httpx.AsyncClient(timeout=httpx.Timeout(120.0)) as client:
+ fb_r = await client.post(fb_url, json=fallback_body, headers=fb_headers)
+
+ if fb_r.status_code != 200:
+ raise HTTPException(
+ status_code=fb_r.status_code,
+ detail=f"Primary and fallback both failed. Fallback: {fb_r.text[:500]}",
+ )
+ return fb_r.json()


  def _extract_text_from_response(data: Dict[str, Any]) -> str:
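Note: the per-chunk deadline is the heart of the stall detection above. A minimal standalone sketch of the same idiom, with a plain async generator standing in for r.aiter_lines() (illustrative only, not part of this diff):

    import asyncio

    async def slow_source():
        for i in range(3):
            await asyncio.sleep(0.1 if i < 2 else 5)  # final chunk "stalls"
            yield f"chunk-{i}"

    async def read_with_deadline(source, chunk_timeout=1.0):
        it = source.__aiter__()
        received = []
        while True:
            try:
                item = await asyncio.wait_for(it.__anext__(), timeout=chunk_timeout)
            except asyncio.TimeoutError:
                return received, True   # stalled: caller retries the whole request
            except StopAsyncIteration:
                return received, False  # stream finished normally
            received.append(item)

    print(asyncio.run(read_with_deadline(slow_source())))
    # (['chunk-0', 'chunk-1'], True)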
@@ -253,6 +437,65 @@ def is_cinematic_image_prompt(prompt: str) -> bool:
  return False


+ def _is_api_key_request(request: Request) -> bool:
+ """
+ Return True when the caller authenticated with an API key rather than a
+ session cookie / browser auth. We use this to decide whether to forward
+ think-tag / reasoning_content tokens to the client.
+ """
+ return bool(
+ request.headers.get(API_KEY_HEADER)
+ or request.headers.get("authorization", "").lower().startswith("bearer ")
+ )
+
+
+ def _inject_reasoning_into_chunk(obj: Dict[str, Any]) -> Dict[str, Any]:
+ """
+ Some navy models return thinking tokens in a non-standard
+ ``reasoning_content`` field inside each delta. When that field is
+ present we wrap it in <think>…</think> and prepend it to the regular
+ ``content`` delta so that every SSE-speaking client sees a single,
+ unified text stream.
+
+ The original ``reasoning_content`` field is preserved so clients that
+ know about it can still use it directly.
+ """
+ try:
+ delta = obj["choices"][0]["delta"]
+ except (KeyError, IndexError, TypeError):
+ return obj
+
+ reasoning = delta.get("reasoning_content") or delta.get("reasoning") or ""
+ content = delta.get("content") or ""
+
+ if reasoning and isinstance(reasoning, str):
+ # Wrap in <think> tags and prepend to the visible content delta.
+ wrapped = f"<think>{reasoning}</think>"
+ delta["content"] = wrapped + content
+ # Keep the raw field so native clients can parse it too.
+ delta["reasoning_content"] = reasoning
+ obj["choices"][0]["delta"] = delta
+
+ return obj
+
+
+ def _normalize_usage_block(obj: Dict[str, Any]) -> Dict[str, Any]:
+ """Rewrite the usage block to a canonical shape (in-place, returns obj)."""
+ if "usage" not in obj or not isinstance(obj.get("usage"), dict):
+ return obj
+ u = obj["usage"]
+ input_tok = u.get("prompt_tokens") or u.get("input_tokens", 0)
+ output_tok = u.get("completion_tokens") or u.get("output_tokens", 0)
+ obj["usage"] = {
+ "prompt_tokens": input_tok,
+ "completion_tokens": output_tok,
+ "total_tokens": input_tok + output_tok,
+ "input_tokens": input_tok,
+ "output_tokens": output_tok,
+ }
+ return obj
+
+
  # ──────────────────────────────────────────────
  # IMAGE GENERATION
  # ──────────────────────────────────────────────
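Note: a quick illustration of what _inject_reasoning_into_chunk is expected to do to a single streaming delta, using the field names from the diff (values made up):

    chunk = {
        "choices": [
            {"delta": {"reasoning_content": "check the units first",
                       "content": "About 42 km."}}
        ]
    }
    delta = chunk["choices"][0]["delta"]
    reasoning = delta.get("reasoning_content") or ""
    if reasoning:
        # Prepend the wrapped reasoning; the raw field stays for native clients.
        delta["content"] = f"<think>{reasoning}</think>" + (delta.get("content") or "")
    print(delta["content"])
    # <think>check the units first</think>About 42 km.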
@@ -682,6 +925,10 @@ async def generate_text(

  await _check_chat_rate_limit(request, authorization, x_client_id)

+ # Determine whether the caller is an API-key client that should receive
+ # raw thinking tokens.
+ forward_thinking = _is_api_key_request(request)
+
  body["model"] = chosen_model
  stream = body.get("stream", False)

@@ -744,39 +991,79 @@ async def generate_text(
  sent_metadata = False
  async with httpx.AsyncClient(timeout=None) as client:
  async for chunk in stream_primary(client):
+ # ── emit router metadata once as the very first SSE frame ──
  if not sent_metadata:
- meta = {"router_metadata": {"model_name": MODEL_MAP.get(chosen_model, chosen_model)}}
+ meta = {
+ "router_metadata": {
+ "model_name": MODEL_MAP.get(chosen_model, chosen_model)
+ }
+ }
  yield f"data: {json.dumps(meta)}\n\n"
  sent_metadata = True

- # Intercept the final non-[DONE] data chunk and normalize
- # the usage block so callers always see consistent field names.
- if chunk.startswith("data:") and "[DONE]" not in chunk:
+ # ── pass [DONE] straight through ──────────────────────────
+ if "data: [DONE]" in chunk:
+ yield chunk
+ continue
+
+ # ── process data: … lines ─────────────────────────────────
+ if chunk.startswith("data:"):
  raw = chunk[5:].strip()
  try:
  obj = json.loads(raw)
- if isinstance(obj, dict) and "usage" in obj and isinstance(obj["usage"], dict):
- u = obj["usage"]
- input_tok = u.get("prompt_tokens") or u.get("input_tokens", 0)
- output_tok = u.get("completion_tokens") or u.get("output_tokens", 0)
- obj["usage"] = {
- "prompt_tokens": input_tok,
- "completion_tokens": output_tok,
- "total_tokens": input_tok + output_tok,
- "input_tokens": input_tok,
- "output_tokens": output_tok,
- }
- yield f"data: {json.dumps(obj)}\n\n"
- continue
  except Exception:
- pass
+ # Not valid JSON — forward verbatim (keeps partial
+ # chunks from blocking the stream).
+ yield chunk
+ continue
+
+ if not isinstance(obj, dict):
+ yield chunk
+ continue
+
+ # Normalize usage block whenever it appears.
+ _normalize_usage_block(obj)
+
+ # ── thinking / reasoning tokens ───────────────────────
+ # Navy models may embed thinking in two ways:
+ #
+ # 1. As delta.reasoning_content (separate field)
+ # 2. Inline inside delta.content wrapped in <think>…</think>
+ #
+ # For API-key callers we always surface both forms.
+ # For browser/session callers we strip reasoning_content
+ # so it doesn't confuse UI clients that don't expect it,
+ # but <think> tags already present in content are left
+ # alone (they arrived that way from upstream).
+ if forward_thinking:
+ # Merge reasoning_content into content as
+ # <think>…</think> and keep the raw field.
+ obj = _inject_reasoning_into_chunk(obj)
+ else:
+ # Strip the non-standard field so browser clients
+ # don't see unexpected keys.
+ try:
+ delta = obj["choices"][0]["delta"]
+ delta.pop("reasoning_content", None)
+ delta.pop("reasoning", None)
+ obj["choices"][0]["delta"] = delta
+ except (KeyError, IndexError, TypeError):
+ pass
+
+ yield f"data: {json.dumps(obj)}\n\n"
+ continue

+ # ── any other line (comments, keep-alives, …) ─────────────
  yield chunk

  return StreamingResponse(
  event_generator(),
  media_type="text/event-stream",
- headers={"Cache-Control": "no-cache", "Connection": "keep-alive", "X-Accel-Buffering": "no"},
+ headers={
+ "Cache-Control": "no-cache",
+ "Connection": "keep-alive",
+ "X-Accel-Buffering": "no",
+ },
  )

  # ── non-streaming ─────────────────────────
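Note: a sketch of what an API-key client sees on the wire after these changes: a router_metadata frame first, then normalized data: chunks, then [DONE]. The URL, model value and key below are placeholders, not taken from this PR:

    import json
    import httpx

    payload = {"model": "auto", "stream": True,
               "messages": [{"role": "user", "content": "hi"}]}
    headers = {"x-api-key": "sk-example"}  # API-key auth, so forward_thinking is True

    with httpx.stream("POST", "http://localhost:8000/v1/chat/completions",
                      json=payload, headers=headers, timeout=None) as r:
        for line in r.iter_lines():
            if not line.startswith("data:"):
                continue                   # skips ": ping" comment frames
            raw = line[5:].strip()
            if raw == "[DONE]":
                break
            obj = json.loads(raw)
            if "router_metadata" in obj:
                print("routed to:", obj["router_metadata"]["model_name"])
                continue
            for choice in obj.get("choices") or []:
                print((choice.get("delta") or {}).get("content") or "", end="")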
@@ -789,7 +1076,11 @@ async def generate_text(
  fb_url, fb_key = _get_provider_url_and_key(FALLBACK_PROVIDER)
  fallback_body = dict(body)
  fallback_body["model"] = FALLBACK_MODEL
- r = await client.post(fb_url, json=fallback_body, headers={"Authorization": f"Bearer {fb_key}"})
+ r = await client.post(
+ fb_url,
+ json=fallback_body,
+ headers={"Authorization": f"Bearer {fb_key}"},
+ )

  content_type = (r.headers.get("content-type") or "").lower()
  if "application/json" in content_type:
@@ -798,22 +1089,35 @@ async def generate_text(
  except Exception:
  payload = {"error": "Upstream returned invalid JSON"}
  else:
- # Normalize usage: upstream may use prompt_tokens/completion_tokens
- # (OpenAI/Groq style) — rewrite to a consistent shape and add
- # router_metadata so callers always see the same fields.
- if "usage" in payload and isinstance(payload["usage"], dict):
- u = payload["usage"]
- input_tok = u.get("prompt_tokens") or u.get("input_tokens", 0)
- output_tok = u.get("completion_tokens") or u.get("output_tokens", 0)
- payload["usage"] = {
- "prompt_tokens": input_tok,
- "completion_tokens": output_tok,
- "total_tokens": input_tok + output_tok,
- # also include the OpenAI Responses-API names for clients that expect them
- "input_tokens": input_tok,
- "output_tokens": output_tok,
- }
- payload.setdefault("router_metadata", {})["model_name"] = MODEL_MAP.get(chosen_model, chosen_model)
+ # Normalize usage fields.
+ _normalize_usage_block(payload)
+
+ # ── thinking tokens in non-streaming responses ────────────────────
+ # Some navy models put thinking content in
+ # message.reasoning_content. For API-key callers we prepend it to
+ # message.content wrapped in <think>…</think>; for others we drop
+ # the non-standard field.
+ try:
+ message = payload["choices"][0]["message"]
+ reasoning = (
+ message.pop("reasoning_content", None)
+ or message.pop("reasoning", None)
+ or ""
+ )
+ if reasoning and isinstance(reasoning, str):
+ if forward_thinking:
+ existing = message.get("content") or ""
+ message["content"] = f"<think>{reasoning}</think>{existing}"
+ # Restore the raw field for clients that want it.
+ message["reasoning_content"] = reasoning
+ # else: already popped — nothing to do.
+ payload["choices"][0]["message"] = message
+ except (KeyError, IndexError, TypeError):
+ pass
+
+ payload.setdefault("router_metadata", {})["model_name"] = MODEL_MAP.get(
+ chosen_model, chosen_model
+ )
  else:
  payload = {
  "error": "Upstream returned non-JSON response",
@@ -1063,8 +1367,24 @@ async def create_responses(
  },
  })

+ # ── Run _generate() in the background, pinging every 15 s ──────────────
+ # Without keepalive bytes, Cloudflare (524) and Codex both drop the
+ # connection while the model is thinking or accumulating tool arguments.
+ # SSE comment lines (": ping") are invisible to application code but
+ # reset every proxy's idle-timeout counter.
+ PING_INTERVAL = 15 # seconds
+ gen_task: asyncio.Task = asyncio.ensure_future(_generate())
+
+ while not gen_task.done():
+ try:
+ await asyncio.wait_for(asyncio.shield(gen_task), timeout=PING_INTERVAL)
+ except asyncio.TimeoutError:
+ yield ": ping\n\n"
+ except Exception:
+ break # real error — handled below
+
  try:
- text, tool_calls, input_tokens, output_tokens = await _generate()
+ text, tool_calls, input_tokens, output_tokens = gen_task.result()
  except HTTPException as exc:
  yield sse("response.failed", {
  "type": "response.failed",
@@ -1076,6 +1396,17 @@ async def create_responses(
  })
  yield "data: [DONE]\n\n"
  return
+ except Exception as exc:
+ yield sse("response.failed", {
+ "type": "response.failed",
+ "response": {
+ "id": response_id, "object": "response",
+ "created_at": ts, "status": "failed", "model": chosen_model,
+ "error": {"code": "upstream_error", "message": str(exc)},
+ },
+ })
+ yield "data: [DONE]\n\n"
+ return

  output_index = 0
 