WebRAG / src /app.py
Arun21102003
Initial clean commit
97f9138
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, HttpUrl
from typing import Optional
import uuid
import logging
from src.database import db
from src.queue_manager import queue_manager
from src.vector_store import vector_store
from src.embeddings import embedding_service
from groq import Groq
from src.config import config
# Configure root logging at INFO level for the whole process.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by the handlers in this file.
logger = logging.getLogger(__name__)
# FastAPI application instance; the route decorators below register against it.
app = FastAPI(title="RAG System API", version="1.0.0")
class IngestURLRequest(BaseModel):
    """Request body accepted by ``POST /ingest-url``."""

    # Address of the page to ingest; validated by pydantic's HttpUrl type.
    url: HttpUrl
class QueryRequest(BaseModel):
    """Request body accepted by ``POST /query``."""

    # The question to answer from the knowledge base.
    question: str
    # Number of search results to retrieve; when omitted, the query handler
    # falls back to config.TOP_K_RESULTS.
    top_k: Optional[int] = None
class IngestURLResponse(BaseModel):
    """Response returned by ``POST /ingest-url`` once a URL has been queued."""

    # Identifier generated for this URL; used with GET /status/{url_id}.
    url_id: str
    # The submitted URL, as a plain string.
    url: str
    # Processing status of the URL (the ingest handler sets "pending").
    status: str
    # Human-readable description of the outcome.
    message: str
class StatusResponse(BaseModel):
    """Response returned by ``GET /status/{url_id}``.

    The status handler fills every field from the stored URL record.
    """

    # Identifier of the URL record.
    url_id: str
    # The URL that was submitted for ingestion.
    url: str
    # Current processing status as stored in the record.
    status: str
    # Timestamps as stored in the record (string format is defined by the
    # database layer -- TODO confirm exact format there).
    created_at: str
    updated_at: str
    # Only present once the record carries a completion timestamp.
    completed_at: Optional[str] = None
    # Number of chunks recorded for this URL.
    chunk_count: int
    # Error text recorded for this URL, if any.
    error_message: Optional[str] = None
class QueryResponse(BaseModel):
    """Response returned by ``POST /query``."""

    # The question exactly as it was submitted.
    question: str
    # The generated answer text.
    answer: str
    # One entry per retrieved search result; the query handler emits dicts
    # with the keys "url", "score" and "text_snippet". Empty when nothing
    # was retrieved.
    sources: list
@app.on_event("startup")
async def startup_event():
    """Log the configured database, Redis and Qdrant locations at startup."""
    log = logger.info
    log("Starting RAG System API...")
    log(f"Database path: {config.DATABASE_PATH}")
    log(f"Redis: {config.REDIS_HOST}:{config.REDIS_PORT}")
    log(f"Qdrant: {config.QDRANT_HOST}:{config.QDRANT_PORT}")
@app.get("/")
async def root():
    """Return the API name, version and a map of the main endpoints."""
    endpoint_map = {
        "ingest": "POST /ingest-url",
        "query": "POST /query",
        "status": "GET /status/{url_id}",
    }
    api_info = {
        "message": "RAG System API",
        "version": "1.0.0",
        "endpoints": endpoint_map,
    }
    return api_info
@app.post("/ingest-url", response_model=IngestURLResponse, status_code=202)
async def ingest_url(request: IngestURLRequest):
    """Register a URL for ingestion and queue it for processing.

    A fresh UUID is generated for the URL, a record is created through
    ``db.create_url_record`` and a job carrying the ID and URL is pushed
    onto ``queue_manager``.

    Args:
        request: Validated request body containing the URL to ingest.

    Returns:
        IngestURLResponse with the generated ``url_id`` and status
        ``"pending"``.

    Raises:
        HTTPException: 500 if creating the record or enqueueing the job fails.
    """
    url = str(request.url)
    url_id = str(uuid.uuid4())

    # Only the two calls that talk to external services are guarded.
    try:
        db.create_url_record(url_id, url)
        # NOTE(review): if enqueue fails after the record was created, the
        # record is left behind without a job -- consider marking it failed.
        queue_manager.enqueue({"url_id": url_id, "url": url})
    except Exception as e:
        # logger.exception records the traceback, which logger.error dropped.
        logger.exception("Error enqueueing URL: %s", e)
        raise HTTPException(
            status_code=500,
            detail=f"Error processing request: {str(e)}",
        ) from e

    logger.info(f"Enqueued URL for processing: {url} (ID: {url_id})")
    return IngestURLResponse(
        url_id=url_id,
        url=url,
        status="pending",
        message="URL has been queued for processing",
    )
@app.get("/status/{url_id}", response_model=StatusResponse)
async def get_status(url_id: str):
    """Return the stored processing status for a previously submitted URL.

    Args:
        url_id: Identifier returned by ``POST /ingest-url``.

    Returns:
        StatusResponse populated from the record returned by
        ``db.get_url_record``.

    Raises:
        HTTPException: 404 if no record exists for ``url_id``.
    """
    record = db.get_url_record(url_id)
    if not record:
        raise HTTPException(status_code=404, detail="URL ID not found")

    return StatusResponse(
        url_id=record["id"],
        url=record["url"],
        status=record["status"],
        created_at=record["created_at"],
        updated_at=record["updated_at"],
        completed_at=record.get("completed_at"),
        # A .get() default only applies when the key is missing; a key that is
        # present but None would fail the `int` validation, so coerce it to 0.
        chunk_count=record.get("chunk_count") or 0,
        error_message=record.get("error_message"),
    )
def _build_context_and_sources(search_results):
    """Turn search hits into the numbered prompt context and the client-facing source list."""
    context_parts = []
    sources = []
    for position, hit in enumerate(search_results, start=1):
        text = hit["text"]
        context_parts.append(f"[Source {position}]: {text}")
        sources.append({
            "url": hit["url"],
            "score": hit["score"],
            # Snippets longer than 200 characters are truncated with an ellipsis.
            "text_snippet": (text[:200] + "...") if len(text) > 200 else text,
        })
    return "\n\n".join(context_parts), sources


def _build_prompts(context, question):
    """Return the (system_prompt, user_prompt) pair sent to the chat model."""
    system_prompt = """You are a helpful AI assistant that answers questions based solely on the provided context.
Your responses must be:
1. Grounded in the provided sources
2. Accurate and factual
3. Clear and concise
4. If the context doesn't contain relevant information, say so explicitly."""
    user_prompt = f"""Context from knowledge base:
{context}
Question: {question}
Please provide a clear, grounded answer based only on the information in the context above. If the context doesn't contain enough information to answer the question, please say so."""
    return system_prompt, user_prompt


@app.post("/query", response_model=QueryResponse)
async def query_knowledge_base(request: QueryRequest):
    """Answer a question from the ingested knowledge base.

    The question is embedded, the vector store is searched for the closest
    chunks, and the retrieved text is passed as context to a Groq chat model.

    Args:
        request: Validated request body with the question and optional
            ``top_k`` (falls back to ``config.TOP_K_RESULTS`` when omitted
            or zero).

    Returns:
        QueryResponse with the answer and the sources used. When the search
        returns nothing, a fixed "not enough information" answer with an
        empty source list is returned and the chat model is not called.

    Raises:
        HTTPException: 400 if ``top_k`` is negative; 500 if
            ``GROQ_API_KEY`` is not configured or any step fails.
    """
    try:
        if not config.GROQ_API_KEY:
            raise HTTPException(
                status_code=500,
                detail="GROQ_API_KEY not configured. Please set it in your .env file"
            )

        # Reject a negative result count as a client error instead of
        # forwarding it to the vector store.
        if request.top_k is not None and request.top_k < 0:
            raise HTTPException(
                status_code=400,
                detail="top_k must be a positive integer"
            )

        question = request.question
        top_k = request.top_k or config.TOP_K_RESULTS

        # NOTE(review): embed_text, search and the Groq call are invoked
        # without await, so they appear to be synchronous and would block the
        # event loop inside this async handler -- confirm and consider a
        # threadpool.
        query_embedding = embedding_service.embed_text(question)
        search_results = vector_store.search(query_embedding, top_k=top_k)
        if not search_results:
            return QueryResponse(
                question=question,
                answer="I don't have enough information in my knowledge base to answer this question. Please ingest relevant URLs first.",
                sources=[]
            )

        context, sources = _build_context_and_sources(search_results)
        system_prompt, user_prompt = _build_prompts(context, question)

        groq_client = Groq(api_key=config.GROQ_API_KEY)
        chat_completion = groq_client.chat.completions.create(
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_prompt}
            ],
            model="llama-3.3-70b-versatile",
            temperature=0.3,
            max_tokens=1024
        )
        answer = chat_completion.choices[0].message.content

        logger.info(f"Query processed: {question[:100]}...")
        return QueryResponse(
            question=question,
            answer=answer,
            sources=sources
        )
    except HTTPException:
        # Deliberate HTTP errors raised above pass through unchanged.
        raise
    except Exception as e:
        # logger.exception records the traceback, which logger.error dropped.
        logger.exception("Error processing query: %s", e)
        raise HTTPException(
            status_code=500,
            detail=f"Error processing query: {str(e)}"
        ) from e
@app.get("/health")
async def health_check():
    """Report service health based on Redis connectivity.

    Returns:
        A dict with ``status`` ("healthy" when the Redis ping succeeds,
        "degraded" otherwise), ``redis_connected`` (the ping result) and
        ``queue_length`` (current queue length, or None when Redis is not
        reachable).
    """
    redis_ok = queue_manager.ping()
    return {
        # Previously always "healthy", even when the ping failed.
        "status": "healthy" if redis_ok else "degraded",
        "redis_connected": redis_ok,
        # Only query the queue when the ping succeeded.
        "queue_length": queue_manager.get_queue_length() if redis_ok else None,
    }