ShunTay12
Supabase keys must only be kept in FastAPI backend
ecd1a81
"""FastAPI application for Sejarah RAG backend."""
import logging
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from typing import Optional
import uvicorn
from fastapi import FastAPI, File, Form, HTTPException, UploadFile, Depends
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from app.api.deps import verify_admin
from app.core.config import settings
from app.core.database import db
from app.schemas.rag import (
AnswerRequest,
AnswerResponse,
DebugInfo,
ErrorResponse,
HealthResponse,
SeedResponse,
)
from app.services.rag import rag_service
logger = logging.getLogger(__name__)

# Configure structured logging.
# Verbosity is DEBUG when settings.debug is truthy and INFO otherwise; each
# record is rendered as "timestamp | level | logger name | message".
logging.basicConfig(
    datefmt="%Y-%m-%d %H:%M:%S",
    format="%(asctime)s | %(levelname)-8s | %(name)s | %(message)s",
    level=logging.DEBUG if settings.debug else logging.INFO,
)

# Silence noisy third-party loggers by raising their threshold to WARNING.
for _noisy in ("hpack", "httpcore", "httpx", "watchfiles", "multipart"):
    logging.getLogger(_noisy).setLevel(logging.WARNING)
@asynccontextmanager
async def lifespan(app: FastAPI) -> AsyncGenerator[None]:
    """Run startup work before the app serves requests and log on shutdown.

    Startup awaits ``db.connect()``. If that raises, the failure is logged
    and the exception is re-raised, so the application does not start.

    Args:
        app: The FastAPI instance this lifespan is attached to (unused here).

    Yields:
        None, once startup has completed; control returns after the yield
        when the application is shutting down.
    """
    # ── Startup ──
    logger.info("Starting RAG API application")
    try:
        await db.connect()
        logger.info("Application startup completed successfully")
    except Exception as e:
        logger.error(f"Application startup failed: {e}")
        raise

    yield

    # ── Shutdown ──
    # NOTE(review): nothing is released here; confirm whether `db` needs an
    # explicit close/disconnect call on shutdown.
    logger.info("Shutting down RAG API application")
# The ASGI application object. Title and version come from settings; startup
# and shutdown behaviour is delegated to `lifespan`.
app = FastAPI(
    title=settings.project_name,
    version=settings.version,
    description=(
        "A production-ready FastAPI backend demonstrating "
        "Retrieval-Augmented Generation (RAG) with vector similarity search "
        "for Malaysian History textbooks (Buku Teks Sejarah) ."
    ),
    lifespan=lifespan,
    # Declared app-wide so every route documents the 500 error shape.
    responses={
        500: {"model": ErrorResponse, "description": "Internal Server Error"},
    },
)
# CORS middleware
# Only the explicitly listed origins are allowed; for those origins every
# method and header is accepted and credentials may be sent.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_origins=[
        "http://localhost:3000",
        "http://127.0.0.1:3000",
        "https://sejarah-rag-website.vercel.app",
    ],
    allow_methods=["*"],
    allow_headers=["*"],
)
# ─── Endpoints ────────────────────────────────────────────────────────
@app.get("/health", response_model=HealthResponse, tags=["Health"])
async def health_check() -> HealthResponse:
    """Report that the service is up, together with its configured version.

    Returns:
        A HealthResponse whose status is always "healthy" and whose version
        is read from settings.
    """
    current_version = settings.version
    return HealthResponse(version=current_version, status="healthy")
@app.post(
    "/seed",
    response_model=SeedResponse,
    tags=["RAG"],
    summary="Seed knowledge base",
    responses={
        400: {"model": ErrorResponse, "description": "Invalid request"},
        401: {"model": ErrorResponse, "description": "Unauthorized"},
        403: {"model": ErrorResponse, "description": "Forbidden"},
        500: {"model": ErrorResponse, "description": "Seeding failed"},
    },
)
async def seed_documents(
    file: UploadFile,
    _=Depends(verify_admin),
) -> SeedResponse:
    """Seed the knowledge base with an uploaded markdown document.

    The uploaded `.md` file is chunked, embedded, and stored in the
    vector database (this work is delegated to ``rag_service.seed_documents``).

    Args:
        file: The uploaded document. Its name must end in ``.md`` (matched
            case-insensitively) and its bytes must decode as UTF-8.
        _: Result of the ``verify_admin`` dependency; only its side effect
            (rejecting the request) matters, so the value is ignored.

    Returns:
        A SeedResponse carrying the number of inserted chunks and a
        human-readable confirmation message.

    Raises:
        HTTPException: 400 for a non-markdown filename, non-UTF-8 content, or
            a ``ValueError`` from the service; 500 for any other failure.
            An ``HTTPException`` raised by the service is passed through
            unchanged.
    """
    filename = file.filename or "upload.md"
    # Compare case-insensitively so "NOTES.MD" is accepted like "notes.md".
    if not filename.lower().endswith(".md"):
        raise HTTPException(
            status_code=400,
            detail="Only .md files are supported.",
        )

    raw = await file.read()
    try:
        content = raw.decode("utf-8")
    except UnicodeDecodeError as e:
        raise HTTPException(
            status_code=400,
            detail="File is not valid UTF-8 text.",
        ) from e

    try:
        logger.info("Starting document seeding: %s", filename)
        inserted_count = await rag_service.seed_documents(filename, content)
        logger.info("Seeding completed: %s chunks inserted", inserted_count)
        return SeedResponse(
            inserted=inserted_count,
            message=f"Successfully seeded {inserted_count} chunks.",
        )
    except HTTPException:
        # Already a well-formed HTTP error; do not let the generic handler
        # below rewrite its status code into a 500.
        raise
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        # logger.exception keeps the traceback, which would otherwise be lost
        # once the error is converted into an HTTPException.
        logger.exception("Seeding failed: %s", e)
        raise HTTPException(
            status_code=500,
            detail=f"Failed to seed documents: {e!s}",
        ) from e
@app.post(
    "/answer",
    response_model=AnswerResponse,
    tags=["RAG"],
    summary="Answer a question using RAG",
    responses={
        422: {"model": ErrorResponse, "description": "Validation error"},
        500: {"model": ErrorResponse, "description": "Processing failed"},
    },
)
async def answer_question(
    query: Optional[str] = Form(default="", description="The question to answer."),
    top_k: int = Form(default=6, description="Number of top-k chunks to retrieve."),
    image: Optional[UploadFile] = File(
        default=None, description="Optional image upload."
    ),
) -> AnswerResponse:
    """Answer a question using Retrieval-Augmented Generation.

    The work is delegated to ``rag_service.answer_question``; its pipeline is:
    1. Embed the query
    2. Vector similarity search to find relevant chunks
    3. Generate answer using LLM with context
    4. Return answer with citations and debug info

    Args:
        query: The question text, sent as a form field; defaults to "".
        top_k: How many chunks to retrieve; defaults to 6.
        image: Optional uploaded image forwarded to the service.

    Returns:
        An AnswerResponse built from the service result's ``text``,
        ``citations`` and ``debug`` (``top_doc_ids``, ``latency_ms``) entries.

    Raises:
        HTTPException: 500 when processing fails for any reason. An
            ``HTTPException`` raised by the service is passed through
            unchanged.
    """
    # Log at most the first 100 characters of the query.
    preview = query[:100] if query else "No query"
    logger.info("Processing query: '%s...'", preview)

    try:
        result = await rag_service.answer_question(
            query=query, top_k=top_k, image=image
        )
        debug = result["debug"]
        response = AnswerResponse(
            text=result["text"],
            citations=result["citations"],
            debug=DebugInfo(
                top_doc_ids=debug["top_doc_ids"],
                latency_ms=debug["latency_ms"],
            ),
        )
        logger.info("Query processed in %sms", debug["latency_ms"])
        return response
    except HTTPException:
        # Preserve the status code chosen further down the stack instead of
        # collapsing it into a generic 500.
        raise
    except Exception as e:
        # logger.exception keeps the traceback, which would otherwise be lost
        # once the error is converted into an HTTPException.
        logger.exception("Query processing failed: %s", e)
        raise HTTPException(
            status_code=500, detail=f"Failed to process query: {e!s}"
        ) from e
# ─── Error Handlers ──────────────────────────────────────────────────
@app.exception_handler(404)
async def not_found_handler(request, exc) -> JSONResponse:
    """Return a JSON 404 body that lists the endpoints this app exposes.

    Args:
        request: The incoming request (unused).
        exc: The exception that triggered the handler (unused).

    Returns:
        A JSONResponse with status 404 and a fixed error payload.
    """
    payload = {
        "error": "Not Found",
        "detail": "The requested endpoint does not exist",
        "available_endpoints": [
            "/health",
            "/seed",
            "/answer",
        ],
    }
    return JSONResponse(content=payload, status_code=404)
@app.exception_handler(500)
async def internal_error_handler(request, exc) -> JSONResponse:
    """Log the failure and return a generic JSON 500 body.

    The response deliberately carries a fixed message; the exception text is
    written to the log only.

    Args:
        request: The incoming request (unused).
        exc: The exception that triggered the handler.

    Returns:
        A JSONResponse with status 500 and a fixed error payload.
    """
    logger.error(f"Internal server error: {exc}")
    payload = {
        "error": "Internal Server Error",
        "detail": "An unexpected error occurred. Please check the logs.",
    }
    return JSONResponse(content=payload, status_code=500)
if __name__ == "__main__":
    # Direct-execution entry point: serve the `app` object from module `main`
    # on all interfaces, port 7860, with auto-reload enabled.
    uvicorn.run(
        "main:app",
        port=7860,
        host="0.0.0.0",
        reload=True,
    )