"""Chat / inference endpoints β€” OpenAI-compatible + convenience /ask."""

from __future__ import annotations

import json
import logging
import time
from typing import Literal, Optional

from fastapi import APIRouter, HTTPException, Query
from fastapi.responses import StreamingResponse

from app.config import cfg
from app.models import (
    AskResponse,
    ChatCompletionChoice,
    ChatCompletionMessage,
    ChatCompletionRequest,
    ChatCompletionResponse,
)
from app.state import check_ready, run_rag_pipeline, state

logger = logging.getLogger("qmodel.chat")
router = APIRouter(tags=["inference"])
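
# Wiring sketch (hedged): this module only defines the router; the FastAPI
# app that mounts it is assumed to live elsewhere (e.g. an app/main.py not
# shown here). Mounting typically looks like:
#
#     from fastapi import FastAPI
#     from app.routers import chat
#
#     app = FastAPI()
#     app.include_router(chat.router)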
# ───────────────────────────────────────────────────────
# GET /ask — convenience RAG query endpoint
# ───────────────────────────────────────────────────────
@router.get("/ask", response_model=AskResponse)
async def ask(
    q: str = Query(..., min_length=1, max_length=500, description="Your Islamic question"),
    top_k: int = Query(5, ge=1, le=20, description="Number of sources to retrieve"),
    source_type: Optional[Literal["quran", "hadith"]] = Query(None, description="Filter: quran | hadith"),
    grade_filter: Optional[str] = Query(None, description="Hadith grade filter: sahih | hasan"),
):
"""Direct RAG query with full source attribution.
Returns an AI-generated answer grounded in Quran and Hadith sources,
with language detection, intent classification, and scored references.
"""
check_ready()
result = await run_rag_pipeline(q, top_k=top_k, source_type=source_type, grade_filter=grade_filter)
return AskResponse(
question=q,
answer=result["answer"],
language=result["language"],
intent=result["intent"],
analysis=result.get("analysis"),
sources=[
{
"source": s.get("source") or s.get("reference", ""),
"type": s.get("type", ""),
"grade": s.get("grade"),
"arabic": s.get("arabic", ""),
"english": s.get("english", ""),
"_score": round(s.get("_score", 0), 4),
}
for s in result.get("sources", [])
],
top_score=round(result["top_score"], 4),
latency_ms=result["latency_ms"],
)
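
# Usage sketch (hedged): calling GET /ask from Python with httpx. The base
# URL, port, and question text are illustrative assumptions, not part of
# this module.
#
#     import httpx
#
#     resp = httpx.get(
#         "http://localhost:7860/ask",
#         params={"q": "What does the Quran say about patience?", "top_k": 3},
#         timeout=120,
#     )
#     data = resp.json()
#     print(data["answer"], data["top_score"])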
# ───────────────────────────────────────────────────────
# POST /v1/chat/completions — OpenAI-compatible
# ───────────────────────────────────────────────────────
@router.post("/v1/chat/completions", response_model=ChatCompletionResponse)
async def chat_completions(request: ChatCompletionRequest):
"""OpenAI-compatible chat completions endpoint (for Open-WebUI integration)."""
check_ready()
user_messages = [m.content for m in request.messages if m.role == "user"]
if not user_messages:
raise HTTPException(status_code=400, detail="No user message in request")
question = user_messages[-1]
top_k = request.top_k or cfg.TOP_K_RETURN
try:
result = await run_rag_pipeline(question, top_k=top_k)
except HTTPException:
raise
except Exception as exc:
logger.error("Pipeline error: %s", exc)
raise HTTPException(status_code=500, detail="Internal pipeline error")
if request.stream:
return StreamingResponse(
_stream_response(result, request.model),
media_type="text/event-stream",
)
return ChatCompletionResponse(
id=f"qmodel-{int(time.time() * 1000)}",
created=int(time.time()),
model=request.model,
choices=[
ChatCompletionChoice(
index=0,
message=ChatCompletionMessage(
role="assistant",
content=result["answer"],
),
)
],
usage={
"prompt_tokens": -1,
"completion_tokens": -1,
"total_tokens": -1,
},
x_metadata={
"language": result["language"],
"intent": result["intent"],
"top_score": round(result["top_score"], 4),
"latency_ms": result["latency_ms"],
"sources_count": len(result["sources"]),
"sources": [
{
"source": s.get("source") or s.get("reference", ""),
"type": s.get("type", ""),
"grade": s.get("grade"),
"score": round(s.get("_score", 0), 4),
}
for s in result.get("sources", [])[:5]
],
"analysis": result.get("analysis"),
},
)
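
# Client sketch (hedged): because the endpoint mirrors the OpenAI schema, the
# official openai Python client (v1+) can point at this server. The base_url,
# api_key value, and model name are assumptions; the key is not checked here.
#
#     from openai import OpenAI
#
#     client = OpenAI(base_url="http://localhost:7860/v1", api_key="unused")
#     reply = client.chat.completions.create(
#         model="qmodel",
#         messages=[{"role": "user", "content": "What is zakat?"}],
#     )
#     print(reply.choices[0].message.content)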

async def _stream_response(result: dict, model: str):
    """Stream response chunks in OpenAI SSE format."""
    answer = result.get("answer", "")
    # One id and timestamp shared by every chunk, as OpenAI-style clients expect.
    stream_id = f"qmodel-{int(time.time() * 1000)}"
    created = int(time.time())
    for line in answer.split("\n"):
        chunk = {
            "id": stream_id,
            "object": "chat.completion.chunk",
            "created": created,
            "model": model,
            "choices": [{
                "index": 0,
                "delta": {"content": line + "\n"},
                "finish_reason": None,
            }],
        }
        yield f"data: {json.dumps(chunk)}\n\n"
    # Terminal chunk carrying finish_reason, then the [DONE] sentinel.
    final = {
        "id": stream_id,
        "object": "chat.completion.chunk",
        "created": created,
        "model": model,
        "choices": [{
            "index": 0,
            "delta": {},
            "finish_reason": "stop",
        }],
    }
    yield f"data: {json.dumps(final)}\n\n"
    yield "data: [DONE]\n\n"