File size: 8,943 Bytes
fd86882
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
import json
import sqlite3
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path
from typing import Optional, List, Dict

import numpy as np
from loguru import logger

class DatabaseManager:
    """SQLite-backed store for enrolled identities, an audit log and counters.

    Every method opens its own short-lived connection through
    :meth:`get_connection`; no connection is kept between calls. Apart from
    ``__init__``/``init_database``/``get_connection``, methods catch any
    exception, log it, and signal failure through their return value
    instead of raising.
    """

    def __init__(self, db_path="database/identities.db"):
        """Store the database location and make sure the schema exists.

        Args:
            db_path: Path of the SQLite database file. Missing parent
                directories are created by :meth:`init_database`.
        """
        self.db_path = db_path
        self.init_database()

    def init_database(self):
        """Create the database directory, tables and initial counters.

        Safe to call repeatedly: tables use ``CREATE TABLE IF NOT EXISTS`` and
        the counters are seeded with ``INSERT OR IGNORE``, so existing data is
        left untouched. Errors are not caught here and propagate to the caller.
        """
        Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)

        with self.get_connection() as conn:
            cursor = conn.cursor()

            # Identities table: embedding and metadata are stored as JSON text.
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS identities (
                    id TEXT PRIMARY KEY,
                    name TEXT NOT NULL,
                    embedding TEXT NOT NULL,
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    metadata TEXT,
                    face_image_path TEXT,
                    verification_count INTEGER DEFAULT 0,
                    last_verified TIMESTAMP
                )
            """)

            # Audit log table: one row per recorded action.
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS audit_log (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    action TEXT NOT NULL,
                    identity_id TEXT,
                    status TEXT,
                    details TEXT,
                    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    ip_address TEXT,
                    user_agent TEXT
                )
            """)

            # Statistics table: named integer counters.
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS statistics (
                    metric_name TEXT PRIMARY KEY,
                    metric_value INTEGER DEFAULT 0,
                    last_updated TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)

            # Seed the counters; OR IGNORE keeps values that already exist.
            cursor.execute("""
                INSERT OR IGNORE INTO statistics (metric_name, metric_value) 
                VALUES ('total_enrollments', 0), ('total_searches', 0), ('total_matches', 0)
            """)

            conn.commit()
            logger.info("✅ Database initialized successfully")

    @contextmanager
    def get_connection(self):
        """Yield a fresh SQLite connection and always close it afterwards.

        Rows come back as ``sqlite3.Row`` so columns can be read by name.
        Nothing is committed automatically: callers must call
        ``conn.commit()`` themselves for their changes to persist.
        """
        conn = sqlite3.connect(self.db_path)
        conn.row_factory = sqlite3.Row
        try:
            yield conn
        finally:
            conn.close()

    def add_identity(self, identity_id: str, name: str, embedding: np.ndarray,
                     metadata: Optional[Dict] = None,
                     face_image_path: Optional[str] = None) -> bool:
        """Insert a new identity and bump the ``total_enrollments`` counter.

        Args:
            identity_id: Primary key of the new identity.
            name: Display name (required by the schema).
            embedding: Array (or array-like sequence) of numbers; stored as a
                JSON list.
            metadata: Optional JSON-serialisable dict. A falsy value (``None``
                or an empty dict) is stored as SQL NULL.
            face_image_path: Optional path to the face image.

        Returns:
            ``True`` when the row was inserted and committed; ``False`` on any
            failure (for example a duplicate ``identity_id``), in which case
            nothing is committed.
        """
        try:
            # np.asarray lets plain lists/tuples through as well as ndarrays.
            embedding_json = json.dumps(np.asarray(embedding).tolist())
            metadata_json = json.dumps(metadata) if metadata else None

            with self.get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    INSERT INTO identities (id, name, embedding, metadata, face_image_path)
                    VALUES (?, ?, ?, ?, ?)
                """, (
                    identity_id,
                    name,
                    embedding_json,
                    metadata_json,
                    face_image_path,
                ))

                # Update the enrollment counter in the same transaction.
                cursor.execute("""
                    UPDATE statistics 
                    SET metric_value = metric_value + 1, last_updated = CURRENT_TIMESTAMP
                    WHERE metric_name = 'total_enrollments'
                """)

                conn.commit()
                logger.info(f"✅ Added identity: {identity_id} - {name}")
                return True

        except Exception as e:
            logger.error(f"❌ Failed to add identity: {e}")
            return False

    def get_identity(self, identity_id: str) -> Optional[Dict]:
        """Fetch one identity row by primary key.

        Returns:
            A dict of all columns (``embedding`` and ``metadata`` are still
            the JSON text stored in the table), or ``None`` when the id does
            not exist or the query fails.
        """
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("SELECT * FROM identities WHERE id = ?", (identity_id,))
                row = cursor.fetchone()
                return dict(row) if row else None

        except Exception as e:
            logger.error(f"❌ Failed to get identity: {e}")
            return None

    def update_verification(self, identity_id: str) -> bool:
        """Record a successful verification of an existing identity.

        Increments the identity's ``verification_count``, refreshes its
        ``last_verified``/``updated_at`` timestamps, and bumps the
        ``total_matches`` counter, all in one transaction.

        Returns:
            ``True`` when the identity exists and the update was committed;
            ``False`` when the id is unknown or an error occurred. In both
            failure cases no counter is changed.
        """
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    UPDATE identities 
                    SET verification_count = verification_count + 1,
                        last_verified = CURRENT_TIMESTAMP,
                        updated_at = CURRENT_TIMESTAMP
                    WHERE id = ?
                """, (identity_id,))

                # No row touched means the id is unknown: do not count a match.
                if cursor.rowcount == 0:
                    logger.warning(
                        f"⚠️ Identity not found for verification update: {identity_id}"
                    )
                    return False

                cursor.execute("""
                    UPDATE statistics 
                    SET metric_value = metric_value + 1, last_updated = CURRENT_TIMESTAMP
                    WHERE metric_name = 'total_matches'
                """)

                conn.commit()
                return True

        except Exception as e:
            logger.error(f"❌ Failed to update verification: {e}")
            return False

    def log_action(self, action: str, identity_id: Optional[str] = None,
                   status: str = "success", details: Optional[str] = None,
                   ip_address: Optional[str] = None,
                   user_agent: Optional[str] = None):
        """Append one entry to the ``audit_log`` table.

        Best-effort: a failure is logged and swallowed so that auditing never
        breaks the calling operation. Returns ``None`` in every case.

        Args:
            action: Name of the action being recorded (required).
            identity_id: Identity the action relates to, if any.
            status: Outcome label; defaults to ``"success"``.
            details: Free-form description.
            ip_address: Client address, if known.
            user_agent: Client user agent, if known.
        """
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("""
                    INSERT INTO audit_log (action, identity_id, status, details, ip_address, user_agent)
                    VALUES (?, ?, ?, ?, ?, ?)
                """, (action, identity_id, status, details, ip_address, user_agent))
                conn.commit()

        except Exception as e:
            logger.error(f"❌ Failed to log action: {e}")

    def get_statistics(self) -> Dict:
        """Return all counters plus two values computed on the fly.

        Returns:
            A dict mapping every ``statistics.metric_name`` to its value, plus
            ``total_identities`` (row count of ``identities``) and
            ``verified_today`` (identities whose ``last_verified`` date equals
            SQLite's ``DATE('now')``). An empty dict is returned on failure.
        """
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("SELECT metric_name, metric_value FROM statistics")
                stats = {row['metric_name']: row['metric_value'] for row in cursor.fetchall()}

                # Additional statistics derived from the identities table.
                cursor.execute("SELECT COUNT(*) as total FROM identities")
                stats['total_identities'] = cursor.fetchone()['total']

                cursor.execute("""
                    SELECT COUNT(*) as verified_today 
                    FROM identities 
                    WHERE DATE(last_verified) = DATE('now')
                """)
                stats['verified_today'] = cursor.fetchone()['verified_today']

                return stats

        except Exception as e:
            logger.error(f"❌ Failed to get statistics: {e}")
            return {}

    def get_all_identities(self, limit: int = 100, offset: int = 0) -> List[Dict]:
        """List identities, newest first, without their embeddings.

        Args:
            limit: Maximum number of rows to return.
            offset: Number of rows to skip before the first returned row.

        Returns:
            A list of dicts with ``id``, ``name``, ``created_at``,
            ``updated_at``, ``verification_count`` and ``last_verified``;
            an empty list on failure.
        """
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()
                # created_at only has one-second resolution, so rowid breaks
                # ties; without it LIMIT/OFFSET pages could repeat or skip rows.
                cursor.execute("""
                    SELECT id, name, created_at, updated_at, verification_count, last_verified
                    FROM identities
                    ORDER BY created_at DESC, rowid DESC
                    LIMIT ? OFFSET ?
                """, (limit, offset))

                return [dict(row) for row in cursor.fetchall()]

        except Exception as e:
            logger.error(f"❌ Failed to get all identities: {e}")
            return []

    def delete_identity(self, identity_id: str) -> bool:
        """Delete an identity by primary key.

        Returns:
            ``True`` when a row was deleted; ``False`` when the id did not
            exist or the operation failed.
        """
        try:
            with self.get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("DELETE FROM identities WHERE id = ?", (identity_id,))
                conn.commit()

                if cursor.rowcount > 0:
                    logger.info(f"✅ Deleted identity: {identity_id}")
                    return True
                return False

        except Exception as e:
            logger.error(f"❌ Failed to delete identity: {e}")
            return False