Mirrowel committed on
Commit
c590f47
·
1 Parent(s): 664a3d8

fix(providers/iflow): 🐛 stabilize iFlow streaming behavior and model handling

Browse files

iFlow integration produced sporadic HTTP 406s and missed finalization info for streamed responses. This change:

- Stop transmitting unsupported stream_options to iFlow (apply stream metadata only for other providers) to avoid 406 responses
- Normalize model identifiers by removing any leading provider tag so iFlow receives the raw model name
- Rework SSE parsing to accept both `data:` and `data: ` prefixes and to handle final chunks that contain both choices and usage by emitting the content first, then the usage
- Add explicit branches for usage-only and content-only chunks to ensure consistent output shapes

src/rotator_library/client.py CHANGED
@@ -1552,11 +1552,22 @@ class RotatingClient:
1552
  Returns:
1553
  The completion response object, or an async generator for streaming responses, or None if all retries fail.
1554
  """
 
 
 
 
 
 
 
 
1555
  if kwargs.get("stream"):
1556
- if "stream_options" not in kwargs:
1557
- kwargs["stream_options"] = {}
1558
- if "include_usage" not in kwargs["stream_options"]:
1559
- kwargs["stream_options"]["include_usage"] = True
 
 
 
1560
  return self._streaming_acompletion_with_retry(
1561
  request=request, pre_request_callback=pre_request_callback, **kwargs
1562
  )
 
1552
  Returns:
1553
  The completion response object, or an async generator for streaming responses, or None if all retries fail.
1554
  """
1555
+ # Handle iflow provider: remove stream_options to avoid HTTP 406
1556
+ model = kwargs.get("model", "")
1557
+ provider = model.split("/")[0] if "/" in model else ""
1558
+
1559
+ if provider == "iflow" and "stream_options" in kwargs:
1560
+ lib_logger.debug("Removing stream_options for iflow provider to avoid HTTP 406")
1561
+ kwargs.pop("stream_options", None)
1562
+
1563
  if kwargs.get("stream"):
1564
+ # Only add stream_options for providers that support it (excluding iflow)
1565
+ if provider != "iflow":
1566
+ if "stream_options" not in kwargs:
1567
+ kwargs["stream_options"] = {}
1568
+ if "include_usage" not in kwargs["stream_options"]:
1569
+ kwargs["stream_options"]["include_usage"] = True
1570
+
1571
  return self._streaming_acompletion_with_retry(
1572
  request=request, pre_request_callback=pre_request_callback, **kwargs
1573
  )
src/rotator_library/providers/iflow_provider.py CHANGED
@@ -282,12 +282,29 @@ class IFlowProvider(IFlowAuthBase, ProviderInterface):
282
  """
283
  Converts a raw iFlow SSE chunk to an OpenAI-compatible chunk.
284
  Since iFlow is OpenAI-compatible, minimal conversion is needed.
 
 
 
285
  """
286
  if not isinstance(chunk, dict):
287
  return
288
 
289
- # Handle usage data
290
- if usage_data := chunk.get("usage"):
 
 
 
 
 
 
 
 
 
 
 
 
 
 
291
  yield {
292
  "choices": [], "model": model_id, "object": "chat.completion.chunk",
293
  "id": chunk.get("id", f"chatcmpl-iflow-{time.time()}"),
@@ -300,19 +317,30 @@ class IFlowProvider(IFlowAuthBase, ProviderInterface):
300
  }
301
  return
302
 
303
- # Handle content data
304
- choices = chunk.get("choices", [])
305
- if not choices:
 
 
 
 
 
 
 
 
 
306
  return
307
 
308
- # iFlow returns OpenAI-compatible format, so we can mostly pass through
309
- yield {
310
- "choices": choices,
311
- "model": model_id,
312
- "object": "chat.completion.chunk",
313
- "id": chunk.get("id", f"chatcmpl-iflow-{time.time()}"),
314
- "created": chunk.get("created", int(time.time()))
315
- }
 
 
316
 
317
  def _stream_to_completion_response(self, chunks: List[litellm.ModelResponse]) -> litellm.ModelResponse:
318
  """
@@ -429,8 +457,12 @@ class IFlowProvider(IFlowAuthBase, ProviderInterface):
429
  # CRITICAL: get_api_details returns api_key, NOT access_token
430
  api_base, api_key = await self.get_api_details(credential_path)
431
 
 
 
 
 
432
  # Build clean payload with only supported parameters
433
- payload = self._build_request_payload(**kwargs)
434
 
435
  headers = {
436
  "Authorization": f"Bearer {api_key}", # Uses api_key from user info
@@ -487,9 +519,16 @@ class IFlowProvider(IFlowAuthBase, ProviderInterface):
487
  # Process successful streaming response
488
  async for line in response.aiter_lines():
489
  file_logger.log_response_chunk(line)
490
- if line.startswith('data: '):
491
- data_str = line[6:]
492
- if data_str == "[DONE]":
 
 
 
 
 
 
 
493
  break
494
  try:
495
  chunk = json.loads(data_str)
 
282
  """
283
  Converts a raw iFlow SSE chunk to an OpenAI-compatible chunk.
284
  Since iFlow is OpenAI-compatible, minimal conversion is needed.
285
+
286
+ CRITICAL FIX: Handle chunks with BOTH usage and choices (final chunk)
287
+ without early return to ensure finish_reason is properly processed.
288
  """
289
  if not isinstance(chunk, dict):
290
  return
291
 
292
+ # Get choices and usage data
293
+ choices = chunk.get("choices", [])
294
+ usage_data = chunk.get("usage")
295
+
296
+ # Handle chunks with BOTH choices and usage (typical for final chunk)
297
+ # CRITICAL: Process choices FIRST to capture finish_reason, then yield usage
298
+ if choices and usage_data:
299
+ # Yield the choice chunk first (contains finish_reason)
300
+ yield {
301
+ "choices": choices,
302
+ "model": model_id,
303
+ "object": "chat.completion.chunk",
304
+ "id": chunk.get("id", f"chatcmpl-iflow-{time.time()}"),
305
+ "created": chunk.get("created", int(time.time()))
306
+ }
307
+ # Then yield the usage chunk
308
  yield {
309
  "choices": [], "model": model_id, "object": "chat.completion.chunk",
310
  "id": chunk.get("id", f"chatcmpl-iflow-{time.time()}"),
 
317
  }
318
  return
319
 
320
+ # Handle usage-only chunks
321
+ if usage_data:
322
+ yield {
323
+ "choices": [], "model": model_id, "object": "chat.completion.chunk",
324
+ "id": chunk.get("id", f"chatcmpl-iflow-{time.time()}"),
325
+ "created": chunk.get("created", int(time.time())),
326
+ "usage": {
327
+ "prompt_tokens": usage_data.get("prompt_tokens", 0),
328
+ "completion_tokens": usage_data.get("completion_tokens", 0),
329
+ "total_tokens": usage_data.get("total_tokens", 0),
330
+ }
331
+ }
332
  return
333
 
334
+ # Handle content-only chunks
335
+ if choices:
336
+ # iFlow returns OpenAI-compatible format, so we can mostly pass through
337
+ yield {
338
+ "choices": choices,
339
+ "model": model_id,
340
+ "object": "chat.completion.chunk",
341
+ "id": chunk.get("id", f"chatcmpl-iflow-{time.time()}"),
342
+ "created": chunk.get("created", int(time.time()))
343
+ }
344
 
345
  def _stream_to_completion_response(self, chunks: List[litellm.ModelResponse]) -> litellm.ModelResponse:
346
  """
 
457
  # CRITICAL: get_api_details returns api_key, NOT access_token
458
  api_base, api_key = await self.get_api_details(credential_path)
459
 
460
+ # Strip provider prefix from model name (e.g., "iflow/Qwen3-Coder-Plus" -> "Qwen3-Coder-Plus")
461
+ model_name = model.split('/')[-1]
462
+ kwargs_with_stripped_model = {**kwargs, 'model': model_name}
463
+
464
  # Build clean payload with only supported parameters
465
+ payload = self._build_request_payload(**kwargs_with_stripped_model)
466
 
467
  headers = {
468
  "Authorization": f"Bearer {api_key}", # Uses api_key from user info
 
519
  # Process successful streaming response
520
  async for line in response.aiter_lines():
521
  file_logger.log_response_chunk(line)
522
+
523
+ # CRITICAL FIX: Handle both "data:" (no space) and "data: " (with space)
524
+ if line.startswith('data:'):
525
+ # Extract data after "data:" prefix, handling both formats
526
+ if line.startswith('data: '):
527
+ data_str = line[6:] # Skip "data: "
528
+ else:
529
+ data_str = line[5:] # Skip "data:"
530
+
531
+ if data_str.strip() == "[DONE]":
532
  break
533
  try:
534
  chunk = json.loads(data_str)