# Att / app.py
# Uploaded by Jacksonnavigator7 - commit "Update app.py" (679b4c8, verified)
import csv
import datetime
import hashlib
import json
import logging
import math
import os
import re
import secrets
import sqlite3
import threading
import time
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union

import gradio as gr
import pandas as pd
# Logging setup: every record is written to a log file and echoed to the console.
logging.basicConfig(
    handlers=[
        logging.FileHandler("attendance_system.log"),
        logging.StreamHandler(),
    ],
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger("attendance_system")

# Runtime configuration
DB_PATH = os.environ.get("DB_PATH", "attendance.db")  # overridable via the DB_PATH env var
SESSION_EXPIRY = 60 * 60  # login session lifetime, in seconds (1 hour)
ATTENDANCE_WINDOW = 5 * 60  # default attendance window, in seconds (5 minutes)
DEFAULT_ROLES = ["admin", "teacher", "student"]
SALT_LENGTH = 16  # passed to secrets.token_hex() when generating password salts
# ========== DATABASE LAYER ==========
class _AutoClosingConnection(sqlite3.Connection):
    """sqlite3 connection whose ``with`` block also closes it on exit.

    The stock ``sqlite3.Connection`` context manager only commits (or rolls
    back on error); it leaves the connection open. Every caller in this file
    uses ``with db.get_connection() as conn:`` as if it released the
    connection, so the close is performed here.
    """

    def __exit__(self, exc_type, exc_value, traceback):
        try:
            # Commit on success / roll back on error, exactly like the base class.
            return super().__exit__(exc_type, exc_value, traceback)
        finally:
            self.close()


class Database:
    """SQLite persistence layer: owns the schema and hands out connections.

    No pooling is performed; each ``get_connection()`` call opens a fresh
    connection to ``db_path``.
    """

    def __init__(self, db_path: str):
        """Remember *db_path*, create its parent directory and the schema."""
        self.db_path = db_path
        self._ensure_directory_exists()
        self.init_db()

    def _ensure_directory_exists(self):
        """Create the directory that will hold the database file, if any."""
        directory = os.path.dirname(self.db_path)
        if directory:
            # exist_ok avoids the check-then-create race of exists()+makedirs().
            os.makedirs(directory, exist_ok=True)

    def get_connection(self):
        """Open a new connection.

        Used as a context manager, the connection commits on success, rolls
        back on error and is closed when the ``with`` block ends.
        """
        return sqlite3.connect(self.db_path, factory=_AutoClosingConnection)

    def init_db(self):
        """Create all tables and seed the default admin user and settings row."""
        with self.get_connection() as conn:
            c = conn.cursor()
            # Users table with improved fields
            c.execute('''CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                username TEXT UNIQUE NOT NULL,
                password_hash TEXT NOT NULL,
                salt TEXT NOT NULL,
                role TEXT NOT NULL,
                email TEXT UNIQUE,
                full_name TEXT,
                created_at TEXT NOT NULL,
                last_login TEXT,
                active INTEGER DEFAULT 1
            )''')
            # Sessions table for better auth management
            c.execute('''CREATE TABLE IF NOT EXISTS sessions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id INTEGER NOT NULL,
                token TEXT UNIQUE NOT NULL,
                device_id TEXT,
                created_at TEXT NOT NULL,
                expires_at TEXT NOT NULL,
                FOREIGN KEY (user_id) REFERENCES users(id)
            )''')
            # Enhanced attendance table
            c.execute('''CREATE TABLE IF NOT EXISTS attendance (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id INTEGER NOT NULL,
                date TEXT NOT NULL,
                time_in TEXT NOT NULL,
                time_out TEXT,
                session_id TEXT NOT NULL,
                status TEXT DEFAULT 'Present',
                latitude TEXT,
                longitude TEXT,
                device_info TEXT,
                notes TEXT,
                FOREIGN KEY (user_id) REFERENCES users(id),
                UNIQUE(user_id, date, session_id)
            )''')
            # Sessions/classes table
            c.execute('''CREATE TABLE IF NOT EXISTS class_sessions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                description TEXT,
                created_by INTEGER NOT NULL,
                start_time TEXT NOT NULL,
                end_time TEXT,
                attendance_window INTEGER DEFAULT 300,
                code TEXT UNIQUE,
                active INTEGER DEFAULT 1,
                FOREIGN KEY (created_by) REFERENCES users(id)
            )''')
            # Settings table (a single row with id=1 is used)
            c.execute('''CREATE TABLE IF NOT EXISTS settings (
                id INTEGER PRIMARY KEY,
                site_name TEXT DEFAULT 'Attendance System',
                attendance_mode TEXT DEFAULT 'manual',
                auto_close_window INTEGER DEFAULT 1,
                default_window_time INTEGER DEFAULT 300,
                geo_verification INTEGER DEFAULT 0,
                allowed_radius INTEGER DEFAULT 100,
                site_latitude TEXT,
                site_longitude TEXT,
                require_device_verification INTEGER DEFAULT 0,
                theme TEXT DEFAULT 'light'
            )''')
            # Create admin user if no admin exists yet
            c.execute("SELECT COUNT(*) FROM users WHERE role='admin'")
            if c.fetchone()[0] == 0:
                # SECURITY: the bootstrap password defaults to the well-known
                # value "admin". It can be overridden with ADMIN_DEFAULT_PASSWORD;
                # when it is not, a warning is logged so the operator changes it.
                admin_password = os.getenv("ADMIN_DEFAULT_PASSWORD") or "admin"
                if admin_password == "admin":
                    logger.warning(
                        "Creating default admin account with the well-known password "
                        "'admin'; change it or set ADMIN_DEFAULT_PASSWORD."
                    )
                salt = secrets.token_hex(SALT_LENGTH)
                password_hash = self._hash_password(admin_password, salt)
                now = datetime.datetime.now().isoformat()
                c.execute(
                    "INSERT INTO users (username, password_hash, salt, role, created_at) VALUES (?, ?, ?, ?, ?)",
                    ("admin", password_hash, salt, "admin", now)
                )
            # Create default settings row if not present
            c.execute("SELECT COUNT(*) FROM settings")
            if c.fetchone()[0] == 0:
                c.execute("INSERT INTO settings (id) VALUES (1)")
            conn.commit()

    def _hash_password(self, password: str, salt: str) -> str:
        """Return the hex PBKDF2-HMAC-SHA256 digest of *password* with *salt*."""
        return hashlib.pbkdf2_hmac(
            'sha256',
            password.encode(),
            salt.encode(),
            100000  # 100,000 iterations for security
        ).hex()
# ========== AUTHENTICATION & USER MANAGEMENT ==========
class UserManager:
    """Registers users, authenticates them and manages login sessions."""

    def __init__(self, db: "Database"):
        self.db = db

    def register_user(self, username: str, password: str, role: str, email: str = None, full_name: str = None) -> Tuple[bool, str]:
        """Create a user account.

        Returns ``(True, message)`` on success, otherwise ``(False, reason)``.
        Blank ``email`` / ``full_name`` values are stored as NULL.
        """
        # Validate inputs
        if not username or not password or not role:
            return False, "All required fields must be provided."
        if role not in DEFAULT_ROLES:
            return False, f"Invalid role. Must be one of: {', '.join(DEFAULT_ROLES)}"
        if not self._validate_password_strength(password):
            return False, "Password is too weak. Must be at least 8 characters with numbers and letters."

        # Optional form fields arrive as "" when left empty. Storing "" in the
        # UNIQUE email column would make every second email-less registration
        # fail, so blanks are normalised to None (NULL never collides).
        email = (email or "").strip() or None
        full_name = (full_name or "").strip() or None
        if email and not self._validate_email(email):
            return False, "Invalid email format."

        # Create the user
        try:
            with self.db.get_connection() as conn:
                c = conn.cursor()
                salt = secrets.token_hex(SALT_LENGTH)
                password_hash = self.db._hash_password(password, salt)
                now = datetime.datetime.now().isoformat()
                c.execute(
                    "INSERT INTO users (username, password_hash, salt, role, email, full_name, created_at) VALUES (?, ?, ?, ?, ?, ?, ?)",
                    (username, password_hash, salt, role, email, full_name, now)
                )
                conn.commit()
            logger.info(f"User registered: {username} with role {role}")
            return True, "User registered successfully"
        except sqlite3.IntegrityError:
            return False, "Username or email already exists."
        except Exception as e:
            logger.error(f"Error registering user: {str(e)}")
            return False, f"Registration error: {str(e)}"

    def login(self, username: str, password: str, device_id: str = None) -> Tuple[bool, Union[str, Dict]]:
        """Authenticate a user and create a session.

        Returns ``(True, info)`` where *info* holds ``token``, ``user_id``,
        ``username``, ``role`` and ``expires_at``; or ``(False, reason)``.
        """
        try:
            with self.db.get_connection() as conn:
                c = conn.cursor()
                # Only active accounts may log in
                c.execute("SELECT id, username, password_hash, salt, role FROM users WHERE username=? AND active=1", (username,))
                user = c.fetchone()
                if not user:
                    logger.warning(f"Login attempt for non-existent user: {username}")
                    return False, "Invalid username or password."
                user_id, username, password_hash, salt, role = user
                # Constant-time comparison so response time does not leak how
                # much of the digest matched.
                candidate_hash = self.db._hash_password(password, salt)
                if not secrets.compare_digest(password_hash, candidate_hash):
                    logger.warning(f"Failed login attempt for user: {username}")
                    return False, "Invalid username or password."
                # Create session token
                token = secrets.token_urlsafe(32)
                now = datetime.datetime.now()
                expires = now + datetime.timedelta(seconds=SESSION_EXPIRY)
                c.execute(
                    "INSERT INTO sessions (user_id, token, device_id, created_at, expires_at) VALUES (?, ?, ?, ?, ?)",
                    (user_id, token, device_id, now.isoformat(), expires.isoformat())
                )
                # Update last login
                c.execute("UPDATE users SET last_login=? WHERE id=?", (now.isoformat(), user_id))
                conn.commit()
            logger.info(f"User logged in: {username}")
            return True, {
                "token": token,
                "user_id": user_id,
                "username": username,
                "role": role,
                "expires_at": expires.isoformat()
            }
        except Exception as e:
            logger.error(f"Login error: {str(e)}")
            return False, f"Login error: {str(e)}"

    def verify_session(self, token: str, device_id: str = None) -> Tuple[bool, Union[str, Dict]]:
        """Check that *token* belongs to an unexpired session of an active user.

        When both the caller and the stored session carry a device id, they
        must match. Returns ``(True, {user_id, username, role})`` or
        ``(False, reason)``.
        """
        try:
            with self.db.get_connection() as conn:
                c = conn.cursor()
                # ISO-8601 strings compare chronologically as plain text
                now = datetime.datetime.now().isoformat()
                query = """
                    SELECT s.id, s.user_id, u.username, u.role, s.device_id
                    FROM sessions s
                    JOIN users u ON s.user_id = u.id
                    WHERE s.token = ? AND s.expires_at > ? AND u.active = 1
                """
                c.execute(query, (token, now))
                session = c.fetchone()
            if not session:
                return False, "Session expired or invalid."
            session_id, user_id, username, role, stored_device_id = session
            # Device verification if both sides supplied an id
            if device_id and stored_device_id and device_id != stored_device_id:
                logger.warning(f"Device mismatch for user {username}: {device_id} vs {stored_device_id}")
                return False, "Device verification failed."
            return True, {
                "user_id": user_id,
                "username": username,
                "role": role
            }
        except Exception as e:
            logger.error(f"Session verification error: {str(e)}")
            return False, f"Session error: {str(e)}"

    def logout(self, token: str) -> Tuple[bool, str]:
        """Delete the session row for *token* (no error if it does not exist)."""
        try:
            with self.db.get_connection() as conn:
                c = conn.cursor()
                c.execute("DELETE FROM sessions WHERE token = ?", (token,))
                conn.commit()
            return True, "Logged out successfully."
        except Exception as e:
            logger.error(f"Logout error: {str(e)}")
            return False, f"Logout error: {str(e)}"

    def _validate_password_strength(self, password: str) -> bool:
        """Return True for passwords of 8+ characters with a digit and an ASCII letter."""
        if len(password) < 8:
            return False
        if not re.search(r'\d', password):  # At least one digit
            return False
        if not re.search(r'[a-zA-Z]', password):  # At least one letter
            return False
        return True

    def _validate_email(self, email: str) -> bool:
        """Return True when *email* looks like ``local@domain.tld``."""
        pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
        # fullmatch, because `$` with match() would accept a trailing newline.
        return bool(re.fullmatch(pattern, email))

    def get_users(self, role: str = None) -> List[Dict]:
        """Return users as dicts, optionally filtered by *role*; [] on error."""
        try:
            with self.db.get_connection() as conn:
                c = conn.cursor()
                query = "SELECT id, username, role, email, full_name, active FROM users"
                params = ()
                if role:
                    query += " WHERE role = ?"
                    params = (role,)
                c.execute(query, params)
                users = c.fetchall()
            return [
                {
                    "id": user[0],
                    "username": user[1],
                    "role": user[2],
                    "email": user[3],
                    "full_name": user[4],
                    "active": bool(user[5])
                }
                for user in users
            ]
        except Exception as e:
            logger.error(f"Error getting users: {str(e)}")
            return []
# ========== ATTENDANCE MANAGEMENT ==========
class AttendanceManager:
    """Creates/closes class sessions and records and reports attendance."""

    # How many random codes to try before giving up on finding an unused one.
    _CODE_ATTEMPTS = 10

    def __init__(self, db: "Database"):
        self.db = db

    def _generate_unique_code(self, cursor) -> str:
        """Return a 6-hex-digit session code not yet present in class_sessions."""
        for _ in range(self._CODE_ATTEMPTS):
            code = secrets.token_hex(3).upper()
            cursor.execute("SELECT 1 FROM class_sessions WHERE code=?", (code,))
            if cursor.fetchone() is None:
                return code
        raise RuntimeError("could not generate a unique session code")

    def create_session(self, name: str, created_by: int, description: str = None,
                       window_time: int = None) -> Tuple[bool, Union[str, Dict]]:
        """Create a new attendance session/class.

        Returns ``(True, {session_id, code, name, start_time, window_time})``
        or ``(False, reason)``. When the ``auto_close_window`` setting is on,
        a daemon thread closes the session after ``window_time`` seconds.
        """
        try:
            with self.db.get_connection() as conn:
                c = conn.cursor()
                now = datetime.datetime.now()
                # Get default window time from settings if not provided
                if window_time is None:
                    c.execute("SELECT default_window_time FROM settings WHERE id=1")
                    row = c.fetchone()
                    # Fall back to the module default if the settings row is missing.
                    window_time = row[0] if row else ATTENDANCE_WINDOW
                # The code column is UNIQUE; pick one that is not taken so a
                # random collision does not surface as an insert failure.
                code = self._generate_unique_code(c)
                c.execute(
                    """INSERT INTO class_sessions
                    (name, description, created_by, start_time, attendance_window, code, active)
                    VALUES (?, ?, ?, ?, ?, ?, 1)""",
                    (name, description, created_by, now.isoformat(), window_time, code)
                )
                session_id = c.lastrowid
                conn.commit()
                # Auto-close thread if enabled in settings
                c.execute("SELECT auto_close_window FROM settings WHERE id=1")
                row = c.fetchone()
                auto_close = row[0] if row else 0
            if auto_close:
                threading.Thread(
                    target=self._auto_close_session,
                    args=(session_id, window_time),
                    daemon=True
                ).start()
            logger.info(f"Created session: {name} with code {code}")
            return True, {
                "session_id": session_id,
                "code": code,
                "name": name,
                "start_time": now.isoformat(),
                "window_time": window_time
            }
        except Exception as e:
            logger.error(f"Error creating session: {str(e)}")
            return False, f"Error creating session: {str(e)}"

    def _auto_close_session(self, session_id: int, window_time: int):
        """Sleep for *window_time* seconds, then close the session if still open."""
        time.sleep(window_time)
        try:
            with self.db.get_connection() as conn:
                c = conn.cursor()
                now = datetime.datetime.now().isoformat()
                # `AND active=1` keeps the end_time of a session that was
                # already closed manually from being overwritten.
                c.execute(
                    "UPDATE class_sessions SET active=0, end_time=? WHERE id=? AND active=1",
                    (now, session_id)
                )
                conn.commit()
            logger.info(f"Auto-closed session ID: {session_id}")
        except Exception as e:
            logger.error(f"Error auto-closing session {session_id}: {str(e)}")

    def close_session(self, session_id: int) -> Tuple[bool, str]:
        """Manually close an attendance session and stamp its end time."""
        try:
            with self.db.get_connection() as conn:
                c = conn.cursor()
                now = datetime.datetime.now().isoformat()
                c.execute(
                    "UPDATE class_sessions SET active=0, end_time=? WHERE id=?",
                    (now, session_id)
                )
                updated = c.rowcount
                conn.commit()
            if updated == 0:
                # Nothing matched: do not report success for an unknown id.
                return False, "Session not found."
            logger.info(f"Manually closed session ID: {session_id}")
            return True, "Session closed successfully."
        except Exception as e:
            logger.error(f"Error closing session: {str(e)}")
            return False, f"Error closing session: {str(e)}"

    def mark_attendance(self, user_id: int, session_code: str,
                        device_info: str = None, geo_data: Dict = None) -> Tuple[bool, str]:
        """Record attendance of *user_id* for the session identified by *session_code*.

        *geo_data* may carry ``latitude`` / ``longitude``; they are checked
        against the site location when geo verification is enabled in the
        settings. Returns ``(success, message)``.
        """
        try:
            with self.db.get_connection() as conn:
                c = conn.cursor()
                now = datetime.datetime.now()
                today = now.date().isoformat()
                # Check if session exists and is active
                c.execute("SELECT id, active FROM class_sessions WHERE code=?", (session_code,))
                session = c.fetchone()
                if not session:
                    return False, "Invalid session code."
                session_id, is_active = session
                if not is_active:
                    return False, "This session is closed. Attendance cannot be marked."
                # Check if attendance already marked
                c.execute(
                    "SELECT id FROM attendance WHERE user_id=? AND date=? AND session_id=?",
                    (user_id, today, session_id)
                )
                if c.fetchone():
                    return False, "You have already marked attendance for this session today."
                # Prepare geo data
                latitude = longitude = None
                if geo_data:
                    latitude = geo_data.get('latitude')
                    longitude = geo_data.get('longitude')
                # Verify geo location if needed
                c.execute("SELECT geo_verification, site_latitude, site_longitude, allowed_radius FROM settings WHERE id=1")
                geo_settings = c.fetchone()
                geo_verification, site_lat, site_lon, allowed_radius = geo_settings
                if geo_verification and site_lat and site_lon:
                    if not self._verify_location(latitude, longitude, float(site_lat), float(site_lon), allowed_radius):
                        return False, "You are outside the allowed area for attendance."
                # Insert attendance record. `is not None` so that a coordinate
                # of exactly 0 (equator / prime meridian) is still stored.
                c.execute(
                    """INSERT INTO attendance
                    (user_id, date, time_in, session_id, device_info, latitude, longitude)
                    VALUES (?, ?, ?, ?, ?, ?, ?)""",
                    (user_id, today, now.isoformat(), session_id,
                     json.dumps(device_info) if device_info else None,
                     str(latitude) if latitude is not None else None,
                     str(longitude) if longitude is not None else None)
                )
                conn.commit()
            logger.info(f"Attendance marked for user {user_id} in session {session_code}")
            return True, f"Attendance marked successfully at {now.strftime('%H:%M:%S')}"
        except sqlite3.IntegrityError:
            # UNIQUE(user_id, date, session_id) caught a concurrent duplicate
            return False, "You have already marked attendance for this session."
        except Exception as e:
            logger.error(f"Error marking attendance: {str(e)}")
            return False, f"Error marking attendance: {str(e)}"

    def _verify_location(self, user_lat: float, user_lon: float,
                         site_lat: float, site_lon: float, max_distance: float) -> bool:
        """Return True when the user is within *max_distance* metres of the site.

        Uses a flat-earth approximation (1 degree of latitude ~ 111 km,
        longitude scaled by cos(latitude)), adequate only for short distances.
        Missing or non-numeric user coordinates yield False.
        """
        # Test for None explicitly: 0.0 is a valid latitude/longitude.
        if user_lat is None or user_lon is None:
            return False
        try:
            user_lat = float(user_lat)
            user_lon = float(user_lon)
        except (TypeError, ValueError):
            return False
        lat_diff = abs(user_lat - site_lat) * 111000  # 1 degree lat ~ 111km
        lon_diff = abs(user_lon - site_lon) * 111000 * abs(math.cos(math.radians(site_lat)))
        distance = math.hypot(lat_diff, lon_diff)
        return distance <= max_distance

    def get_attendance_report(self, filters: Dict = None) -> List[Dict]:
        """Return attendance rows (newest first), optionally filtered.

        Supported filter keys: ``user_id``, ``date``, ``session_id``,
        ``status`` (equality) and ``date_range`` (a ``(start, end)`` pair,
        inclusive). Returns [] on error.
        """
        try:
            query = """
                SELECT a.id, u.username, u.full_name, a.date, a.time_in, a.time_out,
                cs.name as session_name, cs.code as session_code, a.status
                FROM attendance a
                JOIN users u ON a.user_id = u.id
                JOIN class_sessions cs ON a.session_id = cs.id
            """
            conditions = []
            params = []
            if filters:
                equality_filters = (
                    ("user_id", "a.user_id"),
                    ("date", "a.date"),
                    ("session_id", "a.session_id"),
                    ("status", "a.status"),
                )
                for key, column in equality_filters:
                    if key in filters:
                        conditions.append(f"{column} = ?")
                        params.append(filters[key])
                if 'date_range' in filters:
                    start, end = filters['date_range']
                    conditions.append("a.date BETWEEN ? AND ?")
                    params.extend([start, end])
            if conditions:
                query += " WHERE " + " AND ".join(conditions)
            query += " ORDER BY a.date DESC, a.time_in DESC"
            with self.db.get_connection() as conn:
                c = conn.cursor()
                c.execute(query, params)
                records = c.fetchall()
            keys = ("id", "username", "full_name", "date", "time_in", "time_out",
                    "session_name", "session_code", "status")
            return [dict(zip(keys, r)) for r in records]
        except Exception as e:
            logger.error(f"Error generating attendance report: {str(e)}")
            return []

    def update_attendance_status(self, attendance_id: int, status: str) -> Tuple[bool, str]:
        """Set the status of one attendance record to a recognised value."""
        valid_statuses = ["Present", "Absent", "Late", "Excused"]
        if status not in valid_statuses:
            return False, f"Invalid status. Must be one of: {', '.join(valid_statuses)}"
        try:
            with self.db.get_connection() as conn:
                c = conn.cursor()
                c.execute("UPDATE attendance SET status=? WHERE id=?", (status, attendance_id))
                updated = c.rowcount
                conn.commit()
            if updated == 0:
                # Nothing matched: do not report success for an unknown id.
                return False, "Attendance record not found."
            logger.info(f"Updated attendance {attendance_id} status to {status}")
            return True, f"Status updated to {status}"
        except Exception as e:
            logger.error(f"Error updating attendance status: {str(e)}")
            return False, f"Error updating status: {str(e)}"

    def get_active_sessions(self) -> List[Dict]:
        """Return all sessions with active=1, newest first; [] on error."""
        try:
            with self.db.get_connection() as conn:
                c = conn.cursor()
                c.execute("""
                    SELECT cs.id, cs.name, cs.description, cs.code, cs.start_time,
                    u.username as creator
                    FROM class_sessions cs
                    JOIN users u ON cs.created_by = u.id
                    WHERE cs.active = 1
                    ORDER BY cs.start_time DESC
                """)
                sessions = c.fetchall()
            keys = ("id", "name", "description", "code", "start_time", "creator")
            return [dict(zip(keys, s)) for s in sessions]
        except Exception as e:
            logger.error(f"Error getting active sessions: {str(e)}")
            return []

    def get_sessions_history(self, limit: int = 50) -> List[Dict]:
        """Return the *limit* most recent sessions with their attendance counts."""
        try:
            with self.db.get_connection() as conn:
                c = conn.cursor()
                c.execute("""
                    SELECT cs.id, cs.name, cs.code, cs.start_time, cs.end_time,
                    (SELECT COUNT(*) FROM attendance WHERE session_id = cs.id) as count
                    FROM class_sessions cs
                    ORDER BY cs.start_time DESC
                    LIMIT ?
                """, (limit,))
                sessions = c.fetchall()
            keys = ("id", "name", "code", "start_time", "end_time", "attendance_count")
            return [dict(zip(keys, s)) for s in sessions]
        except Exception as e:
            logger.error(f"Error getting sessions history: {str(e)}")
            return []
# ========== SETTINGS MANAGEMENT ==========
class SettingsManager:
    """Reads and updates the single settings row (id=1)."""

    # Column names that update_settings() may write. Because keys are checked
    # against this whitelist, interpolating them into the UPDATE is safe.
    _ALLOWED_KEYS = frozenset({
        'site_name', 'attendance_mode', 'auto_close_window',
        'default_window_time', 'geo_verification', 'allowed_radius',
        'site_latitude', 'site_longitude', 'require_device_verification',
        'theme',
    })

    def __init__(self, db: "Database"):
        self.db = db

    def get_settings(self) -> Dict:
        """Return the settings row as ``{column: value}``; {} if absent or on error."""
        try:
            with self.db.get_connection() as conn:
                c = conn.cursor()
                c.execute("SELECT * FROM settings WHERE id=1")
                columns = [description[0] for description in c.description]
                values = c.fetchone()
            if not values:
                return {}
            return dict(zip(columns, values))
        except Exception as e:
            logger.error(f"Error fetching settings: {str(e)}")
            return {}

    def update_settings(self, settings: Dict) -> Tuple[bool, str]:
        """Write the given ``{column: value}`` pairs to the settings row.

        Input is validated before any database connection is opened. Returns
        ``(success, message)``; unknown keys are listed in sorted order.
        """
        try:
            settings = settings or {}
            # Validate keys (sorted so the message is deterministic)
            invalid_keys = sorted(str(key) for key in set(settings) - self._ALLOWED_KEYS)
            if invalid_keys:
                return False, f"Invalid settings: {', '.join(invalid_keys)}"
            if not settings:
                return False, "No settings provided to update."
            # Build update query
            assignments = ", ".join(f"{key} = ?" for key in settings)
            params = list(settings.values())
            query = "UPDATE settings SET " + assignments + " WHERE id=1"
            with self.db.get_connection() as conn:
                c = conn.cursor()
                c.execute(query, params)
                conn.commit()
            logger.info(f"Updated settings: {', '.join(settings.keys())}")
            return True, "Settings updated successfully."
        except Exception as e:
            logger.error(f"Error updating settings: {str(e)}")
            return False, f"Error updating settings: {str(e)}"
# ========== REPORTING ==========
class ReportGenerator:
    """Generates summary and per-student reports and exports data to CSV."""

    def __init__(self, db: "Database"):
        self.db = db

    def summary_report(self, start_date: str, end_date: str) -> Dict:
        """Summarise attendance between *start_date* and *end_date* (inclusive).

        Returns a dict with ``date_range``, ``totals``, ``status_breakdown``,
        ``top_sessions`` and ``top_students`` (ten entries at most each). On
        failure the same shape is returned with zeroed values plus ``error``.
        """
        date_span = (start_date, end_date)
        try:
            with self.db.get_connection() as conn:
                c = conn.cursor()
                # Total attendance counts
                c.execute("""
                    SELECT COUNT(*) as total_records,
                    COUNT(DISTINCT user_id) as unique_students,
                    COUNT(DISTINCT session_id) as unique_sessions
                    FROM attendance
                    WHERE date BETWEEN ? AND ?
                """, date_span)
                totals = c.fetchone()
                # Status breakdown
                c.execute("""
                    SELECT status, COUNT(*) as count
                    FROM attendance
                    WHERE date BETWEEN ? AND ?
                    GROUP BY status
                """, date_span)
                status_counts = dict(c.fetchall())
                # Session attendance counts
                c.execute("""
                    SELECT cs.name, cs.code, COUNT(*) as attendance_count
                    FROM attendance a
                    JOIN class_sessions cs ON a.session_id = cs.id
                    WHERE a.date BETWEEN ? AND ?
                    GROUP BY a.session_id
                    ORDER BY attendance_count DESC
                """, date_span)
                session_stats = [
                    {"name": name, "code": code, "count": count}
                    for name, code, count in c.fetchall()
                ]
                # Student attendance frequency
                c.execute("""
                    SELECT u.username, u.full_name, COUNT(*) as attendance_count
                    FROM attendance a
                    JOIN users u ON a.user_id = u.id
                    WHERE a.date BETWEEN ? AND ?
                    GROUP BY a.user_id
                    ORDER BY attendance_count DESC
                """, date_span)
                student_stats = [
                    {"username": username, "full_name": full_name, "count": count}
                    for username, full_name, count in c.fetchall()
                ]
            return {
                "date_range": {"start": start_date, "end": end_date},
                "totals": {
                    "records": totals[0],
                    "students": totals[1],
                    "sessions": totals[2]
                },
                "status_breakdown": status_counts,
                "top_sessions": session_stats[:10],
                "top_students": student_stats[:10]
            }
        except Exception as e:
            logger.error(f"Error generating summary report: {str(e)}")
            return {
                "error": str(e),
                "date_range": {"start": start_date, "end": end_date},
                "totals": {"records": 0, "students": 0, "sessions": 0},
                "status_breakdown": {},
                "top_sessions": [],
                "top_students": []
            }

    def student_detail_report(self, user_id: int, start_date: str = None, end_date: str = None) -> Dict:
        """Return one student's profile and attendance records.

        The optional date bounds are inclusive. Returns ``{"error": ...}``
        when the user does not exist or the query fails.
        """
        try:
            with self.db.get_connection() as conn:
                c = conn.cursor()
                # Get student info
                c.execute("SELECT username, full_name, email FROM users WHERE id=?", (user_id,))
                student = c.fetchone()
                if not student:
                    return {"error": "Student not found"}
                username, full_name, email = student
                # Build query for attendance records
                query = """
                    SELECT a.date, a.time_in, cs.name as session_name, cs.code, a.status
                    FROM attendance a
                    JOIN class_sessions cs ON a.session_id = cs.id
                    WHERE a.user_id = ?
                """
                params = [user_id]
                if start_date:
                    query += " AND a.date >= ?"
                    params.append(start_date)
                if end_date:
                    query += " AND a.date <= ?"
                    params.append(end_date)
                query += " ORDER BY a.date DESC, a.time_in DESC"
                c.execute(query, params)
                attendance_records = [
                    {
                        "date": date,
                        "time": time_in,
                        "session": session_name,
                        "code": code,
                        "status": status
                    }
                    for date, time_in, session_name, code, status in c.fetchall()
                ]
            # Tally records per status
            status_counts = {}
            for record in attendance_records:
                status = record["status"]
                status_counts[status] = status_counts.get(status, 0) + 1
            return {
                "student": {
                    "id": user_id,
                    "username": username,
                    "full_name": full_name,
                    "email": email
                },
                "attendance": {
                    "total": len(attendance_records),
                    "status_breakdown": status_counts,
                    "records": attendance_records
                }
            }
        except Exception as e:
            logger.error(f"Error generating student report: {str(e)}")
            return {"error": str(e)}

    def export_csv(self, data: List[Dict], filename: str) -> str:
        """Write *data* to ``exports/<filename>`` and return the file path.

        *data* is a list of dicts (header taken from the first dict's keys)
        or a list of row sequences. Any directory part of *filename* is
        discarded so the file always lands inside ``exports``. Returns
        "No data to export" for empty input and "Error: ..." on failure.
        """
        try:
            if not data:
                return "No data to export"
            # Keep only the final path component: the name may come from the
            # UI and must not escape the exports directory ("../x.csv").
            safe_name = os.path.basename(str(filename).replace("\\", "/"))
            if safe_name in ("", ".", ".."):
                return "Error: invalid filename"
            # Create directory if it doesn't exist
            os.makedirs("exports", exist_ok=True)
            filepath = os.path.join("exports", safe_name)
            # newline='' is required by the csv module; utf-8 so non-ASCII
            # names do not depend on the platform's default encoding.
            with open(filepath, 'w', newline='', encoding='utf-8') as csvfile:
                if isinstance(data[0], dict):
                    writer = csv.DictWriter(csvfile, fieldnames=list(data[0].keys()))
                    writer.writeheader()
                    writer.writerows(data)
                else:
                    csv.writer(csvfile).writerows(data)
            return filepath
        except Exception as e:
            logger.error(f"Error exporting to CSV: {str(e)}")
            return f"Error: {str(e)}"
# ========== GRADIO UI ==========
# Initialize system components
# One Database instance, opened at DB_PATH, is shared by every manager below;
# constructing it also runs init_db().
db = Database(DB_PATH)
# Registration, login and session-token handling.
user_manager = UserManager(db)
# Class-session lifecycle and attendance records.
attendance_manager = AttendanceManager(db)
# Read/write access to the single settings row.
settings_manager = SettingsManager(db)
# Summary / per-student reports and CSV export.
report_generator = ReportGenerator(db)
# UI Helper functions
def format_message(success: bool, message: str) -> str:
    """Prefix *message* with a check mark on success or a cross on failure."""
    marker = "✅" if success else "❌"
    return f"{marker} {message}"
def format_time(time_str: str) -> str:
    """Render an ISO-8601 timestamp as ``YYYY-MM-DD HH:MM:SS`` for display.

    Returns "" for empty input, and the input unchanged when it cannot be
    parsed as an ISO timestamp.
    """
    if not time_str:
        return ""
    try:
        return datetime.datetime.fromisoformat(time_str).strftime("%Y-%m-%d %H:%M:%S")
    except (TypeError, ValueError):
        # Only parse/format failures are expected here; a bare `except` would
        # also swallow KeyboardInterrupt and SystemExit.
        return time_str
# Main Gradio UI
with gr.Blocks(title="Advanced Attendance System", theme="default") as app:
# Store session state
session_state = gr.State({})
# Header and theme
with gr.Row():
gr.Markdown("# 📝 Advanced Attendance System")
theme_toggle = gr.Radio(
["Light", "Dark"],
label="Theme",
value="Light",
interactive=True
)
# Login/Register tabs
with gr.Tab("👤 Authentication"):
with gr.Row():
with gr.Column():
gr.Markdown("### 🔑 Login")
login_username = gr.Text(label="Username")
login_password = gr.Text(label="Password", type="password")
login_device = gr.Text(label="Device ID (Optional)", placeholder="e.g., device-uuid")
login_button = gr.Button("Login", variant="primary")
login_output = gr.Markdown()
with gr.Column():
gr.Markdown("### 📝 Register")
reg_username = gr.Text(label="Username")
reg_password = gr.Text(label="Password", type="password")
reg_confirm = gr.Text(label="Confirm Password", type="password")
reg_email = gr.Text(label="Email (Optional)")
reg_fullname = gr.Text(label="Full Name (Optional)")
reg_role = gr.Radio(["student", "teacher"], label="Role", value="student")
reg_button = gr.Button("Register")
reg_output = gr.Markdown()
# Student tab
with gr.Tab("👨‍🎓 Student"):
gr.Markdown("### 📊 Mark Attendance")
with gr.Row():
student_code = gr.Text(label="Session Code", placeholder="Enter the code provided by your teacher")
student_mark_btn = gr.Button("Mark Attendance", variant="primary")
with gr.Row():
student_status = gr.Markdown("Please login to access student features")
with gr.Accordion("My Attendance History", open=False):
student_refresh_btn = gr.Button("Refresh History")
student_history = gr.DataFrame(label="My Attendance Records")
# Teacher tab
with gr.Tab("👩‍🏫 Teacher") as teacher_tab:
with gr.Row():
teacher_status = gr.Markdown("Please login as a teacher to access these features")
with gr.Tabs() as teacher_tabs:
with gr.Tab("Create Session"):
with gr.Row():
session_name = gr.Text(label="Session Name", placeholder="e.g., Math 101 - Week 3")
session_desc = gr.Text(label="Description (Optional)", placeholder="Brief description of this session")
with gr.Row():
session_window = gr.Slider(label="Attendance Window (seconds)",
minimum=60, maximum=1800, value=300, step=60)
create_session_btn = gr.Button("Create New Session", variant="primary")
create_output = gr.Markdown()
with gr.Accordion("Active Sessions", open=True):
active_refresh = gr.Button("Refresh Active Sessions")
active_sessions = gr.DataFrame(label="Currently Active Sessions")
close_session_id = gr.Number(label="Session ID to Close", precision=0)
close_btn = gr.Button("Close Selected Session")
close_output = gr.Markdown()
with gr.Tab("Attendance Reports"):
with gr.Row():
report_date_start = gr.Date(label="Start Date")
report_date_end = gr.Date(label="End Date")
report_type = gr.Radio(
["Summary", "Detailed", "By Student", "By Session"],
label="Report Type",
value="Summary"
)
with gr.Row():
report_filter = gr.Dropdown(label="Additional Filter (Optional)")
report_gen_btn = gr.Button("Generate Report", variant="primary")
report_output = gr.Markdown()
report_data = gr.DataFrame(label="Report Data")
export_btn = gr.Button("Export to CSV")
export_output = gr.Markdown()
# Admin tab
with gr.Tab("⚙️ Admin"):
admin_status = gr.Markdown("Please login as admin to access these features")
with gr.Tabs() as admin_tabs:
with gr.Tab("User Management"):
with gr.Row():
user_filter = gr.Radio(
["All Users", "Students", "Teachers", "Admins"],
label="Filter Users",
value="All Users"
)
user_refresh = gr.Button("Refresh User List")
users_table = gr.DataFrame(label="Users")
with gr.Row():
user_action = gr.Radio(
["Activate", "Deactivate", "Change Role", "Reset Password"],
label="Action"
)
user_id = gr.Number(label="User ID", precision=0)
user_param = gr.Text(label="New Value (for role/password)")
user_action_btn = gr.Button("Apply Action")
user_action_output = gr.Markdown()
with gr.Tab("System Settings"):
with gr.Row():
settings_refresh = gr.Button("Load Current Settings")
with gr.Group():
gr.Markdown("### General Settings")
site_name = gr.Text(label="Site Name")
theme_setting = gr.Radio(["light", "dark"], label="Default Theme")
with gr.Group():
gr.Markdown("### Attendance Settings")
attendance_mode = gr.Radio(
["manual", "auto", "scheduled"],
label="Attendance Mode"
)
auto_close = gr.Checkbox(label="Auto-close attendance windows")
window_time = gr.Slider(
label="Default Window Time (seconds)",
minimum=60, maximum=3600, value=300, step=60
)
with gr.Group():
gr.Markdown("### Security Settings")
geo_verify = gr.Checkbox(label="Enable Geographic Verification")
with gr.Row(visible=False) as geo_settings:
site_lat = gr.Number(label="Site Latitude")
site_lon = gr.Number(label="Site Longitude")
geo_radius = gr.Slider(
label="Allowed Radius (meters)",
minimum=10, maximum=1000, value=100
)
device_verify = gr.Checkbox(label="Require Device Verification")
settings_save = gr.Button("Save Settings", variant="primary")
settings_output = gr.Markdown()
# Login function
def do_login(username, password, device_id):
    """Attempt a login and return ``{"content": message, "user": info-or-None}``."""
    ok, payload = user_manager.login(username, password, device_id)
    if not ok:
        # On failure the payload is the error text
        return {"content": f"❌ {payload}", "user": None}
    greeting = f"✅ Welcome, {payload['username']}! You are now logged in."
    return {"content": greeting, "user": payload}
# Register function
def do_register(username, password, confirm, email, fullname, role):
    """Validate the registration form and create the account; returns a status line."""
    if not (username and password):
        return "❌ Username and password are required."
    if confirm != password:
        return "❌ Passwords do not match."
    account = {
        "username": username,
        "password": password,
        "role": role,
        "email": email,
        "full_name": fullname,
    }
    ok, text = user_manager.register_user(**account)
    return format_message(ok, text)
# Mark attendance function
def do_mark_attendance(code, state):
    """Mark attendance for the logged-in user using session *code*."""
    current_user = state.get("user")
    if not current_user:
        return "❌ Please login first."
    device = {"device_id": state.get("device_id")}
    ok, text = attendance_manager.mark_attendance(
        user_id=current_user["user_id"],
        session_code=code,
        device_info=device,
    )
    return format_message(ok, text)
# Get student history
def get_student_history(state):
    """Return the logged-in user's attendance records as a DataFrame ([] if logged out)."""
    current_user = state.get("user")
    if not current_user:
        return []
    records = attendance_manager.get_attendance_report({"user_id": current_user["user_id"]})
    rows = []
    for rec in records:
        rows.append({
            "Date": rec["date"],
            "Time": rec["time_in"],
            "Session": rec["session_name"],
            "Code": rec["session_code"],
            "Status": rec["status"],
        })
    return pd.DataFrame(rows)
# Create session function
def do_create_session(name, desc, window, state):
    """Create an attendance session on behalf of a logged-in teacher or admin."""
    current_user = state.get("user")
    if not current_user or current_user["role"] not in ("teacher", "admin"):
        return "❌ You must be logged in as a teacher or admin."
    if not name:
        return "❌ Session name is required."
    ok, outcome = attendance_manager.create_session(
        name=name,
        description=desc,
        created_by=current_user["user_id"],
        window_time=int(window),
    )
    if not ok:
        # On failure the outcome is the error text
        return f"❌ {outcome}"
    return f"✅ Session created successfully!\n\nSession code: **{outcome['code']}**\n\nShare this code with students to allow them to mark attendance."
# Get active sessions
def get_active_sessions():
    """Return the currently active sessions as a DataFrame ([] when there are none)."""
    active = attendance_manager.get_active_sessions()
    if not active:
        return []
    rows = []
    for item in active:
        rows.append({
            "ID": item["id"],
            "Name": item["name"],
            "Description": item["description"] or "-",
            "Code": item["code"],
            "Started": format_time(item["start_time"]),
            "Creator": item["creator"],
        })
    return pd.DataFrame(rows)
# Close session function
def do_close_session(session_id, state):
    """Close an attendance session on behalf of a teacher or admin.

    Args:
        session_id: Identifier of the session to close; anything ``int()``
            accepts.
        state: Session dict whose ``"user"`` entry holds the caller's role.
            ``None`` or an empty dict counts as logged out.

    Returns:
        A user-facing status string. A non-numeric ID is reported as a
        message instead of raising.
    """
    user = state.get("user") if state else None
    if not user or user.get("role") not in ("teacher", "admin"):
        return "❌ You must be logged in as a teacher or admin."
    if not session_id:
        return "❌ Session ID is required."

    try:
        target_id = int(session_id)
    except (TypeError, ValueError, OverflowError):
        return "❌ Session ID must be a number."

    success, message = attendance_manager.close_session(target_id)
    return format_message(success, message)
# Generate report function
def _summary_report_view(start_date, end_date):
    """Build the markdown summary and top-sessions table for a date range."""
    report = report_generator.summary_report(start_date.isoformat(), end_date.isoformat())
    totals = report["totals"]
    summary = (
        f"### Report Summary: {start_date} to {end_date}\n\n"
        f"Total Records: {totals['records']}\n"
        f"Unique Students: {totals['students']}\n"
        f"Sessions: {totals['sessions']}\n\n"
    )
    rows = [
        {
            "Session Name": s["name"],
            "Code": s["code"],
            "Attendance Count": s["count"],
        }
        for s in report["top_sessions"]
    ]
    return summary, pd.DataFrame(rows)


def _student_report_view(student_id, start_date, end_date):
    """Build the markdown summary and record table for one student."""
    report = report_generator.student_detail_report(
        student_id,
        start_date.isoformat(),
        end_date.isoformat(),
    )
    if "error" in report:
        return f"❌ {report['error']}", None

    student = report["student"]
    attendance = report["attendance"]
    parts = [
        f"### Student Report: {student['full_name'] or student['username']}\n\n",
        f"Total Attendance: {attendance['total']} records\n\n",
        "Status Breakdown:\n",
    ]
    parts.extend(
        f"- {status}: {count}\n"
        for status, count in attendance["status_breakdown"].items()
    )
    rows = [
        {
            "Date": r["date"],
            "Time": r["time"],
            "Session": r["session"],
            "Code": r["code"],
            "Status": r["status"],
        }
        for r in attendance["records"]
    ]
    return "".join(parts), pd.DataFrame(rows)


def _detailed_report_view(start_date, end_date):
    """Build the markdown summary and per-record table for a date range."""
    records = attendance_manager.get_attendance_report({
        "date_range": (start_date.isoformat(), end_date.isoformat())
    })
    summary = (
        f"### Detailed Attendance Report: {start_date} to {end_date}\n\n"
        f"Total Records: {len(records)}\n"
    )
    rows = [
        {
            "Student": r["username"],
            "Full Name": r["full_name"] or "-",
            "Date": r["date"],
            "Time": r["time_in"],
            "Session": r["session_name"],
            "Status": r["status"],
        }
        for r in records
    ]
    return summary, pd.DataFrame(rows)


def generate_report(start_date, end_date, report_type, filter_value, state):
    """Generate an attendance report for a teacher or admin.

    Args:
        start_date, end_date: Range bounds; must be comparable with ``>``
            and expose ``isoformat()``.
        report_type: ``"Summary"``, ``"By Student"``, or anything else for
            the detailed per-record report.
        filter_value: Student ID, used only by ``"By Student"``.
        state: Session dict whose ``"user"`` entry holds the caller's role.
            ``None`` or an empty dict counts as logged out.

    Returns:
        ``(markdown_summary, DataFrame)`` on success, or
        ``(error_message, None)`` on any validation or lookup failure.
    """
    user = state.get("user") if state else None
    if not user or user.get("role") not in ("teacher", "admin"):
        return "❌ You must be logged in as a teacher or admin.", None
    if not start_date or not end_date:
        return "❌ Please select both start and end dates.", None
    if start_date > end_date:
        return "❌ Start date must be before end date.", None

    if report_type == "Summary":
        return _summary_report_view(start_date, end_date)

    if report_type == "By Student":
        if not filter_value:
            return "❌ Please select a student.", None
        # Report a non-numeric filter through the same (message, None)
        # channel as every other failure instead of raising.
        try:
            student_id = int(filter_value)
        except (TypeError, ValueError, OverflowError):
            return "❌ Student filter must be a numeric student ID.", None
        return _student_report_view(student_id, start_date, end_date)

    return _detailed_report_view(start_date, end_date)
# Export report
def export_report_csv(report_data, state):
    """Export the current report table to a timestamped CSV file.

    Only teachers and admins may export. ``report_data`` is expected to
    offer ``.empty`` and ``.to_dict('records')``; the rows are handed to
    ``report_generator.export_csv``, whose return value is treated as an
    error when it starts with ``"Error"`` and as the file path otherwise.
    """
    requester = state.get("user")
    if not requester or requester["role"] not in ["teacher", "admin"]:
        return "❌ You must be logged in as a teacher or admin."
    if report_data is None or report_data.empty:
        return "❌ No data to export."

    stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    target_name = f"attendance_report_{stamp}.csv"
    rows = report_data.to_dict('records')
    outcome = report_generator.export_csv(rows, target_name)

    if not outcome.startswith("Error"):
        return f"✅ Report exported successfully to {outcome}"
    return f"❌ {outcome}"
# Get users list
def get_users_list(filter_type, state):
    """Return the user directory as a table (admins only).

    ``filter_type`` of "Students", "Teachers" or "Admins" restricts the
    listing to that role; any other value lists everyone. Non-admin
    callers get an empty list.
    """
    viewer = state.get("user")
    if not viewer or viewer["role"] != "admin":
        return []

    # Dropdown label -> role name; no match means "no role filter".
    label_to_role = (
        ("Students", "student"),
        ("Teachers", "teacher"),
        ("Admins", "admin"),
    )
    role_filter = next(
        (role for label, role in label_to_role if filter_type == label),
        None,
    )

    accounts = user_manager.get_users(role_filter)
    rows = []
    for account in accounts:
        rows.append({
            "ID": account["id"],
            "Username": account["username"],
            "Role": account["role"],
            "Email": account["email"] or "-",
            "Full Name": account["full_name"] or "-",
            "Active": "Yes" if account["active"] else "No",
        })
    return pd.DataFrame(rows)
# Process user action
def process_user_action(action, user_id, param, state):
    """Apply an admin action to a user account.

    Args:
        action: One of "Activate", "Deactivate", "Change Role" or
            "Reset Password".
        user_id: ID of the target row in ``users``; anything ``int()``
            accepts.
        param: New role for "Change Role" (must be in ``DEFAULT_ROLES``) or
            new password for "Reset Password" (at least 8 characters).
        state: Session dict whose ``"user"`` entry holds the caller's role.
            ``None`` or an empty dict counts as logged out.

    Returns:
        A user-facing status string. Success is reported only when a row
        was actually updated; database errors are logged and returned as
        an error message.
    """
    # sqlite3's own context manager only commits/rolls back; closing()
    # is what actually releases the connection.
    from contextlib import closing

    user = state.get("user") if state else None
    if not user or user.get("role") != "admin":
        return "❌ You must be an admin to perform this action."
    if not user_id:
        return "❌ Please select a user ID."
    try:
        target_id = int(user_id)
    except (TypeError, ValueError, OverflowError):
        return "❌ User ID must be a number."

    # Validate the request fully before opening a database connection.
    if action == "Activate":
        sql, args = "UPDATE users SET active=1 WHERE id=?", (target_id,)
        result = "User activated successfully."
    elif action == "Deactivate":
        sql, args = "UPDATE users SET active=0 WHERE id=?", (target_id,)
        result = "User deactivated successfully."
    elif action == "Change Role":
        if param not in DEFAULT_ROLES:
            return f"❌ Invalid role. Must be one of: {', '.join(DEFAULT_ROLES)}"
        sql, args = "UPDATE users SET role=? WHERE id=?", (param, target_id)
        result = "User role updated successfully."
    elif action == "Reset Password":
        if not param or len(param) < 8:
            return "❌ Password must be at least 8 characters."
        sql, args = None, None  # built below, inside the error boundary
        result = "Password reset successfully."
    else:
        return "❌ Invalid action."

    try:
        if action == "Reset Password":
            salt = secrets.token_hex(SALT_LENGTH)
            # NOTE(review): relies on a private `_hash_password` helper on
            # `db`; confirm it lives there and not on the user manager.
            password_hash = db._hash_password(param, salt)
            sql = "UPDATE users SET password_hash=?, salt=? WHERE id=?"
            args = (password_hash, salt, target_id)

        # Outer manager closes the connection; inner one commits on
        # success and rolls back on error.
        with closing(db.get_connection()) as conn, conn:
            cursor = conn.execute(sql, args)
            if cursor.rowcount == 0:
                # The UPDATE matched nothing: do not claim success.
                return f"❌ No user found with ID {target_id}."
    except Exception as e:
        logger.exception("Error in user action: %s", e)
        return f"❌ Error: {str(e)}"
    return f"✅ {result}"
# Get current settings
def get_current_settings(state):
    """Load the stored settings for the admin settings form.

    Returns a 10-tuple in the order the settings widgets are wired:
    site name, theme, attendance mode, auto-close flag, default window
    time, geo-verification flag, latitude, longitude, allowed radius and
    device-verification flag. Non-admin callers, or an empty settings
    store, get the built-in defaults.
    """
    fallback = ("", "light", "manual", False, 300, False, 0, 0, 100, False)

    viewer = state.get("user")
    if not viewer or viewer["role"] != "admin":
        return fallback

    stored = settings_manager.get_settings()
    if not stored:
        return fallback

    read = stored.get
    site_label = read("site_name", "")
    theme_name = read("theme", "light")
    mode = read("attendance_mode", "manual")
    auto_close_on = bool(read("auto_close_window", False))
    window_default = read("default_window_time", 300)
    geo_on = bool(read("geo_verification", False))
    # Empty/None coordinates fall back to 0 before the float conversion.
    latitude = float(read("site_latitude", 0) or 0)
    longitude = float(read("site_longitude", 0) or 0)
    radius = read("allowed_radius", 100)
    device_check_on = bool(read("require_device_verification", False))

    return (
        site_label,
        theme_name,
        mode,
        auto_close_on,
        window_default,
        geo_on,
        latitude,
        longitude,
        radius,
        device_check_on,
    )
# Save settings
def save_settings(site_name, theme, mode, auto_close, window_time,
                  geo_verify, site_lat, site_lon, radius, device_verify, state):
    """Validate and persist the admin settings form.

    Args:
        site_name, theme, mode: Stored as given.
        auto_close, geo_verify, device_verify: Flags, stored as 1/0.
        window_time: Default attendance window; anything ``int()`` accepts.
        site_lat, site_lon, radius: Read only when ``geo_verify`` is
            truthy; latitude/longitude must be finite and within range.
        state: Session dict whose ``"user"`` entry holds the caller's role.
            ``None`` or an empty dict counts as logged out.

    Returns:
        A user-facing status string. Invalid numeric input is reported as
        a message instead of raising or being stored.
    """
    user = state.get("user") if state else None
    if not user or user.get("role") != "admin":
        return "❌ You must be an admin to change settings."

    try:
        window_value = int(window_time)
    except (TypeError, ValueError, OverflowError):
        return "❌ Default window time must be a whole number."

    updated_settings = {
        "site_name": site_name,
        "theme": theme,
        "attendance_mode": mode,
        "auto_close_window": 1 if auto_close else 0,
        "default_window_time": window_value,
        "geo_verification": 1 if geo_verify else 0,
        "require_device_verification": 1 if device_verify else 0,
    }

    if geo_verify:
        # A cleared field arrives as None; str(None) would be stored as
        # the text "None", which cannot be parsed back into a float when
        # the settings are loaded again.
        try:
            latitude = float(site_lat)
            longitude = float(site_lon)
            allowed_radius = int(radius)
        except (TypeError, ValueError, OverflowError):
            return "❌ Latitude, longitude and radius must be numbers when geo verification is enabled."
        # The chained comparisons also reject NaN and infinities.
        if not (-90 <= latitude <= 90 and -180 <= longitude <= 180):
            return "❌ Latitude must be between -90 and 90 and longitude between -180 and 180."
        updated_settings.update({
            "site_latitude": str(site_lat),
            "site_longitude": str(site_lon),
            "allowed_radius": allowed_radius,
        })

    success, message = settings_manager.update_settings(updated_settings)
    return format_message(success, message)
# Connect UI components to functions
# The settings widgets are both the outputs of "refresh" and the leading
# inputs of "save", so their order is written down once.
_settings_fields = [
    site_name, theme_setting, attendance_mode, auto_close, window_time,
    geo_verify, site_lat, site_lon, geo_radius, device_verify,
]

# (trigger, handler, inputs, outputs), registered in this order.
_click_bindings = (
    (login_button, do_login,
     [login_username, login_password, login_device],
     [login_output, session_state]),
    (reg_button, do_register,
     [reg_username, reg_password, reg_confirm, reg_email, reg_fullname, reg_role],
     reg_output),
    (student_mark_btn, do_mark_attendance,
     [student_code, session_state],
     student_status),
    (student_refresh_btn, get_student_history,
     [session_state],
     student_history),
    (create_session_btn, do_create_session,
     [session_name, session_desc, session_window, session_state],
     create_output),
    (active_refresh, get_active_sessions,
     None,
     active_sessions),
    (close_btn, do_close_session,
     [close_session_id, session_state],
     close_output),
    (report_gen_btn, generate_report,
     [report_date_start, report_date_end, report_type, report_filter, session_state],
     [report_output, report_data]),
    (export_btn, export_report_csv,
     [report_data, session_state],
     export_output),
    (user_refresh, get_users_list,
     [user_filter, session_state],
     users_table),
    (user_action_btn, process_user_action,
     [user_action, user_id, user_param, session_state],
     user_action_output),
    (settings_refresh, get_current_settings,
     [session_state],
     list(_settings_fields)),
    (settings_save, save_settings,
     _settings_fields + [session_state],
     settings_output),
)
for _trigger, _handler, _sources, _targets in _click_bindings:
    _trigger.click(fn=_handler, inputs=_sources, outputs=_targets)
# Dynamic visibility and state update handlers
def update_student_status(state):
    """Return the status line shown on the student tab.

    The username is read from the ``"user"`` entry of the session state,
    which is where the other handlers in this file look up the logged-in
    account; a top-level ``"username"`` key is still honoured as a
    fallback. A state that carries neither is treated as logged out.
    """
    login_prompt = "Please login to access student features"
    if not state:
        return login_prompt

    user = state.get("user") or {}
    username = user.get("username") or state.get("username")
    if not user and not username:
        # e.g. a state holding only a device id: nobody is logged in.
        return login_prompt
    return f"Logged in as: {username or 'Unknown'}"
def update_teacher_status(state):
    """Return the banner shown on the teacher tab.

    Empty state yields a login prompt; a teacher or admin gets a welcome
    line; anyone else gets an access-denied message.
    """
    if not state:
        return "Please login to access teacher features"

    account = state.get("user", {})
    has_access = bool(account) and account.get("role") in ("teacher", "admin")
    if not has_access:
        return "❌ You must be logged in as a teacher to access these features."
    return f"👋 Welcome, {account.get('username')}! You have access to teacher features."
def update_admin_status(state):
    """Return the banner shown on the admin tab.

    Empty state yields a login prompt; an admin gets a welcome line;
    anyone else gets an access-denied message.
    """
    if not state:
        return "Please login to access admin features"

    account = state.get("user", {})
    is_admin = bool(account) and account.get("role") == "admin"
    if not is_admin:
        return "❌ You must be logged in as an admin to access these features."
    return f"👋 Welcome, Administrator {account.get('username')}!"
def update_teacher_tabs_visibility(state):
    """Show the teacher tabs only to teachers and admins.

    Always passes a real boolean as ``visible``. The membership test
    yields ``True``/``False`` directly, so an empty or missing user can
    no longer leak ``{}`` or ``None`` into the update. A ``None`` state
    is treated as logged out.
    """
    user = (state or {}).get("user") or {}
    return gr.update(visible=user.get("role") in ("teacher", "admin"))
def update_admin_tabs_visibility(state):
    """Show the admin tabs only to admins.

    Always passes a real boolean as ``visible``. The comparison yields
    ``True``/``False`` directly, so an empty or missing user can no
    longer leak ``{}`` or ``None`` into the update. A ``None`` state is
    treated as logged out.
    """
    user = (state or {}).get("user") or {}
    return gr.update(visible=user.get("role") == "admin")
def update_geo_settings_visibility(geo_verify):
    """Show or hide the geo settings group to follow the geo-verify toggle."""
    visibility_patch = gr.update(visible=geo_verify)
    return visibility_patch
# Every listener below re-renders one widget from the session state, so
# they share the same trigger and input. Registered in this order.
_state_listeners = (
    (update_student_status, student_status),
    (update_teacher_status, teacher_status),
    (update_admin_status, admin_status),
    (update_teacher_tabs_visibility, teacher_tabs),
    (update_admin_tabs_visibility, admin_tabs),
)
for _listener, _target in _state_listeners:
    session_state.change(fn=_listener, inputs=session_state, outputs=_target)

# The geo settings group follows its own checkbox rather than the session.
geo_verify.change(
    fn=update_geo_settings_visibility,
    inputs=geo_verify,
    outputs=geo_settings,
)
# Change theme function
def change_theme(choice):
    """Return an update carrying the lower-cased theme name.

    NOTE(review): this targets the top-level ``app`` object through
    ``gr.update(theme=...)``. Gradio themes are normally fixed when the
    app is built, so confirm that runtime switching this way actually
    takes effect.
    """
    theme_name = choice.lower()
    return gr.update(theme=theme_name)


# Re-run the handler whenever the theme selector changes.
theme_toggle.change(change_theme, theme_toggle, app)
# Launch the app
if __name__ == "__main__":
    # Make sure a "logs" directory exists before the UI starts. The
    # logging set up at the top of the file writes attendance_system.log
    # to the working directory, so this is presumably for other output.
    Path("logs").mkdir(parents=True, exist_ok=True)
    # Hand control to Gradio.
    app.launch()