test-app / src /dashboard_database_manager.py
SmartHeal's picture
Update src/dashboard_database_manager.py
9f48817 verified
import mysql.connector
from mysql.connector import Error
import logging
from datetime import datetime
import json
import uuid
import os
from PIL import Image
class DashboardDatabaseManager:
"""Fixed database operations manager for SmartHeal application with dashboard integration"""
def __init__(self, mysql_config):
"""Initialize database manager with MySQL configuration"""
self.mysql_config = mysql_config
self.test_connection()
def test_connection(self):
"""Test database connection"""
try:
connection = self.get_connection()
if connection:
connection.close()
logging.info("✅ Database connection successful")
else:
logging.error("❌ Database connection failed")
except Exception as e:
logging.error(f"Database connection test failed: {e}")
def get_connection(self):
"""Get a database connection"""
try:
connection = mysql.connector.connect(**self.mysql_config)
return connection
except Error as e:
logging.error(f"Error connecting to MySQL: {e}")
return None
def execute_query(self, query, params=None, fetch=False):
"""Execute a query and return results if fetch=True"""
connection = self.get_connection()
if not connection:
return None
cursor = None
try:
cursor = connection.cursor(dictionary=True)
cursor.execute(query, params or ())
if fetch:
result = cursor.fetchall()
else:
connection.commit()
result = cursor.rowcount
return result
except Error as e:
logging.error(f"Error executing query: {e}")
if connection:
connection.rollback()
return None
finally:
if cursor:
cursor.close()
if connection and connection.is_connected():
connection.close()
def execute_query_one(self, query, params=None):
"""Execute a query and return one result"""
connection = self.get_connection()
if not connection:
return None
cursor = None
try:
cursor = connection.cursor(dictionary=True)
cursor.execute(query, params or ())
result = cursor.fetchone()
return result
except Error as e:
logging.error(f"Error executing query: {e}")
return None
finally:
if cursor:
cursor.close()
if connection and connection.is_connected():
connection.close()
def save_questionnaire_response(self, questionnaire_data, user_id):
"""Save questionnaire response to database"""
connection = None
cursor = None
try:
connection = self.get_connection()
if not connection:
return None
cursor = connection.cursor()
# Create or get patient
patient_id = self._create_or_get_patient(cursor, questionnaire_data)
if not patient_id:
raise Exception("Failed to get or create patient")
# Get or create default questionnaire
questionnaire_id = self._get_or_create_default_questionnaire(cursor)
if not questionnaire_id:
raise Exception("Failed to get or create questionnaire")
# Prepare response data JSON
response_data = {
'patient_info': {
'name': questionnaire_data.get('patient_name', ''),
'age': questionnaire_data.get('patient_age', 0),
'gender': questionnaire_data.get('patient_gender', '')
},
'wound_details': {
'location': questionnaire_data.get('wound_location', ''),
'duration': questionnaire_data.get('wound_duration', ''),
'pain_level': questionnaire_data.get('pain_level', 0),
'moisture_level': questionnaire_data.get('moisture_level', ''),
'infection_signs': questionnaire_data.get('infection_signs', ''),
'diabetic_status': questionnaire_data.get('diabetic_status', '')
},
'medical_history': {
'previous_treatment': questionnaire_data.get('previous_treatment', ''),
'medical_history': questionnaire_data.get('medical_history', ''),
'medications': questionnaire_data.get('medications', ''),
'allergies': questionnaire_data.get('allergies', ''),
'additional_notes': questionnaire_data.get('additional_notes', '')
}
}
# Insert into questionnaire_responses
insert_resp = """
INSERT INTO questionnaire_responses
(questionnaire_id, patient_id, practitioner_id, response_data, submitted_at)
VALUES (%s, %s, %s, %s, %s)
"""
cursor.execute(insert_resp, (
questionnaire_id,
patient_id,
user_id,
json.dumps(response_data),
datetime.now()
))
response_id = cursor.lastrowid
connection.commit()
logging.info(f"✅ Saved questionnaire response ID {response_id}")
return response_id
except Exception as e:
logging.error(f"❌ Error saving questionnaire: {e}")
if connection:
connection.rollback()
return None
finally:
if cursor:
cursor.close()
if connection:
connection.close()
def _create_or_get_patient(self, cursor, questionnaire_data):
"""Create or get existing patient record"""
try:
# Check if patient exists
select_query = """
SELECT id FROM patients
WHERE name = %s AND age = %s AND gender = %s
"""
cursor.execute(select_query, (
questionnaire_data.get('patient_name', ''),
questionnaire_data.get('patient_age', 0),
questionnaire_data.get('patient_gender', '')
))
existing_patient = cursor.fetchone()
if existing_patient:
return existing_patient[0]
# Create new patient
patient_uuid = str(uuid.uuid4())
insert_query = """
INSERT INTO patients (
uuid, name, age, gender, illness, allergy, notes, created_at
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
"""
cursor.execute(insert_query, (
patient_uuid,
questionnaire_data.get('patient_name', ''),
questionnaire_data.get('patient_age', 0),
questionnaire_data.get('patient_gender', ''),
questionnaire_data.get('medical_history', ''),
questionnaire_data.get('allergies', ''),
questionnaire_data.get('additional_notes', ''),
datetime.now()
))
return cursor.lastrowid
except Exception as e:
logging.error(f"Error creating/getting patient: {e}")
return None
def _get_or_create_default_questionnaire(self, cursor):
"""Get or create default questionnaire"""
try:
# Check if default questionnaire exists
cursor.execute("SELECT id FROM questionnaires WHERE name = 'Default Patient Assessment' LIMIT 1")
questionnaire_row = cursor.fetchone()
if questionnaire_row:
return questionnaire_row[0]
# Create default questionnaire
cursor.execute("""
INSERT INTO questionnaires (name, description, created_at)
VALUES ('Default Patient Assessment', 'Standard patient wound assessment form', NOW())
""")
return cursor.lastrowid
except Exception as e:
logging.error(f"Error getting/creating questionnaire: {e}")
return None
def save_wound_image(self, response_id, image, filename):
"""Save wound image to dataset and database with proper URL"""
try:
# Generate unique filename with timestamp
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
unique_id = str(uuid.uuid4())[:8]
file_extension = os.path.splitext(filename)[1] or '.jpg'
unique_filename = f"wound_{timestamp}_{unique_id}{file_extension}"
# Create dataset directory structure
dataset_dir = os.path.join("dataset", "wound_images")
os.makedirs(dataset_dir, exist_ok=True)
# Save image to dataset
file_path = os.path.join(dataset_dir, unique_filename)
if hasattr(image, 'save'):
image.save(file_path, format='JPEG', quality=95)
width, height = image.size
file_size = os.path.getsize(file_path)
else:
# Handle numpy array or other formats
from PIL import Image as PILImage
if hasattr(image, 'shape'):
pil_image = PILImage.fromarray(image)
pil_image.save(file_path, format='JPEG', quality=95)
width, height = pil_image.size
file_size = os.path.getsize(file_path)
else:
raise ValueError("Unsupported image format")
# Create URL for dashboard access
image_url = f"/dataset/wound_images/{unique_filename}"
# Save to database
query = """
INSERT INTO wound_images (
questionnaire_response_id, image_url, original_filename,
file_size, image_width, image_height, created_at
) VALUES (%s, %s, %s, %s, %s, %s, %s)
"""
params = (
response_id,
image_url,
filename,
file_size,
width,
height,
datetime.now()
)
connection = self.get_connection()
if not connection:
return None
try:
cursor = connection.cursor()
cursor.execute(query, params)
connection.commit()
image_id = cursor.lastrowid
logging.info(f"✅ Image saved to dataset: {file_path}")
logging.info(f"✅ Image URL: {image_url}")
logging.info(f"✅ Database image ID: {image_id}")
return image_id
finally:
cursor.close()
connection.close()
except Exception as e:
logging.error(f"❌ Error saving wound image: {e}")
return None
def save_ai_analysis(self, analysis_data):
"""Save AI analysis results to database"""
try:
# Extract data from analysis_data
response_id = analysis_data.get('questionnaire_id') # This is actually response_id
image_id = analysis_data.get('image_id')
visual_results = analysis_data.get('visual_results', {})
# Calculate risk level from risk score
risk_score = analysis_data.get('risk_score', 0)
if risk_score >= 70:
risk_level = 'High'
elif risk_score >= 40:
risk_level = 'Moderate'
else:
risk_level = 'Low'
# Prepare wound dimensions string
length = visual_results.get('length_cm', 0)
breadth = visual_results.get('breadth_cm', 0)
area = visual_results.get('surface_area_cm2', 0)
wound_dimensions = f"{length} × {breadth} cm (Area: {area} cm²)"
query = """
INSERT INTO ai_analyses (
questionnaire_response_id, image_id, analysis_data, summary,
recommendations, risk_score, risk_level, wound_type,
wound_dimensions, processing_time, model_version, created_at
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
"""
params = (
response_id,
image_id,
json.dumps(analysis_data.get('analysis_data', {})),
analysis_data.get('summary', '')[:1000], # Limit summary length
analysis_data.get('recommendations', ''),
risk_score,
risk_level,
visual_results.get('wound_type', 'Unknown'),
wound_dimensions,
analysis_data.get('processing_time', 0),
analysis_data.get('model_version', 'v1.0'),
datetime.now()
)
result = self.execute_query(query, params)
if result is not None:
# Get the last inserted ID
connection = self.get_connection()
if connection:
cursor = connection.cursor()
cursor.execute("SELECT LAST_INSERT_ID()")
analysis_id = cursor.fetchone()[0]
cursor.close()
connection.close()
logging.info(f"✅ AI analysis saved with ID: {analysis_id}")
return analysis_id
return None
except Exception as e:
logging.error(f"❌ Error saving AI analysis: {e}")
return None
def save_analysis_session(self, session_data):
"""Save analysis session data"""
try:
query = """
INSERT INTO analysis_sessions (
user_id, questionnaire_response_id, image_id, analysis_id,
session_duration, created_at
) VALUES (%s, %s, %s, %s, %s, %s)
"""
params = (
session_data.get('user_id'),
session_data.get('questionnaire_id'), # This is actually response_id
session_data.get('image_id'),
session_data.get('analysis_id'),
session_data.get('session_duration', 0),
datetime.now()
)
result = self.execute_query(query, params)
if result is not None:
connection = self.get_connection()
if connection:
cursor = connection.cursor()
cursor.execute("SELECT LAST_INSERT_ID()")
session_id = cursor.fetchone()[0]
cursor.close()
connection.close()
logging.info(f"✅ Analysis session saved with ID: {session_id}")
return session_id
return None
except Exception as e:
logging.error(f"❌ Error saving analysis session: {e}")
return None
def save_bot_interaction(self, interaction_data):
"""Save bot interaction data"""
try:
query = """
INSERT INTO bot_interactions (
patient_id, practitioner_id, input_text, output_text,
wound_image_url, interaction_type, interacted_at
) VALUES (%s, %s, %s, %s, %s, %s, %s)
"""
params = (
interaction_data.get('patient_id'),
interaction_data.get('practitioner_id'),
interaction_data.get('input_text', ''),
interaction_data.get('output_text', ''),
interaction_data.get('wound_image_url', ''),
interaction_data.get('interaction_type', 'wound_analysis'),
datetime.now()
)
result = self.execute_query(query, params)
if result is not None:
connection = self.get_connection()
if connection:
cursor = connection.cursor()
cursor.execute("SELECT LAST_INSERT_ID()")
interaction_id = cursor.fetchone()[0]
cursor.close()
connection.close()
logging.info(f"✅ Bot interaction saved with ID: {interaction_id}")
return interaction_id
return None
except Exception as e:
logging.error(f"❌ Error saving bot interaction: {e}")
return None
def get_analytics_data(self):
"""Get comprehensive analytics data for dashboard"""
try:
analytics = {}
# Total analyses
result = self.execute_query_one("SELECT COUNT(*) as count FROM ai_analyses")
analytics['total_analyses'] = result['count'] if result else 0
# Average processing time
result = self.execute_query_one("""
SELECT AVG(processing_time) as avg_time FROM ai_analyses
WHERE processing_time IS NOT NULL
""")
analytics['avg_processing_time'] = round(result['avg_time'], 2) if result and result['avg_time'] else 0
# High risk count
result = self.execute_query_one("""
SELECT COUNT(*) as count FROM ai_analyses
WHERE risk_level = 'High'
""")
analytics['high_risk_count'] = result['count'] if result else 0
# Average risk score
result = self.execute_query_one("""
SELECT AVG(risk_score) as avg_risk FROM ai_analyses
WHERE risk_score IS NOT NULL
""")
analytics['avg_risk_score'] = round(result['avg_risk'], 1) if result and result['avg_risk'] else 0
# Analyses today
result = self.execute_query_one("""
SELECT COUNT(*) as count FROM ai_analyses
WHERE DATE(created_at) = CURDATE()
""")
analytics['analyses_today'] = result['count'] if result else 0
# Analyses this week
result = self.execute_query_one("""
SELECT COUNT(*) as count FROM ai_analyses
WHERE YEARWEEK(created_at) = YEARWEEK(NOW())
""")
analytics['analyses_this_week'] = result['count'] if result else 0
# Unique questionnaire responses
result = self.execute_query_one("""
SELECT COUNT(DISTINCT questionnaire_response_id) as count FROM ai_analyses
""")
analytics['unique_questionnaires'] = result['count'] if result else 0
# Analyses with images
result = self.execute_query_one("""
SELECT COUNT(*) as count FROM ai_analyses
WHERE image_id IS NOT NULL
""")
analytics['analyses_with_images'] = result['count'] if result else 0
return analytics
except Exception as e:
logging.error(f"Error getting analytics data: {e}")
return {}
def get_interaction_history(self, limit=50):
"""Get bot interaction history"""
try:
query = """
SELECT
bi.id,
bi.input_text,
bi.output_text,
bi.wound_image_url,
bi.interaction_type,
bi.interacted_at,
p.name as patient_name,
u.name as practitioner_name
FROM bot_interactions bi
LEFT JOIN patients p ON bi.patient_id = p.id
LEFT JOIN users u ON bi.practitioner_id = u.id
ORDER BY bi.interacted_at DESC
LIMIT %s
"""
results = self.execute_query(query, (limit,), fetch=True)
return results or []
except Exception as e:
logging.error(f"Error getting interaction history: {e}")
return []
def get_session_analytics(self):
"""Get session analytics data"""
try:
analytics = {}
# Total sessions
result = self.execute_query_one("SELECT COUNT(*) as count FROM analysis_sessions")
analytics['total_sessions'] = result['count'] if result else 0
# Average session duration
result = self.execute_query_one("""
SELECT AVG(session_duration) as avg_duration FROM analysis_sessions
WHERE session_duration IS NOT NULL
""")
analytics['avg_session_duration'] = round(result['avg_duration'], 2) if result and result['avg_duration'] else 0
# Sessions today
result = self.execute_query_one("""
SELECT COUNT(*) as count FROM analysis_sessions
WHERE DATE(created_at) = CURDATE()
""")
analytics['sessions_today'] = result['count'] if result else 0
return analytics
except Exception as e:
logging.error(f"Error getting session analytics: {e}")
return {}