bibibi12345 committed
Commit 0053c86 · 1 Parent(s): 611ae9c

added keepalive message
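This commit reworks the streaming branch of main.py: instead of chunking the Flowith response right away, stream_generator now starts a keep-alive sender alongside a fetch task and emits empty-delta chat.completion.chunk messages every 3 seconds until the response text is ready, then streams it in 20-character pieces and closes with a stop chunk and data: [DONE]. On the wire, a waiting client sees heartbeat frames shaped like this (a sketch; the timestamp and model vary per request):

data: {"id": "chatcmpl-keepalive", "object": "chat.completion.chunk", "created": 1714868103, "model": "<requested model>", "choices": [{"delta": {"content": ""}, "index": 0, "finish_reason": null}]}

An empty content delta is a no-op for OpenAI-compatible clients, so the heartbeat keeps idle-timeout proxies from dropping the SSE connection without corrupting the assembled completion.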

Files changed (1)
main.py +117 -38
main.py CHANGED
@@ -1,6 +1,8 @@
 import os
 import json
 import uuid
+import asyncio # <-- Added import
+import time # Ensure time is imported for timestamps
 from typing import List, Optional, Literal, Dict, Any, AsyncGenerator
 
 import httpx
@@ -233,51 +235,128 @@ async def chat_completions(
         }
         return JSONResponse(content=response_payload)
     else:
-        # Client wants streaming: Simulate streaming from the plain text response
+        # Client wants streaming: Implement keep-alive while fetching, then stream response
         async def stream_generator() -> AsyncGenerator[str, None]:
-            # Ensure necessary imports are available (time, json, uuid are already imported)
-
-            chunk_id = f"chatcmpl-{uuid.uuid4()}"
-            model_name = request.model # Use the model requested by the client
-
-            # Use the plain text directly as the full content
-            full_content = flowith_text
-
-            # Define chunk size
-            chunk_size = 20
-
-            # Stream fixed-size chunks
-            for i in range(0, len(full_content), chunk_size):
-                content_piece = full_content[i:i + chunk_size]
-                chunk = {
+            response_ready_event = asyncio.Event()
+            keep_alive_task = None
+            fetch_task = None
+            sse_model_name = request.model # Use the model requested by the client
+
+            async def keep_alive_sender(event):
+                while not event.is_set():
+                    try:
+                        # Yield keep-alive chunk
+                        keep_alive_data = {
+                            "id": "chatcmpl-keepalive",
+                            "object": "chat.completion.chunk",
+                            "created": int(time.time()),
+                            "model": sse_model_name, # Use client's requested model
+                            "choices": [{"delta": {"content": ""}, "index": 0, "finish_reason": None}]
+                        }
+                        yield f"data: {json.dumps(keep_alive_data)}\n\n"
+                        await asyncio.sleep(3) # Send every 3 seconds
+                    except asyncio.CancelledError:
+                        # print("Keep-alive cancelled") # Optional debug
+                        break
+                    except Exception as e:
+                        # print(f"Keep-alive error: {e}") # Optional debug
+                        break # Stop on error
+
+            async def fetch_and_process(event):
+                # This function now encapsulates the non-streaming fetch logic
+                # It uses variables from the outer scope (client, FLOWITH_API_URL, headers, flowith_payload)
+                # Note: flowith_text is already available in the outer scope from the initial non-streaming call
+                try:
+                    # The actual HTTP request to Flowith has already completed
+                    # in the outer scope before stream_generator is even called.
+                    # We just need to signal completion and return the text.
+                    # If the request logic needed to be *inside* here, it would look like:
+                    # async with httpx.AsyncClient(timeout=300.0) as client:
+                    #     payload_bytes = json.dumps(flowith_payload.dict()).encode('utf-8')
+                    #     response = await client.post(FLOWITH_API_URL, content=payload_bytes, headers=headers)
+                    #     response.raise_for_status()
+                    #     fetched_text = response.text
+                    #     return fetched_text
+
+                    # Since flowith_text is already fetched, just return it.
+                    # Add a small delay to simulate work if needed for testing keep-alive
+                    # await asyncio.sleep(5) # Simulate delay
+                    return flowith_text # Return the already fetched text
+                finally:
+                    event.set() # Signal that fetch is done (or simulated done)
+
+            try:
+                # Start keep-alive and fetch tasks
+                keep_alive_task = asyncio.create_task(keep_alive_sender(response_ready_event))
+                fetch_task = asyncio.create_task(fetch_and_process(response_ready_event))
+
+                # Wait for the actual response text (which is already available)
+                processed_flowith_text = await fetch_task
+
+                # Now that response is ready (event is set, keep-alive will stop), proceed with chunking
+                chunk_id = f"chatcmpl-{uuid.uuid4()}"
+                chunk_size = 20
+                full_content = processed_flowith_text
+
+                # Stream fixed-size content chunks
+                for i in range(0, len(full_content), chunk_size):
+                    content_piece = full_content[i:i + chunk_size]
+                    chunk = {
+                        "id": chunk_id,
+                        "object": "chat.completion.chunk",
+                        "created": int(time.time()),
+                        "model": sse_model_name,
+                        "choices": [{
+                            "index": 0,
+                            "delta": {"content": content_piece},
+                            "finish_reason": None
+                        }]
+                    }
+                    yield f"data: {json.dumps(chunk)}\n\n"
+                    # Optional small delay between content chunks
+                    # await asyncio.sleep(0.01)
+
+                # Send the final chunk with finish_reason
+                final_chunk = {
                     "id": chunk_id,
                     "object": "chat.completion.chunk",
-                    "created": int(time.time()), # New timestamp for each chunk
-                    "model": model_name,
+                    "created": int(time.time()),
+                    "model": sse_model_name,
                     "choices": [{
                         "index": 0,
-                        "delta": {"content": content_piece}, # Use the 20-char piece
-                        "finish_reason": None
+                        "delta": {},
+                        "finish_reason": "stop"
                     }]
                 }
-                yield f"data: {json.dumps(chunk)}\n\n"
-                # Optional delay for simulation
-                # await asyncio.sleep(0.01)
-
-            # Send the final chunk with finish_reason
-            final_chunk = {
-                "id": chunk_id,
-                "object": "chat.completion.chunk",
-                "created": int(time.time()), # Use a final timestamp
-                "model": model_name,
-                "choices": [{
-                    "index": 0,
-                    "delta": {}, # Empty delta for final chunk
-                    "finish_reason": "stop" # Assume 'stop'
-                }]
-            }
-            yield f"data: {json.dumps(final_chunk)}\n\n"
-            yield "data: [DONE]\n\n"
+                yield f"data: {json.dumps(final_chunk)}\n\n"
+                yield "data: [DONE]\n\n"
+
+            except Exception as e:
+                # Handle exceptions from fetch_task if needed
+                # print(f"Error during fetch/processing: {e}") # Optional debug
+                try:
+                    error_content = f"Error processing stream: {e}"
+                    error_chunk = {
+                        "id": f"chatcmpl-error-{uuid.uuid4()}",
+                        "object": "chat.completion.chunk",
+                        "created": int(time.time()),
+                        "model": sse_model_name,
+                        "choices": [{"index": 0, "delta": {"content": error_content}, "finish_reason": "error"}] # Or use finish_reason: stop
+                    }
+                    yield f"data: {json.dumps(error_chunk)}\n\n"
+                except Exception as yield_err:
+                    print(f"Error yielding error chunk: {yield_err}") # Log secondary error
+                finally:
+                    yield "data: [DONE]\n\n" # Always send DONE if possible
+            finally:
+                # Ensure keep_alive task is stopped
+                if keep_alive_task and not keep_alive_task.done():
+                    keep_alive_task.cancel()
+                    # Optionally await cancellation, but event.set() should handle graceful exit
+                    try:
+                        await keep_alive_task
+                    except asyncio.CancelledError:
+                        pass # Expected
 
         return StreamingResponse(stream_generator(), media_type="text/event-stream")
 
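One caveat in the committed version: keep_alive_sender is an async generator (its body contains yield), but asyncio.create_task expects a coroutine, so the task creation raises TypeError at runtime; and even a running background task could not yield into the client's SSE stream. A minimal sketch of one way to get the intended interleaving, emitting heartbeats from the response generator itself while the upstream fetch runs (the name stream_with_keepalive, the fetch_coro parameter, and the 3-second default are illustrative, not from the repo):

import asyncio
import json
import time
import uuid
from typing import AsyncGenerator, Awaitable

async def stream_with_keepalive(fetch_coro: Awaitable[str], model_name: str,
                                interval: float = 3.0) -> AsyncGenerator[str, None]:
    # Run the upstream fetch as a task and poll it with a timeout; every
    # timeout becomes one empty-delta keep-alive chunk on the SSE stream.
    fetch_task = asyncio.ensure_future(fetch_coro)
    while True:
        try:
            # shield() keeps wait_for from cancelling the in-flight fetch on timeout
            full_content = await asyncio.wait_for(asyncio.shield(fetch_task), timeout=interval)
            break
        except asyncio.TimeoutError:
            keep_alive = {
                "id": "chatcmpl-keepalive",
                "object": "chat.completion.chunk",
                "created": int(time.time()),
                "model": model_name,
                "choices": [{"index": 0, "delta": {"content": ""}, "finish_reason": None}],
            }
            yield f"data: {json.dumps(keep_alive)}\n\n"

    # Response is ready: stream it in 20-char pieces, then finish as before.
    chunk_id = f"chatcmpl-{uuid.uuid4()}"
    for i in range(0, len(full_content), 20):
        chunk = {
            "id": chunk_id,
            "object": "chat.completion.chunk",
            "created": int(time.time()),
            "model": model_name,
            "choices": [{"index": 0, "delta": {"content": full_content[i:i + 20]}, "finish_reason": None}],
        }
        yield f"data: {json.dumps(chunk)}\n\n"
    final_chunk = {
        "id": chunk_id,
        "object": "chat.completion.chunk",
        "created": int(time.time()),
        "model": model_name,
        "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}],
    }
    yield f"data: {json.dumps(final_chunk)}\n\n"
    yield "data: [DONE]\n\n"

This keeps a single code path responsible for all writes to the stream, so no Event or task cancellation is needed. Note it only pays off if the fetch itself moves inside the generator; in the committed code flowith_text is already fetched before stream_generator runs, so the keep-alive loop there never gets a chance to fire.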