Spaces:
Sleeping
Sleeping
| # ChatbotSystem/app.py | |
import asyncio
import logging
import os
import random
import uuid
from contextlib import asynccontextmanager
from datetime import datetime
from typing import Any, Dict, List, Optional

from fastapi import BackgroundTasks, Body, FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field

import chatbot_manager as cm
| # from dotenv import load_dotenv | |
| # # Load environment variables from .env file | |
| # load_dotenv() | |
# --- API key: read explicitly from the environment and fail fast if absent ---
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY")
if not GOOGLE_API_KEY:
    # Raising at import time gives a clear error when the secret has not been
    # configured in the hosting environment (e.g. Hugging Face).
    raise ValueError("GOOGLE_API_KEY secret not found in environment variables.")
# -----------------------------------------------------------------

# Module-wide logging: INFO level on the root logger, plus a logger for this module.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
| # Lifespan manager for startup/shutdown events | |
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Log application startup and shutdown.

    Passed to ``FastAPI(lifespan=...)``. The code before ``yield`` runs at
    startup and the code after it runs at shutdown. The function must be an
    async context manager, hence the ``@asynccontextmanager`` decorator.

    Args:
        app: The FastAPI application being started (unused here).
    """
    # Startup
    logger.info("Starting Enhanced Chatbot API Server...")
    yield
    # Shutdown
    logger.info("Shutting down Enhanced Chatbot API Server...")
app = FastAPI(
    title="Enhanced Chatbot Management System API",
    description="Advanced API for conversational AI chatbots with memory, natural interactions, and session management.",
    version="2.0.0",
    lifespan=lifespan
)

# CORS configuration.
# Allowed origins come from the comma-separated CORS_ALLOW_ORIGINS environment
# variable and default to "*" (any origin), as before.
_cors_allow_origins = [
    origin.strip()
    for origin in os.getenv("CORS_ALLOW_ORIGINS", "*").split(",")
    if origin.strip()
] or ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_allow_origins,
    # A wildcard origin must not be combined with credentials: credentialed
    # requests are only enabled when explicit origins have been configured.
    allow_credentials="*" not in _cors_allow_origins,
    allow_methods=["*"],
    allow_headers=["*"],
)
| # --- Enhanced Pydantic Models --- | |
class LoadModelRequest(BaseModel):
    """Request body used by ``load_model``: which chatbot and which version."""

    # Required: name of the chatbot whose configuration is looked up.
    chatbot_name: str = Field(default=..., description="Name of the chatbot to load")
    # Optional: defaults to the literal string "latest".
    version: Optional[str] = Field(default="latest", description="Version of the chatbot")
class LoadModelResponse(BaseModel):
    """Shape of the payload returned after a chatbot version has been validated."""

    status: str
    message: str
    # Identifier built as "<chatbot_name>_<version>" by load_model.
    model_id: str
    # Extra descriptive information about the chatbot, when available.
    chatbot_info: Optional[Dict[str, Any]] = None
class ChatRequest(BaseModel):
    """Request body for the chat endpoint."""

    # Required fields.
    model_id: str = Field(default=..., description="The ID of the loaded model, e.g., 'SBBURA_v2'")
    query: str = Field(default=..., description="User's message/query")
    # Optional fields; both default to None.
    session_id: Optional[str] = Field(default=None, description="Session ID for conversation continuity")
    user_context: Optional[Dict[str, Any]] = Field(default=None, description="Additional user context")
class Source(BaseModel):
    """A single source reference attached to a chat response."""

    # Identifier of the source.
    source: str
    # Text excerpt associated with the source.
    snippet: str
class ChatResponse(BaseModel):
    """Shape of a chat reply; constructed directly by the greeting endpoint."""

    # Text of the assistant's reply.
    response: str
    # Source references backing the reply; empty by default.
    sources: List[Source] = []
    confidence: float
    # Formatted duration string such as "0.01s".
    response_time: str
    session_id: str
    # Category of the reply, e.g. "greeting" or "error" in this file.
    response_type: str
    # Number of messages in the conversation history.
    conversation_length: int
    # Free-form extra information (chatbot name, version, timestamp, ...).
    metadata: Optional[Dict[str, Any]] = None
class ChatbotInfo(BaseModel):
    """Descriptive record for one chatbot and its versions."""

    name: str
    versions: List[str]
    latest: str
    # NOTE(review): presumably timestamps serialized as strings — confirm the format.
    created: str
    last_updated: str
    description: Optional[str] = None
    capabilities: Optional[List[str]] = None
class ChatbotsListResponse(BaseModel):
    """A list of chatbots together with its length."""

    chatbots: List[ChatbotInfo]
    total_count: int
class SessionSummaryResponse(BaseModel):
    """Shape of a per-session conversation summary."""

    session_id: str
    message_count: int
    user_context: Dict[str, Any]
    # Declared without a default value.
    last_interaction: Optional[str]
    # Declared without a default value.
    conversation_started: Optional[str]
    chatbot_name: Optional[str] = None
class ConversationExportResponse(BaseModel):
    """Shape of an exported conversation."""

    session_id: str
    user_context: Dict[str, Any]
    messages: List[Dict[str, Any]]
    exported_at: str
    # Number of entries in ``messages`` (filled in by export_conversation).
    total_messages: int
class SessionClearResponse(BaseModel):
    """Acknowledgement payload for a cleared session."""

    status: str
    message: str
    session_id: str
class HealthCheckResponse(BaseModel):
    """Shape of the health-check payload."""

    status: str
    # ISO-8601 timestamp of the check.
    timestamp: str
    version: str
    # Time since process start, formatted without microseconds.
    uptime: str
    # Number of entries in the conversation memory.
    active_sessions: int
class ErrorResponse(BaseModel):
    """Shape of an error payload."""

    error: str
    detail: Optional[str] = None
    timestamp: str
    session_id: Optional[str] = None
# --- Global Variables ---
# Captured once at import time; health_check subtracts it from the current
# time to report uptime.
app_start_time = datetime.now()
| # --- Utility Functions --- | |
def generate_session_id() -> str:
    """Return a fresh random session identifier (a UUID4 in string form)."""
    new_id = uuid.uuid4()
    return f"{new_id}"
def get_chatbot_info_enhanced(chatbot_name: str) -> Dict[str, Any]:
    """Build the extra descriptive fields for a chatbot.

    Args:
        chatbot_name: Name passed to ``cm.get_chatbot_config``.

    Returns:
        A dict with ``description``, ``capabilities`` and ``model_info`` keys,
        or an empty dict when no configuration is found for the chatbot.
    """
    chatbot_config = cm.get_chatbot_config(chatbot_name)
    if chatbot_config:
        # Used only when the configuration carries no description of its own.
        fallback_description = f"AI assistant specialized in {chatbot_name}"
        return {
            "description": chatbot_config.get("description", fallback_description),
            "capabilities": [
                "Natural conversation",
                "Memory across sessions",
                "Context-aware responses",
                "Greeting handling",
                "Follow-up questions",
                "Domain-specific knowledge",
            ],
            "model_info": {
                "llm_model": cm.LLM_MODEL,
                "embedding_model": cm.EMBEDDING_MODEL,
            },
        }
    return {}
| # --- API Endpoints --- | |
@app.get("/")
async def root():
    """Root endpoint: return basic API information and useful links.

    Returns:
        A dict with the API name, its version (taken from the application
        object so it cannot drift from the value given to ``FastAPI``), and
        the paths of the documentation and health-check endpoints.
    """
    return {
        "message": "Enhanced Chatbot Management System API",
        "version": app.version,
        "documentation": "/docs",
        "health_check": "/health"
    }
@app.get("/health", response_model=HealthCheckResponse)
async def health_check():
    """Health check endpoint.

    Returns:
        A dict with the service status, the current timestamp, the API
        version, the uptime since import (without microseconds) and the number
        of sessions currently held in ``cm.conversation_memory``.
    """
    # Read the clock once so the timestamp and the uptime are consistent.
    now = datetime.now()
    uptime_str = str(now - app_start_time).split('.')[0]  # Remove microseconds
    return {
        "status": "healthy",
        "timestamp": now.isoformat(),
        "version": app.version,
        "uptime": uptime_str,
        "active_sessions": len(cm.conversation_memory)
    }
# NOTE(review): no route decorator is visible on this handler here — confirm
# that it is registered on `app`.
async def get_available_chatbots():
    """Retrieve all available chatbots with enhanced information.

    Each entry returned by ``cm.get_available_chatbots()`` is copied and merged
    with the result of ``get_chatbot_info_enhanced`` for its ``name``. Copying
    keeps this endpoint from modifying dicts owned by ``chatbot_manager``.

    Returns:
        A dict with ``chatbots`` (the merged entries) and ``total_count``.

    Raises:
        HTTPException: 500 when the chatbots cannot be retrieved.
    """
    try:
        enhanced_chatbots = []
        for chatbot in cm.get_available_chatbots():
            merged = dict(chatbot)
            merged.update(get_chatbot_info_enhanced(chatbot["name"]))
            enhanced_chatbots.append(merged)
    except Exception as e:
        # logger.exception records the traceback alongside the message.
        logger.exception(f"Error retrieving chatbots: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Error retrieving chatbots: {str(e)}"
        ) from e
    return {
        "chatbots": enhanced_chatbots,
        "total_count": len(enhanced_chatbots)
    }
# NOTE(review): no route decorator is visible on this handler here — confirm
# that it is registered on `app`.
async def load_model(request: LoadModelRequest):
    """Validate that a chatbot version exists and return its ``model_id``.

    The ``model_id`` is ``"<chatbot_name>_<version>"`` and is what the chat
    endpoint expects. A version of ``"latest"`` — or an explicit ``null``,
    which the request model allows — resolves to the configuration's
    ``latest_version``.

    Args:
        request: Chatbot name and optional version.

    Returns:
        A dict with ``status``, ``message``, ``model_id`` and ``chatbot_info``.

    Raises:
        HTTPException: 404 when the chatbot or the version does not exist,
            500 on any other failure.
    """
    try:
        config = cm.get_chatbot_config(request.chatbot_name)
        if not config:
            raise FileNotFoundError(f"Chatbot '{request.chatbot_name}' not found")

        # Treat a missing/null version the same as the default "latest".
        version = "latest" if request.version is None else request.version
        if version == "latest":
            version = config.get('latest_version')
        if not version or version not in config.get('versions', []):
            raise FileNotFoundError(f"Version '{version}' not found for chatbot '{request.chatbot_name}'")

        model_id = f"{request.chatbot_name}_{version}"
        # Get enhanced chatbot information
        enhanced_info = get_chatbot_info_enhanced(request.chatbot_name)
        return {
            "status": "success",
            "message": f"{request.chatbot_name} {version} is ready for conversation.",
            "model_id": model_id,
            "chatbot_info": enhanced_info
        }
    except FileNotFoundError as e:
        raise HTTPException(
            status_code=404,
            detail=str(e)
        ) from e
    except Exception as e:
        logger.exception(f"Error loading model: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Error loading model: {str(e)}"
        ) from e
# NOTE(review): no route decorator is visible on this handler here — confirm
# that it is registered on `app`.
async def chat(request: ChatRequest):
    """Answer a user query with the chatbot identified by ``request.model_id``.

    ``model_id`` has the form ``"<name>_<version>"``. It is split on the LAST
    underscore so chatbot names that themselves contain underscores still
    resolve. When no ``session_id`` is supplied a new one is generated, and the
    same id is used for the normal reply and for the fallback error reply.

    Args:
        request: Model id, query text and optional session id.

    Returns:
        The dict produced by ``cm.get_chatbot_response`` with a ``metadata``
        entry added. On an unexpected failure, a best-effort payload with
        ``response_type`` set to ``"error"`` is returned instead of raising.

    Raises:
        HTTPException: 400 when ``model_id`` is malformed or the query is empty.
    """
    # Resolve the session id once, before anything can fail, so that the error
    # fallback reports the same id that was used for the request.
    session_id = request.session_id or generate_session_id()
    try:
        # Parse model_id
        chatbot_name, separator, version = request.model_id.rpartition('_')
        if not separator or not chatbot_name or not version:
            raise HTTPException(
                status_code=400,
                detail="Invalid model_id format. Expected 'name_version'."
            )

        # Validate query
        query = request.query.strip() if request.query else ""
        if not query:
            raise HTTPException(
                status_code=400,
                detail="Query cannot be empty"
            )

        # Get enhanced chatbot response
        result = await cm.get_chatbot_response(
            chatbot_name=chatbot_name,
            version=version,
            query=query,
            session_id=session_id
        )
        # Add additional metadata
        result["metadata"] = {
            "chatbot_name": chatbot_name,
            "version": version,
            "query_length": len(request.query),
            "timestamp": datetime.now().isoformat()
        }
        return result
    except HTTPException:
        # Client errors raised above must reach the caller unchanged.
        raise
    except Exception as e:
        logger.exception(f"Error in chat endpoint: {str(e)}")
        # Deliberate best-effort: reply with an apology rather than a 500.
        return {
            "response": "I apologize, but I'm experiencing some technical difficulties. Please try again in a moment.",
            "sources": [],
            "confidence": 0.1,
            "response_time": "0.00s",
            "session_id": session_id,
            "response_type": "error",
            "conversation_length": 0,
            "metadata": {
                "error": str(e),
                "timestamp": datetime.now().isoformat()
            }
        }
# NOTE(review): no route decorator is visible on this handler here — confirm
# that it is registered on `app`.
async def get_session_summary(session_id: str):
    """Return the conversation summary for one session.

    Args:
        session_id: Identifier handed to ``cm.get_conversation_summary``.

    Raises:
        HTTPException: 500 when the summary cannot be produced.
    """
    try:
        return cm.get_conversation_summary(session_id)
    except Exception as exc:
        logger.error(f"Error getting session summary: {str(exc)}")
        failure_detail = f"Error retrieving session summary: {str(exc)}"
        raise HTTPException(status_code=500, detail=failure_detail)
# NOTE(review): no route decorator is visible on this handler here — confirm
# that it is registered on `app`.
async def export_conversation(session_id: str):
    """Export the conversation history of one session.

    The data from ``cm.export_conversation`` is shallow-copied before
    ``total_messages`` is added, so the dict owned by ``chatbot_manager`` is
    not modified. A missing or ``None`` ``messages`` entry counts as zero.

    Args:
        session_id: Identifier of the session to export.

    Returns:
        The exported conversation dict plus a ``total_messages`` count.

    Raises:
        HTTPException: 500 when the export fails.
    """
    try:
        conversation_data = dict(cm.export_conversation(session_id))
        conversation_data["total_messages"] = len(conversation_data.get("messages") or [])
        return conversation_data
    except Exception as e:
        logger.exception(f"Error exporting conversation: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Error exporting conversation: {str(e)}"
        ) from e
# NOTE(review): no route decorator is visible on this handler here — confirm
# that it is registered on `app`.
async def clear_session(session_id: str):
    """Clear the stored conversation history of one session.

    Args:
        session_id: Identifier handed to ``cm.clear_conversation_history``.

    Returns:
        A success acknowledgement echoing the session id.

    Raises:
        HTTPException: 500 when clearing fails.
    """
    try:
        cm.clear_conversation_history(session_id)
    except Exception as exc:
        logger.error(f"Error clearing session: {str(exc)}")
        failure_detail = f"Error clearing session: {str(exc)}"
        raise HTTPException(status_code=500, detail=failure_detail)
    acknowledgement = {
        "status": "success",
        "message": "Conversation history cleared successfully",
        "session_id": session_id,
    }
    return acknowledgement
# NOTE(review): no route decorator is visible on this handler here — confirm
# that it is registered on `app`.
async def get_active_sessions():
    """List a summary for every session present in ``cm.conversation_memory``.

    Returns:
        A dict with ``active_sessions`` (one summary per session) and
        ``total_count``.

    Raises:
        HTTPException: 500 when the summaries cannot be collected.
    """
    try:
        summaries = [
            cm.get_conversation_summary(active_id)
            for active_id in cm.conversation_memory.keys()
        ]
        return {"active_sessions": summaries, "total_count": len(summaries)}
    except Exception as exc:
        logger.error(f"Error getting active sessions: {str(exc)}")
        failure_detail = f"Error retrieving active sessions: {str(exc)}"
        raise HTTPException(status_code=500, detail=failure_detail)
# NOTE(review): no route decorator is visible on this handler here — confirm
# that it is registered on `app`.
async def cleanup_old_sessions(background_tasks: BackgroundTasks):
    """Schedule the session clean-up job as a background task.

    The scheduled job is currently a placeholder that only writes a log line;
    no session is actually removed.

    Args:
        background_tasks: Task queue the clean-up job is added to.

    Returns:
        A dict acknowledging that the clean-up was initiated.

    Raises:
        HTTPException: 500 when the task cannot be scheduled.
    """
    def log_cleanup_run():
        # Real clean-up logic for old sessions would go here.
        logger.info("Session cleanup task executed")

    try:
        background_tasks.add_task(log_cleanup_run)
    except Exception as exc:
        logger.error(f"Error initiating cleanup: {str(exc)}")
        failure_detail = f"Error initiating session cleanup: {str(exc)}"
        raise HTTPException(status_code=500, detail=failure_detail)
    return {"status": "success", "message": "Session cleanup initiated"}
| # Fixed greeting endpoint in app.py | |
| # @app.post("/api/chat/greeting", response_model=ChatResponse, tags=["Chat"]) | |
| # async def quick_greeting(chatbot_name: str = Body(...), session_id: Optional[str] = Body(None)): | |
| # """ | |
| # Quick greeting endpoint for initializing conversations. | |
| # """ | |
| # try: | |
| # session_id = session_id or generate_session_id() | |
| # # Use the enhanced greeting system | |
| # conversation_manager = cm.ConversationManager(session_id) | |
| # user_context = conversation_manager.get_user_context() | |
| # greeting_response = cm.GreetingHandler.get_greeting_response( | |
| # "hello", chatbot_name, user_context | |
| # ) | |
| # conversation_manager.add_message("system", "greeting_initiated") | |
| # conversation_manager.add_message("assistant", greeting_response) | |
| # # Return a proper ChatResponse object with all required fields | |
| # return ChatResponse( | |
| # response=greeting_response, | |
| # sources=[], | |
| # confidence=1.0, | |
| # response_time="0.00s", | |
| # session_id=session_id, | |
| # response_type="greeting", | |
| # conversation_length=len(conversation_manager.get_conversation_history()), | |
| # metadata={ | |
| # "chatbot_name": chatbot_name, | |
| # "version": "v2", | |
| # "query_length": len("hello"), | |
| # "timestamp": datetime.now().isoformat() | |
| # } | |
| # ) | |
| # except Exception as e: | |
| # logger.error(f"Error in greeting endpoint: {str(e)}") | |
| # raise HTTPException( | |
| # status_code=500, | |
| # detail=f"Error generating greeting: {str(e)}" | |
| # ) | |
| # Updated greeting endpoint in app.py - Replace the existing one | |
@app.post("/api/chat/greeting", response_model=ChatResponse, tags=["Chat"])
async def quick_greeting(chatbot_name: str = Body(...), session_id: Optional[str] = Body(None)):
    """Start (or resume) a conversation with a natural greeting.

    A random greeting is chosen from one of two lists depending on whether the
    session already has history. The simulated user message ``"hello"`` and the
    greeting are both appended to the conversation history.

    Args:
        chatbot_name: Name of the chatbot, echoed in the response metadata.
        session_id: Existing session id; a new one is generated when omitted.

    Returns:
        A ``ChatResponse`` with ``response_type`` set to ``"greeting"``.

    Raises:
        HTTPException: 500 when the greeting cannot be generated.
    """
    # The simulated user message; its length is reported as query_length.
    greeting_query = "hello"
    try:
        session_id = session_id or generate_session_id()
        # Initialize conversation manager
        conversation_manager = cm.ConversationManager(session_id)

        # Take the count now, before add_message() runs: the history object
        # may be the live list (presumably — verify against
        # ConversationManager), in which case reading its length later would
        # already include the two messages added below.
        prior_message_count = len(conversation_manager.get_conversation_history())

        if prior_message_count == 0:
            # First interaction
            greeting_responses = [
                "Hello! How can I help you today?",
                "Hi there! What would you like to know?",
                "Hey! What can I assist you with?",
                "Hello! I'm here to help. What's your question?"
            ]
        else:
            # Subsequent interactions - more casual
            greeting_responses = [
                "Hi again! What else can I help you with?",
                "Hello! What's your next question?",
                "Hey! How can I assist you further?",
                "What else would you like to know?"
            ]
        greeting_response = random.choice(greeting_responses)

        # Add to conversation history
        conversation_manager.add_message("user", greeting_query)
        conversation_manager.add_message("assistant", greeting_response)

        return ChatResponse(
            response=greeting_response,
            sources=[],
            confidence=1.0,
            response_time="0.01s",
            session_id=session_id,
            response_type="greeting",
            conversation_length=len(conversation_manager.get_conversation_history()),
            metadata={
                "chatbot_name": chatbot_name,
                "version": "v2",
                "query_length": len(greeting_query),
                "timestamp": datetime.now().isoformat(),
                "interaction_count": prior_message_count + 1
            }
        )
    except Exception as e:
        logger.exception(f"Error in greeting endpoint: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Error generating greeting: {str(e)}"
        ) from e
# NOTE(review): no route decorator is visible on this handler here — confirm
# that it is registered on `app`.
async def update_user_context(
    session_id: str = Body(...),
    context_updates: Dict[str, Any] = Body(...)
):
    """Apply key/value updates to the user context of a session.

    Args:
        session_id: Session whose context is updated.
        context_updates: Mapping of context keys to their new values; each
            pair is passed to ``set_user_context``.

    Returns:
        A success payload including the context as reported by
        ``get_user_context`` after the updates.

    Raises:
        HTTPException: 500 when the update fails.
    """
    try:
        manager = cm.ConversationManager(session_id)
        for context_key in context_updates:
            manager.set_user_context(context_key, context_updates[context_key])
        return {
            "status": "success",
            "message": "User context updated",
            "session_id": session_id,
            "updated_context": manager.get_user_context(),
        }
    except Exception as exc:
        logger.error(f"Error updating context: {str(exc)}")
        failure_detail = f"Error updating user context: {str(exc)}"
        raise HTTPException(status_code=500, detail=failure_detail)
| # --- Error Handlers --- | |
@app.exception_handler(HTTPException)
async def http_exception_handler(request, exc):
    """Render an ``HTTPException`` as a JSON error payload.

    An exception handler has to return a response object, so the payload is
    wrapped in a ``JSONResponse`` that carries the exception's own status code
    (and headers, when the exception defines any).

    Args:
        request: The request being handled (unused).
        exc: The raised ``HTTPException``.

    Returns:
        A ``JSONResponse`` with ``error``, ``status_code`` and ``timestamp``.
    """
    return JSONResponse(
        status_code=exc.status_code,
        content={
            "error": exc.detail,
            "status_code": exc.status_code,
            "timestamp": datetime.now().isoformat()
        },
        headers=getattr(exc, "headers", None),
    )
@app.exception_handler(Exception)
async def general_exception_handler(request, exc):
    """Render any unhandled exception as a JSON 500 response.

    An exception handler has to return a response object, so the payload is
    wrapped in a ``JSONResponse`` with status 500. The exception is logged
    together with its traceback.

    Args:
        request: The request being handled (unused).
        exc: The unhandled exception.

    Returns:
        A ``JSONResponse`` with ``error``, ``detail`` and ``timestamp``.
    """
    # exc_info=exc attaches the traceback even though we are not inside an
    # ``except`` block here.
    logger.error(f"Unhandled exception: {str(exc)}", exc_info=exc)
    return JSONResponse(
        status_code=500,
        content={
            "error": "Internal server error",
            "detail": str(exc),
            "timestamp": datetime.now().isoformat()
        },
    )
| # --- Startup Configuration --- | |
if __name__ == "__main__":
    import uvicorn

    # Enhanced server configuration
    config = {
        "host": "0.0.0.0",
        "port": 8000,
        "log_level": "info",
        "reload": True,  # Set to False in production
        "access_log": True,
        "use_colors": True,
    }
    logger.info(f"Starting Enhanced Chatbot API Server on {config['host']}:{config['port']}")

    # uvicorn only supports auto-reload when the application is given as an
    # import string ("module:attribute"); with an application object it
    # refuses to start. The module name is derived from this file's name.
    if config["reload"]:
        module_name = os.path.splitext(os.path.basename(__file__))[0]
        app_target = f"{module_name}:app"
    else:
        app_target = app
    uvicorn.run(app_target, **config)