import json
import logging
import os
import uuid
from datetime import datetime

import mysql.connector
from mysql.connector import Error
from PIL import Image


class DashboardDatabaseManager:
    """Fixed database operations manager for SmartHeal application with dashboard integration.

    Every public method opens its own short-lived MySQL connection from
    ``mysql_config`` and closes it before returning. Failures are logged and
    reported through the return value (``None``, ``[]`` or ``{}``) rather
    than raised.
    """

    # Pillow's JPEG encoder only accepts these image modes; anything else
    # (RGBA, P, LA, ...) has to be converted before saving.
    _JPEG_MODES = ('L', 'RGB', 'CMYK')

    def __init__(self, mysql_config):
        """Initialize database manager with MySQL configuration.

        Args:
            mysql_config: Keyword arguments passed to
                ``mysql.connector.connect`` (as ``**mysql_config``).
        """
        self.mysql_config = mysql_config
        self.test_connection()

    # ------------------------------------------------------------------
    # Connection helpers
    # ------------------------------------------------------------------

    def test_connection(self):
        """Test database connection and log the outcome (never raises)."""
        try:
            connection = self.get_connection()
            if connection:
                connection.close()
                logging.info("✅ Database connection successful")
            else:
                logging.error("❌ Database connection failed")
        except Exception as e:
            logging.error(f"Database connection test failed: {e}")

    def get_connection(self):
        """Get a database connection.

        Returns:
            A new connection, or ``None`` if connecting raised a MySQL error.
        """
        try:
            return mysql.connector.connect(**self.mysql_config)
        except Error as e:
            logging.error(f"Error connecting to MySQL: {e}")
            return None

    @staticmethod
    def _rollback_quietly(connection):
        """Roll back ``connection``, logging instead of raising on failure."""
        if connection is None:
            return
        try:
            connection.rollback()
        except Error as e:
            # A failing rollback (e.g. lost connection) must not replace the
            # error that triggered it.
            logging.error(f"Error rolling back transaction: {e}")

    @staticmethod
    def _release(cursor, connection):
        """Close ``cursor`` and ``connection`` (either may be ``None``)."""
        if cursor is not None:
            try:
                cursor.close()
            except Error as e:
                logging.warning(f"Error closing cursor: {e}")
        if connection is not None:
            try:
                connection.close()
            except Error as e:
                logging.warning(f"Error closing connection: {e}")

    def execute_query(self, query, params=None, fetch=False):
        """Execute a query and return results if fetch=True.

        Args:
            query: SQL statement with ``%s`` placeholders.
            params: Sequence of parameters for the placeholders.
            fetch: When true, return all rows as dictionaries; otherwise
                commit and return the affected row count.

        Returns:
            List of row dicts, the row count, or ``None`` on any failure.
        """
        connection = self.get_connection()
        if not connection:
            return None

        cursor = None
        try:
            cursor = connection.cursor(dictionary=True)
            cursor.execute(query, params or ())
            if fetch:
                return cursor.fetchall()
            connection.commit()
            return cursor.rowcount
        except Error as e:
            logging.error(f"Error executing query: {e}")
            self._rollback_quietly(connection)
            return None
        finally:
            self._release(cursor, connection)

    def execute_query_one(self, query, params=None):
        """Execute a query and return one result.

        Returns:
            The first row as a dict, ``None`` if there is no row or the
            query failed.
        """
        connection = self.get_connection()
        if not connection:
            return None

        cursor = None
        try:
            # Buffered so rows beyond the first are already consumed and
            # closing the cursor cannot fail with "Unread result found".
            cursor = connection.cursor(dictionary=True, buffered=True)
            cursor.execute(query, params or ())
            return cursor.fetchone()
        except Error as e:
            logging.error(f"Error executing query: {e}")
            return None
        finally:
            self._release(cursor, connection)

    def _execute_insert(self, query, params=None):
        """Run an INSERT, commit it, and return the new auto-increment ID.

        The ID is read from the cursor that performed the insert. Asking a
        *different* connection for ``LAST_INSERT_ID()`` does not work because
        that value is tracked per connection.

        Returns:
            The inserted row's ID, or ``None`` on any failure.
        """
        connection = self.get_connection()
        if not connection:
            return None

        cursor = None
        try:
            cursor = connection.cursor()
            cursor.execute(query, params or ())
            connection.commit()
            return cursor.lastrowid
        except Error as e:
            logging.error(f"Error executing query: {e}")
            self._rollback_quietly(connection)
            return None
        finally:
            self._release(cursor, connection)

    # ------------------------------------------------------------------
    # Questionnaire responses
    # ------------------------------------------------------------------

    def save_questionnaire_response(self, questionnaire_data, user_id):
        """Save questionnaire response to database.

        The patient lookup/creation, the default-questionnaire lookup/creation
        and the response insert share one connection and are committed
        together; any failure rolls all of them back.

        Args:
            questionnaire_data: Dict of form fields (``patient_name``,
                ``wound_location``, ...); missing keys fall back to defaults.
            user_id: Stored as the response's ``practitioner_id``.

        Returns:
            The new ``questionnaire_responses`` row ID, or ``None`` on failure.
        """
        connection = None
        cursor = None
        try:
            connection = self.get_connection()
            if not connection:
                return None
            # Buffered: the helpers below issue several statements on this
            # one cursor and read at most one row from each SELECT.
            cursor = connection.cursor(buffered=True)

            # Create or get patient
            patient_id = self._create_or_get_patient(cursor, questionnaire_data)
            if not patient_id:
                raise Exception("Failed to get or create patient")

            # Get or create default questionnaire
            questionnaire_id = self._get_or_create_default_questionnaire(cursor)
            if not questionnaire_id:
                raise Exception("Failed to get or create questionnaire")

            # Prepare response data JSON
            response_data = {
                'patient_info': {
                    'name': questionnaire_data.get('patient_name', ''),
                    'age': questionnaire_data.get('patient_age', 0),
                    'gender': questionnaire_data.get('patient_gender', ''),
                },
                'wound_details': {
                    'location': questionnaire_data.get('wound_location', ''),
                    'duration': questionnaire_data.get('wound_duration', ''),
                    'pain_level': questionnaire_data.get('pain_level', 0),
                    'moisture_level': questionnaire_data.get('moisture_level', ''),
                    'infection_signs': questionnaire_data.get('infection_signs', ''),
                    'diabetic_status': questionnaire_data.get('diabetic_status', ''),
                },
                'medical_history': {
                    'previous_treatment': questionnaire_data.get('previous_treatment', ''),
                    'medical_history': questionnaire_data.get('medical_history', ''),
                    'medications': questionnaire_data.get('medications', ''),
                    'allergies': questionnaire_data.get('allergies', ''),
                    'additional_notes': questionnaire_data.get('additional_notes', ''),
                },
            }

            # Insert into questionnaire_responses
            insert_resp = """
                INSERT INTO questionnaire_responses
                    (questionnaire_id, patient_id, practitioner_id, response_data, submitted_at)
                VALUES (%s, %s, %s, %s, %s)
            """
            cursor.execute(insert_resp, (
                questionnaire_id,
                patient_id,
                user_id,
                json.dumps(response_data),
                datetime.now(),
            ))
            response_id = cursor.lastrowid

            connection.commit()
            logging.info(f"✅ Saved questionnaire response ID {response_id}")
            return response_id
        except Exception as e:
            logging.error(f"❌ Error saving questionnaire: {e}")
            self._rollback_quietly(connection)
            return None
        finally:
            self._release(cursor, connection)

    def _create_or_get_patient(self, cursor, questionnaire_data):
        """Create or get existing patient record.

        A patient is matched on the exact (name, age, gender) triple. Runs on
        the caller's cursor and does not commit.

        Returns:
            The patient ID, or ``None`` if a query failed.
        """
        try:
            name = questionnaire_data.get('patient_name', '')
            age = questionnaire_data.get('patient_age', 0)
            gender = questionnaire_data.get('patient_gender', '')

            # Check if patient exists. LIMIT 1 keeps duplicates from leaving
            # unread rows on the shared cursor.
            select_query = """
                SELECT id FROM patients
                WHERE name = %s AND age = %s AND gender = %s
                LIMIT 1
            """
            cursor.execute(select_query, (name, age, gender))
            existing_patient = cursor.fetchone()
            if existing_patient:
                return existing_patient[0]

            # Create new patient
            insert_query = """
                INSERT INTO patients (
                    uuid, name, age, gender, illness, allergy, notes, created_at
                ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
            """
            cursor.execute(insert_query, (
                str(uuid.uuid4()),
                name,
                age,
                gender,
                questionnaire_data.get('medical_history', ''),
                questionnaire_data.get('allergies', ''),
                questionnaire_data.get('additional_notes', ''),
                datetime.now(),
            ))
            return cursor.lastrowid
        except Exception as e:
            logging.error(f"Error creating/getting patient: {e}")
            return None

    def _get_or_create_default_questionnaire(self, cursor):
        """Get or create default questionnaire.

        Runs on the caller's cursor and does not commit.

        Returns:
            ID of the 'Default Patient Assessment' questionnaire, or ``None``
            if a query failed.
        """
        try:
            # Check if default questionnaire exists
            cursor.execute(
                "SELECT id FROM questionnaires WHERE name = 'Default Patient Assessment' LIMIT 1"
            )
            questionnaire_row = cursor.fetchone()
            if questionnaire_row:
                return questionnaire_row[0]

            # Create default questionnaire
            cursor.execute("""
                INSERT INTO questionnaires (name, description, created_at)
                VALUES ('Default Patient Assessment', 'Standard patient wound assessment form', NOW())
            """)
            return cursor.lastrowid
        except Exception as e:
            logging.error(f"Error getting/creating questionnaire: {e}")
            return None

    # ------------------------------------------------------------------
    # Images, analyses, sessions, interactions
    # ------------------------------------------------------------------

    def save_wound_image(self, response_id, image, filename):
        """Save wound image to dataset and database with proper URL.

        The image is written as a JPEG under ``dataset/wound_images`` and a
        row referencing it is inserted into ``wound_images``.

        Args:
            response_id: ID of the questionnaire response the image belongs to.
            image: An object with a PIL-style ``save`` method, or an array
                with a ``shape`` attribute accepted by ``Image.fromarray``.
            filename: Original file name; stored in the database as-is.

        Returns:
            The new ``wound_images`` row ID, or ``None`` on failure.
        """
        try:
            if hasattr(image, 'save'):
                pil_image = image
            elif hasattr(image, 'shape'):
                # Handle numpy array or other array-like formats
                pil_image = Image.fromarray(image)
            else:
                raise ValueError("Unsupported image format")

            # JPEG cannot store alpha/palette data; convert such images so
            # the save below does not fail.
            if getattr(pil_image, 'mode', 'RGB') not in self._JPEG_MODES:
                pil_image = pil_image.convert('RGB')

            # Generate unique filename with timestamp. The data is always
            # JPEG-encoded, so a non-JPEG source extension is replaced to
            # keep the extension consistent with the file contents.
            timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
            unique_id = str(uuid.uuid4())[:8]
            file_extension = os.path.splitext(filename)[1] or '.jpg'
            if file_extension.lower() not in ('.jpg', '.jpeg'):
                file_extension = '.jpg'
            unique_filename = f"wound_{timestamp}_{unique_id}{file_extension}"

            # Create dataset directory structure
            dataset_dir = os.path.join("dataset", "wound_images")
            os.makedirs(dataset_dir, exist_ok=True)

            # Save image to dataset
            file_path = os.path.join(dataset_dir, unique_filename)
            pil_image.save(file_path, format='JPEG', quality=95)
            width, height = pil_image.size
            file_size = os.path.getsize(file_path)

            # Create URL for dashboard access
            image_url = f"/dataset/wound_images/{unique_filename}"

            # Save to database
            query = """
                INSERT INTO wound_images (
                    questionnaire_response_id, image_url, original_filename,
                    file_size, image_width, image_height, created_at
                ) VALUES (%s, %s, %s, %s, %s, %s, %s)
            """
            params = (
                response_id,
                image_url,
                filename,
                file_size,
                width,
                height,
                datetime.now(),
            )
            image_id = self._execute_insert(query, params)
            if image_id is None:
                logging.error("❌ Error saving wound image: database insert failed")
                return None

            logging.info(f"✅ Image saved to dataset: {file_path}")
            logging.info(f"✅ Image URL: {image_url}")
            logging.info(f"✅ Database image ID: {image_id}")
            return image_id
        except Exception as e:
            logging.error(f"❌ Error saving wound image: {e}")
            return None

    @staticmethod
    def _risk_level(risk_score):
        """Map a numeric risk score to 'High' (>=70), 'Moderate' (>=40) or 'Low'."""
        if risk_score >= 70:
            return 'High'
        if risk_score >= 40:
            return 'Moderate'
        return 'Low'

    def save_ai_analysis(self, analysis_data):
        """Save AI analysis results to database.

        Args:
            analysis_data: Dict with ``questionnaire_id`` (holding the
                response ID), ``image_id``, ``visual_results``,
                ``risk_score``, ``summary``, ``recommendations``,
                ``analysis_data``, ``processing_time`` and ``model_version``.
                Missing keys fall back to defaults.

        Returns:
            The new ``ai_analyses`` row ID, or ``None`` on failure.
        """
        try:
            # Extract data from analysis_data
            response_id = analysis_data.get('questionnaire_id')  # This is actually response_id
            image_id = analysis_data.get('image_id')
            # `or` also covers keys that are present but explicitly None.
            visual_results = analysis_data.get('visual_results') or {}

            # Calculate risk level from risk score
            risk_score = analysis_data.get('risk_score') or 0
            risk_level = self._risk_level(risk_score)

            # Prepare wound dimensions string
            length = visual_results.get('length_cm', 0)
            breadth = visual_results.get('breadth_cm', 0)
            area = visual_results.get('surface_area_cm2', 0)
            wound_dimensions = f"{length} × {breadth} cm (Area: {area} cm²)"

            query = """
                INSERT INTO ai_analyses (
                    questionnaire_response_id, image_id, analysis_data, summary,
                    recommendations, risk_score, risk_level, wound_type,
                    wound_dimensions, processing_time, model_version, created_at
                ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
            """
            params = (
                response_id,
                image_id,
                json.dumps(analysis_data.get('analysis_data', {})),
                (analysis_data.get('summary') or '')[:1000],  # Limit summary length
                analysis_data.get('recommendations', ''),
                risk_score,
                risk_level,
                visual_results.get('wound_type', 'Unknown'),
                wound_dimensions,
                analysis_data.get('processing_time', 0),
                analysis_data.get('model_version', 'v1.0'),
                datetime.now(),
            )

            analysis_id = self._execute_insert(query, params)
            if analysis_id is None:
                return None
            logging.info(f"✅ AI analysis saved with ID: {analysis_id}")
            return analysis_id
        except Exception as e:
            logging.error(f"❌ Error saving AI analysis: {e}")
            return None

    def save_analysis_session(self, session_data):
        """Save analysis session data.

        Args:
            session_data: Dict with ``user_id``, ``questionnaire_id`` (holding
                the response ID), ``image_id``, ``analysis_id`` and
                ``session_duration``.

        Returns:
            The new ``analysis_sessions`` row ID, or ``None`` on failure.
        """
        try:
            query = """
                INSERT INTO analysis_sessions (
                    user_id, questionnaire_response_id, image_id, analysis_id,
                    session_duration, created_at
                ) VALUES (%s, %s, %s, %s, %s, %s)
            """
            params = (
                session_data.get('user_id'),
                session_data.get('questionnaire_id'),  # This is actually response_id
                session_data.get('image_id'),
                session_data.get('analysis_id'),
                session_data.get('session_duration', 0),
                datetime.now(),
            )

            session_id = self._execute_insert(query, params)
            if session_id is None:
                return None
            logging.info(f"✅ Analysis session saved with ID: {session_id}")
            return session_id
        except Exception as e:
            logging.error(f"❌ Error saving analysis session: {e}")
            return None

    def save_bot_interaction(self, interaction_data):
        """Save bot interaction data.

        Args:
            interaction_data: Dict with ``patient_id``, ``practitioner_id``,
                ``input_text``, ``output_text``, ``wound_image_url`` and
                ``interaction_type`` (defaults to ``'wound_analysis'``).

        Returns:
            The new ``bot_interactions`` row ID, or ``None`` on failure.
        """
        try:
            query = """
                INSERT INTO bot_interactions (
                    patient_id, practitioner_id, input_text, output_text,
                    wound_image_url, interaction_type, interacted_at
                ) VALUES (%s, %s, %s, %s, %s, %s, %s)
            """
            params = (
                interaction_data.get('patient_id'),
                interaction_data.get('practitioner_id'),
                interaction_data.get('input_text', ''),
                interaction_data.get('output_text', ''),
                interaction_data.get('wound_image_url', ''),
                interaction_data.get('interaction_type', 'wound_analysis'),
                datetime.now(),
            )

            interaction_id = self._execute_insert(query, params)
            if interaction_id is None:
                return None
            logging.info(f"✅ Bot interaction saved with ID: {interaction_id}")
            return interaction_id
        except Exception as e:
            logging.error(f"❌ Error saving bot interaction: {e}")
            return None

    # ------------------------------------------------------------------
    # Dashboard analytics
    # ------------------------------------------------------------------

    def _fetch_count(self, query):
        """Return the ``count`` column of a single-row query, or 0 on failure."""
        row = self.execute_query_one(query)
        return row['count'] if row else 0

    def _fetch_rounded_average(self, query, column, digits):
        """Return ``column`` of a single-row query rounded to ``digits``.

        Returns 0 when the query fails or the value is NULL/zero (AVG over no
        rows yields NULL).
        """
        row = self.execute_query_one(query)
        if row and row[column]:
            return round(row[column], digits)
        return 0

    def get_analytics_data(self):
        """Get comprehensive analytics data for dashboard.

        Returns:
            Dict with keys ``total_analyses``, ``avg_processing_time``,
            ``high_risk_count``, ``avg_risk_score``, ``analyses_today``,
            ``analyses_this_week``, ``unique_questionnaires`` and
            ``analyses_with_images``; ``{}`` if an unexpected error occurs.
        """
        try:
            analytics = {}

            # Total analyses
            analytics['total_analyses'] = self._fetch_count(
                "SELECT COUNT(*) as count FROM ai_analyses"
            )

            # Average processing time
            analytics['avg_processing_time'] = self._fetch_rounded_average("""
                SELECT AVG(processing_time) as avg_time
                FROM ai_analyses
                WHERE processing_time IS NOT NULL
            """, 'avg_time', 2)

            # High risk count
            analytics['high_risk_count'] = self._fetch_count("""
                SELECT COUNT(*) as count
                FROM ai_analyses
                WHERE risk_level = 'High'
            """)

            # Average risk score
            analytics['avg_risk_score'] = self._fetch_rounded_average("""
                SELECT AVG(risk_score) as avg_risk
                FROM ai_analyses
                WHERE risk_score IS NOT NULL
            """, 'avg_risk', 1)

            # Analyses today
            analytics['analyses_today'] = self._fetch_count("""
                SELECT COUNT(*) as count
                FROM ai_analyses
                WHERE DATE(created_at) = CURDATE()
            """)

            # Analyses this week
            analytics['analyses_this_week'] = self._fetch_count("""
                SELECT COUNT(*) as count
                FROM ai_analyses
                WHERE YEARWEEK(created_at) = YEARWEEK(NOW())
            """)

            # Unique questionnaire responses
            analytics['unique_questionnaires'] = self._fetch_count("""
                SELECT COUNT(DISTINCT questionnaire_response_id) as count
                FROM ai_analyses
            """)

            # Analyses with images
            analytics['analyses_with_images'] = self._fetch_count("""
                SELECT COUNT(*) as count
                FROM ai_analyses
                WHERE image_id IS NOT NULL
            """)

            return analytics
        except Exception as e:
            logging.error(f"Error getting analytics data: {e}")
            return {}

    def get_interaction_history(self, limit=50):
        """Get bot interaction history.

        Args:
            limit: Maximum number of interactions to return.

        Returns:
            List of row dicts ordered newest first, joined with patient and
            practitioner names; ``[]`` on failure or when there are none.
        """
        try:
            query = """
                SELECT
                    bi.id,
                    bi.input_text,
                    bi.output_text,
                    bi.wound_image_url,
                    bi.interaction_type,
                    bi.interacted_at,
                    p.name as patient_name,
                    u.name as practitioner_name
                FROM bot_interactions bi
                LEFT JOIN patients p ON bi.patient_id = p.id
                LEFT JOIN users u ON bi.practitioner_id = u.id
                ORDER BY bi.interacted_at DESC
                LIMIT %s
            """
            results = self.execute_query(query, (limit,), fetch=True)
            return results or []
        except Exception as e:
            logging.error(f"Error getting interaction history: {e}")
            return []

    def get_session_analytics(self):
        """Get session analytics data.

        Returns:
            Dict with keys ``total_sessions``, ``avg_session_duration`` and
            ``sessions_today``; ``{}`` if an unexpected error occurs.
        """
        try:
            analytics = {}

            # Total sessions
            analytics['total_sessions'] = self._fetch_count(
                "SELECT COUNT(*) as count FROM analysis_sessions"
            )

            # Average session duration
            analytics['avg_session_duration'] = self._fetch_rounded_average("""
                SELECT AVG(session_duration) as avg_duration
                FROM analysis_sessions
                WHERE session_duration IS NOT NULL
            """, 'avg_duration', 2)

            # Sessions today
            analytics['sessions_today'] = self._fetch_count("""
                SELECT COUNT(*) as count
                FROM analysis_sessions
                WHERE DATE(created_at) = CURDATE()
            """)

            return analytics
        except Exception as e:
            logging.error(f"Error getting session analytics: {e}")
            return {}