"""FastAPI application entry point: logging, lifespan, app factory, CORS."""

import json
import logging
import os
import re
from contextlib import asynccontextmanager
from datetime import datetime
from typing import Optional

from dotenv import find_dotenv, load_dotenv
from fastapi import FastAPI, File, Form, Request, UploadFile
from fastapi.middleware.cors import CORSMiddleware

from .middleware.logging import RequestLoggingMiddleware
from .middleware.rate_limit import SimpleRateLimitMiddleware
from .routes import auth, chat, login, users
from ..agents.agent import close_connection, service
from ..agents.rag_agent import (
    has_file_loaded,
    process_uploaded_file,
    retrieve_context_for_query,
)
from ..core.config.config import settings
from ..db.database import dispose_engine, init_db
from ..models import HealthCheckResponse, QueryRequest, QueryResponse

# Load environment variables from the nearest .env file before anything reads them.
_ = load_dotenv(find_dotenv())

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),  # Console output
        logging.FileHandler('app.log', encoding='utf-8')  # File output
    ]
)
logger = logging.getLogger(__name__)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Manage application-scoped resources.

    Code before ``yield`` runs at startup (database schema setup); code after
    ``yield`` runs at shutdown (MCP connection close + DB engine disposal).
    FastAPI will stop on its own, but it will not clean up what it did not
    create — without the shutdown half, each restart stacks new connections
    on top of the old ones, leading to memory leaks and performance
    degradation over time.
    """
    logger.info("Starting up... Initializing resources.")
    try:
        await init_db()
        logger.info("[OK] Database schema ready!")
    except Exception as e:
        # Deliberately non-fatal: the API still starts so /health can report
        # a degraded database instead of the process crash-looping.
        logger.warning("Database setup warning: %s", e)
    logger.info("[OK] Ready to serve customers!")

    yield  # application runs from this point

    logger.info("Shutting down... Cleaning up resources.")
    try:
        # Close MCP server connection
        await close_connection()
    except Exception as e:
        logger.warning("Error closing MCP connection: %s", e)
    try:
        await dispose_engine()
    except Exception as e:
        logger.warning("Error during engine disposal: %s", e)
    logger.info("[OK] Cleanup complete. Goodbye!")


def create_application() -> FastAPI:
    """Build the FastAPI app with metadata and the lifespan handler attached."""
    return FastAPI(
        title="Agentic AI Chatbot",
        description=(
            "An AI powered Chatbot that deliver amazing results to the "
            "customers and provide seamless experience."
        ),
        version="1.0.0",
        lifespan=lifespan,
        docs_url="/docs",
        redoc_url="/redoc",
    )


app = create_application()

# Include routers
app.include_router(chat.router)
app.include_router(users.router)
app.include_router(auth.router)
app.include_router(login.router)


def _cors_origins() -> list:
    """Parse the allowed CORS origins from the CORS_ORIGINS env var (JSON list).

    Defaults to ["*"] so mobile/WebView clients (e.g. the Flutter frontend)
    whose OPTIONS preflight requests must receive CORS headers keep working
    out of the box: a request with Content-Type: application/json or custom
    headers triggers a browser/WebView preflight, and the backend MUST answer
    it with CORS headers or the real request fails.
    NOTE(review): "*" combined with allow_credentials=True lets any origin
    send credentialed requests — set CORS_ORIGINS explicitly in production.
    """
    raw = os.getenv("CORS_ORIGINS", '["*"]')
    try:
        origins = json.loads(raw)
        if isinstance(origins, list) and origins:
            return origins
    except json.JSONDecodeError:
        logger.warning("CORS_ORIGINS is not valid JSON; falling back to '*'")
    return ["*"]


app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins(),
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Security and monitoring middleware
# Order matters!
# These run in REVERSE order of addition (logging wraps rate limiting).
app.add_middleware(SimpleRateLimitMiddleware, requests_per_minute=60)  # Rate limiting
app.add_middleware(RequestLoggingMiddleware)  # Logging

# Matches Google OAuth consent URLs embedded in agent output; compiled once
# at import time instead of on every request.
_OAUTH_URL_RE = re.compile(r'https://accounts\.google\.com/o/oauth2/auth\?[^\s\)\"\'<>]+')


def _get_conversation_id(http_request: Request, explicit: Optional[str] = None) -> str:
    """Derive a stable conversation id for chat memory + per-thread RAG.

    Precedence: explicit argument, then the X-Conversation-Id header, then
    the X-Session-Id header, finally the literal "default".
    """
    if explicit and str(explicit).strip():
        return str(explicit).strip()
    header_id = http_request.headers.get("X-Conversation-Id") or http_request.headers.get("X-Session-Id")
    if header_id and header_id.strip():
        return header_id.strip()
    return "default"


def _build_rag_prompt(query: str, context: str) -> str:
    """Wrap the user's query with retrieved file context (ChatGPT-like file QA)."""
    return f"""You are answering the user's question. You also have context retrieved from the user's uploaded file(s) for this conversation.

RULES:
- Use the RAG context if it is relevant to the user's question.
- If the answer is not present in the RAG context, say you don't have enough information from the uploaded file.
- Do not invent details not supported by the RAG context.

RAG CONTEXT:
{context}

USER QUESTION:
{query}
"""


async def _handle_query(
    http_request: Request,
    query: str,
    file: Optional[UploadFile] = None,
    conversation_id: Optional[str] = None,
) -> QueryResponse:
    """
    Internal handler for AI query processing.

    Supports both JSON and multipart/form-data requests: optionally ingests
    an uploaded file into the per-conversation RAG store, retrieves relevant
    context, queries the AI agent, and surfaces any Google OAuth URL found
    in the agent's output.
    """
    # Resolved BEFORE the try block so the except handler can always log it
    # (previously a failure on the first line raised NameError in the handler).
    client_ip = http_request.client.host if http_request.client else "unknown"
    try:
        logger.info(f"AI query from {client_ip}: {query[:50]}...")

        conv_id = _get_conversation_id(http_request, conversation_id)

        # Guard against huge context injection
        max_context_chars = int(os.getenv("RAG_MAX_CONTEXT_CHARS", "12000"))

        # Process file if provided (persisted for this conversation)
        if file:
            logger.info(f"File uploaded: {file.filename}")
            file_content = await file.read()
            result = process_uploaded_file(file_content, file.filename, session_id=conv_id)
            if not result["success"]:
                # NOTE(review): logs result['error'] but returns result['message'];
                # confirm both keys exist in process_uploaded_file's failure dict.
                logger.error(f"Failed to process file: {result['error']}")
                return QueryResponse(
                    success=False,
                    error=f"Failed to process file: {result['message']}",
                    # NOTE(review): datetime.utcnow() is deprecated since 3.12;
                    # switch to datetime.now(timezone.utc) once clients are
                    # confirmed to accept tz-aware timestamps.
                    timestamp=datetime.utcnow(),
                )
            logger.info(f"File processed: {result['chunks']} chunks created")

        # Retrieval-first: if a file is loaded for this conversation, retrieve
        # relevant context; a retrieval failure degrades to no context.
        retrieved_context = ""
        if has_file_loaded(session_id=conv_id):
            try:
                retrieved_context = retrieve_context_for_query(query, session_id=conv_id) or ""
            except Exception as e:
                logger.warning(f"RAG retrieval failed for conv_id={conv_id}: {e}")
                retrieved_context = ""

        # Inject retrieved context into the prompt, capped at max_context_chars.
        full_query = query
        if retrieved_context.strip():
            trimmed_context = retrieved_context.strip()[:max_context_chars]
            full_query = _build_rag_prompt(query, trimmed_context)

        # Process with AI agent
        result = await service(full_query, conversation_id=conv_id)
        logger.info(f"AI query successful for {client_ip}")

        # Check if result contains a Google OAuth URL (authentication required).
        auth_url = None
        requires_auth = False
        match = _OAUTH_URL_RE.search(result)
        if match:
            auth_url = match.group(0)
            requires_auth = True
            logger.info(f"Authentication required for {client_ip}, auth URL extracted")
            # Print auth URL to terminal for easy copy/paste (localhost redirect)
            print("\n" + "="*80)
            print("🔐 AUTHENTICATION REQUIRED - Copy this URL to your browser:")
            print("="*80)
            print(auth_url)
            print("="*80 + "\n")

        return QueryResponse(
            success=True,
            response=result,
            timestamp=datetime.utcnow(),
            requires_auth=requires_auth,
            auth_url=auth_url,
        )

    except Exception as e:
        # Log full details for debugging; return a user-friendly error.
        logger.error(f"Error processing AI query from {client_ip}: {str(e)}", exc_info=True)
        return QueryResponse(
            success=False,
            error="Sorry, I'm having trouble processing your request right now. Please try again in a moment.",
            timestamp=datetime.utcnow(),
        )


@app.post("/models", response_model=QueryResponse)
async def modelResponse(
    http_request: Request,
    conversation_id: Optional[str] = Form(None, max_length=128, description="Optional conversation/session id"),
    query: str = Form(..., min_length=1, max_length=5000, description="The question or prompt to send to the AI"),
    file: Optional[UploadFile] = File(None, description="Optional file to process with RAG")
) -> QueryResponse:
    """
    Get AI model response for a query with optional file upload (multipart/form-data).
    Use this endpoint when uploading files.
    """
    return await _handle_query(http_request, query, file, conversation_id=conversation_id)


@app.post("/models/json", response_model=QueryResponse)
async def modelResponseJson(
    http_request: Request,
    request_body: QueryRequest
) -> QueryResponse:
    """
    Get AI model response for a query (JSON body).
    Use this endpoint for simple text queries without file uploads.
    """
    return await _handle_query(
        http_request,
        request_body.query,
        conversation_id=request_body.conversation_id,
    )


@app.get("/health", response_model=HealthCheckResponse)
async def health_check() -> HealthCheckResponse:
    """Health check endpoint for monitoring.

    Reports per-component status; the overall status drops to "degraded"
    whenever any component check fails.
    """
    health_status = HealthCheckResponse(
        status="healthy",
        timestamp=datetime.utcnow(),
        components={}
    )

    # Check database connection.
    # NOTE(review): no query is actually executed — this only proves the
    # engine can be constructed; consider a `SELECT 1` round-trip for a
    # real connectivity check.
    try:
        from ..db.database import get_engine
        get_engine()
        health_status.components["database"] = "healthy"
        logger.info("Database health check: OK")
    except Exception as e:
        logger.warning(f"Database health check failed: {e}")
        health_status.components["database"] = "unhealthy"
        health_status.status = "degraded"

    # Check AI service with a quick round-trip prompt.
    # NOTE(review): this runs the full agent on every probe — may be slow and
    # costly if a monitor polls this endpoint frequently.
    try:
        test_result = await service("test")
        if test_result and len(test_result) > 0:
            health_status.components["ai_service"] = "healthy"
            logger.info("AI service health check: OK")
        else:
            health_status.components["ai_service"] = "unhealthy"
            health_status.status = "degraded"
    except Exception as e:
        logger.warning(f"AI service health check failed: {e}")
        health_status.components["ai_service"] = "unhealthy"
        health_status.status = "degraded"

    return health_status


@app.get("/")
async def root():
    """Root endpoint."""
    return {"message": "Welcome to Agentic AI Chatbot API"}


# if __name__ == "__main__":
#     import uvicorn
#     uvicorn.run(app, host="0.0.0.0", port=8000)