# CUSTOMCHATBOT / app.py
# shakeel143's picture
# Update app.py
# 8b315b8 verified
# ChatbotSystem/app.py
import asyncio
import logging
import os
import uuid
from contextlib import asynccontextmanager
from datetime import datetime
from typing import List, Optional, Dict, Any

from fastapi import FastAPI, HTTPException, Body, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field

import chatbot_manager as cm
# from dotenv import load_dotenv
# # Load environment variables from .env file
# load_dotenv()
# --- Required secret: the Google API key is read from the process environment ---
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY")
if GOOGLE_API_KEY is None or GOOGLE_API_KEY == "":
    # Fail at import time with an explicit message when the secret is
    # missing or empty (e.g. not configured in Hugging Face).
    raise ValueError("GOOGLE_API_KEY secret not found in environment variables.")
# ---------------------------------------------------------------------------------
# Configure logging: root logger at INFO level, plus a module-level logger
# named after this module that every endpoint below writes to.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Lifespan manager for startup/shutdown events
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Lifespan hook passed to the FastAPI constructor: everything before the
    # `yield` runs at startup, everything after it at shutdown. Both phases
    # currently only write a log line; `app` is not used.
    # Startup
    logger.info("Starting Enhanced Chatbot API Server...")
    yield
    # Shutdown
    logger.info("Shutting down Enhanced Chatbot API Server...")
# The FastAPI application instance that all routes, middleware and exception
# handlers in this module are registered on. `lifespan` wires in the
# startup/shutdown logging hook; the version string matches the one reported
# by the root and health endpoints.
app = FastAPI(
    title="Enhanced Chatbot Management System API",
    description="Advanced API for conversational AI chatbots with memory, natural interactions, and session management.",
    version="2.0.0",
    lifespan=lifespan
)
# Enhanced CORS configuration
# Allowed origins come from the optional, comma-separated ALLOWED_ORIGINS
# environment variable (e.g. "https://a.example,https://b.example").
# When it is unset the API stays open to every origin, as before, but
# credentialed requests are NOT enabled: combining a wildcard origin with
# allow_credentials=True lets any website issue credentialed cross-origin
# requests. Credentials are only allowed for an explicit origin list.
_cors_allowed_origins = [
    origin.strip()
    for origin in os.getenv("ALLOWED_ORIGINS", "").split(",")
    if origin.strip()
]
app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_allowed_origins or ["*"],
    allow_credentials=bool(_cors_allowed_origins),
    allow_methods=["*"],
    allow_headers=["*"],
)
# --- Enhanced Pydantic Models ---
# Request body for POST /api/load_model.
class LoadModelRequest(BaseModel):
    # Required; the endpoint looks this name up with cm.get_chatbot_config().
    chatbot_name: str = Field(..., description="Name of the chatbot to load")
    # Defaults to "latest", which the endpoint resolves to the config's
    # 'latest_version' entry.
    version: Optional[str] = Field("latest", description="Version of the chatbot")
# Response body of POST /api/load_model.
class LoadModelResponse(BaseModel):
    status: str
    message: str
    # Built by the endpoint as "<chatbot_name>_<version>"; clients pass it
    # back as ChatRequest.model_id.
    model_id: str
    # Filled from get_chatbot_info_enhanced(); optional.
    chatbot_info: Optional[Dict[str, Any]] = None
# Request body for POST /api/chat.
class ChatRequest(BaseModel):
    model_id: str = Field(..., description="The ID of the loaded model, e.g., 'SBBURA_v2'")
    query: str = Field(..., description="User's message/query")
    # When omitted, the chat endpoint generates a fresh session ID.
    session_id: Optional[str] = Field(None, description="Session ID for conversation continuity")
    # NOTE(review): accepted here, but the chat endpoint in this file does not
    # read this field.
    user_context: Optional[Dict[str, Any]] = Field(None, description="Additional user context")
# One entry of ChatResponse.sources.
class Source(BaseModel):
    source: str
    snippet: str
# Response body of POST /api/chat and POST /api/chat/greeting.
class ChatResponse(BaseModel):
    response: str
    sources: List[Source] = []
    confidence: float
    # A string such as "0.01s" in the payloads built in this file.
    response_time: str
    session_id: str
    # Values produced in this file: "greeting" and "error"; other values
    # presumably come from cm.get_chatbot_response() — verify there.
    response_type: str
    conversation_length: int
    # Free-form extras (chatbot name, version, timestamp, ...) added by the
    # endpoints.
    metadata: Optional[Dict[str, Any]] = None
# One chatbot entry inside ChatbotsListResponse.chatbots.
class ChatbotInfo(BaseModel):
    name: str
    versions: List[str]
    latest: str
    created: str
    last_updated: str
    # The two optional fields below are supplied by
    # get_chatbot_info_enhanced() when the list endpoint merges its output.
    description: Optional[str] = None
    capabilities: Optional[List[str]] = None
# Response body of GET /api/chatbots.
class ChatbotsListResponse(BaseModel):
    chatbots: List[ChatbotInfo]
    # Set by the endpoint to the length of `chatbots`.
    total_count: int
# Response body of GET /api/sessions/{session_id}/summary; the endpoint
# returns the output of cm.get_conversation_summary() through this model.
class SessionSummaryResponse(BaseModel):
    session_id: str
    message_count: int
    user_context: Dict[str, Any]
    last_interaction: Optional[str]
    conversation_started: Optional[str]
    chatbot_name: Optional[str] = None
# Response body of GET /api/sessions/{session_id}/export.
class ConversationExportResponse(BaseModel):
    session_id: str
    user_context: Dict[str, Any]
    messages: List[Dict[str, Any]]
    exported_at: str
    # Added by the export endpoint as len(messages).
    total_messages: int
# Response body of POST /api/sessions/{session_id}/clear.
class SessionClearResponse(BaseModel):
    status: str
    message: str
    # Echoes the session ID taken from the request path.
    session_id: str
# Response body of GET /health.
class HealthCheckResponse(BaseModel):
    status: str
    # ISO-8601 string produced with datetime.now().isoformat().
    timestamp: str
    version: str
    # str(timedelta) since app_start_time, with the fractional seconds cut off.
    uptime: str
    # len(cm.conversation_memory) at the time of the request.
    active_sessions: int
# Generic error payload shape.
# NOTE(review): no endpoint or exception handler visible in this file
# references this model.
class ErrorResponse(BaseModel):
    error: str
    detail: Optional[str] = None
    timestamp: str
    session_id: Optional[str] = None
# --- Global Variables ---
# Timestamp taken when this module is imported (naive local time); the
# health endpoint subtracts it from datetime.now() to report uptime.
app_start_time = datetime.now()
# --- Utility Functions ---
def generate_session_id() -> str:
    """Return a freshly generated random UUID (version 4) as a string."""
    fresh_uuid = uuid.uuid4()
    return str(fresh_uuid)
def get_chatbot_info_enhanced(chatbot_name: str) -> Dict[str, Any]:
    """Build the extra descriptive fields the API attaches to a chatbot.

    Returns an empty dict when ``cm.get_chatbot_config`` yields nothing for
    ``chatbot_name``. Otherwise returns a dict with ``description`` (taken
    from the config, with a generated fallback), a fixed ``capabilities``
    list, and ``model_info`` naming the LLM and embedding models configured
    in ``cm``.
    """
    bot_config = cm.get_chatbot_config(chatbot_name)
    if not bot_config:
        return {}

    fallback_description = f"AI assistant specialized in {chatbot_name}"
    description = bot_config.get("description", fallback_description)

    # The capability list is static; it does not depend on the config.
    capabilities = [
        "Natural conversation",
        "Memory across sessions",
        "Context-aware responses",
        "Greeting handling",
        "Follow-up questions",
        "Domain-specific knowledge",
    ]

    model_info = {
        "llm_model": cm.LLM_MODEL,
        "embedding_model": cm.EMBEDDING_MODEL,
    }

    return {
        "description": description,
        "capabilities": capabilities,
        "model_info": model_info,
    }
# --- API Endpoints ---
@app.get("/", tags=["Root"])
async def root():
    """Root endpoint with API information."""
    # Static landing payload: API name, version, and pointers to the
    # interactive docs and the health endpoint.
    api_info = dict(
        message="Enhanced Chatbot Management System API",
        version="2.0.0",
        documentation="/docs",
        health_check="/health",
    )
    return api_info
@app.get("/health", response_model=HealthCheckResponse, tags=["Health"])
async def health_check():
    """Health check endpoint."""
    # Uptime is measured from the module-level start timestamp; everything
    # after the first "." (the microseconds) is dropped from its string form.
    elapsed = datetime.now() - app_start_time
    uptime_text, _, _ = str(elapsed).partition('.')

    # Every key in cm.conversation_memory counts as one active session.
    session_total = len(cm.conversation_memory)

    payload = {
        "status": "healthy",
        "timestamp": datetime.now().isoformat(),
        "version": "2.0.0",
        "uptime": uptime_text,
        "active_sessions": session_total,
    }
    return payload
@app.get("/api/chatbots", response_model=ChatbotsListResponse, tags=["Management"])
async def get_available_chatbots():
    """
    Retrieves a list of all available chatbots with enhanced information.
    """
    # Returns {"chatbots": [...], "total_count": n}. Any failure is logged
    # and reported as HTTP 500.
    try:
        chatbots_data = cm.get_available_chatbots()
        # Merge each entry with its enhanced info into a NEW dict. Updating
        # the entries in place would modify whatever objects the chatbot
        # manager handed out, which leaks API-only fields back into its data
        # if those objects are shared or cached.
        enhanced_chatbots = [
            {**chatbot, **get_chatbot_info_enhanced(chatbot["name"])}
            for chatbot in chatbots_data
        ]
        return {
            "chatbots": enhanced_chatbots,
            "total_count": len(enhanced_chatbots)
        }
    except Exception as e:
        logger.error(f"Error retrieving chatbots: {str(e)}")
        raise HTTPException(
            status_code=500,
            detail=f"Error retrieving chatbots: {str(e)}"
        )
@app.post("/api/load_model", response_model=LoadModelResponse, tags=["Chat"])
async def load_model(request: LoadModelRequest):
    """
    Validates if a chatbot version exists and returns a model_id for use in the chat API.
    Enhanced with additional chatbot information.
    """
    # Outcomes: 404 when the chatbot or the requested version is unknown,
    # 500 for any other failure, otherwise a success payload whose model_id
    # is "<chatbot_name>_<version>".
    bot_name = request.chatbot_name
    try:
        bot_config = cm.get_chatbot_config(bot_name)
        if not bot_config:
            raise FileNotFoundError(f"Chatbot '{bot_name}' not found")

        # "latest" is an alias for whatever the config names as latest_version.
        if request.version == "latest":
            resolved_version = bot_config.get('latest_version')
        else:
            resolved_version = request.version

        if not resolved_version or resolved_version not in bot_config.get('versions', []):
            raise FileNotFoundError(f"Version '{resolved_version}' not found for chatbot '{bot_name}'")

        payload = {
            "status": "success",
            "message": f"{bot_name} {resolved_version} is ready for conversation.",
            "model_id": f"{bot_name}_{resolved_version}",
        }
        # Attach the descriptive extras (description, capabilities, model info).
        payload["chatbot_info"] = get_chatbot_info_enhanced(bot_name)
        return payload
    except FileNotFoundError as missing:
        # Unknown chatbot/version: report as "not found".
        raise HTTPException(status_code=404, detail=str(missing))
    except Exception as err:
        logger.error(f"Error loading model: {str(err)}")
        raise HTTPException(
            status_code=500,
            detail=f"Error loading model: {str(err)}",
        )
@app.post("/api/chat", response_model=ChatResponse, tags=["Chat"])
async def chat(request: ChatRequest):
    """
    Enhanced chat endpoint with session management, memory, and natural conversation flow.
    """
    # Contract:
    #   * HTTP 400 when model_id is not of the form "<name>_<version>" or
    #     when the query is empty/whitespace.
    #   * Any other failure is logged and answered with a regular
    #     ChatResponse payload whose response_type is "error" (not an HTTP
    #     error), so clients always get a displayable message.
    # NOTE(review): request.user_context is accepted by the model but is not
    # forwarded anywhere by this endpoint.

    # Resolve the session once, up front, so the fallback error payload
    # reports the same session ID the request was processed under.
    session_id = request.session_id or generate_session_id()
    try:
        # /api/load_model builds model_id as f"{chatbot_name}_{version}", so
        # split on the LAST underscore: chatbot names that themselves contain
        # underscores must still round-trip.
        chatbot_name, separator, version = request.model_id.rpartition('_')
        if not (separator and chatbot_name and version):
            raise HTTPException(
                status_code=400,
                detail="Invalid model_id format. Expected 'name_version'."
            )

        # Validate query
        query = request.query.strip() if request.query else ""
        if not query:
            raise HTTPException(
                status_code=400,
                detail="Query cannot be empty"
            )

        # Get enhanced chatbot response
        result = await cm.get_chatbot_response(
            chatbot_name=chatbot_name,
            version=version,
            query=query,
            session_id=session_id
        )

        # Add additional metadata (query_length is the length as received,
        # before stripping).
        result["metadata"] = {
            "chatbot_name": chatbot_name,
            "version": version,
            "query_length": len(request.query),
            "timestamp": datetime.now().isoformat()
        }
        return result
    except HTTPException:
        # Validation errors raised above must reach the client unchanged.
        raise
    except Exception as e:
        # logger.exception records the traceback alongside the message.
        logger.exception(f"Error in chat endpoint: {str(e)}")
        error_response = {
            "response": "I apologize, but I'm experiencing some technical difficulties. Please try again in a moment.",
            "sources": [],
            "confidence": 0.1,
            "response_time": "0.00s",
            "session_id": session_id,
            "response_type": "error",
            "conversation_length": 0,
            "metadata": {
                "error": str(e),
                "timestamp": datetime.now().isoformat()
            }
        }
        return error_response
@app.get("/api/sessions/{session_id}/summary", response_model=SessionSummaryResponse, tags=["Session Management"])
async def get_session_summary(session_id: str):
    """
    Get conversation summary for a specific session.
    """
    # Thin wrapper: hands back whatever the chatbot manager reports for the
    # session; any failure is logged and surfaced as HTTP 500.
    try:
        return cm.get_conversation_summary(session_id)
    except Exception as err:
        logger.error(f"Error getting session summary: {str(err)}")
        raise HTTPException(
            status_code=500,
            detail=f"Error retrieving session summary: {str(err)}",
        )
@app.get("/api/sessions/{session_id}/export", response_model=ConversationExportResponse, tags=["Session Management"])
async def export_conversation(session_id: str):
    """
    Export conversation history for a specific session.
    """
    # Fetches the export from the chatbot manager and stamps it with the
    # number of messages it contains; failures become HTTP 500.
    try:
        exported = cm.export_conversation(session_id)
        exported_messages = exported.get("messages", [])
        exported["total_messages"] = len(exported_messages)
        return exported
    except Exception as err:
        logger.error(f"Error exporting conversation: {str(err)}")
        raise HTTPException(
            status_code=500,
            detail=f"Error exporting conversation: {str(err)}",
        )
@app.post("/api/sessions/{session_id}/clear", response_model=SessionClearResponse, tags=["Session Management"])
async def clear_session(session_id: str):
    """
    Clear conversation history for a specific session.
    """
    # Delegates the clearing to the chatbot manager, then confirms with the
    # session ID echoed back; failures become HTTP 500.
    try:
        cm.clear_conversation_history(session_id)
    except Exception as err:
        logger.error(f"Error clearing session: {str(err)}")
        raise HTTPException(
            status_code=500,
            detail=f"Error clearing session: {str(err)}",
        )
    return {
        "status": "success",
        "message": "Conversation history cleared successfully",
        "session_id": session_id,
    }
@app.get("/api/sessions", tags=["Session Management"])
async def get_active_sessions():
    """
    Get list of active sessions.
    """
    # One summary per key currently present in cm.conversation_memory, plus
    # the count; failures become HTTP 500.
    try:
        summaries = [
            cm.get_conversation_summary(known_id)
            for known_id in cm.conversation_memory.keys()
        ]
        return {
            "active_sessions": summaries,
            "total_count": len(summaries),
        }
    except Exception as err:
        logger.error(f"Error getting active sessions: {str(err)}")
        raise HTTPException(
            status_code=500,
            detail=f"Error retrieving active sessions: {str(err)}",
        )
@app.delete("/api/sessions/cleanup", tags=["Session Management"])
async def cleanup_old_sessions(background_tasks: BackgroundTasks):
    """
    Clean up old inactive sessions (background task).
    """
    # Schedules a background task and answers immediately. The task is a
    # placeholder: it removes nothing yet and only writes a log line.
    def _log_cleanup_run():
        logger.info("Session cleanup task executed")

    try:
        background_tasks.add_task(_log_cleanup_run)
        acknowledgement = {
            "status": "success",
            "message": "Session cleanup initiated",
        }
        return acknowledgement
    except Exception as err:
        logger.error(f"Error initiating cleanup: {str(err)}")
        raise HTTPException(
            status_code=500,
            detail=f"Error initiating session cleanup: {str(err)}",
        )
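# Earlier version of the greeting endpoint, kept commented out for reference only.
# The active implementation is registered further down under the same route.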
# @app.post("/api/chat/greeting", response_model=ChatResponse, tags=["Chat"])
# async def quick_greeting(chatbot_name: str = Body(...), session_id: Optional[str] = Body(None)):
# """
# Quick greeting endpoint for initializing conversations.
# """
# try:
# session_id = session_id or generate_session_id()
# # Use the enhanced greeting system
# conversation_manager = cm.ConversationManager(session_id)
# user_context = conversation_manager.get_user_context()
# greeting_response = cm.GreetingHandler.get_greeting_response(
# "hello", chatbot_name, user_context
# )
# conversation_manager.add_message("system", "greeting_initiated")
# conversation_manager.add_message("assistant", greeting_response)
# # Return a proper ChatResponse object with all required fields
# return ChatResponse(
# response=greeting_response,
# sources=[],
# confidence=1.0,
# response_time="0.00s",
# session_id=session_id,
# response_type="greeting",
# conversation_length=len(conversation_manager.get_conversation_history()),
# metadata={
# "chatbot_name": chatbot_name,
# "version": "v2",
# "query_length": len("hello"),
# "timestamp": datetime.now().isoformat()
# }
# )
# except Exception as e:
# logger.error(f"Error in greeting endpoint: {str(e)}")
# raise HTTPException(
# status_code=500,
# detail=f"Error generating greeting: {str(e)}"
# )
# Updated greeting endpoint in app.py - Replace the existing one
@app.post("/api/chat/greeting", response_model=ChatResponse, tags=["Chat"])
async def quick_greeting(chatbot_name: str = Body(...), session_id: Optional[str] = Body(None)):
    """
    Quick greeting endpoint for initializing conversations with natural responses.
    """
    # Picks a random canned greeting (a different pool once the session has
    # history), records a synthetic "hello" exchange in the session, and
    # returns it as a ChatResponse. Any failure becomes HTTP 500.
    try:
        import random

        session_id = session_id or generate_session_id()
        conversation_manager = cm.ConversationManager(session_id)

        # Existing history decides whether this is a first or a repeat greeting.
        history = conversation_manager.get_conversation_history()

        first_contact_greetings = [
            "Hello! How can I help you today?",
            "Hi there! What would you like to know?",
            "Hey! What can I assist you with?",
            "Hello! I'm here to help. What's your question?",
        ]
        returning_greetings = [
            "Hi again! What else can I help you with?",
            "Hello! What's your next question?",
            "Hey! How can I assist you further?",
            "What else would you like to know?",
        ]
        candidate_greetings = first_contact_greetings if len(history) == 0 else returning_greetings
        greeting_response = random.choice(candidate_greetings)

        # Record the exchange as if the user had typed "hello".
        conversation_manager.add_message("user", "hello")
        conversation_manager.add_message("assistant", greeting_response)

        recorded_length = len(conversation_manager.get_conversation_history())
        # interaction_count uses the `history` object fetched before the two
        # messages were added. NOTE(review): if that object is the manager's
        # live list it already includes them — verify in ConversationManager.
        greeting_metadata = {
            "chatbot_name": chatbot_name,
            "version": "v2",
            "query_length": 5,
            "timestamp": datetime.now().isoformat(),
            "interaction_count": len(history) + 1,
        }
        return ChatResponse(
            response=greeting_response,
            sources=[],
            confidence=1.0,
            response_time="0.01s",
            session_id=session_id,
            response_type="greeting",
            conversation_length=recorded_length,
            metadata=greeting_metadata,
        )
    except Exception as err:
        logger.error(f"Error in greeting endpoint: {str(err)}")
        raise HTTPException(
            status_code=500,
            detail=f"Error generating greeting: {str(err)}",
        )
@app.post("/api/chat/context", tags=["Chat"])
async def update_user_context(
    session_id: str = Body(...),
    context_updates: Dict[str, Any] = Body(...)
):
    """
    Update user context for a session.
    """
    # Applies each key/value pair to the session's conversation manager, one
    # call per pair, then echoes the resulting context. Failures become
    # HTTP 500.
    try:
        manager = cm.ConversationManager(session_id)
        for context_key, context_value in context_updates.items():
            manager.set_user_context(context_key, context_value)

        confirmation = {
            "status": "success",
            "message": "User context updated",
            "session_id": session_id,
            "updated_context": manager.get_user_context(),
        }
        return confirmation
    except Exception as err:
        logger.error(f"Error updating context: {str(err)}")
        raise HTTPException(
            status_code=500,
            detail=f"Error updating user context: {str(err)}",
        )
# --- Error Handlers ---
@app.exception_handler(HTTPException)
async def http_exception_handler(request, exc):
    """Custom HTTP exception handler.

    Renders every HTTPException raised by the endpoints as a JSON body with
    ``error``, ``status_code`` and ``timestamp`` keys.

    An exception handler must return a Response object: a bare dict is not a
    valid response and would also drop the intended status code. The body is
    therefore wrapped in a JSONResponse that carries the exception's own
    status code and any headers attached to it.
    """
    return JSONResponse(
        status_code=exc.status_code,
        content={
            "error": exc.detail,
            "status_code": exc.status_code,
            "timestamp": datetime.now().isoformat()
        },
        headers=getattr(exc, "headers", None),
    )
@app.exception_handler(Exception)
async def general_exception_handler(request, exc):
    """General exception handler.

    Last-resort handler for exceptions no endpoint caught: logs the error
    with its traceback and answers with HTTP 500 and a JSON body holding
    ``error``, ``detail`` and ``timestamp``.

    The body is wrapped in a JSONResponse because an exception handler must
    return a Response object, not a bare dict.
    """
    # exc_info=exc attaches the traceback; there is no active `except` block
    # here for logging to pick it up from on its own.
    logger.error(f"Unhandled exception: {str(exc)}", exc_info=exc)
    return JSONResponse(
        status_code=500,
        content={
            "error": "Internal server error",
            "detail": str(exc),
            "timestamp": datetime.now().isoformat()
        },
    )
# --- Startup Configuration ---
if __name__ == "__main__":
    import uvicorn

    # Server configuration for running this file directly.
    # `reload` stays off: uvicorn supports auto-reload only when the
    # application is given as an import string. With the `app` object passed
    # below, reload=True makes uvicorn.run() log a warning and exit instead
    # of serving.
    config = {
        "host": "0.0.0.0",
        "port": 8000,
        "log_level": "info",
        "reload": False,
        "access_log": True,
        "use_colors": True,
    }
    logger.info(f"Starting Enhanced Chatbot API Server on {config['host']}:{config['port']}")
    uvicorn.run(app, **config)