""" Database Module - SQLite Database Management for Image Processing History ======================================================================== Comprehensive database management for storing processing history, user sessions, image metadata, and performance analytics with full CRUD operations. """ import sqlite3 import json import os from datetime import datetime, timezone from typing import Dict, List, Any, Optional, Tuple import hashlib import base64 import numpy as np import logging from contextlib import contextmanager from dataclasses import dataclass, asdict import uuid # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @dataclass class ProcessingRecord: """Data class for image processing records""" id: Optional[int] = None session_id: str = "" original_filename: str = "" file_hash: str = "" blur_type: str = "" blur_confidence: float = 0.0 processing_method: str = "" processing_parameters: str = "{}" original_quality_score: float = 0.0 enhanced_quality_score: float = 0.0 improvement_percentage: float = 0.0 processing_time_seconds: float = 0.0 timestamp: str = "" notes: str = "" @dataclass class SessionInfo: """Data class for user sessions""" session_id: str = "" start_time: str = "" end_time: Optional[str] = None total_images_processed: int = 0 average_improvement: float = 0.0 preferred_method: str = "" class DatabaseManager: """SQLite database manager for image processing application""" def __init__(self, db_path: str = "data/processing_history.db"): """ Initialize database manager Args: db_path: Path to SQLite database file """ self.db_path = db_path self.ensure_directory_exists() self.initialize_database() def ensure_directory_exists(self): """Ensure the database directory exists""" try: db_dir = os.path.dirname(self.db_path) if db_dir and not os.path.exists(db_dir): os.makedirs(db_dir, exist_ok=True) logger.info(f"Created database directory: {db_dir}") except Exception as e: logger.error(f"Error creating database directory: {e}") @contextmanager def get_connection(self): """Context manager for database connections""" conn = None try: conn = sqlite3.connect(self.db_path) conn.row_factory = sqlite3.Row # Enable column access by name yield conn except Exception as e: if conn: conn.rollback() logger.error(f"Database connection error: {e}") raise finally: if conn: conn.close() def initialize_database(self): """Initialize database with required tables""" try: with self.get_connection() as conn: cursor = conn.cursor() # Create processing_records table cursor.execute(''' CREATE TABLE IF NOT EXISTS processing_records ( id INTEGER PRIMARY KEY AUTOINCREMENT, session_id TEXT NOT NULL, original_filename TEXT NOT NULL, file_hash TEXT NOT NULL, blur_type TEXT, blur_confidence REAL DEFAULT 0.0, processing_method TEXT NOT NULL, processing_parameters TEXT DEFAULT '{}', original_quality_score REAL DEFAULT 0.0, enhanced_quality_score REAL DEFAULT 0.0, improvement_percentage REAL DEFAULT 0.0, processing_time_seconds REAL DEFAULT 0.0, timestamp TEXT NOT NULL, notes TEXT DEFAULT '', UNIQUE(file_hash, processing_method, processing_parameters) ) ''') # Create sessions table cursor.execute(''' CREATE TABLE IF NOT EXISTS sessions ( session_id TEXT PRIMARY KEY, start_time TEXT NOT NULL, end_time TEXT, total_images_processed INTEGER DEFAULT 0, average_improvement REAL DEFAULT 0.0, preferred_method TEXT DEFAULT '' ) ''') # Create performance_metrics table cursor.execute(''' CREATE TABLE IF NOT EXISTS performance_metrics ( id INTEGER PRIMARY KEY AUTOINCREMENT, method_name TEXT NOT NULL, average_processing_time REAL DEFAULT 0.0, average_improvement REAL DEFAULT 0.0, success_rate REAL DEFAULT 0.0, total_uses INTEGER DEFAULT 0, last_updated TEXT NOT NULL ) ''') # Create indexes for better performance cursor.execute(''' CREATE INDEX IF NOT EXISTS idx_processing_session ON processing_records(session_id) ''') cursor.execute(''' CREATE INDEX IF NOT EXISTS idx_processing_timestamp ON processing_records(timestamp) ''') cursor.execute(''' CREATE INDEX IF NOT EXISTS idx_processing_method ON processing_records(processing_method) ''') conn.commit() logger.info("Database initialized successfully") except Exception as e: logger.error(f"Error initializing database: {e}") def generate_session_id(self) -> str: """Generate unique session ID""" return str(uuid.uuid4()) def calculate_file_hash(self, file_data: bytes) -> str: """Calculate SHA-256 hash of file data""" return hashlib.sha256(file_data).hexdigest() def start_session(self, session_id: Optional[str] = None) -> str: """ Start a new processing session Args: session_id: Optional session ID, generates new if not provided Returns: str: Session ID """ try: if not session_id: session_id = self.generate_session_id() current_time = datetime.now(timezone.utc).isoformat() with self.get_connection() as conn: cursor = conn.cursor() cursor.execute(''' INSERT OR REPLACE INTO sessions (session_id, start_time, total_images_processed, average_improvement) VALUES (?, ?, 0, 0.0) ''', (session_id, current_time)) conn.commit() logger.info(f"Session started: {session_id}") return session_id except Exception as e: logger.error(f"Error starting session: {e}") return self.generate_session_id() # Fallback def end_session(self, session_id: str): """ End a processing session and update statistics Args: session_id: Session ID to end """ try: current_time = datetime.now(timezone.utc).isoformat() with self.get_connection() as conn: cursor = conn.cursor() # Calculate session statistics cursor.execute(''' SELECT COUNT(*), AVG(improvement_percentage), processing_method, COUNT(processing_method) as method_count FROM processing_records WHERE session_id = ? GROUP BY processing_method ORDER BY method_count DESC LIMIT 1 ''', (session_id,)) stats = cursor.fetchone() if stats: total_processed = stats[0] avg_improvement = stats[1] or 0.0 preferred_method = stats[2] or "" else: total_processed = 0 avg_improvement = 0.0 preferred_method = "" # Update session cursor.execute(''' UPDATE sessions SET end_time = ?, total_images_processed = ?, average_improvement = ?, preferred_method = ? WHERE session_id = ? ''', (current_time, total_processed, avg_improvement, preferred_method, session_id)) conn.commit() logger.info(f"Session ended: {session_id}") except Exception as e: logger.error(f"Error ending session: {e}") def add_processing_record(self, record: ProcessingRecord) -> Optional[int]: """ Add a new processing record to database Args: record: ProcessingRecord instance Returns: Optional[int]: Record ID if successful """ try: # Set timestamp if not provided if not record.timestamp: record.timestamp = datetime.now(timezone.utc).isoformat() with self.get_connection() as conn: cursor = conn.cursor() cursor.execute(''' INSERT OR IGNORE INTO processing_records ( session_id, original_filename, file_hash, blur_type, blur_confidence, processing_method, processing_parameters, original_quality_score, enhanced_quality_score, improvement_percentage, processing_time_seconds, timestamp, notes ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ''', ( record.session_id, record.original_filename, record.file_hash, record.blur_type, record.blur_confidence, record.processing_method, record.processing_parameters, record.original_quality_score, record.enhanced_quality_score, record.improvement_percentage, record.processing_time_seconds, record.timestamp, record.notes )) record_id = cursor.lastrowid conn.commit() # Update performance metrics self._update_performance_metrics(record.processing_method, record) logger.info(f"Processing record added: ID {record_id}") return record_id except Exception as e: logger.error(f"Error adding processing record: {e}") return None def _update_performance_metrics(self, method_name: str, record: ProcessingRecord): """Update performance metrics for a processing method""" try: current_time = datetime.now(timezone.utc).isoformat() with self.get_connection() as conn: cursor = conn.cursor() # Get current metrics cursor.execute(''' SELECT total_uses, average_processing_time, average_improvement, success_rate FROM performance_metrics WHERE method_name = ? ''', (method_name,)) existing = cursor.fetchone() if existing: total_uses = existing[0] + 1 avg_time = ((existing[1] * existing[0]) + record.processing_time_seconds) / total_uses avg_improvement = ((existing[2] * existing[0]) + record.improvement_percentage) / total_uses success_rate = existing[3] # Could be updated based on improvement threshold cursor.execute(''' UPDATE performance_metrics SET total_uses = ?, average_processing_time = ?, average_improvement = ?, success_rate = ?, last_updated = ? WHERE method_name = ? ''', (total_uses, avg_time, avg_improvement, success_rate, current_time, method_name)) else: # New method cursor.execute(''' INSERT INTO performance_metrics ( method_name, average_processing_time, average_improvement, success_rate, total_uses, last_updated ) VALUES (?, ?, ?, ?, ?, ?) ''', (method_name, record.processing_time_seconds, record.improvement_percentage, 1.0, 1, current_time)) conn.commit() except Exception as e: logger.error(f"Error updating performance metrics: {e}") def get_processing_history(self, session_id: Optional[str] = None, limit: int = 100, method_filter: Optional[str] = None) -> List[ProcessingRecord]: """ Get processing history records Args: session_id: Filter by session ID limit: Maximum number of records method_filter: Filter by processing method Returns: List[ProcessingRecord]: Processing records """ try: with self.get_connection() as conn: cursor = conn.cursor() query = "SELECT * FROM processing_records WHERE 1=1" params = [] if session_id: query += " AND session_id = ?" params.append(session_id) if method_filter: query += " AND processing_method = ?" params.append(method_filter) query += " ORDER BY timestamp DESC LIMIT ?" params.append(limit) cursor.execute(query, params) rows = cursor.fetchall() records = [] for row in rows: record = ProcessingRecord( id=row['id'], session_id=row['session_id'], original_filename=row['original_filename'], file_hash=row['file_hash'], blur_type=row['blur_type'] or "", blur_confidence=row['blur_confidence'] or 0.0, processing_method=row['processing_method'], processing_parameters=row['processing_parameters'] or "{}", original_quality_score=row['original_quality_score'] or 0.0, enhanced_quality_score=row['enhanced_quality_score'] or 0.0, improvement_percentage=row['improvement_percentage'] or 0.0, processing_time_seconds=row['processing_time_seconds'] or 0.0, timestamp=row['timestamp'], notes=row['notes'] or "" ) records.append(record) return records except Exception as e: logger.error(f"Error getting processing history: {e}") return [] def get_session_statistics(self, session_id: str) -> Dict[str, Any]: """ Get comprehensive statistics for a session Args: session_id: Session ID Returns: dict: Session statistics """ try: with self.get_connection() as conn: cursor = conn.cursor() # Basic session info cursor.execute(''' SELECT * FROM sessions WHERE session_id = ? ''', (session_id,)) session_info = cursor.fetchone() # Processing statistics cursor.execute(''' SELECT COUNT(*) as total_processed, AVG(improvement_percentage) as avg_improvement, MAX(improvement_percentage) as max_improvement, MIN(improvement_percentage) as min_improvement, AVG(processing_time_seconds) as avg_processing_time, AVG(original_quality_score) as avg_original_quality, AVG(enhanced_quality_score) as avg_enhanced_quality FROM processing_records WHERE session_id = ? ''', (session_id,)) stats = cursor.fetchone() # Method breakdown cursor.execute(''' SELECT processing_method, COUNT(*) as count, AVG(improvement_percentage) as avg_improvement FROM processing_records WHERE session_id = ? GROUP BY processing_method ORDER BY count DESC ''', (session_id,)) method_stats = cursor.fetchall() # Ensure stats have default values for None entries stats_dict = {} if stats: stats_dict = { 'total_processed': stats[0] or 0, 'avg_improvement': stats[1] or 0.0, 'max_improvement': stats[2] or 0.0, 'min_improvement': stats[3] or 0.0, 'avg_processing_time': stats[4] or 0.0, 'avg_original_quality': stats[5] or 0.0, 'avg_enhanced_quality': stats[6] or 0.0 } return { 'session_info': dict(session_info) if session_info else {}, 'processing_stats': stats_dict, 'method_breakdown': [dict(row) for row in method_stats] } except Exception as e: logger.error(f"Error getting session statistics: {e}") return {} def get_global_statistics(self) -> Dict[str, Any]: """ Get comprehensive global statistics (all sessions) Returns: dict: Global statistics """ try: with self.get_connection() as conn: cursor = conn.cursor() # Processing statistics cursor.execute(''' SELECT COUNT(*) as total_processed, AVG(improvement_percentage) as avg_improvement, MAX(improvement_percentage) as max_improvement, MIN(improvement_percentage) as min_improvement, AVG(processing_time_seconds) as avg_processing_time, AVG(original_quality_score) as avg_original_quality, AVG(enhanced_quality_score) as avg_enhanced_quality FROM processing_records ''') stats = cursor.fetchone() # Method breakdown cursor.execute(''' SELECT processing_method, COUNT(*) as count, AVG(improvement_percentage) as avg_improvement FROM processing_records GROUP BY processing_method ORDER BY count DESC ''') method_stats = cursor.fetchall() # Ensure stats have default values for None entries stats_dict = {} if stats: stats_dict = { 'total_processed': stats[0] or 0, 'avg_improvement': stats[1] or 0.0, 'max_improvement': stats[2] or 0.0, 'min_improvement': stats[3] or 0.0, 'avg_processing_time': stats[4] or 0.0, 'avg_original_quality': stats[5] or 0.0, 'avg_enhanced_quality': stats[6] or 0.0 } return { 'processing_stats': stats_dict, 'method_breakdown': [dict(row) for row in method_stats] } except Exception as e: logger.error(f"Error getting global statistics: {e}") return {} def get_performance_metrics(self) -> List[Dict[str, Any]]: """ Get performance metrics for all methods Returns: List[dict]: Performance metrics """ try: with self.get_connection() as conn: cursor = conn.cursor() cursor.execute(''' SELECT * FROM performance_metrics ORDER BY total_uses DESC ''') rows = cursor.fetchall() return [dict(row) for row in rows] except Exception as e: logger.error(f"Error getting performance metrics: {e}") return [] def search_records(self, search_params: Dict[str, Any]) -> List[ProcessingRecord]: """ Search processing records with flexible criteria Args: search_params: Dictionary with search criteria Returns: List[ProcessingRecord]: Matching records """ try: with self.get_connection() as conn: cursor = conn.cursor() query = "SELECT * FROM processing_records WHERE 1=1" params = [] # Build dynamic query based on search parameters if 'filename_contains' in search_params: query += " AND original_filename LIKE ?" params.append(f"%{search_params['filename_contains']}%") if 'method' in search_params: query += " AND processing_method = ?" params.append(search_params['method']) if 'min_improvement' in search_params: query += " AND improvement_percentage >= ?" params.append(search_params['min_improvement']) if 'date_from' in search_params: query += " AND timestamp >= ?" params.append(search_params['date_from']) if 'date_to' in search_params: query += " AND timestamp <= ?" params.append(search_params['date_to']) query += " ORDER BY timestamp DESC" if 'limit' in search_params: query += " LIMIT ?" params.append(search_params['limit']) cursor.execute(query, params) rows = cursor.fetchall() records = [] for row in rows: record = ProcessingRecord( id=row['id'], session_id=row['session_id'], original_filename=row['original_filename'], file_hash=row['file_hash'], blur_type=row['blur_type'] or "", blur_confidence=row['blur_confidence'] or 0.0, processing_method=row['processing_method'], processing_parameters=row['processing_parameters'] or "{}", original_quality_score=row['original_quality_score'] or 0.0, enhanced_quality_score=row['enhanced_quality_score'] or 0.0, improvement_percentage=row['improvement_percentage'] or 0.0, processing_time_seconds=row['processing_time_seconds'] or 0.0, timestamp=row['timestamp'], notes=row['notes'] or "" ) records.append(record) return records except Exception as e: logger.error(f"Error searching records: {e}") return [] def cleanup_old_records(self, days_old: int = 30) -> int: """ Clean up old processing records Args: days_old: Remove records older than this many days Returns: int: Number of records deleted """ try: cutoff_date = datetime.now(timezone.utc).replace( hour=0, minute=0, second=0, microsecond=0 ) - datetime.timedelta(days=days_old) cutoff_str = cutoff_date.isoformat() with self.get_connection() as conn: cursor = conn.cursor() # Count records to be deleted cursor.execute(''' SELECT COUNT(*) FROM processing_records WHERE timestamp < ? ''', (cutoff_str,)) count = cursor.fetchone()[0] # Delete old records cursor.execute(''' DELETE FROM processing_records WHERE timestamp < ? ''', (cutoff_str,)) # Delete orphaned sessions cursor.execute(''' DELETE FROM sessions WHERE session_id NOT IN ( SELECT DISTINCT session_id FROM processing_records ) ''') conn.commit() logger.info(f"Cleaned up {count} old records") return count except Exception as e: logger.error(f"Error cleaning up old records: {e}") return 0 # Convenience functions for easy database operations def get_database_manager(db_path: str = "data/processing_history.db") -> DatabaseManager: """Get database manager instance""" return DatabaseManager(db_path) def log_processing_result(session_id: str, filename: str, file_data: bytes, processing_result: Dict[str, Any], db_path: str = "data/processing_history.db") -> Optional[int]: """ Convenience function to log processing result Args: session_id: Session ID filename: Original filename file_data: File data for hash calculation processing_result: Processing result dictionary db_path: Database path Returns: Optional[int]: Record ID if successful """ try: db_manager = DatabaseManager(db_path) file_hash = db_manager.calculate_file_hash(file_data) record = ProcessingRecord( session_id=session_id, original_filename=filename, file_hash=file_hash, blur_type=processing_result.get('blur_type', ''), blur_confidence=processing_result.get('blur_confidence', 0.0), processing_method=processing_result.get('method', ''), processing_parameters=json.dumps(processing_result.get('parameters', {})), original_quality_score=processing_result.get('original_quality', 0.0), enhanced_quality_score=processing_result.get('enhanced_quality', 0.0), improvement_percentage=processing_result.get('improvement_percentage', 0.0), processing_time_seconds=processing_result.get('processing_time', 0.0), notes=processing_result.get('notes', '') ) return db_manager.add_processing_record(record) except Exception as e: logger.error(f"Error logging processing result: {e}") return None # Example usage and testing if __name__ == "__main__": print("Database Module - Testing") print("========================") # Initialize database manager db_manager = DatabaseManager("test_database.db") # Start a session session_id = db_manager.start_session() print(f"Started session: {session_id}") # Create test processing record test_record = ProcessingRecord( session_id=session_id, original_filename="test_image.jpg", file_hash="abc123def456", blur_type="gaussian", blur_confidence=0.85, processing_method="wiener_filter", processing_parameters='{"sigma": 2.0}', original_quality_score=0.45, enhanced_quality_score=0.72, improvement_percentage=60.0, processing_time_seconds=2.3, notes="Test processing" ) # Add record record_id = db_manager.add_processing_record(test_record) print(f"Added record with ID: {record_id}") # Get history history = db_manager.get_processing_history(session_id=session_id) print(f"Retrieved {len(history)} records") # Get session statistics stats = db_manager.get_session_statistics(session_id) print(f"Session stats: {stats}") # End session db_manager.end_session(session_id) print("Session ended") # Cleanup test database os.remove("test_database.db") print("Test database cleaned up") print("\nDatabase module test completed!")