# NOTE(review): removed Hugging Face file-viewer residue that was pasted above
# the module docstring ("ganeshkumar383's picture / Upload 26 files / 13fe470
# verified / raw / history blame / 31.8 kB") — it is not Python and made the
# file unparseable.
"""
Database Module - SQLite Database Management for Image Processing History
========================================================================
Comprehensive database management for storing processing history, user sessions,
image metadata, and performance analytics with full CRUD operations.
"""
import base64
import hashlib
import json
import logging
import os
import sqlite3
import uuid
from contextlib import contextmanager
from dataclasses import dataclass, asdict
from datetime import datetime, timedelta, timezone
from typing import Dict, List, Any, Optional, Tuple

import numpy as np
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class ProcessingRecord:
"""Data class for image processing records"""
id: Optional[int] = None
session_id: str = ""
original_filename: str = ""
file_hash: str = ""
blur_type: str = ""
blur_confidence: float = 0.0
processing_method: str = ""
processing_parameters: str = "{}"
original_quality_score: float = 0.0
enhanced_quality_score: float = 0.0
improvement_percentage: float = 0.0
processing_time_seconds: float = 0.0
timestamp: str = ""
notes: str = ""
@dataclass
class SessionInfo:
"""Data class for user sessions"""
session_id: str = ""
start_time: str = ""
end_time: Optional[str] = None
total_images_processed: int = 0
average_improvement: float = 0.0
preferred_method: str = ""
class DatabaseManager:
"""SQLite database manager for image processing application"""
def __init__(self, db_path: str = "data/processing_history.db"):
"""
Initialize database manager
Args:
db_path: Path to SQLite database file
"""
self.db_path = db_path
self.ensure_directory_exists()
self.initialize_database()
def ensure_directory_exists(self):
"""Ensure the database directory exists"""
try:
db_dir = os.path.dirname(self.db_path)
if db_dir and not os.path.exists(db_dir):
os.makedirs(db_dir, exist_ok=True)
logger.info(f"Created database directory: {db_dir}")
except Exception as e:
logger.error(f"Error creating database directory: {e}")
@contextmanager
def get_connection(self):
"""Context manager for database connections"""
conn = None
try:
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row # Enable column access by name
yield conn
except Exception as e:
if conn:
conn.rollback()
logger.error(f"Database connection error: {e}")
raise
finally:
if conn:
conn.close()
def initialize_database(self):
"""Initialize database with required tables"""
try:
with self.get_connection() as conn:
cursor = conn.cursor()
# Create processing_records table
cursor.execute('''
CREATE TABLE IF NOT EXISTS processing_records (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
original_filename TEXT NOT NULL,
file_hash TEXT NOT NULL,
blur_type TEXT,
blur_confidence REAL DEFAULT 0.0,
processing_method TEXT NOT NULL,
processing_parameters TEXT DEFAULT '{}',
original_quality_score REAL DEFAULT 0.0,
enhanced_quality_score REAL DEFAULT 0.0,
improvement_percentage REAL DEFAULT 0.0,
processing_time_seconds REAL DEFAULT 0.0,
timestamp TEXT NOT NULL,
notes TEXT DEFAULT '',
UNIQUE(file_hash, processing_method, processing_parameters)
)
''')
# Create sessions table
cursor.execute('''
CREATE TABLE IF NOT EXISTS sessions (
session_id TEXT PRIMARY KEY,
start_time TEXT NOT NULL,
end_time TEXT,
total_images_processed INTEGER DEFAULT 0,
average_improvement REAL DEFAULT 0.0,
preferred_method TEXT DEFAULT ''
)
''')
# Create performance_metrics table
cursor.execute('''
CREATE TABLE IF NOT EXISTS performance_metrics (
id INTEGER PRIMARY KEY AUTOINCREMENT,
method_name TEXT NOT NULL,
average_processing_time REAL DEFAULT 0.0,
average_improvement REAL DEFAULT 0.0,
success_rate REAL DEFAULT 0.0,
total_uses INTEGER DEFAULT 0,
last_updated TEXT NOT NULL
)
''')
# Create indexes for better performance
cursor.execute('''
CREATE INDEX IF NOT EXISTS idx_processing_session
ON processing_records(session_id)
''')
cursor.execute('''
CREATE INDEX IF NOT EXISTS idx_processing_timestamp
ON processing_records(timestamp)
''')
cursor.execute('''
CREATE INDEX IF NOT EXISTS idx_processing_method
ON processing_records(processing_method)
''')
conn.commit()
logger.info("Database initialized successfully")
except Exception as e:
logger.error(f"Error initializing database: {e}")
def generate_session_id(self) -> str:
"""Generate unique session ID"""
return str(uuid.uuid4())
def calculate_file_hash(self, file_data: bytes) -> str:
"""Calculate SHA-256 hash of file data"""
return hashlib.sha256(file_data).hexdigest()
def start_session(self, session_id: Optional[str] = None) -> str:
"""
Start a new processing session
Args:
session_id: Optional session ID, generates new if not provided
Returns:
str: Session ID
"""
try:
if not session_id:
session_id = self.generate_session_id()
current_time = datetime.now(timezone.utc).isoformat()
with self.get_connection() as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT OR REPLACE INTO sessions
(session_id, start_time, total_images_processed, average_improvement)
VALUES (?, ?, 0, 0.0)
''', (session_id, current_time))
conn.commit()
logger.info(f"Session started: {session_id}")
return session_id
except Exception as e:
logger.error(f"Error starting session: {e}")
return self.generate_session_id() # Fallback
def end_session(self, session_id: str):
"""
End a processing session and update statistics
Args:
session_id: Session ID to end
"""
try:
current_time = datetime.now(timezone.utc).isoformat()
with self.get_connection() as conn:
cursor = conn.cursor()
# Calculate session statistics
cursor.execute('''
SELECT COUNT(*), AVG(improvement_percentage),
processing_method,
COUNT(processing_method) as method_count
FROM processing_records
WHERE session_id = ?
GROUP BY processing_method
ORDER BY method_count DESC
LIMIT 1
''', (session_id,))
stats = cursor.fetchone()
if stats:
total_processed = stats[0]
avg_improvement = stats[1] or 0.0
preferred_method = stats[2] or ""
else:
total_processed = 0
avg_improvement = 0.0
preferred_method = ""
# Update session
cursor.execute('''
UPDATE sessions
SET end_time = ?,
total_images_processed = ?,
average_improvement = ?,
preferred_method = ?
WHERE session_id = ?
''', (current_time, total_processed, avg_improvement,
preferred_method, session_id))
conn.commit()
logger.info(f"Session ended: {session_id}")
except Exception as e:
logger.error(f"Error ending session: {e}")
def add_processing_record(self, record: ProcessingRecord) -> Optional[int]:
"""
Add a new processing record to database
Args:
record: ProcessingRecord instance
Returns:
Optional[int]: Record ID if successful
"""
try:
# Set timestamp if not provided
if not record.timestamp:
record.timestamp = datetime.now(timezone.utc).isoformat()
with self.get_connection() as conn:
cursor = conn.cursor()
cursor.execute('''
INSERT OR IGNORE INTO processing_records (
session_id, original_filename, file_hash, blur_type,
blur_confidence, processing_method, processing_parameters,
original_quality_score, enhanced_quality_score,
improvement_percentage, processing_time_seconds,
timestamp, notes
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
record.session_id, record.original_filename, record.file_hash,
record.blur_type, record.blur_confidence, record.processing_method,
record.processing_parameters, record.original_quality_score,
record.enhanced_quality_score, record.improvement_percentage,
record.processing_time_seconds, record.timestamp, record.notes
))
record_id = cursor.lastrowid
conn.commit()
# Update performance metrics
self._update_performance_metrics(record.processing_method, record)
logger.info(f"Processing record added: ID {record_id}")
return record_id
except Exception as e:
logger.error(f"Error adding processing record: {e}")
return None
def _update_performance_metrics(self, method_name: str, record: ProcessingRecord):
"""Update performance metrics for a processing method"""
try:
current_time = datetime.now(timezone.utc).isoformat()
with self.get_connection() as conn:
cursor = conn.cursor()
# Get current metrics
cursor.execute('''
SELECT total_uses, average_processing_time,
average_improvement, success_rate
FROM performance_metrics
WHERE method_name = ?
''', (method_name,))
existing = cursor.fetchone()
if existing:
total_uses = existing[0] + 1
avg_time = ((existing[1] * existing[0]) + record.processing_time_seconds) / total_uses
avg_improvement = ((existing[2] * existing[0]) + record.improvement_percentage) / total_uses
success_rate = existing[3] # Could be updated based on improvement threshold
cursor.execute('''
UPDATE performance_metrics
SET total_uses = ?, average_processing_time = ?,
average_improvement = ?, success_rate = ?,
last_updated = ?
WHERE method_name = ?
''', (total_uses, avg_time, avg_improvement, success_rate,
current_time, method_name))
else:
# New method
cursor.execute('''
INSERT INTO performance_metrics (
method_name, average_processing_time, average_improvement,
success_rate, total_uses, last_updated
) VALUES (?, ?, ?, ?, ?, ?)
''', (method_name, record.processing_time_seconds,
record.improvement_percentage, 1.0, 1, current_time))
conn.commit()
except Exception as e:
logger.error(f"Error updating performance metrics: {e}")
def get_processing_history(self, session_id: Optional[str] = None,
limit: int = 100,
method_filter: Optional[str] = None) -> List[ProcessingRecord]:
"""
Get processing history records
Args:
session_id: Filter by session ID
limit: Maximum number of records
method_filter: Filter by processing method
Returns:
List[ProcessingRecord]: Processing records
"""
try:
with self.get_connection() as conn:
cursor = conn.cursor()
query = "SELECT * FROM processing_records WHERE 1=1"
params = []
if session_id:
query += " AND session_id = ?"
params.append(session_id)
if method_filter:
query += " AND processing_method = ?"
params.append(method_filter)
query += " ORDER BY timestamp DESC LIMIT ?"
params.append(limit)
cursor.execute(query, params)
rows = cursor.fetchall()
records = []
for row in rows:
record = ProcessingRecord(
id=row['id'],
session_id=row['session_id'],
original_filename=row['original_filename'],
file_hash=row['file_hash'],
blur_type=row['blur_type'] or "",
blur_confidence=row['blur_confidence'] or 0.0,
processing_method=row['processing_method'],
processing_parameters=row['processing_parameters'] or "{}",
original_quality_score=row['original_quality_score'] or 0.0,
enhanced_quality_score=row['enhanced_quality_score'] or 0.0,
improvement_percentage=row['improvement_percentage'] or 0.0,
processing_time_seconds=row['processing_time_seconds'] or 0.0,
timestamp=row['timestamp'],
notes=row['notes'] or ""
)
records.append(record)
return records
except Exception as e:
logger.error(f"Error getting processing history: {e}")
return []
def get_session_statistics(self, session_id: str) -> Dict[str, Any]:
"""
Get comprehensive statistics for a session
Args:
session_id: Session ID
Returns:
dict: Session statistics
"""
try:
with self.get_connection() as conn:
cursor = conn.cursor()
# Basic session info
cursor.execute('''
SELECT * FROM sessions WHERE session_id = ?
''', (session_id,))
session_info = cursor.fetchone()
# Processing statistics
cursor.execute('''
SELECT
COUNT(*) as total_processed,
AVG(improvement_percentage) as avg_improvement,
MAX(improvement_percentage) as max_improvement,
MIN(improvement_percentage) as min_improvement,
AVG(processing_time_seconds) as avg_processing_time,
AVG(original_quality_score) as avg_original_quality,
AVG(enhanced_quality_score) as avg_enhanced_quality
FROM processing_records
WHERE session_id = ?
''', (session_id,))
stats = cursor.fetchone()
# Method breakdown
cursor.execute('''
SELECT
processing_method,
COUNT(*) as count,
AVG(improvement_percentage) as avg_improvement
FROM processing_records
WHERE session_id = ?
GROUP BY processing_method
ORDER BY count DESC
''', (session_id,))
method_stats = cursor.fetchall()
# Ensure stats have default values for None entries
stats_dict = {}
if stats:
stats_dict = {
'total_processed': stats[0] or 0,
'avg_improvement': stats[1] or 0.0,
'max_improvement': stats[2] or 0.0,
'min_improvement': stats[3] or 0.0,
'avg_processing_time': stats[4] or 0.0,
'avg_original_quality': stats[5] or 0.0,
'avg_enhanced_quality': stats[6] or 0.0
}
return {
'session_info': dict(session_info) if session_info else {},
'processing_stats': stats_dict,
'method_breakdown': [dict(row) for row in method_stats]
}
except Exception as e:
logger.error(f"Error getting session statistics: {e}")
return {}
def get_global_statistics(self) -> Dict[str, Any]:
"""
Get comprehensive global statistics (all sessions)
Returns:
dict: Global statistics
"""
try:
with self.get_connection() as conn:
cursor = conn.cursor()
# Processing statistics
cursor.execute('''
SELECT
COUNT(*) as total_processed,
AVG(improvement_percentage) as avg_improvement,
MAX(improvement_percentage) as max_improvement,
MIN(improvement_percentage) as min_improvement,
AVG(processing_time_seconds) as avg_processing_time,
AVG(original_quality_score) as avg_original_quality,
AVG(enhanced_quality_score) as avg_enhanced_quality
FROM processing_records
''')
stats = cursor.fetchone()
# Method breakdown
cursor.execute('''
SELECT
processing_method,
COUNT(*) as count,
AVG(improvement_percentage) as avg_improvement
FROM processing_records
GROUP BY processing_method
ORDER BY count DESC
''')
method_stats = cursor.fetchall()
# Ensure stats have default values for None entries
stats_dict = {}
if stats:
stats_dict = {
'total_processed': stats[0] or 0,
'avg_improvement': stats[1] or 0.0,
'max_improvement': stats[2] or 0.0,
'min_improvement': stats[3] or 0.0,
'avg_processing_time': stats[4] or 0.0,
'avg_original_quality': stats[5] or 0.0,
'avg_enhanced_quality': stats[6] or 0.0
}
return {
'processing_stats': stats_dict,
'method_breakdown': [dict(row) for row in method_stats]
}
except Exception as e:
logger.error(f"Error getting global statistics: {e}")
return {}
def get_performance_metrics(self) -> List[Dict[str, Any]]:
"""
Get performance metrics for all methods
Returns:
List[dict]: Performance metrics
"""
try:
with self.get_connection() as conn:
cursor = conn.cursor()
cursor.execute('''
SELECT * FROM performance_metrics
ORDER BY total_uses DESC
''')
rows = cursor.fetchall()
return [dict(row) for row in rows]
except Exception as e:
logger.error(f"Error getting performance metrics: {e}")
return []
def search_records(self, search_params: Dict[str, Any]) -> List[ProcessingRecord]:
"""
Search processing records with flexible criteria
Args:
search_params: Dictionary with search criteria
Returns:
List[ProcessingRecord]: Matching records
"""
try:
with self.get_connection() as conn:
cursor = conn.cursor()
query = "SELECT * FROM processing_records WHERE 1=1"
params = []
# Build dynamic query based on search parameters
if 'filename_contains' in search_params:
query += " AND original_filename LIKE ?"
params.append(f"%{search_params['filename_contains']}%")
if 'method' in search_params:
query += " AND processing_method = ?"
params.append(search_params['method'])
if 'min_improvement' in search_params:
query += " AND improvement_percentage >= ?"
params.append(search_params['min_improvement'])
if 'date_from' in search_params:
query += " AND timestamp >= ?"
params.append(search_params['date_from'])
if 'date_to' in search_params:
query += " AND timestamp <= ?"
params.append(search_params['date_to'])
query += " ORDER BY timestamp DESC"
if 'limit' in search_params:
query += " LIMIT ?"
params.append(search_params['limit'])
cursor.execute(query, params)
rows = cursor.fetchall()
records = []
for row in rows:
record = ProcessingRecord(
id=row['id'],
session_id=row['session_id'],
original_filename=row['original_filename'],
file_hash=row['file_hash'],
blur_type=row['blur_type'] or "",
blur_confidence=row['blur_confidence'] or 0.0,
processing_method=row['processing_method'],
processing_parameters=row['processing_parameters'] or "{}",
original_quality_score=row['original_quality_score'] or 0.0,
enhanced_quality_score=row['enhanced_quality_score'] or 0.0,
improvement_percentage=row['improvement_percentage'] or 0.0,
processing_time_seconds=row['processing_time_seconds'] or 0.0,
timestamp=row['timestamp'],
notes=row['notes'] or ""
)
records.append(record)
return records
except Exception as e:
logger.error(f"Error searching records: {e}")
return []
def cleanup_old_records(self, days_old: int = 30) -> int:
"""
Clean up old processing records
Args:
days_old: Remove records older than this many days
Returns:
int: Number of records deleted
"""
try:
cutoff_date = datetime.now(timezone.utc).replace(
hour=0, minute=0, second=0, microsecond=0
) - datetime.timedelta(days=days_old)
cutoff_str = cutoff_date.isoformat()
with self.get_connection() as conn:
cursor = conn.cursor()
# Count records to be deleted
cursor.execute('''
SELECT COUNT(*) FROM processing_records
WHERE timestamp < ?
''', (cutoff_str,))
count = cursor.fetchone()[0]
# Delete old records
cursor.execute('''
DELETE FROM processing_records
WHERE timestamp < ?
''', (cutoff_str,))
# Delete orphaned sessions
cursor.execute('''
DELETE FROM sessions
WHERE session_id NOT IN (
SELECT DISTINCT session_id FROM processing_records
)
''')
conn.commit()
logger.info(f"Cleaned up {count} old records")
return count
except Exception as e:
logger.error(f"Error cleaning up old records: {e}")
return 0
# Convenience functions for easy database operations
def get_database_manager(db_path: str = "data/processing_history.db") -> DatabaseManager:
"""Get database manager instance"""
return DatabaseManager(db_path)
def log_processing_result(session_id: str,
                          filename: str,
                          file_data: bytes,
                          processing_result: Dict[str, Any],
                          db_path: str = "data/processing_history.db") -> Optional[int]:
    """
    Convenience wrapper: build a ProcessingRecord from a result dict and store it.

    Args:
        session_id: Session ID.
        filename: Original filename.
        file_data: Raw file bytes, hashed for deduplication.
        processing_result: Processing result dictionary.
        db_path: Database path.

    Returns:
        Optional[int]: Record ID if successful, otherwise None.
    """
    try:
        manager = DatabaseManager(db_path)
        result = processing_result
        # Translate the loose result dict into a typed record; missing keys
        # fall back to neutral defaults.
        record = ProcessingRecord(
            session_id=session_id,
            original_filename=filename,
            file_hash=manager.calculate_file_hash(file_data),
            blur_type=result.get('blur_type', ''),
            blur_confidence=result.get('blur_confidence', 0.0),
            processing_method=result.get('method', ''),
            processing_parameters=json.dumps(result.get('parameters', {})),
            original_quality_score=result.get('original_quality', 0.0),
            enhanced_quality_score=result.get('enhanced_quality', 0.0),
            improvement_percentage=result.get('improvement_percentage', 0.0),
            processing_time_seconds=result.get('processing_time', 0.0),
            notes=result.get('notes', ''),
        )
        return manager.add_processing_record(record)
    except Exception as e:
        logger.error(f"Error logging processing result: {e}")
        return None
# Example usage and testing
if __name__ == "__main__":
    print("Database Module - Testing")
    print("========================")
    # Initialize database manager (creates tables in a throwaway file)
    db_manager = DatabaseManager("test_database.db")
    # Start a session
    session_id = db_manager.start_session()
    print(f"Started session: {session_id}")
    # Create test processing record with representative values
    test_record = ProcessingRecord(
        session_id=session_id,
        original_filename="test_image.jpg",
        file_hash="abc123def456",
        blur_type="gaussian",
        blur_confidence=0.85,
        processing_method="wiener_filter",
        processing_parameters='{"sigma": 2.0}',
        original_quality_score=0.45,
        enhanced_quality_score=0.72,
        improvement_percentage=60.0,
        processing_time_seconds=2.3,
        notes="Test processing"
    )
    # Add record
    record_id = db_manager.add_processing_record(test_record)
    print(f"Added record with ID: {record_id}")
    # Get history for the session just created
    history = db_manager.get_processing_history(session_id=session_id)
    print(f"Retrieved {len(history)} records")
    # Get session statistics
    stats = db_manager.get_session_statistics(session_id)
    print(f"Session stats: {stats}")
    # End session
    db_manager.end_session(session_id)
    print("Session ended")
    # Cleanup test database file from disk
    os.remove("test_database.db")
    print("Test database cleaned up")
    print("\nDatabase module test completed!")