Spaces:
Sleeping
Sleeping
| import os | |
| import sqlite3 | |
| from datetime import datetime | |
| import numpy as np | |
| DB_PATH = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "attendance.db") | |
| def get_db_connection(): | |
| conn = sqlite3.connect(DB_PATH) | |
| conn.row_factory = sqlite3.Row | |
| return conn | |
| def init_db(): | |
| conn = get_db_connection() | |
| cursor = conn.cursor() | |
| # 1. Create Employees table | |
| cursor.execute(""" | |
| CREATE TABLE IF NOT EXISTS employees ( | |
| id TEXT PRIMARY KEY, | |
| name TEXT NOT NULL, | |
| email TEXT UNIQUE NOT NULL, | |
| role TEXT NOT NULL, | |
| created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP | |
| ) | |
| """) | |
| # 2. Create Face Embeddings table (multi-angle storage) | |
| cursor.execute(""" | |
| CREATE TABLE IF NOT EXISTS face_embeddings ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| employee_id TEXT NOT NULL, | |
| angle TEXT NOT NULL, -- 'center', 'left', 'right', 'up', 'down' | |
| embedding BLOB NOT NULL, -- Binary float array (128 floats) | |
| FOREIGN KEY (employee_id) REFERENCES employees(id) ON DELETE CASCADE, | |
| UNIQUE(employee_id, angle) | |
| ) | |
| """) | |
| # 3. Create Attendance Logs table | |
| cursor.execute(""" | |
| CREATE TABLE IF NOT EXISTS attendance_logs ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| employee_id TEXT NOT NULL, | |
| timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, | |
| event_type TEXT NOT NULL, -- 'check_in', 'check_out' | |
| similarity_score REAL NOT NULL, | |
| FOREIGN KEY (employee_id) REFERENCES employees(id) ON DELETE CASCADE | |
| ) | |
| """) | |
| conn.commit() | |
| conn.close() | |
| print("[+] SQLite Database initialized successfully!") | |
| # Initialize immediately upon module load | |
| init_db() | |
| # CRUD: Employees | |
| def add_employee(emp_id: str, name: str, email: str, role: str): | |
| conn = get_db_connection() | |
| cursor = conn.cursor() | |
| try: | |
| cursor.execute( | |
| "INSERT INTO employees (id, name, email, role) VALUES (?, ?, ?, ?)", | |
| (emp_id, name, email, role) | |
| ) | |
| conn.commit() | |
| return True | |
| except sqlite3.IntegrityError as e: | |
| print(f"[-] Database Integrity Error: {e}") | |
| return False | |
| finally: | |
| conn.close() | |
| def delete_employee(emp_id: str): | |
| conn = get_db_connection() | |
| cursor = conn.cursor() | |
| try: | |
| # Cascade deletes will clean up embeddings and logs due to CASCADE setup | |
| cursor.execute("PRAGMA foreign_keys = ON") | |
| cursor.execute("DELETE FROM employees WHERE id = ?", (emp_id,)) | |
| conn.commit() | |
| return True | |
| except Exception as e: | |
| print(f"[-] Failed to delete employee {emp_id}: {e}") | |
| return False | |
| finally: | |
| conn.close() | |
| def get_employee(emp_id: str): | |
| conn = get_db_connection() | |
| row = conn.execute("SELECT * FROM employees WHERE id = ?", (emp_id,)).fetchone() | |
| conn.close() | |
| return dict(row) if row else None | |
| def get_all_employees(): | |
| conn = get_db_connection() | |
| rows = conn.execute("SELECT * FROM employees ORDER BY created_at DESC").fetchall() | |
| conn.close() | |
| return [dict(r) for r in rows] | |
| # Face Embeddings Operations | |
| def save_face_embedding(emp_id: str, angle: str, embedding: np.ndarray): | |
| conn = get_db_connection() | |
| cursor = conn.cursor() | |
| try: | |
| # SFace embedding is a 1x128 array of float32 | |
| # Convert numpy array to raw bytes for binary storage | |
| embedding_bytes = embedding.astype(np.float32).tobytes() | |
| cursor.execute( | |
| """ | |
| INSERT OR REPLACE INTO face_embeddings (employee_id, angle, embedding) | |
| VALUES (?, ?, ?) | |
| """, | |
| (emp_id, angle, embedding_bytes) | |
| ) | |
| conn.commit() | |
| return True | |
| except Exception as e: | |
| print(f"[-] Error saving embedding: {e}") | |
| return False | |
| finally: | |
| conn.close() | |
| def get_all_embeddings(): | |
| conn = get_db_connection() | |
| rows = conn.execute( | |
| """ | |
| SELECT fe.employee_id, fe.angle, fe.embedding, e.name | |
| FROM face_embeddings fe | |
| JOIN employees e ON fe.employee_id = e.id | |
| """ | |
| ).fetchall() | |
| conn.close() | |
| embeddings_list = [] | |
| for r in rows: | |
| # Deserialize bytes back to float32 numpy array and reshape to (1, 128) for SFace | |
| emb_arr = np.frombuffer(r['embedding'], dtype=np.float32).reshape(1, 128) | |
| embeddings_list.append({ | |
| "employee_id": r['employee_id'], | |
| "name": r['name'], | |
| "angle": r['angle'], | |
| "embedding": emb_arr | |
| }) | |
| return embeddings_list | |
| def has_all_embeddings(emp_id: str): | |
| conn = get_db_connection() | |
| count = conn.execute( | |
| "SELECT COUNT(*) as count FROM face_embeddings WHERE employee_id = ?", | |
| (emp_id,) | |
| ).fetchone()['count'] | |
| conn.close() | |
| return count >= 5 # center, left, right, up, down | |
| # Attendance Logs Operations | |
| def log_attendance(emp_id: str, event_type: str, score: float): | |
| conn = get_db_connection() | |
| cursor = conn.cursor() | |
| try: | |
| timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
| cursor.execute( | |
| """ | |
| INSERT INTO attendance_logs (employee_id, event_type, similarity_score, timestamp) | |
| VALUES (?, ?, ?, ?) | |
| """, | |
| (emp_id, event_type, score, timestamp) | |
| ) | |
| conn.commit() | |
| return True | |
| except Exception as e: | |
| print(f"[-] Error logging attendance: {e}") | |
| return False | |
| finally: | |
| conn.close() | |
| def get_last_attendance_log(emp_id: str): | |
| conn = get_db_connection() | |
| row = conn.execute( | |
| """ | |
| SELECT * FROM attendance_logs | |
| WHERE employee_id = ? | |
| ORDER BY timestamp DESC LIMIT 1 | |
| """, | |
| (emp_id,) | |
| ).fetchone() | |
| conn.close() | |
| return dict(row) if row else None | |
| def get_todays_logs(): | |
| conn = get_db_connection() | |
| today_str = datetime.now().strftime("%Y-%m-%d") | |
| rows = conn.execute( | |
| """ | |
| SELECT al.id, al.employee_id, al.event_type, al.similarity_score, al.timestamp, e.name, e.role | |
| FROM attendance_logs al | |
| JOIN employees e ON al.employee_id = e.id | |
| WHERE al.timestamp LIKE ? | |
| ORDER BY al.timestamp DESC | |
| """, | |
| (f"{today_str}%",) | |
| ).fetchall() | |
| conn.close() | |
| return [dict(r) for r in rows] | |
| def get_attendance_history(limit: int = 100): | |
| conn = get_db_connection() | |
| rows = conn.execute( | |
| """ | |
| SELECT al.id, al.employee_id, al.event_type, al.similarity_score, al.timestamp, e.name, e.role, e.email | |
| FROM attendance_logs al | |
| JOIN employees e ON al.employee_id = e.id | |
| ORDER BY al.timestamp DESC LIMIT ? | |
| """, | |
| (limit,) | |
| ).fetchall() | |
| conn.close() | |
| return [dict(r) for r in rows] | |
| def get_todays_stats(): | |
| conn = get_db_connection() | |
| today_str = datetime.now().strftime("%Y-%m-%d") | |
| # 1. Total employees | |
| total_emp = conn.execute("SELECT COUNT(*) as count FROM employees").fetchone()['count'] | |
| # 2. Total active logs today | |
| today_logs = conn.execute( | |
| "SELECT COUNT(*) as count FROM attendance_logs WHERE timestamp LIKE ?", | |
| (f"{today_str}%",) | |
| ).fetchone()['count'] | |
| # 3. Unique present employees today | |
| present_today = conn.execute( | |
| """ | |
| SELECT COUNT(DISTINCT employee_id) as count | |
| FROM attendance_logs | |
| WHERE timestamp LIKE ? AND event_type = 'check_in' | |
| """, | |
| (f"{today_str}%",) | |
| ).fetchone()['count'] | |
| # 4. Checked out today | |
| checked_out_today = conn.execute( | |
| """ | |
| SELECT COUNT(DISTINCT employee_id) as count | |
| FROM attendance_logs | |
| WHERE timestamp LIKE ? AND event_type = 'check_out' | |
| """, | |
| (f"{today_str}%",) | |
| ).fetchone()['count'] | |
| conn.close() | |
| return { | |
| "total_employees": total_emp, | |
| "today_logs_count": today_logs, | |
| "present_count": present_today, | |
| "checked_out_count": checked_out_today, | |
| "absent_count": max(0, total_emp - present_today) | |
| } | |
| def get_weekly_attendance(): | |
| conn = get_db_connection() | |
| cursor = conn.cursor() | |
| # Generate past 7 days (including today) | |
| import datetime as dt | |
| dates = [] | |
| for i in range(6, -1, -1): | |
| d = (dt.datetime.now() - dt.timedelta(days=i)).strftime("%Y-%m-%d") | |
| dates.append(d) | |
| stats = [] | |
| for d in dates: | |
| cursor.execute( | |
| """ | |
| SELECT COUNT(DISTINCT employee_id) as count | |
| FROM attendance_logs | |
| WHERE timestamp LIKE ? AND event_type = 'check_in' | |
| """, | |
| (f"{d}%",) | |
| ) | |
| row = cursor.fetchone() | |
| count = row['count'] if row else 0 | |
| date_obj = dt.datetime.strptime(d, "%Y-%m-%d") | |
| day_label = date_obj.strftime("%a") | |
| stats.append({"date": d, "label": day_label, "count": count}) | |
| conn.close() | |
| return stats | |
| def get_hourly_attendance(): | |
| conn = get_db_connection() | |
| cursor = conn.cursor() | |
| today_str = datetime.now().strftime("%Y-%m-%d") | |
| hours = ["09", "10", "11", "12", "13", "14", "15", "16", "17"] | |
| stats = [] | |
| for h in hours: | |
| cursor.execute( | |
| """ | |
| SELECT COUNT(*) as count | |
| FROM attendance_logs | |
| WHERE timestamp LIKE ? AND strftime('%H', timestamp) = ? AND event_type = 'check_in' | |
| """, | |
| (f"{today_str}%", h) | |
| ) | |
| row = cursor.fetchone() | |
| count = row['count'] if row else 0 | |
| h_int = int(h) | |
| ampm = "AM" if h_int < 12 else "PM" | |
| disp_h = h_int if h_int <= 12 else h_int - 12 | |
| if disp_h == 0: | |
| disp_h = 12 | |
| stats.append({"hour": h, "label": f"{disp_h} {ampm}", "count": count}) | |
| conn.close() | |
| return stats | |