"""
Database Manager for EmotionMirror Application

This module provides functionality for database operations, including:
- Creating and initializing the SQLite database
- Storing analysis results
- Retrieving historical data
- Managing database connections

All database operations are encapsulated in this module to ensure clean separation
of concerns and modularity.
"""
import os
import json
import sqlite3
import logging
from datetime import datetime
from typing import Dict, List, Any, Optional, Tuple

# Configure logging
# Module-level logger named after this module; handlers/levels are expected
# to be configured by the application entry point.
logger = logging.getLogger(__name__)
class DatabaseManager:
    """
    Manages database operations for the EmotionMirror application.

    This class handles all interactions with the SQLite database, providing
    a clean interface for the rest of the application to store and retrieve
    data.  Every public method opens a short-lived connection of its own, so
    instances are cheap to share, but separate calls are not transactional
    with each other.

    Storage layout (created on first use):
        analyses: one row per analyzed image; ``results`` holds the full
            analysis payload as JSON, ``tags`` an optional JSON list.
        faces: one row per detected face, linked via ``analysis_id``;
            ``features`` and ``emotions`` are JSON blobs.
    """

    # Same logger instance as the module-level one (both resolve __name__),
    # held on the class so every method has it in scope.
    _log = logging.getLogger(__name__)

    def __init__(self, db_path: str):
        """
        Initialize the database manager.

        Args:
            db_path: Path to the SQLite database file.  Parent directories
                are created if missing.
        """
        self.db_path = db_path
        self._initialize_db()

    def _initialize_db(self) -> None:
        """
        Create the database file and its tables if they don't already exist.

        Errors are logged rather than raised so that construction never
        fails; subsequent operations on a broken database log their own
        errors and return their documented fallback values.
        """
        # BUG FIX: os.path.dirname() returns '' for a bare filename such as
        # "emotions.db", and os.makedirs('') raises FileNotFoundError.  Only
        # create the parent directory when there actually is one.
        parent_dir = os.path.dirname(self.db_path)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        conn = None
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            # Create analyses table
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS analyses (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    session_id TEXT NOT NULL,
                    timestamp TEXT NOT NULL,
                    image_path TEXT NOT NULL,
                    face_count INTEGER NOT NULL,
                    results TEXT NOT NULL,
                    tags TEXT
                )
            ''')
            # Create faces table.
            # NOTE: SQLite does not enforce FOREIGN KEY constraints unless
            # PRAGMA foreign_keys=ON is issued per connection, so
            # delete_analysis() removes child rows explicitly.
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS faces (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    analysis_id INTEGER NOT NULL,
                    face_index INTEGER NOT NULL,
                    emotion TEXT NOT NULL,
                    confidence REAL NOT NULL,
                    features TEXT NOT NULL,
                    emotions TEXT NOT NULL,
                    FOREIGN KEY (analysis_id) REFERENCES analyses (id)
                )
            ''')
            conn.commit()
            self._log.info("Database initialized successfully")
        except sqlite3.Error as e:
            self._log.error(f"Database initialization error: {e}")
        finally:
            if conn:
                conn.close()

    def _fetch_faces(self,
                     conn: sqlite3.Connection,
                     analysis_id: int) -> List[Dict[str, Any]]:
        """
        Load and JSON-decode all face rows for one analysis.

        Args:
            conn: Open connection whose row_factory is sqlite3.Row.
            analysis_id: Parent analysis ID.

        Returns:
            List of face dicts ordered by face_index, with 'features' and
            'emotions' decoded from JSON.
        """
        cursor = conn.cursor()
        cursor.execute(
            '''
            SELECT
                id,
                face_index,
                emotion,
                confidence,
                features,
                emotions
            FROM faces
            WHERE analysis_id = ?
            ORDER BY face_index
            ''',
            (analysis_id,)
        )
        faces = []
        for face_row in cursor.fetchall():
            face = dict(face_row)
            face['features'] = json.loads(face['features'])
            face['emotions'] = json.loads(face['emotions'])
            faces.append(face)
        return faces

    def _decode_analysis(self,
                         conn: sqlite3.Connection,
                         row: sqlite3.Row) -> Dict[str, Any]:
        """
        Convert an analyses row into a dict: decode the JSON 'results' and
        'tags' columns and attach the analysis's 'faces' list.
        """
        analysis = dict(row)
        analysis['results'] = json.loads(analysis['results'])
        if analysis['tags']:
            analysis['tags'] = json.loads(analysis['tags'])
        analysis['faces'] = self._fetch_faces(conn, analysis['id'])
        return analysis

    def save_analysis(self,
                      session_id: str,
                      image_path: str,
                      results: Dict[str, Any],
                      tags: Optional[List[str]] = None) -> int:
        """
        Save analysis results to the database.

        Args:
            session_id: Current session identifier.
            image_path: Path to the analyzed image.
            results: Analysis results dictionary; 'face_count' and 'faces'
                keys are read if present, and the whole dict is stored as
                JSON in the analyses row.
            tags: Optional list of tags for the analysis.

        Returns:
            The ID of the newly inserted analysis record, or -1 on error
            (kept for backward compatibility with existing callers).
        """
        conn = None
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            # An empty tags list is stored as NULL, same as no tags at all.
            tags_json = json.dumps(tags) if tags else None
            # Insert the parent analysis record.
            cursor.execute(
                '''
                INSERT INTO analyses
                (session_id, timestamp, image_path, face_count, results, tags)
                VALUES (?, ?, ?, ?, ?, ?)
                ''',
                (
                    session_id,
                    datetime.now().isoformat(),
                    image_path,
                    results.get('face_count', 0),
                    json.dumps(results),
                    tags_json,
                )
            )
            analysis_id = cursor.lastrowid
            # Insert one child row per detected face; missing keys fall back
            # to neutral defaults so a partial face dict never aborts the save.
            faces = results.get('faces', [])
            for i, face in enumerate(faces):
                cursor.execute(
                    '''
                    INSERT INTO faces
                    (analysis_id, face_index, emotion, confidence, features, emotions)
                    VALUES (?, ?, ?, ?, ?, ?)
                    ''',
                    (
                        analysis_id,
                        i,
                        face.get('emotion', 'unknown'),
                        face.get('confidence', 0.0),
                        json.dumps(face.get('features', {})),
                        json.dumps(face.get('emotions', {})),
                    )
                )
            conn.commit()
            self._log.info(f"Saved analysis with ID {analysis_id} and {len(faces)} faces")
            return analysis_id
        except sqlite3.Error as e:
            self._log.error(f"Error saving analysis: {e}")
            if conn:
                conn.rollback()
            return -1
        finally:
            if conn:
                conn.close()

    def get_analysis_history(self,
                             session_id: Optional[str] = None,
                             limit: int = 10) -> List[Dict[str, Any]]:
        """
        Retrieve analysis history from the database, newest first.

        Args:
            session_id: Optional session ID to filter by.
            limit: Maximum number of records to return.

        Returns:
            List of analysis dicts (JSON fields decoded, 'faces' attached),
            or an empty list on error.
        """
        conn = None
        try:
            conn = sqlite3.connect(self.db_path)
            conn.row_factory = sqlite3.Row  # enables name-based column access
            cursor = conn.cursor()
            query = '''
                SELECT
                    id,
                    session_id,
                    timestamp,
                    image_path,
                    face_count,
                    results,
                    tags
                FROM analyses
            '''
            params: List[Any] = []
            if session_id:
                query += ' WHERE session_id = ?'
                params.append(session_id)
            query += '''
                ORDER BY timestamp DESC
                LIMIT ?
            '''
            params.append(limit)
            cursor.execute(query, params)
            return [self._decode_analysis(conn, row) for row in cursor.fetchall()]
        except sqlite3.Error as e:
            self._log.error(f"Error retrieving analysis history: {e}")
            return []
        finally:
            if conn:
                conn.close()

    def get_analysis_by_id(self, analysis_id: int) -> Optional[Dict[str, Any]]:
        """
        Retrieve a specific analysis by ID.

        Args:
            analysis_id: ID of the analysis to retrieve.

        Returns:
            Analysis dict (JSON fields decoded, 'faces' attached), or None
            if not found or on error.
        """
        conn = None
        try:
            conn = sqlite3.connect(self.db_path)
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            cursor.execute(
                '''
                SELECT
                    id,
                    session_id,
                    timestamp,
                    image_path,
                    face_count,
                    results,
                    tags
                FROM analyses
                WHERE id = ?
                ''',
                (analysis_id,)
            )
            row = cursor.fetchone()
            if not row:
                return None
            return self._decode_analysis(conn, row)
        except sqlite3.Error as e:
            self._log.error(f"Error retrieving analysis by ID: {e}")
            return None
        finally:
            if conn:
                conn.close()

    def get_emotion_statistics(self,
                               session_id: Optional[str] = None,
                               limit: int = 50) -> Dict[str, float]:
        """
        Compute emotion statistics across multiple analyses.

        Args:
            session_id: Optional session ID to filter by.
            limit: Maximum number of (most recent) analyses to include.

        Returns:
            Dictionary mapping emotion label to its relative frequency
            (values sum to 1.0); empty dict if no faces or on error.
        """
        conn = None
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            # BUG FIX: the `limit` parameter was previously ignored.  Restrict
            # the count to faces belonging to the `limit` most recent analyses
            # (optionally filtered by session), matching the docstring.
            query = '''
                SELECT emotion, COUNT(*) as count
                FROM faces
                WHERE analysis_id IN (
                    SELECT id
                    FROM analyses
            '''
            params: List[Any] = []
            if session_id:
                query += ' WHERE session_id = ?'
                params.append(session_id)
            query += '''
                    ORDER BY timestamp DESC
                    LIMIT ?
                )
                GROUP BY emotion
                ORDER BY count DESC
            '''
            params.append(limit)
            cursor.execute(query, params)
            rows = cursor.fetchall()
            # Normalize raw counts into frequencies.
            total = sum(count for _, count in rows)
            return {emotion: count / total for emotion, count in rows} if total > 0 else {}
        except sqlite3.Error as e:
            self._log.error(f"Error computing emotion statistics: {e}")
            return {}
        finally:
            if conn:
                conn.close()

    def delete_analysis(self, analysis_id: int) -> bool:
        """
        Delete an analysis and its associated faces.

        Args:
            analysis_id: ID of the analysis to delete.

        Returns:
            True if an analysis row was deleted, False otherwise (including
            on error).
        """
        conn = None
        try:
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            # Delete child face rows first; SQLite doesn't enforce the FK
            # here, so orphan prevention is manual.
            cursor.execute('DELETE FROM faces WHERE analysis_id = ?', (analysis_id,))
            # Delete the analysis itself; rowcount reflects this statement.
            cursor.execute('DELETE FROM analyses WHERE id = ?', (analysis_id,))
            conn.commit()
            return cursor.rowcount > 0
        except sqlite3.Error as e:
            self._log.error(f"Error deleting analysis: {e}")
            if conn:
                conn.rollback()
            return False
        finally:
            if conn:
                conn.close()

    def export_analysis_data(self,
                             analysis_id: Optional[int] = None,
                             session_id: Optional[str] = None,
                             limit: int = 100) -> Dict[str, Any]:
        """
        Export analysis data in a structured format suitable for JSON/CSV export.

        Args:
            analysis_id: Optional specific analysis ID to export; when given,
                session_id and limit are ignored.
            session_id: Optional session ID to filter by.
            limit: Maximum number of records to export.

        Returns:
            Dictionary with 'metadata' and 'analyses' keys, or an 'error'
            key when a requested analysis ID does not exist.
        """
        if analysis_id:
            # Export a single analysis
            analysis = self.get_analysis_by_id(analysis_id)
            if not analysis:
                return {'error': f'Analysis with ID {analysis_id} not found'}
            return {
                'metadata': {
                    'exported_at': datetime.now().isoformat(),
                    'record_count': 1
                },
                'analyses': [analysis]
            }
        else:
            # Export multiple analyses
            analyses = self.get_analysis_history(session_id, limit)
            return {
                'metadata': {
                    'exported_at': datetime.now().isoformat(),
                    'record_count': len(analyses),
                    'session_id': session_id
                },
                'analyses': analyses
            }