bibibi12345 committed on
Commit
504d810
·
1 Parent(s): 56d3b84

fixed streaming

Browse files
Files changed (1) hide show
  1. main.py +124 -80
main.py CHANGED
@@ -134,25 +134,24 @@ async def chat_completions(
134
  is_claude_or_gemini = "claude" in flowith_model_name.lower() or "gemini" in flowith_model_name.lower()
135
 
136
  for msg in request.messages:
137
- role = msg.role
138
- if is_claude_or_gemini and role == "system":
139
- # Convert system message to user message for Claude/Gemini via Flowith
140
- role = "user"
141
- elif role == "system":
142
- # If it's a system message but not for Claude/Gemini, Flowith might not support it directly.
143
- # Option 1: Skip it (might lose context)
144
- # continue
145
- # Option 2: Convert to user (might change semantics)
146
- role = "user"
147
- # Option 3: Prepend to the next user message (complex)
148
- # For now, converting to 'user' is a simple approach.
149
- print(f"Warning: Converting system message to 'user' for model {flowith_model_name}")
150
-
151
- # Ensure only 'user' or 'assistant' roles are sent to Flowith
152
- if role in ["user", "assistant"]:
153
- processed_messages.append(FlowithMessage(role=role, content=msg.content))
154
- # else: # Handle unexpected roles if necessary
155
-
156
 
157
  # 4. Construct Flowith Request Payload
158
  flowith_payload = FlowithRequest(
@@ -168,11 +167,11 @@ async def chat_completions(
168
  'accept': '*/*',
169
  'accept-language': 'en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7,zh-TW;q=0.6,ja;q=0.5',
170
  'authorization': FLOWITH_AUTH_TOKEN, # Send only the token, no "Bearer " prefix
171
- 'content-type': 'application/json',
 
172
  'origin': 'https://flowith.net',
173
  'priority': 'u=1, i',
174
  'referer': 'https://flowith.net/',
175
- 'responsetype': 'stream', # Added - Was present in -H flags
176
  'sec-ch-ua': '"Google Chrome";v="135", "Not-A.Brand";v="8", "Chromium";v="135"',
177
  'sec-ch-ua-mobile': '?0',
178
  'sec-ch-ua-platform': '"Windows"',
@@ -183,77 +182,122 @@ async def chat_completions(
183
  }
184
 
185
  # 6. Make Asynchronous Request to Flowith
186
- async with httpx.AsyncClient(timeout=300.0) as client: # Increased timeout for potentially long requests
 
 
 
 
 
187
  try:
188
- # Always use stream for the request to Flowith
189
- # Serialize payload manually for content=
190
  payload_bytes = json.dumps(flowith_payload.dict()).encode('utf-8')
191
- response_stream = client.stream( # Changed from post to stream
192
- "POST", # Explicitly set method for stream
 
193
  FLOWITH_API_URL,
194
- content=payload_bytes, # Send serialized bytes
195
  headers=headers,
196
  )
197
- # Need to acquire the stream context
198
- async with response_stream as response:
199
- # Check status code *before* attempting to read the stream body
200
- if response.status_code != 200:
201
- # Attempt to read error details from the response body if possible
202
- try:
203
- error_detail = await response.aread()
204
- detail_msg = f"Flowith API Error ({response.status_code}): {error_detail.decode()}"
205
- except Exception:
206
- detail_msg = f"Flowith API Error ({response.status_code})"
207
- # No need to manually close here, 'async with' handles it
208
- raise HTTPException(status_code=response.status_code, detail=detail_msg)
209
-
210
- # 7. Handle Flowith Response based on *client's* request.stream preference
211
- if request.stream:
212
- # Client wants streaming: Use StreamingResponse with the helper
213
- return StreamingResponse(
214
- stream_flowith_response(response), # Pass the response object itself
215
- media_type="text/event-stream"
216
- )
217
- else:
218
- # Client wants non-streaming: Accumulate the response
219
- full_response_bytes = bytearray()
220
- try:
221
- async for chunk in response.aiter_bytes():
222
- full_response_bytes.extend(chunk)
223
- except Exception as e:
224
- # Handle potential errors during stream reading
225
- print(f"Error reading stream from Flowith: {e}")
226
- raise HTTPException(status_code=502, detail=f"Error reading stream from Flowith: {e}")
227
- finally:
228
- # 'async with' ensures the stream is closed
229
- pass
230
-
231
- # Decode the accumulated bytes
232
- full_response_text = full_response_bytes.decode('utf-8')
233
-
234
- # Try to parse as JSON
235
  try:
236
- response_data = json.loads(full_response_text)
237
- from fastapi.responses import JSONResponse # Import locally if not already global
238
- return JSONResponse(content=response_data)
239
- except json.JSONDecodeError:
240
- # If not valid JSON, return as plain text
241
- print(f"Warning: Flowith response was not valid JSON. Returning as plain text. Content: {full_response_text[:200]}...") # Log snippet
242
- from fastapi.responses import PlainTextResponse # Import locally
243
- return PlainTextResponse(content=full_response_text)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
244
 
245
  except httpx.RequestError as exc:
246
  print(f"Error requesting Flowith: {exc}")
247
  raise HTTPException(status_code=503, detail=f"Error connecting to Flowith service: {exc}")
248
  except HTTPException as http_exc:
249
- # Re-raise HTTPExceptions raised during stream processing
250
- raise http_exc
251
  except Exception as exc:
252
- print(f"Unexpected error during Flowith request/processing: {exc}")
253
- # Log the traceback for debugging
254
- import traceback
255
- traceback.print_exc()
256
- raise HTTPException(status_code=500, detail=f"Internal server error: {exc}")
257
 
258
 
259
  # --- Models Endpoint ---
 
134
  is_claude_or_gemini = "claude" in flowith_model_name.lower() or "gemini" in flowith_model_name.lower()
135
 
136
  for msg in request.messages:
137
+ original_role = msg.role.lower() # Get original role
138
+ new_role = original_role # Start with original role
139
+
140
+ # Check for Claude/Gemini system message conversion based on *requested* model
141
+ is_claude_or_gemini_requested = "claude" in request.model.lower() or "gemini" in request.model.lower()
142
+ if is_claude_or_gemini_requested and original_role == "system":
143
+ new_role = "user"
144
+ # Check for non-standard roles (applies AFTER potential system->user conversion for C/G)
145
+ elif original_role not in {"user", "assistant", "system"}:
146
+ new_role = "user"
147
+ print(f"Warning: Converting non-standard role '{original_role}' to 'user' for model {request.model}")
148
+
149
+ # Append message with the determined new_role, but only if it's valid for Flowith
150
+ # Flowith only accepts 'user' and 'assistant'
151
+ if new_role in ["user", "assistant"]:
152
+ processed_messages.append(FlowithMessage(role=new_role, content=msg.content))
153
+ # else: # Log or skip roles that are still 'system' after processing
154
+ # print(f"Skipping message with final role '{new_role}' as it's not 'user' or 'assistant'. Original role: '{original_role}'")
 
155
 
156
  # 4. Construct Flowith Request Payload
157
  flowith_payload = FlowithRequest(
 
167
  'accept': '*/*',
168
  'accept-language': 'en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7,zh-TW;q=0.6,ja;q=0.5',
169
  'authorization': FLOWITH_AUTH_TOKEN, # Send only the token, no "Bearer " prefix
170
+ 'content-type': 'application/json',
171
+ 'responsetype': 'stream', # Restore this header
172
  'origin': 'https://flowith.net',
173
  'priority': 'u=1, i',
174
  'referer': 'https://flowith.net/',
 
175
  'sec-ch-ua': '"Google Chrome";v="135", "Not-A.Brand";v="8", "Chromium";v="135"',
176
  'sec-ch-ua-mobile': '?0',
177
  'sec-ch-ua-platform': '"Windows"',
 
182
  }
183
 
184
  # 6. Make Asynchronous Request to Flowith
185
+ # Need time for simulated streaming chunks
186
+ import time
187
+ # Need JSONResponse
188
+ from fastapi.responses import JSONResponse
189
+
190
+ async with httpx.AsyncClient(timeout=300.0) as client:
191
  try:
192
+ # Serialize payload manually
 
193
  payload_bytes = json.dumps(flowith_payload.dict()).encode('utf-8')
194
+
195
+ # Make a non-streaming POST request to Flowith
196
+ response = await client.post(
197
  FLOWITH_API_URL,
198
+ content=payload_bytes,
199
  headers=headers,
200
  )
201
+
202
+ # Check status code after receiving the full response
203
+ if response.status_code != 200:
204
+ try:
205
+ error_detail = response.text # Use .text for non-streaming
206
+ detail_msg = f"Flowith API Error ({response.status_code}): {error_detail}"
207
+ except Exception:
208
+ detail_msg = f"Flowith API Error ({response.status_code})"
209
+ raise HTTPException(status_code=response.status_code, detail=detail_msg)
210
+
211
+ # Attempt to parse the full response body as JSON
212
+ try:
213
+ flowith_data = response.json()
214
+ except json.JSONDecodeError as e:
215
+ print(f"Error decoding Flowith JSON response: {e}. Response text: {response.text[:200]}...")
216
+ raise HTTPException(status_code=502, detail=f"Invalid JSON response from Flowith: {e}")
217
+
218
+ # 7. Handle response based on *client's* request.stream preference
219
+ if not request.stream:
220
+ # Client wants non-streaming: Return the parsed Flowith data directly
221
+ return JSONResponse(content=flowith_data)
222
+ else:
223
+ # Client wants streaming: Simulate streaming from the complete response
224
+ # Client wants streaming: Simulate streaming word-by-word from the complete response
225
+ async def stream_generator() -> AsyncGenerator[str, None]:
226
+ # Ensure necessary imports are available (time, json, uuid are already imported)
227
+ # import time # Already imported around line 186
228
+ # import json # Already imported at top
229
+ # import uuid # Already imported at top
230
+ # import asyncio # Needed only if adding delay
231
+
232
+ chunk_id = f"chatcmpl-{uuid.uuid4()}"
233
+ model_name = request.model # Use the model requested by the client
234
+
235
+ # Extract full content safely
236
+ full_content = ""
 
 
237
  try:
238
+ # Try the expected structure first
239
+ full_content = flowith_data.get("choices", [{}])[0].get("message", {}).get("content", "")
240
+ if not full_content:
241
+ # Fallback: Check for other common fields
242
+ full_content = flowith_data.get("text", flowith_data.get("completion", ""))
243
+
244
+ if not full_content:
245
+ print(f"Warning: Could not extract content for streaming from Flowith response: {flowith_data}")
246
+ full_content = "" # Default to empty if extraction fails
247
+
248
+ except (AttributeError, IndexError, TypeError) as e:
249
+ print(f"Error extracting content for streaming: {e}. Data: {flowith_data}")
250
+ full_content = "" # Default to empty on error
251
+
252
+ # Define chunk size
253
+ chunk_size = 20
254
+
255
+ # Stream fixed-size chunks
256
+ for i in range(0, len(full_content), chunk_size):
257
+ content_piece = full_content[i:i + chunk_size]
258
+ chunk = {
259
+ "id": chunk_id,
260
+ "object": "chat.completion.chunk",
261
+ "created": int(time.time()), # New timestamp for each chunk
262
+ "model": model_name,
263
+ "choices": [{
264
+ "index": 0,
265
+ "delta": {"content": content_piece}, # Use the 20-char piece
266
+ "finish_reason": None
267
+ }]
268
+ }
269
+ yield f"data: {json.dumps(chunk)}\n\n"
270
+ # Optional delay for simulation
271
+ # await asyncio.sleep(0.01)
272
+
273
+ # Send the final chunk with finish_reason
274
+ final_chunk = {
275
+ "id": chunk_id,
276
+ "object": "chat.completion.chunk",
277
+ "created": int(time.time()), # Use a final timestamp
278
+ "model": model_name,
279
+ "choices": [{
280
+ "index": 0,
281
+ "delta": {}, # Empty delta for final chunk
282
+ "finish_reason": "stop" # Assume 'stop'
283
+ }]
284
+ }
285
+ yield f"data: {json.dumps(final_chunk)}\n\n"
286
+ yield "data: [DONE]\n\n"
287
+
288
+ return StreamingResponse(stream_generator(), media_type="text/event-stream")
289
 
290
  except httpx.RequestError as exc:
291
  print(f"Error requesting Flowith: {exc}")
292
  raise HTTPException(status_code=503, detail=f"Error connecting to Flowith service: {exc}")
293
  except HTTPException as http_exc:
294
+ # Re-raise HTTPExceptions (e.g., from status code check or JSON parsing)
295
+ raise http_exc
296
  except Exception as exc:
297
+ print(f"Unexpected error during Flowith request/processing: {exc}")
298
+ import traceback
299
+ traceback.print_exc()
300
+ raise HTTPException(status_code=500, detail=f"Internal server error: {exc}")
 
301
 
302
 
303
  # --- Models Endpoint ---