import json
import logging
import threading
import time
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional

from flask import Flask, jsonify, request
from flask_cors import CORS

from .dashboard_database_manager import DashboardDatabaseManager


class DashboardAPI:
    """API layer for dashboard integration.

    Wraps a Flask application exposing read-only JSON endpoints under
    ``/api/`` that are backed by a ``DashboardDatabaseManager``. The server
    is run in a daemon thread via :meth:`start_api_server`.
    """

    def __init__(self, database_manager: DashboardDatabaseManager, port: int = 5001):
        """Create the Flask app, enable CORS and register all routes.

        Args:
            database_manager: Data-access object used by every endpoint.
            port: TCP port the API server listens on when started.
        """
        self.database_manager = database_manager
        self.port = port
        self.app = Flask(__name__)
        CORS(self.app)  # Enable CORS for all routes

        # Configure logging
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

        # Setup routes
        self._setup_routes()

        # Background thread for API server
        self.api_thread = None
        self.running = False

    def _setup_routes(self):
        """Register the API routes for dashboard integration on ``self.app``.

        Every view returns a JSON body with a ``success`` flag; failures are
        logged and reported with HTTP 500 (or 404 for a missing analysis).
        """

        @self.app.route('/api/health', methods=['GET'])
        def health_check():
            """Health check endpoint."""
            return jsonify({
                'status': 'healthy',
                'timestamp': datetime.now().isoformat(),
                'service': 'SmartHeal Bot API'
            })

        @self.app.route('/api/bot/analytics', methods=['GET'])
        def get_bot_analytics():
            """Get comprehensive bot analytics for the dashboard."""
            try:
                analytics_data = self.database_manager.get_analytics_data()

                # Add trend data for charts
                analytics_data.update(self._get_trend_data())

                return jsonify({
                    'success': True,
                    'data': analytics_data,
                    'timestamp': datetime.now().isoformat()
                })
            except Exception as e:
                self.logger.error(f"Error getting bot analytics: {e}")
                return jsonify({
                    'success': False,
                    'error': str(e),
                    'timestamp': datetime.now().isoformat()
                }), 500

        # The view takes ``analysis_id``, so the rule must declare it as a URL
        # variable; without it Flask calls the view with no arguments and
        # every request fails with a TypeError.
        # NOTE(review): the int converter assumes numeric ids - confirm.
        @self.app.route('/api/bot/analytics/details/<int:analysis_id>', methods=['GET'])
        def get_analysis_details(analysis_id):
            """Get detailed information for a single analysis row."""
            try:
                query = "SELECT * FROM ai_analyses WHERE id = %s"
                analysis = self.database_manager.execute_query_one(query, (analysis_id,))

                if not analysis:
                    return jsonify({
                        'success': False,
                        'error': 'Analysis not found'
                    }), 404

                # Convert datetime objects to strings for JSON serialization
                if analysis.get('created_at'):
                    analysis['created_at'] = analysis['created_at'].isoformat()

                return jsonify({
                    'success': True,
                    'data': analysis,
                    'timestamp': datetime.now().isoformat()
                })
            except Exception as e:
                self.logger.error(f"Error getting analysis details: {e}")
                return jsonify({
                    'success': False,
                    'error': str(e)
                }), 500

        @self.app.route('/api/bot/interactions', methods=['GET'])
        def get_bot_interactions():
            """Get bot interaction history (``?limit=N``, default 50)."""
            try:
                limit = request.args.get('limit', 50, type=int)
                # Normalise a None result so iteration and len() cannot fail.
                interactions = self.database_manager.get_interaction_history(limit) or []

                # Convert datetime objects for JSON serialization
                for interaction in interactions:
                    if interaction.get('interacted_at'):
                        interaction['interacted_at'] = interaction['interacted_at'].isoformat()

                return jsonify({
                    'success': True,
                    'data': interactions,
                    'count': len(interactions),
                    'timestamp': datetime.now().isoformat()
                })
            except Exception as e:
                self.logger.error(f"Error getting bot interactions: {e}")
                return jsonify({
                    'success': False,
                    'error': str(e)
                }), 500

        @self.app.route('/api/bot/sessions', methods=['GET'])
        def get_session_analytics():
            """Get session analytics."""
            try:
                session_data = self.database_manager.get_session_analytics()

                return jsonify({
                    'success': True,
                    'data': session_data,
                    'timestamp': datetime.now().isoformat()
                })
            except Exception as e:
                self.logger.error(f"Error getting session analytics: {e}")
                return jsonify({
                    'success': False,
                    'error': str(e)
                }), 500

        @self.app.route('/api/bot/stats/summary', methods=['GET'])
        def get_summary_stats():
            """Get summary statistics for dashboard widgets."""
            try:
                def fetch_value(query, key):
                    # Return one column of a single-row query, or None when
                    # the query produced no row.
                    row = self.database_manager.execute_query_one(query)
                    return row[key] if row else None

                # Get basic counts
                total_analyses = fetch_value(
                    "SELECT COUNT(*) as count FROM ai_analyses", 'count')
                total_patients = fetch_value(
                    "SELECT COUNT(DISTINCT patient_id) as count FROM bot_interactions WHERE patient_id IS NOT NULL",
                    'count')
                total_sessions = fetch_value(
                    "SELECT COUNT(*) as count FROM analysis_sessions", 'count')

                # Get today's activity
                today_analyses = fetch_value("""
                    SELECT COUNT(*) as count
                    FROM ai_analyses
                    WHERE DATE(created_at) = CURDATE()
                """, 'count')

                # Get average metrics
                avg_processing_time = fetch_value("""
                    SELECT AVG(processing_time) as avg_time
                    FROM ai_analyses
                    WHERE processing_time IS NOT NULL
                """, 'avg_time')

                avg_risk_score = fetch_value("""
                    SELECT AVG(risk_score) as avg_risk
                    FROM ai_analyses
                    WHERE risk_score IS NOT NULL
                """, 'avg_risk')

                # Missing rows and NULL/zero aggregates are all reported as 0.
                summary = {
                    'total_analyses': total_analyses or 0,
                    'total_patients': total_patients or 0,
                    'total_sessions': total_sessions or 0,
                    'today_analyses': today_analyses or 0,
                    'avg_processing_time': round(avg_processing_time, 2) if avg_processing_time else 0,
                    'avg_risk_score': round(avg_risk_score, 1) if avg_risk_score else 0
                }

                return jsonify({
                    'success': True,
                    'data': summary,
                    'timestamp': datetime.now().isoformat()
                })
            except Exception as e:
                self.logger.error(f"Error getting summary stats: {e}")
                return jsonify({
                    'success': False,
                    'error': str(e)
                }), 500

        @self.app.route('/api/bot/models/performance', methods=['GET'])
        def get_model_performance():
            """Get AI model performance metrics grouped by model version."""
            try:
                # Normalise a None result up front so the loop below is safe.
                performance_data = self.database_manager.execute_query("""
                    SELECT
                        model_version,
                        COUNT(*) as total_analyses,
                        AVG(processing_time) as avg_processing_time,
                        AVG(risk_score) as avg_risk_score,
                        MIN(created_at) as first_used,
                        MAX(created_at) as last_used
                    FROM ai_analyses
                    WHERE model_version IS NOT NULL
                    GROUP BY model_version
                    ORDER BY last_used DESC
                """, fetch=True) or []

                # Convert datetime objects for JSON serialization
                for model in performance_data:
                    if model.get('first_used'):
                        model['first_used'] = model['first_used'].isoformat()
                    if model.get('last_used'):
                        model['last_used'] = model['last_used'].isoformat()
                    if model.get('avg_processing_time'):
                        model['avg_processing_time'] = round(model['avg_processing_time'], 2)
                    if model.get('avg_risk_score'):
                        model['avg_risk_score'] = round(model['avg_risk_score'], 1)

                return jsonify({
                    'success': True,
                    'data': performance_data,
                    'timestamp': datetime.now().isoformat()
                })
            except Exception as e:
                self.logger.error(f"Error getting model performance: {e}")
                return jsonify({
                    'success': False,
                    'error': str(e)
                }), 500

    def _get_trend_data(self) -> Dict[str, Any]:
        """Get trend data for dashboard charts.

        Returns:
            A dict of parallel label/value lists: ``trend_labels`` /
            ``trend_data`` (analyses per day, last 30 days),
            ``risk_level_labels`` / ``risk_level_data`` and
            ``processing_time_labels`` / ``processing_time_data``. If any
            query fails the error is logged and every list is empty.
        """
        try:
            # Get last 30 days of analysis data
            trend_data = self.database_manager.execute_query("""
                SELECT
                    DATE(created_at) as analysis_date,
                    COUNT(*) as count
                FROM ai_analyses
                WHERE created_at >= DATE_SUB(CURDATE(), INTERVAL 30 DAY)
                GROUP BY DATE(created_at)
                ORDER BY analysis_date
            """, fetch=True)

            # Prepare data for Chart.js
            labels = []
            data = []
            for row in trend_data or []:
                labels.append(row['analysis_date'].strftime('%Y-%m-%d'))
                data.append(row['count'])

            # Get risk level distribution
            risk_distribution = self.database_manager.execute_query("""
                SELECT risk_level, COUNT(*) as count
                FROM ai_analyses
                GROUP BY risk_level
            """, fetch=True)

            risk_labels = []
            risk_data = []
            for row in risk_distribution or []:
                risk_labels.append(row['risk_level'])
                risk_data.append(row['count'])

            # Get processing time distribution.
            # Buckets are ordered by MIN(processing_time): the ranges are
            # disjoint and increasing, so this yields the same bucket order as
            # a CASE ranking while remaining a valid aggregate expression
            # under MySQL's ONLY_FULL_GROUP_BY mode.
            processing_time_data = self.database_manager.execute_query("""
                SELECT
                    CASE
                        WHEN processing_time < 1 THEN '< 1s'
                        WHEN processing_time < 2 THEN '1-2s'
                        WHEN processing_time < 5 THEN '2-5s'
                        WHEN processing_time < 10 THEN '5-10s'
                        ELSE '> 10s'
                    END as time_range,
                    COUNT(*) as count
                FROM ai_analyses
                WHERE processing_time IS NOT NULL
                GROUP BY time_range
                ORDER BY MIN(processing_time)
            """, fetch=True)

            processing_labels = []
            processing_data = []
            for row in processing_time_data or []:
                processing_labels.append(row['time_range'])
                processing_data.append(row['count'])

            return {
                'trend_labels': labels,
                'trend_data': data,
                'risk_level_labels': risk_labels,
                'risk_level_data': risk_data,
                'processing_time_labels': processing_labels,
                'processing_time_data': processing_data
            }

        except Exception as e:
            self.logger.error(f"Error getting trend data: {e}")
            return {
                'trend_labels': [],
                'trend_data': [],
                'risk_level_labels': [],
                'risk_level_data': [],
                'processing_time_labels': [],
                'processing_time_data': []
            }

    def start_api_server(self):
        """Start the API server in a background daemon thread.

        Does nothing (beyond a warning) when the server is already marked as
        running. After launching the thread this waits one second and then
        logs whether the server thread is still alive.
        """
        if self.running:
            self.logger.warning("API server is already running")
            return

        def run_server():
            try:
                self.logger.info(f"Starting SmartHeal Bot API server on port {self.port}")
                self.app.run(host='0.0.0.0', port=self.port, debug=False, threaded=True)
            except Exception as e:
                self.logger.error(f"Error starting API server: {e}")
            finally:
                # Whatever made app.run() return or raise, the server is no
                # longer serving; clear the flag so a later start can retry.
                self.running = False

        self.running = True
        self.api_thread = threading.Thread(target=run_server, daemon=True)
        self.api_thread.start()

        # Give the server a moment to start
        time.sleep(1)
        if self.is_running():
            self.logger.info(f"✅ SmartHeal Bot API server started on http://0.0.0.0:{self.port}")
        else:
            self.logger.error(f"❌ SmartHeal Bot API server failed to start on port {self.port}")

    def stop_api_server(self):
        """Mark the API server as stopped.

        This only clears the ``running`` flag and logs; the server thread
        itself is not terminated here.
        """
        self.running = False
        if self.api_thread and self.api_thread.is_alive():
            self.logger.info("Stopping SmartHeal Bot API server")
            # Note: Flask development server doesn't have a clean shutdown method
            # In production, you would use a proper WSGI server like Gunicorn

    def is_running(self) -> bool:
        """Return True when the server is flagged as running and its thread is alive."""
        return bool(
            self.running
            and self.api_thread is not None
            and self.api_thread.is_alive()
        )


class DashboardIntegrationManager:
    """Manager class for dashboard integration functionality.

    Owns a :class:`DashboardAPI` built on the given database manager and
    forwards session/interaction logging to that database manager.
    """

    def __init__(self, database_manager: DashboardDatabaseManager):
        """Store the database manager and build the API on the default port."""
        self.database_manager = database_manager
        self.api = DashboardAPI(database_manager)
        self.logger = logging.getLogger(__name__)

    def start_integration(self):
        """Start dashboard integration services; failures are logged, not raised."""
        try:
            self.api.start_api_server()
            if self.api.is_running():
                self.logger.info("✅ Dashboard integration started successfully")
            else:
                self.logger.error("❌ Failed to start dashboard integration: API server is not running")
        except Exception as e:
            self.logger.error(f"❌ Failed to start dashboard integration: {e}")

    def stop_integration(self):
        """Stop dashboard integration services; failures are logged, not raised."""
        try:
            self.api.stop_api_server()
            self.logger.info("✅ Dashboard integration stopped")
        except Exception as e:
            self.logger.error(f"❌ Error stopping dashboard integration: {e}")

    def log_analysis_session(self, session_data: Dict[str, Any]) -> Optional[int]:
        """Log an analysis session for dashboard tracking.

        Returns:
            Whatever ``save_analysis_session`` returns (the session id), or
            ``None`` if saving raised an exception.
        """
        try:
            session_id = self.database_manager.save_analysis_session(session_data)
            if session_id:
                self.logger.info(f"✅ Analysis session logged with ID: {session_id}")
            return session_id
        except Exception as e:
            self.logger.error(f"❌ Error logging analysis session: {e}")
            return None

    def log_bot_interaction(self, interaction_data: Dict[str, Any]) -> Optional[int]:
        """Log a bot interaction for dashboard tracking.

        Returns:
            Whatever ``save_bot_interaction`` returns (the interaction id),
            or ``None`` if saving raised an exception.
        """
        try:
            interaction_id = self.database_manager.save_bot_interaction(interaction_data)
            if interaction_id:
                self.logger.info(f"✅ Bot interaction logged with ID: {interaction_id}")
            return interaction_id
        except Exception as e:
            self.logger.error(f"❌ Error logging bot interaction: {e}")
            return None

    def get_integration_status(self) -> Dict[str, Any]:
        """Get the status of dashboard integration.

        Returns:
            A dict with ``api_running``, ``database_connected`` (whether
            ``get_connection()`` returned a non-None value) and an ISO
            ``timestamp``.
        """
        return {
            'api_running': self.api.is_running(),
            'database_connected': self.database_manager.get_connection() is not None,
            'timestamp': datetime.now().isoformat()
        }