Spaces:
Sleeping
Sleeping
import json
import sqlite3
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path
from typing import Optional, List, Dict

import numpy as np
from loguru import logger
class DatabaseManager:
    """SQLite-backed store for identities, an audit log and usage counters.

    Every method opens its own short-lived connection through
    :meth:`get_connection`; no connection is kept between calls.
    """

    def __init__(self, db_path="database/identities.db"):
        """Store the database location and make sure the schema exists.

        Args:
            db_path: Path of the SQLite file. Missing parent directories are
                created by :meth:`init_database`.
        """
        self.db_path = db_path
        self.init_database()

    def init_database(self):
        """Create the database file, its tables and the initial counters.

        All statements are idempotent (``IF NOT EXISTS`` / ``INSERT OR
        IGNORE``), so calling this on an existing database is harmless.
        """
        Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)
        with self.get_connection() as conn:
            cursor = conn.cursor()

            # Identities table
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS identities (
                    id TEXT PRIMARY KEY,
                    name TEXT NOT NULL,
                    embedding TEXT NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    metadata TEXT,
                    face_image_path TEXT,
                    verification_count INTEGER DEFAULT 0,
                    last_verified TIMESTAMP
                )
            """)

            # Audit log table
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS audit_log (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    action TEXT NOT NULL,
                    identity_id TEXT,
                    status TEXT,
                    details TEXT,
                    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    ip_address TEXT,
                    user_agent TEXT
                )
            """)

            # Statistics table
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS statistics (
                    metric_name TEXT PRIMARY KEY,
                    metric_value INTEGER DEFAULT 0,
                    last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)

            # Seed the counters; OR IGNORE keeps existing values on re-runs.
            cursor.execute("""
                INSERT OR IGNORE INTO statistics (metric_name, metric_value)
                VALUES ('total_enrollments', 0), ('total_searches', 0), ('total_matches', 0)
            """)

            conn.commit()
            logger.info("✅ Database initialized successfully")

    @contextmanager
    def get_connection(self):
        """Yield a SQLite connection that is always closed on exit.

        Rows are returned as :class:`sqlite3.Row`, so columns can be read by
        name. Nothing is committed here; callers commit explicitly.

        The ``@contextmanager`` decorator is required: every caller uses this
        method in a ``with`` statement, which a bare generator cannot serve.
        """
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        try:
            yield conn
        finally:
            conn.close()

    def add_identity(self, identity_id: str, name: str, embedding: np.ndarray,
                     metadata: Optional[Dict] = None,
                     face_image_path: Optional[str] = None) -> bool:
        """Insert a new identity and bump the ``total_enrollments`` counter.

        Args:
            identity_id: Primary key of the new row.
            name: Display name stored with the identity.
            embedding: Vector stored as a JSON list via ``embedding.tolist()``.
            metadata: Optional dict stored as JSON; a falsy value is stored
                as NULL.
            face_image_path: Optional path stored as-is.

        Returns:
            True on success. False if anything fails (for example a duplicate
            ``identity_id``, since ``id`` is the primary key); the error is
            logged rather than raised.
        """
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    INSERT INTO identities (id, name, embedding, metadata, face_image_path)
                    VALUES (?, ?, ?, ?, ?)
                """, (
                    identity_id,
                    name,
                    json.dumps(embedding.tolist()),
                    json.dumps(metadata) if metadata else None,
                    face_image_path,
                ))

                # Update the statistics in the same transaction as the insert.
                cursor.execute("""
                    UPDATE statistics
                    SET metric_value = metric_value + 1, last_updated = CURRENT_TIMESTAMP
                    WHERE metric_name = 'total_enrollments'
                """)

                conn.commit()
                logger.info(f"✅ Added identity: {identity_id} - {name}")
                return True
        except Exception as e:
            logger.error(f"❌ Failed to add identity: {e}")
            return False

    def get_identity(self, identity_id: str) -> Optional[Dict]:
        """Fetch one identity row by id.

        Returns:
            The row as a plain dict with the columns exactly as stored
            (``embedding`` and ``metadata`` remain JSON text), or None when
            the id is unknown or the query fails.
        """
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("SELECT * FROM identities WHERE id = ?", (identity_id,))
                row = cursor.fetchone()
                if row:
                    return dict(row)
                return None
        except Exception as e:
            logger.error(f"❌ Failed to get identity: {e}")
            return None

    def update_verification(self, identity_id: str) -> bool:
        """Record a successful verification of an identity.

        Increments the identity's ``verification_count``, stamps
        ``last_verified``/``updated_at`` and bumps the ``total_matches``
        counter.

        Returns:
            True when the identity exists and was updated. False when no row
            has this id (the counter is then left untouched) or when the
            update fails.
        """
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    UPDATE identities
                    SET verification_count = verification_count + 1,
                        last_verified = CURRENT_TIMESTAMP,
                        updated_at = CURRENT_TIMESTAMP
                    WHERE id = ?
                """, (identity_id,))

                # An unknown id verified nothing: do not count it as a match.
                if cursor.rowcount == 0:
                    return False

                cursor.execute("""
                    UPDATE statistics
                    SET metric_value = metric_value + 1, last_updated = CURRENT_TIMESTAMP
                    WHERE metric_name = 'total_matches'
                """)

                conn.commit()
                return True
        except Exception as e:
            logger.error(f"❌ Failed to update verification: {e}")
            return False

    def log_action(self, action: str, identity_id: Optional[str] = None,
                   status: str = "success", details: Optional[str] = None,
                   ip_address: Optional[str] = None,
                   user_agent: Optional[str] = None) -> None:
        """Append one entry to the audit log.

        Best effort: a failure is logged and swallowed so that auditing never
        breaks the calling operation.
        """
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    INSERT INTO audit_log (action, identity_id, status, details, ip_address, user_agent)
                    VALUES (?, ?, ?, ?, ?, ?)
                """, (action, identity_id, status, details, ip_address, user_agent))
                conn.commit()
        except Exception as e:
            logger.error(f"❌ Failed to log action: {e}")

    def get_statistics(self) -> Dict:
        """Return the stored counters plus two derived figures.

        Returns:
            A dict mapping every ``metric_name`` in the statistics table to
            its value, extended with ``total_identities`` (row count of the
            identities table) and ``verified_today`` (identities whose
            ``last_verified`` date equals SQLite's ``DATE('now')``). An empty
            dict is returned if the query fails.
        """
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("SELECT metric_name, metric_value FROM statistics")
                stats = {row['metric_name']: row['metric_value'] for row in cursor.fetchall()}

                # Additional statistics computed on the fly
                cursor.execute("SELECT COUNT(*) as total FROM identities")
                stats['total_identities'] = cursor.fetchone()['total']

                cursor.execute("""
                    SELECT COUNT(*) as verified_today
                    FROM identities
                    WHERE DATE(last_verified) = DATE('now')
                """)
                stats['verified_today'] = cursor.fetchone()['verified_today']

                return stats
        except Exception as e:
            logger.error(f"❌ Failed to get statistics: {e}")
            return {}

    def get_all_identities(self, limit: int = 100, offset: int = 0) -> List[Dict]:
        """List identities, newest first, without their embeddings.

        Args:
            limit: Maximum number of rows to return.
            offset: Number of rows to skip before the first returned row.

        Returns:
            A list of dicts with the keys ``id``, ``name``, ``created_at``,
            ``updated_at``, ``verification_count`` and ``last_verified``;
            an empty list if the query fails.
        """
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    SELECT id, name, created_at, updated_at, verification_count, last_verified
                    FROM identities
                    ORDER BY created_at DESC
                    LIMIT ? OFFSET ?
                """, (limit, offset))
                return [dict(row) for row in cursor.fetchall()]
        except Exception as e:
            logger.error(f"❌ Failed to get all identities: {e}")
            return []

    def delete_identity(self, identity_id: str) -> bool:
        """Delete an identity by id.

        Returns:
            True when a row was removed; False when the id is unknown or the
            delete fails.
        """
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("DELETE FROM identities WHERE id = ?", (identity_id,))
                conn.commit()
                if cursor.rowcount > 0:
                    logger.info(f"✅ Deleted identity: {identity_id}")
                    return True
                return False
        except Exception as e:
            logger.error(f"❌ Failed to delete identity: {e}")
            return False