Mirrowel committed on
Commit
d96253d
·
1 Parent(s): fd65a88

feat(client): Buffer and reassemble fragmented stream chunks

Browse files

The previous stream wrapper would discard any chunk that failed to parse as valid JSON. This caused data loss when providers sent fragmented JSON objects across multiple stream events, resulting in incomplete responses.

This commit refactors the `_stream_wrapper` to be more robust by introducing a buffering mechanism. When a JSON decoding error is detected, the raw fragment is appended to a buffer, and the wrapper attempts to parse the accumulated buffer each time another fragment arrives. Once a complete JSON object is successfully reassembled, it is yielded to the consumer and the buffer is cleared; if a valid chunk arrives while the buffer still holds an incomplete fragment, the buffer is logged and discarded so the stream can continue.

This change significantly improves the reliability of streaming responses by preventing data loss from fragmented chunks.

Files changed (1) hide show
  1. src/rotator_library/client.py +54 -16
src/rotator_library/client.py CHANGED
@@ -64,47 +64,85 @@ class RotatingClient:
64
  """
65
  A definitive hybrid wrapper for streaming responses that ensures usage is recorded
66
  and the key lock is released only after the stream is fully consumed.
67
- It exhaustively checks for usage data in all possible locations and gracefully
68
- handles JSON decoding errors from the stream.
69
  """
70
  usage_recorded = False
71
  stream_completed = False
72
  stream_iterator = stream.__aiter__()
73
-
 
74
  try:
75
  while True:
76
  try:
 
77
  chunk = await stream_iterator.__anext__()
 
 
 
 
 
 
 
 
 
 
78
  yield f"data: {json.dumps(chunk.dict())}\n\n"
79
-
80
- # 1. First, try to find usage in a chunk (for providers that send it mid-stream)
81
  if not usage_recorded and hasattr(chunk, 'usage') and chunk.usage:
82
  await self.usage_manager.record_success(key, model, chunk)
83
  usage_recorded = True
84
  lib_logger.info(f"Recorded usage from stream chunk for key ...{key[-4:]}")
85
-
86
  except StopAsyncIteration:
87
- # The stream finished successfully.
88
  stream_completed = True
 
 
89
  break
90
-
91
- except Exception as e:
92
- # This will catch JSONDecodeError and other potential issues with a chunk
93
- lib_logger.warning(f"Skipping a malformed chunk for key ...{key[-4:]}: {e}")
94
- continue # Skip to the next chunk
95
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
96
  finally:
97
- # 2. If not found in a chunk, try the final stream object itself (for other providers)
 
98
  if not usage_recorded:
99
- # This call is now safe because record_success is robust
100
  await self.usage_manager.record_success(key, model, stream)
101
  lib_logger.info(f"Recorded usage from final stream object for key ...{key[-4:]}")
102
 
103
- # 3. Release the key only after all attempts to record usage are complete
104
  await self.usage_manager.release_key(key, model)
105
  lib_logger.info(f"STREAM FINISHED and lock released for key ...{key[-4:]}.")
106
 
107
- # Only yield [DONE] if the stream completed successfully
108
  if stream_completed:
109
  yield "data: [DONE]\n\n"
110
 
 
64
  """
65
  A definitive hybrid wrapper for streaming responses that ensures usage is recorded
66
  and the key lock is released only after the stream is fully consumed.
67
+ It gracefully handles JSON decoding errors by buffering and attempting to
68
+ reassemble fragmented JSON objects only when an error is detected.
69
  """
70
  usage_recorded = False
71
  stream_completed = False
72
  stream_iterator = stream.__aiter__()
73
+ json_buffer = ""
74
+
75
  try:
76
  while True:
77
  try:
78
+ # 1. Await the next item from the stream iterator.
79
  chunk = await stream_iterator.__anext__()
80
+
81
+ # 2. If we receive a valid chunk while the buffer has content,
82
+ # it implies the buffered data was an unrecoverable fragment.
83
+ # Log it, discard the buffer, and proceed with the valid chunk.
84
+ if json_buffer:
85
+ lib_logger.warning(f"Discarding incomplete JSON buffer because a valid chunk was received: {json_buffer}")
86
+ json_buffer = ""
87
+
88
+ # 3. This is the "happy path" where the chunk is valid.
89
+ # Yield it in the Server-Sent Events (SSE) format.
90
  yield f"data: {json.dumps(chunk.dict())}\n\n"
91
+
92
+ # 4. Try to record usage from the valid chunk itself.
93
  if not usage_recorded and hasattr(chunk, 'usage') and chunk.usage:
94
  await self.usage_manager.record_success(key, model, chunk)
95
  usage_recorded = True
96
  lib_logger.info(f"Recorded usage from stream chunk for key ...{key[-4:]}")
97
+
98
  except StopAsyncIteration:
99
+ # 5. The stream has ended successfully.
100
  stream_completed = True
101
+ if json_buffer:
102
+ lib_logger.warning(f"Stream ended with incomplete data in buffer: {json_buffer}")
103
  break
 
 
 
 
 
104
 
105
+ except Exception as e:
106
+ # 6. An exception occurred, indicating a potentially malformed or fragmented chunk.
107
+ # This is where we enter our robust buffering and reassembly logic.
108
+ lib_logger.info(f"Malformed chunk detected for key ...{key[-4:]}. Attempting to buffer and reassemble.")
109
+
110
+ try:
111
+ # 6a. The raw chunk string is usually in the exception message from litellm.
112
+ # We extract it here. This is fragile but necessary.
113
+ raw_chunk = str(e).split("Received chunk:")[-1].strip()
114
+ json_buffer += raw_chunk
115
+
116
+ # 6b. Try to parse the entire buffer.
117
+ try:
118
+ parsed_data = json.loads(json_buffer)
119
+ # Success! The buffer now contains a complete JSON object.
120
+ lib_logger.info(f"Successfully reassembled JSON from buffer: {json_buffer}")
121
+ yield f"data: {json.dumps(parsed_data)}\n\n"
122
+ json_buffer = "" # Clear the buffer to start fresh.
123
+ except json.JSONDecodeError:
124
+ # The buffer is still not a complete JSON object.
125
+ # We'll continue to the next loop iteration to get more chunks.
126
+ lib_logger.info(f"Buffer is still not a complete JSON object. Waiting for more chunks.")
127
+ continue
128
+ except Exception as buffer_exc:
129
+ # If our own buffering logic fails, log it and reset to prevent getting stuck.
130
+ lib_logger.error(f"Error during stream buffering logic: {buffer_exc}. Discarding buffer.")
131
+ json_buffer = ""
132
+ continue
133
  finally:
134
+ # 7. This block ensures that usage is recorded and the key is released,
135
+ # no matter how the stream terminates.
136
  if not usage_recorded:
137
+ # If usage wasn't found in any chunk, try to get it from the final stream object.
138
  await self.usage_manager.record_success(key, model, stream)
139
  lib_logger.info(f"Recorded usage from final stream object for key ...{key[-4:]}")
140
 
141
+ # 8. Release the key so it can be used by other requests.
142
  await self.usage_manager.release_key(key, model)
143
  lib_logger.info(f"STREAM FINISHED and lock released for key ...{key[-4:]}.")
144
 
145
+ # 9. Only send the [DONE] message if the stream completed without being aborted.
146
  if stream_completed:
147
  yield "data: [DONE]\n\n"
148