Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Production-Ready Gradio Chat Interface | |
| This is the main application file for production deployment of the | |
| learning conversational travel chat system. | |
| Features: | |
| - Environment-specific configuration | |
| - Comprehensive error handling | |
| - Rate limiting and session management | |
| - Security measures and input validation | |
| - Production-ready logging and monitoring | |
| Author: AI Assistant | |
| Date: 2024 | |
| """ | |
| import os | |
| import sys | |
| import logging | |
| import asyncio | |
| from typing import Dict, List, Any, Optional, Tuple | |
| import gradio as gr | |
| from datetime import datetime | |
| import json | |
| # Ensure the app directory itself is importable (for HF runtime quirks) | |
| APP_DIR = os.path.dirname(__file__) | |
| if APP_DIR not in sys.path: | |
| sys.path.insert(0, APP_DIR) | |
| # Add the src directory to the path | |
| sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src')) | |
| # Import configuration and security systems | |
| from config.settings import get_config, is_production, is_development | |
| # Initialize a basic logger early so it's available during import fallbacks | |
| logger = logging.getLogger(__name__) | |
| # Try to import security systems, with fallback for Hugging Face deployment | |
| try: | |
| from security.error_handler import ( | |
| InputValidator, RateLimiter, SessionManager, ErrorHandler, | |
| handle_errors, validate_input, rate_limit, require_session, | |
| ValidationError, RateLimitError, AuthenticationError | |
| ) | |
| SECURITY_AVAILABLE = True | |
| except ImportError as e: | |
| logger.warning(f"Security modules not available: {e}") | |
| logger.warning("Running in Hugging Face compatibility mode") | |
| SECURITY_AVAILABLE = False | |
| # Create fallback classes for Hugging Face deployment | |
| class InputValidator: | |
| def __init__(self, max_length=1000): | |
| self.max_length = max_length | |
| def _truncate(self, text: str) -> str: | |
| return (text or "")[: self.max_length] | |
| def validate(self, text: str) -> str: | |
| return self._truncate(text) | |
| def validate_user_id(self, user_id: str) -> str: | |
| return self._truncate(user_id) | |
| def validate_message(self, message: str) -> str: | |
| return self._truncate(message) | |
| def validate_input(self, value: str, _name: str = "") -> str: | |
| return self._truncate(value) | |
| class RateLimiter: | |
| def __init__(self, requests_per_minute=60, requests_per_hour=1000): | |
| self.requests_per_minute = requests_per_minute | |
| self.requests_per_hour = requests_per_hour | |
| def check_rate_limit(self, user_id: str) -> bool: | |
| return True | |
| def get_remaining_requests(self, _user_id: str) -> dict: | |
| return { | |
| "per_minute_remaining": self.requests_per_minute, | |
| "per_hour_remaining": self.requests_per_hour, | |
| } | |
| class SessionManager: | |
| def __init__(self, session_timeout_minutes=30): | |
| self.session_timeout_minutes = session_timeout_minutes | |
| self.sessions: Dict[str, dict] = {} | |
| def create_session(self, user_id: str, data: Optional[dict] = None) -> str: | |
| session_id = f"session_{user_id}_{datetime.now().timestamp()}" | |
| self.sessions[session_id] = { | |
| "user_id": user_id, | |
| "created_at": datetime.now(), | |
| "last_activity": datetime.now(), | |
| "data": data or {}, | |
| } | |
| return session_id | |
| def validate_session(self, session_id: str) -> Optional[dict]: | |
| return self.sessions.get(session_id) | |
| def destroy_session(self, session_id: str) -> None: | |
| self.sessions.pop(session_id, None) | |
| class ErrorHandler: | |
| def __init__(self): | |
| self._errors: List[dict] = [] | |
| def handle_error(self, error, context: str = ""): | |
| logger.error(f"Error: {error} - Context: {context}") | |
| self._errors.append({ | |
| "error": str(error), | |
| "context": context, | |
| "time": datetime.now().isoformat(), | |
| }) | |
| return "An error occurred. Please try again." | |
| def get_error_statistics(self) -> dict: | |
| return {"total_errors": len(self._errors)} | |
| # Fallback decorators | |
| def handle_errors(func): | |
| def wrapper(*args, **kwargs): | |
| try: | |
| return func(*args, **kwargs) | |
| except Exception as e: | |
| logger.error(f"Error in {func.__name__}: {e}") | |
| return "An error occurred. Please try again." | |
| return wrapper | |
| def validate_input(func): | |
| return func | |
| def rate_limit(func): | |
| return func | |
| def require_session(func): | |
| return func | |
| # Fallback exceptions | |
| class ValidationError(Exception): | |
| pass | |
| class RateLimitError(Exception): | |
| pass | |
| class AuthenticationError(Exception): | |
| pass | |
| # Import the real API chat system for Hugging Face deployment (robust fallback) | |
| try: | |
| from conversational_travel_chat_real_apis import RealAPIConversationalTravelChat | |
| except ModuleNotFoundError as e: | |
| logger.warning(f"Primary import failed: {e}. sys.path={sys.path}") | |
| # Attempt dynamic import via importlib as a fallback | |
| try: | |
| import importlib.util | |
| module_path = os.path.join(APP_DIR, 'conversational_travel_chat_real_apis.py') | |
| spec = importlib.util.spec_from_file_location('conversational_travel_chat_real_apis', module_path) | |
| module = importlib.util.module_from_spec(spec) | |
| assert spec and spec.loader | |
| spec.loader.exec_module(module) | |
| RealAPIConversationalTravelChat = getattr(module, 'RealAPIConversationalTravelChat') | |
| logger.info("Loaded RealAPIConversationalTravelChat via dynamic import") | |
| except Exception as inner_e: | |
| logger.error(f"Failed to load conversational_travel_chat_real_apis: {inner_e}") | |
| raise | |
| # Set up logging | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', | |
| handlers=[ | |
| logging.StreamHandler(), | |
| logging.FileHandler('app.log') | |
| ] | |
| ) | |
| logger = logging.getLogger(__name__) | |
| class ProductionChatApp: | |
| """ | |
| Production-ready chat application with security and monitoring | |
| """ | |
| def __init__(self): | |
| # Load configuration | |
| self.config = get_config() | |
| # Initialize security systems (with fallback for Hugging Face) | |
| if SECURITY_AVAILABLE: | |
| self.input_validator = InputValidator(max_length=self.config.security.max_input_length) | |
| self.rate_limiter = RateLimiter( | |
| requests_per_minute=self.config.api.rate_limit_per_minute, | |
| requests_per_hour=self.config.api.rate_limit_per_hour | |
| ) | |
| self.session_manager = SessionManager( | |
| session_timeout_minutes=self.config.security.session_timeout_minutes | |
| ) | |
| self.error_handler = ErrorHandler() | |
| else: | |
| # Use fallback security systems for Hugging Face | |
| self.input_validator = InputValidator(max_length=1000) | |
| self.rate_limiter = RateLimiter(requests_per_minute=60, requests_per_hour=1000) | |
| self.session_manager = SessionManager(session_timeout_minutes=30) | |
| self.error_handler = ErrorHandler() | |
| # Initialize real API chat system for Hugging Face | |
| self.real_api_chat = RealAPIConversationalTravelChat() | |
| # Application state | |
| self.active_sessions = {} | |
| self.request_count = 0 | |
| # Check for required API keys for Hugging Face deployment | |
| self._check_api_keys() | |
| logger.info(f"Production chat app initialized in {self.config.deployment.environment.value} mode") | |
| def _check_api_keys(self): | |
| """Check for required API keys for Hugging Face deployment""" | |
| required_vars = ['SERPAPI_API_KEY', 'TAVILY_API_KEY'] | |
| missing_vars = [var for var in required_vars if not os.getenv(var)] | |
| if missing_vars: | |
| logger.warning(f"Missing environment variables: {missing_vars}") | |
| logger.warning("The app will run with limited functionality.") | |
| else: | |
| logger.info("All required API keys are configured") | |
| async def start_conversation(self, user_id: str, message: str, session_id: Optional[str] = None) -> Tuple[str, str, str]: | |
| """Start or continue a conversation with security checks""" | |
| # Validate inputs | |
| user_id = self.input_validator.validate_user_id(user_id) | |
| message = self.input_validator.validate_message(message) | |
| # Check rate limiting | |
| if not self.rate_limiter.check_rate_limit(user_id): | |
| raise RateLimitError("Rate limit exceeded. Please wait before making another request.") | |
| # Handle session management | |
| if session_id: | |
| session_data = self.session_manager.validate_session(session_id) | |
| if not session_data: | |
| raise AuthenticationError("Invalid or expired session") | |
| else: | |
| session_id = self.session_manager.create_session(user_id, { | |
| "start_time": datetime.now().isoformat(), | |
| "message_count": 0 | |
| }) | |
| # Update session data | |
| session_data = self.session_manager.sessions[session_id] | |
| session_data["data"]["message_count"] = session_data["data"].get("message_count", 0) + 1 | |
| session_data["last_activity"] = datetime.now() | |
| # Process conversation with real API chat | |
| try: | |
| response = await self.real_api_chat.process_message(user_id, message) | |
| # Update request count | |
| self.request_count += 1 | |
| # Log successful interaction | |
| logger.info(f"Conversation processed for user {user_id}, session {session_id}") | |
| return response, session_id, "success" | |
| except Exception as e: | |
| logger.error(f"Error processing conversation for user {user_id}: {e}") | |
| raise | |
| def collect_feedback(self, user_id: str, session_id: str, rating: int, comments: str = "") -> str: | |
| """Collect user feedback with validation""" | |
| # Validate inputs | |
| user_id = self.input_validator.validate_user_id(user_id) | |
| session_id = self.input_validator.validate_input(session_id, "session_id") | |
| # Validate rating | |
| if not isinstance(rating, int) or rating < 1 or rating > 5: | |
| raise ValidationError("Rating must be an integer between 1 and 5") | |
| # Validate comments if provided | |
| if comments: | |
| comments = self.input_validator.validate_input(comments, "comments") | |
| # Validate session | |
| session_data = self.session_manager.validate_session(session_id) | |
| if not session_data: | |
| raise AuthenticationError("Invalid or expired session") | |
| # Collect feedback | |
| self.learning_chat.collect_feedback(user_id, rating, comments) | |
| logger.info(f"Feedback collected from user {user_id}: {rating}/5") | |
| return f"Thank you for your feedback! Rating: {rating}/5" | |
| def end_conversation(self, user_id: str, session_id: str, completion_status: str = "completed") -> str: | |
| """End user conversation""" | |
| # Validate inputs | |
| user_id = self.input_validator.validate_user_id(user_id) | |
| session_id = self.input_validator.validate_input(session_id, "session_id") | |
| # Validate completion status | |
| valid_statuses = ["completed", "abandoned", "in_progress"] | |
| if completion_status not in valid_statuses: | |
| raise ValidationError(f"Completion status must be one of: {valid_statuses}") | |
| # Validate session | |
| session_data = self.session_manager.validate_session(session_id) | |
| if not session_data: | |
| raise AuthenticationError("Invalid or expired session") | |
| # End conversation | |
| self.learning_chat.end_conversation(user_id, completion_status) | |
| # Destroy session | |
| self.session_manager.destroy_session(session_id) | |
| logger.info(f"Conversation ended for user {user_id} with status: {completion_status}") | |
| return f"Session ended successfully. Status: {completion_status}" | |
| def get_analytics(self, admin_key: str) -> str: | |
| """Get system analytics (admin only)""" | |
| # Simple admin authentication (in production, use proper auth) | |
| if admin_key != os.getenv('ADMIN_KEY', 'admin123'): | |
| raise AuthenticationError("Invalid admin key") | |
| # Get analytics | |
| insights = self.learning_chat.get_learning_insights() | |
| # Format for display | |
| analytics_data = { | |
| "timestamp": datetime.now().isoformat(), | |
| "request_count": self.request_count, | |
| "active_sessions": len(self.session_manager.sessions), | |
| "rate_limit_stats": self.rate_limiter.get_remaining_requests("system"), | |
| "error_stats": self.error_handler.get_error_statistics(), | |
| "learning_insights": insights | |
| } | |
| return json.dumps(analytics_data, indent=2, default=str) | |
| def health_check(self) -> Dict[str, Any]: | |
| """Health check endpoint""" | |
| return { | |
| "status": "healthy", | |
| "timestamp": datetime.now().isoformat(), | |
| "environment": self.config.deployment.environment.value, | |
| "version": "1.0.0", | |
| "uptime": "active" | |
| } | |
| def create_interface(self) -> gr.Blocks: | |
| """Create the Gradio interface""" | |
| # Custom CSS for production styling | |
| custom_css = """ | |
| .gradio-container { | |
| max-width: 1200px !important; | |
| margin: 0 auto !important; | |
| } | |
| .chat-message { | |
| padding: 10px; | |
| margin: 5px 0; | |
| border-radius: 10px; | |
| } | |
| .user-message { | |
| background-color: #e3f2fd; | |
| text-align: right; | |
| } | |
| .bot-message { | |
| background-color: #f5f5f5; | |
| text-align: left; | |
| } | |
| .error-message { | |
| background-color: #ffebee; | |
| color: #c62828; | |
| border: 1px solid #ef5350; | |
| } | |
| .success-message { | |
| background-color: #e8f5e8; | |
| color: #2e7d32; | |
| border: 1px solid #4caf50; | |
| } | |
| #send-button { | |
| background: linear-gradient(45deg, #4CAF50, #45a049) !important; | |
| border: none !important; | |
| color: white !important; | |
| font-weight: bold !important; | |
| transition: all 0.3s ease !important; | |
| } | |
| #send-button:hover { | |
| background: linear-gradient(45deg, #45a049, #4CAF50) !important; | |
| transform: translateY(-2px) !important; | |
| box-shadow: 0 4px 8px rgba(0,0,0,0.2) !important; | |
| } | |
| .textbox textarea { | |
| border: 2px solid #e0e0e0 !important; | |
| border-radius: 8px !important; | |
| transition: border-color 0.3s ease !important; | |
| } | |
| .textbox textarea:focus { | |
| border-color: #4CAF50 !important; | |
| outline: none !important; | |
| box-shadow: 0 0 0 3px rgba(76, 175, 80, 0.1) !important; | |
| } | |
| .textbox textarea { | |
| resize: vertical !important; | |
| } | |
| """ | |
| with gr.Blocks( | |
| title="AI Travel Assistant", | |
| css=custom_css, | |
| theme=gr.themes.Soft() if is_development() else gr.themes.Default() | |
| ) as interface: | |
| gr.Markdown(""" | |
| # π§ AI Travel Assistant | |
| Your intelligent travel planning companion that learns from every interaction | |
| to provide increasingly personalized experiences. | |
| **Features:** | |
| - π― Personalized recommendations based on your travel style | |
| - π Continuous learning from your preferences | |
| - π Secure and private conversations | |
| - π Real-time feedback and improvement | |
| """) | |
| with gr.Tab("π¬ Chat"): | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| # User input section | |
| user_id_input = gr.Textbox( | |
| label="User ID", | |
| placeholder="Enter your user ID (e.g., user_123)", | |
| value="user_123", | |
| interactive=True | |
| ) | |
| session_id_input = gr.Textbox( | |
| label="Session ID", | |
| placeholder="Session ID (auto-generated)", | |
| interactive=False, | |
| visible=False | |
| ) | |
| gr.Markdown("### Start Your Conversation") | |
| chatbot = gr.Chatbot( | |
| label="Conversation", | |
| height=400, | |
| show_label=False, | |
| container=True | |
| ) | |
| message_input = gr.Textbox( | |
| label="Message", | |
| placeholder="Tell me about your travel plans... (Press Enter to send)", | |
| lines=2, | |
| max_lines=4, | |
| show_copy_button=False, | |
| container=True, | |
| scale=4 | |
| ) | |
| with gr.Row(): | |
| send_btn = gr.Button("Send β", variant="primary", scale=1, elem_id="send-button") | |
| clear_btn = gr.Button("Clear", variant="secondary", scale=1) | |
| gr.Markdown("π‘ **Tip:** Press Enter to send your message quickly!") | |
| with gr.Column(scale=1): | |
| # Feedback and session management | |
| gr.Markdown("### π Feedback & Session") | |
| with gr.Row(): | |
| feedback_rating = gr.Slider( | |
| minimum=1, maximum=5, step=1, value=3, | |
| label="Rate your experience (1-5)" | |
| ) | |
| feedback_comments = gr.Textbox( | |
| label="Comments (optional)", | |
| placeholder="Tell us how we can improve...", | |
| lines=3, | |
| max_lines=5 | |
| ) | |
| with gr.Row(): | |
| feedback_btn = gr.Button("Submit Feedback", variant="secondary") | |
| gr.Markdown("### π§ Session Management") | |
| completion_status = gr.Dropdown( | |
| choices=["completed", "abandoned", "in_progress"], | |
| value="completed", | |
| label="Session Status" | |
| ) | |
| end_session_btn = gr.Button("End Session", variant="stop") | |
| # Status display | |
| status_display = gr.Textbox( | |
| label="Status", | |
| interactive=False, | |
| lines=2 | |
| ) | |
| with gr.Tab("π Analytics"): | |
| gr.Markdown("### π System Analytics") | |
| admin_key_input = gr.Textbox( | |
| label="Admin Key", | |
| placeholder="Enter admin key to view analytics", | |
| type="password" | |
| ) | |
| analytics_btn = gr.Button("View Analytics", variant="primary") | |
| analytics_output = gr.JSON(label="Analytics Data") | |
| with gr.Tab("π§ Health Check"): | |
| gr.Markdown("### π₯ System Health") | |
| health_btn = gr.Button("Check Health", variant="primary") | |
| health_output = gr.JSON(label="Health Status") | |
| # Event handlers | |
| async def process_message(user_id, message, session_id, history): | |
| """Process user message with error handling""" | |
| try: | |
| if not message.strip(): | |
| return "", history, session_id, "Please enter a message." | |
| response, new_session_id, status = await self.start_conversation(user_id, message, session_id) | |
| # Update history | |
| history.append([message, response]) | |
| return "", history, new_session_id, f"β {status}" | |
| except Exception as e: | |
| error_msg = f"β Error: {str(e)}" | |
| return "", history, session_id, error_msg | |
| def clear_conversation(): | |
| """Clear conversation history""" | |
| return [], None, "Conversation cleared." | |
| def submit_feedback(user_id, session_id, rating, comments): | |
| """Submit user feedback""" | |
| try: | |
| if not session_id: | |
| return "β No active session. Please start a conversation first." | |
| result = self.collect_feedback(user_id, session_id, rating, comments) | |
| return f"β {result}" | |
| except Exception as e: | |
| return f"β Error: {str(e)}" | |
| def end_session(user_id, session_id, status): | |
| """End user session""" | |
| try: | |
| if not session_id: | |
| return "β No active session to end." | |
| result = self.end_conversation(user_id, session_id, status) | |
| return f"β {result}" | |
| except Exception as e: | |
| return f"β Error: {str(e)}" | |
| def view_analytics(admin_key): | |
| """View system analytics""" | |
| try: | |
| result = self.get_analytics(admin_key) | |
| return result | |
| except Exception as e: | |
| return f"β Error: {str(e)}" | |
| def check_health(): | |
| """Check system health""" | |
| try: | |
| result = self.health_check() | |
| return result | |
| except Exception as e: | |
| return {"status": "error", "message": str(e)} | |
| # Connect events - Enter key and Send button both trigger the same function | |
| def handle_message_submit(user_id, message, session_id, history): | |
| """Handle message submission from either Enter key or Send button""" | |
| print(f"π DEBUG: handle_message_submit called with message: '{message}'") | |
| if not message.strip(): | |
| print("π DEBUG: Empty message detected") | |
| return "", history, session_id, "Please enter a message." | |
| print(f"π DEBUG: Processing message: '{message}'") | |
| # Call the async process_message function and return its result | |
| import asyncio | |
| try: | |
| loop = asyncio.get_event_loop() | |
| result = loop.run_until_complete(process_message(user_id, message, session_id, history)) | |
| return result | |
| except RuntimeError: | |
| # If no event loop is running, create a new one | |
| result = asyncio.run(process_message(user_id, message, session_id, history)) | |
| return result | |
| # Send button click | |
| send_btn.click( | |
| handle_message_submit, | |
| inputs=[user_id_input, message_input, session_id_input, chatbot], | |
| outputs=[message_input, chatbot, session_id_input, status_display] | |
| ) | |
| # Enter key press in message input - Enhanced configuration | |
| message_input.submit( | |
| handle_message_submit, | |
| inputs=[user_id_input, message_input, session_id_input, chatbot], | |
| outputs=[message_input, chatbot, session_id_input, status_display], | |
| show_progress=False, | |
| queue=True | |
| ) | |
| clear_btn.click( | |
| clear_conversation, | |
| outputs=[chatbot, session_id_input, status_display] | |
| ) | |
| feedback_btn.click( | |
| submit_feedback, | |
| inputs=[user_id_input, session_id_input, feedback_rating, feedback_comments], | |
| outputs=[status_display] | |
| ) | |
| end_session_btn.click( | |
| end_session, | |
| inputs=[user_id_input, session_id_input, completion_status], | |
| outputs=[status_display] | |
| ) | |
| analytics_btn.click( | |
| view_analytics, | |
| inputs=[admin_key_input], | |
| outputs=[analytics_output] | |
| ) | |
| health_btn.click( | |
| check_health, | |
| outputs=[health_output] | |
| ) | |
| return interface | |
| def run(self): | |
| """Run the application""" | |
| interface = self.create_interface() | |
| # Detect Hugging Face Spaces environment | |
| running_in_hf_space = bool(os.getenv("SPACE_ID") or os.getenv("HF_SPACE_ID") or os.getenv("HF_HOME")) | |
| # Configure host/port for Spaces explicitly | |
| host = "0.0.0.0" if running_in_hf_space else self.config.deployment.host | |
| port = int(os.getenv("PORT", "7860")) if running_in_hf_space else self.config.deployment.port | |
| # Launch (avoid SSL params in Spaces; enable queue) | |
| launch_kwargs = dict( | |
| server_name=host, | |
| server_port=port, | |
| share=False, | |
| debug=self.config.deployment.debug_mode, | |
| show_error=True, | |
| quiet=not self.config.deployment.debug_mode, | |
| ) | |
| # Only pass SSL options outside Spaces | |
| if not running_in_hf_space: | |
| launch_kwargs.update( | |
| ssl_certfile=self.config.deployment.ssl_cert_path, | |
| ssl_keyfile=self.config.deployment.ssl_key_path, | |
| ssl_verify=False if is_development() else True, | |
| ) | |
| interface.queue().launch(**launch_kwargs) | |
| def main(): | |
| """Main application entry point""" | |
| try: | |
| # Initialize and run the application | |
| app = ProductionChatApp() | |
| logger.info("Starting production chat application...") | |
| logger.info(f"Environment: {app.config.deployment.environment.value}") | |
| logger.info(f"Host: {app.config.deployment.host}") | |
| logger.info(f"Port: {app.config.deployment.port}") | |
| logger.info(f"Debug mode: {app.config.deployment.debug_mode}") | |
| app.run() | |
| except Exception as e: | |
| logger.error(f"Failed to start application: {e}") | |
| sys.exit(1) | |
| if __name__ == "__main__": | |
| main() | |
| # Expose a global Gradio interface for Hugging Face Spaces autodiscovery | |
| try: | |
| if os.getenv("SPACE_ID") or os.getenv("HF_SPACE_ID") or os.getenv("HF_HOME"): | |
| _app_instance = ProductionChatApp() | |
| demo = _app_instance.create_interface() | |
| except Exception: | |
| # Do not fail module import on Spaces if lazy creation is desired | |
| pass | |