|
|
import subprocess |
|
|
import json |
|
|
import time |
|
|
import platform |
|
|
from datetime import datetime, timedelta |
|
|
from typing import Dict, Set, List, Tuple |
|
|
import os |
|
|
from database import AttendanceDatabase |
|
|
|
|
|
class AttendanceTracker: |
|
|
def __init__(self, config_path: str = "config.json", employees_path: str = "employees.json", db_path: str = "attendance.db"): |
|
|
"""Initialize the attendance tracker with configuration and database.""" |
|
|
self.config_path = config_path |
|
|
self.employees_path = employees_path |
|
|
self.config = self.load_config() |
|
|
self.employees = self.load_employees() |
|
|
self.scan_interval = self.config.get("scan_interval_seconds", 60) |
|
|
self.office_timeout_hour = self.config.get("office_timeout_hour", 17) |
|
|
self.office_timeout_minute = self.config.get("office_timeout_minute", 0) |
|
|
|
|
|
self.db = AttendanceDatabase(db_path) |
|
|
|
|
|
|
|
|
|
|
|
self.employee_states = {} |
|
|
self._initialize_employee_states() |
|
|
|
|
|
|
|
|
os.makedirs("logs", exist_ok=True) |
|
|
|
|
|
def _initialize_employee_states(self): |
|
|
"""Initialize employee states from the database for the current day.""" |
|
|
today_str = datetime.now().strftime("%Y-%m-%d") |
|
|
for mac, name in self.employees.items(): |
|
|
employee_info = self.db.get_employee_by_mac(mac) |
|
|
if employee_info: |
|
|
summary = self.db.get_daily_summary_for_employee(employee_info["id"], today_str) |
|
|
if summary and summary["status"] == "Present": |
|
|
|
|
|
self.employee_states[mac] = { |
|
|
"is_present": True, |
|
|
"last_seen": datetime.now(), |
|
|
"time_in": datetime.strptime(f"{today_str} {summary['time_in']}", |
|
|
"%Y-%m-%d %H:%M:%S") if summary["time_in"] else None, |
|
|
"on_break": False, |
|
|
"break_start_time": None |
|
|
} |
|
|
else: |
|
|
self.employee_states[mac] = { |
|
|
"is_present": False, |
|
|
"last_seen": None, |
|
|
"time_in": None, |
|
|
"on_break": False, |
|
|
"break_start_time": None |
|
|
} |
|
|
else: |
|
|
self.employee_states[mac] = { |
|
|
"is_present": False, |
|
|
"last_seen": None, |
|
|
"time_in": None, |
|
|
"on_break": False, |
|
|
"break_start_time": None |
|
|
} |
|
|
|
|
|
def load_config(self) -> dict: |
|
|
"""Load configuration from JSON file.""" |
|
|
try: |
|
|
with open(self.config_path, 'r') as f: |
|
|
return json.load(f) |
|
|
except FileNotFoundError: |
|
|
print(f"Config file {self.config_path} not found. Using default settings.") |
|
|
return {"scan_interval_seconds": 60, "office_timeout_hour": 17, "office_timeout_minute": 0} |
|
|
except json.JSONDecodeError: |
|
|
print(f"Invalid JSON in {self.config_path}. Using default settings.") |
|
|
return {"scan_interval_seconds": 60, "office_timeout_hour": 17, "office_timeout_minute": 0} |
|
|
|
|
|
def load_employees(self) -> dict: |
|
|
"""Load employees from employees.json file.""" |
|
|
try: |
|
|
with open(self.employees_path, 'r') as f: |
|
|
employees_list = json.load(f) |
|
|
|
|
|
|
|
|
employees_dict = {} |
|
|
for emp in employees_list: |
|
|
employees_dict[emp['mac_address']] = emp['name'] |
|
|
|
|
|
return employees_dict |
|
|
except FileNotFoundError: |
|
|
print(f"Employees file {self.employees_path} not found. Using empty employee list.") |
|
|
return {} |
|
|
except json.JSONDecodeError: |
|
|
print(f"Invalid JSON in {self.employees_path}. Using empty employee list.") |
|
|
return {} |
|
|
except Exception as e: |
|
|
print(f"Error loading employees: {e}") |
|
|
return {} |
|
|
|
|
|
def get_connected_devices(self) -> Set[str]: |
|
|
"""Get MAC addresses of devices connected to the local network.""" |
|
|
connected_macs = set() |
|
|
|
|
|
try: |
|
|
if platform.system() == "Windows": |
|
|
result = subprocess.run(['arp', '-a'], capture_output=True, text=True, timeout=30) |
|
|
if result.returncode == 0: |
|
|
lines = result.stdout.split('\n') |
|
|
for line in lines: |
|
|
parts = line.strip().split() |
|
|
if len(parts) >= 2: |
|
|
potential_mac = parts[1] |
|
|
if self.is_valid_mac(potential_mac): |
|
|
normalized_mac = self.normalize_mac(potential_mac) |
|
|
connected_macs.add(normalized_mac) |
|
|
else: |
|
|
result = subprocess.run(['arp', '-a'], capture_output=True, text=True, timeout=30) |
|
|
if result.returncode == 0: |
|
|
lines = result.stdout.split('\n') |
|
|
for line in lines: |
|
|
if '(' in line and ')' in line and 'at' in line: |
|
|
parts = line.split(' at ') |
|
|
if len(parts) >= 2: |
|
|
mac_part = parts[1].split(' ')[0] |
|
|
if self.is_valid_mac(mac_part): |
|
|
normalized_mac = self.normalize_mac(mac_part) |
|
|
connected_macs.add(normalized_mac) |
|
|
|
|
|
except subprocess.TimeoutExpired: |
|
|
print("ARP command timed out") |
|
|
except Exception as e: |
|
|
print(f"Error getting connected devices: {e}") |
|
|
|
|
|
return connected_macs |
|
|
|
|
|
def is_valid_mac(self, mac: str) -> bool: |
|
|
"""Check if a string is a valid MAC address.""" |
|
|
if not mac: |
|
|
return False |
|
|
clean_mac = mac.replace('-', '').replace(':', '').replace('.', '') |
|
|
if len(clean_mac) != 12: |
|
|
return False |
|
|
try: |
|
|
int(clean_mac, 16) |
|
|
return True |
|
|
except ValueError: |
|
|
return False |
|
|
|
|
|
def normalize_mac(self, mac: str) -> str: |
|
|
"""Normalize MAC address to lowercase with dashes.""" |
|
|
clean_mac = mac.replace('-', '').replace(':', '').replace('.', '').lower() |
|
|
return '-'.join([clean_mac[i:i+2] for i in range(0, 12, 2)]) |
|
|
|
|
|
def get_employee_name(self, mac: str) -> str: |
|
|
"""Get employee name from MAC address.""" |
|
|
return self.employees.get(mac, f"Unknown ({mac})") |
|
|
|
|
|
def process_scan_results(self, detected_macs: Set[str]) -> List[Tuple[str, str, datetime]]: |
|
|
"""Process scan results and generate attendance events.""" |
|
|
current_time = datetime.now() |
|
|
today_str = current_time.strftime("%Y-%m-%d") |
|
|
events = [] |
|
|
|
|
|
known_detected = {mac for mac in detected_macs if mac in self.employees} |
|
|
|
|
|
for mac in self.employees: |
|
|
employee_info = self.db.get_employee_by_mac(mac) |
|
|
if not employee_info: |
|
|
continue |
|
|
employee_id = employee_info["id"] |
|
|
|
|
|
current_state = self.employee_states.get(mac, {}) |
|
|
was_present = current_state.get("is_present", False) |
|
|
on_break = current_state.get("on_break", False) |
|
|
time_in = current_state.get("time_in") |
|
|
|
|
|
is_currently_present = mac in known_detected |
|
|
|
|
|
|
|
|
if is_currently_present and not was_present: |
|
|
event_type = "time_in" |
|
|
self.db.log_attendance_event(mac, event_type, current_time) |
|
|
events.append((mac, event_type, current_time)) |
|
|
self.employee_states[mac]["is_present"] = True |
|
|
self.employee_states[mac]["last_seen"] = current_time |
|
|
self.employee_states[mac]["time_in"] = current_time |
|
|
self.employee_states[mac]["on_break"] = False |
|
|
self.employee_states[mac]["break_start_time"] = None |
|
|
|
|
|
|
|
|
self.db.update_daily_summary(employee_id, today_str, |
|
|
time_in=current_time.strftime("%H:%M:%S"), status="Present") |
|
|
|
|
|
|
|
|
elif was_present and not is_currently_present and not on_break: |
|
|
event_type = "break_start" |
|
|
self.db.log_attendance_event(mac, event_type, current_time) |
|
|
events.append((mac, event_type, current_time)) |
|
|
self.employee_states[mac]["is_present"] = False |
|
|
self.employee_states[mac]["last_seen"] = current_time |
|
|
self.employee_states[mac]["on_break"] = True |
|
|
self.employee_states[mac]["break_start_time"] = current_time |
|
|
|
|
|
|
|
|
self.db.update_daily_summary(employee_id, today_str, status="On Break") |
|
|
|
|
|
|
|
|
elif not was_present and is_currently_present and on_break: |
|
|
event_type = "break_end" |
|
|
self.db.log_attendance_event(mac, event_type, current_time) |
|
|
events.append((mac, event_type, current_time)) |
|
|
self.employee_states[mac]["is_present"] = True |
|
|
self.employee_states[mac]["last_seen"] = current_time |
|
|
self.employee_states[mac]["on_break"] = False |
|
|
self.employee_states[mac]["break_start_time"] = None |
|
|
|
|
|
|
|
|
durations = self.db.calculate_durations(employee_id, today_str) |
|
|
self.db.update_daily_summary(employee_id, today_str, status="Present", |
|
|
total_break_duration=durations["total_break_duration"]) |
|
|
|
|
|
|
|
|
elif was_present and not is_currently_present and not on_break and time_in: |
|
|
|
|
|
event_type = "time_out" |
|
|
self.db.log_attendance_event(mac, event_type, current_time) |
|
|
events.append((mac, event_type, current_time)) |
|
|
self.employee_states[mac]["is_present"] = False |
|
|
self.employee_states[mac]["last_seen"] = current_time |
|
|
self.employee_states[mac]["on_break"] = False |
|
|
|
|
|
|
|
|
durations = self.db.calculate_durations(employee_id, today_str) |
|
|
self.db.update_daily_summary(employee_id, today_str, |
|
|
time_out=current_time.strftime("%H:%M:%S"), status="Absent", |
|
|
total_work_duration=durations["total_work_duration"]) |
|
|
|
|
|
|
|
|
if is_currently_present: |
|
|
self.employee_states[mac]["last_seen"] = current_time |
|
|
|
|
|
|
|
|
timeout_time = current_time.replace(hour=self.office_timeout_hour, minute=self.office_timeout_minute, second=0, microsecond=0) |
|
|
if current_time >= timeout_time and was_present and time_in and not on_break: |
|
|
|
|
|
summary = self.db.get_daily_summary_for_employee(employee_id, today_str) |
|
|
if summary and summary["status"] != "Timed Out": |
|
|
event_type = "timeout_5pm" |
|
|
self.db.log_attendance_event(mac, event_type, timeout_time) |
|
|
events.append((mac, event_type, timeout_time)) |
|
|
self.employee_states[mac]["is_present"] = False |
|
|
self.employee_states[mac]["last_seen"] = timeout_time |
|
|
|
|
|
|
|
|
durations = self.db.calculate_durations(employee_id, today_str) |
|
|
self.db.update_daily_summary(employee_id, today_str, |
|
|
time_out=timeout_time.strftime("%H:%M:%S"), status="Timed Out", |
|
|
total_work_duration=durations["total_work_duration"]) |
|
|
|
|
|
|
|
|
|
|
|
self.db.export_daily_summary_to_csv(today_str) |
|
|
|
|
|
return events |
|
|
|
|
|
def scan_once(self) -> List[Tuple[str, str, datetime]]: |
|
|
"""Perform one scan and return any events.""" |
|
|
print(f"Scanning network at {datetime.now().strftime('%H:%M:%S')}...") |
|
|
detected_macs = self.get_connected_devices() |
|
|
print(f"Detected {len(detected_macs)} devices") |
|
|
|
|
|
events = self.process_scan_results(detected_macs) |
|
|
return events |
|
|
|
|
|
def start_monitoring(self): |
|
|
"""Start continuous monitoring loop.""" |
|
|
print("Starting WiFi Attendance Tracker...") |
|
|
print(f"Monitoring {len(self.employees)} employees") |
|
|
print(f"Scan interval: {self.scan_interval} seconds") |
|
|
print("Press Ctrl+C to stop") |
|
|
|
|
|
try: |
|
|
while True: |
|
|
self.scan_once() |
|
|
time.sleep(self.scan_interval) |
|
|
except KeyboardInterrupt: |
|
|
print("\nStopping attendance tracker...") |
|
|
|
|
|
def get_current_status(self) -> Dict[str, dict]: |
|
|
"""Get current attendance status of all employees.""" |
|
|
status = {} |
|
|
current_time = datetime.now() |
|
|
|
|
|
for mac, name in self.employees.items(): |
|
|
state = self.employee_states.get(mac, {}) |
|
|
is_present = state.get("is_present", False) |
|
|
last_seen_time = state.get("last_seen") |
|
|
on_break = state.get("on_break", False) |
|
|
|
|
|
display_status = "Present" if is_present else ("On Break" if on_break else "Absent") |
|
|
|
|
|
status[mac] = { |
|
|
'name': name, |
|
|
'mac': mac, |
|
|
'is_present': is_present, |
|
|
'status': display_status, |
|
|
'last_seen': last_seen_time.strftime('%Y-%m-%d %H:%M:%S') if last_seen_time else 'Never', |
|
|
'time_in': state.get('time_in').strftime('%H:%M:%S') if state.get('time_in') else 'N/A' |
|
|
} |
|
|
|
|
|
return status |
|
|
|
|
|
def sync_employees_from_json(self): |
|
|
"""Sync employees from employees.json to database.""" |
|
|
try: |
|
|
with open(self.employees_path, 'r') as f: |
|
|
employees_list = json.load(f) |
|
|
|
|
|
synced_count = 0 |
|
|
for emp in employees_list: |
|
|
name = emp.get('name') |
|
|
mac_address = emp.get('mac_address') |
|
|
picture_path = emp.get('picture') |
|
|
|
|
|
if name and mac_address: |
|
|
if self.db.add_employee(name, mac_address, picture_path=picture_path): |
|
|
synced_count += 1 |
|
|
|
|
|
today_str = datetime.now().strftime('%Y-%m-%d') |
|
|
employee_info = self.db.get_employee_by_mac(mac_address) |
|
|
if employee_info: |
|
|
self.db.update_daily_summary(employee_info['id'], today_str, status='Absent') |
|
|
|
|
|
print(f"Synced {synced_count} new employees from {self.employees_path}") |
|
|
return synced_count |
|
|
|
|
|
except FileNotFoundError: |
|
|
print(f"Employees file {self.employees_path} not found") |
|
|
return 0 |
|
|
except json.JSONDecodeError: |
|
|
print(f"Invalid JSON in {self.employees_path}") |
|
|
return 0 |
|
|
except Exception as e: |
|
|
print(f"Error syncing employees: {e}") |
|
|
return 0 |
|
|
|
|
|
if __name__ == "__main__": |
|
|
tracker = AttendanceTracker() |
|
|
tracker.sync_employees_from_json() |
|
|
tracker.start_monitoring() |
|
|
|
|
|
|