# SecureAttendAI / backend / database.py
# Nishant Katiyar -- "Deploy biometric node to HF Spaces" (commit b561839)
import os
import sqlite3
from datetime import datetime
import numpy as np
# Absolute path of the SQLite file: "attendance.db" located in the directory
# one level above the directory that contains this module.
_MODULE_DIR = os.path.dirname(os.path.abspath(__file__))
DB_PATH = os.path.join(os.path.dirname(_MODULE_DIR), "attendance.db")
def get_db_connection():
    """Open a new connection to the attendance database at ``DB_PATH``.

    Rows are returned as ``sqlite3.Row`` objects, so callers can read columns
    by name (``row['count']``) or convert a row with ``dict(row)``.

    Foreign-key enforcement is switched on for every connection: SQLite keeps
    it off by default and the setting is per-connection, so without it the
    ``FOREIGN KEY ... ON DELETE CASCADE`` clauses declared in the schema are
    silently ignored. As a consequence, inserting an embedding or attendance
    log for an employee id that does not exist now fails with
    ``sqlite3.IntegrityError``.

    Returns:
        An open ``sqlite3.Connection``. The caller owns it and must close it.
    """
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    # Must be issued outside a transaction; a fresh connection has none open.
    conn.execute("PRAGMA foreign_keys = ON")
    return conn
def init_db():
    """Create the application's tables if they do not already exist.

    Three tables are created: ``employees``, ``face_embeddings`` (at most one
    row per employee/angle pair, enforced by a UNIQUE constraint) and
    ``attendance_logs``. Every statement uses ``CREATE TABLE IF NOT EXISTS``,
    so calling this function repeatedly is harmless.

    Prints a confirmation line once the schema has been committed. Any
    ``sqlite3`` error propagates to the caller.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()

        # 1. Create Employees table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS employees (
                id TEXT PRIMARY KEY,
                name TEXT NOT NULL,
                email TEXT UNIQUE NOT NULL,
                role TEXT NOT NULL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)

        # 2. Create Face Embeddings table (multi-angle storage)
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS face_embeddings (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                employee_id TEXT NOT NULL,
                angle TEXT NOT NULL, -- 'center', 'left', 'right', 'up', 'down'
                embedding BLOB NOT NULL, -- Binary float array (128 floats)
                FOREIGN KEY (employee_id) REFERENCES employees(id) ON DELETE CASCADE,
                UNIQUE(employee_id, angle)
            )
        """)

        # 3. Create Attendance Logs table
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS attendance_logs (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                employee_id TEXT NOT NULL,
                timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                event_type TEXT NOT NULL, -- 'check_in', 'check_out'
                similarity_score REAL NOT NULL,
                FOREIGN KEY (employee_id) REFERENCES employees(id) ON DELETE CASCADE
            )
        """)

        conn.commit()
    finally:
        # Release the connection even if one of the DDL statements fails.
        conn.close()
    print("[+] SQLite Database initialized successfully!")
# Initialize immediately upon module load: importing this module runs
# init_db(), so the tables exist before any helper below is called.
init_db()
# CRUD: Employees
def add_employee(emp_id: str, name: str, email: str, role: str):
    """Insert one row into ``employees``.

    Returns True when the row was committed. Returns False (after printing
    the error) when SQLite reports an integrity violation, e.g. a duplicate
    id or email. The connection is always closed before returning.
    """
    connection = get_db_connection()
    cur = connection.cursor()
    new_row = (emp_id, name, email, role)
    try:
        cur.execute(
            "INSERT INTO employees (id, name, email, role) VALUES (?, ?, ?, ?)",
            new_row,
        )
        connection.commit()
    except sqlite3.IntegrityError as e:
        print(f"[-] Database Integrity Error: {e}")
        return False
    else:
        return True
    finally:
        connection.close()
def delete_employee(emp_id: str):
    """Delete the employee with the given id.

    Foreign keys are enabled on this connection first so that the schema's
    ON DELETE CASCADE rules remove the employee's embeddings and logs too.

    Returns True when the delete was committed (also when no row matched),
    or False after printing the error if anything raised.
    """
    connection = get_db_connection()
    cur = connection.cursor()
    statements = (
        ("PRAGMA foreign_keys = ON", ()),
        ("DELETE FROM employees WHERE id = ?", (emp_id,)),
    )
    try:
        # Order matters: the pragma must be active before the DELETE runs.
        for sql, args in statements:
            cur.execute(sql, args)
        connection.commit()
    except Exception as e:
        print(f"[-] Failed to delete employee {emp_id}: {e}")
        return False
    else:
        return True
    finally:
        connection.close()
def get_employee(emp_id: str):
    """Fetch a single employee by id.

    Returns:
        The employee row as a plain dict, or None when no row has that id.
    """
    conn = get_db_connection()
    try:
        row = conn.execute(
            "SELECT * FROM employees WHERE id = ?", (emp_id,)
        ).fetchone()
    finally:
        # Close the connection even when the query raises.
        conn.close()
    return dict(row) if row else None
def get_all_employees():
    """Return every employee as a list of dicts, newest ``created_at`` first."""
    conn = get_db_connection()
    try:
        rows = conn.execute(
            "SELECT * FROM employees ORDER BY created_at DESC"
        ).fetchall()
    finally:
        # Close the connection even when the query raises.
        conn.close()
    return [dict(r) for r in rows]
# Face Embeddings Operations
def save_face_embedding(emp_id: str, angle: str, embedding: np.ndarray):
    """Store (or overwrite) the face embedding for one employee/angle pair.

    The embedding is converted to float32 and stored as raw bytes. It must
    contain exactly 128 values in total (any shape, e.g. ``(128,)`` or
    ``(1, 128)``): ``get_all_embeddings`` reshapes every stored blob to
    ``(1, 128)``, so a blob of any other size would make that loader raise.
    The size is therefore checked here, before the database is touched.

    Args:
        emp_id: Employee the embedding belongs to.
        angle: Capture angle label; ``(employee_id, angle)`` is unique, so an
            existing row for the same pair is replaced.
        embedding: Array (or array-like) of 128 numeric values.

    Returns:
        True when the row was committed; False (after printing the reason)
        when the embedding is invalid or the database write fails.
    """
    expected_size = 128  # SFace embedding length assumed by get_all_embeddings

    try:
        vector = np.asarray(embedding, dtype=np.float32)
    except (TypeError, ValueError) as e:
        print(f"[-] Error saving embedding: {e}")
        return False

    if vector.size != expected_size:
        print(
            f"[-] Error saving embedding: expected {expected_size} values, "
            f"got {vector.size}"
        )
        return False

    # Convert numpy array to raw bytes for binary storage
    embedding_bytes = vector.tobytes()

    conn = get_db_connection()
    try:
        conn.execute(
            """
            INSERT OR REPLACE INTO face_embeddings (employee_id, angle, embedding)
            VALUES (?, ?, ?)
            """,
            (emp_id, angle, embedding_bytes),
        )
        conn.commit()
        return True
    except Exception as e:
        print(f"[-] Error saving embedding: {e}")
        return False
    finally:
        conn.close()
def get_all_embeddings():
    """Load every stored face embedding together with its owner's name.

    Only embeddings whose employee still exists are returned (inner join on
    ``employees``).

    Returns:
        A list of dicts with keys ``employee_id``, ``name``, ``angle`` and
        ``embedding``; each embedding is a float32 array of shape (1, 128).

    Raises:
        ValueError: if a stored blob does not hold exactly 128 float32 values.
    """
    conn = get_db_connection()
    try:
        rows = conn.execute(
            """
            SELECT fe.employee_id, fe.angle, fe.embedding, e.name
            FROM face_embeddings fe
            JOIN employees e ON fe.employee_id = e.id
            """
        ).fetchall()
    finally:
        # Close the connection even when the query raises.
        conn.close()

    return [
        {
            "employee_id": r['employee_id'],
            "name": r['name'],
            "angle": r['angle'],
            # Deserialize bytes back to float32 and reshape to (1, 128) for SFace
            "embedding": np.frombuffer(r['embedding'], dtype=np.float32).reshape(1, 128),
        }
        for r in rows
    ]
def has_all_embeddings(emp_id: str):
    """Report whether an employee has at least five stored embeddings.

    Five corresponds to the capture angles center, left, right, up and down;
    the table allows only one row per (employee, angle) pair.
    """
    conn = get_db_connection()
    try:
        count = conn.execute(
            "SELECT COUNT(*) as count FROM face_embeddings WHERE employee_id = ?",
            (emp_id,),
        ).fetchone()['count']
    finally:
        # Close the connection even when the query raises.
        conn.close()
    return count >= 5  # center, left, right, up, down
# Attendance Logs Operations
def log_attendance(emp_id: str, event_type: str, score: float):
    """Append one attendance event stamped with the current local time.

    The timestamp is written explicitly as "YYYY-MM-DD HH:MM:SS" rather than
    relying on the column default. Returns True once committed, or False
    after printing the error if anything raised. The connection is always
    closed.
    """
    connection = get_db_connection()
    cur = connection.cursor()
    insert_sql = (
        "\n"
        "INSERT INTO attendance_logs (employee_id, event_type, similarity_score, timestamp)\n"
        "VALUES (?, ?, ?, ?)\n"
    )
    try:
        now_text = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        cur.execute(insert_sql, (emp_id, event_type, score, now_text))
        connection.commit()
    except Exception as e:
        print(f"[-] Error logging attendance: {e}")
        return False
    else:
        return True
    finally:
        connection.close()
def get_last_attendance_log(emp_id: str):
    """Return the most recent attendance log of an employee.

    Logs are ordered by timestamp, newest first. Timestamps are written with
    one-second resolution, so two events can share a timestamp; the
    auto-incrementing ``id`` breaks such ties so the row inserted last wins.

    Returns:
        The log row as a plain dict, or None when the employee has no logs.
    """
    conn = get_db_connection()
    try:
        row = conn.execute(
            """
            SELECT * FROM attendance_logs
            WHERE employee_id = ?
            ORDER BY timestamp DESC, id DESC LIMIT 1
            """,
            (emp_id,),
        ).fetchone()
    finally:
        # Close the connection even when the query raises.
        conn.close()
    return dict(row) if row else None
def get_todays_logs():
    """Return today's attendance logs joined with employee name and role.

    "Today" is the current local date; rows are matched by the
    "YYYY-MM-DD" prefix of their stored timestamp. Results are newest first,
    with ``id`` as a tie-break for events logged within the same second.

    Returns:
        A list of dicts with keys ``id``, ``employee_id``, ``event_type``,
        ``similarity_score``, ``timestamp``, ``name`` and ``role``.
    """
    today_pattern = datetime.now().strftime("%Y-%m-%d") + "%"
    conn = get_db_connection()
    try:
        rows = conn.execute(
            """
            SELECT al.id, al.employee_id, al.event_type, al.similarity_score, al.timestamp, e.name, e.role
            FROM attendance_logs al
            JOIN employees e ON al.employee_id = e.id
            WHERE al.timestamp LIKE ?
            ORDER BY al.timestamp DESC, al.id DESC
            """,
            (today_pattern,),
        ).fetchall()
    finally:
        # Close the connection even when the query raises.
        conn.close()
    return [dict(r) for r in rows]
def get_attendance_history(limit: int = 100):
    """Return the most recent attendance logs across all employees.

    Args:
        limit: Maximum number of rows to return (default 100).

    Returns:
        A list of dicts, newest first (``id`` breaks ties between events
        logged within the same second), with keys ``id``, ``employee_id``,
        ``event_type``, ``similarity_score``, ``timestamp``, ``name``,
        ``role`` and ``email``.
    """
    conn = get_db_connection()
    try:
        rows = conn.execute(
            """
            SELECT al.id, al.employee_id, al.event_type, al.similarity_score, al.timestamp, e.name, e.role, e.email
            FROM attendance_logs al
            JOIN employees e ON al.employee_id = e.id
            ORDER BY al.timestamp DESC, al.id DESC LIMIT ?
            """,
            (limit,),
        ).fetchall()
    finally:
        # Close the connection even when the query raises.
        conn.close()
    return [dict(r) for r in rows]
def get_todays_stats():
    """Compute headline attendance numbers for the current local date.

    Returns:
        A dict with:
        ``total_employees``   - number of rows in ``employees``;
        ``today_logs_count``  - number of attendance logs stamped today;
        ``present_count``     - distinct employees with a 'check_in' today;
        ``checked_out_count`` - distinct employees with a 'check_out' today;
        ``absent_count``      - total minus present, never below zero.
    """
    today_pattern = (datetime.now().strftime("%Y-%m-%d") + "%",)

    conn = get_db_connection()
    try:
        def scalar(sql, params=()):
            # Every query below aliases its single result column as "count".
            return conn.execute(sql, params).fetchone()['count']

        # 1. Total employees
        total_emp = scalar("SELECT COUNT(*) as count FROM employees")

        # 2. Total active logs today
        today_logs = scalar(
            "SELECT COUNT(*) as count FROM attendance_logs WHERE timestamp LIKE ?",
            today_pattern,
        )

        # 3. Unique present employees today
        present_today = scalar(
            """
            SELECT COUNT(DISTINCT employee_id) as count
            FROM attendance_logs
            WHERE timestamp LIKE ? AND event_type = 'check_in'
            """,
            today_pattern,
        )

        # 4. Checked out today
        checked_out_today = scalar(
            """
            SELECT COUNT(DISTINCT employee_id) as count
            FROM attendance_logs
            WHERE timestamp LIKE ? AND event_type = 'check_out'
            """,
            today_pattern,
        )
    finally:
        # Close the connection even when one of the queries raises.
        conn.close()

    return {
        "total_employees": total_emp,
        "today_logs_count": today_logs,
        "present_count": present_today,
        "checked_out_count": checked_out_today,
        "absent_count": max(0, total_emp - present_today),
    }
def get_weekly_attendance():
    """Count distinct employees who checked in on each of the last 7 days.

    The window ends today (local date) and is returned oldest day first.

    Returns:
        A list of seven dicts with keys ``date`` ("YYYY-MM-DD"), ``label``
        (abbreviated weekday name, e.g. "Mon") and ``count``.
    """
    # Local import: the module only imports the ``datetime`` class at the top.
    from datetime import timedelta

    # Read the clock once so the whole window is anchored to a single date,
    # even if the call happens to run across midnight.
    today = datetime.now().date()
    days = [today - timedelta(days=offset) for offset in range(6, -1, -1)]

    conn = get_db_connection()
    try:
        stats = []
        for day in days:
            day_str = day.strftime("%Y-%m-%d")
            row = conn.execute(
                """
                SELECT COUNT(DISTINCT employee_id) as count
                FROM attendance_logs
                WHERE timestamp LIKE ? AND event_type = 'check_in'
                """,
                (f"{day_str}%",),
            ).fetchone()
            stats.append({
                "date": day_str,
                "label": day.strftime("%a"),
                "count": row['count'] if row else 0,
            })
    finally:
        # Close the connection even when a query raises.
        conn.close()
    return stats
def get_hourly_attendance(start_hour: int = 9, end_hour: int = 17):
    """Count today's check-ins for each hour of a window of the day.

    Args:
        start_hour: First hour of the window, 0-23 (default 9).
        end_hour: Last hour of the window, inclusive, 0-23 (default 17).

    Returns:
        One dict per hour, in ascending order, with keys ``hour`` (two-digit
        24-hour string such as "09"), ``label`` (12-hour form such as "9 AM"
        or "1 PM") and ``count`` (number of 'check_in' rows in that hour).

    Raises:
        ValueError: if the hours are not ordered or fall outside 0-23.
    """
    if not (0 <= start_hour <= end_hour <= 23):
        raise ValueError(
            "start_hour and end_hour must satisfy 0 <= start_hour <= end_hour <= 23"
        )

    today_pattern = datetime.now().strftime("%Y-%m-%d") + "%"

    conn = get_db_connection()
    try:
        stats = []
        for hour in range(start_hour, end_hour + 1):
            hour_key = f"{hour:02d}"  # matches SQLite's strftime('%H', ...)
            row = conn.execute(
                """
                SELECT COUNT(*) as count
                FROM attendance_logs
                WHERE timestamp LIKE ? AND strftime('%H', timestamp) = ? AND event_type = 'check_in'
                """,
                (today_pattern, hour_key),
            ).fetchone()
            ampm = "AM" if hour < 12 else "PM"
            # 12-hour clock: 0 -> 12, 13 -> 1, 12 stays 12.
            disp_h = hour % 12 or 12
            stats.append({
                "hour": hour_key,
                "label": f"{disp_h} {ampm}",
                "count": row['count'] if row else 0,
            })
    finally:
        # Close the connection even when a query raises.
        conn.close()
    return stats