Spaces:
Sleeping
Sleeping
File size: 9,852 Bytes
a420b85 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 | import json
import os
import sqlite3
from datetime import datetime
import numpy as np
from config import DATABASE_PATH, MODELS_DIR
class AttendanceDatabase:
"""SQLite storage for students, face embeddings, attendance, and alerts."""
def __init__(self, db_path=DATABASE_PATH):
self.db_path = db_path
os.makedirs(os.path.dirname(db_path) or MODELS_DIR, exist_ok=True)
self._init_schema()
def _connect(self):
conn = sqlite3.connect(self.db_path)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA foreign_keys = ON")
return conn
def _init_schema(self):
with self._connect() as conn:
conn.executescript(
"""
CREATE TABLE IF NOT EXISTS students (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
external_id TEXT,
department TEXT,
email TEXT,
phone TEXT,
status TEXT NOT NULL DEFAULT 'active',
created_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS face_embeddings (
id INTEGER PRIMARY KEY AUTOINCREMENT,
student_id INTEGER NOT NULL,
embedding BLOB NOT NULL,
embedding_dim INTEGER NOT NULL,
image_path TEXT,
model_name TEXT NOT NULL,
quality_score REAL,
created_at TEXT NOT NULL,
FOREIGN KEY(student_id) REFERENCES students(id) ON DELETE CASCADE
);
CREATE TABLE IF NOT EXISTS attendance (
id INTEGER PRIMARY KEY AUTOINCREMENT,
student_id INTEGER,
student_name TEXT NOT NULL,
date TEXT NOT NULL,
time TEXT NOT NULL,
status TEXT NOT NULL,
confidence REAL NOT NULL,
camera_id TEXT,
created_at TEXT NOT NULL,
UNIQUE(student_name, date),
FOREIGN KEY(student_id) REFERENCES students(id) ON DELETE SET NULL
);
CREATE TABLE IF NOT EXISTS alerts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
alert_type TEXT NOT NULL,
message TEXT NOT NULL,
image_path TEXT,
created_at TEXT NOT NULL
);
"""
)
def upsert_student(self, name, **fields):
now = datetime.now().isoformat(timespec="seconds")
with self._connect() as conn:
conn.execute(
"""
INSERT INTO students (name, external_id, department, email, phone, status, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(name) DO UPDATE SET
external_id = COALESCE(excluded.external_id, students.external_id),
department = COALESCE(excluded.department, students.department),
email = COALESCE(excluded.email, students.email),
phone = COALESCE(excluded.phone, students.phone),
status = excluded.status
""",
(
name,
fields.get("external_id"),
fields.get("department"),
fields.get("email"),
fields.get("phone"),
fields.get("status", "active"),
now,
),
)
row = conn.execute("SELECT id FROM students WHERE name = ?", (name,)).fetchone()
return int(row["id"])
def clear_embeddings(self):
with self._connect() as conn:
conn.execute("DELETE FROM face_embeddings")
def add_embedding(self, student_id, embedding, image_path=None, model_name="unknown", quality_score=None):
embedding_array = np.asarray(embedding, dtype=np.float32)
now = datetime.now().isoformat(timespec="seconds")
with self._connect() as conn:
conn.execute(
"""
INSERT INTO face_embeddings
(student_id, embedding, embedding_dim, image_path, model_name, quality_score, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?)
""",
(
student_id,
embedding_array.tobytes(),
int(embedding_array.shape[0]),
image_path,
model_name,
quality_score,
now,
),
)
def load_embeddings(self):
with self._connect() as conn:
rows = conn.execute(
"""
SELECT
face_embeddings.id,
face_embeddings.student_id,
face_embeddings.embedding,
face_embeddings.embedding_dim,
face_embeddings.image_path,
face_embeddings.model_name,
face_embeddings.quality_score,
students.name AS student_name
FROM face_embeddings
JOIN students ON students.id = face_embeddings.student_id
WHERE students.status = 'active'
"""
).fetchall()
embeddings = []
for row in rows:
vector = np.frombuffer(row["embedding"], dtype=np.float32, count=row["embedding_dim"])
embeddings.append(
{
"id": int(row["id"]),
"student_id": int(row["student_id"]),
"student_name": row["student_name"],
"embedding": vector,
"image_path": row["image_path"],
"model_name": row["model_name"],
"quality_score": row["quality_score"],
}
)
return embeddings
def list_students(self):
with self._connect() as conn:
rows = conn.execute(
"""
SELECT students.*, COUNT(face_embeddings.id) AS embedding_count
FROM students
LEFT JOIN face_embeddings ON face_embeddings.student_id = students.id
GROUP BY students.id
ORDER BY students.name
"""
).fetchall()
return [dict(row) for row in rows]
def mark_attendance(self, student_id, student_name, confidence, camera_id=None, status="Present"):
timestamp = datetime.now()
date_text = timestamp.strftime("%Y-%m-%d")
time_text = timestamp.strftime("%H:%M:%S")
created_at = timestamp.isoformat(timespec="seconds")
with self._connect() as conn:
cursor = conn.execute(
"""
INSERT OR IGNORE INTO attendance
(student_id, student_name, date, time, status, confidence, camera_id, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""",
(student_id, student_name, date_text, time_text, status, confidence, camera_id, created_at),
)
return cursor.rowcount == 1
def list_attendance(self, date=None, limit=200):
params = []
where_clause = ""
if date:
where_clause = "WHERE date = ?"
params.append(date)
params.append(limit)
with self._connect() as conn:
rows = conn.execute(
f"""
SELECT id, student_id, student_name, date, time, status, confidence, camera_id, created_at
FROM attendance
{where_clause}
ORDER BY created_at DESC
LIMIT ?
""",
params,
).fetchall()
return [dict(row) for row in rows]
def add_alert(self, alert_type, message, image_path=None):
with self._connect() as conn:
conn.execute(
"""
INSERT INTO alerts (alert_type, message, image_path, created_at)
VALUES (?, ?, ?, ?)
""",
(alert_type, message, image_path, datetime.now().isoformat(timespec="seconds")),
)
def list_alerts(self, limit=100):
with self._connect() as conn:
rows = conn.execute(
"""
SELECT id, alert_type, message, image_path, created_at
FROM alerts
ORDER BY created_at DESC
LIMIT ?
""",
(limit,),
).fetchall()
return [dict(row) for row in rows]
def get_total_embeddings(self):
"""Get total count of face embeddings."""
with self._connect() as conn:
row = conn.execute("SELECT COUNT(*) as count FROM face_embeddings").fetchone()
return int(row["count"]) if row else 0
def get_attendance_by_date(self, date):
"""Get attendance records for a specific date."""
with self._connect() as conn:
rows = conn.execute(
"""SELECT student_name, time, confidence FROM attendance
WHERE date = ? ORDER BY time DESC""",
(date,)
).fetchall()
return [tuple(row) for row in rows]
def export_snapshot(self):
return json.dumps(
{
"students": self.list_students(),
"attendance": self.list_attendance(limit=500),
"alerts": self.list_alerts(limit=100),
},
indent=2,
)
|