"""FastAPI entry point for the RAG system: URL ingestion, ingestion status, and querying."""

from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, HttpUrl
from typing import Any, Dict, List, Optional, Tuple
import uuid
import logging

from src.database import db
from src.queue_manager import queue_manager
from src.vector_store import vector_store
from src.embeddings import embedding_service
from groq import AsyncGroq, Groq
from src.config import config

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI(title="RAG System API", version="1.0.0")

# LLM settings used by POST /query.
LLM_MODEL = "llama-3.3-70b-versatile"
LLM_TEMPERATURE = 0.3
LLM_MAX_TOKENS = 1024

# Number of characters of each retrieved chunk echoed back as a source snippet.
SNIPPET_LENGTH = 200

SYSTEM_PROMPT = """You are a helpful AI assistant that answers questions based solely on the provided context.
Your responses must be:
1. Grounded in the provided sources
2. Accurate and factual
3. Clear and concise
4. If the context doesn't contain relevant information, say so explicitly."""


class IngestURLRequest(BaseModel):
    """Request body for POST /ingest-url."""

    url: HttpUrl


class QueryRequest(BaseModel):
    """Request body for POST /query.

    ``top_k`` overrides ``config.TOP_K_RESULTS`` when provided and truthy.
    """

    question: str
    top_k: Optional[int] = None


class IngestURLResponse(BaseModel):
    """Response for POST /ingest-url."""

    url_id: str
    url: str
    status: str
    message: str


class StatusResponse(BaseModel):
    """Response for GET /status/{url_id}; fields mirror the stored URL record."""

    url_id: str
    url: str
    status: str
    created_at: str
    updated_at: str
    completed_at: Optional[str] = None
    chunk_count: int
    error_message: Optional[str] = None


class QueryResponse(BaseModel):
    """Response for POST /query."""

    question: str
    answer: str
    # Each entry is a dict with "url", "score" and "text_snippet" keys.
    sources: list


@app.on_event("startup")
async def startup_event():
    """Log the configured backend locations when the application starts."""
    logger.info("Starting RAG System API...")
    logger.info("Database path: %s", config.DATABASE_PATH)
    logger.info("Redis: %s:%s", config.REDIS_HOST, config.REDIS_PORT)
    logger.info("Qdrant: %s:%s", config.QDRANT_HOST, config.QDRANT_PORT)


@app.get("/")
async def root():
    """Return basic API metadata and the list of available endpoints."""
    return {
        "message": "RAG System API",
        "version": "1.0.0",
        "endpoints": {
            "ingest": "POST /ingest-url",
            "query": "POST /query",
            "status": "GET /status/{url_id}",
        },
    }


@app.post("/ingest-url", response_model=IngestURLResponse, status_code=202)
async def ingest_url(request: IngestURLRequest):
    """Register a URL and enqueue it for background processing.

    Creates a database record under a fresh UUID, then enqueues a job dict
    containing ``url_id`` and ``url``. Responds 202 with status "pending".

    Raises:
        HTTPException: 500 if creating the record or enqueueing the job fails.
    """
    try:
        url = str(request.url)
        url_id = str(uuid.uuid4())

        db.create_url_record(url_id, url)
        # NOTE(review): if enqueue() fails after the record was created, the
        # record is left in whatever state create_url_record() set and no job
        # will ever process it. Confirm whether it should be marked failed here.
        queue_manager.enqueue({"url_id": url_id, "url": url})

        logger.info("Enqueued URL for processing: %s (ID: %s)", url, url_id)

        return IngestURLResponse(
            url_id=url_id,
            url=url,
            status="pending",
            message="URL has been queued for processing",
        )
    except Exception as e:
        # logger.exception keeps the traceback, which logger.error(str(e)) dropped.
        logger.exception("Error enqueueing URL: %s", e)
        raise HTTPException(
            status_code=500, detail=f"Error processing request: {str(e)}"
        ) from e


@app.get("/status/{url_id}", response_model=StatusResponse)
async def get_status(url_id: str):
    """Return the stored processing status for ``url_id``.

    Raises:
        HTTPException: 404 if no record exists for ``url_id``.
    """
    record = db.get_url_record(url_id)
    if not record:
        raise HTTPException(status_code=404, detail="URL ID not found")

    return StatusResponse(
        url_id=record["id"],
        url=record["url"],
        status=record["status"],
        created_at=record["created_at"],
        updated_at=record["updated_at"],
        completed_at=record.get("completed_at"),
        chunk_count=record.get("chunk_count", 0),
        error_message=record.get("error_message"),
    )


def _build_context(search_results: List[Dict[str, Any]]) -> Tuple[str, List[Dict[str, Any]]]:
    """Turn vector-search hits into an LLM context string and a list of source summaries.

    Each hit must provide "text", "url" and "score" keys. Context entries are
    numbered from 1 as ``[Source i]`` and separated by blank lines. Snippets are
    truncated to SNIPPET_LENGTH characters, with "..." appended when cut.
    """
    context_parts = []
    sources = []
    for i, result in enumerate(search_results, start=1):
        text = result["text"]
        context_parts.append(f"[Source {i}]: {text}")
        snippet = text[:SNIPPET_LENGTH] + "..." if len(text) > SNIPPET_LENGTH else text
        sources.append({"url": result["url"], "score": result["score"], "text_snippet": snippet})
    return "\n\n".join(context_parts), sources


@app.post("/query", response_model=QueryResponse)
async def query_knowledge_base(request: QueryRequest):
    """Answer a question using retrieved chunks as grounding context for the Groq LLM.

    Steps:
      1. Embed the question.
      2. Search the vector store for ``top_k`` chunks (``request.top_k``, or
         ``config.TOP_K_RESULTS`` when it is missing or 0).
      3. Send the chunks plus the question to the LLM.

    When the search returns nothing, a fixed "not enough information" answer is
    returned without calling the LLM.

    Raises:
        HTTPException: 500 if GROQ_API_KEY is not configured, or if any step fails.
    """
    try:
        if not config.GROQ_API_KEY:
            raise HTTPException(
                status_code=500,
                detail="GROQ_API_KEY not configured. Please set it in your .env file",
            )

        question = request.question
        top_k = request.top_k or config.TOP_K_RESULTS

        # NOTE(review): embed_text() and search() are plain (non-awaited) calls,
        # so they run on the event loop thread. If they are slow, consider
        # offloading them once their thread-safety is confirmed.
        query_embedding = embedding_service.embed_text(question)
        search_results = vector_store.search(query_embedding, top_k=top_k)

        if not search_results:
            return QueryResponse(
                question=question,
                answer="I don't have enough information in my knowledge base to answer this question. Please ingest relevant URLs first.",
                sources=[],
            )

        context, sources = _build_context(search_results)

        user_prompt = (
            "Context from knowledge base:\n"
            f"{context}\n"
            "\n"
            f"Question: {question}\n"
            "\n"
            "Please provide a clear, grounded answer based only on the information in the context above.\n"
            "If the context doesn't contain enough information to answer the question, please say so."
        )

        # Use the async client: the synchronous Groq client blocked the event
        # loop (and every concurrent request) for the whole LLM round-trip.
        # The context manager closes the client's HTTP resources afterwards.
        async with AsyncGroq(api_key=config.GROQ_API_KEY) as groq_client:
            chat_completion = await groq_client.chat.completions.create(
                messages=[
                    {"role": "system", "content": SYSTEM_PROMPT},
                    {"role": "user", "content": user_prompt},
                ],
                model=LLM_MODEL,
                temperature=LLM_TEMPERATURE,
                max_tokens=LLM_MAX_TOKENS,
            )

        answer = chat_completion.choices[0].message.content
        logger.info("Query processed: %s...", question[:100])

        return QueryResponse(question=question, answer=answer, sources=sources)

    except HTTPException:
        raise
    except Exception as e:
        logger.exception("Error processing query: %s", e)
        raise HTTPException(
            status_code=500, detail=f"Error processing query: {str(e)}"
        ) from e


@app.get("/health")
async def health_check():
    """Report Redis connectivity and, when connected, the current queue length."""
    redis_ok = queue_manager.ping()
    return {
        "status": "healthy",
        "redis_connected": redis_ok,
        "queue_length": queue_manager.get_queue_length() if redis_ok else None,
    }