"""
Database Manager for EmotionMirror Application

This module provides functionality for database operations, including:
- Creating and initializing the SQLite database
- Storing analysis results
- Retrieving historical data
- Managing database connections

All database operations are encapsulated in this module to ensure
clean separation of concerns and modularity.
"""

import os
import json
import sqlite3
import logging
from contextlib import closing
from datetime import datetime
from typing import Dict, List, Any, Optional, Tuple

# Configure logging
logger = logging.getLogger(__name__)

# Column list shared by every query that reads analysis records.
_SELECT_ANALYSES = '''
    SELECT id, session_id, timestamp, image_path, face_count, results, tags
    FROM analyses
'''

# Faces belonging to a single analysis, in detection order.
_SELECT_FACES = '''
    SELECT id, face_index, emotion, confidence, features, emotions
    FROM faces
    WHERE analysis_id = ?
    ORDER BY face_index
'''


class DatabaseManager:
    """
    Manages database operations for the EmotionMirror application.

    This class handles all interactions with the SQLite database, providing
    a clean interface for the rest of the application to store and retrieve
    data. Every public method opens its own short-lived connection and
    closes it before returning.
    """

    def __init__(self, db_path: str):
        """
        Initialize the database manager.

        Args:
            db_path: Path to the SQLite database file
        """
        self.db_path = db_path
        self._initialize_db()

    def _connect(self, named_rows: bool = False) -> sqlite3.Connection:
        """
        Open a new connection to the database file.

        Args:
            named_rows: When True, rows are returned as ``sqlite3.Row`` so
                they can be converted to dictionaries keyed by column name

        Returns:
            An open connection; the caller is responsible for closing it
        """
        conn = sqlite3.connect(self.db_path)
        if named_rows:
            conn.row_factory = sqlite3.Row
        return conn

    def _initialize_db(self) -> None:
        """
        Initialize the database by creating necessary tables if they don't exist.

        SQLite errors are logged rather than raised, so construction of the
        manager does not fail when the schema cannot be created.
        """
        # os.path.dirname() returns '' for a bare filename such as "app.db",
        # and os.makedirs('') raises FileNotFoundError, so only create the
        # directory when the path actually has one.
        db_dir = os.path.dirname(self.db_path)
        if db_dir:
            os.makedirs(db_dir, exist_ok=True)

        try:
            # closing() guarantees the connection is closed; the inner
            # connection context commits on success and rolls back on error.
            with closing(self._connect()) as conn, conn:
                # Create analyses table
                conn.execute('''
                    CREATE TABLE IF NOT EXISTS analyses (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        session_id TEXT NOT NULL,
                        timestamp TEXT NOT NULL,
                        image_path TEXT NOT NULL,
                        face_count INTEGER NOT NULL,
                        results TEXT NOT NULL,
                        tags TEXT
                    )
                ''')

                # Create faces table
                conn.execute('''
                    CREATE TABLE IF NOT EXISTS faces (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        analysis_id INTEGER NOT NULL,
                        face_index INTEGER NOT NULL,
                        emotion TEXT NOT NULL,
                        confidence REAL NOT NULL,
                        features TEXT NOT NULL,
                        emotions TEXT NOT NULL,
                        FOREIGN KEY (analysis_id) REFERENCES analyses (id)
                    )
                ''')
            logger.info("Database initialized successfully")
        except sqlite3.Error as e:
            logger.error(f"Database initialization error: {e}")

    def save_analysis(self, session_id: str, image_path: str,
                      results: Dict[str, Any],
                      tags: Optional[List[str]] = None) -> int:
        """
        Save analysis results to the database.

        The analysis row and all of its face rows are written in a single
        transaction: either everything is stored or nothing is.

        Args:
            session_id: Current session identifier
            image_path: Path to the analyzed image
            results: Analysis results dictionary
            tags: Optional list of tags for the analysis

        Returns:
            The ID of the newly inserted analysis record, or -1 if a
            database error occurred
        """
        # Convert tags list to JSON string if provided (an empty list is
        # stored as NULL, same as no tags)
        tags_json = json.dumps(tags) if tags else None
        # Tolerate an explicit ``'faces': None`` as well as a missing key.
        faces = results.get('faces') or []

        try:
            with closing(self._connect()) as conn, conn:
                cursor = conn.cursor()

                # Insert analysis record
                cursor.execute(
                    '''
                    INSERT INTO analyses
                    (session_id, timestamp, image_path, face_count, results, tags)
                    VALUES (?, ?, ?, ?, ?, ?)
                    ''',
                    (
                        session_id,
                        datetime.now().isoformat(),
                        image_path,
                        results.get('face_count', 0),
                        json.dumps(results),
                        tags_json,
                    ),
                )
                # Capture the ID before any further statement runs on the cursor.
                analysis_id = cursor.lastrowid

                # Insert face records if present
                cursor.executemany(
                    '''
                    INSERT INTO faces
                    (analysis_id, face_index, emotion, confidence, features, emotions)
                    VALUES (?, ?, ?, ?, ?, ?)
                    ''',
                    [
                        (
                            analysis_id,
                            index,
                            face.get('emotion', 'unknown'),
                            face.get('confidence', 0.0),
                            json.dumps(face.get('features', {})),
                            json.dumps(face.get('emotions', {})),
                        )
                        for index, face in enumerate(faces)
                    ],
                )

            logger.info(f"Saved analysis with ID {analysis_id} and {len(faces)} faces")
            return analysis_id
        except sqlite3.Error as e:
            logger.error(f"Error saving analysis: {e}")
            return -1

    @staticmethod
    def _load_faces(conn: sqlite3.Connection, analysis_id: int) -> List[Dict[str, Any]]:
        """
        Fetch the face records of one analysis with their JSON fields decoded.

        Args:
            conn: Open connection whose row factory is ``sqlite3.Row``
            analysis_id: ID of the analysis whose faces are wanted

        Returns:
            List of face dictionaries ordered by ``face_index``
        """
        faces = []
        for face_row in conn.execute(_SELECT_FACES, (analysis_id,)).fetchall():
            face = dict(face_row)
            # Parse JSON fields
            face['features'] = json.loads(face['features'])
            face['emotions'] = json.loads(face['emotions'])
            faces.append(face)
        return faces

    @classmethod
    def _build_analysis(cls, conn: sqlite3.Connection, row: sqlite3.Row) -> Dict[str, Any]:
        """
        Turn an ``analyses`` row into a dictionary, including its faces.

        Args:
            conn: Open connection whose row factory is ``sqlite3.Row``
            row: A row produced by the shared analyses SELECT

        Returns:
            Analysis dictionary with ``results``/``tags`` decoded from JSON
            and a ``faces`` list attached
        """
        analysis = dict(row)
        # Parse JSON fields; tags may be NULL and is then left as None.
        analysis['results'] = json.loads(analysis['results'])
        if analysis['tags']:
            analysis['tags'] = json.loads(analysis['tags'])
        analysis['faces'] = cls._load_faces(conn, analysis['id'])
        return analysis

    def get_analysis_history(self, session_id: Optional[str] = None,
                             limit: int = 10) -> List[Dict[str, Any]]:
        """
        Retrieve analysis history from the database.

        Args:
            session_id: Optional session ID to filter by
            limit: Maximum number of records to return

        Returns:
            List of analysis records as dictionaries, newest first; an
            empty list if a database error occurred
        """
        query = _SELECT_ANALYSES
        params: List[Any] = []
        if session_id:
            query += ' WHERE session_id = ?'
            params.append(session_id)
        # id breaks ties between identical timestamps so the order is stable.
        query += ' ORDER BY timestamp DESC, id DESC LIMIT ?'
        params.append(limit)

        try:
            with closing(self._connect(named_rows=True)) as conn:
                rows = conn.execute(query, params).fetchall()
                return [self._build_analysis(conn, row) for row in rows]
        except sqlite3.Error as e:
            logger.error(f"Error retrieving analysis history: {e}")
            return []

    def get_analysis_by_id(self, analysis_id: int) -> Optional[Dict[str, Any]]:
        """
        Retrieve a specific analysis by ID.

        Args:
            analysis_id: ID of the analysis to retrieve

        Returns:
            Analysis record as a dictionary, or None if not found or if a
            database error occurred
        """
        try:
            with closing(self._connect(named_rows=True)) as conn:
                row = conn.execute(
                    _SELECT_ANALYSES + ' WHERE id = ?', (analysis_id,)
                ).fetchone()
                if row is None:
                    return None
                return self._build_analysis(conn, row)
        except sqlite3.Error as e:
            logger.error(f"Error retrieving analysis by ID: {e}")
            return None

    def get_emotion_statistics(self, session_id: Optional[str] = None,
                               limit: int = 50) -> Dict[str, float]:
        """
        Compute emotion statistics across multiple analyses.

        Only faces belonging to the most recent ``limit`` analyses
        (optionally restricted to one session) are counted.

        Args:
            session_id: Optional session ID to filter by
            limit: Maximum number of analysis records to analyze

        Returns:
            Dictionary mapping each emotion to its relative frequency
            (values sum to 1.0); empty if there are no faces or a database
            error occurred
        """
        # Select the analyses to consider first, then aggregate their faces.
        recent_analyses = 'SELECT id FROM analyses'
        params: List[Any] = []
        if session_id:
            recent_analyses += ' WHERE session_id = ?'
            params.append(session_id)
        recent_analyses += ' ORDER BY timestamp DESC, id DESC LIMIT ?'
        params.append(limit)

        query = f'''
            SELECT emotion, COUNT(*) as count
            FROM faces
            WHERE analysis_id IN ({recent_analyses})
            GROUP BY emotion
            ORDER BY count DESC
        '''

        try:
            with closing(self._connect()) as conn:
                rows = conn.execute(query, params).fetchall()
        except sqlite3.Error as e:
            logger.error(f"Error computing emotion statistics: {e}")
            return {}

        # Calculate emotion frequencies
        total = sum(count for _, count in rows)
        if total <= 0:
            return {}
        return {emotion: count / total for emotion, count in rows}

    def delete_analysis(self, analysis_id: int) -> bool:
        """
        Delete an analysis and its associated faces.

        Args:
            analysis_id: ID of the analysis to delete

        Returns:
            True if an analysis row was deleted, False if no such analysis
            existed or a database error occurred
        """
        try:
            with closing(self._connect()) as conn, conn:
                # Remove child rows first so the delete order stays valid
                # if foreign-key enforcement is switched on.
                conn.execute('DELETE FROM faces WHERE analysis_id = ?', (analysis_id,))
                # Read rowcount straight from the statement that deleted
                # the analysis itself.
                deleted = conn.execute(
                    'DELETE FROM analyses WHERE id = ?', (analysis_id,)
                ).rowcount
            return deleted > 0
        except sqlite3.Error as e:
            logger.error(f"Error deleting analysis: {e}")
            return False

    def export_analysis_data(self, analysis_id: Optional[int] = None,
                             session_id: Optional[str] = None,
                             limit: int = 100) -> Dict[str, Any]:
        """
        Export analysis data in a structured format suitable for JSON/CSV export.

        Args:
            analysis_id: Optional specific analysis ID to export; when
                given, ``session_id`` and ``limit`` are ignored
            session_id: Optional session ID to filter by
            limit: Maximum number of records to export

        Returns:
            Dictionary with ``metadata`` and ``analyses`` keys, or a
            dictionary with a single ``error`` key when the requested
            analysis ID does not exist
        """
        # Compare against None so that an explicit ID is never mistaken
        # for "no ID supplied".
        if analysis_id is not None:
            # Export a single analysis
            analysis = self.get_analysis_by_id(analysis_id)
            if not analysis:
                return {'error': f'Analysis with ID {analysis_id} not found'}

            return {
                'metadata': {
                    'exported_at': datetime.now().isoformat(),
                    'record_count': 1,
                },
                'analyses': [analysis],
            }

        # Export multiple analyses
        analyses = self.get_analysis_history(session_id, limit)
        return {
            'metadata': {
                'exported_at': datetime.now().isoformat(),
                'record_count': len(analyses),
                'session_id': session_id,
            },
            'analyses': analyses,
        }