import sqlite3 import json from pathlib import Path from datetime import datetime from typing import Optional, List, Dict from contextlib import contextmanager from loguru import logger class DatabaseManager: def __init__(self, db_path="database/identities.db"): self.db_path = db_path self.init_database() def init_database(self): """تهيئة قاعدة البيانات والجداول""" Path(self.db_path).parent.mkdir(parents=True, exist_ok=True) with self.get_connection() as conn: cursor = conn.cursor() # جدول الهويات cursor.execute(""" CREATE TABLE IF NOT EXISTS identities ( id TEXT PRIMARY KEY, name TEXT NOT NULL, embedding TEXT NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, metadata TEXT, face_image_path TEXT, verification_count INTEGER DEFAULT 0, last_verified TIMESTAMP ) """) # جدول سجل العمليات cursor.execute(""" CREATE TABLE IF NOT EXISTS audit_log ( id INTEGER PRIMARY KEY AUTOINCREMENT, action TEXT NOT NULL, identity_id TEXT, status TEXT, details TEXT, timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, ip_address TEXT, user_agent TEXT ) """) # جدول الإحصائيات cursor.execute(""" CREATE TABLE IF NOT EXISTS statistics ( metric_name TEXT PRIMARY KEY, metric_value INTEGER DEFAULT 0, last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) # إضافة بعض الإحصائيات الابتدائية cursor.execute(""" INSERT OR IGNORE INTO statistics (metric_name, metric_value) VALUES ('total_enrollments', 0), ('total_searches', 0), ('total_matches', 0) """) conn.commit() logger.info("✅ Database initialized successfully") @contextmanager def get_connection(self): """إنشاء اتصال بقاعدة البيانات""" conn = sqlite3.connect(self.db_path) conn.row_factory = sqlite3.Row try: yield conn finally: conn.close() def add_identity(self, identity_id: str, name: str, embedding: np.ndarray, metadata: Dict = None, face_image_path: str = None) -> bool: """إضافة هوية جديدة""" try: with self.get_connection() as conn: cursor = conn.cursor() cursor.execute(""" INSERT INTO identities (id, name, embedding, metadata, face_image_path) VALUES (?, ?, ?, ?, ?) """, ( identity_id, name, json.dumps(embedding.tolist()), json.dumps(metadata) if metadata else None, face_image_path )) # تحديث الإحصاءات cursor.execute(""" UPDATE statistics SET metric_value = metric_value + 1, last_updated = CURRENT_TIMESTAMP WHERE metric_name = 'total_enrollments' """) conn.commit() logger.info(f"✅ Added identity: {identity_id} - {name}") return True except Exception as e: logger.error(f"❌ Failed to add identity: {e}") return False def get_identity(self, identity_id: str) -> Optional[Dict]: """الحصول على هوية محددة""" try: with self.get_connection() as conn: cursor = conn.cursor() cursor.execute("SELECT * FROM identities WHERE id = ?", (identity_id,)) row = cursor.fetchone() if row: return dict(row) return None except Exception as e: logger.error(f"❌ Failed to get identity: {e}") return None def update_verification(self, identity_id: str): """تحديث عدد التحقق من الهوية""" try: with self.get_connection() as conn: cursor = conn.cursor() cursor.execute(""" UPDATE identities SET verification_count = verification_count + 1, last_verified = CURRENT_TIMESTAMP, updated_at = CURRENT_TIMESTAMP WHERE id = ? """, (identity_id,)) cursor.execute(""" UPDATE statistics SET metric_value = metric_value + 1, last_updated = CURRENT_TIMESTAMP WHERE metric_name = 'total_matches' """) conn.commit() return True except Exception as e: logger.error(f"❌ Failed to update verification: {e}") return False def log_action(self, action: str, identity_id: str = None, status: str = "success", details: str = None, ip_address: str = None, user_agent: str = None): """تسجيل عملية في سجل المراجعة""" try: with self.get_connection() as conn: cursor = conn.cursor() cursor.execute(""" INSERT INTO audit_log (action, identity_id, status, details, ip_address, user_agent) VALUES (?, ?, ?, ?, ?, ?) """, (action, identity_id, status, details, ip_address, user_agent)) conn.commit() except Exception as e: logger.error(f"❌ Failed to log action: {e}") def get_statistics(self) -> Dict: """الحصول على الإحصائيات""" try: with self.get_connection() as conn: cursor = conn.cursor() cursor.execute("SELECT metric_name, metric_value FROM statistics") stats = {row['metric_name']: row['metric_value'] for row in cursor.fetchall()} # إحصائيات إضافية cursor.execute("SELECT COUNT(*) as total FROM identities") stats['total_identities'] = cursor.fetchone()['total'] cursor.execute(""" SELECT COUNT(*) as verified_today FROM identities WHERE DATE(last_verified) = DATE('now') """) stats['verified_today'] = cursor.fetchone()['verified_today'] return stats except Exception as e: logger.error(f"❌ Failed to get statistics: {e}") return {} def get_all_identities(self, limit: int = 100, offset: int = 0) -> List[Dict]: """الحصول على جميع الهويات""" try: with self.get_connection() as conn: cursor = conn.cursor() cursor.execute(""" SELECT id, name, created_at, updated_at, verification_count, last_verified FROM identities ORDER BY created_at DESC LIMIT ? OFFSET ? """, (limit, offset)) return [dict(row) for row in cursor.fetchall()] except Exception as e: logger.error(f"❌ Failed to get all identities: {e}") return [] def delete_identity(self, identity_id: str) -> bool: """حذف هوية""" try: with self.get_connection() as conn: cursor = conn.cursor() cursor.execute("DELETE FROM identities WHERE id = ?", (identity_id,)) conn.commit() if cursor.rowcount > 0: logger.info(f"✅ Deleted identity: {identity_id}") return True return False except Exception as e: logger.error(f"❌ Failed to delete identity: {e}") return False