Spaces:
Sleeping
Sleeping
| """ | |
| Database Service for EmotionMirror Application | |
| This service provides a high-level interface for database operations, | |
| integrating the database manager with the rest of the application. | |
| """ | |
| import os | |
| import json | |
| import logging | |
| import streamlit as st | |
| from typing import Dict, List, Any, Optional, Tuple | |
| from datetime import datetime | |
| from config import settings | |
| from database.db_manager import DatabaseManager | |
| # Configure logging | |
| logger = logging.getLogger(__name__) | |
| class DatabaseService: | |
| """ | |
| Service for handling database operations in the EmotionMirror application. | |
| This class integrates the database functionality with the rest of the | |
| application, providing a service-oriented interface. | |
| IMPORTANT NOTE ABOUT DIFFERENCES BETWEEN LOCAL AND HUGGING FACE VERSIONS: | |
| ----------------------------------------------------------------------- | |
| In environments with permission restrictions such as Hugging Face, this class | |
| implements additional logic to handle write errors: | |
| 1. Verifies that the database directory has write permissions | |
| 2. Attempts to write a test file to confirm permissions | |
| 3. If it fails, uses an alternative location (/tmp/emotion_mirror.db) | |
| 4. Logs detailed debugging information | |
| This behavior is different from the local version, where it is assumed | |
| that there are no write permission restrictions. | |
| HISTORY FUNCTIONALITY DIFFERENCES: | |
| -------------------------------- | |
| The history retrieval system also differs between versions: | |
| Local Version: | |
| - Uses standard DatabaseManager method for data retrieval | |
| - Requires explicit session ID for filtering | |
| - Implements basic error handling | |
| - Assumes stable database connection | |
| Hugging Face Version: | |
| - Uses a multi-layered approach with direct SQL fallback | |
| - Auto-detects current session ID when possible | |
| - Implements enhanced error recovery with detailed diagnostics | |
| - Tests database connection and table existence before operations | |
| - Performs preliminary count of available records | |
| - Provides detailed logging of database operations | |
| These differences ensure the history functionality works reliably in the | |
| more restrictive Hugging Face environment. | |
| """ | |
| def __init__(self): | |
| """Initialize the database service with the configured database path.""" | |
| # Ensure the database directory exists | |
| db_dir = os.path.dirname(settings.DB_PATH) | |
| os.makedirs(db_dir, exist_ok=True) | |
| # Log database path for debugging | |
| logger.info(f"Initializing DatabaseService with DB at: {settings.DB_PATH}") | |
| logger.info(f"Database directory exists: {os.path.exists(db_dir)}") | |
| logger.info(f"Database directory is writable: {os.access(db_dir, os.W_OK)}") | |
| # Print to console for immediate visibility in logs | |
| print(f"[DEBUG] Database path: {settings.DB_PATH}") | |
| print(f"[DEBUG] Database dir exists: {os.path.exists(db_dir)}") | |
| print(f"[DEBUG] Database dir writable: {os.access(db_dir, os.W_OK)}") | |
| try: | |
| # Test if we can write to the directory | |
| test_file = os.path.join(db_dir, '.db_test') | |
| with open(test_file, 'w') as f: | |
| f.write('test') | |
| os.remove(test_file) | |
| logger.info(f"Successfully wrote test file to {db_dir}") | |
| print(f"[DEBUG] Successfully wrote test file to {db_dir}") | |
| except Exception as e: | |
| logger.error(f"Failed to write test file to {db_dir}: {e}") | |
| print(f"[DEBUG] Failed to write test file to {db_dir}: {e}") | |
| # No need to raise exception, we'll let the DatabaseManager handle this | |
| # List of possible fallback paths from most to least preferred | |
| fallback_paths = [ | |
| os.path.join('/tmp', 'emotion_mirror.db'), | |
| os.path.join('/home/user', 'emotion_mirror.db'), | |
| ':memory:' # SQLite in-memory database as last resort | |
| ] | |
| # Initialize the database manager | |
| db_initialized = False | |
| db_path_used = settings.DB_PATH | |
| try: | |
| self.db_manager = DatabaseManager(settings.DB_PATH) | |
| logger.info(f"Database manager initialized successfully") | |
| print(f"[DEBUG] Database manager initialized successfully with {settings.DB_PATH}") | |
| # Verify we can actually write to the database | |
| try: | |
| # Test direct SQLite connection instead of using a method that might not exist | |
| import sqlite3 | |
| conn = sqlite3.connect(settings.DB_PATH) | |
| cursor = conn.cursor() | |
| cursor.execute("PRAGMA quick_check") | |
| result = cursor.fetchone() | |
| print(f"[DEBUG] Database integrity check: {result}") | |
| # Test insert and select | |
| cursor.execute("CREATE TABLE IF NOT EXISTS test_table (id INTEGER PRIMARY KEY, test TEXT)") | |
| cursor.execute("INSERT INTO test_table (test) VALUES ('test_write')") | |
| conn.commit() | |
| cursor.execute("SELECT * FROM test_table LIMIT 1") | |
| test_result = cursor.fetchone() | |
| print(f"[DEBUG] Database test write/read successful: {test_result}") | |
| cursor.execute("DROP TABLE test_table") | |
| conn.commit() | |
| conn.close() | |
| db_initialized = True | |
| except Exception as e: | |
| logger.error(f"Database write test failed: {e}") | |
| print(f"[DEBUG] Database write test failed: {e}") | |
| raise Exception(f"Database write test failed: {e}") | |
| except Exception as e: | |
| logger.error(f"Error initializing database manager: {e}") | |
| print(f"[DEBUG] Error initializing database manager: {e}") | |
| # Try fallback paths | |
| for fallback_path in fallback_paths: | |
| try: | |
| print(f"[DEBUG] Trying fallback database path: {fallback_path}") | |
| # Test direct SQLite connection for fallback path | |
| if fallback_path != ':memory:': | |
| fallback_dir = os.path.dirname(fallback_path) | |
| if fallback_dir: # Skip directory check for :memory: | |
| os.makedirs(fallback_dir, exist_ok=True) | |
| test_file = os.path.join(fallback_dir, '.db_test') | |
| with open(test_file, 'w') as f: | |
| f.write('test') | |
| os.remove(test_file) | |
| # Test database connection and operations | |
| import sqlite3 | |
| conn = sqlite3.connect(fallback_path) | |
| cursor = conn.cursor() | |
| cursor.execute("CREATE TABLE IF NOT EXISTS test_table (id INTEGER PRIMARY KEY, test TEXT)") | |
| cursor.execute("INSERT INTO test_table (test) VALUES ('test_write')") | |
| conn.commit() | |
| cursor.execute("SELECT * FROM test_table LIMIT 1") | |
| test_result = cursor.fetchone() | |
| cursor.execute("DROP TABLE test_table") | |
| conn.commit() | |
| conn.close() | |
| # If we got here, the database works with this path | |
| self.db_manager = DatabaseManager(fallback_path) | |
| db_path_used = fallback_path | |
| print(f"[DEBUG] Successfully initialized database with fallback path: {fallback_path}") | |
| logger.info(f"Using alternative database path: {fallback_path}") | |
| db_initialized = True | |
| break | |
| except Exception as e: | |
| print(f"[DEBUG] Failed to use fallback path {fallback_path}: {e}") | |
| logger.warning(f"Failed to use fallback path {fallback_path}: {e}") | |
| continue | |
| if not db_initialized: | |
| error_msg = "CRITICAL: Could not initialize database with any path. History functionality will not work." | |
| logger.critical(error_msg) | |
| print(f"[DEBUG] {error_msg}") | |
| # Create an in-memory database as last resort | |
| try: | |
| self.db_manager = DatabaseManager(":memory:") | |
| print("[DEBUG] Created in-memory database as last resort. History will not persist.") | |
| except Exception as e: | |
| print(f"[DEBUG] Even in-memory database failed: {e}") | |
| # Initialize a dummy database manager that will log but not throw errors | |
| class DummyDBManager: | |
| def __init__(self): | |
| pass | |
| def __getattr__(self, name): | |
| def dummy_method(*args, **kwargs): | |
| print(f"[DEBUG] Called dummy DB method {name} with {args}, {kwargs}") | |
| if name.startswith('get_'): | |
| return [] | |
| return 0 | |
| return dummy_method | |
| self.db_manager = DummyDBManager() | |
| print(f"[DEBUG] DatabaseService initialization complete. Using DB: {db_path_used}") | |
| logger.info(f"DatabaseService initialization complete. Using DB: {db_path_used}") | |
| def save_analysis_results(self, | |
| session_id: str, | |
| image_path: str, | |
| results: Dict[str, Any], | |
| tags: Optional[List[str]] = None) -> int: | |
| """ | |
| Save analysis results to the database. | |
| Args: | |
| session_id: Current session identifier | |
| image_path: Path to the analyzed image | |
| results: Analysis results dictionary | |
| tags: Optional list of tags for the analysis | |
| Returns: | |
| The ID of the saved analysis record | |
| """ | |
| try: | |
| logger.info(f"Saving analysis results for session: {session_id}") | |
| print(f"[DEBUG] Attempting to save analysis for session: {session_id}, image: {image_path}") | |
| result_id = self.db_manager.save_analysis(session_id, image_path, results, tags) | |
| print(f"[DEBUG] Analysis saved successfully with ID: {result_id}") | |
| return result_id | |
| except Exception as e: | |
| error_msg = f"Failed to save analysis results: {e}" | |
| logger.error(error_msg) | |
| print(f"[DEBUG] {error_msg}") | |
| # Return 0 to indicate failure but continue execution | |
| return 0 | |
| def get_history(self, | |
| session_id: Optional[str] = None, | |
| limit: int = 10) -> List[Dict[str, Any]]: | |
| """ | |
| Retrieve analysis history. | |
| Args: | |
| session_id: Optional session ID to filter by | |
| limit: Maximum number of records to return | |
| Returns: | |
| List of analysis records | |
| """ | |
| try: | |
| logger.info(f"Retrieving analysis history for session: {session_id or 'all sessions'}") | |
| print(f"[DEBUG] Attempting to get history for session: {session_id}, limit: {limit}") | |
| # If session_id is None, use current session ID | |
| if session_id is None and hasattr(st, 'session_state') and 'session_id' in st.session_state: | |
| session_id = st.session_state.session_id | |
| print(f"[DEBUG] Using session_id from session_state: {session_id}") | |
| # First check if the database file exists | |
| if not os.path.exists(self.db_manager.db_path): | |
| print(f"[DEBUG] Database file does not exist: {self.db_manager.db_path}") | |
| logger.warning(f"Database file does not exist: {self.db_manager.db_path}") | |
| return [] | |
| # Test database connection | |
| try: | |
| import sqlite3 | |
| conn = sqlite3.connect(self.db_manager.db_path) | |
| cursor = conn.cursor() | |
| # Check if the analyses table exists | |
| cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='analyses'") | |
| if not cursor.fetchone(): | |
| print("[DEBUG] The 'analyses' table does not exist in the database") | |
| conn.close() | |
| return [] | |
| # Get a count of records for this session | |
| if session_id: | |
| cursor.execute("SELECT COUNT(*) FROM analyses WHERE session_id = ?", (session_id,)) | |
| else: | |
| cursor.execute("SELECT COUNT(*) FROM analyses") | |
| count = cursor.fetchone()[0] | |
| print(f"[DEBUG] Found {count} records in the database") | |
| conn.close() | |
| except Exception as e: | |
| print(f"[DEBUG] Error testing database connection: {e}") | |
| logger.error(f"Error testing database connection: {e}") | |
| # Try to get records using the db_manager | |
| records = self.db_manager.get_analysis_history(session_id, limit) | |
| print(f"[DEBUG] Retrieved {len(records)} records from db_manager") | |
| # If no records were found but we expect some (from the count above) | |
| # try a direct query as a fallback | |
| if not records and count > 0: | |
| print("[DEBUG] No records returned from db_manager but count > 0, trying direct query") | |
| try: | |
| conn = sqlite3.connect(self.db_manager.db_path) | |
| conn.row_factory = sqlite3.Row | |
| cursor = conn.cursor() | |
| if session_id: | |
| query = "SELECT * FROM analyses WHERE session_id = ? ORDER BY timestamp DESC LIMIT ?" | |
| cursor.execute(query, (session_id, limit)) | |
| else: | |
| query = "SELECT * FROM analyses ORDER BY timestamp DESC LIMIT ?" | |
| cursor.execute(query, (limit,)) | |
| rows = cursor.fetchall() | |
| records = [] | |
| for row in rows: | |
| record = {key: row[key] for key in row.keys()} | |
| # Parse the JSON fields | |
| try: | |
| if 'results' in record and record['results']: | |
| record['results'] = json.loads(record['results']) | |
| if 'tags' in record and record['tags']: | |
| record['tags'] = json.loads(record['tags']) | |
| except: | |
| pass | |
| records.append(record) | |
| conn.close() | |
| print(f"[DEBUG] Retrieved {len(records)} records via direct query") | |
| except Exception as e: | |
| print(f"[DEBUG] Error in direct SQL query fallback: {e}") | |
| logger.error(f"Error in direct SQL query fallback: {e}") | |
| return records | |
| except Exception as e: | |
| error_msg = f"Failed to retrieve analysis history: {e}" | |
| logger.error(error_msg) | |
| print(f"[DEBUG] {error_msg}") | |
| # Return empty list to avoid breaking the UI | |
| return [] | |
| def get_analysis(self, analysis_id: int) -> Optional[Dict[str, Any]]: | |
| """ | |
| Retrieve a specific analysis. | |
| Args: | |
| analysis_id: ID of the analysis to retrieve | |
| Returns: | |
| Analysis record or None if not found | |
| """ | |
| logger.info(f"Retrieving analysis with ID: {analysis_id}") | |
| return self.db_manager.get_analysis_by_id(analysis_id) | |
| def get_emotion_stats(self, | |
| session_id: Optional[str] = None, | |
| limit: int = 50) -> Dict[str, float]: | |
| """ | |
| Get emotion statistics. | |
| Args: | |
| session_id: Optional session ID to filter by | |
| limit: Maximum number of records to analyze | |
| Returns: | |
| Dictionary of emotion frequencies | |
| """ | |
| logger.info(f"Computing emotion statistics for session: {session_id or 'all sessions'}") | |
| return self.db_manager.get_emotion_statistics(session_id, limit) | |
| def delete_record(self, analysis_id: int) -> bool: | |
| """ | |
| Delete an analysis record. | |
| Args: | |
| analysis_id: ID of the analysis to delete | |
| Returns: | |
| True if deletion was successful, False otherwise | |
| """ | |
| logger.info(f"Deleting analysis with ID: {analysis_id}") | |
| return self.db_manager.delete_analysis(analysis_id) | |
| def export_data(self, | |
| analysis_id: Optional[int] = None, | |
| session_id: Optional[str] = None, | |
| limit: int = 100) -> Dict[str, Any]: | |
| """ | |
| Export analysis data in a structured format. | |
| Args: | |
| analysis_id: Optional specific analysis ID to export | |
| session_id: Optional session ID to filter by | |
| limit: Maximum number of records to export | |
| Returns: | |
| Dictionary containing the exported data | |
| """ | |
| logger.info(f"Exporting data for analysis ID: {analysis_id or 'multiple'}") | |
| return self.db_manager.export_analysis_data(analysis_id, session_id, limit) | |