Chrunos committed on
Commit
b855f82
·
verified ·
1 Parent(s): c2008cc

Update app.py

Browse files
Files changed (1) hide show
  1. app.py +42 -28
app.py CHANGED
@@ -4,6 +4,8 @@ import logging
4
  import uuid
5
  from datetime import datetime, timezone
6
  from typing import Optional, Dict, Any
 
 
7
 
8
  from fastapi import FastAPI, HTTPException, Body, BackgroundTasks, Path
9
  from fastapi.responses import StreamingResponse
@@ -166,50 +168,62 @@ async def direct_chat(payload: ChatPayload):
166
  raise HTTPException(status_code=500, detail="Custom API key not configured.")
167
 
168
async def custom_api_streamer():
    """Yield text deltas from the Custom API as an async generator.

    Opens an ``AsyncOpenAI`` client against the configured base URL, issues
    a streaming chat completion for the user's message, and yields each
    text fragment as it arrives — ``reasoning_content`` first when a
    provider supplies it, otherwise the regular ``content`` delta.
    Malformed chunks are logged and skipped; any other failure is surfaced
    to the client as a single error string. The client is always closed in
    the ``finally`` block.
    """
    client = None
    try:
        logger.info("Sending request to Custom API for /chat.")

        # Async client so awaiting the stream never blocks the event loop.
        from openai import AsyncOpenAI
        client = AsyncOpenAI(
            api_key=custom_api_key_secret,
            base_url=custom_api_base_url,
            timeout=30.0,  # guard against hanging connections
        )

        stream = await client.chat.completions.create(
            model=custom_api_model,
            temperature=payload.temperature,
            messages=[{"role": "user", "content": payload.message}],
            stream=True,
        )

        async for chunk in stream:
            try:
                delta = chunk.choices[0].delta
                piece = None
                # Some providers emit reasoning_content ahead of content.
                if hasattr(delta, "reasoning_content") and delta.reasoning_content:
                    piece = delta.reasoning_content
                elif delta and delta.content:
                    piece = delta.content
                if piece:
                    yield piece
            except (IndexError, AttributeError) as exc:
                # Skip chunks missing choices/delta rather than aborting.
                logger.warning(f"Skipping malformed chunk: {exc}")
                continue
    except Exception as e:
        logger.error(f"Error during Custom API call for /chat: {e}", exc_info=True)
        yield f"Error processing with Custom API: {str(e)}"
    finally:
        # Ensure proper cleanup of the client even on error paths.
        if client:
            try:
                await client.close()
            except Exception as cleanup_error:
                logger.warning(f"Error closing OpenAI client: {cleanup_error}")
 
213
 
214
  return StreamingResponse(
215
  custom_api_streamer(),
 
4
  import uuid
5
  from datetime import datetime, timezone
6
  from typing import Optional, Dict, Any
7
+ import asyncio
8
+ from concurrent.futures import ThreadPoolExecutor
9
 
10
  from fastapi import FastAPI, HTTPException, Body, BackgroundTasks, Path
11
  from fastapi.responses import StreamingResponse
 
168
  raise HTTPException(status_code=500, detail="Custom API key not configured.")
169
 
170
async def custom_api_streamer():
    """Yield text deltas from the Custom API without blocking the event loop.

    The sync ``openai.OpenAI`` SDK calls are executed in a single-worker
    thread pool. Crucially, chunks are pulled off the blocking stream
    iterator ONE AT A TIME via ``run_in_executor`` and yielded immediately
    — the previous implementation materialized the whole response with
    ``list(process_chunk())`` before yielding anything, which defeated
    streaming entirely and buffered unbounded output in memory.

    Yields ``reasoning_content`` when a provider supplies it, otherwise
    the regular ``content`` delta. Malformed chunks are logged and
    skipped; any other failure is surfaced to the client as a single
    error string. Client and executor are always released in ``finally``.
    """
    executor = ThreadPoolExecutor(max_workers=1)
    client = None
    try:
        logger.info("Sending request to Custom API for /chat.")

        def create_stream():
            # Build the sync client inside the worker thread; nonlocal so
            # the finally block can close it after streaming finishes.
            nonlocal client
            client = openai.OpenAI(
                api_key=custom_api_key_secret,
                base_url=custom_api_base_url,
                timeout=30.0,  # guard against hanging connections
            )
            return client.chat.completions.create(
                model=custom_api_model,
                temperature=payload.temperature,
                messages=[{"role": "user", "content": payload.message}],
                stream=True,
            )

        # get_running_loop() is the correct call inside a coroutine;
        # get_event_loop() is deprecated here.
        loop = asyncio.get_running_loop()
        stream = await loop.run_in_executor(executor, create_stream)

        # Pull one chunk per executor hop so every delta reaches the
        # client as soon as the API produces it.
        stream_iter = iter(stream)
        _DONE = object()  # sentinel marking iterator exhaustion
        while True:
            chunk = await loop.run_in_executor(executor, next, stream_iter, _DONE)
            if chunk is _DONE:
                break
            try:
                delta = chunk.choices[0].delta
                piece = None
                # Some providers emit reasoning_content ahead of content.
                if hasattr(delta, "reasoning_content") and delta.reasoning_content:
                    piece = delta.reasoning_content
                elif delta and delta.content:
                    piece = delta.content
            except (IndexError, AttributeError) as exc:
                # Skip chunks missing choices/delta rather than injecting
                # error text into the user-visible stream.
                logger.warning(f"Skipping malformed chunk: {exc}")
                continue
            if piece:
                yield piece
    except Exception as e:
        logger.error(f"Error during Custom API call for /chat: {e}", exc_info=True)
        yield f"Error processing with Custom API: {str(e)}"
    finally:
        # Cleanup: close the HTTP client, then release the worker thread.
        if client:
            try:
                client.close()
            except Exception as cleanup_error:
                logger.warning(f"Error closing OpenAI client: {cleanup_error}")
        executor.shutdown(wait=False)
227
 
228
  return StreamingResponse(
229
  custom_api_streamer(),