Spaces:
Sleeping
Sleeping
| """ | |
| RAG query endpoints. | |
| POST /v1/query | |
| Standard JSON response. Runs the full hybrid search pipeline via | |
| engine.query() and returns answer + numbered citations + RAGAS scores | |
| (null until the background Celery task completes). Uses Groq as the | |
| LLM — no Gemini API key required. | |
| POST /v1/query/stream | |
| Server-Sent Events streaming response. Uses the Gemini SDK to stream | |
| answer tokens as they are generated. Requires GEMINI_API_KEY to be set. | |
| Both endpoints share _resolve_chunks_and_context() for retrieval so the | |
| confidence gate and hybrid search behaviour are identical. | |
| """ | |
| import json | |
| import uuid | |
| from typing import Optional | |
| from fastapi import APIRouter, Depends, HTTPException | |
| from fastapi.responses import StreamingResponse | |
| from pydantic import BaseModel, field_validator | |
| from sqlmodel import Session, select | |
| from app.deps import get_current_user, get_db | |
| from app.models.db import Job, JobStatus, User | |
# Router that collects this module's query endpoints.
# NOTE(review): presumably mounted on the app via include_router elsewhere — confirm.
router: APIRouter = APIRouter()
class QueryRequest(BaseModel):
    """Request body shared by the RAG query endpoints.

    Attributes:
        question: The user's question. Surrounding whitespace is stripped and
            a blank question is rejected with a validation error.
        job_ids: Optional job IDs restricting which documents are searched.
            None means no job filter is applied.
    """

    question: str
    job_ids: Optional[list[uuid.UUID]] = None

    # Without these decorators Pydantic never registers the validator, so
    # blank/whitespace-only questions would silently pass validation.
    @field_validator("question")
    @classmethod
    def question_not_empty(cls, v: str) -> str:
        """Reject empty or whitespace-only questions; return the stripped text.

        Raises:
            ValueError: if the question is empty after stripping (Pydantic
                surfaces this as a request validation error).
        """
        stripped = v.strip() if v else ""
        if not stripped:
            raise ValueError("question must not be empty")
        return stripped
def _validated_job_id_strings(
    job_ids: Optional[list[uuid.UUID]],
    db: Session,
) -> Optional[list[str]]:
    """Check that every requested job exists and is COMPLETED.

    Shared by both query endpoints so their validation is identical.

    Args:
        job_ids: Job IDs from the request, or None for no job filter.
        db: Database session used to look up each ``Job``.

    Returns:
        The distinct job IDs as strings in first-seen order, or None when
        ``job_ids`` is None. An empty list is passed through as ``[]``; how
        that is interpreted is up to the retrieval engine.

    Raises:
        HTTPException: 404 if a job does not exist, 400 if it is not COMPLETED.
    """
    if job_ids is None:
        # No filter — search all completed documents (shared knowledge base)
        return None
    # Duplicate IDs would only cause redundant lookups; dict.fromkeys keeps order.
    unique_ids = list(dict.fromkeys(job_ids))
    for jid in unique_ids:
        job = db.get(Job, jid)
        if job is None:
            raise HTTPException(status_code=404, detail=f"Job {jid} not found")
        if job.status != JobStatus.completed:
            raise HTTPException(status_code=400, detail=f"Job {jid} is not COMPLETED")
        # NOTE(review): job ownership is not checked against the caller; this
        # matches the "shared knowledge base" model above — confirm intended.
    return [str(jid) for jid in unique_ids]


def _sse(event: dict) -> str:
    """Format *event* as one Server-Sent Events ``data:`` frame of JSON."""
    return f"data: {json.dumps(event)}\n\n"


# NOTE(review): route paths are taken from the module docstring; confirm they
# match any prefix passed to include_router for this router.
@router.post("/v1/query")
def query_documents(
    req: QueryRequest,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Answer ``req.question`` with the full RAG pipeline as a JSON response.

    Validates any requested job IDs, then delegates to ``engine.query()`` and
    returns its result unchanged.

    Raises:
        HTTPException: 404/400 from job-ID validation.
    """
    # Deferred imports, as elsewhere in this module.
    from app.config import settings
    from app.rag import engine

    job_ids_str = _validated_job_id_strings(req.job_ids, db)
    return engine.query(
        question=req.question,
        job_ids=job_ids_str,
        user_id=current_user.id,
        db=db,
        settings=settings,
    )


@router.post("/v1/query/stream")
def query_stream(
    req: QueryRequest,
    current_user: User = Depends(get_current_user),
    db: Session = Depends(get_db),
):
    """Streaming RAG query — returns Server-Sent Events with answer chunks.

    Each frame is a JSON object in a ``data:`` line, with one of these types:

    * ``chunk``: ``{"type": "chunk", "text": ...}`` for each generated piece.
    * ``done``: the final answer, citations, ``confidence_gate_passed``,
      ``avg_similarity_score`` and ``ragas_scores`` (always None here), or the
      retrieval step's early-return payload.
    * ``error``: ``{"type": "error", "message": ...}`` if anything fails once
      streaming has started.

    ``current_user`` is not read in the body; the dependency only enforces
    authentication.

    Raises:
        HTTPException: 404/400 from job-ID validation, raised before the
            stream starts so the status code is still meaningful.
    """
    from app.config import settings as _settings

    # Validate eagerly: once StreamingResponse starts, the status is fixed at 200.
    job_ids_str = _validated_job_id_strings(req.job_ids, db)

    def event_stream():
        try:
            # Imported inside the try so a missing SDK/engine symbol is reported
            # as an error event instead of aborting the response mid-stream.
            from app.rag.engine import RAG_SYSTEM_PROMPT, _resolve_chunks_and_context
            from google import genai
            from google.genai import types as genai_types

            result = _resolve_chunks_and_context(req.question, job_ids_str, _settings)
            if result.get("early_return"):
                # Retrieval short-circuited (presumably the confidence gate);
                # its payload becomes the final event.
                yield _sse({"type": "done", **result["payload"]})
                return

            chunks = result["chunks"]
            user_prompt = result["user_prompt"]
            avg_score = result["avg_score"]

            genai_client = genai.Client(api_key=_settings.GEMINI_API_KEY)
            answer_parts: list[str] = []
            for chunk in genai_client.models.generate_content_stream(
                model=_settings.GEMINI_MODEL,
                contents=user_prompt,
                config=genai_types.GenerateContentConfig(system_instruction=RAG_SYSTEM_PROMPT),
            ):
                piece = chunk.text or ""
                if piece:
                    answer_parts.append(piece)
                    yield _sse({"type": "chunk", "text": piece})

            # Citation numbers are 1-based, in retrieval order.
            citations = [
                {
                    "index": index,
                    "filename": c["filename"],
                    "page_or_segment": c["page_or_segment"],
                    "excerpt": c["text"][:200],
                }
                for index, c in enumerate(chunks, start=1)
            ]
            yield _sse({
                "type": "done",
                "answer": "".join(answer_parts),
                "citations": citations,
                "confidence_gate_passed": True,
                "avg_similarity_score": avg_score,
                "ragas_scores": None,
            })
        except Exception as exc:
            # Stream boundary: headers are already sent, so report in-band, but
            # also log it — previously the failure was invisible server-side.
            import logging

            logging.getLogger(__name__).exception("Streaming RAG query failed")
            yield _sse({"type": "error", "message": str(exc)})

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        # Keep intermediaries (e.g. nginx) from caching or buffering the stream.
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )