Mirrowel commited on
Commit
f76ad1b
·
1 Parent(s): a20e9d0

fix: Prevent stream failure on malformed chunks

Browse files

Refactor the `_stream_wrapper` to gracefully handle errors within individual stream chunks.

The previous `async for` loop would cause the entire stream to fail if a single chunk was malformed (e.g., invalid JSON).

By switching to a `while` loop with an explicit iterator and a `try...except` block, the wrapper can now catch exceptions, log a warning, and skip the problematic chunk. This ensures the rest of the stream is processed, improving the client's robustness.

Files changed (1) hide show
  1. src/rotator_library/client.py +25 -9
src/rotator_library/client.py CHANGED
@@ -64,19 +64,35 @@ class RotatingClient:
64
  """
65
  A definitive hybrid wrapper for streaming responses that ensures usage is recorded
66
  and the key lock is released only after the stream is fully consumed.
67
- It exhaustively checks for usage data in all possible locations.
 
68
  """
69
  usage_recorded = False
70
  stream_completed = False
 
 
71
  try:
72
- async for chunk in stream:
73
- yield f"data: {json.dumps(chunk.dict())}\n\n"
74
- # 1. First, try to find usage in a chunk (for providers that send it mid-stream)
75
- if not usage_recorded and hasattr(chunk, 'usage') and chunk.usage:
76
- await self.usage_manager.record_success(key, model, chunk)
77
- usage_recorded = True
78
- lib_logger.info(f"Recorded usage from stream chunk for key ...{key[-4:]}")
79
- stream_completed = True
 
 
 
 
 
 
 
 
 
 
 
 
 
80
  finally:
81
  # 2. If not found in a chunk, try the final stream object itself (for other providers)
82
  if not usage_recorded:
 
64
  """
65
  A definitive hybrid wrapper for streaming responses that ensures usage is recorded
66
  and the key lock is released only after the stream is fully consumed.
67
+ It exhaustively checks for usage data in all possible locations and gracefully
68
+ handles JSON decoding errors from the stream.
69
  """
70
  usage_recorded = False
71
  stream_completed = False
72
+ stream_iterator = stream.__aiter__()
73
+
74
  try:
75
+ while True:
76
+ try:
77
+ chunk = await stream_iterator.__anext__()
78
+ yield f"data: {json.dumps(chunk.dict())}\n\n"
79
+
80
+ # 1. First, try to find usage in a chunk (for providers that send it mid-stream)
81
+ if not usage_recorded and hasattr(chunk, 'usage') and chunk.usage:
82
+ await self.usage_manager.record_success(key, model, chunk)
83
+ usage_recorded = True
84
+ lib_logger.info(f"Recorded usage from stream chunk for key ...{key[-4:]}")
85
+
86
+ except StopAsyncIteration:
87
+ # The stream finished successfully.
88
+ stream_completed = True
89
+ break
90
+
91
+ except Exception as e:
92
+ # This will catch JSONDecodeError and other potential issues with a chunk
93
+ lib_logger.warning(f"Skipping a malformed chunk for key ...{key[-4:]}: {e}")
94
+ continue # Skip to the next chunk
95
+
96
  finally:
97
  # 2. If not found in a chunk, try the final stream object itself (for other providers)
98
  if not usage_recorded: