# Spaces: Running
# File size: 8,943 Bytes
import sqlite3
import json
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path
from typing import Optional, List, Dict

import numpy as np
from loguru import logger
class DatabaseManager:
    """SQLite-backed store for identities, an audit log and usage counters.

    Every public method opens its own short-lived connection through
    :meth:`get_connection` and commits explicitly. ``db_path`` is expected to
    be a file path: because connections are not shared between calls, an
    in-memory database (``":memory:"``) would be empty on every call.

    Most methods catch any exception, log it, and return a "failure" value
    (``False``, ``None``, ``{}`` or ``[]``) instead of raising.
    """

    def __init__(self, db_path="database/identities.db"):
        """Store the database location and make sure the schema exists.

        Args:
            db_path: Path of the SQLite database file. Missing parent
                directories are created.
        """
        self.db_path = db_path
        self.init_database()

    def init_database(self):
        """Create the database directory, tables and seed counters.

        Safe to call repeatedly: tables use ``IF NOT EXISTS`` and the seed
        rows use ``INSERT OR IGNORE``.
        """
        Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)
        with self.get_connection() as conn:
            cursor = conn.cursor()
            # Identities table: one row per enrolled identity; the embedding
            # and metadata columns hold JSON text.
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS identities (
                    id TEXT PRIMARY KEY,
                    name TEXT NOT NULL,
                    embedding TEXT NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    metadata TEXT,
                    face_image_path TEXT,
                    verification_count INTEGER DEFAULT 0,
                    last_verified TIMESTAMP
                )
            """)
            # Audit log table: append-only record of operations.
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS audit_log (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    action TEXT NOT NULL,
                    identity_id TEXT,
                    status TEXT,
                    details TEXT,
                    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    ip_address TEXT,
                    user_agent TEXT
                )
            """)
            # Statistics table: named integer counters.
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS statistics (
                    metric_name TEXT PRIMARY KEY,
                    metric_value INTEGER DEFAULT 0,
                    last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)
            # Seed the counters; existing rows are left untouched.
            cursor.execute("""
                INSERT OR IGNORE INTO statistics (metric_name, metric_value)
                VALUES ('total_enrollments', 0), ('total_searches', 0), ('total_matches', 0)
            """)
            conn.commit()
        logger.info("✅ Database initialized successfully")

    @contextmanager
    def get_connection(self):
        """Yield a new SQLite connection and always close it afterwards.

        Rows are returned as :class:`sqlite3.Row`, so columns can be read by
        name. The connection is not committed automatically; anything the
        caller did not commit is discarded when the connection is closed.
        """
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        try:
            yield conn
        finally:
            conn.close()

    @staticmethod
    def _increment_metric(cursor, metric_name: str) -> None:
        """Add one to the named counter in the ``statistics`` table."""
        cursor.execute("""
            UPDATE statistics
            SET metric_value = metric_value + 1, last_updated = CURRENT_TIMESTAMP
            WHERE metric_name = ?
        """, (metric_name,))

    def add_identity(self, identity_id: str, name: str, embedding: np.ndarray,
                     metadata: Optional[Dict] = None,
                     face_image_path: Optional[str] = None) -> bool:
        """Insert a new identity and bump the ``total_enrollments`` counter.

        Args:
            identity_id: Primary key of the new identity.
            name: Display name (required by the schema).
            embedding: Numeric vector; an ``ndarray`` or any array-like such
                as a list. It is stored as a JSON list.
            metadata: Optional JSON-serialisable dict. A falsy value
                (``None`` or ``{}``) is stored as SQL ``NULL``.
            face_image_path: Optional path stored verbatim.

        Returns:
            ``True`` on success; ``False`` if anything fails (for example a
            duplicate ``identity_id`` or non-serialisable metadata). Failures
            are logged, not raised.
        """
        try:
            # np.asarray accepts plain sequences as well as ndarrays, so
            # callers are not forced to build an array first.
            embedding_json = json.dumps(np.asarray(embedding).tolist())
            metadata_json = json.dumps(metadata) if metadata else None
            with self.get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    INSERT INTO identities (id, name, embedding, metadata, face_image_path)
                    VALUES (?, ?, ?, ?, ?)
                """, (identity_id, name, embedding_json, metadata_json, face_image_path))
                # Update the statistics in the same transaction as the insert.
                self._increment_metric(cursor, 'total_enrollments')
                conn.commit()
            logger.info(f"✅ Added identity: {identity_id} - {name}")
            return True
        except Exception as e:
            logger.error(f"❌ Failed to add identity: {e}")
            return False

    def get_identity(self, identity_id: str) -> Optional[Dict]:
        """Fetch one identity row by id.

        Returns:
            The row as a dict of column name to value, with ``embedding`` and
            ``metadata`` still JSON-encoded as stored. ``None`` if the id is
            unknown or the query fails.
        """
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("SELECT * FROM identities WHERE id = ?", (identity_id,))
                row = cursor.fetchone()
                return dict(row) if row else None
        except Exception as e:
            logger.error(f"❌ Failed to get identity: {e}")
            return None

    def update_verification(self, identity_id: str) -> bool:
        """Record a successful verification of ``identity_id``.

        Increments the identity's ``verification_count``, refreshes its
        ``last_verified`` / ``updated_at`` timestamps and bumps the
        ``total_matches`` counter.

        Returns:
            ``True`` if the identity exists and was updated; ``False`` if the
            id is unknown or the update fails.
        """
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    UPDATE identities
                    SET verification_count = verification_count + 1,
                        last_verified = CURRENT_TIMESTAMP,
                        updated_at = CURRENT_TIMESTAMP
                    WHERE id = ?
                """, (identity_id,))
                if cursor.rowcount == 0:
                    # No such identity: nothing was verified, so the global
                    # match counter must not be inflated.
                    return False
                self._increment_metric(cursor, 'total_matches')
                conn.commit()
            return True
        except Exception as e:
            logger.error(f"❌ Failed to update verification: {e}")
            return False

    def log_action(self, action: str, identity_id: Optional[str] = None,
                   status: str = "success", details: Optional[str] = None,
                   ip_address: Optional[str] = None,
                   user_agent: Optional[str] = None) -> None:
        """Append one entry to the audit log.

        Best-effort: a failure is logged and otherwise ignored, so the
        caller's operation is not interrupted.
        """
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    INSERT INTO audit_log (action, identity_id, status, details, ip_address, user_agent)
                    VALUES (?, ?, ?, ?, ?, ?)
                """, (action, identity_id, status, details, ip_address, user_agent))
                conn.commit()
        except Exception as e:
            logger.error(f"❌ Failed to log action: {e}")

    def get_statistics(self) -> Dict:
        """Return all counters plus two derived figures.

        Returns:
            A dict holding every row of the ``statistics`` table, plus
            ``total_identities`` (row count of ``identities``) and
            ``verified_today`` (identities whose ``last_verified`` date equals
            SQLite's ``DATE('now')``). An empty dict if the query fails.
        """
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("SELECT metric_name, metric_value FROM statistics")
                stats = {row['metric_name']: row['metric_value'] for row in cursor.fetchall()}
                # Additional statistics computed on the fly.
                cursor.execute("SELECT COUNT(*) as total FROM identities")
                stats['total_identities'] = cursor.fetchone()['total']
                cursor.execute("""
                    SELECT COUNT(*) as verified_today
                    FROM identities
                    WHERE DATE(last_verified) = DATE('now')
                """)
                stats['verified_today'] = cursor.fetchone()['verified_today']
                return stats
        except Exception as e:
            logger.error(f"❌ Failed to get statistics: {e}")
            return {}

    def get_all_identities(self, limit: int = 100, offset: int = 0) -> List[Dict]:
        """List identities, newest first, without the embedding payload.

        Args:
            limit: Maximum number of rows to return.
            offset: Number of rows to skip (for paging).

        Returns:
            A list of dicts with ``id``, ``name``, ``created_at``,
            ``updated_at``, ``verification_count`` and ``last_verified``.
            An empty list if the query fails.
        """
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    SELECT id, name, created_at, updated_at, verification_count, last_verified
                    FROM identities
                    ORDER BY created_at DESC
                    LIMIT ? OFFSET ?
                """, (limit, offset))
                return [dict(row) for row in cursor.fetchall()]
        except Exception as e:
            logger.error(f"❌ Failed to get all identities: {e}")
            return []

    def delete_identity(self, identity_id: str) -> bool:
        """Delete an identity by id.

        Returns:
            ``True`` if a row was removed; ``False`` if the id was unknown or
            the delete failed.
        """
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("DELETE FROM identities WHERE id = ?", (identity_id,))
                conn.commit()
                if cursor.rowcount > 0:
                    logger.info(f"✅ Deleted identity: {identity_id}")
                    return True
                return False
        except Exception as e:
            logger.error(f"❌ Failed to delete identity: {e}")
            return False