import os import sqlite3 from datetime import datetime import numpy as np DB_PATH = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "attendance.db") def get_db_connection(): conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row return conn def init_db(): conn = get_db_connection() cursor = conn.cursor() # 1. Create Employees table cursor.execute(""" CREATE TABLE IF NOT EXISTS employees ( id TEXT PRIMARY KEY, name TEXT NOT NULL, email TEXT UNIQUE NOT NULL, role TEXT NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) # 2. Create Face Embeddings table (multi-angle storage) cursor.execute(""" CREATE TABLE IF NOT EXISTS face_embeddings ( id INTEGER PRIMARY KEY AUTOINCREMENT, employee_id TEXT NOT NULL, angle TEXT NOT NULL, -- 'center', 'left', 'right', 'up', 'down' embedding BLOB NOT NULL, -- Binary float array (128 floats) FOREIGN KEY (employee_id) REFERENCES employees(id) ON DELETE CASCADE, UNIQUE(employee_id, angle) ) """) # 3. Create Attendance Logs table cursor.execute(""" CREATE TABLE IF NOT EXISTS attendance_logs ( id INTEGER PRIMARY KEY AUTOINCREMENT, employee_id TEXT NOT NULL, timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP, event_type TEXT NOT NULL, -- 'check_in', 'check_out' similarity_score REAL NOT NULL, FOREIGN KEY (employee_id) REFERENCES employees(id) ON DELETE CASCADE ) """) conn.commit() conn.close() print("[+] SQLite Database initialized successfully!") # Initialize immediately upon module load init_db() # CRUD: Employees def add_employee(emp_id: str, name: str, email: str, role: str): conn = get_db_connection() cursor = conn.cursor() try: cursor.execute( "INSERT INTO employees (id, name, email, role) VALUES (?, ?, ?, ?)", (emp_id, name, email, role) ) conn.commit() return True except sqlite3.IntegrityError as e: print(f"[-] Database Integrity Error: {e}") return False finally: conn.close() def delete_employee(emp_id: str): conn = get_db_connection() cursor = conn.cursor() try: # Cascade deletes will clean up embeddings and logs due to CASCADE setup cursor.execute("PRAGMA foreign_keys = ON") cursor.execute("DELETE FROM employees WHERE id = ?", (emp_id,)) conn.commit() return True except Exception as e: print(f"[-] Failed to delete employee {emp_id}: {e}") return False finally: conn.close() def get_employee(emp_id: str): conn = get_db_connection() row = conn.execute("SELECT * FROM employees WHERE id = ?", (emp_id,)).fetchone() conn.close() return dict(row) if row else None def get_all_employees(): conn = get_db_connection() rows = conn.execute("SELECT * FROM employees ORDER BY created_at DESC").fetchall() conn.close() return [dict(r) for r in rows] # Face Embeddings Operations def save_face_embedding(emp_id: str, angle: str, embedding: np.ndarray): conn = get_db_connection() cursor = conn.cursor() try: # SFace embedding is a 1x128 array of float32 # Convert numpy array to raw bytes for binary storage embedding_bytes = embedding.astype(np.float32).tobytes() cursor.execute( """ INSERT OR REPLACE INTO face_embeddings (employee_id, angle, embedding) VALUES (?, ?, ?) """, (emp_id, angle, embedding_bytes) ) conn.commit() return True except Exception as e: print(f"[-] Error saving embedding: {e}") return False finally: conn.close() def get_all_embeddings(): conn = get_db_connection() rows = conn.execute( """ SELECT fe.employee_id, fe.angle, fe.embedding, e.name FROM face_embeddings fe JOIN employees e ON fe.employee_id = e.id """ ).fetchall() conn.close() embeddings_list = [] for r in rows: # Deserialize bytes back to float32 numpy array and reshape to (1, 128) for SFace emb_arr = np.frombuffer(r['embedding'], dtype=np.float32).reshape(1, 128) embeddings_list.append({ "employee_id": r['employee_id'], "name": r['name'], "angle": r['angle'], "embedding": emb_arr }) return embeddings_list def has_all_embeddings(emp_id: str): conn = get_db_connection() count = conn.execute( "SELECT COUNT(*) as count FROM face_embeddings WHERE employee_id = ?", (emp_id,) ).fetchone()['count'] conn.close() return count >= 5 # center, left, right, up, down # Attendance Logs Operations def log_attendance(emp_id: str, event_type: str, score: float): conn = get_db_connection() cursor = conn.cursor() try: timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") cursor.execute( """ INSERT INTO attendance_logs (employee_id, event_type, similarity_score, timestamp) VALUES (?, ?, ?, ?) """, (emp_id, event_type, score, timestamp) ) conn.commit() return True except Exception as e: print(f"[-] Error logging attendance: {e}") return False finally: conn.close() def get_last_attendance_log(emp_id: str): conn = get_db_connection() row = conn.execute( """ SELECT * FROM attendance_logs WHERE employee_id = ? ORDER BY timestamp DESC LIMIT 1 """, (emp_id,) ).fetchone() conn.close() return dict(row) if row else None def get_todays_logs(): conn = get_db_connection() today_str = datetime.now().strftime("%Y-%m-%d") rows = conn.execute( """ SELECT al.id, al.employee_id, al.event_type, al.similarity_score, al.timestamp, e.name, e.role FROM attendance_logs al JOIN employees e ON al.employee_id = e.id WHERE al.timestamp LIKE ? ORDER BY al.timestamp DESC """, (f"{today_str}%",) ).fetchall() conn.close() return [dict(r) for r in rows] def get_attendance_history(limit: int = 100): conn = get_db_connection() rows = conn.execute( """ SELECT al.id, al.employee_id, al.event_type, al.similarity_score, al.timestamp, e.name, e.role, e.email FROM attendance_logs al JOIN employees e ON al.employee_id = e.id ORDER BY al.timestamp DESC LIMIT ? """, (limit,) ).fetchall() conn.close() return [dict(r) for r in rows] def get_todays_stats(): conn = get_db_connection() today_str = datetime.now().strftime("%Y-%m-%d") # 1. Total employees total_emp = conn.execute("SELECT COUNT(*) as count FROM employees").fetchone()['count'] # 2. Total active logs today today_logs = conn.execute( "SELECT COUNT(*) as count FROM attendance_logs WHERE timestamp LIKE ?", (f"{today_str}%",) ).fetchone()['count'] # 3. Unique present employees today present_today = conn.execute( """ SELECT COUNT(DISTINCT employee_id) as count FROM attendance_logs WHERE timestamp LIKE ? AND event_type = 'check_in' """, (f"{today_str}%",) ).fetchone()['count'] # 4. Checked out today checked_out_today = conn.execute( """ SELECT COUNT(DISTINCT employee_id) as count FROM attendance_logs WHERE timestamp LIKE ? AND event_type = 'check_out' """, (f"{today_str}%",) ).fetchone()['count'] conn.close() return { "total_employees": total_emp, "today_logs_count": today_logs, "present_count": present_today, "checked_out_count": checked_out_today, "absent_count": max(0, total_emp - present_today) } def get_weekly_attendance(): conn = get_db_connection() cursor = conn.cursor() # Generate past 7 days (including today) import datetime as dt dates = [] for i in range(6, -1, -1): d = (dt.datetime.now() - dt.timedelta(days=i)).strftime("%Y-%m-%d") dates.append(d) stats = [] for d in dates: cursor.execute( """ SELECT COUNT(DISTINCT employee_id) as count FROM attendance_logs WHERE timestamp LIKE ? AND event_type = 'check_in' """, (f"{d}%",) ) row = cursor.fetchone() count = row['count'] if row else 0 date_obj = dt.datetime.strptime(d, "%Y-%m-%d") day_label = date_obj.strftime("%a") stats.append({"date": d, "label": day_label, "count": count}) conn.close() return stats def get_hourly_attendance(): conn = get_db_connection() cursor = conn.cursor() today_str = datetime.now().strftime("%Y-%m-%d") hours = ["09", "10", "11", "12", "13", "14", "15", "16", "17"] stats = [] for h in hours: cursor.execute( """ SELECT COUNT(*) as count FROM attendance_logs WHERE timestamp LIKE ? AND strftime('%H', timestamp) = ? AND event_type = 'check_in' """, (f"{today_str}%", h) ) row = cursor.fetchone() count = row['count'] if row else 0 h_int = int(h) ampm = "AM" if h_int < 12 else "PM" disp_h = h_int if h_int <= 12 else h_int - 12 if disp_h == 0: disp_h = 12 stats.append({"hour": h, "label": f"{disp_h} {ampm}", "count": count}) conn.close() return stats