contextIQ / main.py
satheeshbhukya
first commit
bd91918
import threading
import uuid
from datetime import datetime, timezone
from io import BytesIO
from typing import Any, Dict, List, Optional
from fastapi import FastAPI, File, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field
from rdflib import Graph
import os
from scripts.document_parser import DocumentParser
from scripts.rag_engine import RAGEngine
from scripts.text_processor import TextProcessor
from scripts.vector_store import VectorStore
def _utc_now_iso() -> str:
return datetime.now(timezone.utc).isoformat()
ALLOWED_FILE_TYPES = {"pdf", "docx", "pptx", "xlsx", "txt"}
class IngestResponse(BaseModel):
doc_id: str
filename: str
file_type: str
num_chunks: int
created_at: str
class QueryRequest(BaseModel):
doc_id: str = Field(..., min_length=1)
question: str = Field(..., min_length=1)
k: int = Field(3, ge=1, le=20)
class QueryResponse(BaseModel):
doc_id: str
question: str
answer: str
sources: List[str]
num_sources: int
class _DocCacheEntry:
def __init__(
self,
*,
filename: str,
file_type: str,
chunks: List[str],
vector_store: VectorStore,
graph: Graph,
created_at: str,
):
self.filename = filename
self.file_type = file_type
self.chunks = chunks
self.vector_store = vector_store
self.graph = graph
self.created_at = created_at
class _AppState:
def __init__(self) -> None:
self.lock = threading.RLock()
self.rag_engine: Optional[RAGEngine] = None
self.doc_cache: Dict[str, _DocCacheEntry] = {}
def get_rag_engine(self) -> RAGEngine:
with self.lock:
if self.rag_engine is None:
model_path = os.getenv("CONTEXTIQ_MODEL_PATH", "/app/model/phi-3-openvino")
device = os.getenv("CONTEXTIQ_DEVICE", "CPU")
max_tokens = int(os.getenv("CONTEXTIQ_MAX_NEW_TOKENS", "512"))
self.rag_engine = RAGEngine(model_path=model_path, device=device, max_tokens=max_tokens)
return self.rag_engine
def get_doc(self, doc_id: str) -> _DocCacheEntry:
with self.lock:
entry = self.doc_cache.get(doc_id)
if entry is None:
raise KeyError(doc_id)
return entry
state = _AppState()
app = FastAPI(
title="ContextIQ API",
version="1.0.0",
description="Upload documents and query them using the ContextIQ RAG engine.",
)
cors_origins_raw = os.getenv("CONTEXTIQ_CORS_ALLOW_ORIGINS", "*").strip()
allow_origins = ["*"] if cors_origins_raw == "*" else [o.strip() for o in cors_origins_raw.split(",") if o.strip()]
app.add_middleware(
CORSMiddleware,
allow_origins=allow_origins,
allow_credentials=False,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/health")
def health() -> Dict[str, str]:
return {"status": "ok"}
@app.post("/v1/documents", response_model=IngestResponse)
async def ingest_document(
file: UploadFile = File(...),
chunk_size: int = 500,
chunk_overlap: int = 100,
) -> IngestResponse:
filename = file.filename or "uploaded"
file_type = (filename.rsplit(".", 1)[-1].lower() if "." in filename else "").strip()
if file_type not in ALLOWED_FILE_TYPES:
raise HTTPException(status_code=400, detail=f"Unsupported file type '{file_type}'. Allowed: {sorted(ALLOWED_FILE_TYPES)}")
if chunk_size < 100 or chunk_size > 2000:
raise HTTPException(status_code=400, detail="chunk_size must be between 100 and 2000")
if chunk_overlap < 0 or chunk_overlap >= chunk_size:
raise HTTPException(status_code=400, detail="chunk_overlap must be >= 0 and < chunk_size")
payload = await file.read()
if not payload:
raise HTTPException(status_code=400, detail="Empty file")
try:
text = DocumentParser.parse_document(BytesIO(payload), file_type)
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))
processor = TextProcessor(chunk_size=chunk_size, chunk_overlap=chunk_overlap)
chunks, embeddings = processor.process_text(text)
if not chunks:
raise HTTPException(status_code=400, detail="No text chunks produced from document")
# Build FAISS index
vector_store = VectorStore()
vector_store.create_index(embeddings)
# Build knowledge graph — done once at ingest, reused on every query
rag = state.get_rag_engine()
graph = rag.build_knowledge_graph(chunks)
doc_id = uuid.uuid4().hex
created_at = _utc_now_iso()
# Store everything in memory — no disk writes
with state.lock:
if len(state.doc_cache) >= 10:
state.doc_cache.pop(next(iter(state.doc_cache)))
state.doc_cache[doc_id] = _DocCacheEntry(
filename=filename,
file_type=file_type,
chunks=chunks,
vector_store=vector_store,
graph=graph,
created_at=created_at,
)
return IngestResponse(
doc_id=doc_id,
filename=filename,
file_type=file_type,
num_chunks=len(chunks),
created_at=created_at,
)
@app.post("/v1/query", response_model=QueryResponse)
def query(req: QueryRequest) -> QueryResponse:
try:
entry = state.get_doc(req.doc_id)
except KeyError:
raise HTTPException(status_code=404, detail="doc_id not found. Please re-upload your document.")
try:
rag = state.get_rag_engine()
# Pass chunks, vector_store and graph directly — no shared mutation,
# so multiple users can query simultaneously without interfering
result: Dict[str, Any] = rag.query(
question=req.question,
chunks=entry.chunks,
vector_store=entry.vector_store,
graph=entry.graph,
k=req.k,
)
except Exception as e:
raise HTTPException(status_code=500, detail=f"Query failed: {e}")
return QueryResponse(
doc_id=req.doc_id,
question=req.question,
answer=str(result.get("answer", "")),
sources=list(result.get("sources", [])),
num_sources=int(result.get("num_sources", 0)),
)