"""
Main module for the RAG Agent and API Layer system.
This module provides the FastAPI application with endpoints for question-answering.
"""
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from typing import Dict, Any, Optional
import asyncio
import logging
from .config import validate_config, get_config
from .models import QueryRequest, APIResponse, ErrorResponse, HealthResponse
from .schemas import AgentResponse, AgentContext
from .utils import setup_logging, generate_response_id, format_timestamp, create_error_response
from .openrouter_agent import OpenRouterAgent
from .retrieval import QdrantRetriever
# Initialize the FastAPI application
app = FastAPI(
    title="RAG Agent and API Layer",
    # The answering backend wired into this module is OpenRouterAgent, and the
    # root endpoint already advertises "OpenRouter Agents"; keep the generated
    # OpenAPI description in agreement with both.
    description="Question-answering API using OpenRouter Agents and Qdrant retrieval",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc",
)
# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # In production, replace with specific origins
    # Credentialed cross-origin requests are only valid when origins, methods
    # and headers are listed explicitly. While the wildcards above and below
    # are in place, credentials stay disabled; switch this back to True only
    # together with an explicit allow_origins list.
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Initialize components
# Settings are loaded once at module import, and logging is configured from
# them before any handler below is registered.
config = get_config()
setup_logging(config.log_level)
# Global instances
# Both start as None and are assigned by the startup handler; the endpoints
# treat a falsy value as "component not initialized".
retriever: Optional[QdrantRetriever] = None
agent: Optional[OpenRouterAgent] = None
@app.on_event("startup")
async def startup_event():
    """
    Validate configuration and build the module-level agent and retriever.

    Raises:
        RuntimeError: if validate_config() reports a failure.
        Exception: any error raised while constructing the agent or the
            retriever is logged and then re-raised unchanged.
    """
    global retriever, agent

    # Refuse to start with an invalid configuration.
    config_ok = validate_config()
    if not config_ok:
        logging.error("Configuration validation failed")
        raise RuntimeError("Configuration validation failed")

    # The agent is constructed first, with a plain synchronous call.
    try:
        agent = OpenRouterAgent()
        logging.info("OpenRouter agent initialized successfully")
    except Exception as agent_error:
        logging.error(f"Failed to initialize OpenRouter agent: {agent_error}")
        raise

    # The retriever is constructed the same way; its async work happens later,
    # inside its own methods.
    try:
        retriever = QdrantRetriever()
        logging.info("Qdrant retriever initialized successfully")
    except Exception as retriever_error:
        logging.error(f"Failed to initialize Qdrant retriever: {retriever_error}")
        raise

    logging.info("Application startup completed")
@app.get("/health", response_model=HealthResponse)
async def health_check() -> HealthResponse:
    """
    Report whether the API's components have been initialized.

    Returns:
        HealthResponse carrying an overall status, a timestamp and a
        per-service "up"/"down" map.
    """

    def _state(component) -> str:
        # A component counts as "up" once startup has assigned it.
        return "up" if component else "down"

    service_states = {
        "openrouter": _state(agent),
        "qdrant": _state(retriever),
        "agent": _state(agent),
    }

    # Only the openrouter and qdrant entries drive the overall status.
    core_states = (service_states["openrouter"], service_states["qdrant"])
    if "down" in core_states:
        summary = "unhealthy"
    elif "degraded" in core_states:
        summary = "degraded"
    else:
        summary = "healthy"

    return HealthResponse(
        status=summary,
        timestamp=format_timestamp(),
        services=service_states,
    )
@app.post("/ask", response_model=APIResponse)
async def ask_question(request: QueryRequest) -> APIResponse:
    """
    Main question-answering endpoint that accepts user queries and returns AI-generated answers
    based on content retrieved from Qdrant.

    Args:
        request: QueryRequest containing the user's question (``query``) and the
            number of chunks to retrieve (``context_window``, passed as ``top_k``).

    Returns:
        APIResponse with the answer, the retrieved chunks the agent reported
        using, a confidence value, a timestamp and the model name.

    Raises:
        HTTPException: status 500 when the retriever or agent has not been
            initialized, or when any processing step fails. In the latter case
            the detail is the payload built by create_error_response and the
            original exception is chained as the cause.
    """
    # Validate components are initialized before doing any work.
    if not retriever or not agent:
        raise HTTPException(status_code=500, detail="Service not properly initialized")

    try:
        response_id = generate_response_id()

        # Only the first 100 characters of the query are logged.
        logging.info(f"Processing query: {request.query[:100]}...")

        # Step 1: retrieve relevant content chunks from Qdrant.
        logging.info("Step 1: Retrieving relevant content from Qdrant...")
        retrieved_chunks = await retriever.retrieve_context(
            query=request.query,
            top_k=request.context_window,
        )
        logging.info(f"Retrieved {len(retrieved_chunks)} chunks from Qdrant")

        # Step 2: wrap the retrieved chunks in an agent context and generate
        # the answer.
        agent_context = AgentContext(
            query=request.query,
            retrieved_chunks=retrieved_chunks,
            max_context_length=4000,  # Typical token limit consideration
            source_policy="strict",  # Ensure responses are grounded in provided context
        )
        logging.info("Step 2: Generating response with OpenAI agent...")
        agent_response = await agent.generate_response(agent_context)

        # Step 3: format the response according to the API specification.
        logging.info("Step 3: Formatting response...")

        # Look up the cited-source ids once instead of once per chunk. A
        # missing or None attribute means the agent cited nothing.
        used_sources = getattr(agent_response, "used_sources", None) or ()
        sources = [chunk for chunk in retrieved_chunks if chunk.id in used_sources]

        # Prefer the raw response when the agent response exposes one.
        if hasattr(agent_response, "raw_response"):
            answer = agent_response.raw_response
        else:
            answer = agent_response.answer

        api_response = APIResponse(
            id=response_id,
            query=request.query,
            answer=answer,
            sources=sources,
            confidence=getattr(agent_response, "confidence_score", 0.0),
            timestamp=format_timestamp(),
            model_used=getattr(agent, "model_name", "unknown"),
        )
        logging.info(f"Query processed successfully, response ID: {response_id}")
        return api_response
    except HTTPException:
        # Re-raise HTTP exceptions as they are
        raise
    except Exception as e:
        # logging.exception records the traceback alongside the message.
        logging.exception(f"Error processing query: {str(e)}")
        error_resp = create_error_response(
            error_code="PROCESSING_ERROR",
            message=f"Error processing your query: {str(e)}",
        )
        # Chain the original error so the cause stays visible in tracebacks.
        raise HTTPException(status_code=500, detail=error_resp.dict()) from e
@app.get("/")
async def root() -> Dict[str, Any]:
    """
    Describe the API: its name, version, purpose and available endpoints.

    Returns:
        Dictionary with API information
    """
    endpoint_index = {
        "POST /ask": "Main question-answering endpoint",
        "GET /health": "Health check endpoint",
        "/docs": "API documentation (Swagger UI)",
        "/redoc": "API documentation (Redoc)",
    }
    api_info: Dict[str, Any] = {
        "message": "RAG Agent and API Layer",
        "version": "1.0.0",
        "description": "Question-answering API using OpenRouter Agents and Qdrant retrieval",
    }
    api_info["endpoints"] = endpoint_index
    return api_info
# Error handlers
@app.exception_handler(404)
async def not_found_handler(request, exc):
    """Answer 404 errors with the standard error payload."""
    payload = create_error_response(
        error_code="ENDPOINT_NOT_FOUND",
        message="The requested endpoint was not found",
    ).dict()
    return JSONResponse(content=payload, status_code=404)
@app.exception_handler(500)
async def internal_error_handler(request, exc):
    """Answer 500 errors with the standard error payload."""
    payload = create_error_response(
        error_code="INTERNAL_SERVER_ERROR",
        message="An internal server error occurred",
    ).dict()
    return JSONResponse(content=payload, status_code=500)
# Additional utility endpoints if needed
@app.get("/ready")
async def readiness_check() -> Dict[str, str]:
    """
    Tell callers whether the application can serve traffic yet.

    Returns:
        Dictionary with readiness status

    Raises:
        HTTPException: status 503 while the retriever or the agent is unset.
    """
    components_ready = retriever and agent
    if not components_ready:
        raise HTTPException(status_code=503, detail="Service not ready")
    return {"status": "ready"}
if __name__ == "__main__":
    # Serve the app with uvicorn on all interfaces, port 8000.
    # NOTE(review): this module uses relative imports, so this guard presumably
    # only works when launched as a module (python -m <package>.<module>),
    # not as a plain script path - confirm against the deployment command.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)