finalproject / database /db_manager.py
jarondon82's picture
Initial commit for EmotionMirror finalproject
f7e620e
"""
Database Manager for EmotionMirror Application
This module provides functionality for database operations, including:
- Creating and initializing the SQLite database
- Storing analysis results
- Retrieving historical data
- Managing database connections
All database operations are encapsulated in this module to ensure clean separation
of concerns and modularity.
"""
import os
import json
import sqlite3
import logging
from datetime import datetime
from typing import Dict, List, Any, Optional, Tuple
# Configure logging
logger = logging.getLogger(__name__)
class DatabaseManager:
"""
Manages database operations for the EmotionMirror application.
This class handles all interactions with the SQLite database, providing
a clean interface for the rest of the application to store and retrieve data.
"""
def __init__(self, db_path: str):
"""
Initialize the database manager.
Args:
db_path: Path to the SQLite database file
"""
self.db_path = db_path
self._initialize_db()
def _initialize_db(self) -> None:
"""
Initialize the database by creating necessary tables if they don't exist.
"""
os.makedirs(os.path.dirname(self.db_path), exist_ok=True)
conn = None
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Create analyses table
cursor.execute('''
CREATE TABLE IF NOT EXISTS analyses (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
timestamp TEXT NOT NULL,
image_path TEXT NOT NULL,
face_count INTEGER NOT NULL,
results TEXT NOT NULL,
tags TEXT
)
''')
# Create faces table
cursor.execute('''
CREATE TABLE IF NOT EXISTS faces (
id INTEGER PRIMARY KEY AUTOINCREMENT,
analysis_id INTEGER NOT NULL,
face_index INTEGER NOT NULL,
emotion TEXT NOT NULL,
confidence REAL NOT NULL,
features TEXT NOT NULL,
emotions TEXT NOT NULL,
FOREIGN KEY (analysis_id) REFERENCES analyses (id)
)
''')
conn.commit()
logger.info("Database initialized successfully")
except sqlite3.Error as e:
logger.error(f"Database initialization error: {e}")
finally:
if conn:
conn.close()
def save_analysis(self,
session_id: str,
image_path: str,
results: Dict[str, Any],
tags: Optional[List[str]] = None) -> int:
"""
Save analysis results to the database.
Args:
session_id: Current session identifier
image_path: Path to the analyzed image
results: Analysis results dictionary
tags: Optional list of tags for the analysis
Returns:
The ID of the newly inserted analysis record
"""
conn = None
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Convert tags list to JSON string if provided
tags_json = json.dumps(tags) if tags else None
# Insert analysis record
cursor.execute(
'''
INSERT INTO analyses
(session_id, timestamp, image_path, face_count, results, tags)
VALUES (?, ?, ?, ?, ?, ?)
''',
(
session_id,
datetime.now().isoformat(),
image_path,
results.get('face_count', 0),
json.dumps(results),
tags_json
)
)
analysis_id = cursor.lastrowid
# Insert face records if present
faces = results.get('faces', [])
for i, face in enumerate(faces):
cursor.execute(
'''
INSERT INTO faces
(analysis_id, face_index, emotion, confidence, features, emotions)
VALUES (?, ?, ?, ?, ?, ?)
''',
(
analysis_id,
i,
face.get('emotion', 'unknown'),
face.get('confidence', 0.0),
json.dumps(face.get('features', {})),
json.dumps(face.get('emotions', {}))
)
)
conn.commit()
logger.info(f"Saved analysis with ID {analysis_id} and {len(faces)} faces")
return analysis_id
except sqlite3.Error as e:
logger.error(f"Error saving analysis: {e}")
if conn:
conn.rollback()
return -1
finally:
if conn:
conn.close()
def get_analysis_history(self,
session_id: Optional[str] = None,
limit: int = 10) -> List[Dict[str, Any]]:
"""
Retrieve analysis history from the database.
Args:
session_id: Optional session ID to filter by
limit: Maximum number of records to return
Returns:
List of analysis records as dictionaries
"""
conn = None
try:
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row # This enables name-based access to columns
cursor = conn.cursor()
query = '''
SELECT
id,
session_id,
timestamp,
image_path,
face_count,
results,
tags
FROM analyses
'''
params = []
if session_id:
query += ' WHERE session_id = ?'
params.append(session_id)
query += '''
ORDER BY timestamp DESC
LIMIT ?
'''
params.append(limit)
cursor.execute(query, params)
# Convert row objects to dictionaries
analyses = []
for row in cursor.fetchall():
analysis = dict(row)
# Parse JSON fields
analysis['results'] = json.loads(analysis['results'])
if analysis['tags']:
analysis['tags'] = json.loads(analysis['tags'])
# Get faces for this analysis
face_cursor = conn.cursor()
face_cursor.execute(
'''
SELECT
id,
face_index,
emotion,
confidence,
features,
emotions
FROM faces
WHERE analysis_id = ?
ORDER BY face_index
''',
(analysis['id'],)
)
faces = []
for face_row in face_cursor.fetchall():
face = dict(face_row)
# Parse JSON fields
face['features'] = json.loads(face['features'])
face['emotions'] = json.loads(face['emotions'])
faces.append(face)
analysis['faces'] = faces
analyses.append(analysis)
return analyses
except sqlite3.Error as e:
logger.error(f"Error retrieving analysis history: {e}")
return []
finally:
if conn:
conn.close()
def get_analysis_by_id(self, analysis_id: int) -> Optional[Dict[str, Any]]:
"""
Retrieve a specific analysis by ID.
Args:
analysis_id: ID of the analysis to retrieve
Returns:
Analysis record as a dictionary, or None if not found
"""
conn = None
try:
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
cursor = conn.cursor()
cursor.execute(
'''
SELECT
id,
session_id,
timestamp,
image_path,
face_count,
results,
tags
FROM analyses
WHERE id = ?
''',
(analysis_id,)
)
row = cursor.fetchone()
if not row:
return None
analysis = dict(row)
# Parse JSON fields
analysis['results'] = json.loads(analysis['results'])
if analysis['tags']:
analysis['tags'] = json.loads(analysis['tags'])
# Get faces for this analysis
face_cursor = conn.cursor()
face_cursor.execute(
'''
SELECT
id,
face_index,
emotion,
confidence,
features,
emotions
FROM faces
WHERE analysis_id = ?
ORDER BY face_index
''',
(analysis_id,)
)
faces = []
for face_row in face_cursor.fetchall():
face = dict(face_row)
# Parse JSON fields
face['features'] = json.loads(face['features'])
face['emotions'] = json.loads(face['emotions'])
faces.append(face)
analysis['faces'] = faces
return analysis
except sqlite3.Error as e:
logger.error(f"Error retrieving analysis by ID: {e}")
return None
finally:
if conn:
conn.close()
def get_emotion_statistics(self,
session_id: Optional[str] = None,
limit: int = 50) -> Dict[str, float]:
"""
Compute emotion statistics across multiple analyses.
Args:
session_id: Optional session ID to filter by
limit: Maximum number of records to analyze
Returns:
Dictionary of emotion frequencies
"""
conn = None
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
query = '''
SELECT emotion, COUNT(*) as count
FROM faces
'''
params = []
if session_id:
query += '''
JOIN analyses ON faces.analysis_id = analyses.id
WHERE analyses.session_id = ?
'''
params.append(session_id)
query += '''
GROUP BY emotion
ORDER BY count DESC
'''
cursor.execute(query, params)
rows = cursor.fetchall()
# Calculate emotion frequencies
total = sum(count for _, count in rows)
stats = {emotion: count / total for emotion, count in rows} if total > 0 else {}
return stats
except sqlite3.Error as e:
logger.error(f"Error computing emotion statistics: {e}")
return {}
finally:
if conn:
conn.close()
def delete_analysis(self, analysis_id: int) -> bool:
"""
Delete an analysis and its associated faces.
Args:
analysis_id: ID of the analysis to delete
Returns:
True if deletion was successful, False otherwise
"""
conn = None
try:
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# Delete associated faces first (due to foreign key constraint)
cursor.execute('DELETE FROM faces WHERE analysis_id = ?', (analysis_id,))
# Delete the analysis
cursor.execute('DELETE FROM analyses WHERE id = ?', (analysis_id,))
conn.commit()
return cursor.rowcount > 0
except sqlite3.Error as e:
logger.error(f"Error deleting analysis: {e}")
if conn:
conn.rollback()
return False
finally:
if conn:
conn.close()
def export_analysis_data(self,
analysis_id: Optional[int] = None,
session_id: Optional[str] = None,
limit: int = 100) -> Dict[str, Any]:
"""
Export analysis data in a structured format suitable for JSON/CSV export.
Args:
analysis_id: Optional specific analysis ID to export
session_id: Optional session ID to filter by
limit: Maximum number of records to export
Returns:
Dictionary containing the exported data
"""
if analysis_id:
# Export a single analysis
analysis = self.get_analysis_by_id(analysis_id)
if not analysis:
return {'error': f'Analysis with ID {analysis_id} not found'}
return {
'metadata': {
'exported_at': datetime.now().isoformat(),
'record_count': 1
},
'analyses': [analysis]
}
else:
# Export multiple analyses
analyses = self.get_analysis_history(session_id, limit)
return {
'metadata': {
'exported_at': datetime.now().isoformat(),
'record_count': len(analyses),
'session_id': session_id
},
'analyses': analyses
}