|
|
|
|
|
""" |
|
|
WiFi-Based Attendance & Break Tracker - Advanced Employee Management Version |
|
|
Integrated main application entry point |
|
|
|
|
|
This application tracks employee attendance by detecting MAC addresses |
|
|
of devices connected to the local WiFi network with advanced features: |
|
|
- Employee management (add, search, password protection) |
|
|
- Time-in/time-out tracking |
|
|
- Break start/end monitoring |
|
|
- Automatic 5:00 PM timeout |
|
|
- Detailed attendance sheets |
|
|
- Real-time web interface |
|
|
- Non-admin operation (where possible) |
|
|
""" |
|
|
|
|
|
import sys |
|
|
import os |
|
|
import argparse |
|
|
import threading |
|
|
import time |
|
|
import json |
|
|
from datetime import datetime |
|
|
from flask import Flask, render_template, jsonify, request, session, redirect, url_for |
|
|
from flask_cors import CORS |
|
|
from functools import wraps |
|
|
|
|
|
|
|
|
from attendance_tracker import AttendanceTracker |
|
|
from database import AttendanceDatabase |
|
|
from auth import AuthManager |
|
|
|
|
|
|
|
|
app = Flask(__name__) |
|
|
app.secret_key = 'wifi_attendance_tracker_secret_key_2025' |
|
|
CORS(app) |
|
|
|
|
|
|
|
|
tracker = None |
|
|
db = None |
|
|
auth = None |
|
|
latest_events = [] |
|
|
is_monitoring = False |
|
|
|
|
|
|
|
|
DASHBOARD_PASSWORD = "admin123" |
|
|
|
|
|
def login_required(f): |
|
|
"""Decorator to require login for protected routes.""" |
|
|
@wraps(f) |
|
|
def decorated_function(*args, **kwargs): |
|
|
if 'logged_in' not in session or not session['logged_in']: |
|
|
return redirect(url_for('login_page')) |
|
|
return f(*args, **kwargs) |
|
|
return decorated_function |
|
|
|
|
|
def check_dashboard_password(password): |
|
|
"""Check if the provided password matches the dashboard password.""" |
|
|
return password == DASHBOARD_PASSWORD |
|
|
|
|
|
def print_banner(): |
|
|
"""Print application banner.""" |
|
|
banner = """ |
|
|
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
|
|
β WiFi Attendance Tracker - Employee Management β |
|
|
β β |
|
|
β Advanced MAC Detection with Employee Management Features β |
|
|
β Version 3.0 - 2025 β |
|
|
ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ |
|
|
""" |
|
|
print(banner) |
|
|
|
|
|
def check_requirements(): |
|
|
"""Check if the system meets requirements.""" |
|
|
issues = [] |
|
|
|
|
|
|
|
|
if not os.path.exists('config.json'): |
|
|
issues.append("config.json file not found") |
|
|
|
|
|
|
|
|
if not os.path.exists('employees.json'): |
|
|
issues.append("employees.json file not found") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return issues |
|
|
|
|
|
def initialize_system(): |
|
|
"""Initialize the attendance tracking system.""" |
|
|
global tracker, db, auth |
|
|
|
|
|
|
|
|
db = AttendanceDatabase() |
|
|
|
|
|
|
|
|
auth = AuthManager(db) |
|
|
|
|
|
|
|
|
tracker = AttendanceTracker() |
|
|
|
|
|
|
|
|
tracker.sync_employees_from_json() |
|
|
|
|
|
def monitoring_loop(): |
|
|
"""Background monitoring loop.""" |
|
|
global latest_events, is_monitoring |
|
|
|
|
|
while is_monitoring: |
|
|
try: |
|
|
events = tracker.scan_once() |
|
|
|
|
|
|
|
|
latest_events.extend(events) |
|
|
latest_events = latest_events[-50:] |
|
|
|
|
|
time.sleep(tracker.scan_interval) |
|
|
except Exception as e: |
|
|
print(f"Error in monitoring loop: {e}") |
|
|
time.sleep(5) |
|
|
|
|
|
|
|
|
@app.route('/') |
|
|
def index(): |
|
|
"""Redirect to login page.""" |
|
|
return redirect(url_for('login_page')) |
|
|
|
|
|
@app.route('/login') |
|
|
def login_page(): |
|
|
"""Serve the login page.""" |
|
|
if 'logged_in' in session and session['logged_in']: |
|
|
return redirect(url_for('dashboard')) |
|
|
return render_template('login.html') |
|
|
|
|
|
@login_required |
|
|
@app.route('/api/login', methods=['POST']) |
|
|
def login(): |
|
|
"""Handle login authentication.""" |
|
|
try: |
|
|
data = request.get_json() |
|
|
password = data.get('password', '') |
|
|
|
|
|
if check_dashboard_password(password): |
|
|
session['logged_in'] = True |
|
|
return jsonify({'success': True, 'message': 'Login successful'}) |
|
|
else: |
|
|
return jsonify({'success': False, 'message': 'Invalid password'}) |
|
|
except Exception as e: |
|
|
return jsonify({'success': False, 'message': 'Login error occurred'}) |
|
|
|
|
|
@app.route('/logout') |
|
|
def logout(): |
|
|
"""Handle logout.""" |
|
|
session.pop('logged_in', None) |
|
|
return redirect(url_for('login_page')) |
|
|
|
|
|
@app.route('/dashboard') |
|
|
@login_required |
|
|
def dashboard(): |
|
|
"""Serve the main dashboard page.""" |
|
|
return render_template('index.html') |
|
|
|
|
|
@login_required |
|
|
@app.route('/api/status') |
|
|
@login_required |
|
|
def get_status(): |
|
|
"""Get current system status.""" |
|
|
return jsonify({ |
|
|
'is_monitoring': is_monitoring, |
|
|
'employee_count': len(tracker.employees) if tracker else 0, |
|
|
'scan_interval': tracker.scan_interval if tracker else 60, |
|
|
'current_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'), |
|
|
'office_timeout': f"{tracker.office_timeout_hour:02d}:{tracker.office_timeout_minute:02d}" if tracker else "17:00" |
|
|
}) |
|
|
|
|
|
@login_required |
|
|
@app.route('/api/employees') |
|
|
@login_required |
|
|
def get_employees(): |
|
|
"""Get all employees and their current status.""" |
|
|
if not tracker: |
|
|
return jsonify([]) |
|
|
|
|
|
search_query = request.args.get('search', '').strip() |
|
|
|
|
|
status = tracker.get_current_status() |
|
|
employees = [] |
|
|
|
|
|
for mac, info in status.items(): |
|
|
|
|
|
employee_info = db.get_employee_by_mac(mac) |
|
|
picture_path = employee_info.get('picture_path') if employee_info else None |
|
|
|
|
|
employee_data = { |
|
|
'name': info['name'], |
|
|
'mac': info['mac'], |
|
|
'is_present': info['is_present'], |
|
|
'status': info['status'], |
|
|
'last_seen': info['last_seen'], |
|
|
'time_in': info['time_in'], |
|
|
'picture': picture_path |
|
|
} |
|
|
|
|
|
|
|
|
if search_query: |
|
|
if (search_query.lower() in info['name'].lower() or |
|
|
search_query.lower() in info['mac'].lower()): |
|
|
employees.append(employee_data) |
|
|
else: |
|
|
employees.append(employee_data) |
|
|
|
|
|
return jsonify(employees) |
|
|
|
|
|
@app.route('/api/events') |
|
|
@login_required |
|
|
def get_recent_events(): |
|
|
"""Get recent attendance events.""" |
|
|
global latest_events |
|
|
|
|
|
|
|
|
events_data = [] |
|
|
for mac, event_type, timestamp in latest_events: |
|
|
employee_name = tracker.get_employee_name(mac) if tracker else f"Unknown ({mac})" |
|
|
events_data.append({ |
|
|
'name': employee_name, |
|
|
'mac': mac, |
|
|
'event_type': event_type, |
|
|
'timestamp': timestamp.strftime('%Y-%m-%d %H:%M:%S'), |
|
|
'time_ago': get_time_ago(timestamp) |
|
|
}) |
|
|
|
|
|
|
|
|
events_data.sort(key=lambda x: x['timestamp'], reverse=True) |
|
|
|
|
|
return jsonify(events_data) |
|
|
|
|
|
@login_required |
|
|
@app.route('/api/attendance_events') |
|
|
def get_attendance_events(): |
|
|
"""Get attendance events from database.""" |
|
|
if not db: |
|
|
return jsonify([]) |
|
|
|
|
|
date_filter = request.args.get('date') |
|
|
limit = int(request.args.get('limit', 50)) |
|
|
|
|
|
events = db.get_attendance_events(date=date_filter, limit=limit) |
|
|
|
|
|
|
|
|
for event in events: |
|
|
timestamp = datetime.fromisoformat(event['timestamp']) |
|
|
event['time_ago'] = get_time_ago(timestamp) |
|
|
|
|
|
return jsonify(events) |
|
|
|
|
|
@login_required |
|
|
@app.route('/api/daily_summary') |
|
|
def get_daily_summary(): |
|
|
"""Get daily attendance summary.""" |
|
|
if not db: |
|
|
return jsonify([]) |
|
|
|
|
|
date_str = request.args.get('date', datetime.now().strftime('%Y-%m-%d')) |
|
|
summary = db.get_daily_summary(date_str) |
|
|
|
|
|
|
|
|
for employee in summary: |
|
|
employee['total_break_formatted'] = format_duration(employee['total_break_duration']) |
|
|
employee['total_work_formatted'] = format_duration(employee['total_work_duration']) |
|
|
|
|
|
return jsonify(summary) |
|
|
|
|
|
@login_required |
|
|
@app.route('/api/summary_stats') |
|
|
def get_summary_stats(): |
|
|
"""Get summary statistics for the dashboard.""" |
|
|
if not db: |
|
|
return jsonify({}) |
|
|
|
|
|
date_str = request.args.get('date', datetime.now().strftime('%Y-%m-%d')) |
|
|
summary = db.get_daily_summary(date_str) |
|
|
|
|
|
stats = { |
|
|
'total_employees': len(summary), |
|
|
'present_count': len([emp for emp in summary if emp['status'] == 'Present']), |
|
|
'absent_count': len([emp for emp in summary if emp['status'] == 'Absent']), |
|
|
'on_break_count': len([emp for emp in summary if emp['status'] == 'On Break']), |
|
|
'timed_out_count': len([emp for emp in summary if emp['status'] == 'Timed Out']), |
|
|
'total_events': len(db.get_attendance_events(date=date_str, limit=1000)) |
|
|
} |
|
|
|
|
|
return jsonify(stats) |
|
|
|
|
|
@login_required |
|
|
@app.route('/api/start_monitoring', methods=['POST']) |
|
|
def start_monitoring(): |
|
|
"""Start the attendance monitoring.""" |
|
|
global is_monitoring |
|
|
|
|
|
if not is_monitoring: |
|
|
is_monitoring = True |
|
|
monitoring_thread = threading.Thread(target=monitoring_loop, daemon=True) |
|
|
monitoring_thread.start() |
|
|
return jsonify({'success': True, 'message': 'Monitoring started'}) |
|
|
else: |
|
|
return jsonify({'success': False, 'message': 'Monitoring already running'}) |
|
|
|
|
|
@login_required |
|
|
@app.route('/api/stop_monitoring', methods=['POST']) |
|
|
def stop_monitoring(): |
|
|
"""Stop the attendance monitoring.""" |
|
|
global is_monitoring |
|
|
is_monitoring = False |
|
|
return jsonify({'success': True, 'message': 'Monitoring stopped'}) |
|
|
|
|
|
@login_required |
|
|
@app.route('/api/export_csv') |
|
|
def export_csv(): |
|
|
"""Export daily summary to CSV.""" |
|
|
if not db: |
|
|
return jsonify({'success': False, 'message': 'Database not available'}) |
|
|
|
|
|
date_str = request.args.get('date', datetime.now().strftime('%Y-%m-%d')) |
|
|
|
|
|
try: |
|
|
db.export_daily_summary_to_csv(date_str) |
|
|
return jsonify({'success': True, 'message': f'CSV exported for {date_str}'}) |
|
|
except Exception as e: |
|
|
return jsonify({'success': False, 'message': f'Export failed: {str(e)}'}) |
|
|
|
|
|
@login_required |
|
|
@app.route('/api/add_employee', methods=['POST']) |
|
|
def add_employee(): |
|
|
"""Add a new employee with password authentication.""" |
|
|
if not auth or not db: |
|
|
return jsonify({'success': False, 'message': 'Authentication system not available'}) |
|
|
|
|
|
try: |
|
|
data = request.get_json() |
|
|
|
|
|
|
|
|
if not all(key in data for key in ['name', 'mac', 'password']): |
|
|
return jsonify({'success': False, 'message': 'Missing required fields'}) |
|
|
|
|
|
|
|
|
if not auth.authenticate_admin(data['password']): |
|
|
return jsonify({'success': False, 'message': 'Invalid admin password'}) |
|
|
|
|
|
|
|
|
mac_address = data['mac'].lower().strip() |
|
|
if not is_valid_mac_format(mac_address): |
|
|
return jsonify({'success': False, 'message': 'Invalid MAC address format. Use: aa-bb-cc-dd-ee-ff'}) |
|
|
|
|
|
|
|
|
success = db.add_employee( |
|
|
name=data['name'].strip(), |
|
|
mac_address=mac_address, |
|
|
picture_path=data.get('picture', '').strip() or None |
|
|
) |
|
|
|
|
|
if success: |
|
|
|
|
|
update_employees_json(data['name'].strip(), mac_address, data.get('picture', '').strip()) |
|
|
|
|
|
|
|
|
tracker.employees = tracker.load_employees() |
|
|
tracker._initialize_employee_states() |
|
|
|
|
|
return jsonify({'success': True, 'message': 'Employee added successfully'}) |
|
|
else: |
|
|
return jsonify({'success': False, 'message': 'Employee with this MAC address already exists'}) |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Error adding employee: {e}") |
|
|
return jsonify({'success': False, 'message': f'Error adding employee: {str(e)}'}) |
|
|
|
|
|
@login_required |
|
|
@app.route('/api/search_employees') |
|
|
def search_employees(): |
|
|
"""Search employees by name or MAC address.""" |
|
|
if not db: |
|
|
return jsonify([]) |
|
|
|
|
|
search_query = request.args.get('q', '').strip() |
|
|
|
|
|
if not search_query: |
|
|
return jsonify([]) |
|
|
|
|
|
employees = db.get_all_employees(search_query=search_query) |
|
|
|
|
|
|
|
|
result = [] |
|
|
for emp in employees: |
|
|
result.append({ |
|
|
'id': emp['id'], |
|
|
'name': emp['name'], |
|
|
'mac_address': emp['mac_address'], |
|
|
'picture_path': emp['picture_path'], |
|
|
'created_at': emp['created_at'] |
|
|
}) |
|
|
|
|
|
return jsonify(result) |
|
|
|
|
|
@app.route('/api/delete_employee', methods=['POST']) |
|
|
@login_required |
|
|
def delete_employee(): |
|
|
"""Delete an employee with password authentication.""" |
|
|
if not auth or not db: |
|
|
return jsonify({'success': False, 'message': 'Authentication system not available'}) |
|
|
|
|
|
try: |
|
|
data = request.get_json() |
|
|
|
|
|
|
|
|
if not all(key in data for key in ['password']) or not any(key in data for key in ['employee_id', 'mac_address']): |
|
|
return jsonify({'success': False, 'message': 'Missing required fields'}) |
|
|
|
|
|
|
|
|
if not auth.authenticate_admin(data['password']): |
|
|
return jsonify({'success': False, 'message': 'Invalid admin password'}) |
|
|
|
|
|
|
|
|
if 'employee_id' in data and isinstance(data['employee_id'], int): |
|
|
employee = db.get_employee_by_id(data['employee_id']) |
|
|
else: |
|
|
|
|
|
mac_address = data.get('employee_id', data.get('mac_address')) |
|
|
employee = db.get_employee_by_mac(mac_address) |
|
|
|
|
|
if not employee: |
|
|
return jsonify({'success': False, 'message': 'Employee not found'}) |
|
|
|
|
|
|
|
|
success = db.delete_employee(employee['id']) |
|
|
|
|
|
if success: |
|
|
|
|
|
remove_employee_from_json(employee['mac_address']) |
|
|
|
|
|
|
|
|
tracker.employees = tracker.load_employees() |
|
|
tracker._initialize_employee_states() |
|
|
|
|
|
return jsonify({'success': True, 'message': f'Employee {employee["name"]} deleted successfully'}) |
|
|
else: |
|
|
return jsonify({'success': False, 'message': 'Failed to delete employee'}) |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Error deleting employee: {e}") |
|
|
return jsonify({'success': False, 'message': f'Error deleting employee: {str(e)}'}) |
|
|
|
|
|
@app.route('/api/modify_employee', methods=['POST']) |
|
|
@login_required |
|
|
def modify_employee(): |
|
|
"""Modify employee information with password authentication.""" |
|
|
if not auth or not db: |
|
|
return jsonify({'success': False, 'message': 'Authentication system not available'}) |
|
|
|
|
|
try: |
|
|
data = request.get_json() |
|
|
|
|
|
|
|
|
if not all(key in data for key in ['password']) or not any(key in data for key in ['employee_id', 'mac_address']): |
|
|
return jsonify({'success': False, 'message': 'Missing required fields'}) |
|
|
|
|
|
|
|
|
if not auth.authenticate_admin(data['password']): |
|
|
return jsonify({'success': False, 'message': 'Invalid admin password'}) |
|
|
|
|
|
|
|
|
if 'employee_id' in data and isinstance(data['employee_id'], int): |
|
|
employee = db.get_employee_by_id(data['employee_id']) |
|
|
else: |
|
|
|
|
|
mac_address = data.get('employee_id', data.get('mac_address')) |
|
|
employee = db.get_employee_by_mac(mac_address) |
|
|
|
|
|
if not employee: |
|
|
return jsonify({'success': False, 'message': 'Employee not found'}) |
|
|
|
|
|
|
|
|
update_data = {} |
|
|
if 'name' in data and data['name'].strip(): |
|
|
update_data['name'] = data['name'].strip() |
|
|
if 'mac_address' in data and data['mac_address'].strip(): |
|
|
new_mac = data['mac_address'].lower().strip() |
|
|
if not is_valid_mac_format(new_mac): |
|
|
return jsonify({'success': False, 'message': 'Invalid MAC address format. Use: aa-bb-cc-dd-ee-ff'}) |
|
|
update_data['mac_address'] = new_mac |
|
|
if 'picture_path' in data: |
|
|
update_data['picture_path'] = data['picture_path'].strip() or None |
|
|
|
|
|
if not update_data: |
|
|
return jsonify({'success': False, 'message': 'No valid fields to update'}) |
|
|
|
|
|
|
|
|
success = db.update_employee(employee['id'], **update_data) |
|
|
|
|
|
if success: |
|
|
|
|
|
update_employee_in_json(employee['mac_address'], update_data) |
|
|
|
|
|
|
|
|
tracker.employees = tracker.load_employees() |
|
|
tracker._initialize_employee_states() |
|
|
|
|
|
return jsonify({'success': True, 'message': 'Employee updated successfully'}) |
|
|
else: |
|
|
return jsonify({'success': False, 'message': 'Failed to update employee'}) |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Error modifying employee: {e}") |
|
|
return jsonify({'success': False, 'message': f'Error modifying employee: {str(e)}'}) |
|
|
|
|
|
@app.route('/api/change_password', methods=['POST']) |
|
|
@login_required |
|
|
def change_password(): |
|
|
"""Change the admin password.""" |
|
|
if not auth: |
|
|
return jsonify({'success': False, 'message': 'Authentication system not available'}) |
|
|
|
|
|
try: |
|
|
data = request.get_json() |
|
|
|
|
|
if not all(key in data for key in ['currentPassword', 'newPassword']): |
|
|
return jsonify({'success': False, 'message': 'Missing required fields'}) |
|
|
|
|
|
success = auth.change_admin_password(data['currentPassword'], data['newPassword']) |
|
|
|
|
|
if success: |
|
|
return jsonify({'success': True, 'message': 'Password changed successfully'}) |
|
|
else: |
|
|
return jsonify({'success': False, 'message': 'Current password is incorrect'}) |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Error changing password: {e}") |
|
|
return jsonify({'success': False, 'message': f'Error changing password: {str(e)}'}) |
|
|
|
|
|
def is_valid_mac_format(mac): |
|
|
"""Validate MAC address format.""" |
|
|
if not mac: |
|
|
return False |
|
|
|
|
|
|
|
|
parts = mac.split('-') |
|
|
if len(parts) != 6: |
|
|
return False |
|
|
|
|
|
for part in parts: |
|
|
if len(part) != 2: |
|
|
return False |
|
|
try: |
|
|
int(part, 16) |
|
|
except ValueError: |
|
|
return False |
|
|
|
|
|
return True |
|
|
|
|
|
def update_employees_json(name, mac_address, picture_path): |
|
|
"""Update the employees.json file with new employee.""" |
|
|
try: |
|
|
|
|
|
employees = [] |
|
|
if os.path.exists('employees.json'): |
|
|
with open('employees.json', 'r') as f: |
|
|
employees = json.load(f) |
|
|
|
|
|
|
|
|
for emp in employees: |
|
|
if emp.get('mac_address') == mac_address: |
|
|
return |
|
|
|
|
|
|
|
|
new_employee = { |
|
|
'name': name, |
|
|
'mac_address': mac_address, |
|
|
'picture': picture_path if picture_path else f"static/img/{name.lower().replace(' ', '_')}.jpg" |
|
|
} |
|
|
employees.append(new_employee) |
|
|
|
|
|
|
|
|
with open('employees.json', 'w') as f: |
|
|
json.dump(employees, f, indent=2) |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Error updating employees.json: {e}") |
|
|
|
|
|
def remove_employee_from_json(mac_address): |
|
|
"""Remove an employee from employees.json file.""" |
|
|
try: |
|
|
if not os.path.exists('employees.json'): |
|
|
return |
|
|
|
|
|
|
|
|
with open('employees.json', 'r') as f: |
|
|
employees = json.load(f) |
|
|
|
|
|
|
|
|
employees = [emp for emp in employees if emp.get('mac_address') != mac_address] |
|
|
|
|
|
|
|
|
with open('employees.json', 'w') as f: |
|
|
json.dump(employees, f, indent=2) |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Error removing employee from employees.json: {e}") |
|
|
|
|
|
def update_employee_in_json(old_mac_address, update_data): |
|
|
"""Update an employee in employees.json file.""" |
|
|
try: |
|
|
if not os.path.exists('employees.json'): |
|
|
return |
|
|
|
|
|
|
|
|
with open('employees.json', 'r') as f: |
|
|
employees = json.load(f) |
|
|
|
|
|
|
|
|
for emp in employees: |
|
|
if emp.get('mac_address') == old_mac_address: |
|
|
if 'name' in update_data: |
|
|
emp['name'] = update_data['name'] |
|
|
if 'mac_address' in update_data: |
|
|
emp['mac_address'] = update_data['mac_address'] |
|
|
if 'picture_path' in update_data: |
|
|
emp['picture'] = update_data['picture_path'] or f"static/img/{emp['name'].lower().replace(' ', '_')}.jpg" |
|
|
break |
|
|
|
|
|
|
|
|
with open('employees.json', 'w') as f: |
|
|
json.dump(employees, f, indent=2) |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Error updating employee in employees.json: {e}") |
|
|
|
|
|
def get_time_ago(timestamp): |
|
|
"""Get human-readable time difference.""" |
|
|
now = datetime.now() |
|
|
diff = now - timestamp |
|
|
|
|
|
if diff.days > 0: |
|
|
return f"{diff.days} day{'s' if diff.days > 1 else ''} ago" |
|
|
elif diff.seconds > 3600: |
|
|
hours = diff.seconds // 3600 |
|
|
return f"{hours} hour{'s' if hours > 1 else ''} ago" |
|
|
elif diff.seconds > 60: |
|
|
minutes = diff.seconds // 60 |
|
|
return f"{minutes} minute{'s' if minutes > 1 else ''} ago" |
|
|
else: |
|
|
return "Just now" |
|
|
|
|
|
def format_duration(seconds): |
|
|
"""Format duration in seconds to HH:MM:SS.""" |
|
|
if seconds is None: |
|
|
return "00:00:00" |
|
|
|
|
|
hours = seconds // 3600 |
|
|
minutes = (seconds % 3600) // 60 |
|
|
seconds = seconds % 60 |
|
|
|
|
|
return f"{hours:02d}:{minutes:02d}:{seconds:02d}" |
|
|
|
|
|
def run_console_mode(): |
|
|
"""Run the application in console mode (no web interface).""" |
|
|
print("Starting in console mode...") |
|
|
print("Press Ctrl+C to stop\n") |
|
|
|
|
|
initialize_system() |
|
|
|
|
|
try: |
|
|
tracker.start_monitoring() |
|
|
except KeyboardInterrupt: |
|
|
print("\nShutting down...") |
|
|
sys.exit(0) |
|
|
|
|
|
def run_web_mode(port=5000): |
|
|
"""Run the application with web interface.""" |
|
|
print("Starting integrated web interface mode...") |
|
|
|
|
|
|
|
|
initialize_system() |
|
|
|
|
|
print(f"\nWeb interface will be available at:") |
|
|
print(f" Local: http://localhost:{port}") |
|
|
print(f" Network: http://0.0.0.0:{port}") |
|
|
print("\nPress Ctrl+C to stop\n") |
|
|
|
|
|
|
|
|
print("Note: This application attempts to run without administrator privileges.") |
|
|
print("If network scanning fails, you may need to run as administrator/root.") |
|
|
print("Employee management features work regardless of privilege level.\n") |
|
|
|
|
|
|
|
|
global is_monitoring |
|
|
is_monitoring = True |
|
|
monitoring_thread = threading.Thread(target=monitoring_loop, daemon=True) |
|
|
monitoring_thread.start() |
|
|
|
|
|
try: |
|
|
app.run(host='0.0.0.0', port=port, debug=False) |
|
|
except KeyboardInterrupt: |
|
|
print("\nShutting down...") |
|
|
is_monitoring = False |
|
|
sys.exit(0) |
|
|
|
|
|
def show_status(): |
|
|
"""Show current system status.""" |
|
|
print("System Status:") |
|
|
print("=" * 50) |
|
|
|
|
|
|
|
|
initialize_system() |
|
|
|
|
|
|
|
|
try: |
|
|
employees = db.get_all_employees() |
|
|
print(f"Database: Connected") |
|
|
print(f"Employees: {len(employees)}") |
|
|
|
|
|
for emp in employees: |
|
|
picture_info = f" (Picture: {emp['picture_path']})" if emp['picture_path'] else "" |
|
|
print(f" - {emp['name']} ({emp['mac_address']}){picture_info}") |
|
|
|
|
|
|
|
|
recent_events = db.get_attendance_events(limit=5) |
|
|
print(f"\nRecent Events: {len(recent_events)}") |
|
|
for event in recent_events[-5:]: |
|
|
print(f" {event['timestamp']}: {event['employee_name']} - {event['event_type']}") |
|
|
|
|
|
|
|
|
today = datetime.now().strftime('%Y-%m-%d') |
|
|
summary = db.get_daily_summary(today) |
|
|
print(f"\nToday's Summary ({today}):") |
|
|
for emp in summary: |
|
|
print(f" {emp['name']}: {emp['status']} | In: {emp['time_in'] or 'N/A'} | Out: {emp['time_out'] or 'N/A'}") |
|
|
|
|
|
except Exception as e: |
|
|
print(f"Database: Error - {e}") |
|
|
|
|
|
|
|
|
try: |
|
|
import json |
|
|
with open('config.json', 'r') as f: |
|
|
config = json.load(f) |
|
|
print(f"\nConfiguration:") |
|
|
print(f" Scan Interval: {config.get('scan_interval_seconds', 60)} seconds") |
|
|
print(f" Web Port: {config.get('web_port', 5000)}") |
|
|
print(f" Office Timeout: {config.get('office_timeout_hour', 17)}:{config.get('office_timeout_minute', 0):02d}") |
|
|
except Exception as e: |
|
|
print(f"\nConfiguration: Error - {e}") |
|
|
|
|
|
|
|
|
try: |
|
|
with open('employees.json', 'r') as f: |
|
|
employees_json = json.load(f) |
|
|
print(f"\nEmployees JSON File:") |
|
|
print(f" Configured Employees: {len(employees_json)}") |
|
|
for emp in employees_json: |
|
|
print(f" - {emp.get('name', 'Unknown')} ({emp.get('mac_address', 'Unknown')})") |
|
|
except Exception as e: |
|
|
print(f"\nEmployees JSON: Error - {e}") |
|
|
|
|
|
|
|
|
try: |
|
|
print(f"\nAuthentication:") |
|
|
print(f" Admin Password: {auth.get_current_admin_password_hint()}") |
|
|
except Exception as e: |
|
|
print(f"\nAuthentication: Error - {e}") |
|
|
|
|
|
def main(): |
|
|
"""Main application entry point.""" |
|
|
parser = argparse.ArgumentParser( |
|
|
description='WiFi-Based Attendance & Break Tracker - Employee Management Version', |
|
|
formatter_class=argparse.RawDescriptionHelpFormatter, |
|
|
epilog=""" |
|
|
Examples: |
|
|
python main.py # Run with web interface (default) |
|
|
python main.py --console # Run in console mode only |
|
|
python main.py --status # Show system status |
|
|
python main.py --port 8080 # Run web interface on port 8080 |
|
|
|
|
|
New Features: |
|
|
- Employee management (add, search, password protection) |
|
|
- Non-admin operation (where possible) |
|
|
- Enhanced web interface with employee pictures |
|
|
- Separate employees.json for better data management |
|
|
- Password-protected employee addition (default: 1122) |
|
|
|
|
|
For more information, see README.md |
|
|
""" |
|
|
) |
|
|
|
|
|
parser.add_argument( |
|
|
'--console', |
|
|
action='store_true', |
|
|
help='Run in console mode without web interface' |
|
|
) |
|
|
|
|
|
parser.add_argument( |
|
|
'--status', |
|
|
action='store_true', |
|
|
help='Show system status and exit' |
|
|
) |
|
|
|
|
|
parser.add_argument( |
|
|
'--port', |
|
|
type=int, |
|
|
default=5000, |
|
|
help='Port for web interface (default: 5000)' |
|
|
) |
|
|
|
|
|
args = parser.parse_args() |
|
|
|
|
|
|
|
|
print_banner() |
|
|
|
|
|
|
|
|
issues = check_requirements() |
|
|
if issues: |
|
|
print("β οΈ System Requirements Issues:") |
|
|
for issue in issues: |
|
|
print(f" - {issue}") |
|
|
print() |
|
|
|
|
|
|
|
|
if "config.json file not found" in str(issues): |
|
|
print("Creating default config.json...") |
|
|
default_config = { |
|
|
"scan_interval_seconds": 60, |
|
|
"web_port": 5000, |
|
|
"office_timeout_hour": 17, |
|
|
"office_timeout_minute": 0 |
|
|
} |
|
|
with open('config.json', 'w') as f: |
|
|
json.dump(default_config, f, indent=2) |
|
|
|
|
|
if "employees.json file not found" in str(issues): |
|
|
print("Creating default employees.json...") |
|
|
default_employees = [ |
|
|
{ |
|
|
"name": "Sample Employee", |
|
|
"mac_address": "aa-bb-cc-dd-ee-ff", |
|
|
"picture": "static/img/sample.jpg" |
|
|
} |
|
|
] |
|
|
with open('employees.json', 'w') as f: |
|
|
json.dump(default_employees, f, indent=2) |
|
|
|
|
|
print("Default files created. Please edit them with your actual data.\n") |
|
|
|
|
|
|
|
|
if args.status: |
|
|
show_status() |
|
|
return |
|
|
|
|
|
if args.console: |
|
|
run_console_mode() |
|
|
else: |
|
|
run_web_mode(args.port) |
|
|
|
|
|
if __name__ == "__main__": |
|
|
main() |
|
|
|
|
|
|