Multimodel_Rag / app /api /query.py
Dhrumil Parikh
deploy GeminiRAG
cdc55f4
Raw
History Blame Contribute Delete
4.66 kB
"""
RAG query endpoints.
POST /v1/query
Standard JSON response. Runs the full hybrid search pipeline via
engine.query() and returns answer + numbered citations + RAGAS scores
(null until the background Celery task completes). Uses Groq as the
LLM — no Gemini API key required.
POST /v1/query/stream
Server-Sent Events streaming response. Uses the Gemini SDK to stream
answer tokens as they are generated. Requires GEMINI_API_KEY to be set.
Both endpoints share _resolve_chunks_and_context() for retrieval so the
confidence gate and hybrid search behaviour are identical.
"""
import json
import logging
import uuid
from typing import Optional

from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel, field_validator
from sqlmodel import Session, select

from app.deps import get_current_user, get_db
from app.models.db import Job, JobStatus, User
# Router for the query endpoints below. Routes are declared as "/query" while
# the module docstring lists "/v1/query", so a prefix is presumably added where
# this router is included — confirm in the app factory.
router = APIRouter()
class QueryRequest(BaseModel):
    """Request body accepted by both ``/query`` and ``/query/stream``.

    Attributes are documented inline below.
    """

    # The user's question; surrounding whitespace is stripped and a blank
    # value is rejected by ``question_not_empty``.
    question: str
    # Optional job UUIDs restricting retrieval; ``None`` means no job filter.
    job_ids: Optional[list[uuid.UUID]] = None

    @field_validator("question")
    @classmethod
    def question_not_empty(cls, v):
        """Return *v* without surrounding whitespace; reject blank input."""
        cleaned = v.strip() if v else ""
        if cleaned:
            return cleaned
        raise ValueError("question must not be empty")
logger = logging.getLogger(__name__)

# Extra response headers for the SSE endpoint. They disable caching and ask
# reverse proxies (nginx honours X-Accel-Buffering) not to buffer the body,
# so answer tokens reach the client as they are generated.
_SSE_HEADERS = {"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}


def _resolve_job_filter(
    db: Session, job_ids: Optional[list[uuid.UUID]]
) -> Optional[list[str]]:
    """Validate the requested job filter and return it as string ids.

    Shared by both query endpoints so their validation cannot drift apart.

    Args:
        db: Database session used to look up each job.
        job_ids: Job UUIDs from the request, or ``None`` for no job filter.

    Returns:
        ``None`` when *job_ids* is ``None``. Otherwise the ids converted with
        ``str()``, in request order.

    Raises:
        HTTPException: 404 if a job does not exist; 400 if a job's status is
            not ``JobStatus.completed``.
    """
    if job_ids is None:
        return None
    # NOTE(review): jobs are not checked against the requesting user. The
    # streaming path describes the corpus as a shared knowledge base, so this
    # looks intentional — confirm before storing per-user private documents.
    for jid in job_ids:
        job = db.get(Job, jid)
        if not job:
            raise HTTPException(status_code=404, detail=f"Job {jid} not found")
        if job.status != JobStatus.completed:
            raise HTTPException(status_code=400, detail=f"Job {jid} is not COMPLETED")
    return [str(jid) for jid in job_ids]


@router.post("/query")
def query_documents(
    req: QueryRequest,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Answer ``req.question`` with the non-streaming RAG pipeline.

    Every id in ``req.job_ids`` must exist (404) and be completed (400). The
    ids are then passed to ``engine.query`` as strings; ``None`` means no job
    filter. The response body is whatever ``engine.query`` returns.
    """
    from app.config import settings
    from app.rag import engine

    return engine.query(
        question=req.question,
        job_ids=_resolve_job_filter(db, req.job_ids),
        user_id=current_user.id,
        db=db,
        settings=settings,
    )


@router.post("/query/stream")
def query_stream(
    req: QueryRequest,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Streaming RAG query — returns Server-Sent Events with answer chunks.

    Job validation runs before the response starts, so an unknown or
    unfinished job is reported as a normal HTTP 404/400. After that, each
    event is a single ``data: <json>`` line whose ``type`` is one of:

    * ``chunk`` — ``{"text": ...}``, one piece of generated answer text.
    * ``done`` — the final event. It carries answer, citations,
      ``confidence_gate_passed``, ``avg_similarity_score`` and
      ``ragas_scores``. If retrieval returns early, it instead carries
      retrieval's early-return payload.
    * ``error`` — ``{"message": ...}`` if anything fails while streaming.
    """
    from app.config import settings as _settings

    # None means no filter: search all completed documents (shared knowledge base).
    job_ids_str = _resolve_job_filter(db, req.job_ids)

    def event_stream():
        def sse(payload: dict) -> str:
            # One SSE event: a "data:" line followed by a blank line.
            return f"data: {json.dumps(payload)}\n\n"

        try:
            # Imported inside the try so that a missing optional dependency
            # becomes an error event and a log entry. Otherwise it would abort
            # a response whose headers have already been sent.
            from app.rag.engine import RAG_SYSTEM_PROMPT, _resolve_chunks_and_context
            from google import genai
            from google.genai import types as genai_types

            result = _resolve_chunks_and_context(req.question, job_ids_str, _settings)
            if result.get("early_return"):
                # Retrieval short-circuited (presumably the confidence gate
                # failing): forward its payload as the final event, no LLM call.
                yield sse({"type": "done", **result["payload"]})
                return

            # Read every field up front so a malformed result fails before
            # any answer text has been streamed.
            chunks = result["chunks"]
            user_prompt = result["user_prompt"]
            avg_score = result["avg_score"]

            genai_client = genai.Client(api_key=_settings.GEMINI_API_KEY)
            answer_parts: list[str] = []
            for stream_part in genai_client.models.generate_content_stream(
                model=_settings.GEMINI_MODEL,
                contents=user_prompt,
                config=genai_types.GenerateContentConfig(system_instruction=RAG_SYSTEM_PROMPT),
            ):
                # .text may be None for parts without text; skip those.
                piece = stream_part.text or ""
                if piece:
                    answer_parts.append(piece)
                    yield sse({"type": "chunk", "text": piece})

            # Citations are numbered from 1 in retrieval order.
            citations = [
                {
                    "index": index,
                    "filename": c["filename"],
                    "page_or_segment": c["page_or_segment"],
                    "excerpt": c["text"][:200],
                }
                for index, c in enumerate(chunks, start=1)
            ]
            yield sse({
                "type": "done",
                "answer": "".join(answer_parts),
                "citations": citations,
                "confidence_gate_passed": True,
                "avg_similarity_score": avg_score,
                "ragas_scores": None,
            })
        except Exception as exc:
            # The HTTP status is already sent; log server-side, then report
            # the failure in-band as the last event.
            logger.exception("Streaming RAG query failed")
            yield sse({"type": "error", "message": str(exc)})

    return StreamingResponse(
        event_stream(), media_type="text/event-stream", headers=_SSE_HEADERS
    )