| | """
|
| | Database Module - SQLite Database Management for Image Processing History
|
| | ========================================================================
|
| |
|
| | Comprehensive database management for storing processing history, user sessions,
|
| | image metadata, and performance analytics with full CRUD operations.
|
| | """

import sqlite3
import json
import os
import hashlib
import logging
import uuid
from contextlib import contextmanager
from dataclasses import dataclass
from datetime import datetime, timezone, timedelta
from typing import Dict, List, Any, Optional

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class ProcessingRecord:
    """Data class for image processing records"""
    id: Optional[int] = None
    session_id: str = ""
    original_filename: str = ""
    file_hash: str = ""
    blur_type: str = ""
    blur_confidence: float = 0.0
    processing_method: str = ""
    processing_parameters: str = "{}"
    original_quality_score: float = 0.0
    enhanced_quality_score: float = 0.0
    improvement_percentage: float = 0.0
    processing_time_seconds: float = 0.0
    timestamp: str = ""
    notes: str = ""


@dataclass
class SessionInfo:
    """Data class for user sessions"""
    session_id: str = ""
    start_time: str = ""
    end_time: Optional[str] = None
    total_images_processed: int = 0
    average_improvement: float = 0.0
    preferred_method: str = ""


class DatabaseManager:
    """SQLite database manager for image processing application"""

    def __init__(self, db_path: str = "data/processing_history.db"):
        """
        Initialize database manager

        Args:
            db_path: Path to SQLite database file
        """
        self.db_path = db_path
        self.ensure_directory_exists()
        self.initialize_database()

    def ensure_directory_exists(self):
        """Ensure the database directory exists"""
        try:
            db_dir = os.path.dirname(self.db_path)
            if db_dir and not os.path.exists(db_dir):
                os.makedirs(db_dir, exist_ok=True)
                logger.info(f"Created database directory: {db_dir}")
        except Exception as e:
            logger.error(f"Error creating database directory: {e}")

    @contextmanager
    def get_connection(self):
        """Context manager for database connections"""
        conn = None
        try:
            conn = sqlite3.connect(self.db_path)
            conn.row_factory = sqlite3.Row
            yield conn
        except Exception as e:
            if conn:
                conn.rollback()
            logger.error(f"Database connection error: {e}")
            raise
        finally:
            if conn:
                conn.close()

    def initialize_database(self):
        """Initialize database with required tables"""
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()

                cursor.execute('''
                    CREATE TABLE IF NOT EXISTS processing_records (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        session_id TEXT NOT NULL,
                        original_filename TEXT NOT NULL,
                        file_hash TEXT NOT NULL,
                        blur_type TEXT,
                        blur_confidence REAL DEFAULT 0.0,
                        processing_method TEXT NOT NULL,
                        processing_parameters TEXT DEFAULT '{}',
                        original_quality_score REAL DEFAULT 0.0,
                        enhanced_quality_score REAL DEFAULT 0.0,
                        improvement_percentage REAL DEFAULT 0.0,
                        processing_time_seconds REAL DEFAULT 0.0,
                        timestamp TEXT NOT NULL,
                        notes TEXT DEFAULT '',
                        UNIQUE(file_hash, processing_method, processing_parameters)
                    )
                ''')

                cursor.execute('''
                    CREATE TABLE IF NOT EXISTS sessions (
                        session_id TEXT PRIMARY KEY,
                        start_time TEXT NOT NULL,
                        end_time TEXT,
                        total_images_processed INTEGER DEFAULT 0,
                        average_improvement REAL DEFAULT 0.0,
                        preferred_method TEXT DEFAULT ''
                    )
                ''')

                cursor.execute('''
                    CREATE TABLE IF NOT EXISTS performance_metrics (
                        id INTEGER PRIMARY KEY AUTOINCREMENT,
                        method_name TEXT NOT NULL,
                        average_processing_time REAL DEFAULT 0.0,
                        average_improvement REAL DEFAULT 0.0,
                        success_rate REAL DEFAULT 0.0,
                        total_uses INTEGER DEFAULT 0,
                        last_updated TEXT NOT NULL
                    )
                ''')

                cursor.execute('''
                    CREATE INDEX IF NOT EXISTS idx_processing_session
                    ON processing_records(session_id)
                ''')

                cursor.execute('''
                    CREATE INDEX IF NOT EXISTS idx_processing_timestamp
                    ON processing_records(timestamp)
                ''')

                cursor.execute('''
                    CREATE INDEX IF NOT EXISTS idx_processing_method
                    ON processing_records(processing_method)
                ''')

                conn.commit()
                logger.info("Database initialized successfully")

        except Exception as e:
            logger.error(f"Error initializing database: {e}")

    def generate_session_id(self) -> str:
        """Generate unique session ID"""
        return str(uuid.uuid4())

    def calculate_file_hash(self, file_data: bytes) -> str:
        """Calculate SHA-256 hash of file data"""
        return hashlib.sha256(file_data).hexdigest()

    def start_session(self, session_id: Optional[str] = None) -> str:
        """
        Start a new processing session

        Args:
            session_id: Optional session ID, generates new if not provided

        Returns:
            str: Session ID
        """
        try:
            if not session_id:
                session_id = self.generate_session_id()

            current_time = datetime.now(timezone.utc).isoformat()

            with self.get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute('''
                    INSERT OR REPLACE INTO sessions
                    (session_id, start_time, total_images_processed, average_improvement)
                    VALUES (?, ?, 0, 0.0)
                ''', (session_id, current_time))
                conn.commit()

            logger.info(f"Session started: {session_id}")
            return session_id

        except Exception as e:
            logger.error(f"Error starting session: {e}")
            return self.generate_session_id()

    def end_session(self, session_id: str):
        """
        End a processing session and update statistics

        Args:
            session_id: Session ID to end
        """
        try:
            current_time = datetime.now(timezone.utc).isoformat()

            with self.get_connection() as conn:
                cursor = conn.cursor()

                # Session-wide totals. A single GROUP BY query would return
                # per-method counts rather than totals for the whole session.
                cursor.execute('''
                    SELECT COUNT(*), AVG(improvement_percentage)
                    FROM processing_records
                    WHERE session_id = ?
                ''', (session_id,))
                totals = cursor.fetchone()
                total_processed = totals[0] if totals else 0
                avg_improvement = (totals[1] or 0.0) if totals else 0.0

                # Most frequently used method in this session
                cursor.execute('''
                    SELECT processing_method, COUNT(*) as method_count
                    FROM processing_records
                    WHERE session_id = ?
                    GROUP BY processing_method
                    ORDER BY method_count DESC
                    LIMIT 1
                ''', (session_id,))
                method_row = cursor.fetchone()
                preferred_method = method_row[0] if method_row else ""

                cursor.execute('''
                    UPDATE sessions
                    SET end_time = ?,
                        total_images_processed = ?,
                        average_improvement = ?,
                        preferred_method = ?
                    WHERE session_id = ?
                ''', (current_time, total_processed, avg_improvement,
                      preferred_method, session_id))

                conn.commit()

            logger.info(f"Session ended: {session_id}")

        except Exception as e:
            logger.error(f"Error ending session: {e}")

    def add_processing_record(self, record: ProcessingRecord) -> Optional[int]:
        """
        Add a new processing record to database

        Args:
            record: ProcessingRecord instance

        Returns:
            Optional[int]: Record ID if successful, None on error or duplicate
        """
        try:
            if not record.timestamp:
                record.timestamp = datetime.now(timezone.utc).isoformat()

            with self.get_connection() as conn:
                cursor = conn.cursor()

                cursor.execute('''
                    INSERT OR IGNORE INTO processing_records (
                        session_id, original_filename, file_hash, blur_type,
                        blur_confidence, processing_method, processing_parameters,
                        original_quality_score, enhanced_quality_score,
                        improvement_percentage, processing_time_seconds,
                        timestamp, notes
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                ''', (
                    record.session_id, record.original_filename, record.file_hash,
                    record.blur_type, record.blur_confidence, record.processing_method,
                    record.processing_parameters, record.original_quality_score,
                    record.enhanced_quality_score, record.improvement_percentage,
                    record.processing_time_seconds, record.timestamp, record.notes
                ))

                # INSERT OR IGNORE silently skips duplicates of the
                # (file_hash, processing_method, processing_parameters) key;
                # in that case lastrowid is stale, so bail out explicitly.
                if cursor.rowcount == 0:
                    logger.info("Duplicate processing record ignored")
                    return None

                record_id = cursor.lastrowid
                conn.commit()

            self._update_performance_metrics(record.processing_method, record)

            logger.info(f"Processing record added: ID {record_id}")
            return record_id

        except Exception as e:
            logger.error(f"Error adding processing record: {e}")
            return None

    def _update_performance_metrics(self, method_name: str, record: ProcessingRecord):
        """Update performance metrics for a processing method"""
        try:
            current_time = datetime.now(timezone.utc).isoformat()

            with self.get_connection() as conn:
                cursor = conn.cursor()

                cursor.execute('''
                    SELECT total_uses, average_processing_time,
                           average_improvement, success_rate
                    FROM performance_metrics
                    WHERE method_name = ?
                ''', (method_name,))

                existing = cursor.fetchone()

                if existing:
                    # Fold the new record into the running averages
                    total_uses = existing[0] + 1
                    avg_time = ((existing[1] * existing[0]) + record.processing_time_seconds) / total_uses
                    avg_improvement = ((existing[2] * existing[0]) + record.improvement_percentage) / total_uses
                    success_rate = existing[3]

                    cursor.execute('''
                        UPDATE performance_metrics
                        SET total_uses = ?, average_processing_time = ?,
                            average_improvement = ?, success_rate = ?,
                            last_updated = ?
                        WHERE method_name = ?
                    ''', (total_uses, avg_time, avg_improvement, success_rate,
                          current_time, method_name))
                else:
                    # First use of this method: seed the metrics row
                    cursor.execute('''
                        INSERT INTO performance_metrics (
                            method_name, average_processing_time, average_improvement,
                            success_rate, total_uses, last_updated
                        ) VALUES (?, ?, ?, ?, ?, ?)
                    ''', (method_name, record.processing_time_seconds,
                          record.improvement_percentage, 1.0, 1, current_time))

                conn.commit()

        except Exception as e:
            logger.error(f"Error updating performance metrics: {e}")

    @staticmethod
    def _row_to_record(row: sqlite3.Row) -> ProcessingRecord:
        """Convert a processing_records row into a ProcessingRecord"""
        return ProcessingRecord(
            id=row['id'],
            session_id=row['session_id'],
            original_filename=row['original_filename'],
            file_hash=row['file_hash'],
            blur_type=row['blur_type'] or "",
            blur_confidence=row['blur_confidence'] or 0.0,
            processing_method=row['processing_method'],
            processing_parameters=row['processing_parameters'] or "{}",
            original_quality_score=row['original_quality_score'] or 0.0,
            enhanced_quality_score=row['enhanced_quality_score'] or 0.0,
            improvement_percentage=row['improvement_percentage'] or 0.0,
            processing_time_seconds=row['processing_time_seconds'] or 0.0,
            timestamp=row['timestamp'],
            notes=row['notes'] or ""
        )

    def get_processing_history(self, session_id: Optional[str] = None,
                               limit: int = 100,
                               method_filter: Optional[str] = None) -> List[ProcessingRecord]:
        """
        Get processing history records

        Args:
            session_id: Filter by session ID
            limit: Maximum number of records
            method_filter: Filter by processing method

        Returns:
            List[ProcessingRecord]: Processing records
        """
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()

                query = "SELECT * FROM processing_records WHERE 1=1"
                params = []

                if session_id:
                    query += " AND session_id = ?"
                    params.append(session_id)

                if method_filter:
                    query += " AND processing_method = ?"
                    params.append(method_filter)

                query += " ORDER BY timestamp DESC LIMIT ?"
                params.append(limit)

                cursor.execute(query, params)
                return [self._row_to_record(row) for row in cursor.fetchall()]

        except Exception as e:
            logger.error(f"Error getting processing history: {e}")
            return []

    @staticmethod
    def _stats_to_dict(stats: Optional[sqlite3.Row]) -> Dict[str, Any]:
        """Convert an aggregate statistics row into a dict with defaults"""
        if not stats:
            return {}
        return {
            'total_processed': stats[0] or 0,
            'avg_improvement': stats[1] or 0.0,
            'max_improvement': stats[2] or 0.0,
            'min_improvement': stats[3] or 0.0,
            'avg_processing_time': stats[4] or 0.0,
            'avg_original_quality': stats[5] or 0.0,
            'avg_enhanced_quality': stats[6] or 0.0
        }

    def get_session_statistics(self, session_id: str) -> Dict[str, Any]:
        """
        Get comprehensive statistics for a session

        Args:
            session_id: Session ID

        Returns:
            dict: Session statistics
        """
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()

                cursor.execute('''
                    SELECT * FROM sessions WHERE session_id = ?
                ''', (session_id,))
                session_info = cursor.fetchone()

                cursor.execute('''
                    SELECT
                        COUNT(*) as total_processed,
                        AVG(improvement_percentage) as avg_improvement,
                        MAX(improvement_percentage) as max_improvement,
                        MIN(improvement_percentage) as min_improvement,
                        AVG(processing_time_seconds) as avg_processing_time,
                        AVG(original_quality_score) as avg_original_quality,
                        AVG(enhanced_quality_score) as avg_enhanced_quality
                    FROM processing_records
                    WHERE session_id = ?
                ''', (session_id,))
                stats = cursor.fetchone()

                cursor.execute('''
                    SELECT
                        processing_method,
                        COUNT(*) as count,
                        AVG(improvement_percentage) as avg_improvement
                    FROM processing_records
                    WHERE session_id = ?
                    GROUP BY processing_method
                    ORDER BY count DESC
                ''', (session_id,))
                method_stats = cursor.fetchall()

                return {
                    'session_info': dict(session_info) if session_info else {},
                    'processing_stats': self._stats_to_dict(stats),
                    'method_breakdown': [dict(row) for row in method_stats]
                }

        except Exception as e:
            logger.error(f"Error getting session statistics: {e}")
            return {}

    def get_global_statistics(self) -> Dict[str, Any]:
        """
        Get comprehensive global statistics (all sessions)

        Returns:
            dict: Global statistics
        """
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()

                cursor.execute('''
                    SELECT
                        COUNT(*) as total_processed,
                        AVG(improvement_percentage) as avg_improvement,
                        MAX(improvement_percentage) as max_improvement,
                        MIN(improvement_percentage) as min_improvement,
                        AVG(processing_time_seconds) as avg_processing_time,
                        AVG(original_quality_score) as avg_original_quality,
                        AVG(enhanced_quality_score) as avg_enhanced_quality
                    FROM processing_records
                ''')
                stats = cursor.fetchone()

                cursor.execute('''
                    SELECT
                        processing_method,
                        COUNT(*) as count,
                        AVG(improvement_percentage) as avg_improvement
                    FROM processing_records
                    GROUP BY processing_method
                    ORDER BY count DESC
                ''')
                method_stats = cursor.fetchall()

                return {
                    'processing_stats': self._stats_to_dict(stats),
                    'method_breakdown': [dict(row) for row in method_stats]
                }

        except Exception as e:
            logger.error(f"Error getting global statistics: {e}")
            return {}

    def get_performance_metrics(self) -> List[Dict[str, Any]]:
        """
        Get performance metrics for all methods

        Returns:
            List[dict]: Performance metrics
        """
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute('''
                    SELECT * FROM performance_metrics
                    ORDER BY total_uses DESC
                ''')
                return [dict(row) for row in cursor.fetchall()]

        except Exception as e:
            logger.error(f"Error getting performance metrics: {e}")
            return []

    def search_records(self, search_params: Dict[str, Any]) -> List[ProcessingRecord]:
        """
        Search processing records with flexible criteria

        Args:
            search_params: Dictionary with search criteria (supported keys:
                filename_contains, method, min_improvement, date_from,
                date_to, limit)

        Returns:
            List[ProcessingRecord]: Matching records
        """
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()

                query = "SELECT * FROM processing_records WHERE 1=1"
                params = []

                if 'filename_contains' in search_params:
                    query += " AND original_filename LIKE ?"
                    params.append(f"%{search_params['filename_contains']}%")

                if 'method' in search_params:
                    query += " AND processing_method = ?"
                    params.append(search_params['method'])

                if 'min_improvement' in search_params:
                    query += " AND improvement_percentage >= ?"
                    params.append(search_params['min_improvement'])

                if 'date_from' in search_params:
                    query += " AND timestamp >= ?"
                    params.append(search_params['date_from'])

                if 'date_to' in search_params:
                    query += " AND timestamp <= ?"
                    params.append(search_params['date_to'])

                query += " ORDER BY timestamp DESC"

                if 'limit' in search_params:
                    query += " LIMIT ?"
                    params.append(search_params['limit'])

                cursor.execute(query, params)
                return [self._row_to_record(row) for row in cursor.fetchall()]

        except Exception as e:
            logger.error(f"Error searching records: {e}")
            return []

    def cleanup_old_records(self, days_old: int = 30) -> int:
        """
        Clean up old processing records

        Args:
            days_old: Remove records older than this many days

        Returns:
            int: Number of records deleted
        """
        try:
            # Cutoff at midnight UTC, days_old days back
            cutoff_date = datetime.now(timezone.utc).replace(
                hour=0, minute=0, second=0, microsecond=0
            ) - timedelta(days=days_old)
            cutoff_str = cutoff_date.isoformat()

            with self.get_connection() as conn:
                cursor = conn.cursor()

                cursor.execute('''
                    SELECT COUNT(*) FROM processing_records
                    WHERE timestamp < ?
                ''', (cutoff_str,))
                count = cursor.fetchone()[0]

                cursor.execute('''
                    DELETE FROM processing_records
                    WHERE timestamp < ?
                ''', (cutoff_str,))

                # Drop sessions that no longer have any records
                cursor.execute('''
                    DELETE FROM sessions
                    WHERE session_id NOT IN (
                        SELECT DISTINCT session_id FROM processing_records
                    )
                ''')

                conn.commit()

            logger.info(f"Cleaned up {count} old records")
            return count

        except Exception as e:
            logger.error(f"Error cleaning up old records: {e}")
            return 0


def get_database_manager(db_path: str = "data/processing_history.db") -> DatabaseManager:
    """Get database manager instance"""
    return DatabaseManager(db_path)


def log_processing_result(session_id: str,
                          filename: str,
                          file_data: bytes,
                          processing_result: Dict[str, Any],
                          db_path: str = "data/processing_history.db") -> Optional[int]:
    """
    Convenience function to log a processing result

    Args:
        session_id: Session ID
        filename: Original filename
        file_data: File data for hash calculation
        processing_result: Processing result dictionary
        db_path: Database path

    Returns:
        Optional[int]: Record ID if successful
    """
    try:
        db_manager = DatabaseManager(db_path)
        file_hash = db_manager.calculate_file_hash(file_data)

        record = ProcessingRecord(
            session_id=session_id,
            original_filename=filename,
            file_hash=file_hash,
            blur_type=processing_result.get('blur_type', ''),
            blur_confidence=processing_result.get('blur_confidence', 0.0),
            processing_method=processing_result.get('method', ''),
            processing_parameters=json.dumps(processing_result.get('parameters', {})),
            original_quality_score=processing_result.get('original_quality', 0.0),
            enhanced_quality_score=processing_result.get('enhanced_quality', 0.0),
            improvement_percentage=processing_result.get('improvement_percentage', 0.0),
            processing_time_seconds=processing_result.get('processing_time', 0.0),
            notes=processing_result.get('notes', '')
        )

        return db_manager.add_processing_record(record)

    except Exception as e:
        logger.error(f"Error logging processing result: {e}")
        return None
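

# Illustrative call (hypothetical values; the dict keys are exactly the ones
# read by log_processing_result above):
#
#   record_id = log_processing_result(
#       session_id=session_id,
#       filename="photo.jpg",
#       file_data=image_bytes,
#       processing_result={
#           'blur_type': 'motion',
#           'blur_confidence': 0.9,
#           'method': 'richardson_lucy',
#           'parameters': {'iterations': 30},
#           'original_quality': 0.40,
#           'enhanced_quality': 0.70,
#           'improvement_percentage': 75.0,
#           'processing_time': 1.8,
#           'notes': 'example',
#       },
#   )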


if __name__ == "__main__":
    print("Database Module - Testing")
    print("=========================")

    db_manager = DatabaseManager("test_database.db")

    session_id = db_manager.start_session()
    print(f"Started session: {session_id}")

    test_record = ProcessingRecord(
        session_id=session_id,
        original_filename="test_image.jpg",
        file_hash="abc123def456",
        blur_type="gaussian",
        blur_confidence=0.85,
        processing_method="wiener_filter",
        processing_parameters='{"sigma": 2.0}',
        original_quality_score=0.45,
        enhanced_quality_score=0.72,
        improvement_percentage=60.0,
        processing_time_seconds=2.3,
        notes="Test processing"
    )

    record_id = db_manager.add_processing_record(test_record)
    print(f"Added record with ID: {record_id}")

    history = db_manager.get_processing_history(session_id=session_id)
    print(f"Retrieved {len(history)} records")

    stats = db_manager.get_session_statistics(session_id)
    print(f"Session stats: {stats}")

    db_manager.end_session(session_id)
    print("Session ended")

    os.remove("test_database.db")
    print("Test database cleaned up")

    print("\nDatabase module test completed!")