import threading import uuid from datetime import datetime, timezone from io import BytesIO from typing import Any, Dict, List, Optional from fastapi import FastAPI, File, HTTPException, UploadFile from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel, Field from rdflib import Graph import os from scripts.document_parser import DocumentParser from scripts.rag_engine import RAGEngine from scripts.text_processor import TextProcessor from scripts.vector_store import VectorStore def _utc_now_iso() -> str: return datetime.now(timezone.utc).isoformat() ALLOWED_FILE_TYPES = {"pdf", "docx", "pptx", "xlsx", "txt"} class IngestResponse(BaseModel): doc_id: str filename: str file_type: str num_chunks: int created_at: str class QueryRequest(BaseModel): doc_id: str = Field(..., min_length=1) question: str = Field(..., min_length=1) k: int = Field(3, ge=1, le=20) class QueryResponse(BaseModel): doc_id: str question: str answer: str sources: List[str] num_sources: int class _DocCacheEntry: def __init__( self, *, filename: str, file_type: str, chunks: List[str], vector_store: VectorStore, graph: Graph, created_at: str, ): self.filename = filename self.file_type = file_type self.chunks = chunks self.vector_store = vector_store self.graph = graph self.created_at = created_at class _AppState: def __init__(self) -> None: self.lock = threading.RLock() self.rag_engine: Optional[RAGEngine] = None self.doc_cache: Dict[str, _DocCacheEntry] = {} def get_rag_engine(self) -> RAGEngine: with self.lock: if self.rag_engine is None: model_path = os.getenv("CONTEXTIQ_MODEL_PATH", "/app/model/phi-3-openvino") device = os.getenv("CONTEXTIQ_DEVICE", "CPU") max_tokens = int(os.getenv("CONTEXTIQ_MAX_NEW_TOKENS", "512")) self.rag_engine = RAGEngine(model_path=model_path, device=device, max_tokens=max_tokens) return self.rag_engine def get_doc(self, doc_id: str) -> _DocCacheEntry: with self.lock: entry = self.doc_cache.get(doc_id) if entry is None: raise KeyError(doc_id) return entry state = _AppState() app = FastAPI( title="ContextIQ API", version="1.0.0", description="Upload documents and query them using the ContextIQ RAG engine.", ) cors_origins_raw = os.getenv("CONTEXTIQ_CORS_ALLOW_ORIGINS", "*").strip() allow_origins = ["*"] if cors_origins_raw == "*" else [o.strip() for o in cors_origins_raw.split(",") if o.strip()] app.add_middleware( CORSMiddleware, allow_origins=allow_origins, allow_credentials=False, allow_methods=["*"], allow_headers=["*"], ) @app.get("/health") def health() -> Dict[str, str]: return {"status": "ok"} @app.post("/v1/documents", response_model=IngestResponse) async def ingest_document( file: UploadFile = File(...), chunk_size: int = 500, chunk_overlap: int = 100, ) -> IngestResponse: filename = file.filename or "uploaded" file_type = (filename.rsplit(".", 1)[-1].lower() if "." in filename else "").strip() if file_type not in ALLOWED_FILE_TYPES: raise HTTPException(status_code=400, detail=f"Unsupported file type '{file_type}'. Allowed: {sorted(ALLOWED_FILE_TYPES)}") if chunk_size < 100 or chunk_size > 2000: raise HTTPException(status_code=400, detail="chunk_size must be between 100 and 2000") if chunk_overlap < 0 or chunk_overlap >= chunk_size: raise HTTPException(status_code=400, detail="chunk_overlap must be >= 0 and < chunk_size") payload = await file.read() if not payload: raise HTTPException(status_code=400, detail="Empty file") try: text = DocumentParser.parse_document(BytesIO(payload), file_type) except Exception as e: raise HTTPException(status_code=400, detail=str(e)) processor = TextProcessor(chunk_size=chunk_size, chunk_overlap=chunk_overlap) chunks, embeddings = processor.process_text(text) if not chunks: raise HTTPException(status_code=400, detail="No text chunks produced from document") # Build FAISS index vector_store = VectorStore() vector_store.create_index(embeddings) # Build knowledge graph — done once at ingest, reused on every query rag = state.get_rag_engine() graph = rag.build_knowledge_graph(chunks) doc_id = uuid.uuid4().hex created_at = _utc_now_iso() # Store everything in memory — no disk writes with state.lock: if len(state.doc_cache) >= 10: state.doc_cache.pop(next(iter(state.doc_cache))) state.doc_cache[doc_id] = _DocCacheEntry( filename=filename, file_type=file_type, chunks=chunks, vector_store=vector_store, graph=graph, created_at=created_at, ) return IngestResponse( doc_id=doc_id, filename=filename, file_type=file_type, num_chunks=len(chunks), created_at=created_at, ) @app.post("/v1/query", response_model=QueryResponse) def query(req: QueryRequest) -> QueryResponse: try: entry = state.get_doc(req.doc_id) except KeyError: raise HTTPException(status_code=404, detail="doc_id not found. Please re-upload your document.") try: rag = state.get_rag_engine() # Pass chunks, vector_store and graph directly — no shared mutation, # so multiple users can query simultaneously without interfering result: Dict[str, Any] = rag.query( question=req.question, chunks=entry.chunks, vector_store=entry.vector_store, graph=entry.graph, k=req.k, ) except Exception as e: raise HTTPException(status_code=500, detail=f"Query failed: {e}") return QueryResponse( doc_id=req.doc_id, question=req.question, answer=str(result.get("answer", "")), sources=list(result.get("sources", [])), num_sources=int(result.get("num_sources", 0)), )