# NOTE: removed non-code artifacts from a web-UI copy/paste (page header,
# "File size" line, commit hash, and gutter line numbers) — they were not
# part of this Python module.
import os
import json
import logging
import re
from datetime import datetime
from fastapi import FastAPI, Request, UploadFile, File, Form
from typing import Optional
from contextlib import asynccontextmanager
from fastapi.middleware.cors import CORSMiddleware
from .routes import chat, users, auth, login
from ..agents.agent import service, close_connection
from ..agents.rag_agent import process_uploaded_file, has_file_loaded, retrieve_context_for_query
from .middleware.logging import RequestLoggingMiddleware
from .middleware.rate_limit import SimpleRateLimitMiddleware
from ..db.database import init_db, dispose_engine
from ..core.config.config import settings
from ..models import QueryRequest, QueryResponse, HealthCheckResponse
from dotenv import load_dotenv, find_dotenv
_ = load_dotenv(find_dotenv())
# Configure process-wide logging once at import time: INFO level,
# timestamped records, mirrored to BOTH the console and a local file so
# requests remain traceable after the terminal session ends.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),                         # Console output
        logging.FileHandler('app.log', encoding='utf-8') # File output (UTF-8 for emoji in log text)
    ]
)
# Module-level logger; all handlers/format come from basicConfig above.
logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Manage application startup and shutdown resources.

    Everything before ``yield`` runs once at startup (database schema
    initialization); everything after ``yield`` runs once at shutdown
    (MCP connection close, DB engine disposal). FastAPI stops the server
    itself but will not clean up resources it did not create — without
    the shutdown half, new connections would pile up across restarts and
    leak memory / degrade performance over time.

    Args:
        app: The FastAPI application instance (required by the lifespan
            protocol; not used directly here).
    """
    # Use the module logger (console + app.log) instead of bare print so
    # lifecycle events are captured alongside request logs.
    logger.info("Starting up... Initializing resources.")
    try:
        await init_db()
        logger.info("[OK] Database schema ready!")
    except Exception as e:
        # Best-effort startup: endpoints that don't touch the DB can
        # still serve, so log the problem and continue rather than crash.
        logger.warning("[WARNING] Database setup warning: %s", e)
    logger.info("[OK] Ready to serve customers!")

    yield  # application serves requests from this point

    logger.info("Shutting down... Cleaning up resources.")
    try:
        # Close MCP server connection
        await close_connection()
    except Exception as e:
        logger.warning("[WARNING] Error closing MCP connection: %s", e)
    try:
        await dispose_engine()
    except Exception as e:
        logger.warning("[WARNING] Error during engine disposal: %s", e)
    logger.info("[OK] Cleanup complete. Goodbye!")
def create_application() -> FastAPI:
    """Build the FastAPI application with metadata, docs URLs, and the
    startup/shutdown lifespan hook attached."""
    application = FastAPI(
        title="Agentic AI Chatbot",
        description=(
            "An AI powered Chatbot that deliver amazing results to the "
            "customers and provide seamless experience."
        ),
        version="1.0.0",
        lifespan=lifespan,
        docs_url="/docs",
        redoc_url="/redoc",
    )
    return application
# Module-level application instance used by the ASGI server.
app = create_application()
# Include routers
app.include_router(chat.router)
app.include_router(users.router)
app.include_router(auth.router)
app.include_router(login.router)
# Middleware Setup
# Previous env-driven origin list, kept for reference:
# cors_origins = os.getenv("CORS_ORIGINS", '["http://localhost:3000"]')
# try:
#     if isinstance(cors_origins, str):
#         cors_origins = json.loads(cors_origins)
# except json.JSONDecodeError:
#     cors_origins = ["http://localhost:3000"]
app.add_middleware(
    CORSMiddleware,
    # Wildcard origin: the Flutter client's preflight (OPTIONS) request is
    # rejected unless the backend answers with permissive CORS headers.
    # Browsers/WebViews send OPTIONS first for requests with
    # Content-Type: application/json or custom headers; the backend MUST
    # reply with CORS headers or the real request never happens.
    # NOTE(review): allow_origins=["*"] combined with
    # allow_credentials=True is overly permissive for production — any
    # site can make credentialed requests. TODO: restore the env-driven
    # CORS_ORIGINS allowlist above before deploying.
    allow_origins=['*'],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Security and monitoring middleware.
# Order matters! Middleware runs in REVERSE order of addition, so logging
# (added last) wraps rate limiting.
app.add_middleware(SimpleRateLimitMiddleware, requests_per_minute=60)  # Rate limiting
app.add_middleware(RequestLoggingMiddleware)  # Logging
def _get_conversation_id(http_request: Request, explicit: Optional[str] = None) -> str:
    """Derive a stable conversation id for chat memory + per-thread RAG.

    Precedence: a non-blank explicit argument, then the
    ``X-Conversation-Id`` / ``X-Session-Id`` request headers, then the
    shared ``"default"`` bucket.
    """
    # Explicit id wins when it is non-empty after trimming.
    if explicit:
        trimmed = str(explicit).strip()
        if trimmed:
            return trimmed
    # Header fallback: X-Conversation-Id takes priority over X-Session-Id.
    headers = http_request.headers
    from_header = headers.get("X-Conversation-Id") or headers.get("X-Session-Id")
    if from_header and from_header.strip():
        return from_header.strip()
    # No usable id anywhere — all such requests share one conversation.
    return "default"
async def _handle_query(
    http_request: Request,
    query: str,
    file: Optional[UploadFile] = None,
    conversation_id: Optional[str] = None,
) -> QueryResponse:
    """
    Internal handler for AI query processing.
    Supports both JSON and multipart/form-data requests.

    Flow: resolve a conversation id; if a file was uploaded, ingest it
    into the per-conversation RAG store; retrieve relevant context for
    the query and inject it into the prompt; forward to the AI agent;
    finally detect a Google OAuth URL in the answer (auth required).

    Errors are reported via ``QueryResponse(success=False, ...)`` rather
    than raised, so callers always receive a structured payload.
    """
    try:
        # Log the request for debugging (truncate query to keep logs short)
        client_ip = http_request.client.host if http_request.client else "unknown"
        logger.info(f"AI query from {client_ip}: {query[:50]}...")
        conv_id = _get_conversation_id(http_request, conversation_id)
        # Guard against huge context injection (env-tunable character cap)
        max_context_chars = int(os.getenv("RAG_MAX_CONTEXT_CHARS", "12000"))
        # Process file if provided (persisted for this conversation)
        if file:
            logger.info(f"File uploaded: {file.filename}")
            file_content = await file.read()
            # Process the file with RAG agent
            result = process_uploaded_file(file_content, file.filename, session_id=conv_id)
            if not result["success"]:
                # NOTE(review): the log reads result['error'] while the
                # user-facing message reads result['message'] — confirm
                # process_uploaded_file returns BOTH keys on failure.
                logger.error(f"Failed to process file: {result['error']}")
                return QueryResponse(
                    success=False,
                    error=f"Failed to process file: {result['message']}",
                    timestamp=datetime.utcnow()
                )
            logger.info(f"File processed: {result['chunks']} chunks created")
        # Retrieval-first: if a file is loaded for this conversation, retrieve relevant context
        retrieved_context = ""
        if has_file_loaded(session_id=conv_id):
            try:
                retrieved_context = retrieve_context_for_query(query, session_id=conv_id) or ""
            except Exception as e:
                # Retrieval failure degrades gracefully to a no-context query.
                logger.warning(f"RAG retrieval failed for conv_id={conv_id}: {e}")
                retrieved_context = ""
        # Inject retrieved context into the prompt (ChatGPT-like file QA)
        full_query = query
        if retrieved_context.strip():
            trimmed_context = retrieved_context.strip()[:max_context_chars]
            full_query = f"""You are answering the user's question.
You also have context retrieved from the user's uploaded file(s) for this conversation.
RULES:
- Use the RAG context if it is relevant to the user's question.
- If the answer is not present in the RAG context, say you don't have enough information from the uploaded file.
- Do not invent details not supported by the RAG context.
RAG CONTEXT:
{trimmed_context}
USER QUESTION:
{query}
"""
        # Process with AI agent
        result = await service(full_query, conversation_id=conv_id)
        # Log success
        logger.info(f"AI query successful for {client_ip}")
        # Check if result contains Google OAuth URL (authentication required)
        auth_url = None
        requires_auth = False
        # Pattern to match Google OAuth URLs
        oauth_pattern = r'https://accounts\.google\.com/o/oauth2/auth\?[^\s\)\"\'<>]+'
        match = re.search(oauth_pattern, result)
        if match:
            auth_url = match.group(0)
            requires_auth = True
            logger.info(f"Authentication required for {client_ip}, auth URL extracted")
            # Print auth URL to terminal for easy copy/paste (localhost redirect)
            print("\n" + "="*80)
            print("🔐 AUTHENTICATION REQUIRED - Copy this URL to your browser:")
            print("="*80)
            print(auth_url)
            print("="*80 + "\n")
        # Return structured response
        return QueryResponse(
            success=True,
            response=result,
            timestamp=datetime.utcnow(),
            requires_auth=requires_auth,
            auth_url=auth_url
        )
    except Exception as e:
        # Log the error with full details for debugging.
        # NOTE(review): if an exception fired before client_ip was
        # assigned (first statement in the try), this line would itself
        # raise NameError — unlikely, but worth hardening.
        logger.error(f"Error processing AI query from {client_ip}: {str(e)}", exc_info=True)
        # Return user-friendly error response (details stay in the logs)
        return QueryResponse(
            success=False,
            error="Sorry, I'm having trouble processing your request right now. Please try again in a moment.",
            timestamp=datetime.utcnow()
        )
@app.post("/models", response_model=QueryResponse)
async def modelResponse(
    http_request: Request,
    conversation_id: Optional[str] = Form(None, max_length=128, description="Optional conversation/session id"),
    query: str = Form(..., min_length=1, max_length=5000, description="The question or prompt to send to the AI"),
    file: Optional[UploadFile] = File(None, description="Optional file to process with RAG")
) -> QueryResponse:
    """
    Get AI model response for a query with optional file upload (multipart/form-data).
    Use this endpoint when uploading files.

    Thin wrapper: field validation happens in the Form/File declarations
    above; all real work is delegated to ``_handle_query``.
    """
    return await _handle_query(http_request, query, file, conversation_id=conversation_id)
@app.post("/models/json", response_model=QueryResponse)
async def modelResponseJson(
    http_request: Request,
    request_body: QueryRequest
) -> QueryResponse:
    """
    Get AI model response for a query (JSON body).
    Use this endpoint for simple text queries without file uploads.

    JSON twin of ``modelResponse``: no file parameter, so the shared
    ``_handle_query`` runs with ``file=None``.
    """
    return await _handle_query(
        http_request,
        request_body.query,
        conversation_id=request_body.conversation_id,
    )
@app.get("/health", response_model=HealthCheckResponse)
async def health_check() -> HealthCheckResponse:
    """Health check endpoint for monitoring.

    Reports overall status plus per-component status for the database
    and the AI service. A failing component downgrades the overall
    status to "degraded" instead of raising, so monitors always get a
    200 with a structured body.
    """
    health_status = HealthCheckResponse(
        status="healthy",
        timestamp=datetime.utcnow(),
        components={}
    )
    # Check database connection
    try:
        from ..db.database import get_engine
        engine = get_engine()
        # Try a simple query to test connection
        # NOTE(review): no query is actually executed — get_engine() only
        # returns the engine object, so "database: healthy" is reported
        # even if the DB is unreachable. TODO: run e.g. SELECT 1 here.
        health_status.components["database"] = "healthy"
        logger.info("Database health check: OK")
    except Exception as e:
        logger.warning(f"Database health check failed: {e}")
        health_status.components["database"] = "unhealthy"
        health_status.status = "degraded"
    # Check AI service
    try:
        # Quick test of AI service
        # NOTE(review): this issues a real model call on every probe —
        # confirm monitoring frequency won't cause cost/latency issues.
        test_result = await service("test")
        if test_result and len(test_result) > 0:
            health_status.components["ai_service"] = "healthy"
            logger.info("AI service health check: OK")
        else:
            health_status.components["ai_service"] = "unhealthy"
            health_status.status = "degraded"
    except Exception as e:
        logger.warning(f"AI service health check failed: {e}")
        health_status.components["ai_service"] = "unhealthy"
        health_status.status = "degraded"
    return health_status
@app.get("/")
async def root():
    """Root endpoint: simple welcome payload used as a liveness ping."""
    payload = {"message": "Welcome to Agentic AI Chatbot API"}
    return payload
# if __name__ == "__main__":
# import uvicorn
# uvicorn.run(app, host="0.0.0.0", port=8000)