Commit 03558f6 by Mirrowel · 1 Parent(s): 856f4f3

fix(client): ensure robust stream termination and usage reporting


The `finally` block in the streaming process was not fully robust in handling various termination scenarios, such as client disconnections or early stream exhaustion. This could lead to inconsistent usage logging or unnecessary data transmission to disconnected clients.

This commit improves the reliability and correctness of stream cleanup:
- Always attempts to record internal usage for the stream, regardless of how it terminates, ensuring internal metrics are consistently captured.
- Conditionally sends a final usage chunk to the client only if they remain connected and the `request` object is available.
- Sends the `[DONE]` signal exclusively when the stream completes naturally and the client is still connected, preventing errors or unnecessary network traffic to disconnected clients.
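The three rules above can be sketched in isolation. This is a minimal illustration, not the library's code: `FakeRequest`, `stream_with_cleanup`, and `collect` are hypothetical names, and the `recorded` list stands in for the internal usage manager.

```python
import asyncio
import json


class FakeRequest:
    """Hypothetical stand-in for a framework request object."""

    def __init__(self, connected=True):
        self.connected = connected

    async def is_disconnected(self):
        return not self.connected


async def stream_with_cleanup(chunks, request, usage, recorded):
    """Yield SSE lines, then apply the three cleanup rules in `finally`."""
    stream_completed = False
    try:
        for chunk in chunks:
            if request is not None and await request.is_disconnected():
                return  # client went away: skip straight to cleanup
            yield f"data: {json.dumps(chunk)}\n\n"
        stream_completed = True
    finally:
        # Rule 1: always record usage internally, however the stream ended.
        recorded.append(usage)

        still_connected = request is None or not await request.is_disconnected()

        # Rule 2: send the final usage chunk only when a request object
        # exists and the client is still connected.
        if request is not None and still_connected and usage:
            final_chunk = {"object": "chat.completion.chunk", "choices": [], "usage": usage}
            yield f"data: {json.dumps(final_chunk)}\n\n"

        # Rule 3: send [DONE] only after natural completion to a live client.
        if stream_completed and still_connected:
            yield "data: [DONE]\n\n"


async def collect(request, disconnect_after=None):
    """Drain the stream, optionally simulating a mid-stream disconnect."""
    recorded, out = [], []
    chunks = [{"n": 1}, {"n": 2}]
    async for line in stream_with_cleanup(chunks, request, {"total_tokens": 7}, recorded):
        out.append(line)
        if disconnect_after is not None and len(out) >= disconnect_after:
            request.connected = False
    return out, recorded


if __name__ == "__main__":
    print(asyncio.run(collect(FakeRequest())))
    print(asyncio.run(collect(FakeRequest(), disconnect_after=1)))
```

In this sketch the generator notices the disconnect itself and returns, so the `finally` block runs in normal control flow. If the consumer instead closes an async generator while it is suspended, a `yield` reached during that shutdown raises a `RuntimeError`, which is one reason to guard every cleanup `yield` with a connection check.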

Files changed (1)
  1. src/rotator_library/client.py +25 -4
src/rotator_library/client.py CHANGED
@@ -269,14 +269,35 @@ class RotatingClient:
                 raise
 
         finally:
-            # Only record usage if the stream completed successfully and usage wasn't already recorded.
-            if stream_completed and not usage_recorded:
+            # This block now runs regardless of how the stream terminates (completion, client disconnect, etc.).
+            # The primary goal is to ensure usage is always logged internally.
+            if not usage_recorded:
+                # This will be triggered if the stream is exhausted OR if the client disconnects early.
+                # It ensures that we always attempt to log usage from the final stream object.
                 await self.usage_manager.record_success(key, model, stream)
-
+
+            # We still attempt to send the final usage to the client, but only if they are still connected.
+            if request and not await request.is_disconnected() and hasattr(stream, 'usage') and stream.usage:
+                try:
+                    final_usage_chunk = {
+                        "id": getattr(stream, 'id', None),
+                        "model": getattr(stream, 'model', None),
+                        "object": "chat.completion.chunk",
+                        "created": getattr(stream, 'created', None),
+                        "choices": [],
+                        "usage": stream.usage.dict() if hasattr(stream.usage, 'dict') else vars(stream.usage)
+                    }
+                    yield f"data: {json.dumps(final_usage_chunk)}\n\n"
+                    lib_logger.info(f"Yielded final usage chunk for key ...{key[-4:]}.")
+                except Exception as e:
+                    lib_logger.error(f"Failed to create or yield final usage chunk: {e}")
+
             await self.usage_manager.release_key(key, model)
             lib_logger.info(f"STREAM FINISHED and lock released for key ...{key[-4:]}.")
 
-            if stream_completed:
+            # Only send [DONE] if the stream completed naturally and the client is still there.
+            # This prevents sending [DONE] to a disconnected client or after an error.
+            if stream_completed and (not request or not await request.is_disconnected()):
                 yield "data: [DONE]\n\n"
 
     async def _execute_with_retry(self, api_call: callable, request: Optional[Any], **kwargs) -> Any: