finalproject / services /database_service.py
jarondon82's picture
Initial commit for EmotionMirror finalproject
f7e620e
"""
Database Service for EmotionMirror Application
This service provides a high-level interface for database operations,
integrating the database manager with the rest of the application.
"""
import os
import json
import logging
import streamlit as st
from typing import Dict, List, Any, Optional, Tuple
from datetime import datetime
from config import settings
from database.db_manager import DatabaseManager
# Configure logging
logger = logging.getLogger(__name__)
class DatabaseService:
"""
Service for handling database operations in the EmotionMirror application.
This class integrates the database functionality with the rest of the
application, providing a service-oriented interface.
IMPORTANT NOTE ABOUT DIFFERENCES BETWEEN LOCAL AND HUGGING FACE VERSIONS:
-----------------------------------------------------------------------
In environments with permission restrictions such as Hugging Face, this class
implements additional logic to handle write errors:
1. Verifies that the database directory has write permissions
2. Attempts to write a test file to confirm permissions
3. If it fails, uses an alternative location (/tmp/emotion_mirror.db)
4. Logs detailed debugging information
This behavior is different from the local version, where it is assumed
that there are no write permission restrictions.
HISTORY FUNCTIONALITY DIFFERENCES:
--------------------------------
The history retrieval system also differs between versions:
Local Version:
- Uses standard DatabaseManager method for data retrieval
- Requires explicit session ID for filtering
- Implements basic error handling
- Assumes stable database connection
Hugging Face Version:
- Uses a multi-layered approach with direct SQL fallback
- Auto-detects current session ID when possible
- Implements enhanced error recovery with detailed diagnostics
- Tests database connection and table existence before operations
- Performs preliminary count of available records
- Provides detailed logging of database operations
These differences ensure the history functionality works reliably in the
more restrictive Hugging Face environment.
"""
def __init__(self):
"""Initialize the database service with the configured database path."""
# Ensure the database directory exists
db_dir = os.path.dirname(settings.DB_PATH)
os.makedirs(db_dir, exist_ok=True)
# Log database path for debugging
logger.info(f"Initializing DatabaseService with DB at: {settings.DB_PATH}")
logger.info(f"Database directory exists: {os.path.exists(db_dir)}")
logger.info(f"Database directory is writable: {os.access(db_dir, os.W_OK)}")
# Print to console for immediate visibility in logs
print(f"[DEBUG] Database path: {settings.DB_PATH}")
print(f"[DEBUG] Database dir exists: {os.path.exists(db_dir)}")
print(f"[DEBUG] Database dir writable: {os.access(db_dir, os.W_OK)}")
try:
# Test if we can write to the directory
test_file = os.path.join(db_dir, '.db_test')
with open(test_file, 'w') as f:
f.write('test')
os.remove(test_file)
logger.info(f"Successfully wrote test file to {db_dir}")
print(f"[DEBUG] Successfully wrote test file to {db_dir}")
except Exception as e:
logger.error(f"Failed to write test file to {db_dir}: {e}")
print(f"[DEBUG] Failed to write test file to {db_dir}: {e}")
# No need to raise exception, we'll let the DatabaseManager handle this
# List of possible fallback paths from most to least preferred
fallback_paths = [
os.path.join('/tmp', 'emotion_mirror.db'),
os.path.join('/home/user', 'emotion_mirror.db'),
':memory:' # SQLite in-memory database as last resort
]
# Initialize the database manager
db_initialized = False
db_path_used = settings.DB_PATH
try:
self.db_manager = DatabaseManager(settings.DB_PATH)
logger.info(f"Database manager initialized successfully")
print(f"[DEBUG] Database manager initialized successfully with {settings.DB_PATH}")
# Verify we can actually write to the database
try:
# Test direct SQLite connection instead of using a method that might not exist
import sqlite3
conn = sqlite3.connect(settings.DB_PATH)
cursor = conn.cursor()
cursor.execute("PRAGMA quick_check")
result = cursor.fetchone()
print(f"[DEBUG] Database integrity check: {result}")
# Test insert and select
cursor.execute("CREATE TABLE IF NOT EXISTS test_table (id INTEGER PRIMARY KEY, test TEXT)")
cursor.execute("INSERT INTO test_table (test) VALUES ('test_write')")
conn.commit()
cursor.execute("SELECT * FROM test_table LIMIT 1")
test_result = cursor.fetchone()
print(f"[DEBUG] Database test write/read successful: {test_result}")
cursor.execute("DROP TABLE test_table")
conn.commit()
conn.close()
db_initialized = True
except Exception as e:
logger.error(f"Database write test failed: {e}")
print(f"[DEBUG] Database write test failed: {e}")
raise Exception(f"Database write test failed: {e}")
except Exception as e:
logger.error(f"Error initializing database manager: {e}")
print(f"[DEBUG] Error initializing database manager: {e}")
# Try fallback paths
for fallback_path in fallback_paths:
try:
print(f"[DEBUG] Trying fallback database path: {fallback_path}")
# Test direct SQLite connection for fallback path
if fallback_path != ':memory:':
fallback_dir = os.path.dirname(fallback_path)
if fallback_dir: # Skip directory check for :memory:
os.makedirs(fallback_dir, exist_ok=True)
test_file = os.path.join(fallback_dir, '.db_test')
with open(test_file, 'w') as f:
f.write('test')
os.remove(test_file)
# Test database connection and operations
import sqlite3
conn = sqlite3.connect(fallback_path)
cursor = conn.cursor()
cursor.execute("CREATE TABLE IF NOT EXISTS test_table (id INTEGER PRIMARY KEY, test TEXT)")
cursor.execute("INSERT INTO test_table (test) VALUES ('test_write')")
conn.commit()
cursor.execute("SELECT * FROM test_table LIMIT 1")
test_result = cursor.fetchone()
cursor.execute("DROP TABLE test_table")
conn.commit()
conn.close()
# If we got here, the database works with this path
self.db_manager = DatabaseManager(fallback_path)
db_path_used = fallback_path
print(f"[DEBUG] Successfully initialized database with fallback path: {fallback_path}")
logger.info(f"Using alternative database path: {fallback_path}")
db_initialized = True
break
except Exception as e:
print(f"[DEBUG] Failed to use fallback path {fallback_path}: {e}")
logger.warning(f"Failed to use fallback path {fallback_path}: {e}")
continue
if not db_initialized:
error_msg = "CRITICAL: Could not initialize database with any path. History functionality will not work."
logger.critical(error_msg)
print(f"[DEBUG] {error_msg}")
# Create an in-memory database as last resort
try:
self.db_manager = DatabaseManager(":memory:")
print("[DEBUG] Created in-memory database as last resort. History will not persist.")
except Exception as e:
print(f"[DEBUG] Even in-memory database failed: {e}")
# Initialize a dummy database manager that will log but not throw errors
class DummyDBManager:
def __init__(self):
pass
def __getattr__(self, name):
def dummy_method(*args, **kwargs):
print(f"[DEBUG] Called dummy DB method {name} with {args}, {kwargs}")
if name.startswith('get_'):
return []
return 0
return dummy_method
self.db_manager = DummyDBManager()
print(f"[DEBUG] DatabaseService initialization complete. Using DB: {db_path_used}")
logger.info(f"DatabaseService initialization complete. Using DB: {db_path_used}")
def save_analysis_results(self,
session_id: str,
image_path: str,
results: Dict[str, Any],
tags: Optional[List[str]] = None) -> int:
"""
Save analysis results to the database.
Args:
session_id: Current session identifier
image_path: Path to the analyzed image
results: Analysis results dictionary
tags: Optional list of tags for the analysis
Returns:
The ID of the saved analysis record
"""
try:
logger.info(f"Saving analysis results for session: {session_id}")
print(f"[DEBUG] Attempting to save analysis for session: {session_id}, image: {image_path}")
result_id = self.db_manager.save_analysis(session_id, image_path, results, tags)
print(f"[DEBUG] Analysis saved successfully with ID: {result_id}")
return result_id
except Exception as e:
error_msg = f"Failed to save analysis results: {e}"
logger.error(error_msg)
print(f"[DEBUG] {error_msg}")
# Return 0 to indicate failure but continue execution
return 0
def get_history(self,
session_id: Optional[str] = None,
limit: int = 10) -> List[Dict[str, Any]]:
"""
Retrieve analysis history.
Args:
session_id: Optional session ID to filter by
limit: Maximum number of records to return
Returns:
List of analysis records
"""
try:
logger.info(f"Retrieving analysis history for session: {session_id or 'all sessions'}")
print(f"[DEBUG] Attempting to get history for session: {session_id}, limit: {limit}")
# If session_id is None, use current session ID
if session_id is None and hasattr(st, 'session_state') and 'session_id' in st.session_state:
session_id = st.session_state.session_id
print(f"[DEBUG] Using session_id from session_state: {session_id}")
# First check if the database file exists
if not os.path.exists(self.db_manager.db_path):
print(f"[DEBUG] Database file does not exist: {self.db_manager.db_path}")
logger.warning(f"Database file does not exist: {self.db_manager.db_path}")
return []
# Test database connection
try:
import sqlite3
conn = sqlite3.connect(self.db_manager.db_path)
cursor = conn.cursor()
# Check if the analyses table exists
cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='analyses'")
if not cursor.fetchone():
print("[DEBUG] The 'analyses' table does not exist in the database")
conn.close()
return []
# Get a count of records for this session
if session_id:
cursor.execute("SELECT COUNT(*) FROM analyses WHERE session_id = ?", (session_id,))
else:
cursor.execute("SELECT COUNT(*) FROM analyses")
count = cursor.fetchone()[0]
print(f"[DEBUG] Found {count} records in the database")
conn.close()
except Exception as e:
print(f"[DEBUG] Error testing database connection: {e}")
logger.error(f"Error testing database connection: {e}")
# Try to get records using the db_manager
records = self.db_manager.get_analysis_history(session_id, limit)
print(f"[DEBUG] Retrieved {len(records)} records from db_manager")
# If no records were found but we expect some (from the count above)
# try a direct query as a fallback
if not records and count > 0:
print("[DEBUG] No records returned from db_manager but count > 0, trying direct query")
try:
conn = sqlite3.connect(self.db_manager.db_path)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
if session_id:
query = "SELECT * FROM analyses WHERE session_id = ? ORDER BY timestamp DESC LIMIT ?"
cursor.execute(query, (session_id, limit))
else:
query = "SELECT * FROM analyses ORDER BY timestamp DESC LIMIT ?"
cursor.execute(query, (limit,))
rows = cursor.fetchall()
records = []
for row in rows:
record = {key: row[key] for key in row.keys()}
# Parse the JSON fields
try:
if 'results' in record and record['results']:
record['results'] = json.loads(record['results'])
if 'tags' in record and record['tags']:
record['tags'] = json.loads(record['tags'])
except:
pass
records.append(record)
conn.close()
print(f"[DEBUG] Retrieved {len(records)} records via direct query")
except Exception as e:
print(f"[DEBUG] Error in direct SQL query fallback: {e}")
logger.error(f"Error in direct SQL query fallback: {e}")
return records
except Exception as e:
error_msg = f"Failed to retrieve analysis history: {e}"
logger.error(error_msg)
print(f"[DEBUG] {error_msg}")
# Return empty list to avoid breaking the UI
return []
def get_analysis(self, analysis_id: int) -> Optional[Dict[str, Any]]:
"""
Retrieve a specific analysis.
Args:
analysis_id: ID of the analysis to retrieve
Returns:
Analysis record or None if not found
"""
logger.info(f"Retrieving analysis with ID: {analysis_id}")
return self.db_manager.get_analysis_by_id(analysis_id)
def get_emotion_stats(self,
session_id: Optional[str] = None,
limit: int = 50) -> Dict[str, float]:
"""
Get emotion statistics.
Args:
session_id: Optional session ID to filter by
limit: Maximum number of records to analyze
Returns:
Dictionary of emotion frequencies
"""
logger.info(f"Computing emotion statistics for session: {session_id or 'all sessions'}")
return self.db_manager.get_emotion_statistics(session_id, limit)
def delete_record(self, analysis_id: int) -> bool:
"""
Delete an analysis record.
Args:
analysis_id: ID of the analysis to delete
Returns:
True if deletion was successful, False otherwise
"""
logger.info(f"Deleting analysis with ID: {analysis_id}")
return self.db_manager.delete_analysis(analysis_id)
def export_data(self,
analysis_id: Optional[int] = None,
session_id: Optional[str] = None,
limit: int = 100) -> Dict[str, Any]:
"""
Export analysis data in a structured format.
Args:
analysis_id: Optional specific analysis ID to export
session_id: Optional session ID to filter by
limit: Maximum number of records to export
Returns:
Dictionary containing the exported data
"""
logger.info(f"Exporting data for analysis ID: {analysis_id or 'multiple'}")
return self.db_manager.export_analysis_data(analysis_id, session_id, limit)