# NOTE(review): the three lines here ("Spaces: / Sleeping / Sleeping") were
# hosting-page scrape residue, not code — preserved as a comment so the file parses.
# Imports and application setup for the RAG System API.
# NOTE(review): the scraped source wrapped every line in markdown-table pipes
# ("| ... | |") and dropped indentation; this restores plain Python source.

# Standard library
import logging
import uuid
from typing import Optional

# Third-party
from fastapi import FastAPI, HTTPException
from groq import Groq
from pydantic import BaseModel, HttpUrl

# Project-local
from src.config import config
from src.database import db
from src.embeddings import embedding_service
from src.queue_manager import queue_manager
from src.vector_store import vector_store

# Module-wide logging at INFO level; logger named after this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# FastAPI application instance; route handlers below attach to it.
app = FastAPI(title="RAG System API", version="1.0.0")
class IngestURLRequest(BaseModel):
    """Request body for the ingest endpoint: a single URL to process."""

    # Validated and normalized as a URL by pydantic's HttpUrl type.
    url: HttpUrl
class QueryRequest(BaseModel):
    """Request body for the query endpoint."""

    # Natural-language question to answer from the knowledge base.
    question: str
    # Number of chunks to retrieve; None falls back to config.TOP_K_RESULTS.
    top_k: Optional[int] = None
class IngestURLResponse(BaseModel):
    """Response returned after a URL is accepted for ingestion."""

    # Server-generated UUID identifying this ingestion job.
    url_id: str
    # The URL as submitted (stringified).
    url: str
    # Initial job status; "pending" at enqueue time.
    status: str
    # Human-readable confirmation message.
    message: str
class StatusResponse(BaseModel):
    """Processing status of a previously ingested URL."""

    url_id: str
    url: str
    # Current job status string as stored in the database record.
    status: str
    # Timestamps are passed through as strings from the DB record.
    created_at: str
    updated_at: str
    # Set only once processing has finished.
    completed_at: Optional[str] = None
    # Number of chunks produced from the document (0 while pending).
    chunk_count: int
    # Populated when processing failed.
    error_message: Optional[str] = None
class QueryResponse(BaseModel):
    """Answer produced by the RAG pipeline, with its supporting sources."""

    # The question as asked.
    question: str
    # LLM-generated answer grounded in the retrieved context.
    answer: str
    # List of dicts: url, similarity score, and a text snippet per source.
    sources: list
# NOTE(review): the scrape stripped route/event decorators; restored here — confirm
# against the original repository.
@app.on_event("startup")
async def startup_event():
    """Log the service configuration when the API process starts."""
    logger.info("Starting RAG System API...")
    logger.info(f"Database path: {config.DATABASE_PATH}")
    logger.info(f"Redis: {config.REDIS_HOST}:{config.REDIS_PORT}")
    logger.info(f"Qdrant: {config.QDRANT_HOST}:{config.QDRANT_PORT}")
# NOTE(review): decorator lost in the scrape; "/" inferred from the handler's
# role as an API index — confirm against the original repository.
@app.get("/")
async def root():
    """Describe the API: name, version, and the available endpoints."""
    return {
        "message": "RAG System API",
        "version": "1.0.0",
        "endpoints": {
            "ingest": "POST /ingest-url",
            "query": "POST /query",
            "status": "GET /status/{url_id}"
        }
    }
# NOTE(review): decorator lost in the scrape; path taken from root()'s own
# endpoint listing ("ingest": "POST /ingest-url").
@app.post("/ingest-url")
async def ingest_url(request: IngestURLRequest):
    """Queue a URL for background ingestion.

    Creates a database record under a fresh UUID, enqueues a processing job,
    and immediately returns a "pending" response.

    Raises:
        HTTPException: 500 if record creation or enqueueing fails.
    """
    try:
        url = str(request.url)
        url_id = str(uuid.uuid4())
        # Create the record first so GET /status/{url_id} works right away.
        db.create_url_record(url_id, url)
        job_data = {
            "url_id": url_id,
            "url": url
        }
        queue_manager.enqueue(job_data)
        logger.info(f"Enqueued URL for processing: {url} (ID: {url_id})")
        return IngestURLResponse(
            url_id=url_id,
            url=url,
            status="pending",
            message="URL has been queued for processing"
        )
    except Exception as e:
        logger.error(f"Error enqueueing URL: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error processing request: {str(e)}")
# NOTE(review): decorator lost in the scrape; path taken from root()'s own
# endpoint listing ("status": "GET /status/{url_id}").
@app.get("/status/{url_id}")
async def get_status(url_id: str):
    """Return the processing status of a previously ingested URL.

    Raises:
        HTTPException: 404 if no record exists for `url_id`.
    """
    record = db.get_url_record(url_id)
    if not record:
        raise HTTPException(status_code=404, detail="URL ID not found")
    return StatusResponse(
        url_id=record["id"],
        url=record["url"],
        status=record["status"],
        created_at=record["created_at"],
        updated_at=record["updated_at"],
        # Optional fields default to None / 0 when absent from the record.
        completed_at=record.get("completed_at"),
        chunk_count=record.get("chunk_count", 0),
        error_message=record.get("error_message")
    )
# NOTE(review): decorator lost in the scrape; path taken from root()'s own
# endpoint listing ("query": "POST /query").
@app.post("/query")
async def query_knowledge_base(request: QueryRequest):
    """Answer a question with retrieval-augmented generation.

    Pipeline: embed the question, retrieve the top-k chunks from the vector
    store, build a grounded prompt, and ask the Groq LLM for an answer.
    Returns a QueryResponse with the answer and the supporting sources; if
    nothing is retrieved, returns a fixed "not enough information" answer.

    Raises:
        HTTPException: 500 if GROQ_API_KEY is unset or on unexpected errors.
    """
    try:
        if not config.GROQ_API_KEY:
            raise HTTPException(
                status_code=500,
                detail="GROQ_API_KEY not configured. Please set it in your .env file"
            )
        question = request.question
        # Per-request override, else the configured default.
        top_k = request.top_k or config.TOP_K_RESULTS
        query_embedding = embedding_service.embed_text(question)
        search_results = vector_store.search(query_embedding, top_k=top_k)
        if not search_results:
            return QueryResponse(
                question=question,
                answer="I don't have enough information in my knowledge base to answer this question. Please ingest relevant URLs first.",
                sources=[]
            )
        context_parts = []
        sources = []
        for i, result in enumerate(search_results):
            context_parts.append(f"[Source {i+1}]: {result['text']}")
            sources.append({
                "url": result["url"],
                "score": result["score"],
                # Truncate long chunks to a 200-char snippet for the response.
                "text_snippet": result["text"][:200] + "..." if len(result["text"]) > 200 else result["text"]
            })
        context = "\n\n".join(context_parts)
        system_prompt = """You are a helpful AI assistant that answers questions based solely on the provided context.
Your responses must be:
1. Grounded in the provided sources
2. Accurate and factual
3. Clear and concise
4. If the context doesn't contain relevant information, say so explicitly."""
        user_prompt = f"""Context from knowledge base:
{context}
Question: {question}
Please provide a clear, grounded answer based only on the information in the context above. If the context doesn't contain enough information to answer the question, please say so."""
        groq_client = Groq(api_key=config.GROQ_API_KEY)
        chat_completion = groq_client.chat.completions.create(
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt}
            ],
            model="llama-3.3-70b-versatile",
            temperature=0.3,
            max_tokens=1024
        )
        answer = chat_completion.choices[0].message.content
        logger.info(f"Query processed: {question[:100]}...")
        return QueryResponse(
            question=question,
            answer=answer,
            sources=sources
        )
    except HTTPException:
        # Re-raise our own HTTP errors (e.g. missing API key) untouched.
        raise
    except Exception as e:
        logger.error(f"Error processing query: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Error processing query: {str(e)}")
# NOTE(review): decorator lost in the scrape; "/health" is the conventional
# path for a handler of this name but is not shown in SOURCE — confirm.
@app.get("/health")
async def health_check():
    """Report service liveness, Redis connectivity, and current queue depth."""
    redis_ok = queue_manager.ping()
    return {
        "status": "healthy",
        "redis_connected": redis_ok,
        # Queue length is only meaningful when Redis is reachable.
        "queue_length": queue_manager.get_queue_length() if redis_ok else None
    }