101 / database /db_manager.py
midokhaled927's picture
Create database/db_manager.py
fd86882 verified
import sqlite3
import json
from pathlib import Path
from datetime import datetime
from typing import Optional, List, Dict
from contextlib import contextmanager
from loguru import logger
class DatabaseManager:
def __init__(self, db_path="database/identities.db"):
self.db_path = db_path
self.init_database()
def init_database(self):
"""تهيئة قاعدة البيانات والجداول"""
Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)
with self.get_connection() as conn:
cursor = conn.cursor()
# جدول الهويات
cursor.execute("""
CREATE TABLE IF NOT EXISTS identities (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
embedding TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
metadata TEXT,
face_image_path TEXT,
verification_count INTEGER DEFAULT 0,
last_verified TIMESTAMP
)
""")
# جدول سجل العمليات
cursor.execute("""
CREATE TABLE IF NOT EXISTS audit_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
action TEXT NOT NULL,
identity_id TEXT,
status TEXT,
details TEXT,
timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
ip_address TEXT,
user_agent TEXT
)
""")
# جدول الإحصائيات
cursor.execute("""
CREATE TABLE IF NOT EXISTS statistics (
metric_name TEXT PRIMARY KEY,
metric_value INTEGER DEFAULT 0,
last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# إضافة بعض الإحصائيات الابتدائية
cursor.execute("""
INSERT OR IGNORE INTO statistics (metric_name, metric_value)
VALUES ('total_enrollments', 0), ('total_searches', 0), ('total_matches', 0)
""")
conn.commit()
logger.info("✅ Database initialized successfully")
@contextmanager
def get_connection(self):
"""إنشاء اتصال بقاعدة البيانات"""
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
try:
yield conn
finally:
conn.close()
def add_identity(self, identity_id: str, name: str, embedding: np.ndarray,
metadata: Dict = None, face_image_path: str = None) -> bool:
"""إضافة هوية جديدة"""
try:
with self.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
INSERT INTO identities (id, name, embedding, metadata, face_image_path)
VALUES (?, ?, ?, ?, ?)
""", (
identity_id,
name,
json.dumps(embedding.tolist()),
json.dumps(metadata) if metadata else None,
face_image_path
))
# تحديث الإحصاءات
cursor.execute("""
UPDATE statistics
SET metric_value = metric_value + 1, last_updated = CURRENT_TIMESTAMP
WHERE metric_name = 'total_enrollments'
""")
conn.commit()
logger.info(f"✅ Added identity: {identity_id} - {name}")
return True
except Exception as e:
logger.error(f"❌ Failed to add identity: {e}")
return False
def get_identity(self, identity_id: str) -> Optional[Dict]:
"""الحصول على هوية محددة"""
try:
with self.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM identities WHERE id = ?", (identity_id,))
row = cursor.fetchone()
if row:
return dict(row)
return None
except Exception as e:
logger.error(f"❌ Failed to get identity: {e}")
return None
def update_verification(self, identity_id: str):
"""تحديث عدد التحقق من الهوية"""
try:
with self.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
UPDATE identities
SET verification_count = verification_count + 1,
last_verified = CURRENT_TIMESTAMP,
updated_at = CURRENT_TIMESTAMP
WHERE id = ?
""", (identity_id,))
cursor.execute("""
UPDATE statistics
SET metric_value = metric_value + 1, last_updated = CURRENT_TIMESTAMP
WHERE metric_name = 'total_matches'
""")
conn.commit()
return True
except Exception as e:
logger.error(f"❌ Failed to update verification: {e}")
return False
def log_action(self, action: str, identity_id: str = None,
status: str = "success", details: str = None,
ip_address: str = None, user_agent: str = None):
"""تسجيل عملية في سجل المراجعة"""
try:
with self.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
INSERT INTO audit_log (action, identity_id, status, details, ip_address, user_agent)
VALUES (?, ?, ?, ?, ?, ?)
""", (action, identity_id, status, details, ip_address, user_agent))
conn.commit()
except Exception as e:
logger.error(f"❌ Failed to log action: {e}")
def get_statistics(self) -> Dict:
"""الحصول على الإحصائيات"""
try:
with self.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("SELECT metric_name, metric_value FROM statistics")
stats = {row['metric_name']: row['metric_value'] for row in cursor.fetchall()}
# إحصائيات إضافية
cursor.execute("SELECT COUNT(*) as total FROM identities")
stats['total_identities'] = cursor.fetchone()['total']
cursor.execute("""
SELECT COUNT(*) as verified_today
FROM identities
WHERE DATE(last_verified) = DATE('now')
""")
stats['verified_today'] = cursor.fetchone()['verified_today']
return stats
except Exception as e:
logger.error(f"❌ Failed to get statistics: {e}")
return {}
def get_all_identities(self, limit: int = 100, offset: int = 0) -> List[Dict]:
"""الحصول على جميع الهويات"""
try:
with self.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT id, name, created_at, updated_at, verification_count, last_verified
FROM identities
ORDER BY created_at DESC
LIMIT ? OFFSET ?
""", (limit, offset))
return [dict(row) for row in cursor.fetchall()]
except Exception as e:
logger.error(f"❌ Failed to get all identities: {e}")
return []
def delete_identity(self, identity_id: str) -> bool:
"""حذف هوية"""
try:
with self.get_connection() as conn:
cursor = conn.cursor()
cursor.execute("DELETE FROM identities WHERE id = ?", (identity_id,))
conn.commit()
if cursor.rowcount > 0:
logger.info(f"✅ Deleted identity: {identity_id}")
return True
return False
except Exception as e:
logger.error(f"❌ Failed to delete identity: {e}")
return False