bibibi12345 committed on
Commit
f5fea6c
·
1 Parent(s): 2cdaad6

test real streaming

Browse files
Files changed (1) hide show
  1. main.py +92 -135
main.py CHANGED
@@ -166,11 +166,11 @@ async def chat_completions(
166
  # 5. Prepare Headers for Flowith Request
167
  # Headers exactly matching the curl -H flags provided
168
  headers = {
169
- 'accept': '*/*',
170
  'accept-language': 'en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7,zh-TW;q=0.6,ja;q=0.5',
171
  'authorization': FLOWITH_AUTH_TOKEN, # Send only the token, no "Bearer " prefix
172
  'content-type': 'application/json',
173
- 'responsetype': 'stream', # Restore this header
174
  'origin': 'https://flowith.net',
175
  'priority': 'u=1, i',
176
  'referer': 'https://flowith.net/',
@@ -189,147 +189,104 @@ async def chat_completions(
189
  # Need JSONResponse
190
  from fastapi.responses import JSONResponse
191
 
 
 
 
192
  async with httpx.AsyncClient(timeout=300.0) as client:
193
  try:
194
- # Serialize payload manually
195
- payload_bytes = json.dumps(flowith_payload.dict()).encode('utf-8')
196
-
197
- # Make a non-streaming POST request to Flowith
198
- response = await client.post(
199
- FLOWITH_API_URL,
200
- content=payload_bytes,
201
- headers=headers,
202
- )
203
-
204
- # Check status code after receiving the full response
205
- if response.status_code != 200:
206
  try:
207
- error_detail = response.text # Use .text for non-streaming
208
- detail_msg = f"Flowith API Error ({response.status_code}): {error_detail}"
209
- except Exception:
210
- detail_msg = f"Flowith API Error ({response.status_code})"
211
- raise HTTPException(status_code=response.status_code, detail=detail_msg)
212
-
213
- # Get the plain text response directly
214
- flowith_text = response.text
215
- print(f"response preview: {flowith_text[:500]}")
216
- # 7. Handle response based on *client's* request.stream preference
217
- if not request.stream:
218
- # Client wants non-streaming: Construct OpenAI-compatible JSON from plain text
219
- completion_id = f"chatcmpl-{uuid.uuid4()}"
220
- created_timestamp = int(time.time())
221
- response_payload = {
222
- "id": completion_id,
223
- "object": "chat.completion",
224
- "created": created_timestamp,
225
- "model": request.model, # Use the model from the original request
226
- "choices": [{
227
- "index": 0,
228
- "message": {
229
- "role": "assistant",
230
- "content": flowith_text # Use the plain text here
231
- },
232
- "finish_reason": "stop" # Assume stop
233
- }],
234
- # "usage": {...} # Usage stats are typically not available/meaningful here
235
- }
236
- return JSONResponse(content=response_payload)
237
- else:
238
- # Client wants streaming: Implement keep-alive while fetching, then stream response
239
- async def stream_generator():
240
- response_ready_event = asyncio.Event()
241
- fetch_task = None
242
- flowith_text_local = None # Use a local variable to avoid confusion with outer scope
243
- error_occurred = None
244
- sse_model_name = request.model # Use the model requested by the client
245
-
246
- # Coroutine to fetch data and signal completion
247
- async def fetch_and_process(event):
248
- nonlocal flowith_text_local, error_occurred
249
  try:
250
- # === Logic to prepare request (model_name, flowith_messages, etc.) ===
251
- # These variables are captured from the outer scope:
252
- # FLOWITH_API_URL, headers, flowith_payload
253
- # The actual request is made here now, not before calling stream_generator
254
- async with httpx.AsyncClient(timeout=300.0) as client:
255
- payload_bytes = json.dumps(flowith_payload.dict()).encode('utf-8') # Use outer flowith_payload
256
- response = await client.post(
257
- FLOWITH_API_URL, headers=headers, content=payload_bytes # Use outer headers
258
- )
259
- response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx)
260
- flowith_text_local = response.text # Store in local variable
261
-
 
 
 
 
 
 
262
  except Exception as e:
263
- # print(f"Error fetching from Flowith: {e}") # Optional debug
264
- error_occurred = e # Store error to yield later
265
- finally:
266
- event.set() # Signal completion or failure
 
267
 
268
- # Start fetching in the background
269
- fetch_task = asyncio.create_task(fetch_and_process(response_ready_event))
270
 
271
- try:
272
- # Send keep-alive chunks while waiting
273
- while not response_ready_event.is_set():
274
- keep_alive_data = {"id": "chatcmpl-keepalive", "object": "chat.completion.chunk", "created": int(time.time()), "model": sse_model_name, "choices": [{"delta": {"content": ""}, "index": 0, "finish_reason": None}]}
275
- yield f"data: {json.dumps(keep_alive_data)}\n\n"
276
- # Wait for a short period or until the event is set
277
- try:
278
- await asyncio.wait_for(response_ready_event.wait(), timeout=3.0)
279
- except asyncio.TimeoutError:
280
- pass # Timeout means event not set, continue loop
281
-
282
- # Event is set, check if fetch task had an error
283
- if fetch_task.done() and fetch_task.exception():
284
- # If fetch_task failed before setting event (shouldn't happen with finally, but check anyway)
285
- error_occurred = fetch_task.exception()
286
-
287
- if error_occurred:
288
- # Yield an error chunk (optional, depends on desired behavior)
289
- # print(f"Yielding error chunk: {error_occurred}") # Optional debug
290
- error_content = f"Error processing request: {type(error_occurred).__name__}"
291
- error_chunk = {"id": f"chatcmpl-error-{uuid.uuid4()}", "object": "chat.completion.chunk", "created": int(time.time()), "model": sse_model_name, "choices": [{"delta": {"content": error_content }, "index": 0, "finish_reason": "error"}]} # Use finish_reason 'error' if possible
292
- yield f"data: {json.dumps(error_chunk)}\n\n"
293
- elif flowith_text_local is not None:
294
- # Fetch succeeded, yield content chunks
295
- chunk_id = f"chatcmpl-{uuid.uuid4()}"
296
- chunk_size = 20
297
- full_content = flowith_text_local
298
-
299
- for i in range(0, len(full_content), chunk_size):
300
- content_piece = full_content[i:i + chunk_size]
301
- chunk = {
302
- "id": chunk_id,
303
- "object": "chat.completion.chunk",
304
- "created": int(time.time()),
305
- "model": sse_model_name,
306
- "choices": [{"index": 0, "delta": {"content": content_piece}, "finish_reason": None}]
307
- }
308
- yield f"data: {json.dumps(chunk)}\n\n"
309
-
310
- # Yield final chunk
311
- final_chunk = {
312
- "id": chunk_id,
313
- "object": "chat.completion.chunk",
314
- "created": int(time.time()),
315
- "model": sse_model_name,
316
- "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}]
317
- }
318
- yield f"data: {json.dumps(final_chunk)}\n\n"
319
-
320
- except asyncio.CancelledError:
321
- # print("Stream generator cancelled (client disconnected)") # Optional debug
322
- raise # Re-raise cancellation
323
- finally:
324
- # Ensure fetch task is cancelled if generator exits early
325
- if fetch_task and not fetch_task.done():
326
- fetch_task.cancel()
327
- # Send DONE message regardless of success/failure/cancellation
328
- yield "data: [DONE]\n\n"
329
-
330
- # The return statement remains the same
331
- return StreamingResponse(stream_generator(), media_type="text/event-stream")
332
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
333
  except httpx.RequestError as exc:
334
  print(f"Error requesting Flowith: {exc}")
335
  raise HTTPException(status_code=503, detail=f"Error connecting to Flowith service: {exc}")
 
166
  # 5. Prepare Headers for Flowith Request
167
  # Headers exactly matching the curl -H flags provided
168
  headers = {
169
+ 'accept': 'text/event-stream', # Changed for streaming
170
  'accept-language': 'en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7,zh-TW;q=0.6,ja;q=0.5',
171
  'authorization': FLOWITH_AUTH_TOKEN, # Send only the token, no "Bearer " prefix
172
  'content-type': 'application/json',
173
+ 'responsetype': 'stream', # Keep this header
174
  'origin': 'https://flowith.net',
175
  'priority': 'u=1, i',
176
  'referer': 'https://flowith.net/',
 
189
  # Need JSONResponse
190
  from fastapi.responses import JSONResponse
191
 
192
+ # Need JSONResponse
193
+ from fastapi.responses import JSONResponse
194
+
195
  async with httpx.AsyncClient(timeout=300.0) as client:
196
  try:
197
+ # Serialize payload manually for the stream request
198
+ payload_json = flowith_payload.dict()
199
+
200
+ # Use client.stream for real streaming
201
+ async with client.stream("POST", FLOWITH_API_URL, headers=headers, json=payload_json, timeout=300.0) as response:
202
+ # Check status *after* starting stream read or getting headers
203
+ # It's often better to check after reading the first chunk or headers
204
+ # For simplicity here, we might check early, but be aware Flowith might send errors mid-stream
205
+ # Let's check status right away, but handle potential errors during iteration too.
 
 
 
206
  try:
207
+ response.raise_for_status() # Check initial status
208
+ except httpx.HTTPStatusError as status_exc:
209
+ # Attempt to read body for more details if possible
210
+ try:
211
+ error_body = await status_exc.response.aread()
212
+ detail_msg = f"Flowith API Error ({status_exc.response.status_code}): {error_body.decode('utf-8', errors='replace')}"
213
+ except Exception:
214
+ detail_msg = f"Flowith API Error ({status_exc.response.status_code})"
215
+ raise HTTPException(status_code=status_exc.response.status_code, detail=detail_msg) from status_exc
216
+
217
+
218
+ # 7. Handle response based on *client's* request.stream preference
219
+ if request.stream:
220
+ # Client wants streaming: Forward Flowith's stream directly
221
+ async def stream_forwarder():
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
222
  try:
223
+ async for chunk in response.aiter_bytes():
224
+ # Assuming Flowith sends SSE-compatible chunks (or raw data we want to forward)
225
+ # If Flowith sends *only* text content per chunk, we might need to format it:
226
+ # yield f"data: {json.dumps({'choices': [{'delta': {'content': chunk.decode()}}]})}\n\n"
227
+ # For now, let's assume Flowith sends pre-formatted SSE or raw bytes are okay.
228
+ yield chunk
229
+ # Optionally yield a final [DONE] message if Flowith doesn't
230
+ # yield b"data: [DONE]\n\n"
231
+ except httpx.RequestError as stream_exc:
232
+ print(f"Error during Flowith stream read: {stream_exc}")
233
+ # Decide how to signal this error to the client.
234
+ # Option 1: Raise an exception (FastAPI might handle it)
235
+ # raise HTTPException(status_code=503, detail=f"Stream read error: {stream_exc}")
236
+ # Option 2: Yield an error message within the stream (if client supports it)
237
+ error_content = f"Stream read error: {type(stream_exc).__name__}"
238
+ error_chunk = {"id": f"chatcmpl-streamerror-{uuid.uuid4()}", "object": "chat.completion.chunk", "created": int(time.time()), "model": request.model, "choices": [{"delta": {"content": error_content }, "index": 0, "finish_reason": "error"}]}
239
+ yield f"data: {json.dumps(error_chunk)}\n\n".encode('utf-8')
240
+ yield b"data: [DONE]\n\n" # Still send DONE after error chunk
241
  except Exception as e:
242
+ print(f"Unexpected error during stream forward: {e}")
243
+ error_content = f"Unexpected stream error: {type(e).__name__}"
244
+ error_chunk = {"id": f"chatcmpl-streamerror-{uuid.uuid4()}", "object": "chat.completion.chunk", "created": int(time.time()), "model": request.model, "choices": [{"delta": {"content": error_content }, "index": 0, "finish_reason": "error"}]}
245
+ yield f"data: {json.dumps(error_chunk)}\n\n".encode('utf-8')
246
+ yield b"data: [DONE]\n\n" # Still send DONE after error chunk
247
 
 
 
248
 
249
+ return StreamingResponse(stream_forwarder(), media_type="text/event-stream")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
250
 
251
+ else:
252
+ # Client wants non-streaming: Accumulate chunks
253
+ chunks = []
254
+ try:
255
+ async for chunk in response.aiter_bytes():
256
+ chunks.append(chunk)
257
+ except httpx.RequestError as stream_exc:
258
+ print(f"Error during Flowith stream read (non-streaming mode): {stream_exc}")
259
+ raise HTTPException(status_code=503, detail=f"Stream read error: {stream_exc}")
260
+ except Exception as e:
261
+ print(f"Unexpected error during stream accumulation: {e}")
262
+ raise HTTPException(status_code=500, detail=f"Unexpected stream error: {e}")
263
+
264
+
265
+ full_response_bytes = b"".join(chunks)
266
+ flowith_text = full_response_bytes.decode('utf-8', errors='replace')
267
+ print(f"Accumulated response preview: {flowith_text[:500]}")
268
+
269
+ # Construct OpenAI-compatible JSON
270
+ completion_id = f"chatcmpl-{uuid.uuid4()}"
271
+ created_timestamp = int(time.time())
272
+ response_payload = {
273
+ "id": completion_id,
274
+ "object": "chat.completion",
275
+ "created": created_timestamp,
276
+ "model": request.model, # Use the model from the original request
277
+ "choices": [{
278
+ "index": 0,
279
+ "message": {
280
+ "role": "assistant",
281
+ "content": flowith_text
282
+ },
283
+ "finish_reason": "stop" # Assume stop
284
+ }],
285
+ # "usage": {...} # Usage stats are typically not available/meaningful here
286
+ }
287
+ return JSONResponse(content=response_payload)
288
+
289
+ # Keep outer exception handling for initial connection errors etc.
290
  except httpx.RequestError as exc:
291
  print(f"Error requesting Flowith: {exc}")
292
  raise HTTPException(status_code=503, detail=f"Error connecting to Flowith service: {exc}")