Spaces:
Runtime error
Runtime error
| import gradio as gr | |
| import sqlite3 | |
| import datetime | |
| import hashlib | |
| import os | |
| import threading | |
| import time | |
| import secrets | |
| import re | |
| import json | |
| import logging | |
| from typing import Dict, List, Optional, Tuple, Union | |
| from pathlib import Path | |
# Configure logging: records at INFO and above are written both to the file
# "attendance_system.log" (relative to the working directory) and to a stream
# handler.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("attendance_system.log"),
        logging.StreamHandler()
    ]
)
# Module-wide logger used by every manager class in this file.
logger = logging.getLogger("attendance_system")

# Constants and Configuration
# SQLite file location; overridable through the DB_PATH environment variable.
DB_PATH = os.getenv("DB_PATH", "attendance.db")
SESSION_EXPIRY = 3600  # Session token expires after 1 hour (value is in seconds)
ATTENDANCE_WINDOW = 300  # Default attendance window: 5 minutes (value is in seconds)
# Roles accepted by user registration.
DEFAULT_ROLES = ["admin", "teacher", "student"]
# Passed to secrets.token_hex() when generating a per-user password salt, so
# the stored salt is twice this many hexadecimal characters.
SALT_LENGTH = 16  # For password security
| # ========== DATABASE LAYER ========== | |
class _ClosingConnection(sqlite3.Connection):
    """sqlite3 connection that is closed when its ``with`` block exits.

    The stock ``sqlite3.Connection`` context manager only commits (or rolls
    back on error); it never closes the connection.  Because this module opens
    a new connection for every operation, leaving them open leaks a file
    handle per call.  This subclass keeps the commit/rollback behaviour and
    additionally closes the connection afterwards.
    """

    def __exit__(self, exc_type, exc_value, traceback):
        try:
            return super().__exit__(exc_type, exc_value, traceback)
        finally:
            self.close()


class Database:
    """SQLite access layer: creates the schema and hands out connections.

    There is no connection pool: every call to :meth:`get_connection` opens a
    fresh connection to ``db_path``.  Connections are meant to be used as
    ``with db.get_connection() as conn:`` and are committed and closed when
    the block exits.
    """

    def __init__(self, db_path: str):
        """Remember the database path, create its directory and the schema."""
        self.db_path = db_path
        self._ensure_directory_exists()
        self.init_db()

    def _ensure_directory_exists(self):
        """Ensure the directory for the database exists"""
        directory = os.path.dirname(self.db_path)
        # A bare filename has no directory component; nothing to create then.
        if directory:
            # exist_ok avoids the check-then-create race of the exists() test.
            os.makedirs(directory, exist_ok=True)

    def get_connection(self) -> sqlite3.Connection:
        """Open and return a new connection to the database file.

        The returned object is a regular ``sqlite3.Connection`` whose context
        manager also closes it on exit.
        """
        return sqlite3.connect(self.db_path, factory=_ClosingConnection)

    def init_db(self):
        """Create all tables if missing and seed the admin user and settings row."""
        with self.get_connection() as conn:
            c = conn.cursor()

            # Users table with improved fields
            c.execute('''CREATE TABLE IF NOT EXISTS users (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                username TEXT UNIQUE NOT NULL,
                password_hash TEXT NOT NULL,
                salt TEXT NOT NULL,
                role TEXT NOT NULL,
                email TEXT UNIQUE,
                full_name TEXT,
                created_at TEXT NOT NULL,
                last_login TEXT,
                active INTEGER DEFAULT 1
            )''')

            # Sessions table for better auth management
            c.execute('''CREATE TABLE IF NOT EXISTS sessions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id INTEGER NOT NULL,
                token TEXT UNIQUE NOT NULL,
                device_id TEXT,
                created_at TEXT NOT NULL,
                expires_at TEXT NOT NULL,
                FOREIGN KEY (user_id) REFERENCES users(id)
            )''')

            # Enhanced attendance table
            c.execute('''CREATE TABLE IF NOT EXISTS attendance (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                user_id INTEGER NOT NULL,
                date TEXT NOT NULL,
                time_in TEXT NOT NULL,
                time_out TEXT,
                session_id TEXT NOT NULL,
                status TEXT DEFAULT 'Present',
                latitude TEXT,
                longitude TEXT,
                device_info TEXT,
                notes TEXT,
                FOREIGN KEY (user_id) REFERENCES users(id),
                UNIQUE(user_id, date, session_id)
            )''')

            # Sessions/classes table
            c.execute('''CREATE TABLE IF NOT EXISTS class_sessions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                description TEXT,
                created_by INTEGER NOT NULL,
                start_time TEXT NOT NULL,
                end_time TEXT,
                attendance_window INTEGER DEFAULT 300,
                code TEXT UNIQUE,
                active INTEGER DEFAULT 1,
                FOREIGN KEY (created_by) REFERENCES users(id)
            )''')

            # Settings table (a single row with id=1 holds the configuration)
            c.execute('''CREATE TABLE IF NOT EXISTS settings (
                id INTEGER PRIMARY KEY,
                site_name TEXT DEFAULT 'Attendance System',
                attendance_mode TEXT DEFAULT 'manual',
                auto_close_window INTEGER DEFAULT 1,
                default_window_time INTEGER DEFAULT 300,
                geo_verification INTEGER DEFAULT 0,
                allowed_radius INTEGER DEFAULT 100,
                site_latitude TEXT,
                site_longitude TEXT,
                require_device_verification INTEGER DEFAULT 0,
                theme TEXT DEFAULT 'light'
            )''')

            # Create admin user if not exists.
            # SECURITY: the bootstrap account is admin/admin.  This well-known
            # credential must be changed immediately after first start-up.
            c.execute("SELECT COUNT(*) FROM users WHERE role='admin'")
            if c.fetchone()[0] == 0:
                salt = secrets.token_hex(SALT_LENGTH)
                password_hash = self._hash_password("admin", salt)
                now = datetime.datetime.now().isoformat()
                c.execute(
                    "INSERT INTO users (username, password_hash, salt, role, created_at) VALUES (?, ?, ?, ?, ?)",
                    ("admin", password_hash, salt, "admin", now)
                )

            # Create default settings if not exists
            c.execute("SELECT COUNT(*) FROM settings")
            if c.fetchone()[0] == 0:
                c.execute("INSERT INTO settings (id) VALUES (1)")

            conn.commit()

    def _hash_password(self, password: str, salt: str) -> str:
        """Return the hex PBKDF2-HMAC-SHA256 digest of ``password`` with ``salt``."""
        return hashlib.pbkdf2_hmac(
            'sha256',
            password.encode(),
            salt.encode(),
            100000  # 100,000 iterations for security
        ).hex()
| # ========== AUTHENTICATION & USER MANAGEMENT ========== | |
class UserManager:
    """Handles user registration, login sessions and user listing.

    All persistence goes through the ``Database`` instance passed to the
    constructor.  Public methods never raise: failures are logged and
    reported through their return value.
    """

    def __init__(self, db: "Database"):
        self.db = db

    def register_user(self, username: str, password: str, role: str, email: str = None, full_name: str = None) -> Tuple[bool, str]:
        """Register a new user.

        Returns ``(True, message)`` on success, or ``(False, reason)`` when
        validation fails, the username/email is already taken, or the insert
        raises.
        """
        # Validate inputs
        if not username or not password or not role:
            return False, "All required fields must be provided."
        if role not in DEFAULT_ROLES:
            return False, f"Invalid role. Must be one of: {', '.join(DEFAULT_ROLES)}"
        if not self._validate_password_strength(password):
            return False, "Password is too weak. Must be at least 8 characters with numbers and letters."
        if email and not self._validate_email(email):
            return False, "Invalid email format."

        # Create the user
        try:
            with self.db.get_connection() as conn:
                c = conn.cursor()
                salt = secrets.token_hex(SALT_LENGTH)
                password_hash = self.db._hash_password(password, salt)
                now = datetime.datetime.now().isoformat()
                c.execute(
                    "INSERT INTO users (username, password_hash, salt, role, email, full_name, created_at) VALUES (?, ?, ?, ?, ?, ?, ?)",
                    (username, password_hash, salt, role, email, full_name, now)
                )
                conn.commit()
                logger.info(f"User registered: {username} with role {role}")
                return True, "User registered successfully"
        except sqlite3.IntegrityError:
            # UNIQUE constraint on username or email was violated.
            return False, "Username or email already exists."
        except Exception as e:
            logger.error(f"Error registering user: {str(e)}")
            return False, f"Registration error: {str(e)}"

    def login(self, username: str, password: str, device_id: str = None) -> Tuple[bool, Union[str, Dict]]:
        """Authenticate a user and create a session.

        Returns ``(True, info)`` where ``info`` holds ``token``, ``user_id``,
        ``username``, ``role`` and ``expires_at``; otherwise ``(False, reason)``.
        Unknown users and wrong passwords produce the same message.
        """
        try:
            with self.db.get_connection() as conn:
                c = conn.cursor()

                # Get user by username (inactive accounts cannot log in)
                c.execute("SELECT id, username, password_hash, salt, role FROM users WHERE username=? AND active=1", (username,))
                user = c.fetchone()
                if not user:
                    logger.warning(f"Login attempt for non-existent user: {username}")
                    return False, "Invalid username or password."

                # Verify password.  compare_digest runs in constant time so the
                # comparison does not leak how many leading characters matched.
                user_id, username, password_hash, salt, role = user
                candidate_hash = self.db._hash_password(password, salt)
                if not secrets.compare_digest(password_hash, candidate_hash):
                    logger.warning(f"Failed login attempt for user: {username}")
                    return False, "Invalid username or password."

                # Create session token
                token = secrets.token_urlsafe(32)
                now = datetime.datetime.now()
                expires = now + datetime.timedelta(seconds=SESSION_EXPIRY)
                c.execute(
                    "INSERT INTO sessions (user_id, token, device_id, created_at, expires_at) VALUES (?, ?, ?, ?, ?)",
                    (user_id, token, device_id, now.isoformat(), expires.isoformat())
                )

                # Update last login
                c.execute("UPDATE users SET last_login=? WHERE id=?", (now.isoformat(), user_id))
                conn.commit()

                logger.info(f"User logged in: {username}")
                return True, {
                    "token": token,
                    "user_id": user_id,
                    "username": username,
                    "role": role,
                    "expires_at": expires.isoformat()
                }
        except Exception as e:
            logger.error(f"Login error: {str(e)}")
            return False, f"Login error: {str(e)}"

    def verify_session(self, token: str, device_id: str = None) -> Tuple[bool, Union[str, Dict]]:
        """Verify a session token.

        Returns ``(True, {"user_id", "username", "role"})`` for an unexpired
        token of an active user, otherwise ``(False, reason)``.  The device
        check only applies when both the caller and the stored session carry
        a device id.
        """
        try:
            with self.db.get_connection() as conn:
                c = conn.cursor()
                # ISO-8601 strings from the same clock compare chronologically.
                now = datetime.datetime.now().isoformat()
                query = """
                    SELECT s.id, s.user_id, u.username, u.role, s.device_id
                    FROM sessions s
                    JOIN users u ON s.user_id = u.id
                    WHERE s.token = ? AND s.expires_at > ? AND u.active = 1
                """
                c.execute(query, (token, now))
                session = c.fetchone()
                if not session:
                    return False, "Session expired or invalid."

                session_id, user_id, username, role, stored_device_id = session

                # Device verification if required
                if device_id and stored_device_id and device_id != stored_device_id:
                    logger.warning(f"Device mismatch for user {username}: {device_id} vs {stored_device_id}")
                    return False, "Device verification failed."

                return True, {
                    "user_id": user_id,
                    "username": username,
                    "role": role
                }
        except Exception as e:
            logger.error(f"Session verification error: {str(e)}")
            return False, f"Session error: {str(e)}"

    def logout(self, token: str) -> Tuple[bool, str]:
        """Invalidate a session token by deleting its row."""
        try:
            with self.db.get_connection() as conn:
                c = conn.cursor()
                c.execute("DELETE FROM sessions WHERE token = ?", (token,))
                conn.commit()
                return True, "Logged out successfully."
        except Exception as e:
            logger.error(f"Logout error: {str(e)}")
            return False, f"Logout error: {str(e)}"

    def _validate_password_strength(self, password: str) -> bool:
        """Return True for passwords of 8+ characters with a digit and an ASCII letter."""
        if len(password) < 8:
            return False
        if not re.search(r'\d', password):  # At least one digit
            return False
        if not re.search(r'[a-zA-Z]', password):  # At least one letter
            return False
        return True

    def _validate_email(self, email: str) -> bool:
        """Return True when ``email`` has the shape ``local@domain.tld``.

        ``fullmatch`` is used instead of ``match`` with ``$`` because ``$``
        also matches just before a trailing newline, which would let
        ``"user@example.com\\n"`` through.
        """
        pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
        return re.fullmatch(pattern, email) is not None

    def get_users(self, role: str = None) -> List[Dict]:
        """Get list of users, optionally filtered by role.

        Returns an empty list when the query fails (the error is logged).
        """
        try:
            with self.db.get_connection() as conn:
                c = conn.cursor()
                query = "SELECT id, username, role, email, full_name, active FROM users"
                params = ()
                if role:
                    query += " WHERE role = ?"
                    params = (role,)
                c.execute(query, params)
                return [
                    {
                        "id": row[0],
                        "username": row[1],
                        "role": row[2],
                        "email": row[3],
                        "full_name": row[4],
                        "active": bool(row[5])
                    }
                    for row in c.fetchall()
                ]
        except Exception as e:
            logger.error(f"Error getting users: {str(e)}")
            return []
| # ========== ATTENDANCE MANAGEMENT ========== | |
class AttendanceManager:
    """Handles attendance sessions (classes) and attendance records.

    All persistence goes through the ``Database`` instance passed to the
    constructor.  Public methods never raise: failures are logged and
    reported through their return value.
    """

    def __init__(self, db: "Database"):
        self.db = db

    def create_session(self, name: str, created_by: int, description: str = None,
                       window_time: int = None) -> Tuple[bool, Union[str, Dict]]:
        """Create a new attendance session/class.

        ``window_time`` is in seconds; when omitted the ``default_window_time``
        setting is used.  If the ``auto_close_window`` setting is enabled, a
        daemon thread closes the session after ``window_time`` seconds.

        Returns ``(True, info)`` with ``session_id``, ``code``, ``name``,
        ``start_time`` and ``window_time``, or ``(False, reason)``.
        """
        try:
            with self.db.get_connection() as conn:
                c = conn.cursor()
                now = datetime.datetime.now()

                # Get default window time from settings if not provided
                if window_time is None:
                    c.execute("SELECT default_window_time FROM settings WHERE id=1")
                    window_time = c.fetchone()[0]

                # Generate the join code: 3 random bytes -> 6 upper-case hex chars.
                code = secrets.token_hex(3).upper()
                c.execute(
                    """INSERT INTO class_sessions
                    (name, description, created_by, start_time, attendance_window, code, active)
                    VALUES (?, ?, ?, ?, ?, ?, 1)""",
                    (name, description, created_by, now.isoformat(), window_time, code)
                )
                session_id = c.lastrowid
                conn.commit()

                # Auto-close thread if needed
                c.execute("SELECT auto_close_window FROM settings WHERE id=1")
                auto_close = c.fetchone()[0]
                if auto_close:
                    threading.Thread(
                        target=self._auto_close_session,
                        args=(session_id, window_time),
                        daemon=True
                    ).start()

                logger.info(f"Created session: {name} with code {code}")
                return True, {
                    "session_id": session_id,
                    "code": code,
                    "name": name,
                    "start_time": now.isoformat(),
                    "window_time": window_time
                }
        except Exception as e:
            logger.error(f"Error creating session: {str(e)}")
            return False, f"Error creating session: {str(e)}"

    def _auto_close_session(self, session_id: int, window_time: int):
        """Sleep for ``window_time`` seconds, then mark the session inactive.

        Runs on a background thread and opens its own connection.
        """
        time.sleep(window_time)
        try:
            with self.db.get_connection() as conn:
                c = conn.cursor()
                now = datetime.datetime.now().isoformat()
                c.execute(
                    "UPDATE class_sessions SET active=0, end_time=? WHERE id=?",
                    (now, session_id)
                )
                conn.commit()
                logger.info(f"Auto-closed session ID: {session_id}")
        except Exception as e:
            logger.error(f"Error auto-closing session {session_id}: {str(e)}")

    def close_session(self, session_id: int) -> Tuple[bool, str]:
        """Manually close an attendance session and stamp its end time."""
        try:
            with self.db.get_connection() as conn:
                c = conn.cursor()
                now = datetime.datetime.now().isoformat()
                c.execute(
                    "UPDATE class_sessions SET active=0, end_time=? WHERE id=?",
                    (now, session_id)
                )
                conn.commit()
                logger.info(f"Manually closed session ID: {session_id}")
                return True, "Session closed successfully."
        except Exception as e:
            logger.error(f"Error closing session: {str(e)}")
            return False, f"Error closing session: {str(e)}"

    def mark_attendance(self, user_id: int, session_code: str,
                        device_info: str = None, geo_data: Dict = None) -> Tuple[bool, str]:
        """Mark attendance for a user in the session identified by ``session_code``.

        ``geo_data`` may carry ``latitude`` and ``longitude`` keys.  When the
        ``geo_verification`` setting is on and a site position is configured,
        the user must be within ``allowed_radius`` metres of the site.

        Returns ``(True, message)`` on success, otherwise ``(False, reason)``.
        """
        try:
            with self.db.get_connection() as conn:
                c = conn.cursor()
                now = datetime.datetime.now()
                today = now.date().isoformat()

                # Check if session exists and is active
                c.execute("SELECT id, active FROM class_sessions WHERE code=?", (session_code,))
                session = c.fetchone()
                if not session:
                    return False, "Invalid session code."
                session_id, is_active = session
                if not is_active:
                    return False, "This session is closed. Attendance cannot be marked."

                # Check if attendance already marked
                c.execute(
                    "SELECT id FROM attendance WHERE user_id=? AND date=? AND session_id=?",
                    (user_id, today, session_id)
                )
                if c.fetchone():
                    return False, "You have already marked attendance for this session today."

                # Prepare geo data
                latitude = longitude = None
                if geo_data:
                    latitude = geo_data.get('latitude')
                    longitude = geo_data.get('longitude')

                # Verify geo location if needed
                c.execute("SELECT geo_verification, site_latitude, site_longitude, allowed_radius FROM settings WHERE id=1")
                geo_verification, site_lat, site_lon, allowed_radius = c.fetchone()
                if geo_verification and site_lat and site_lon:
                    if not self._verify_location(latitude, longitude, float(site_lat), float(site_lon), allowed_radius):
                        return False, "You are outside the allowed area for attendance."

                # Insert attendance record.  Coordinates are tested against
                # None (not truthiness) so a legitimate 0.0 is still stored.
                c.execute(
                    """INSERT INTO attendance
                    (user_id, date, time_in, session_id, device_info, latitude, longitude)
                    VALUES (?, ?, ?, ?, ?, ?, ?)""",
                    (user_id, today, now.isoformat(), session_id,
                     json.dumps(device_info) if device_info else None,
                     str(latitude) if latitude is not None else None,
                     str(longitude) if longitude is not None else None)
                )
                conn.commit()

                logger.info(f"Attendance marked for user {user_id} in session {session_code}")
                return True, f"Attendance marked successfully at {now.strftime('%H:%M:%S')}"
        except sqlite3.IntegrityError:
            # UNIQUE(user_id, date, session_id) fired, e.g. on a concurrent request.
            return False, "You have already marked attendance for this session."
        except Exception as e:
            logger.error(f"Error marking attendance: {str(e)}")
            return False, f"Error marking attendance: {str(e)}"

    def _verify_location(self, user_lat: float, user_lon: float,
                         site_lat: float, site_lon: float, max_distance: float) -> bool:
        """Return True when the user is within ``max_distance`` metres of the site.

        Uses a flat-earth (equirectangular) approximation, which is only
        adequate for short distances.  Missing or non-numeric user
        coordinates yield False; a coordinate of exactly 0 is valid.
        """
        # Imported here because the module does not import math at file level.
        import math

        if user_lat is None or user_lon is None:
            return False
        try:
            user_lat = float(user_lat)
            user_lon = float(user_lon)
        except (TypeError, ValueError):
            # Coordinates arrived as something that is not a number.
            return False

        # Approximate conversion from lat/long to meters
        metres_per_degree = 111000  # 1 degree lat ≈ 111km
        lat_diff = abs(user_lat - site_lat) * metres_per_degree
        # A degree of longitude shrinks with the cosine of the latitude.
        lon_diff = abs(user_lon - site_lon) * metres_per_degree * abs(math.cos(math.radians(site_lat)))
        distance = math.hypot(lat_diff, lon_diff)
        return distance <= max_distance

    def get_attendance_report(self, filters: Dict = None) -> List[Dict]:
        """Get attendance records, newest first, with optional filters.

        Supported filter keys: ``user_id``, ``date``, ``session_id``,
        ``status`` (exact match each) and ``date_range`` (a ``(start, end)``
        pair, inclusive).  Returns an empty list on error.
        """
        try:
            with self.db.get_connection() as conn:
                c = conn.cursor()

                # Build query based on filters
                query = """
                    SELECT a.id, u.username, u.full_name, a.date, a.time_in, a.time_out,
                           cs.name as session_name, cs.code as session_code, a.status
                    FROM attendance a
                    JOIN users u ON a.user_id = u.id
                    JOIN class_sessions cs ON a.session_id = cs.id
                """
                conditions = []
                params = []
                if filters:
                    # Filter key -> column for the simple equality filters.
                    equality_columns = (
                        ('user_id', 'a.user_id'),
                        ('date', 'a.date'),
                        ('session_id', 'a.session_id'),
                        ('status', 'a.status'),
                    )
                    for key, column in equality_columns:
                        if key in filters:
                            conditions.append(f"{column} = ?")
                            params.append(filters[key])
                    if 'date_range' in filters:
                        start, end = filters['date_range']
                        conditions.append("a.date BETWEEN ? AND ?")
                        params.extend([start, end])

                if conditions:
                    query += " WHERE " + " AND ".join(conditions)
                query += " ORDER BY a.date DESC, a.time_in DESC"

                c.execute(query, params)
                return [
                    {
                        "id": r[0],
                        "username": r[1],
                        "full_name": r[2],
                        "date": r[3],
                        "time_in": r[4],
                        "time_out": r[5],
                        "session_name": r[6],
                        "session_code": r[7],
                        "status": r[8]
                    }
                    for r in c.fetchall()
                ]
        except Exception as e:
            logger.error(f"Error generating attendance report: {str(e)}")
            return []

    def update_attendance_status(self, attendance_id: int, status: str) -> Tuple[bool, str]:
        """Update the status of an attendance record.

        ``status`` must be one of Present, Absent, Late or Excused.
        """
        valid_statuses = ["Present", "Absent", "Late", "Excused"]
        if status not in valid_statuses:
            return False, f"Invalid status. Must be one of: {', '.join(valid_statuses)}"
        try:
            with self.db.get_connection() as conn:
                c = conn.cursor()
                c.execute("UPDATE attendance SET status=? WHERE id=?", (status, attendance_id))
                conn.commit()
                logger.info(f"Updated attendance {attendance_id} status to {status}")
                return True, f"Status updated to {status}"
        except Exception as e:
            logger.error(f"Error updating attendance status: {str(e)}")
            return False, f"Error updating status: {str(e)}"

    def get_active_sessions(self) -> List[Dict]:
        """Get all active attendance sessions, newest first (empty list on error)."""
        try:
            with self.db.get_connection() as conn:
                c = conn.cursor()
                c.execute("""
                    SELECT cs.id, cs.name, cs.description, cs.code, cs.start_time,
                           u.username as creator
                    FROM class_sessions cs
                    JOIN users u ON cs.created_by = u.id
                    WHERE cs.active = 1
                    ORDER BY cs.start_time DESC
                """)
                return [
                    {
                        "id": s[0],
                        "name": s[1],
                        "description": s[2],
                        "code": s[3],
                        "start_time": s[4],
                        "creator": s[5]
                    }
                    for s in c.fetchall()
                ]
        except Exception as e:
            logger.error(f"Error getting active sessions: {str(e)}")
            return []

    def get_sessions_history(self, limit: int = 50) -> List[Dict]:
        """Get the ``limit`` most recent sessions with their attendance counts."""
        try:
            with self.db.get_connection() as conn:
                c = conn.cursor()
                c.execute("""
                    SELECT cs.id, cs.name, cs.code, cs.start_time, cs.end_time,
                           (SELECT COUNT(*) FROM attendance WHERE session_id = cs.id) as count
                    FROM class_sessions cs
                    ORDER BY cs.start_time DESC
                    LIMIT ?
                """, (limit,))
                return [
                    {
                        "id": s[0],
                        "name": s[1],
                        "code": s[2],
                        "start_time": s[3],
                        "end_time": s[4],
                        "attendance_count": s[5]
                    }
                    for s in c.fetchall()
                ]
        except Exception as e:
            logger.error(f"Error getting sessions history: {str(e)}")
            return []
| # ========== SETTINGS MANAGEMENT ========== | |
class SettingsManager:
    """Reads and writes the single configuration row (``settings.id = 1``)."""

    def __init__(self, db: "Database"):
        self.db = db

    def get_settings(self) -> Dict:
        """Return the settings row as a ``{column: value}`` mapping.

        An empty dict is returned when the row is missing or the query fails.
        """
        try:
            with self.db.get_connection() as conn:
                cursor = conn.cursor()
                cursor.execute("SELECT * FROM settings WHERE id=1")
                # Column names come from the cursor metadata of the SELECT.
                names = [meta[0] for meta in cursor.description]
                row = cursor.fetchone()
                return dict(zip(names, row)) if row else {}
        except Exception as e:
            logger.error(f"Error fetching settings: {str(e)}")
            return {}

    def update_settings(self, settings: Dict) -> Tuple[bool, str]:
        """Write the given settings to the configuration row.

        Only known setting names are accepted; anything else is rejected
        before any SQL is built.  Returns ``(ok, message)``.
        """
        try:
            with self.db.get_connection() as conn:
                cursor = conn.cursor()
                allowed_keys = {
                    'site_name', 'attendance_mode', 'auto_close_window',
                    'default_window_time', 'geo_verification', 'allowed_radius',
                    'site_latitude', 'site_longitude', 'require_device_verification',
                    'theme'
                }

                # Reject unknown names first: the keys are interpolated into
                # the statement below, so only allow-listed ones may pass.
                invalid_keys = set(settings.keys()) - allowed_keys
                if invalid_keys:
                    return False, f"Invalid settings: {', '.join(invalid_keys)}"

                if not settings:
                    return False, "No settings provided to update."

                # One "column = ?" fragment per setting, values bound separately.
                assignments = ", ".join([f"{key} = ?" for key in settings])
                values = list(settings.values())
                statement = "UPDATE settings SET " + assignments + " WHERE id=1"
                cursor.execute(statement, values)
                conn.commit()

                logger.info(f"Updated settings: {', '.join(settings.keys())}")
                return True, "Settings updated successfully."
        except Exception as e:
            logger.error(f"Error updating settings: {str(e)}")
            return False, f"Error updating settings: {str(e)}"
| # ========== REPORTING ========== | |
class ReportGenerator:
    """Generates summary and per-student reports and exports rows to CSV.

    Report methods never raise: failures are logged and surfaced through an
    ``"error"`` key (reports) or an ``"Error: ..."`` string (CSV export).
    """

    def __init__(self, db: "Database"):
        self.db = db

    def summary_report(self, start_date: str, end_date: str) -> Dict:
        """Generate summary attendance report for an inclusive date range.

        The result holds ``date_range``, ``totals`` (records / students /
        sessions), ``status_breakdown``, and the ten most attended sessions
        and most frequent students.  On failure the same shape is returned
        with zeroed values plus an ``error`` key.
        """
        try:
            with self.db.get_connection() as conn:
                c = conn.cursor()
                date_range = (start_date, end_date)

                # Total attendance counts
                c.execute("""
                    SELECT COUNT(*) as total_records,
                           COUNT(DISTINCT user_id) as unique_students,
                           COUNT(DISTINCT session_id) as unique_sessions
                    FROM attendance
                    WHERE date BETWEEN ? AND ?
                """, date_range)
                totals = c.fetchone()

                # Status breakdown
                c.execute("""
                    SELECT status, COUNT(*) as count
                    FROM attendance
                    WHERE date BETWEEN ? AND ?
                    GROUP BY status
                """, date_range)
                status_counts = dict(c.fetchall())

                # Session attendance rates
                c.execute("""
                    SELECT cs.name, cs.code, COUNT(*) as attendance_count
                    FROM attendance a
                    JOIN class_sessions cs ON a.session_id = cs.id
                    WHERE a.date BETWEEN ? AND ?
                    GROUP BY a.session_id
                    ORDER BY attendance_count DESC
                """, date_range)
                session_stats = [
                    {"name": name, "code": code, "count": count}
                    for name, code, count in c.fetchall()
                ]

                # Student attendance frequency
                c.execute("""
                    SELECT u.username, u.full_name, COUNT(*) as attendance_count
                    FROM attendance a
                    JOIN users u ON a.user_id = u.id
                    WHERE a.date BETWEEN ? AND ?
                    GROUP BY a.user_id
                    ORDER BY attendance_count DESC
                """, date_range)
                student_stats = [
                    {"username": username, "full_name": full_name, "count": count}
                    for username, full_name, count in c.fetchall()
                ]

                return {
                    "date_range": {"start": start_date, "end": end_date},
                    "totals": {
                        "records": totals[0],
                        "students": totals[1],
                        "sessions": totals[2]
                    },
                    "status_breakdown": status_counts,
                    "top_sessions": session_stats[:10],
                    "top_students": student_stats[:10]
                }
        except Exception as e:
            logger.error(f"Error generating summary report: {str(e)}")
            return {
                "error": str(e),
                "date_range": {"start": start_date, "end": end_date},
                "totals": {"records": 0, "students": 0, "sessions": 0},
                "status_breakdown": {},
                "top_sessions": [],
                "top_students": []
            }

    def student_detail_report(self, user_id: int, start_date: str = None, end_date: str = None) -> Dict:
        """Generate detailed report for a specific student.

        ``start_date`` / ``end_date`` are optional inclusive bounds.  Returns
        ``{"student": ..., "attendance": ...}`` or ``{"error": message}``.
        """
        try:
            with self.db.get_connection() as conn:
                c = conn.cursor()

                # Get student info
                c.execute("SELECT username, full_name, email FROM users WHERE id=?", (user_id,))
                student = c.fetchone()
                if not student:
                    return {"error": "Student not found"}
                username, full_name, email = student

                # Build query for attendance records
                query = """
                    SELECT a.date, a.time_in, cs.name as session_name, cs.code, a.status
                    FROM attendance a
                    JOIN class_sessions cs ON a.session_id = cs.id
                    WHERE a.user_id = ?
                """
                params = [user_id]
                if start_date:
                    query += " AND a.date >= ?"
                    params.append(start_date)
                if end_date:
                    query += " AND a.date <= ?"
                    params.append(end_date)
                query += " ORDER BY a.date DESC, a.time_in DESC"

                c.execute(query, params)
                attendance_records = [
                    {
                        "date": date,
                        "time": time_in,
                        "session": session_name,
                        "code": code,
                        "status": status
                    }
                    for date, time_in, session_name, code, status in c.fetchall()
                ]

                # Calculate statistics
                status_counts = {}
                for record in attendance_records:
                    status = record["status"]
                    status_counts[status] = status_counts.get(status, 0) + 1

                return {
                    "student": {
                        "id": user_id,
                        "username": username,
                        "full_name": full_name,
                        "email": email
                    },
                    "attendance": {
                        "total": len(attendance_records),
                        "status_breakdown": status_counts,
                        "records": attendance_records
                    }
                }
        except Exception as e:
            logger.error(f"Error generating student report: {str(e)}")
            return {"error": str(e)}

    def export_csv(self, data: List[Dict], filename: str) -> str:
        """Export report rows to ``exports/<filename>`` and return that path.

        ``data`` is a list of dicts (the first row's keys become the header)
        or a list of plain row sequences.  Returns ``"No data to export"``
        for empty input and ``"Error: ..."`` when writing fails.
        """
        # Imported here because the module does not import csv at file level.
        import csv

        try:
            if not data:
                return "No data to export"

            # Keep only the final path component so a name such as
            # "../../x.csv" or an absolute path cannot escape "exports".
            safe_name = os.path.basename(filename)
            if not safe_name:
                raise ValueError(f"Invalid export filename: {filename!r}")

            # Create directory if it doesn't exist
            os.makedirs("exports", exist_ok=True)
            filepath = os.path.join("exports", safe_name)

            # Write to CSV (newline='' lets the csv module control line endings)
            with open(filepath, 'w', newline='', encoding='utf-8') as csvfile:
                if isinstance(data[0], dict):
                    writer = csv.DictWriter(csvfile, fieldnames=list(data[0]))
                    writer.writeheader()
                    writer.writerows(data)
                else:
                    csv.writer(csvfile).writerows(data)

            return filepath
        except Exception as e:
            logger.error(f"Error exporting to CSV: {str(e)}")
            return f"Error: {str(e)}"
| # ========== GRADIO UI ========== | |
| # Initialize system components | |
# Module-level component instances.  Constructing Database runs its __init__,
# which creates the database directory and calls init_db(), so this line
# touches the file system at import time.
db = Database(DB_PATH)
# Each manager receives the same Database object and opens its own
# connections through it.
user_manager = UserManager(db)
attendance_manager = AttendanceManager(db)
settings_manager = SettingsManager(db)
report_generator = ReportGenerator(db)
| # UI Helper functions | |
def format_message(success: bool, message: str) -> str:
    """Prefix ``message`` with a check mark on success or a cross on failure."""
    if success:
        return f"✅ {message}"
    return f"❌ {message}"
def format_time(time_str: str) -> str:
    """Format an ISO-8601 timestamp string as ``YYYY-MM-DD HH:MM:SS`` for display.

    Empty or missing input yields an empty string.  Input that cannot be
    parsed as an ISO timestamp is returned unchanged.
    """
    if not time_str:
        return ""
    try:
        parsed = datetime.datetime.fromisoformat(time_str)
    except (TypeError, ValueError):
        # Only parsing failures are swallowed; a bare ``except`` would also
        # hide KeyboardInterrupt and SystemExit.
        return time_str
    return parsed.strftime("%Y-%m-%d %H:%M:%S")
| # Main Gradio UI | |
| with gr.Blocks(title="Advanced Attendance System", theme="default") as app: | |
| # Store session state | |
| session_state = gr.State({}) | |
| # Header and theme | |
| with gr.Row(): | |
| gr.Markdown("# 📝 Advanced Attendance System") | |
| theme_toggle = gr.Radio( | |
| ["Light", "Dark"], | |
| label="Theme", | |
| value="Light", | |
| interactive=True | |
| ) | |
| # Login/Register tabs | |
| with gr.Tab("👤 Authentication"): | |
| with gr.Row(): | |
| with gr.Column(): | |
| gr.Markdown("### 🔑 Login") | |
| login_username = gr.Text(label="Username") | |
| login_password = gr.Text(label="Password", type="password") | |
| login_device = gr.Text(label="Device ID (Optional)", placeholder="e.g., device-uuid") | |
| login_button = gr.Button("Login", variant="primary") | |
| login_output = gr.Markdown() | |
| with gr.Column(): | |
| gr.Markdown("### 📝 Register") | |
| reg_username = gr.Text(label="Username") | |
| reg_password = gr.Text(label="Password", type="password") | |
| reg_confirm = gr.Text(label="Confirm Password", type="password") | |
| reg_email = gr.Text(label="Email (Optional)") | |
| reg_fullname = gr.Text(label="Full Name (Optional)") | |
| reg_role = gr.Radio(["student", "teacher"], label="Role", value="student") | |
| reg_button = gr.Button("Register") | |
| reg_output = gr.Markdown() | |
| # Student tab | |
| with gr.Tab("👨🎓 Student"): | |
| gr.Markdown("### 📊 Mark Attendance") | |
| with gr.Row(): | |
| student_code = gr.Text(label="Session Code", placeholder="Enter the code provided by your teacher") | |
| student_mark_btn = gr.Button("Mark Attendance", variant="primary") | |
| with gr.Row(): | |
| student_status = gr.Markdown("Please login to access student features") | |
| with gr.Accordion("My Attendance History", open=False): | |
| student_refresh_btn = gr.Button("Refresh History") | |
| student_history = gr.DataFrame(label="My Attendance Records") | |
| # Teacher tab | |
# Teacher tab: session creation/closing and attendance reporting.
# NOTE(review): nesting below is reconstructed from the statement order because
# the original indentation was lost — confirm against the running layout.
with gr.Tab("👩🏫 Teacher") as teacher_tab:
    with gr.Row():
        teacher_status = gr.Markdown("Please login as a teacher to access these features")
    with gr.Tabs() as teacher_tabs:
        with gr.Tab("Create Session"):
            with gr.Row():
                session_name = gr.Text(label="Session Name", placeholder="e.g., Math 101 - Week 3")
                session_desc = gr.Text(label="Description (Optional)", placeholder="Brief description of this session")
            with gr.Row():
                session_window = gr.Slider(label="Attendance Window (seconds)",
                                           minimum=60, maximum=1800, value=300, step=60)
            create_session_btn = gr.Button("Create New Session", variant="primary")
            create_output = gr.Markdown()
            with gr.Accordion("Active Sessions", open=True):
                active_refresh = gr.Button("Refresh Active Sessions")
                active_sessions = gr.DataFrame(label="Currently Active Sessions")
                close_session_id = gr.Number(label="Session ID to Close", precision=0)
                close_btn = gr.Button("Close Selected Session")
                close_output = gr.Markdown()
        with gr.Tab("Attendance Reports"):
            with gr.Row():
                # Gradio has no `gr.Date` component; `gr.DateTime` with
                # include_time=False is the date picker. type="datetime" hands
                # the handler datetime objects, which support the
                # `.isoformat()` calls and `>` comparison used by the report
                # handler.
                report_date_start = gr.DateTime(label="Start Date", include_time=False, type="datetime")
                report_date_end = gr.DateTime(label="End Date", include_time=False, type="datetime")
                report_type = gr.Radio(
                    ["Summary", "Detailed", "By Student", "By Session"],
                    label="Report Type",
                    value="Summary"
                )
            with gr.Row():
                # No choices are ever loaded into this dropdown, so allow a
                # typed value (e.g. a student ID for the "By Student" report).
                report_filter = gr.Dropdown(
                    label="Additional Filter (Optional)",
                    choices=[],
                    allow_custom_value=True
                )
                report_gen_btn = gr.Button("Generate Report", variant="primary")
            report_output = gr.Markdown()
            report_data = gr.DataFrame(label="Report Data")
            export_btn = gr.Button("Export to CSV")
            export_output = gr.Markdown()
| # Admin tab | |
# Admin tab: user management and system-wide settings.
# NOTE(review): nesting below is reconstructed from the statement order because
# the original indentation was lost — confirm against the running layout.
with gr.Tab("⚙️ Admin"):
    admin_status = gr.Markdown("Please login as admin to access these features")
    with gr.Tabs() as admin_tabs:
        with gr.Tab("User Management"):
            with gr.Row():
                user_filter = gr.Radio(
                    ["All Users", "Students", "Teachers", "Admins"],
                    label="Filter Users",
                    value="All Users"
                )
                user_refresh = gr.Button("Refresh User List")
            users_table = gr.DataFrame(label="Users")
            with gr.Row():
                user_action = gr.Radio(
                    ["Activate", "Deactivate", "Change Role", "Reset Password"],
                    label="Action"
                )
                user_id = gr.Number(label="User ID", precision=0)
                # Only read for "Change Role" and "Reset Password".
                user_param = gr.Text(label="New Value (for role/password)")
            user_action_btn = gr.Button("Apply Action")
            user_action_output = gr.Markdown()
        with gr.Tab("System Settings"):
            with gr.Row():
                settings_refresh = gr.Button("Load Current Settings")
            with gr.Group():
                gr.Markdown("### General Settings")
                site_name = gr.Text(label="Site Name")
                theme_setting = gr.Radio(["light", "dark"], label="Default Theme")
            with gr.Group():
                gr.Markdown("### Attendance Settings")
                attendance_mode = gr.Radio(
                    ["manual", "auto", "scheduled"],
                    label="Attendance Mode"
                )
                auto_close = gr.Checkbox(label="Auto-close attendance windows")
                window_time = gr.Slider(
                    label="Default Window Time (seconds)",
                    minimum=60, maximum=3600, value=300, step=60
                )
            with gr.Group():
                gr.Markdown("### Security Settings")
                geo_verify = gr.Checkbox(label="Enable Geographic Verification")
                # Hidden until the geo_verify checkbox is ticked (toggled by the
                # geo_verify.change handler).
                with gr.Row(visible=False) as geo_settings:
                    site_lat = gr.Number(label="Site Latitude")
                    site_lon = gr.Number(label="Site Longitude")
                    geo_radius = gr.Slider(
                        label="Allowed Radius (meters)",
                        minimum=10, maximum=1000, value=100
                    )
                device_verify = gr.Checkbox(label="Require Device Verification")
            settings_save = gr.Button("Save Settings", variant="primary")
            settings_output = gr.Markdown()
| # Login function | |
def do_login(username, password, device_id):
    """Authenticate a user and build the session state for this browser.

    Args:
        username: Login name typed into the form.
        password: Plain-text password typed into the form.
        device_id: Optional device identifier; forwarded to the login call and
            kept in the session state for later attendance marking.

    Returns:
        A ``(message, state)`` tuple matching the two outputs wired to the
        login button: the Markdown message and the new session-state dict
        (``{"user": ..., "device_id": ...}``). On failure ``user`` is None.
    """
    success, result = user_manager.login(username, password, device_id)
    if not success:
        # `result` is the failure reason supplied by the user manager.
        return f"❌ {result}", {"user": None, "device_id": None}

    message = f"✅ Welcome, {result['username']}! You are now logged in."
    # A string-keyed dict cannot be mapped onto the output components, so the
    # values are returned positionally, one per output.
    return message, {"user": result, "device_id": device_id or None}
| # Register function | |
def do_register(username, password, confirm, email, fullname, role):
    """Validate the registration form and create the account.

    Args:
        username: Requested login name (required).
        password: Requested password (required).
        confirm: Password confirmation; must equal ``password``.
        email: Optional e-mail address.
        fullname: Optional display name.
        role: Role chosen in the form.

    Returns:
        A Markdown status string for the registration output.
    """
    if not username or not password:
        return "❌ Username and password are required."
    if password != confirm:
        return "❌ Passwords do not match."

    # The users table declares `email TEXT UNIQUE`. An empty textbox yields "",
    # and a second account with "" would violate the constraint, whereas NULLs
    # never collide. Send None when no e-mail was entered.
    # NOTE(review): assumes register_user accepts email=None — confirm.
    email = (email or "").strip() or None

    success, message = user_manager.register_user(
        username=username,
        password=password,
        role=role,
        email=email,
        full_name=fullname
    )
    return format_message(success, message)
| # Mark attendance function | |
def do_mark_attendance(code, state):
    """Mark the logged-in user present for the session identified by ``code``.

    Args:
        code: Session code typed by the student.
        state: Session-state dict; may be None or empty before any login.

    Returns:
        A Markdown status string.
    """
    # Tolerate a None/empty state instead of raising AttributeError.
    session = state or {}
    user = session.get("user")
    if not user:
        return "❌ Please login first."

    # Codes are usually pasted; surrounding whitespace must not cause a miss.
    code = (code or "").strip()
    if not code:
        return "❌ Please enter a session code."

    success, message = attendance_manager.mark_attendance(
        user_id=user["user_id"],
        session_code=code,
        device_info={"device_id": session.get("device_id")}
    )
    return format_message(success, message)
| # Get student history | |
def get_student_history(state):
    """Return the logged-in user's attendance records for the history table.

    Args:
        state: Session-state dict; may be None or empty before any login.

    Returns:
        ``[]`` when nobody is logged in, otherwise a DataFrame with the
        columns Date, Time, Session, Code and Status (headers are kept even
        when there are no records).
    """
    user = (state or {}).get("user")
    if not user:
        return []

    # Imported locally so the handler works even if the module does not import
    # pandas at the top.
    import pandas as pd

    records = attendance_manager.get_attendance_report({"user_id": user["user_id"]})
    columns = ["Date", "Time", "Session", "Code", "Status"]
    rows = [
        {
            "Date": r["date"],
            "Time": r["time_in"],
            "Session": r["session_name"],
            "Code": r["session_code"],
            "Status": r["status"]
        }
        for r in records
    ]
    return pd.DataFrame(rows, columns=columns)
| # Create session function | |
def do_create_session(name, desc, window, state):
    """Create a new attendance session on behalf of a teacher or admin.

    Args:
        name: Session name (required; whitespace-only counts as missing).
        desc: Optional description.
        window: Attendance window in seconds.
        state: Session-state dict; may be None or empty before any login.

    Returns:
        A Markdown status string; on success it contains the session code.
    """
    # Tolerate a None/empty state instead of raising AttributeError.
    user = (state or {}).get("user")
    if not user or user.get("role") not in ("teacher", "admin"):
        return "❌ You must be logged in as a teacher or admin."

    name = (name or "").strip()
    if not name:
        return "❌ Session name is required."

    success, result = attendance_manager.create_session(
        name=name,
        description=desc,
        created_by=user["user_id"],
        window_time=int(window)
    )
    if not success:
        return f"❌ {result}"
    return f"✅ Session created successfully!\n\nSession code: **{result['code']}**\n\nShare this code with students to allow them to mark attendance."
| # Get active sessions | |
def get_active_sessions():
    """Build the table of currently active sessions.

    Returns:
        ``[]`` when the attendance manager reports no active sessions,
        otherwise a DataFrame with one row per session.
    """
    active = attendance_manager.get_active_sessions()
    if not active:
        return []

    rows = []
    for entry in active:
        rows.append({
            "ID": entry["id"],
            "Name": entry["name"],
            # A missing description is shown as a dash.
            "Description": entry["description"] or "-",
            "Code": entry["code"],
            "Started": format_time(entry["start_time"]),
            "Creator": entry["creator"]
        })
    return pd.DataFrame(rows)
| # Close session function | |
def do_close_session(session_id, state):
    """Close an attendance session by its numeric ID.

    Args:
        session_id: ID entered in the "Session ID to Close" field.
        state: Session-state dict; may be None or empty before any login.

    Returns:
        A Markdown status string.
    """
    # Tolerate a None/empty state instead of raising AttributeError.
    user = (state or {}).get("user")
    if not user or user.get("role") not in ("teacher", "admin"):
        return "❌ You must be logged in as a teacher or admin."
    if not session_id:
        return "❌ Session ID is required."

    success, message = attendance_manager.close_session(int(session_id))
    return format_message(success, message)
| # Generate report function | |
def generate_report(start_date, end_date, report_type, filter_value, state):
    """Build an attendance report for the given date range.

    Args:
        start_date: First day of the range. Accepts a ``date``/``datetime``,
            an ISO ``YYYY-MM-DD`` string or a POSIX timestamp.
        end_date: Last day of the range; same accepted forms.
        report_type: "Summary" or "By Student"; any other value (including
            "Detailed" and "By Session") produces the detailed listing.
        filter_value: Student ID, required for "By Student".
        state: Session-state dict; may be None or empty before any login.

    Returns:
        A ``(markdown_summary, dataframe_or_None)`` tuple. The second element
        is None whenever the first is an error message.
    """

    def _as_date(value):
        """Coerce a supported date representation to ``datetime.date``."""
        if isinstance(value, datetime.datetime):
            return value.date()
        if isinstance(value, datetime.date):
            return value
        if isinstance(value, (int, float)):
            return datetime.date.fromtimestamp(value)
        # Keep only the date part of strings such as "2024-01-31 00:00:00".
        return datetime.date.fromisoformat(str(value).strip()[:10])

    # Tolerate a None/empty state instead of raising AttributeError.
    user = (state or {}).get("user")
    if not user or user.get("role") not in ("teacher", "admin"):
        return "❌ You must be logged in as a teacher or admin.", None
    if not start_date or not end_date:
        return "❌ Please select both start and end dates.", None

    try:
        start = _as_date(start_date)
        end = _as_date(end_date)
    except (ValueError, TypeError, OverflowError, OSError):
        return "❌ Invalid date. Use the YYYY-MM-DD format.", None
    if start > end:
        return "❌ Start date must be before end date.", None

    # Imported locally so the handler works even if the module does not import
    # pandas at the top.
    import pandas as pd

    start_iso, end_iso = start.isoformat(), end.isoformat()

    if report_type == "Summary":
        report = report_generator.summary_report(start_iso, end_iso)
        totals = report["totals"]
        summary = f"### Report Summary: {start} to {end}\n\n"
        summary += f"Total Records: {totals['records']}\n"
        summary += f"Unique Students: {totals['students']}\n"
        summary += f"Sessions: {totals['sessions']}\n\n"
        session_data = [
            {
                "Session Name": s["name"],
                "Code": s["code"],
                "Attendance Count": s["count"]
            }
            for s in report["top_sessions"]
        ]
        return summary, pd.DataFrame(session_data)

    if report_type == "By Student":
        if not filter_value:
            return "❌ Please select a student.", None
        try:
            student_id = int(filter_value)
        except (TypeError, ValueError):
            return "❌ Please select a valid student.", None

        report = report_generator.student_detail_report(student_id, start_iso, end_iso)
        if "error" in report:
            return f"❌ {report['error']}", None

        student = report["student"]
        attendance = report["attendance"]
        summary = f"### Student Report: {student['full_name'] or student['username']}\n\n"
        summary += f"Total Attendance: {attendance['total']} records\n\n"
        summary += "Status Breakdown:\n"
        summary += "".join(
            f"- {status}: {count}\n"
            for status, count in attendance["status_breakdown"].items()
        )
        attendance_data = [
            {
                "Date": r["date"],
                "Time": r["time"],
                "Session": r["session"],
                "Code": r["code"],
                "Status": r["status"]
            }
            for r in attendance["records"]
        ]
        return summary, pd.DataFrame(attendance_data)

    # Every other report type falls back to the detailed per-record listing.
    records = attendance_manager.get_attendance_report({
        "date_range": (start_iso, end_iso)
    })
    summary = f"### Detailed Attendance Report: {start} to {end}\n\n"
    summary += f"Total Records: {len(records)}\n"
    data = [
        {
            "Student": r["username"],
            "Full Name": r["full_name"] or "-",
            "Date": r["date"],
            "Time": r["time_in"],
            "Session": r["session_name"],
            "Status": r["status"]
        }
        for r in records
    ]
    return summary, pd.DataFrame(data)
| # Export report | |
def export_report_csv(report_data, state):
    """Export the currently displayed report table to a timestamped CSV file.

    Args:
        report_data: DataFrame holding the report rows, or None.
        state: Session-state dict; may be None or empty before any login.

    Returns:
        A Markdown status string naming the exported file or the failure.
    """
    # Tolerate a None/empty state instead of raising AttributeError.
    user = (state or {}).get("user")
    if not user or user.get("role") not in ("teacher", "admin"):
        return "❌ You must be logged in as a teacher or admin."
    if report_data is None or report_data.empty:
        return "❌ No data to export."

    timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = f"attendance_report_{timestamp}.csv"

    # The report generator is given a list of row dicts, not a DataFrame.
    rows = report_data.to_dict('records')
    filepath = report_generator.export_csv(rows, filename)

    # The return value is treated as a failure when it begins with "Error".
    if filepath.startswith("Error"):
        return f"❌ {filepath}"
    return f"✅ Report exported successfully to {filepath}"
| # Get users list | |
def get_users_list(filter_type, state):
    """Return the user table for the admin tab, optionally filtered by role.

    Args:
        filter_type: One of "All Users", "Students", "Teachers", "Admins".
            Unrecognised values apply no role filter.
        state: Session-state dict; may be None or empty before any login.

    Returns:
        ``[]`` for non-admin callers, otherwise a DataFrame with the columns
        ID, Username, Role, Email, Full Name and Active.
    """
    # Tolerate a None/empty state instead of raising AttributeError.
    user = (state or {}).get("user")
    if not user or user.get("role") != "admin":
        return []

    # Imported locally so the handler works even if the module does not import
    # pandas at the top.
    import pandas as pd

    role_by_filter = {"Students": "student", "Teachers": "teacher", "Admins": "admin"}
    users = user_manager.get_users(role_by_filter.get(filter_type))

    columns = ["ID", "Username", "Role", "Email", "Full Name", "Active"]
    rows = [
        {
            "ID": u["id"],
            "Username": u["username"],
            "Role": u["role"],
            "Email": u["email"] or "-",
            "Full Name": u["full_name"] or "-",
            "Active": "Yes" if u["active"] else "No"
        }
        for u in users
    ]
    return pd.DataFrame(rows, columns=columns)
| # Process user action | |
def process_user_action(action, user_id, param, state):
    """Apply an admin action (activate, deactivate, role, password) to a user.

    Args:
        action: "Activate", "Deactivate", "Change Role" or "Reset Password".
        user_id: ID of the user row to modify.
        param: New role for "Change Role" or new password for
            "Reset Password"; ignored otherwise.
        state: Session-state dict; may be None or empty before any login.

    Returns:
        A Markdown status string. Database errors are logged and reported in
        the returned message rather than raised.
    """
    from contextlib import closing

    # Tolerate a None/empty state instead of raising AttributeError.
    user = (state or {}).get("user")
    if not user or user.get("role") != "admin":
        return "❌ You must be an admin to perform this action."
    if not user_id:
        return "❌ Please select a user ID."

    # Validate everything before a connection is opened.
    if action not in ("Activate", "Deactivate", "Change Role", "Reset Password"):
        return "❌ Invalid action."
    if action == "Change Role" and param not in DEFAULT_ROLES:
        return f"❌ Invalid role. Must be one of: {', '.join(DEFAULT_ROLES)}"
    if action == "Reset Password" and (not param or len(param) < 8):
        return "❌ Password must be at least 8 characters."

    try:
        target_id = int(user_id)
        if action == "Activate":
            sql, args = "UPDATE users SET active=1 WHERE id=?", (target_id,)
            result = "User activated successfully."
        elif action == "Deactivate":
            sql, args = "UPDATE users SET active=0 WHERE id=?", (target_id,)
            result = "User deactivated successfully."
        elif action == "Change Role":
            sql, args = "UPDATE users SET role=? WHERE id=?", (param, target_id)
            result = "User role updated successfully."
        else:
            salt = secrets.token_hex(SALT_LENGTH)
            # NOTE(review): relies on a private `_hash_password` on the
            # database object — confirm it is defined there.
            password_hash = db._hash_password(param, salt)
            sql = "UPDATE users SET password_hash=?, salt=? WHERE id=?"
            args = (password_hash, salt, target_id)
            result = "Password reset successfully."

        # `with conn` only commits or rolls back; `closing` is what releases
        # the connection.
        with closing(db.get_connection()) as conn:
            with conn:
                cursor = conn.execute(sql, args)
                updated = cursor.rowcount

        # An UPDATE that matches no row must not be reported as a success.
        if updated == 0:
            return f"❌ No user found with ID {target_id}."
        return f"✅ {result}"
    except Exception as e:
        logger.exception("Error in user action: %s", e)
        return f"❌ Error: {str(e)}"
| # Get current settings | |
def get_current_settings(state):
    """Load the stored system settings into the admin settings form.

    Args:
        state: Session-state dict; may be None or empty before any login.

    Returns:
        A 10-tuple in the order of the wired form outputs: site name, theme,
        attendance mode, auto-close flag, default window time, geo
        verification flag, latitude, longitude, allowed radius and device
        verification flag. Defaults are returned for non-admin callers or when
        no settings are stored.
    """
    defaults = ("", "light", "manual", False, 300, False, 0, 0, 100, False)

    # Tolerate a None/empty state instead of raising AttributeError.
    user = (state or {}).get("user")
    if not user or user.get("role") != "admin":
        return defaults

    settings = settings_manager.get_settings()
    if not settings:
        return defaults

    def _flag(key):
        """Read a boolean setting, accepting textual forms such as "0"."""
        value = settings.get(key, False)
        if isinstance(value, str):
            # bool("0") is True, so stored text must be interpreted explicitly.
            return value.strip().lower() in ("1", "true", "yes", "on")
        return bool(value)

    def _number(key, default, cast):
        """Read a numeric setting, falling back to ``default`` if unusable."""
        try:
            return cast(float(settings.get(key) or default))
        except (TypeError, ValueError):
            return default

    return (
        settings.get("site_name", ""),
        settings.get("theme", "light"),
        settings.get("attendance_mode", "manual"),
        _flag("auto_close_window"),
        _number("default_window_time", 300, int),
        _flag("geo_verification"),
        _number("site_latitude", 0, float),
        _number("site_longitude", 0, float),
        _number("allowed_radius", 100, int),
        _flag("require_device_verification")
    )
| # Save settings | |
def save_settings(site_name, theme, mode, auto_close, window_time,
                  geo_verify, site_lat, site_lon, radius, device_verify, state):
    """Persist the admin settings form.

    Args:
        site_name: Site display name.
        theme: Default theme name.
        mode: Attendance mode.
        auto_close: Whether attendance windows close automatically.
        window_time: Default attendance window in seconds.
        geo_verify: Whether geographic verification is enabled.
        site_lat: Site latitude; required when ``geo_verify`` is set.
        site_lon: Site longitude; required when ``geo_verify`` is set.
        radius: Allowed radius in meters; stored only when ``geo_verify`` is
            set.
        device_verify: Whether device verification is required.
        state: Session-state dict; may be None or empty before any login.

    Returns:
        A Markdown status string.
    """
    # Tolerate a None/empty state instead of raising AttributeError.
    user = (state or {}).get("user")
    if not user or user.get("role") != "admin":
        return "❌ You must be an admin to change settings."

    updated_settings = {
        "site_name": site_name,
        "theme": theme,
        "attendance_mode": mode,
        "auto_close_window": 1 if auto_close else 0,
        "default_window_time": int(window_time),
        "geo_verification": 1 if geo_verify else 0,
        "require_device_verification": 1 if device_verify else 0
    }

    if geo_verify:
        # A blank Number field arrives as None. str(None) would store the
        # text "None", which cannot be parsed back into a float on reload.
        if site_lat is None or site_lon is None:
            return "❌ Latitude and longitude are required when geographic verification is enabled."
        if not (-90 <= site_lat <= 90 and -180 <= site_lon <= 180):
            return "❌ Latitude must be between -90 and 90 and longitude between -180 and 180."
        updated_settings.update({
            "site_latitude": str(site_lat),
            "site_longitude": str(site_lon),
            "allowed_radius": int(radius)
        })

    success, message = settings_manager.update_settings(updated_settings)
    return format_message(success, message)
| # Connect UI components to functions | |
# --- Authentication ---
# Login produces two outputs: the status message and the new session state.
login_button.click(
    fn=do_login,
    inputs=[login_username, login_password, login_device],
    outputs=[login_output, session_state]
)
reg_button.click(
    fn=do_register,
    inputs=[reg_username, reg_password, reg_confirm, reg_email, reg_fullname, reg_role],
    outputs=reg_output
)
# --- Student tab ---
student_mark_btn.click(
    fn=do_mark_attendance,
    inputs=[student_code, session_state],
    outputs=student_status
)
student_refresh_btn.click(
    fn=get_student_history,
    inputs=[session_state],
    outputs=student_history
)
# --- Teacher tab: sessions ---
create_session_btn.click(
    fn=do_create_session,
    inputs=[session_name, session_desc, session_window, session_state],
    outputs=create_output
)
# This handler takes no inputs, so it does not receive the session state.
active_refresh.click(
    fn=get_active_sessions,
    inputs=None,
    outputs=active_sessions
)
close_btn.click(
    fn=do_close_session,
    inputs=[close_session_id, session_state],
    outputs=close_output
)
# --- Teacher tab: reports ---
report_gen_btn.click(
    fn=generate_report,
    inputs=[report_date_start, report_date_end, report_type, report_filter, session_state],
    outputs=[report_output, report_data]
)
# Export reads back whatever is currently shown in the report table.
export_btn.click(
    fn=export_report_csv,
    inputs=[report_data, session_state],
    outputs=export_output
)
# --- Admin tab: users ---
user_refresh.click(
    fn=get_users_list,
    inputs=[user_filter, session_state],
    outputs=users_table
)
user_action_btn.click(
    fn=process_user_action,
    inputs=[user_action, user_id, user_param, session_state],
    outputs=user_action_output
)
# --- Admin tab: settings ---
# The output order must match the tuple returned by get_current_settings.
settings_refresh.click(
    fn=get_current_settings,
    inputs=[session_state],
    outputs=[site_name, theme_setting, attendance_mode, auto_close, window_time,
             geo_verify, site_lat, site_lon, geo_radius, device_verify]
)
# The input order must match the save_settings parameter order.
settings_save.click(
    fn=save_settings,
    inputs=[site_name, theme_setting, attendance_mode, auto_close, window_time,
            geo_verify, site_lat, site_lon, geo_radius, device_verify, session_state],
    outputs=settings_output
)
| # Dynamic visibility and state update handlers | |
def update_student_status(state):
    """Return the status line shown on the Student tab.

    Args:
        state: Session-state dict; may be None or empty before any login.

    Returns:
        A login prompt when nobody is logged in, otherwise
        ``"Logged in as: <username>"``.
    """
    if not state:
        return "Please login to access student features"

    # The other handlers read the account from state["user"], so the username
    # is looked up there; a top-level "username" key is honoured as a fallback.
    user = state.get("user") or {}
    username = user.get("username") or state.get("username")
    if not user and not username:
        return "Please login to access student features"
    return f"Logged in as: {username or 'Unknown'}"
def update_teacher_status(state):
    """Return the status line shown on the Teacher tab.

    Args:
        state: Session-state dict; falsy before any login.

    Returns:
        A login prompt for a falsy state, a welcome line for teachers and
        admins, and a refusal for everyone else.
    """
    if not state:
        return "Please login to access teacher features"

    account = state.get("user", {})
    allowed = bool(account) and account.get("role") in ["teacher", "admin"]
    if not allowed:
        return "❌ You must be logged in as a teacher to access these features."
    return f"👋 Welcome, {account.get('username')}! You have access to teacher features."
def update_admin_status(state):
    """Return the status line shown on the Admin tab.

    Args:
        state: Session-state dict; falsy before any login.

    Returns:
        A login prompt for a falsy state, a welcome line for admins, and a
        refusal for everyone else.
    """
    if not state:
        return "Please login to access admin features"

    account = state.get("user", {})
    is_admin = bool(account) and account.get("role") == "admin"
    if not is_admin:
        return "❌ You must be logged in as an admin to access these features."
    return f"👋 Welcome, Administrator {account.get('username')}!"
def update_teacher_tabs_visibility(state):
    """Show the teacher sub-tabs only to teachers and admins.

    Args:
        state: Session-state dict; may be None or empty before any login.

    Returns:
        A ``gr.update`` carrying a boolean ``visible`` flag.
    """
    # Tolerate a None state or a None user instead of raising.
    user = (state or {}).get("user") or {}
    # Pass a real bool: `user and ...` would yield {} or None when logged out.
    return gr.update(visible=user.get("role") in ("teacher", "admin"))
def update_admin_tabs_visibility(state):
    """Show the admin sub-tabs only to admins.

    Args:
        state: Session-state dict; may be None or empty before any login.

    Returns:
        A ``gr.update`` carrying a boolean ``visible`` flag.
    """
    # Tolerate a None state or a None user instead of raising.
    user = (state or {}).get("user") or {}
    # Pass a real bool: `user and ...` would yield {} or None when logged out.
    return gr.update(visible=user.get("role") == "admin")
def update_geo_settings_visibility(geo_verify):
    """Toggle the geo settings row to follow the geo-verification checkbox.

    Args:
        geo_verify: Current value of the geo-verification checkbox.

    Returns:
        A ``gr.update`` whose ``visible`` flag is ``geo_verify``.
    """
    show_row = geo_verify
    return gr.update(visible=show_row)
# Refresh every role-dependent part of the UI whenever the session state
# changes.
# NOTE(review): this relies on the session state component exposing a
# `.change` event — confirm the installed Gradio version supports it.
session_state.change(
    fn=update_student_status,
    inputs=session_state,
    outputs=student_status
)
session_state.change(
    fn=update_teacher_status,
    inputs=session_state,
    outputs=teacher_status
)
session_state.change(
    fn=update_admin_status,
    inputs=session_state,
    outputs=admin_status
)
# Show or hide the teacher and admin sub-tabs according to the user's role.
session_state.change(
    fn=update_teacher_tabs_visibility,
    inputs=session_state,
    outputs=teacher_tabs
)
session_state.change(
    fn=update_admin_tabs_visibility,
    inputs=session_state,
    outputs=admin_tabs
)
# The latitude/longitude/radius row follows the geo-verification checkbox.
geo_verify.change(
    fn=update_geo_settings_visibility,
    inputs=geo_verify,
    outputs=geo_settings
)
| # Change theme function | |
def change_theme(choice):
    """Return an update that sets ``theme`` to the lower-cased ``choice``."""
    # NOTE(review): Gradio themes are normally fixed when the Blocks app is
    # constructed; confirm that `gr.update(theme=...)` has any effect.
    return gr.update(theme=choice.lower())
# NOTE(review): `app` looks like the Blocks container itself rather than a
# component — confirm it is accepted as an event output.
theme_toggle.change(
    fn=change_theme,
    inputs=theme_toggle,
    outputs=app
)
| # Launch the app | |
if __name__ == "__main__":
    # Make sure a "logs" directory exists next to the working directory.
    # (The file handler configured at import time writes to
    # "attendance_system.log", not into this directory.)
    Path("logs").mkdir(parents=True, exist_ok=True)
    # Start the Gradio server.
    app.launch()