| """ |
| api/app.py |
| |
| Production-ready FastAPI REST API for the SmartRAG pipeline. |
| |
| Features: |
| - /query endpoint with structured JSON response |
| - /health endpoint for monitoring |
| - /ingest endpoint to add new documents |
| - Request validation with Pydantic |
| - Async support + error handling |
| - CORS middleware for frontend integration |
| |
| Run: uvicorn api.app:app --reload --port 8000 |
| Docs: http://localhost:8000/docs |
| """ |
|
|
| import logging |
| import time |
| from pathlib import Path |
| from typing import List, Optional |
|
|
| from fastapi import FastAPI, HTTPException, BackgroundTasks |
| from fastapi.middleware.cors import CORSMiddleware |
| from pydantic import BaseModel, Field |
|
|
| import sys |
| sys.path.append(str(Path(__file__).parent.parent)) |
| from rag.pipeline import get_pipeline, RAGResponse |
| from rag.vectorstore import build_vectorstore |
| from langchain_core.documents import Document |
|
|
| logging.basicConfig(level=logging.INFO) |
| log = logging.getLogger(__name__) |
|
|
|
|
| |
| app = FastAPI( |
| title="SmartRAG API", |
| description="Production LLM RAG system with fine-tuned Mistral-7B", |
| version="1.0.0", |
| docs_url="/docs", |
| ) |
|
|
| app.add_middleware( |
| CORSMiddleware, |
| allow_origins=["*"], |
| allow_methods=["*"], |
| allow_headers=["*"], |
| ) |
|
|
|
|
| |
| class QueryRequest(BaseModel): |
| question: str = Field(..., min_length=3, max_length=2000, description="The question to answer") |
| top_k: Optional[int] = Field(None, ge=1, le=20, description="Number of chunks to retrieve") |
|
|
|
|
| class QueryResponse(BaseModel): |
| question: str |
| answer: str |
| sources: List[str] |
| num_chunks_retrieved: int |
| latency_ms: float |
|
|
|
|
| class IngestRequest(BaseModel): |
| texts: List[str] = Field(..., description="List of document texts to add to the knowledge base") |
| metadata: Optional[List[dict]] = Field(None, description="Metadata for each document") |
|
|
|
|
| class HealthResponse(BaseModel): |
| status: str |
| model_loaded: bool |
| vectorstore_loaded: bool |
| version: str |
|
|
|
|
| |
| pipeline = None |
| startup_error = None |
|
|
|
|
| |
| @app.on_event("startup") |
| async def startup(): |
| global pipeline, startup_error |
| try: |
| log.info("Loading SmartRAG pipeline...") |
| pipeline = get_pipeline() |
| log.info("β
Pipeline ready.") |
| except Exception as e: |
| startup_error = str(e) |
| log.error(f"Failed to load pipeline: {e}") |
|
|
|
|
| |
| @app.get("/health", response_model=HealthResponse) |
| async def health(): |
| """Health check endpoint for monitoring/load balancers.""" |
| return HealthResponse( |
| status="healthy" if pipeline else "degraded", |
| model_loaded=pipeline is not None, |
| vectorstore_loaded=pipeline is not None and pipeline.vectorstore is not None, |
| version="1.0.0", |
| ) |
|
|
|
|
| @app.post("/query", response_model=QueryResponse) |
| async def query(request: QueryRequest): |
| """ |
| Answer a question using the RAG pipeline. |
| |
| Returns the generated answer along with source documents |
| and performance metrics. |
| """ |
| if pipeline is None: |
| raise HTTPException( |
| status_code=503, |
| detail=f"Pipeline not loaded. Error: {startup_error}" |
| ) |
|
|
| start = time.perf_counter() |
|
|
| try: |
| response: RAGResponse = pipeline.query( |
| question=request.question, |
| top_k=request.top_k, |
| ) |
| except Exception as e: |
| log.error(f"Query failed: {e}") |
| raise HTTPException(status_code=500, detail=str(e)) |
|
|
| latency_ms = (time.perf_counter() - start) * 1000 |
|
|
| return QueryResponse( |
| question=response.question, |
| answer=response.answer, |
| sources=response.sources, |
| num_chunks_retrieved=response.num_chunks_retrieved, |
| latency_ms=round(latency_ms, 2), |
| ) |
|
|
|
|
| @app.post("/ingest") |
| async def ingest(request: IngestRequest, background_tasks: BackgroundTasks): |
| """ |
| Add new documents to the vector store knowledge base. |
| Ingestion runs in the background to avoid blocking. |
| """ |
| docs = [] |
| for i, text in enumerate(request.texts): |
| meta = request.metadata[i] if request.metadata else {} |
| docs.append(Document(page_content=text, metadata=meta)) |
|
|
| def _ingest(): |
| try: |
| build_vectorstore(docs=docs) |
| |
| from rag.vectorstore import load_vectorstore |
| pipeline.vectorstore = load_vectorstore() |
| log.info(f"β
Ingested {len(docs)} documents.") |
| except Exception as e: |
| log.error(f"Ingestion failed: {e}") |
|
|
| background_tasks.add_task(_ingest) |
|
|
| return { |
| "status": "accepted", |
| "message": f"Ingesting {len(docs)} documents in the background.", |
| } |
|
|
|
|
| @app.get("/") |
| async def root(): |
| return { |
| "name": "SmartRAG API", |
| "docs": "/docs", |
| "health": "/health", |
| } |
|
|