bibibi12345 committed on
Commit
990adae
·
1 Parent(s): d86f2f1

revert back to fake streaming

Browse files
Files changed (1) hide show
  1. main.py +127 -127
main.py CHANGED
@@ -166,11 +166,11 @@ async def chat_completions(
166
  # 5. Prepare Headers for Flowith Request
167
  # Headers exactly matching the curl -H flags provided
168
  headers = {
169
- 'accept': 'text/event-stream', # Changed for streaming
170
  'accept-language': 'en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7,zh-TW;q=0.6,ja;q=0.5',
171
  'authorization': FLOWITH_AUTH_TOKEN, # Send only the token, no "Bearer " prefix
172
  'content-type': 'application/json',
173
- 'responsetype': 'stream', # Keep this header
174
  'origin': 'https://flowith.net',
175
  'priority': 'u=1, i',
176
  'referer': 'https://flowith.net/',
@@ -189,139 +189,139 @@ async def chat_completions(
189
  # Need JSONResponse
190
  from fastapi.responses import JSONResponse
191
 
192
- # Need JSONResponse
193
- from fastapi.responses import JSONResponse
194
-
195
  async with httpx.AsyncClient(timeout=300.0) as client:
196
  try:
197
- # Serialize payload manually for the stream request
198
- payload_json = flowith_payload.dict()
199
-
200
- # Use client.stream for real streaming
201
- async with client.stream("POST", FLOWITH_API_URL, headers=headers, json=payload_json, timeout=300.0) as response:
202
- # Check status *after* starting stream read or getting headers
203
- # It's often better to check after reading the first chunk or headers
204
- # For simplicity here, we might check early, but be aware Flowith might send errors mid-stream
205
- # Let's check status right away, but handle potential errors during iteration too.
 
 
 
206
  try:
207
- response.raise_for_status() # Check initial status
208
- except httpx.HTTPStatusError as status_exc:
209
- # Attempt to read body for more details if possible
210
- try:
211
- error_body = await status_exc.response.aread()
212
- detail_msg = f"Flowith API Error ({status_exc.response.status_code}): {error_body.decode('utf-8', errors='replace')}"
213
- except Exception:
214
- detail_msg = f"Flowith API Error ({status_exc.response.status_code})"
215
- raise HTTPException(status_code=status_exc.response.status_code, detail=detail_msg) from status_exc
216
-
217
-
218
- # 7. Handle response based on *client's* request.stream preference
219
- if request.stream:
220
- # Client wants streaming: Forward Flowith's stream directly
221
- async def stream_forwarder():
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
222
  try:
223
- async for chunk in response.aiter_bytes():
224
- # Assuming Flowith sends SSE-compatible chunks (or raw data we want to forward)
225
- # If Flowith sends *only* text content per chunk, we might need to format it:
226
- # yield f"data: {json.dumps({'choices': [{'delta': {'content': chunk.decode()}}]})}\n\n"
227
- # For now, let's assume Flowith sends pre-formatted SSE or raw bytes are okay.
228
- yield chunk
229
- # Optionally yield a final [DONE] message if Flowith doesn't
230
- # yield b"data: [DONE]\n\n"
231
- except httpx.RequestError as stream_exc:
232
- print(f"Error during Flowith stream read: {stream_exc}")
233
- # Decide how to signal this error to the client.
234
- # Option 1: Raise an exception (FastAPI might handle it)
235
- # raise HTTPException(status_code=503, detail=f"Stream read error: {stream_exc}")
236
- # Option 2: Yield an error message within the stream (if client supports it)
237
- error_content = f"Stream read error: {type(stream_exc).__name__}"
238
- error_chunk = {"id": f"chatcmpl-streamerror-{uuid.uuid4()}", "object": "chat.completion.chunk", "created": int(time.time()), "model": request.model, "choices": [{"delta": {"content": error_content }, "index": 0, "finish_reason": "error"}]}
239
- yield f"data: {json.dumps(error_chunk)}\n\n".encode('utf-8')
240
- yield b"data: [DONE]\n\n" # Still send DONE after error chunk
241
  except Exception as e:
242
- print(f"Unexpected error during stream forward: {e}")
243
- error_content = f"Unexpected stream error: {type(e).__name__}"
244
- error_chunk = {"id": f"chatcmpl-streamerror-{uuid.uuid4()}", "object": "chat.completion.chunk", "created": int(time.time()), "model": request.model, "choices": [{"delta": {"content": error_content }, "index": 0, "finish_reason": "error"}]}
245
- yield f"data: {json.dumps(error_chunk)}\n\n".encode('utf-8')
246
- yield b"data: [DONE]\n\n" # Still send DONE after error chunk
247
 
 
 
248
 
249
- return StreamingResponse(stream_forwarder(), media_type="text/event-stream")
250
-
251
- else:
252
- # Client wants non-streaming: Accumulate chunks
253
- chunks = []
254
  try:
255
- async for chunk in response.aiter_bytes():
256
- chunks.append(chunk)
257
- except httpx.RequestError as stream_exc:
258
- print(f"Error during Flowith stream read (non-streaming mode): {stream_exc}")
259
- raise HTTPException(status_code=503, detail=f"Stream read error: {stream_exc}")
260
- except Exception as e:
261
- print(f"Unexpected error during stream accumulation: {e}")
262
- raise HTTPException(status_code=500, detail=f"Unexpected stream error: {e}")
263
-
264
-
265
- full_response_bytes = b"".join(chunks)
266
- flowith_text = full_response_bytes.decode('utf-8', errors='replace')
267
- print(f"Accumulated response preview: {flowith_text[:500]}")
268
-
269
- # Construct OpenAI-compatible JSON
270
- completion_id = f"chatcmpl-{uuid.uuid4()}"
271
- created_timestamp = int(time.time())
272
- response_payload = {
273
- "id": completion_id,
274
- "object": "chat.completion",
275
- "created": created_timestamp,
276
- "model": request.model, # Use the model from the original request
277
- "choices": [{
278
- "index": 0,
279
- "message": {
280
- "role": "assistant",
281
- "content": flowith_text
282
- },
283
- "finish_reason": "stop" # Assume stop
284
- }],
285
- # "usage": {...} # Usage stats are typically not available/meaningful here
286
- }
287
- return JSONResponse(content=response_payload)
288
-
289
- # Keep outer exception handling for initial connection errors etc.
290
- except httpx.RequestError as exc:
291
- print(f"Error requesting Flowith: {exc}")
292
- raise HTTPException(status_code=503, detail=f"Error connecting to Flowith service: {exc}")
293
- except HTTPException as http_exc:
294
- # Re-raise HTTPExceptions (e.g., from status code check or JSON parsing)
295
- raise http_exc
296
- except Exception as exc:
297
- print(f"Unexpected error during Flowith request/processing: {exc}")
298
- import traceback
299
- traceback.print_exc()
300
- raise HTTPException(status_code=500, detail=f"Internal server error: {exc}")
301
-
302
-
303
- # --- Models Endpoint ---
304
- @app.get("/v1/models", response_model=ModelList)
305
- async def list_models(api_key: str = Depends(verify_api_key)): # Protect with existing auth
306
- """
307
- Lists the available models based on the models.json mapping.
308
- Follows the OpenAI API format.
309
- """
310
- model_cards = [
311
- ModelCard(id=model_id) for model_id in model_mappings.keys()
312
- ]
313
- return ModelList(data=model_cards)
314
-
315
-
316
- # --- Optional: Add a root endpoint for health check ---
317
- @app.get("/")
318
- async def root():
319
- return {"message": "OpenAI to Flowith Proxy is running"}
320
-
321
- # --- To run locally (for development) ---
322
- # if __name__ == "__main__":
323
- # import uvicorn
324
- # uvicorn.run(app, host="0.0.0.0", port=8000)
325
  fetch_task.cancel()
326
  # Send DONE message regardless of success/failure/cancellation
327
  yield "data: [DONE]\n\n"
 
166
  # 5. Prepare Headers for Flowith Request
167
  # Headers exactly matching the curl -H flags provided
168
  headers = {
169
+ 'accept': '*/*',
170
  'accept-language': 'en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7,zh-TW;q=0.6,ja;q=0.5',
171
  'authorization': FLOWITH_AUTH_TOKEN, # Send only the token, no "Bearer " prefix
172
  'content-type': 'application/json',
173
+ 'responsetype': 'stream', # Restore this header
174
  'origin': 'https://flowith.net',
175
  'priority': 'u=1, i',
176
  'referer': 'https://flowith.net/',
 
189
  # Need JSONResponse
190
  from fastapi.responses import JSONResponse
191
 
 
 
 
192
  async with httpx.AsyncClient(timeout=300.0) as client:
193
  try:
194
+ # Serialize payload manually
195
+ payload_bytes = json.dumps(flowith_payload.dict()).encode('utf-8')
196
+
197
+ # Make a non-streaming POST request to Flowith
198
+ response = await client.post(
199
+ FLOWITH_API_URL,
200
+ content=payload_bytes,
201
+ headers=headers,
202
+ )
203
+
204
+ # Check status code after receiving the full response
205
+ if response.status_code != 200:
206
  try:
207
+ error_detail = response.text # Use .text for non-streaming
208
+ detail_msg = f"Flowith API Error ({response.status_code}): {error_detail}"
209
+ except Exception:
210
+ detail_msg = f"Flowith API Error ({response.status_code})"
211
+ raise HTTPException(status_code=response.status_code, detail=detail_msg)
212
+
213
+ # Get the plain text response directly
214
+ flowith_text = response.text
215
+
216
+ # 7. Handle response based on *client's* request.stream preference
217
+ if not request.stream:
218
+ # Client wants non-streaming: Construct OpenAI-compatible JSON from plain text
219
+ completion_id = f"chatcmpl-{uuid.uuid4()}"
220
+ created_timestamp = int(time.time())
221
+ response_payload = {
222
+ "id": completion_id,
223
+ "object": "chat.completion",
224
+ "created": created_timestamp,
225
+ "model": request.model, # Use the model from the original request
226
+ "choices": [{
227
+ "index": 0,
228
+ "message": {
229
+ "role": "assistant",
230
+ "content": flowith_text # Use the plain text here
231
+ },
232
+ "finish_reason": "stop" # Assume stop
233
+ }],
234
+ # "usage": {...} # Usage stats are typically not available/meaningful here
235
+ }
236
+ return JSONResponse(content=response_payload)
237
+ else:
238
+ # Client wants streaming: Implement keep-alive while fetching, then stream response
239
+ async def stream_generator():
240
+ response_ready_event = asyncio.Event()
241
+ fetch_task = None
242
+ flowith_text_local = None # Use a local variable to avoid confusion with outer scope
243
+ error_occurred = None
244
+ sse_model_name = request.model # Use the model requested by the client
245
+
246
+ # Coroutine to fetch data and signal completion
247
+ async def fetch_and_process(event):
248
+ nonlocal flowith_text_local, error_occurred
249
  try:
250
+ # === Logic to prepare request (model_name, flowith_messages, etc.) ===
251
+ # These variables are captured from the outer scope:
252
+ # FLOWITH_API_URL, headers, flowith_payload
253
+ # The actual request is made here now, not before calling stream_generator
254
+ async with httpx.AsyncClient(timeout=300.0) as client:
255
+ payload_bytes = json.dumps(flowith_payload.dict()).encode('utf-8') # Use outer flowith_payload
256
+ response = await client.post(
257
+ FLOWITH_API_URL, headers=headers, content=payload_bytes # Use outer headers
258
+ )
259
+ response.raise_for_status() # Raise HTTPError for bad responses (4xx or 5xx)
260
+ flowith_text_local = response.text # Store in local variable
 
 
 
 
 
 
 
261
  except Exception as e:
262
+ # print(f"Error fetching from Flowith: {e}") # Optional debug
263
+ error_occurred = e # Store error to yield later
264
+ finally:
265
+ event.set() # Signal completion or failure
 
266
 
267
+ # Start fetching in the background
268
+ fetch_task = asyncio.create_task(fetch_and_process(response_ready_event))
269
 
 
 
 
 
 
270
  try:
271
+ # Send keep-alive chunks while waiting
272
+ while not response_ready_event.is_set():
273
+ keep_alive_data = {"id": "chatcmpl-keepalive", "object": "chat.completion.chunk", "created": int(time.time()), "model": sse_model_name, "choices": [{"delta": {"content": ""}, "index": 0, "finish_reason": None}]}
274
+ yield f"data: {json.dumps(keep_alive_data)}\n\n"
275
+ # Wait for a short period or until the event is set
276
+ try:
277
+ await asyncio.wait_for(response_ready_event.wait(), timeout=3.0)
278
+ except asyncio.TimeoutError:
279
+ pass # Timeout means event not set, continue loop
280
+
281
+ # Event is set, check if fetch task had an error
282
+ if fetch_task.done() and fetch_task.exception():
283
+ # If fetch_task failed before setting event (shouldn't happen with finally, but check anyway)
284
+ error_occurred = fetch_task.exception()
285
+
286
+ if error_occurred:
287
+ # Yield an error chunk (optional, depends on desired behavior)
288
+ # print(f"Yielding error chunk: {error_occurred}") # Optional debug
289
+ error_content = f"Error processing request: {type(error_occurred).__name__}"
290
+ error_chunk = {"id": f"chatcmpl-error-{uuid.uuid4()}", "object": "chat.completion.chunk", "created": int(time.time()), "model": sse_model_name, "choices": [{"delta": {"content": error_content }, "index": 0, "finish_reason": "error"}]} # Use finish_reason 'error' if possible
291
+ yield f"data: {json.dumps(error_chunk)}\n\n"
292
+ elif flowith_text_local is not None:
293
+ # Fetch succeeded, yield content chunks
294
+ chunk_id = f"chatcmpl-{uuid.uuid4()}"
295
+ chunk_size = 20
296
+ full_content = flowith_text_local
297
+
298
+ for i in range(0, len(full_content), chunk_size):
299
+ content_piece = full_content[i:i + chunk_size]
300
+ chunk = {
301
+ "id": chunk_id,
302
+ "object": "chat.completion.chunk",
303
+ "created": int(time.time()),
304
+ "model": sse_model_name,
305
+ "choices": [{"index": 0, "delta": {"content": content_piece}, "finish_reason": None}]
306
+ }
307
+ yield f"data: {json.dumps(chunk)}\n\n"
308
+
309
+ # Yield final chunk
310
+ final_chunk = {
311
+ "id": chunk_id,
312
+ "object": "chat.completion.chunk",
313
+ "created": int(time.time()),
314
+ "model": sse_model_name,
315
+ "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}]
316
+ }
317
+ yield f"data: {json.dumps(final_chunk)}\n\n"
318
+
319
+ except asyncio.CancelledError:
320
+ # print("Stream generator cancelled (client disconnected)") # Optional debug
321
+ raise # Re-raise cancellation
322
+ finally:
323
+ # Ensure fetch task is cancelled if generator exits early
324
+ if fetch_task and not fetch_task.done():
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
325
  fetch_task.cancel()
326
  # Send DONE message regardless of success/failure/cancellation
327
  yield "data: [DONE]\n\n"