Code_LLM / src /api /routes.py
AnatoliiG
Update routes.py
fca7a73
raw
history blame
4.03 kB
import asyncio
import json
import threading

from fastapi import APIRouter, Request
from fastapi.responses import JSONResponse, StreamingResponse

from src.core.config import settings
from src.core.engine import engine
from src.utils.helpers import get_clean_text
router = APIRouter()

# Serialises every call into the model at the *thread* level. ``engine.lock``
# (entered with ``async with``) orders requests, but it is released as soon as
# the request coroutine unwinds -- e.g. when the task is cancelled because the
# client disconnected -- even if the executor thread is still inside the model.
# Acquiring this lock inside the worker thread keeps the sequential-processing
# guarantee in that case: the next generation's thread blocks until the
# previous thread has left the model.
_MODEL_THREAD_LOCK = threading.Lock()


def _coerce(value, default, cast, field):
    """Return ``cast(value)``, using *default* when *value* is None.

    A JSON ``null`` is treated like an omitted field.

    Raises:
        ValueError: with a client-facing message if *cast* rejects the value.
    """
    if value is None:
        value = default
    try:
        return cast(value)
    except (TypeError, ValueError) as err:
        raise ValueError(f"'{field}' must be a number, got {value!r}") from err


def _parse_request(data):
    """Validate a decoded JSON body and extract the generation arguments.

    Returns:
        Tuple ``(messages, max_tokens, temperature, stream)``. ``messages`` is
        a list of ``{"role", "content"}`` dicts, with content passed through
        ``get_clean_text``. ``stream`` is the raw ``stream`` field (default
        True).

    Raises:
        ValueError: with a client-facing message when the body is malformed.
    """
    if not isinstance(data, dict):
        raise ValueError("Request body must be a JSON object")

    raw_messages = data.get("messages") or []
    if not isinstance(raw_messages, list) or not all(
        isinstance(m, dict) for m in raw_messages
    ):
        raise ValueError("'messages' must be a list of objects")
    messages = [
        {"role": m.get("role", "user"), "content": get_clean_text(m.get("content"))}
        for m in raw_messages
    ]

    max_tokens = _coerce(
        data.get("max_tokens"), settings.DEFAULT_MAX_TOKENS, int, "max_tokens"
    )
    temperature = _coerce(
        data.get("temperature"), settings.DEFAULT_TEMP, float, "temperature"
    )
    return messages, max_tokens, temperature, data.get("stream", True)


def _call_exclusive(func, /, *args, **kwargs):
    """Call ``func(*args, **kwargs)`` while holding ``_MODEL_THREAD_LOCK``."""
    with _MODEL_THREAD_LOCK:
        return func(*args, **kwargs)


@router.post("/chat/completions")
async def chat_completions(request: Request):
    """Chat-completion endpoint.

    Reads ``messages``, ``max_tokens``, ``temperature`` and ``stream`` from
    the JSON body. ``stream`` defaults to True when omitted. Streaming
    responses are Server-Sent Events, one ``data:`` line per model chunk,
    terminated by ``data: [DONE]``. A generation error is emitted as a single
    ``data: {"error": ...}`` event before ``[DONE]``. Non-streaming requests
    return whatever ``engine.generate`` returns.

    Returns a 500 JSON error when no model is loaded and a 400 JSON error when
    the body is malformed. Generations are serialised through ``engine.lock``
    and, at thread level, ``_MODEL_THREAD_LOCK``.
    """
    if not engine.llm:
        return JSONResponse({"error": "Model not loaded"}, status_code=500)

    try:
        data = await request.json()
    except ValueError:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
        return JSONResponse({"error": "Request body is not valid JSON"}, status_code=400)
    try:
        messages, max_tokens, temperature, stream = _parse_request(data)
    except ValueError as err:
        return JSONResponse({"error": str(err)}, status_code=400)

    async def stream_generator():
        # Hold the engine lock for the whole generation so requests run one at a time.
        async with engine.lock:
            queue: asyncio.Queue = asyncio.Queue()
            stop_event = threading.Event()
            loop = asyncio.get_running_loop()

            def worker():
                # Runs in an executor thread: drives the blocking model iterator
                # and hands every chunk to the event loop thread-safely.
                try:
                    with _MODEL_THREAD_LOCK:
                        for chunk in engine.llm.create_chat_completion(
                            messages=messages,
                            max_tokens=max_tokens,
                            temperature=temperature,
                            stream=True,
                        ):
                            # Checked between chunks, so a stop request takes
                            # effect once the model produces its next chunk.
                            if stop_event.is_set():
                                break
                            loop.call_soon_threadsafe(queue.put_nowait, chunk)
                except Exception as exc:
                    # Forward the exception object itself: model chunks are
                    # never exceptions, so the consumer cannot confuse the two
                    # (unlike a dict marker, whose message may be empty).
                    loop.call_soon_threadsafe(queue.put_nowait, exc)
                finally:
                    # End-of-stream sentinel; model chunks are never None.
                    loop.call_soon_threadsafe(queue.put_nowait, None)

            # Run the blocking model iteration off the event loop.
            worker_future = loop.run_in_executor(None, worker)
            try:
                while (item := await queue.get()) is not None:
                    if isinstance(item, Exception):
                        message = str(item) or type(item).__name__
                        yield f"data: {json.dumps({'error': message})}\n\n"
                        break
                    yield f"data: {json.dumps(item)}\n\n"
                yield "data: [DONE]\n\n"
            finally:
                # Runs on normal completion, on error, on cancellation (client
                # disconnect) and on generator close: ask the worker to stop,
                # then wait for it so engine.lock is normally held until the
                # thread is done with the model.
                stop_event.set()
                try:
                    await worker_future
                except Exception:
                    # Best effort: worker errors were already forwarded through
                    # the queue. If this await is itself cancelled,
                    # CancelledError (a BaseException) propagates as it should,
                    # and _MODEL_THREAD_LOCK still prevents the next
                    # generation from entering the model until this thread exits.
                    pass

    if stream:
        return StreamingResponse(stream_generator(), media_type="text/event-stream")

    # Non-streaming: keep requests sequential but run the blocking call in a
    # thread. _call_exclusive keeps the model guarded even if this coroutine is
    # cancelled (which would release engine.lock while the thread still runs).
    async with engine.lock:
        return await asyncio.to_thread(
            _call_exclusive,
            engine.generate,
            messages,
            max_tokens,
            temperature,
            stream=False,
        )