"""Streaming RAG chat service that answers questions from ``portfolio.pdf``.

At import time the PDF is loaded, chunked, embedded and indexed. The single
POST endpoint retrieves the chunks most relevant to the user's message and
streams a chat completion back as ``text/event-stream`` frames, trying each
entry of ``MODELS`` in order until one produces output.
"""

import logging
import os
import threading
import time

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from huggingface_hub import InferenceClient
from pydantic import BaseModel

from chunker import Chunker
from embedder import Embedder
from loader import Loader
from retriever import Retriever
from vector import VectorStorage

app = FastAPI()
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

# Models are tried in this order; the first one that streams output wins.
MODELS = [
    "Qwen/Qwen2.5-72B-Instruct",
    "meta-llama/Llama-3.2-3B-Instruct",
    "deepseek-ai/DeepSeek-R1-Distill-Qwen-1.5B",
    "mistralai/Mistral-7B-Instruct-v0.3",
    "HuggingFaceH4/zephyr-7b-beta",
]

# Session lifetime in seconds, measured from creation ("created_at" is set
# once and never refreshed). Also used as the sweep interval.
SESSION_TIMEOUT = 3 * 60 * 60

# Number of past chat messages replayed to the model and kept per session.
MAX_HISTORY_MESSAGES = 10

FALLBACK_ANSWER = "I only answer questions about Aarav and his work."

SYSTEM_PROMPT = (
    "You are Aarav's AI assistant.\n"
    "Your name is Zooba\n"
    "Your job is to answer questions about Aarav Kumar Ranjan, his projects, skills, and interests using the provided context.\n"
    "Rules:\n"
    "- Only answer using the given context. Do not make up information.\n"
    "- If the answer is not in the context, say: I only answer questions about Aarav and his work.\n"
    "- Keep answers clear, simple, and confident.\n"
    "- Do not use complex jargon unless necessary.\n"
    "- Prefer explaining things in a way a beginner can understand.\n"
    "Style:\n"
    "- Speak in a calm, intelligent, and slightly friendly tone.\n"
    "- Be concise but informative.\n"
    "- When explaining projects, include:\n"
    " • what it does\n"
    " • how it works (simple explanation)\n"
    " • why it is useful\n"
    "Do not generate fake achievements, skills, or experiences.\n"
    "Do not pretend to be Aarav himself.\n"
    "If asked about projects, mention their names clearly.\n"
    "Make Aarav appear as a thoughtful, skilled, and curious machine learning enthusiast who focuses on understanding and building real systems.\n"
)

# session_id -> {"history": [chat messages], "created_at": epoch seconds}
sessions: dict = {}

# Guards ``sessions`` and the per-session history lists: the cleanup thread
# below and the request handlers touch them from different threads.
_sessions_lock = threading.Lock()


def auto_cleanup():
    """Drop sessions older than ``SESSION_TIMEOUT``; runs forever.

    Intended to run in a daemon thread. Sleeps ``SESSION_TIMEOUT`` seconds
    between sweeps.
    """
    while True:
        time.sleep(SESSION_TIMEOUT)
        current_time = time.time()
        # Hold the lock while scanning and deleting so a request inserting a
        # new session cannot trigger "dictionary changed size during
        # iteration", which would kill this thread and stop all cleanup.
        with _sessions_lock:
            expired = [
                sid
                for sid, data in sessions.items()
                if current_time - data.get("created_at", current_time) > SESSION_TIMEOUT
            ]
            for sid in expired:
                sessions.pop(sid, None)
        if expired:
            logger.info("Auto-cleaned %d expired sessions to free RAM.", len(expired))


threading.Thread(target=auto_cleanup, daemon=True).start()

# --- Global RAG Components ---
text = Loader("portfolio.pdf").load()
chunks = Chunker().chunker(text)
embedder = Embedder()
vectors = embedder.embed(chunks)
store = VectorStorage(dimension=len(vectors[0]))
store.add(vectors, chunks)


class ChatRequest(BaseModel):
    """Request body for the chat endpoint."""

    session_id: str
    message: str


def _get_or_create_session(session_id: str) -> dict:
    """Return the session record for *session_id*, creating it if absent."""
    with _sessions_lock:
        session = sessions.get(session_id)
        if session is None:
            session = {"history": [], "created_at": time.time()}
            sessions[session_id] = session
        return session


def _recent_history(session: dict) -> list:
    """Return a copy of the last ``MAX_HISTORY_MESSAGES`` messages."""
    with _sessions_lock:
        return session["history"][-MAX_HISTORY_MESSAGES:]


def _remember_exchange(session: dict, user_message: str, reply: str) -> None:
    """Append one user/assistant turn to the session history.

    The raw question is stored rather than the context-augmented prompt, so
    replayed history does not repeat retrieved chunks. Empty replies are not
    recorded. The history is trimmed to ``MAX_HISTORY_MESSAGES`` entries.
    """
    if not reply:
        return
    with _sessions_lock:
        history = session["history"]
        history.append({"role": "user", "content": user_message})
        history.append({"role": "assistant", "content": reply})
        del history[:-MAX_HISTORY_MESSAGES]


@app.post("/")
def chat(req: ChatRequest):
    """Answer ``req.message`` using retrieved context, streamed as SSE.

    Returns a plain JSON fallback answer when retrieval yields nothing;
    otherwise a ``StreamingResponse`` of ``data: <token>`` frames terminated
    by ``data: [DONE]``.
    """
    session = _get_or_create_session(req.session_id)

    retriever = Retriever(store, embedder, k=3)
    context_chunks = retriever.retrieve(req.message)
    if not context_chunks:
        return {"response": FALLBACK_ANSWER}

    context_text = "\n\n".join(context_chunks)

    messages = [{"role": "system", "content": SYSTEM_PROMPT}]
    messages.extend(_recent_history(session))
    messages.append(
        {
            "role": "user",
            "content": f"Context:\n{context_text}\n\nQuestion: {req.message}",
        }
    )

    def token_stream():
        """Yield SSE frames from the first model that produces output."""
        for model in MODELS:
            # Initialised before the try block so the except handler can
            # always inspect it, even when client construction or the
            # HF_TOKEN lookup is what failed.
            streamed_parts = []
            try:
                client = InferenceClient(model, token=os.environ["HF_TOKEN"])
                logger.info("Streaming with: %s", model)
                for chunk in client.chat_completion(
                    messages, max_tokens=512, stream=True
                ):
                    piece = chunk.choices[0].delta.content
                    if piece:
                        streamed_parts.append(piece)
                        # NOTE(review): the token is written into the frame
                        # verbatim, so a token containing newlines is not
                        # split into multiple "data:" lines. Left unchanged
                        # because the client-side parser is not visible here.
                        yield f"data: {piece}\n\n"
            except Exception as exc:
                if not streamed_parts:
                    logger.warning("Streaming failed for %s: %s", model, exc)
                    continue
                # Output already reached the client, so switching models
                # would produce a garbled answer; end the stream instead.
                logger.warning(
                    "Stream from %s interrupted after partial output: %s",
                    model,
                    exc,
                )

            _remember_exchange(session, req.message, "".join(streamed_parts))
            yield "data: [DONE]\n\n"
            return

        yield "data: Sorry, all models are currently unavailable. Try again later.\n\n"
        yield "data: [DONE]\n\n"

    return StreamingResponse(token_stream(), media_type="text/event-stream")


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7600)