sanilahmed2019's picture
Fix README for docker SDK
295bc31
"""
Main module for the RAG Agent and API Layer system.
This module provides the FastAPI application with endpoints for question-answering.
"""
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from typing import Dict, Any, Optional
import asyncio
import logging
from .config import validate_config, get_config
from .models import QueryRequest, APIResponse, ErrorResponse, HealthResponse
from .schemas import AgentResponse, AgentContext
from .utils import setup_logging, generate_response_id, format_timestamp, create_error_response
from .openrouter_agent import OpenRouterAgent
from .retrieval import QdrantRetriever
# Initialize the FastAPI application
app = FastAPI(
title="RAG Agent and API Layer",
description="Question-answering API using OpenAI Agents and Qdrant retrieval",
version="1.0.0",
docs_url="/docs",
redoc_url="/redoc"
)
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # In production, replace with specific origins
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Initialize components
config = get_config()
setup_logging(config.log_level)
# Global instances
retriever: Optional[QdrantRetriever] = None
agent: Optional[OpenRouterAgent] = None
@app.on_event("startup")
async def startup_event():
"""Initialize components on application startup."""
global retriever, agent
# Validate configuration
if not validate_config():
logging.error("Configuration validation failed")
raise RuntimeError("Configuration validation failed")
# Initialize agent first (this doesn't require async initialization)
try:
agent = OpenRouterAgent()
logging.info("OpenRouter agent initialized successfully")
except Exception as e:
logging.error(f"Failed to initialize OpenRouter agent: {e}")
raise
# Initialize retriever (async operations will be handled in the methods themselves)
try:
retriever = QdrantRetriever()
logging.info("Qdrant retriever initialized successfully")
except Exception as e:
logging.error(f"Failed to initialize Qdrant retriever: {e}")
raise
logging.info("Application startup completed")
@app.get("/health", response_model=HealthResponse)
async def health_check() -> HealthResponse:
"""
Health check endpoint to verify the status of the API and its dependencies.
Returns:
HealthResponse with status of services
"""
# Check if all required components are initialized
openrouter_status = "up" if agent else "down"
qdrant_status = "up" if retriever else "down"
agent_status = "up" if agent else "down"
# Determine overall status
overall_status = "healthy"
if openrouter_status == "down" or qdrant_status == "down":
overall_status = "unhealthy"
elif openrouter_status == "degraded" or qdrant_status == "degraded":
overall_status = "degraded"
return HealthResponse(
status=overall_status,
timestamp=format_timestamp(),
services={
"openrouter": openrouter_status,
"qdrant": qdrant_status,
"agent": agent_status
}
)
@app.post("/ask", response_model=APIResponse)
async def ask_question(request: QueryRequest) -> APIResponse:
"""
Main question-answering endpoint that accepts user queries and returns AI-generated answers
based on book content retrieved from Qdrant.
Args:
request: QueryRequest containing the user's question and parameters
Returns:
APIResponse with the answer and source citations
"""
try:
# Validate components are initialized
if not retriever or not agent:
raise HTTPException(status_code=500, detail="Service not properly initialized")
# Generate response ID
response_id = generate_response_id()
# Log the incoming query
logging.info(f"Processing query: {request.query[:100]}...")
# Step 1: Retrieve relevant content chunks from Qdrant
logging.info("Step 1: Retrieving relevant content from Qdrant...")
retrieved_chunks = await retriever.retrieve_context(
query=request.query,
top_k=request.context_window
)
logging.info(f"Retrieved {len(retrieved_chunks)} chunks from Qdrant")
# Step 2: Create agent context with retrieved chunks
agent_context = AgentContext(
query=request.query,
retrieved_chunks=retrieved_chunks,
max_context_length=4000, # Typical token limit consideration
source_policy="strict" # Ensure responses are grounded in provided context
)
# Step 3: Generate response using the OpenAI agent
logging.info("Step 2: Generating response with OpenAI agent...")
agent_response = await agent.generate_response(agent_context)
# Step 4: Format the response according to API specification
logging.info("Step 3: Formatting response...")
# Extract source information from agent response
sources = []
for chunk in retrieved_chunks:
if hasattr(agent_response, 'used_sources') and chunk.id in agent_response.used_sources:
sources.append(chunk)
# Create the final API response
api_response = APIResponse(
id=response_id,
query=request.query,
answer=agent_response.raw_response if hasattr(agent_response, 'raw_response') else agent_response.answer,
sources=sources,
confidence=agent_response.confidence_score if hasattr(agent_response, 'confidence_score') else 0.0,
timestamp=format_timestamp(),
model_used=agent.model_name if hasattr(agent, 'model_name') else "unknown" # Assuming agent has this attribute
)
logging.info(f"Query processed successfully, response ID: {response_id}")
return api_response
except HTTPException:
# Re-raise HTTP exceptions as they are
raise
except Exception as e:
logging.error(f"Error processing query: {str(e)}", exc_info=True)
error_resp = create_error_response(
error_code="PROCESSING_ERROR",
message=f"Error processing your query: {str(e)}"
)
raise HTTPException(status_code=500, detail=error_resp.dict())
@app.get("/")
async def root() -> Dict[str, Any]:
"""
Root endpoint providing basic information about the API.
Returns:
Dictionary with API information
"""
return {
"message": "RAG Agent and API Layer",
"version": "1.0.0",
"description": "Question-answering API using OpenRouter Agents and Qdrant retrieval",
"endpoints": {
"POST /ask": "Main question-answering endpoint",
"GET /health": "Health check endpoint",
"/docs": "API documentation (Swagger UI)",
"/redoc": "API documentation (Redoc)"
}
}
# Error handlers
@app.exception_handler(404)
async def not_found_handler(request, exc):
"""Handle 404 errors."""
error_resp = create_error_response(
error_code="ENDPOINT_NOT_FOUND",
message="The requested endpoint was not found"
)
return JSONResponse(
status_code=404,
content=error_resp.dict()
)
@app.exception_handler(500)
async def internal_error_handler(request, exc):
"""Handle 500 errors."""
error_resp = create_error_response(
error_code="INTERNAL_SERVER_ERROR",
message="An internal server error occurred"
)
return JSONResponse(
status_code=500,
content=error_resp.dict()
)
# Additional utility endpoints if needed
@app.get("/ready")
async def readiness_check() -> Dict[str, str]:
"""
Readiness check endpoint to verify the application is ready to serve traffic.
Returns:
Dictionary with readiness status
"""
if retriever and agent:
return {"status": "ready"}
else:
raise HTTPException(status_code=503, detail="Service not ready")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)