Spaces:
Running
Running
| # app/main.py | |
| # pylint: disable=unused-import | |
| import uuid | |
| import time | |
| from contextlib import asynccontextmanager | |
| from pathlib import Path | |
| from fastapi import FastAPI, HTTPException | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi.staticfiles import StaticFiles | |
| from pydantic import BaseModel | |
| # NOTE: Using absolute imports from project root (app.*) instead of relative imports | |
| # This ensures imports work consistently in both IDE and Docker environments | |
| from app.middleware.tenant_auth import TenantAuthMiddleware | |
| from app.utils.config import get_config, get_logger, get_version | |
| from app.models.nlp_engine import NLPEngine | |
| from app.models.conversation_manager import ConversationManager | |
| from app.models.database import ( | |
| SessionLocal, | |
| init_database, | |
| health_check as db_health_check, | |
| ) | |
| from app.models.smart_models import UserPreference, UserInsight, ConversationTopic | |
| from app.data.training_data import TRAINING_DATA | |
| from app.routers import knowledge_base | |
| from app.utils.seeding import seed_demo_tenant | |
| # Load configuration (this sets up logging automatically) | |
| config = get_config() | |
| logger = get_logger(__name__) | |
| # Train the mode on startup | |
| async def lifespan(fastapi_app: FastAPI): # pylint: disable=unused-argument | |
| """Application lifespan manager for startup and shutdown events""" | |
| # Startup | |
| logger.info("Starting application..") | |
| logger.info("CORS origins configured: %s", config.middleware["cors_origins"]) | |
| # Initialize database | |
| try: | |
| init_database() | |
| with SessionLocal() as session: | |
| seed_demo_tenant(session) | |
| if db_health_check(): | |
| logger.info("Database connection established successfully") | |
| else: | |
| logger.warning("Database health check failed, but continuing...") | |
| except Exception as e: # pylint: disable=broad-exception-caught | |
| logger.error("Database initialization failed: %s: %s", type(e).__name__, str(e)) | |
| logger.warning("Continuing without database - some features may not work...") | |
| # Train NLP model | |
| nlp_engine.train_intent_classifier(TRAINING_DATA) | |
| logger.info("NLP model training completed") | |
| logger.info("Application started successfully in %s mode", config.env.name) | |
| yield # App runs here | |
| # Shutdown (if needed) | |
| logger.info("Application shutting down...") | |
| app = FastAPI(title=config.api.title, debug=config.api.debug, lifespan=lifespan) | |
| app.include_router(knowledge_base.router) | |
| # Enable CORS | |
| app.add_middleware( | |
| CORSMiddleware, | |
| allow_origins=config.middleware["cors_origins"], | |
| allow_credentials=True, | |
| allow_methods=["*"], | |
| allow_headers=["*"], | |
| ) | |
| # Add Tenant Auth | |
| app.add_middleware(TenantAuthMiddleware) | |
| # Initialize components | |
| nlp_engine = NLPEngine(config) | |
| conversation_manager = ConversationManager(config) | |
| # Request logging middleware | |
| if config.middleware["enable_request_logging"]: | |
| async def log_requests(request, call_next): | |
| logger.info("Request: %s %s", request.method, request.url) | |
| response = await call_next(request) | |
| logger.info("Response: %s", response.status_code) | |
| return response | |
| class ChatRequest(BaseModel): | |
| message: str | |
| session_id: str | None = None | |
| class ChatResponse(BaseModel): | |
| thinking: str | None = None | |
| response: str | |
| session_id: str | |
| intent: str | |
| confidence: float | |
| entities: dict | |
| debug_info: dict | None = None | |
| async def health_check(): | |
| logger.debug("Health check endpoint accessed") | |
| # Check database health | |
| db_status = "healthy" if db_health_check() else "unhealthy" | |
| return { | |
| "status": "healthy", | |
| "environment": config.env.name, | |
| "version": get_version(), # Read from pyproject.toml | |
| "database": db_status, | |
| "components": { | |
| "nlp_engine": "ready", | |
| "conversation_manager": "ready", | |
| "database": db_status, | |
| }, | |
| } | |
| async def chat(request: ChatRequest): | |
| logger.debug("Chat request received: %s ...", request.message[:50]) | |
| start_time = time.time() | |
| try: | |
| # Generate session ID if not provided | |
| session_id = request.session_id or str(uuid.uuid4()) | |
| # Detect language | |
| language = nlp_engine.detect_language(request.message) | |
| # Process message | |
| intent, confidence = nlp_engine.classify_intent(request.message) | |
| entities = nlp_engine.extract_entities(request.message, language) | |
| # Convert NumPy types to Python types | |
| intent = str(intent) | |
| confidence = float(confidence) | |
| logger.debug( | |
| "Processed - Language: %s, Intent: %s, Confidence: %.2f", | |
| language, | |
| intent, | |
| confidence, | |
| ) | |
| # Apply confidence threshold | |
| if confidence <= config.nlp["confidence_threshold"]: | |
| logger.info( | |
| "Low confidence (%.2f}) for intent %s, using fallback", | |
| confidence, | |
| intent, | |
| ) | |
| intent = "low_confidence" | |
| # Generate response (now with context awareness) | |
| thinking_block, response = conversation_manager.get_response( | |
| intent, language, request.message, session_id, entities | |
| ) | |
| # Calculate response time | |
| response_time_ms = int((time.time() - start_time) * 1000) | |
| # Save conversation (now to database) | |
| conversation_manager.save_conversation( | |
| session_id, | |
| request.message, | |
| response, | |
| intent, | |
| confidence, | |
| entities, | |
| language, | |
| response_time_ms, | |
| ) | |
| # Prepare response | |
| chat_response = ChatResponse( | |
| thinking=thinking_block, | |
| response=response, | |
| session_id=session_id, | |
| intent=intent, | |
| confidence=confidence, | |
| entities=entities, | |
| ) | |
| # Add debug info if enabled | |
| if config.nlp["enable_debug"]: | |
| chat_response.debug_info = { | |
| "language": language, | |
| "original_confidence": confidence, | |
| "threshold_applied": confidence < config.nlp["confidence_threshold"], | |
| "environment": config.env.name, | |
| "response_time_ms": response_time_ms, | |
| "database_enabled": db_health_check(), | |
| } | |
| logger.info("Chat response generated successfully for session %s", session_id) | |
| return chat_response | |
| except Exception as e: | |
| logger.error("Error processing chat request: %s", str(e), exc_info=True) | |
| raise HTTPException(status_code=500, detail="Internal server error") from e | |
| async def get_conversation_history(session_id: str): | |
| logger.debug("Analytics requested for session: %s", session_id) | |
| try: | |
| history = conversation_manager.get_conversation_history(session_id) | |
| if history: | |
| logger.info("Conversation history found for session %s", session_id) | |
| return { | |
| "session_id": session_id, | |
| "message_count": len(history), | |
| "messages": history, | |
| } | |
| else: | |
| logger.warning("No conversation history found for session %s", session_id) | |
| raise HTTPException(status_code=404, detail="Session not found") | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| logger.error("Error retrieving conversation history: %s", str(e), exc_info=True) | |
| raise HTTPException(status_code=500, detail="Internal server error") from e | |
| async def get_user_stats(session_id: str): | |
| logger.debug("User stats requested for session: %s", session_id) | |
| try: | |
| stats = conversation_manager.get_user_stats(session_id) | |
| if stats: | |
| logger.info("User stats found for session %s", session_id) | |
| return stats | |
| else: | |
| logger.warning("No user stats found for session %s", session_id) | |
| raise HTTPException(status_code=404, detail="Session not found") | |
| except HTTPException: | |
| raise | |
| except Exception as e: | |
| logger.error("Error retrieving user stats: %s", str(e), exc_info=True) | |
| raise HTTPException(status_code=500, detail="Internal server error") from e | |
| # Serve frontend static files (HF Spaces has no separate nginx) | |
| # Must be mounted LAST so API routes above take precedence | |
| _frontend_dir = Path(__file__).parent.parent / "frontend" | |
| if _frontend_dir.exists(): | |
| app.mount( | |
| "/", StaticFiles(directory=str(_frontend_dir), html=True), name="frontend" | |
| ) | |