# app.py import logging import uuid import io from fastapi import FastAPI, UploadFile, File, HTTPException from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel # Import from our core modules from core.chunking import semantic_chunker from core.vector_store import create_faiss_index, deserialize_faiss_index # Parsing and AI libraries import fitz from PIL import Image import pytesseract from sentence_transformers import SentenceTransformer from ctransformers import AutoModel # NEW: For running quantized GGUF models # --- 1. INITIAL SETUP & MODEL LOADING --- logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) app = FastAPI(title="Generative Universal Data AI", version="3.0.0") app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"] ) # --- Load Models --- try: logger.info("Loading AI models...") # Model for creating vector embeddings (remains the same) embedding_model = SentenceTransformer('BAAI/bge-large-en-v1.5') # NEW: Loading the quantized Phi-2 model using ctransformers # This downloads a GGUF model file, optimized for CPU inference. # Q4_K_M is a good balance of quality and performance. llm = AutoModel.from_pretrained( "TheBloke/phi-2-GGUF", model_file="phi-2.Q4_K_M.gguf" ) logger.info("AI models loaded successfully.") except Exception as e: logger.critical(f"Fatal error: Could not load AI models. {e}") embedding_model = None llm = None SESSION_DATA = {} # --- 2. DATA MODELS --- class QueryRequest(BaseModel): question: str class UploadResponse(BaseModel): session_id: str; filename: str; chunks_created: int # Modified response to reflect generative model output class QueryResponse(BaseModel): answer: str; context: str # --- 3. HELPER FUNCTIONS --- (No changes here) def parse_pdf(content: bytes) -> str: doc = fitz.open(stream=content, filetype="pdf"); return "".join(page.get_text() for page in doc) def parse_image(content: bytes) -> str: image = Image.open(io.BytesIO(content)); return pytesseract.image_to_string(image) # --- 4. API ENDPOINTS --- @app.get("/") def read_root(): return {"status": "ok", "message": "Welcome to the Generative Universal Data AI"} @app.post("/upload", response_model=UploadResponse) async def upload_file(file: UploadFile = File(...)): # This endpoint remains largely the same, using the BGE model and semantic chunking if not embedding_model: raise HTTPException(status_code=503, detail="Embedding model not available.") # ... (the rest of the upload logic is identical to the previous version) session_id = str(uuid.uuid4()) content = await file.read() content_type = file.content_type if content_type == "application/pdf": text = parse_pdf(content) elif content_type and content_type.startswith("image/"): text = parse_image(content) else: text = content.decode("utf-8") if not text.strip(): raise HTTPException(status_code=400, detail="No text could be extracted.") text_chunks = semantic_chunker(text, embedding_model) if not text_chunks: raise HTTPException(status_code=400, detail="Document too short to be processed.") embeddings = embedding_model.encode(text_chunks, convert_to_numpy=True) serialized_index = create_faiss_index(embeddings) if not serialized_index: raise HTTPException(status_code=500, detail="Failed to create document index.") SESSION_DATA[session_id] = {"chunks": text_chunks, "index": serialized_index} logger.info(f"Session {session_id} created with {len(text_chunks)} chunks.") return {"session_id": session_id, "filename": file.filename, "chunks_created": len(text_chunks)} @app.post("/query/{session_id}", response_model=QueryResponse) async def query_session(session_id: str, request: QueryRequest): # --- THIS ENDPOINT IS COMPLETELY REWORKED FOR PHI-2 --- if not llm or not embedding_model: raise HTTPException(status_code=503, detail="AI models are not available.") session = SESSION_DATA.get(session_id) if not session: raise HTTPException(status_code=404, detail="Session not found.") # Step 1: Retrieve relevant context (same as before) query_with_prefix = f"Represent this sentence for searching relevant passages: {request.question}" question_embedding = embedding_model.encode([query_with_prefix], convert_to_numpy=True).astype('float32') index = deserialize_faiss_index(session["index"]) if not index: raise HTTPException(status_code=500, detail="Could not load session index.") k = min(5, index.ntotal) distances, indices = index.search(question_embedding, k) context = "\n".join([session["chunks"][i] for i in indices[0]]) # Step 2: Create a specific prompt for the generative model # This template instructs the model on how to behave. prompt = f""" Instruct: Use the following context to answer the question accurately. If the answer is not present in the context, say "The answer is not available in the provided document." Context: {context} Question: {request.question} Answer:""" logger.info("Generating answer with Phi-2...") # Step 3: Generate the answer answer = llm( prompt, max_new_tokens=256, # Max length of the answer temperature=0.2, # Lower temperature for more factual answers stop=["\n", "Instruct:", "Question:"] # Stop generation at these tokens ) # Generative models don't give a confidence 'score' like extractive ones. # We simply return the generated text. return {"answer": answer.strip(), "context": context}