# NOTE: removed non-code UI capture residue ("Spaces: / Running / Running") from export.
| import os | |
| import json | |
| import logging | |
| import re | |
| from datetime import datetime | |
| from fastapi import FastAPI, Request, UploadFile, File, Form | |
| from typing import Optional | |
| from contextlib import asynccontextmanager | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from .routes import chat, users, auth, login | |
| from ..agents.agent import service, close_connection | |
| from ..agents.rag_agent import process_uploaded_file, has_file_loaded, retrieve_context_for_query | |
| from .middleware.logging import RequestLoggingMiddleware | |
| from .middleware.rate_limit import SimpleRateLimitMiddleware | |
| from ..db.database import init_db, dispose_engine | |
| from ..core.config.config import settings | |
| from ..models import QueryRequest, QueryResponse, HealthCheckResponse | |
| from dotenv import load_dotenv, find_dotenv | |
# Load environment variables from the nearest .env file (if any) before any
# configuration below reads them; the return value is intentionally discarded.
_ = load_dotenv(find_dotenv())

# Root logging configuration: INFO level, mirrored to both the console and a
# local app.log file (UTF-8 so non-ASCII log payloads don't crash the handler).
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),  # Console output
        logging.FileHandler('app.log', encoding='utf-8')  # File output
    ]
)
# Module-level logger used by the handlers below.
logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Application lifespan handler: startup work runs before ``yield``,
    cleanup after.

    FastAPI's ``lifespan=`` parameter expects an async *context manager*
    factory, so the ``@asynccontextmanager`` decorator is required — the
    module already imports it for exactly this purpose. Without explicit
    cleanup here, each restart would open new database/MCP connections
    without closing the old ones, leaking resources over time.
    """
    print("Starting up... Initializing resources.")
    try:
        await init_db()
        print("[OK] Database schema ready!")
    except Exception as e:
        # Startup continues even if schema setup fails — endpoints may still
        # work against an already-initialized database.
        print(f"[WARNING] Database setup warning: {e}")
    print("[OK] Ready to serve customers!")
    yield  # application serves requests from this point
    print("Shutting down... Cleaning up resources.")
    try:
        # Close MCP server connection
        await close_connection()
    except Exception as e:
        print(f"[WARNING] Error closing MCP connection: {e}")
    try:
        # Dispose the database engine's connection pool.
        await dispose_engine()
    except Exception as e:
        print(f"[WARNING] Error during engine disposal: {e}")
    print("[OK] Cleanup complete. Goodbye!")
def create_application() -> FastAPI:
    """Build and return the configured FastAPI application instance."""
    application = FastAPI(
        title="Agentic AI Chatbot",
        description=(
            "An AI powered Chatbot that deliver amazing results to the "
            "customers and provide seamless experience."
        ),
        version="1.0.0",
        lifespan=lifespan,
        docs_url="/docs",
        redoc_url="/redoc",
    )
    return application
# Application instance plus router/middleware wiring.
app = create_application()
# Include routers
app.include_router(chat.router)
app.include_router(users.router)
app.include_router(auth.router)
app.include_router(login.router)
# Middleware Setup
# Previous env-driven CORS origin parsing, kept for reference:
# cors_origins = os.getenv("CORS_ORIGINS", '["http://localhost:3000"]')
# try:
#     if isinstance(cors_origins, str):
#         cors_origins = json.loads(cors_origins)
# except json.JSONDecodeError:
#     cors_origins = ["http://localhost:3000"]
app.add_middleware(
    CORSMiddleware,
    # Wildcard origin: browser/WebView clients (e.g. Flutter) send a CORS
    # preflight (OPTIONS) for requests with Content-Type: application/json or
    # custom headers, and the backend must answer with CORS headers or the
    # request fails. NOTE(review): '*' together with allow_credentials=True is
    # very permissive — consider restricting origins (cors_origins) in
    # production.
    allow_origins=['*'],  ## cors_origins
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Security and monitoring middleware
# Order matters! These run in REVERSE order of addition
app.add_middleware(SimpleRateLimitMiddleware, requests_per_minute=60)  # Rate limiting
app.add_middleware(RequestLoggingMiddleware)  # Logging
def _get_conversation_id(http_request: Request, explicit: Optional[str] = None) -> str:
    """Derive a stable conversation id for chat memory + per-thread RAG.

    Precedence: the explicit argument (stripped), then the first non-empty of
    the X-Conversation-Id / X-Session-Id request headers, then "default".
    """
    if explicit:
        explicit_id = str(explicit).strip()
        if explicit_id:
            return explicit_id
    header_id = (
        http_request.headers.get("X-Conversation-Id")
        or http_request.headers.get("X-Session-Id")
    )
    if header_id:
        header_id = header_id.strip()
        if header_id:
            return header_id
    return "default"
async def _handle_query(
    http_request: Request,
    query: str,
    file: Optional[UploadFile] = None,
    conversation_id: Optional[str] = None,
) -> QueryResponse:
    """
    Internal handler for AI query processing.
    Supports both JSON and multipart/form-data requests.

    Flow: optionally ingest an uploaded file into the per-conversation RAG
    store, retrieve relevant context for the query, inject that context into
    the prompt, run the AI agent, and detect whether the agent's reply
    contains a Google OAuth URL the client must visit.

    Failures are reported via a QueryResponse with success=False rather than
    by raising.
    """
    try:
        # Log the request for debugging
        client_ip = http_request.client.host if http_request.client else "unknown"
        logger.info(f"AI query from {client_ip}: {query[:50]}...")
        conv_id = _get_conversation_id(http_request, conversation_id)
        # Guard against huge context injection
        max_context_chars = int(os.getenv("RAG_MAX_CONTEXT_CHARS", "12000"))
        # Process file if provided (persisted for this conversation)
        if file:
            logger.info(f"File uploaded: {file.filename}")
            file_content = await file.read()
            # Process the file with RAG agent
            result = process_uploaded_file(file_content, file.filename, session_id=conv_id)
            if not result["success"]:
                # NOTE(review): logs result['error'] but surfaces
                # result['message'] to the client — confirm both keys are
                # always present on failure.
                logger.error(f"Failed to process file: {result['error']}")
                return QueryResponse(
                    success=False,
                    error=f"Failed to process file: {result['message']}",
                    timestamp=datetime.utcnow()
                )
            logger.info(f"File processed: {result['chunks']} chunks created")
        # Retrieval-first: if a file is loaded for this conversation, retrieve relevant context
        retrieved_context = ""
        if has_file_loaded(session_id=conv_id):
            try:
                retrieved_context = retrieve_context_for_query(query, session_id=conv_id) or ""
            except Exception as e:
                # Retrieval failure degrades gracefully to a plain query.
                logger.warning(f"RAG retrieval failed for conv_id={conv_id}: {e}")
                retrieved_context = ""
        # Inject retrieved context into the prompt (ChatGPT-like file QA)
        full_query = query
        if retrieved_context.strip():
            # Trim to the configured budget to keep the prompt bounded.
            trimmed_context = retrieved_context.strip()[:max_context_chars]
            full_query = f"""You are answering the user's question.
You also have context retrieved from the user's uploaded file(s) for this conversation.
RULES:
- Use the RAG context if it is relevant to the user's question.
- If the answer is not present in the RAG context, say you don't have enough information from the uploaded file.
- Do not invent details not supported by the RAG context.
RAG CONTEXT:
{trimmed_context}
USER QUESTION:
{query}
"""
        # Process with AI agent
        result = await service(full_query, conversation_id=conv_id)
        # Log success
        logger.info(f"AI query successful for {client_ip}")
        # Check if result contains Google OAuth URL (authentication required)
        auth_url = None
        requires_auth = False
        # Pattern to match Google OAuth URLs
        oauth_pattern = r'https://accounts\.google\.com/o/oauth2/auth\?[^\s\)\"\'<>]+'
        match = re.search(oauth_pattern, result)
        if match:
            auth_url = match.group(0)
            requires_auth = True
            logger.info(f"Authentication required for {client_ip}, auth URL extracted")
            # Print auth URL to terminal for easy copy/paste (localhost redirect)
            print("\n" + "="*80)
            print("🔐 AUTHENTICATION REQUIRED - Copy this URL to your browser:")
            print("="*80)
            print(auth_url)
            print("="*80 + "\n")
        # Return structured response
        return QueryResponse(
            success=True,
            response=result,
            timestamp=datetime.utcnow(),
            requires_auth=requires_auth,
            auth_url=auth_url
        )
    except Exception as e:
        # Log the error with full details for debugging
        # NOTE(review): client_ip is assigned inside the try block above; an
        # exception raised before that assignment would cause a NameError here.
        logger.error(f"Error processing AI query from {client_ip}: {str(e)}", exc_info=True)
        # Return user-friendly error response
        return QueryResponse(
            success=False,
            error="Sorry, I'm having trouble processing your request right now. Please try again in a moment.",
            timestamp=datetime.utcnow()
        )
async def modelResponse(
    http_request: Request,
    conversation_id: Optional[str] = Form(None, max_length=128, description="Optional conversation/session id"),
    query: str = Form(..., min_length=1, max_length=5000, description="The question or prompt to send to the AI"),
    file: Optional[UploadFile] = File(None, description="Optional file to process with RAG")
) -> QueryResponse:
    """
    Get AI model response for a query with optional file upload (multipart/form-data).
    Use this endpoint when uploading files.

    NOTE(review): no route decorator is visible in this chunk — presumably the
    handler is registered elsewhere (e.g. via app.add_api_route); confirm.
    """
    return await _handle_query(http_request, query, file, conversation_id=conversation_id)
async def modelResponseJson(
    http_request: Request,
    request_body: QueryRequest
) -> QueryResponse:
    """Handle an AI query submitted as a JSON body (no file upload).

    Delegates to ``_handle_query`` with the query text and the optional
    conversation id taken from the parsed request body.
    """
    query_text = request_body.query
    conv_id = request_body.conversation_id
    return await _handle_query(http_request, query_text, conversation_id=conv_id)
async def health_check() -> HealthCheckResponse:
    """Health check endpoint for monitoring.

    Reports an overall status plus per-component status for the database and
    the AI service; the overall status is downgraded to "degraded" when any
    component check fails.
    """
    health_status = HealthCheckResponse(
        status="healthy",
        timestamp=datetime.utcnow(),
        components={}
    )
    # Check database connection
    try:
        from ..db.database import get_engine
        engine = get_engine()
        # Try a simple query to test connection
        # NOTE(review): no query is actually executed and `engine` is unused —
        # this only verifies the engine object can be obtained; consider
        # running a real `SELECT 1` against it.
        health_status.components["database"] = "healthy"
        logger.info("Database health check: OK")
    except Exception as e:
        logger.warning(f"Database health check failed: {e}")
        health_status.components["database"] = "unhealthy"
        health_status.status = "degraded"
    # Check AI service
    try:
        # Quick test of AI service
        # NOTE(review): this invokes the real AI agent on every health probe,
        # which may be slow or costly — confirm this is intended.
        test_result = await service("test")
        if test_result and len(test_result) > 0:
            health_status.components["ai_service"] = "healthy"
            logger.info("AI service health check: OK")
        else:
            health_status.components["ai_service"] = "unhealthy"
            health_status.status = "degraded"
    except Exception as e:
        logger.warning(f"AI service health check failed: {e}")
        health_status.components["ai_service"] = "unhealthy"
        health_status.status = "degraded"
    return health_status
async def root():
    """Root endpoint returning a simple welcome payload."""
    welcome = {"message": "Welcome to Agentic AI Chatbot API"}
    return welcome
# if __name__ == "__main__":
#     import uvicorn
#     uvicorn.run(app, host="0.0.0.0", port=8000)