# Qurio / backend-python / src / routes / stream_chat.py
# Repository page metadata (scrape residue): veeiiinnnnn's picture · "new" · 592cb1d
"""
Stream chat API routes.
FastAPI endpoints for streaming chat completion.
"""
import asyncio
import json
from collections.abc import AsyncGenerator
from fastapi import APIRouter, Request, Response
from sse_starlette.sse import EventSourceResponse
from ..models.stream_chat import StreamChatRequest
from ..services.stream_chat import get_stream_chat_service
from ..services.tools import list_tools # Imported list_tools
from ..utils.sse import get_sse_config
from ._request_secrets import apply_stream_secret_headers
router = APIRouter(tags=["stream-chat"])
@router.post("/stream-chat")
async def stream_chat(request: Request) -> Response:
    """
    Stream chat completion with support for multiple AI providers.

    Request body:
    {
        "provider": "gemini" | "openai" | "openai_compatibility" | "siliconflow" | "glm" | "deepseek" | "volcengine" | "modelscope" | "kimi" | "nvidia" | "minimax",
        "baseUrl": "Custom base URL (optional)",
        "model": "model-name" (optional),
        "messages": [...],
        "tools": [...] (optional),
        "toolChoice": ... (optional),
        "responseFormat": {...} (optional),
        "thinking": {...} (optional),
        "temperature": 0.7 (optional),
        "top_k": 40 (optional),
        "top_p": 0.9 (optional),
        "frequency_penalty": 0 (optional),
        "presence_penalty": 0 (optional),
        "contextTurns": 6 (optional),
        "toolIds": ["calculator", "local_time"] (optional),
        "searchProvider": "tavily" (optional),
        "searchBackend": "auto|exa|duckduckgo|google|bing|brave|yandex|yahoo" (optional)
    }

    Sensitive headers:
    - x-llm-api-key
    - x-tavily-api-key
    - x-serpapi-api-key
    - x-exa-api-key
    - x-summary-api-key
    - x-memory-api-key

    Response: Server-Sent Events stream
    - data: {"type":"text","content":"..."}
    - data: {"type":"thought","content":"..."}
    - data: {"type":"tool_call","name":"...","arguments":"..."}
    - data: {"type":"tool_result","name":"...","output":"..."}
    - data: {"type":"done","content":"...","thought":"...","sources":[...]}
    - data: {"type":"error","error":"..."}
    """
    # Parse the JSON body. The helper is handed the request together with the
    # parsed body; whatever it returns is used as the model's keyword arguments.
    # NOTE(review): presumably it merges the secret headers listed above into
    # the body -- confirm against _request_secrets.
    body = apply_stream_secret_headers(request, await request.json())
    stream_request = StreamChatRequest(**body)

    # Fall back to a fixed identity when the caller supplied no (or an empty)
    # user id.
    if not stream_request.user_id:
        stream_request.user_id = "default-user"

    # Get SSE config
    sse_config = get_sse_config()

    async def event_generator() -> AsyncGenerator[dict[str, str], None]:
        """Yield one ``{"data": <json>}`` dict per event from the chat service."""
        try:
            # Get stream chat service
            service = get_stream_chat_service()

            # Stream chat completion
            async for event in service.stream_chat(stream_request):
                # Stop producing as soon as the client has gone away; the
                # event fetched in this iteration is dropped.
                if await request.is_disconnected():
                    break

                # Send event as SSE data (non-ASCII characters kept verbatim)
                yield {"data": json.dumps(event, ensure_ascii=False)}
        except asyncio.CancelledError:
            # Cancellation must propagate so the task running this generator
            # is actually torn down; swallowing it hides the cancel request
            # from the caller.
            raise
        except Exception as e:
            # The response has already started, so failures are reported
            # in-band as a final "error" event instead of an HTTP error.
            error_event = {"type": "error", "error": str(e)}
            yield {"data": json.dumps(error_event, ensure_ascii=False)}

    # Create EventSourceResponse
    return EventSourceResponse(
        event_generator(),
        media_type="text/event-stream",
        # heartbeat_ms is divided by 1000 before being passed as the ping
        # interval -- presumably milliseconds -> seconds; confirm in utils.sse.
        ping=sse_config.heartbeat_ms / 1000,
    )
@router.get("/tools")
async def get_tools() -> dict[str, list[dict]]:
    """Get list of available tools."""
    # The result of list_tools() is returned unchanged under the single
    # "tools" key.
    return {"tools": list_tools()}
@router.get("/health")
async def health_check() -> dict[str, str]:
    """Health check endpoint."""
    # Fixed payload: no dependencies are probed, so a response only shows
    # that the application is serving requests.
    payload: dict[str, str] = {
        "status": "ok",
        "message": "Qurio Python backend is running",
    }
    return payload