wanderlust.ai / app.py
BlakeL's picture
Upload app.py
8ffff15 verified
#!/usr/bin/env python3
"""
Production-Ready Gradio Chat Interface
This is the main application file for production deployment of the
learning conversational travel chat system.
Features:
- Environment-specific configuration
- Comprehensive error handling
- Rate limiting and session management
- Security measures and input validation
- Production-ready logging and monitoring
Author: AI Assistant
Date: 2024
"""
import os
import sys
import logging
import asyncio
from typing import Dict, List, Any, Optional, Tuple
import gradio as gr
from datetime import datetime
import json
# Ensure the app directory itself is importable (for HF runtime quirks)
APP_DIR = os.path.dirname(__file__)
if APP_DIR not in sys.path:
sys.path.insert(0, APP_DIR)
# Add the src directory to the path
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src'))
# Import configuration and security systems
from config.settings import get_config, is_production, is_development
# Initialize a basic logger early so it's available during import fallbacks
logger = logging.getLogger(__name__)
# Try to import security systems, with fallback for Hugging Face deployment
try:
from security.error_handler import (
InputValidator, RateLimiter, SessionManager, ErrorHandler,
handle_errors, validate_input, rate_limit, require_session,
ValidationError, RateLimitError, AuthenticationError
)
SECURITY_AVAILABLE = True
except ImportError as e:
logger.warning(f"Security modules not available: {e}")
logger.warning("Running in Hugging Face compatibility mode")
SECURITY_AVAILABLE = False
# Create fallback classes for Hugging Face deployment
class InputValidator:
def __init__(self, max_length=1000):
self.max_length = max_length
def _truncate(self, text: str) -> str:
return (text or "")[: self.max_length]
def validate(self, text: str) -> str:
return self._truncate(text)
def validate_user_id(self, user_id: str) -> str:
return self._truncate(user_id)
def validate_message(self, message: str) -> str:
return self._truncate(message)
def validate_input(self, value: str, _name: str = "") -> str:
return self._truncate(value)
class RateLimiter:
def __init__(self, requests_per_minute=60, requests_per_hour=1000):
self.requests_per_minute = requests_per_minute
self.requests_per_hour = requests_per_hour
def check_rate_limit(self, user_id: str) -> bool:
return True
def get_remaining_requests(self, _user_id: str) -> dict:
return {
"per_minute_remaining": self.requests_per_minute,
"per_hour_remaining": self.requests_per_hour,
}
class SessionManager:
def __init__(self, session_timeout_minutes=30):
self.session_timeout_minutes = session_timeout_minutes
self.sessions: Dict[str, dict] = {}
def create_session(self, user_id: str, data: Optional[dict] = None) -> str:
session_id = f"session_{user_id}_{datetime.now().timestamp()}"
self.sessions[session_id] = {
"user_id": user_id,
"created_at": datetime.now(),
"last_activity": datetime.now(),
"data": data or {},
}
return session_id
def validate_session(self, session_id: str) -> Optional[dict]:
return self.sessions.get(session_id)
def destroy_session(self, session_id: str) -> None:
self.sessions.pop(session_id, None)
class ErrorHandler:
def __init__(self):
self._errors: List[dict] = []
def handle_error(self, error, context: str = ""):
logger.error(f"Error: {error} - Context: {context}")
self._errors.append({
"error": str(error),
"context": context,
"time": datetime.now().isoformat(),
})
return "An error occurred. Please try again."
def get_error_statistics(self) -> dict:
return {"total_errors": len(self._errors)}
# Fallback decorators
def handle_errors(func):
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
logger.error(f"Error in {func.__name__}: {e}")
return "An error occurred. Please try again."
return wrapper
def validate_input(func):
return func
def rate_limit(func):
return func
def require_session(func):
return func
# Fallback exceptions
class ValidationError(Exception):
pass
class RateLimitError(Exception):
pass
class AuthenticationError(Exception):
pass
# Import the real API chat system for Hugging Face deployment (robust fallback)
try:
from conversational_travel_chat_real_apis import RealAPIConversationalTravelChat
except ModuleNotFoundError as e:
logger.warning(f"Primary import failed: {e}. sys.path={sys.path}")
# Attempt dynamic import via importlib as a fallback
try:
import importlib.util
module_path = os.path.join(APP_DIR, 'conversational_travel_chat_real_apis.py')
spec = importlib.util.spec_from_file_location('conversational_travel_chat_real_apis', module_path)
module = importlib.util.module_from_spec(spec)
assert spec and spec.loader
spec.loader.exec_module(module)
RealAPIConversationalTravelChat = getattr(module, 'RealAPIConversationalTravelChat')
logger.info("Loaded RealAPIConversationalTravelChat via dynamic import")
except Exception as inner_e:
logger.error(f"Failed to load conversational_travel_chat_real_apis: {inner_e}")
raise
# Set up logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler('app.log')
]
)
logger = logging.getLogger(__name__)
class ProductionChatApp:
"""
Production-ready chat application with security and monitoring
"""
def __init__(self):
# Load configuration
self.config = get_config()
# Initialize security systems (with fallback for Hugging Face)
if SECURITY_AVAILABLE:
self.input_validator = InputValidator(max_length=self.config.security.max_input_length)
self.rate_limiter = RateLimiter(
requests_per_minute=self.config.api.rate_limit_per_minute,
requests_per_hour=self.config.api.rate_limit_per_hour
)
self.session_manager = SessionManager(
session_timeout_minutes=self.config.security.session_timeout_minutes
)
self.error_handler = ErrorHandler()
else:
# Use fallback security systems for Hugging Face
self.input_validator = InputValidator(max_length=1000)
self.rate_limiter = RateLimiter(requests_per_minute=60, requests_per_hour=1000)
self.session_manager = SessionManager(session_timeout_minutes=30)
self.error_handler = ErrorHandler()
# Initialize real API chat system for Hugging Face
self.real_api_chat = RealAPIConversationalTravelChat()
# Application state
self.active_sessions = {}
self.request_count = 0
# Check for required API keys for Hugging Face deployment
self._check_api_keys()
logger.info(f"Production chat app initialized in {self.config.deployment.environment.value} mode")
def _check_api_keys(self):
"""Check for required API keys for Hugging Face deployment"""
required_vars = ['SERPAPI_API_KEY', 'TAVILY_API_KEY']
missing_vars = [var for var in required_vars if not os.getenv(var)]
if missing_vars:
logger.warning(f"Missing environment variables: {missing_vars}")
logger.warning("The app will run with limited functionality.")
else:
logger.info("All required API keys are configured")
@handle_errors
async def start_conversation(self, user_id: str, message: str, session_id: Optional[str] = None) -> Tuple[str, str, str]:
"""Start or continue a conversation with security checks"""
# Validate inputs
user_id = self.input_validator.validate_user_id(user_id)
message = self.input_validator.validate_message(message)
# Check rate limiting
if not self.rate_limiter.check_rate_limit(user_id):
raise RateLimitError("Rate limit exceeded. Please wait before making another request.")
# Handle session management
if session_id:
session_data = self.session_manager.validate_session(session_id)
if not session_data:
raise AuthenticationError("Invalid or expired session")
else:
session_id = self.session_manager.create_session(user_id, {
"start_time": datetime.now().isoformat(),
"message_count": 0
})
# Update session data
session_data = self.session_manager.sessions[session_id]
session_data["data"]["message_count"] = session_data["data"].get("message_count", 0) + 1
session_data["last_activity"] = datetime.now()
# Process conversation with real API chat
try:
response = await self.real_api_chat.process_message(user_id, message)
# Update request count
self.request_count += 1
# Log successful interaction
logger.info(f"Conversation processed for user {user_id}, session {session_id}")
return response, session_id, "success"
except Exception as e:
logger.error(f"Error processing conversation for user {user_id}: {e}")
raise
@handle_errors
def collect_feedback(self, user_id: str, session_id: str, rating: int, comments: str = "") -> str:
"""Collect user feedback with validation"""
# Validate inputs
user_id = self.input_validator.validate_user_id(user_id)
session_id = self.input_validator.validate_input(session_id, "session_id")
# Validate rating
if not isinstance(rating, int) or rating < 1 or rating > 5:
raise ValidationError("Rating must be an integer between 1 and 5")
# Validate comments if provided
if comments:
comments = self.input_validator.validate_input(comments, "comments")
# Validate session
session_data = self.session_manager.validate_session(session_id)
if not session_data:
raise AuthenticationError("Invalid or expired session")
# Collect feedback
self.learning_chat.collect_feedback(user_id, rating, comments)
logger.info(f"Feedback collected from user {user_id}: {rating}/5")
return f"Thank you for your feedback! Rating: {rating}/5"
@handle_errors
def end_conversation(self, user_id: str, session_id: str, completion_status: str = "completed") -> str:
"""End user conversation"""
# Validate inputs
user_id = self.input_validator.validate_user_id(user_id)
session_id = self.input_validator.validate_input(session_id, "session_id")
# Validate completion status
valid_statuses = ["completed", "abandoned", "in_progress"]
if completion_status not in valid_statuses:
raise ValidationError(f"Completion status must be one of: {valid_statuses}")
# Validate session
session_data = self.session_manager.validate_session(session_id)
if not session_data:
raise AuthenticationError("Invalid or expired session")
# End conversation
self.learning_chat.end_conversation(user_id, completion_status)
# Destroy session
self.session_manager.destroy_session(session_id)
logger.info(f"Conversation ended for user {user_id} with status: {completion_status}")
return f"Session ended successfully. Status: {completion_status}"
@handle_errors
def get_analytics(self, admin_key: str) -> str:
"""Get system analytics (admin only)"""
# Simple admin authentication (in production, use proper auth)
if admin_key != os.getenv('ADMIN_KEY', 'admin123'):
raise AuthenticationError("Invalid admin key")
# Get analytics
insights = self.learning_chat.get_learning_insights()
# Format for display
analytics_data = {
"timestamp": datetime.now().isoformat(),
"request_count": self.request_count,
"active_sessions": len(self.session_manager.sessions),
"rate_limit_stats": self.rate_limiter.get_remaining_requests("system"),
"error_stats": self.error_handler.get_error_statistics(),
"learning_insights": insights
}
return json.dumps(analytics_data, indent=2, default=str)
@handle_errors
def health_check(self) -> Dict[str, Any]:
"""Health check endpoint"""
return {
"status": "healthy",
"timestamp": datetime.now().isoformat(),
"environment": self.config.deployment.environment.value,
"version": "1.0.0",
"uptime": "active"
}
def create_interface(self) -> gr.Blocks:
"""Create the Gradio interface"""
# Custom CSS for production styling
custom_css = """
.gradio-container {
max-width: 1200px !important;
margin: 0 auto !important;
}
.chat-message {
padding: 10px;
margin: 5px 0;
border-radius: 10px;
}
.user-message {
background-color: #e3f2fd;
text-align: right;
}
.bot-message {
background-color: #f5f5f5;
text-align: left;
}
.error-message {
background-color: #ffebee;
color: #c62828;
border: 1px solid #ef5350;
}
.success-message {
background-color: #e8f5e8;
color: #2e7d32;
border: 1px solid #4caf50;
}
#send-button {
background: linear-gradient(45deg, #4CAF50, #45a049) !important;
border: none !important;
color: white !important;
font-weight: bold !important;
transition: all 0.3s ease !important;
}
#send-button:hover {
background: linear-gradient(45deg, #45a049, #4CAF50) !important;
transform: translateY(-2px) !important;
box-shadow: 0 4px 8px rgba(0,0,0,0.2) !important;
}
.textbox textarea {
border: 2px solid #e0e0e0 !important;
border-radius: 8px !important;
transition: border-color 0.3s ease !important;
}
.textbox textarea:focus {
border-color: #4CAF50 !important;
outline: none !important;
box-shadow: 0 0 0 3px rgba(76, 175, 80, 0.1) !important;
}
.textbox textarea {
resize: vertical !important;
}
"""
with gr.Blocks(
title="AI Travel Assistant",
css=custom_css,
theme=gr.themes.Soft() if is_development() else gr.themes.Default()
) as interface:
gr.Markdown("""
# 🧠 AI Travel Assistant
Your intelligent travel planning companion that learns from every interaction
to provide increasingly personalized experiences.
**Features:**
- 🎯 Personalized recommendations based on your travel style
- πŸ“Š Continuous learning from your preferences
- πŸ”’ Secure and private conversations
- πŸ“ˆ Real-time feedback and improvement
""")
with gr.Tab("πŸ’¬ Chat"):
with gr.Row():
with gr.Column(scale=1):
# User input section
user_id_input = gr.Textbox(
label="User ID",
placeholder="Enter your user ID (e.g., user_123)",
value="user_123",
interactive=True
)
session_id_input = gr.Textbox(
label="Session ID",
placeholder="Session ID (auto-generated)",
interactive=False,
visible=False
)
gr.Markdown("### Start Your Conversation")
chatbot = gr.Chatbot(
label="Conversation",
height=400,
show_label=False,
container=True
)
message_input = gr.Textbox(
label="Message",
placeholder="Tell me about your travel plans... (Press Enter to send)",
lines=2,
max_lines=4,
show_copy_button=False,
container=True,
scale=4
)
with gr.Row():
send_btn = gr.Button("Send ⏎", variant="primary", scale=1, elem_id="send-button")
clear_btn = gr.Button("Clear", variant="secondary", scale=1)
gr.Markdown("πŸ’‘ **Tip:** Press Enter to send your message quickly!")
with gr.Column(scale=1):
# Feedback and session management
gr.Markdown("### πŸ“Š Feedback & Session")
with gr.Row():
feedback_rating = gr.Slider(
minimum=1, maximum=5, step=1, value=3,
label="Rate your experience (1-5)"
)
feedback_comments = gr.Textbox(
label="Comments (optional)",
placeholder="Tell us how we can improve...",
lines=3,
max_lines=5
)
with gr.Row():
feedback_btn = gr.Button("Submit Feedback", variant="secondary")
gr.Markdown("### πŸ”§ Session Management")
completion_status = gr.Dropdown(
choices=["completed", "abandoned", "in_progress"],
value="completed",
label="Session Status"
)
end_session_btn = gr.Button("End Session", variant="stop")
# Status display
status_display = gr.Textbox(
label="Status",
interactive=False,
lines=2
)
with gr.Tab("πŸ“Š Analytics"):
gr.Markdown("### πŸ“ˆ System Analytics")
admin_key_input = gr.Textbox(
label="Admin Key",
placeholder="Enter admin key to view analytics",
type="password"
)
analytics_btn = gr.Button("View Analytics", variant="primary")
analytics_output = gr.JSON(label="Analytics Data")
with gr.Tab("πŸ”§ Health Check"):
gr.Markdown("### πŸ₯ System Health")
health_btn = gr.Button("Check Health", variant="primary")
health_output = gr.JSON(label="Health Status")
# Event handlers
async def process_message(user_id, message, session_id, history):
"""Process user message with error handling"""
try:
if not message.strip():
return "", history, session_id, "Please enter a message."
response, new_session_id, status = await self.start_conversation(user_id, message, session_id)
# Update history
history.append([message, response])
return "", history, new_session_id, f"βœ… {status}"
except Exception as e:
error_msg = f"❌ Error: {str(e)}"
return "", history, session_id, error_msg
def clear_conversation():
"""Clear conversation history"""
return [], None, "Conversation cleared."
def submit_feedback(user_id, session_id, rating, comments):
"""Submit user feedback"""
try:
if not session_id:
return "❌ No active session. Please start a conversation first."
result = self.collect_feedback(user_id, session_id, rating, comments)
return f"βœ… {result}"
except Exception as e:
return f"❌ Error: {str(e)}"
def end_session(user_id, session_id, status):
"""End user session"""
try:
if not session_id:
return "❌ No active session to end."
result = self.end_conversation(user_id, session_id, status)
return f"βœ… {result}"
except Exception as e:
return f"❌ Error: {str(e)}"
def view_analytics(admin_key):
"""View system analytics"""
try:
result = self.get_analytics(admin_key)
return result
except Exception as e:
return f"❌ Error: {str(e)}"
def check_health():
"""Check system health"""
try:
result = self.health_check()
return result
except Exception as e:
return {"status": "error", "message": str(e)}
# Connect events - Enter key and Send button both trigger the same function
def handle_message_submit(user_id, message, session_id, history):
"""Handle message submission from either Enter key or Send button"""
print(f"πŸ” DEBUG: handle_message_submit called with message: '{message}'")
if not message.strip():
print("πŸ” DEBUG: Empty message detected")
return "", history, session_id, "Please enter a message."
print(f"πŸ” DEBUG: Processing message: '{message}'")
# Call the async process_message function and return its result
import asyncio
try:
loop = asyncio.get_event_loop()
result = loop.run_until_complete(process_message(user_id, message, session_id, history))
return result
except RuntimeError:
# If no event loop is running, create a new one
result = asyncio.run(process_message(user_id, message, session_id, history))
return result
# Send button click
send_btn.click(
handle_message_submit,
inputs=[user_id_input, message_input, session_id_input, chatbot],
outputs=[message_input, chatbot, session_id_input, status_display]
)
# Enter key press in message input - Enhanced configuration
message_input.submit(
handle_message_submit,
inputs=[user_id_input, message_input, session_id_input, chatbot],
outputs=[message_input, chatbot, session_id_input, status_display],
show_progress=False,
queue=True
)
clear_btn.click(
clear_conversation,
outputs=[chatbot, session_id_input, status_display]
)
feedback_btn.click(
submit_feedback,
inputs=[user_id_input, session_id_input, feedback_rating, feedback_comments],
outputs=[status_display]
)
end_session_btn.click(
end_session,
inputs=[user_id_input, session_id_input, completion_status],
outputs=[status_display]
)
analytics_btn.click(
view_analytics,
inputs=[admin_key_input],
outputs=[analytics_output]
)
health_btn.click(
check_health,
outputs=[health_output]
)
return interface
def run(self):
"""Run the application"""
interface = self.create_interface()
# Detect Hugging Face Spaces environment
running_in_hf_space = bool(os.getenv("SPACE_ID") or os.getenv("HF_SPACE_ID") or os.getenv("HF_HOME"))
# Configure host/port for Spaces explicitly
host = "0.0.0.0" if running_in_hf_space else self.config.deployment.host
port = int(os.getenv("PORT", "7860")) if running_in_hf_space else self.config.deployment.port
# Launch (avoid SSL params in Spaces; enable queue)
launch_kwargs = dict(
server_name=host,
server_port=port,
share=False,
debug=self.config.deployment.debug_mode,
show_error=True,
quiet=not self.config.deployment.debug_mode,
)
# Only pass SSL options outside Spaces
if not running_in_hf_space:
launch_kwargs.update(
ssl_certfile=self.config.deployment.ssl_cert_path,
ssl_keyfile=self.config.deployment.ssl_key_path,
ssl_verify=False if is_development() else True,
)
interface.queue().launch(**launch_kwargs)
def main():
"""Main application entry point"""
try:
# Initialize and run the application
app = ProductionChatApp()
logger.info("Starting production chat application...")
logger.info(f"Environment: {app.config.deployment.environment.value}")
logger.info(f"Host: {app.config.deployment.host}")
logger.info(f"Port: {app.config.deployment.port}")
logger.info(f"Debug mode: {app.config.deployment.debug_mode}")
app.run()
except Exception as e:
logger.error(f"Failed to start application: {e}")
sys.exit(1)
if __name__ == "__main__":
main()
# Expose a global Gradio interface for Hugging Face Spaces autodiscovery
try:
if os.getenv("SPACE_ID") or os.getenv("HF_SPACE_ID") or os.getenv("HF_HOME"):
_app_instance = ProductionChatApp()
demo = _app_instance.create_interface()
except Exception:
# Do not fail module import on Spaces if lazy creation is desired
pass