import mysql.connector from mysql.connector import Error import logging from datetime import datetime import json import uuid import os from PIL import Image class DatabaseManager: """Database operations manager for SmartHeal application""" def __init__(self, mysql_config): """Initialize database manager with MySQL configuration""" self.mysql_config = mysql_config self.test_connection() def test_connection(self): """Test database connection""" try: connection = self.get_connection() if connection: connection.close() logging.info("✅ Database connection successful") else: logging.error("❌ Database connection failed") except Exception as e: logging.error(f"Database connection test failed: {e}") def get_connection(self): """Get a database connection""" try: connection = mysql.connector.connect(**self.mysql_config) return connection except Error as e: logging.error(f"Error connecting to MySQL: {e}") return None def execute_query(self, query, params=None, fetch=False): """Execute a query and return results if fetch=True""" connection = self.get_connection() if not connection: return None cursor = None try: cursor = connection.cursor(dictionary=True) cursor.execute(query, params or ()) if fetch: result = cursor.fetchall() else: connection.commit() result = cursor.rowcount return result except Error as e: logging.error(f"Error executing query: {e}") if connection: connection.rollback() return None finally: if cursor: cursor.close() if connection and connection.is_connected(): connection.close() def execute_query_one(self, query, params=None): """Execute a query and return one result""" connection = self.get_connection() if not connection: return None cursor = None try: cursor = connection.cursor(dictionary=True) cursor.execute(query, params or ()) result = cursor.fetchone() return result except Error as e: logging.error(f"Error executing query: {e}") return None finally: if cursor: cursor.close() if connection and connection.is_connected(): connection.close() def create_tables(self): """Create all required database tables""" tables = { "users": """ CREATE TABLE IF NOT EXISTS users ( id INT AUTO_INCREMENT PRIMARY KEY, username VARCHAR(50) UNIQUE NOT NULL, email VARCHAR(100) UNIQUE NOT NULL, password VARCHAR(255) NOT NULL, name VARCHAR(100) NOT NULL, role ENUM('practitioner', 'organization') NOT NULL, org INT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, last_login TIMESTAMP NULL, is_active BOOLEAN DEFAULT TRUE, INDEX idx_username (username), INDEX idx_email (email), INDEX idx_role (role) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci """, "organizations": """ CREATE TABLE IF NOT EXISTS organizations ( id INT AUTO_INCREMENT PRIMARY KEY, name VARCHAR(200) NOT NULL, email VARCHAR(100) UNIQUE NOT NULL, phone VARCHAR(20), country_code VARCHAR(10), department VARCHAR(100), location VARCHAR(200), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, is_active BOOLEAN DEFAULT TRUE, INDEX idx_name (name), INDEX idx_email (email) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci """, "questionnaires": """ CREATE TABLE IF NOT EXISTS questionnaires ( id INT AUTO_INCREMENT PRIMARY KEY, user_id INT NOT NULL, patient_name VARCHAR(100) NOT NULL, patient_age INT, patient_gender ENUM('Male', 'Female', 'Other'), wound_location VARCHAR(200), wound_duration VARCHAR(100), pain_level INT DEFAULT 0, previous_treatment TEXT, medical_history TEXT, medications TEXT, allergies TEXT, additional_notes TEXT, moisture_level VARCHAR(50), infection_signs VARCHAR(50), diabetic_status VARCHAR(50), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, INDEX idx_user_id (user_id), INDEX idx_patient_name (patient_name), INDEX idx_created_at (created_at) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci """, "wound_images": """ CREATE TABLE IF NOT EXISTS wound_images ( id INT AUTO_INCREMENT PRIMARY KEY, questionnaire_id INT NOT NULL, image_url VARCHAR(500) NOT NULL, original_filename VARCHAR(255), file_size INT, image_width INT, image_height INT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, INDEX idx_questionnaire_id (questionnaire_id), INDEX idx_created_at (created_at) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci """, "ai_analyses": """ CREATE TABLE IF NOT EXISTS ai_analyses ( id INT AUTO_INCREMENT PRIMARY KEY, questionnaire_id INT NOT NULL, image_id INT, analysis_data TEXT, summary TEXT, recommendations TEXT, risk_score INT DEFAULT 0, risk_level ENUM('Low', 'Moderate', 'High', 'Unknown') DEFAULT 'Unknown', wound_type VARCHAR(100), wound_dimensions VARCHAR(100), processing_time DECIMAL(5,2), model_version VARCHAR(50), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, INDEX idx_questionnaire_id (questionnaire_id), INDEX idx_risk_level (risk_level), INDEX idx_created_at (created_at) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci """, "analysis_sessions": """ CREATE TABLE IF NOT EXISTS analysis_sessions ( id INT AUTO_INCREMENT PRIMARY KEY, user_id INT NOT NULL, questionnaire_id INT NOT NULL, image_id INT, analysis_id INT, session_duration DECIMAL(5,2), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, INDEX idx_user_id (user_id), INDEX idx_created_at (created_at) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci """ } for table_name, table_sql in tables.items(): try: result = self.execute_query(table_sql) if result is not None: logging.info(f"Table '{table_name}' created or already exists") except Exception as e: logging.error(f"Error creating table '{table_name}': {e}") def save_questionnaire(self, questionnaire_data): """ Save questionnaire response using the default questionnaire. This fixes the foreign key constraint issue by using an existing questionnaire ID. """ connection = None cursor = None try: connection = self.get_connection() if not connection: return None cursor = connection.cursor() # (1) Create or get patient patient_id = self._create_or_get_patient(cursor, questionnaire_data) if not patient_id: raise Exception("Failed to get or create patient") # (2) Get default questionnaire ID cursor.execute("SELECT id FROM questionnaires WHERE name = 'Default Patient Assessment' LIMIT 1") questionnaire_row = cursor.fetchone() if not questionnaire_row: # Create default questionnaire if it doesn't exist cursor.execute(""" INSERT INTO questionnaires (name, description, created_at) VALUES ('Default Patient Assessment', 'Standard patient wound assessment form', NOW()) """) connection.commit() questionnaire_id = cursor.lastrowid else: questionnaire_id = questionnaire_row[0] # (3) Prepare response_data JSON response_data = { 'patient_info': { 'name': questionnaire_data['patient_name'], 'age': questionnaire_data['patient_age'], 'gender': questionnaire_data['patient_gender'] }, 'wound_details': { 'location': questionnaire_data['wound_location'], 'duration': questionnaire_data['wound_duration'], 'pain_level': questionnaire_data['pain_level'], 'moisture_level': questionnaire_data['moisture_level'], 'infection_signs': questionnaire_data['infection_signs'], 'diabetic_status': questionnaire_data['diabetic_status'] }, 'medical_history': { 'previous_treatment': questionnaire_data['previous_treatment'], 'medical_history': questionnaire_data['medical_history'], 'medications': questionnaire_data['medications'], 'allergies': questionnaire_data['allergies'], 'additional_notes': questionnaire_data['additional_notes'] } } # (4) Insert into questionnaire_responses with correct foreign key insert_resp = """ INSERT INTO questionnaire_responses (questionnaire_id, patient_id, practitioner_id, response_data, submitted_at) VALUES (%s, %s, %s, %s, %s) """ cursor.execute(insert_resp, ( questionnaire_id, # Use the existing questionnaire ID patient_id, questionnaire_data['user_id'], json.dumps(response_data), datetime.now() )) response_id = cursor.lastrowid connection.commit() logging.info(f"✅ Saved response ID {response_id} for questionnaire {questionnaire_id}") return response_id except Exception as e: logging.error(f"❌ Error saving questionnaire: {e}") if connection: connection.rollback() return None finally: if cursor: cursor.close() if connection: connection.close() def _create_or_get_patient(self, cursor, questionnaire_data): """Create or get existing patient record""" try: # Check if patient exists select_query = """ SELECT id FROM patients WHERE name = %s AND age = %s AND gender = %s """ cursor.execute(select_query, ( questionnaire_data['patient_name'], questionnaire_data['patient_age'], questionnaire_data['patient_gender'] )) existing_patient = cursor.fetchone() if existing_patient: return existing_patient[0] # Create new patient import uuid insert_query = """ INSERT INTO patients ( uuid, name, age, gender, illness, allergy, notes, created_at ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s) """ patient_uuid = str(uuid.uuid4()) cursor.execute(insert_query, ( patient_uuid, questionnaire_data['patient_name'], questionnaire_data['patient_age'], questionnaire_data['patient_gender'], questionnaire_data.get('medical_history', ''), questionnaire_data.get('allergies', ''), questionnaire_data.get('additional_notes', ''), datetime.now() )) return cursor.lastrowid except Exception as e: logging.error(f"Error creating/getting patient: {e}") return None def _create_wound_record(self, cursor, patient_id, questionnaire_data): """Create wound record""" try: import uuid wound_uuid = str(uuid.uuid4()) query = """ INSERT INTO wounds ( uuid, patient_id, position, category, moisture, infection, notes, created_at ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s) """ cursor.execute(query, ( wound_uuid, str(patient_id), questionnaire_data.get('wound_location', ''), 'Assessment', # Default category questionnaire_data.get('moisture_level', ''), questionnaire_data.get('infection_signs', ''), questionnaire_data.get('additional_notes', ''), datetime.now() )) return cursor.lastrowid except Exception as e: logging.error(f"Error creating wound record: {e}") return None def save_wound_image(self, patient_id, image): """Save wound image to filesystem and database""" try: import uuid import os # Generate unique filename image_id = str(uuid.uuid4()) filename = f"wound_{image_id}.jpg" file_path = os.path.join("uploads", filename) # Ensure uploads directory exists os.makedirs("uploads", exist_ok=True) # Save image to disk if hasattr(image, 'save'): image.save(file_path, format='JPEG', quality=95) # Get image dimensions width, height = image.size # Save to database using proper wound_images table structure query = """ INSERT INTO wound_images ( uuid, patient_id, image, width, height, created_at ) VALUES (%s, %s, %s, %s, %s, %s) """ params = ( str(uuid.uuid4()), str(patient_id), file_path, str(width), str(height), datetime.now() ) connection = self.get_connection() if not connection: return None try: cursor = connection.cursor() cursor.execute(query, params) connection.commit() image_db_id = cursor.lastrowid logging.info(f"Image saved with ID: {image_db_id}") return { 'id': image_db_id, 'filename': filename, 'path': file_path } except Error as e: logging.error(f"Error saving image to database: {e}") connection.rollback() return None finally: cursor.close() connection.close() else: logging.error("Invalid image object") return None except Exception as e: logging.error(f"Image save error: {e}") return None def save_analysis(self, questionnaire_id, image_id, analysis_data): """Save AI analysis results to database""" try: query = """ INSERT INTO ai_analyses ( questionnaire_id, image_id, analysis_data, summary, recommendations, risk_score, risk_level, wound_type, wound_dimensions, processing_time, model_version, created_at ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """ params = ( questionnaire_id, image_id, json.dumps(analysis_data) if analysis_data else None, analysis_data.get('summary', ''), analysis_data.get('recommendations', ''), analysis_data.get('risk_score', 0), analysis_data.get('risk_level', 'Unknown'), analysis_data.get('wound_type', ''), analysis_data.get('wound_dimensions', ''), analysis_data.get('processing_time', 0.0), analysis_data.get('model_version', 'v1.0'), datetime.now() ) connection = self.get_connection() if not connection: return None try: cursor = connection.cursor() cursor.execute(query, params) connection.commit() analysis_id = cursor.lastrowid logging.info(f"Analysis saved with ID: {analysis_id}") return analysis_id except Error as e: logging.error(f"Error saving analysis: {e}") connection.rollback() return None finally: cursor.close() connection.close() except Exception as e: logging.error(f"Analysis save error: {e}") return None def get_user_history(self, user_id): """Get user's analysis history""" try: query = """ SELECT q.patient_name, q.wound_location, q.created_at, a.risk_level, a.summary, a.recommendations FROM questionnaires q LEFT JOIN ai_analyses a ON q.id = a.questionnaire_id WHERE q.user_id = %s ORDER BY q.created_at DESC LIMIT 20 """ result = self.execute_query(query, (user_id,), fetch=True) return result or [] except Exception as e: logging.error(f"Error fetching user history: {e}") return [] def get_organizations(self): """Get list of all organizations""" try: query = "SELECT id, name as org_name, location FROM organizations ORDER BY name" result = self.execute_query(query, fetch=True) return result or [{'id': 1, 'org_name': 'Default Hospital', 'location': 'Default Location'}] except Exception as e: logging.error(f"Error getting organizations: {e}") return [{'id': 1, 'org_name': 'Default Hospital', 'location': 'Default Location'}] def create_organization(self, org_data): """Create a new organization""" try: query = """INSERT INTO organizations (name, email, phone, country_code, department, location, created_at) VALUES (%s, %s, %s, %s, %s, %s, %s)""" params = ( org_data.get('org_name', ''), org_data.get('email', ''), org_data.get('phone', ''), org_data.get('country_code', ''), org_data.get('department', ''), org_data.get('location', ''), datetime.now() ) result = self.execute_query(query, params) if result: # Get the created organization ID org_id = self.execute_query_one( "SELECT id FROM organizations WHERE name = %s ORDER BY created_at DESC LIMIT 1", (org_data.get('org_name', ''),) ) return org_id['id'] if org_id else None return None except Exception as e: logging.error(f"Error creating organization: {e}") return None def save_analysis_result(self, questionnaire_id, analysis_result): """Save analysis result to database""" try: query = """INSERT INTO ai_analyses (questionnaire_id, analysis_data, summary, created_at) VALUES (%s, %s, %s, %s)""" params = ( questionnaire_id, json.dumps(analysis_result) if analysis_result else None, analysis_result.get('summary', 'Analysis completed'), datetime.now() ) result = self.execute_query(query, params) return result except Exception as e: logging.error(f"Error saving analysis result: {e}") return None