Spaces:
Runtime error
Runtime error
| # Suppress TensorFlow oneDNN warnings | |
| import os | |
| os.environ['TF_ENABLE_ONEDNN_OPTS'] = '0' | |
| # Standard Library Imports | |
| import base64 | |
| import json | |
| import logging | |
| import queue | |
| import threading | |
| import time | |
| from datetime import datetime, date | |
| from io import BytesIO | |
| from typing import Tuple, Optional, List | |
| import pickle | |
| # Third-Party Imports | |
| import cv2 | |
| import gradio as gr | |
| import numpy as np | |
| import pandas as pd | |
| from PIL import Image | |
| import requests | |
| from dotenv import load_dotenv | |
| from deepface import DeepFace | |
| from retrying import retry | |
| from simple_salesforce import Salesforce | |
| # --- CONFIGURATION --- | |
| # Setup logging | |
| logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") | |
| logger = logging.getLogger(__name__) | |
| # Load environment variables from .env file | |
| load_dotenv() | |
| # Hugging Face API configuration | |
| HF_API_URL = "https://api-inference.huggingface.co/models/Salesforce/blip-image-captioning-base" | |
| HF_API_TOKEN = os.getenv("HUGGINGFACE_API_TOKEN") | |
| # Salesforce configuration | |
| SF_CREDENTIALS = { | |
| "username": os.getenv("SF_USERNAME", "smartlabour@attendance.system"), | |
| "password": os.getenv("SF_PASSWORD", "#Prashanth@1234"), | |
| "security_token": os.getenv("SF_SECURITY_TOKEN", "7xPmtDFoWlZUGK0V2QSwFZJ6c"), | |
| "domain": os.getenv("SF_DOMAIN", "login") | |
| } | |
| # --- SALESFORCE CONNECTION --- | |
| def connect_to_salesforce() -> Optional[Salesforce]: | |
| """Establish a connection to Salesforce with retry logic.""" | |
| try: | |
| sf = Salesforce(**SF_CREDENTIALS) | |
| sf.describe() # Test connection | |
| logger.info("β Successfully connected to Salesforce.") | |
| return sf | |
| except Exception as e: | |
| logger.error(f"β Salesforce connection failed: {e}") | |
| raise | |
| # --- CORE LOGIC --- | |
| class AttendanceSystem: | |
| """ | |
| Manages all backend logic for the face recognition attendance system. | |
| """ | |
| def __init__(self): | |
| # State Management | |
| self.processing_thread = None | |
| self.is_processing = threading.Event() | |
| self.frame_queue = queue.Queue(maxsize=10) | |
| self.error_message = None | |
| self.last_processed_frame = None | |
| self.final_log = None | |
| # Data Storage | |
| self.known_face_embeddings: List[np.ndarray] = [] | |
| self.known_face_names: List[str] = [] | |
| self.known_face_ids: List[str] = [] | |
| self.next_worker_id: int = 1 | |
| # Session Tracking | |
| self.last_recognition_time = {} | |
| self.recognition_cooldown = 10 | |
| self.session_log: List[str] = [] | |
| self.session_marked_present = set() | |
| self.session_registered = set() | |
| self.face_recognition_buffer = {} | |
| self.buffer_threshold = 3 | |
| self.frame_skip_counter = 0 | |
| # Initialize | |
| self.sf = connect_to_salesforce() | |
| self._create_directories() | |
| self.load_worker_data() | |
| def _create_directories(self): | |
| os.makedirs("data/faces", exist_ok=True) | |
| def load_worker_data(self): | |
| logger.info("Loading worker data...") | |
| if self.sf: | |
| try: | |
| workers = self.sf.query_all("SELECT Worker_ID__c, Name, Face_Embedding__c FROM Worker__c")['records'] | |
| if not workers: | |
| self._load_local_worker_data() | |
| return | |
| temp_embeddings, temp_names, temp_ids, max_id = [], [], [], 0 | |
| for worker in workers: | |
| if worker.get('Face_Embedding__c'): | |
| temp_embeddings.append(np.array(json.loads(worker['Face_Embedding__c']))) | |
| temp_names.append(worker['Name']) | |
| temp_ids.append(worker['Worker_ID__c']) | |
| try: | |
| worker_num = int(worker['Worker_ID__c'][1:]) | |
| if worker_num > max_id: | |
| max_id = worker_num | |
| except (ValueError, TypeError): | |
| continue | |
| self.known_face_embeddings = temp_embeddings | |
| self.known_face_names = temp_names | |
| self.known_face_ids = temp_ids | |
| self.next_worker_id = max_id + 1 | |
| self.save_local_worker_data() | |
| logger.info(f"β Loaded {len(self.known_face_ids)} workers from Salesforce.") | |
| except Exception as e: | |
| logger.error(f"β Error loading from Salesforce: {e}. Attempting local load.") | |
| self._load_local_worker_data() | |
| else: | |
| logger.warning("Salesforce not connected. Loading from local cache.") | |
| self._load_local_worker_data() | |
| def _load_local_worker_data(self): | |
| try: | |
| if os.path.exists("data/workers.pkl"): | |
| with open("data/workers.pkl", "rb") as f: | |
| data = pickle.load(f) | |
| self.known_face_embeddings = data.get("embeddings", []) | |
| self.known_face_names = data.get("names", []) | |
| self.known_face_ids = data.get("ids", []) | |
| self.next_worker_id = data.get("next_id", 1) | |
| logger.info(f"β Loaded {len(self.known_face_ids)} workers from local cache.") | |
| except Exception as e: | |
| logger.error(f"β Error loading local data: {e}") | |
| def save_local_worker_data(self): | |
| try: | |
| worker_data = { | |
| "embeddings": self.known_face_embeddings, | |
| "names": self.known_face_names, | |
| "ids": self.known_face_ids, | |
| "next_id": self.next_worker_id | |
| } | |
| with open("data/workers.pkl", "wb") as f: | |
| pickle.dump(worker_data, f) | |
| except Exception as e: | |
| logger.error(f"β Error saving local worker data: {e}") | |
| # --- Registration and Attendance --- | |
| def register_worker_manual(self, image: Image.Image, name: str) -> Tuple[str, str]: | |
| if image is None or not name.strip(): | |
| return "β Please provide both image and name!", self.get_registered_workers_info() | |
| try: | |
| image_array = np.array(image) | |
| DeepFace.analyze(img_path=image_array, actions=['emotion'], enforce_detection=True) | |
| embedding = DeepFace.represent(img_path=image_array, model_name='Facenet')[0]['embedding'] | |
| if self._is_duplicate_face(embedding): | |
| return f"β Face matches an existing worker!", self.get_registered_workers_info() | |
| worker_id = f"W{self.next_worker_id:04d}" | |
| name = name.strip().title() | |
| self._add_worker_to_system(worker_id, name, embedding, image_array) | |
| self.save_local_worker_data() | |
| self.load_worker_data() | |
| return f"β {name} registered with ID: {worker_id}!", self.get_registered_workers_info() | |
| except ValueError: | |
| return "β No face detected in the image!", self.get_registered_workers_info() | |
| except Exception as e: | |
| return f"β Registration error: {e}", self.get_registered_workers_info() | |
| def _register_worker_auto(self, face_image: np.ndarray, face_embedding: List[float]) -> Optional[Tuple[str, str]]: | |
| try: | |
| # Check for duplicates with strict threshold | |
| if self._is_duplicate_face(face_embedding, threshold=10.0): | |
| return None | |
| worker_id = f"W{self.next_worker_id:04d}" | |
| # Check if already auto-registered in this session | |
| if worker_id in self.session_registered: | |
| return None | |
| worker_name = f"Unknown Worker {self.next_worker_id}" | |
| self._add_worker_to_system(worker_id, worker_name, face_embedding, face_image) | |
| self.save_local_worker_data() | |
| # Mark as registered in this session | |
| self.session_registered.add(worker_id) | |
| log_msg = f"π [{datetime.now().strftime('%H:%M:%S')}] Auto-registered: {worker_name} ({worker_id})" | |
| self.session_log.append(log_msg) | |
| logger.info(log_msg) | |
| return worker_id, worker_name | |
| except Exception as e: | |
| logger.error(f"β Auto-registration error: {e}") | |
| return None | |
| def _add_worker_to_system(self, worker_id: str, name: str, embedding: List[float], image_array: np.ndarray): | |
| self.known_face_embeddings.append(np.array(embedding)) | |
| self.known_face_names.append(name) | |
| self.known_face_ids.append(worker_id) | |
| self.next_worker_id += 1 | |
| face_pil = Image.fromarray(cv2.cvtColor(image_array, cv2.COLOR_BGR2RGB)) | |
| face_pil.save(f"data/faces/{worker_id}.jpg") | |
| caption = self._get_image_caption(face_pil) | |
| if self.sf: | |
| try: | |
| worker_record = self.sf.Worker__c.create({ | |
| 'Name': name, | |
| 'Worker_ID__c': worker_id, | |
| 'Face_Embedding__c': json.dumps(embedding), | |
| 'Image_Caption__c': caption | |
| }) | |
| image_url = self._upload_image_to_salesforce(face_pil, worker_record['id'], worker_id) | |
| if image_url: | |
| self.sf.Worker__c.update(worker_record['id'], {'Image_URL__c': image_url}) | |
| logger.info(f"β Worker {worker_id} synced to Salesforce.") | |
| except Exception as e: | |
| logger.error(f"β Salesforce sync error for {worker_id}: {e}") | |
| def _is_duplicate_face(self, embedding: List[float], threshold: float = 10.0) -> bool: | |
| """Strict duplicate detection with cosine similarity""" | |
| if not self.known_face_embeddings: | |
| return False | |
| embedding_array = np.array(embedding) | |
| for known_embedding in self.known_face_embeddings: | |
| # Normalize vectors | |
| embedding_array_norm = embedding_array / np.linalg.norm(embedding_array) | |
| known_embedding_norm = known_embedding / np.linalg.norm(known_embedding) | |
| # Calculate cosine similarity | |
| cosine_sim = np.dot(embedding_array_norm, known_embedding_norm) | |
| if cosine_sim > 0.85: # Strict threshold | |
| return True | |
| return False | |
| def mark_attendance(self, worker_id: str, worker_name: str) -> bool: | |
| """Attendance marking with strict checks""" | |
| if worker_id in self.session_marked_present: | |
| return False | |
| today_str = date.today().isoformat() | |
| if self._has_attended_today(worker_id, today_str): | |
| return False | |
| current_time = datetime.now() | |
| if self.sf: | |
| try: | |
| self.sf.Attendance__c.create({ | |
| 'Worker_ID__c': worker_id, | |
| 'Name__c': worker_name, | |
| 'Date__c': today_str, | |
| 'Timestamp__c': current_time.isoformat(), | |
| 'Status__c': "Present" | |
| }) | |
| except Exception as e: | |
| logger.error(f"β Error saving attendance to Salesforce: {e}") | |
| self.session_marked_present.add(worker_id) | |
| log_msg = f"β [{current_time.strftime('%H:%M:%S')}] Marked Present: {worker_name} ({worker_id})" | |
| self.session_log.append(log_msg) | |
| return True | |
| def _has_attended_today(self, worker_id: str, today_str: str) -> bool: | |
| last_seen = self.last_recognition_time.get(worker_id) | |
| if last_seen and (time.time() - last_seen < self.recognition_cooldown): | |
| return True | |
| if self.sf: | |
| try: | |
| query = f"SELECT Id FROM Attendance__c WHERE Worker_ID__c = '{worker_id}' AND Date__c = '{today_str}'" | |
| if self.sf.query(query)['totalSize'] > 0: | |
| return True | |
| except Exception as e: | |
| logger.error(f"Attendance check error: {e}") | |
| return False | |
| def _find_best_match(self, target_embedding: np.ndarray) -> Tuple[int, float]: | |
| """Find best match using cosine similarity""" | |
| if not self.known_face_embeddings: | |
| return -1, 0.0 | |
| best_match_idx = -1 | |
| best_score = 0.0 | |
| target_norm = target_embedding / np.linalg.norm(target_embedding) | |
| for i, known_embedding in enumerate(self.known_face_embeddings): | |
| known_norm = known_embedding / np.linalg.norm(known_embedding) | |
| cosine_sim = np.dot(target_norm, known_norm) | |
| if cosine_sim > best_score: | |
| best_score = cosine_sim | |
| best_match_idx = i | |
| return best_match_idx, best_score | |
| # --- Video Processing --- | |
| def process_frame(self, frame: np.ndarray) -> np.ndarray: | |
| """Frame processing with strict recognition rules""" | |
| try: | |
| # Skip frames for performance | |
| self.frame_skip_counter += 1 | |
| if self.frame_skip_counter % 3 != 0: | |
| return frame | |
| # Detect faces with multiple backends | |
| face_objs = [] | |
| try: | |
| face_objs = DeepFace.extract_faces( | |
| img_path=frame, | |
| detector_backend='opencv', | |
| enforce_detection=False | |
| ) | |
| except Exception as e: | |
| logger.warning(f"OpenCV detector failed: {e}") | |
| try: | |
| face_objs = DeepFace.extract_faces( | |
| img_path=frame, | |
| detector_backend='mtcnn', | |
| enforce_detection=False | |
| ) | |
| except Exception as e: | |
| logger.warning(f"MTCNN detector failed: {e}") | |
| if face_objs: | |
| logger.debug(f"Found {len(face_objs)} faces in frame") | |
| for face_obj in face_objs: | |
| confidence = face_obj.get('confidence', 0.0) | |
| # Strict confidence threshold | |
| if confidence < 0.90: | |
| continue | |
| facial_area = face_obj['facial_area'] | |
| x, y, w, h = facial_area['x'], facial_area['y'], facial_area['w'], facial_area['h'] | |
| face_image = frame[y:y+h, x:x+w] | |
| if face_image.size == 0 or w < 50 or h < 50: | |
| continue | |
| try: | |
| embedding_obj = DeepFace.represent( | |
| img_path=face_image, | |
| model_name='Facenet', | |
| enforce_detection=False | |
| ) | |
| embedding = embedding_obj[0]['embedding'] | |
| embedding_array = np.array(embedding) | |
| except Exception as e: | |
| logger.warning(f"Embedding generation failed: {e}") | |
| continue | |
| color = (0, 0, 255) # Default red for unknown | |
| worker_id = None | |
| worker_name = "Unknown" | |
| if self.known_face_embeddings: | |
| match_index, match_score = self._find_best_match(embedding_array) | |
| # Strict matching threshold | |
| if match_index != -1 and match_score > 0.85: | |
| worker_id = self.known_face_ids[match_index] | |
| worker_name = self.known_face_names[match_index] | |
| color = (0, 255, 0) # Green for known | |
| # Buffer recognition | |
| buffer_key = f"{worker_id}" | |
| if buffer_key not in self.face_recognition_buffer: | |
| self.face_recognition_buffer[buffer_key] = { | |
| 'count': 1, | |
| 'last_time': time.time() | |
| } | |
| else: | |
| self.face_recognition_buffer[buffer_key]['count'] += 1 | |
| self.face_recognition_buffer[buffer_key]['last_time'] = time.time() | |
| # Mark attendance after consistent detections | |
| if (self.face_recognition_buffer[buffer_key]['count'] >= self.buffer_threshold and | |
| confidence >= 0.90): | |
| if self.mark_attendance(worker_id, worker_name): | |
| self.last_recognition_time[worker_id] = time.time() | |
| del self.face_recognition_buffer[buffer_key] | |
| else: | |
| # Only register new if very different from existing faces | |
| if match_score < 0.70: # Low similarity threshold | |
| color = (0, 165, 255) # Orange for new | |
| new_worker = self._register_worker_auto(face_image, embedding) | |
| if new_worker: | |
| worker_id, worker_name = new_worker | |
| if confidence >= 0.90: | |
| self.mark_attendance(worker_id, worker_name) | |
| else: | |
| # No known faces, auto-register with high confidence | |
| if confidence >= 0.90: | |
| color = (0, 165, 255) # Orange for new | |
| new_worker = self._register_worker_auto(face_image, embedding) | |
| if new_worker: | |
| worker_id, worker_name = new_worker | |
| self.mark_attendance(worker_id, worker_name) | |
| # Clean old buffer entries | |
| current_time = time.time() | |
| for key in list(self.face_recognition_buffer.keys()): | |
| if current_time - self.face_recognition_buffer[key]['last_time'] > 5.0: | |
| del self.face_recognition_buffer[key] | |
| # Draw bounding box and label | |
| label = f"{worker_name}" + (f" ({worker_id})" if worker_id else "") | |
| cv2.rectangle(frame, (x, y), (x+w, y+h), color, 2) | |
| cv2.putText(frame, label, (x, y-10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2) | |
| return frame | |
| except Exception as e: | |
| logger.error(f"Frame processing error: {e}") | |
| return frame | |
| def _processing_loop(self, source): | |
| video_capture = cv2.VideoCapture(source) | |
| if not video_capture.isOpened(): | |
| err_msg = "β Could not open video source" | |
| self.error_message = err_msg | |
| self.is_processing.clear() | |
| return | |
| while self.is_processing.is_set(): | |
| ret, frame = video_capture.read() | |
| if not ret: | |
| break | |
| processed_frame = self.process_frame(frame) | |
| if not self.frame_queue.full(): | |
| self.frame_queue.put(processed_frame) | |
| self.last_processed_frame = processed_frame | |
| time.sleep(0.05) | |
| self.final_log = self.session_log.copy() | |
| video_capture.release() | |
| self.is_processing.clear() | |
| def start_processing(self, source) -> str: | |
| if self.is_processing.is_set(): | |
| return "β οΈ Processing is already active." | |
| # Reset session state | |
| self.session_log.clear() | |
| self.last_recognition_time.clear() | |
| self.session_marked_present.clear() | |
| self.session_registered.clear() | |
| self.face_recognition_buffer.clear() | |
| self.error_message = None | |
| self.last_processed_frame = None | |
| self.final_log = None | |
| self.frame_skip_counter = 0 | |
| self.is_processing.set() | |
| self.processing_thread = threading.Thread( | |
| target=self._processing_loop, | |
| args=(source,), | |
| daemon=True | |
| ) | |
| self.processing_thread.start() | |
| return "β Started processing..." | |
| def stop_processing(self) -> str: | |
| self.is_processing.clear() | |
| self.error_message = None | |
| self.last_processed_frame = None | |
| self.final_log = None | |
| self.face_recognition_buffer.clear() | |
| return "β Processing stopped." | |
| # --- Helper Methods --- | |
| def _get_image_caption(self, image: Image.Image) -> str: | |
| if not HF_API_TOKEN: | |
| return "Hugging Face API token not configured." | |
| try: | |
| buffered = BytesIO() | |
| image.save(buffered, format="JPEG") | |
| headers = {"Authorization": f"Bearer {HF_API_TOKEN}"} | |
| response = requests.post(HF_API_URL, headers=headers, data=buffered.getvalue()) | |
| response.raise_for_status() | |
| return response.json()[0].get("generated_text", "No caption found.") | |
| except Exception as e: | |
| logger.error(f"Hugging Face API error: {e}") | |
| return "Caption generation failed." | |
| def _upload_image_to_salesforce(self, image: Image.Image, record_id: str, worker_id: str) -> Optional[str]: | |
| if not self.sf: | |
| return None | |
| try: | |
| buffered = BytesIO() | |
| image.save(buffered, format="JPEG") | |
| encoded_image = base64.b64encode(buffered.getvalue()).decode('utf-8') | |
| cv = self.sf.ContentVersion.create({ | |
| 'Title': f'Image_{worker_id}', | |
| 'PathOnClient': f'{worker_id}.jpg', | |
| 'VersionData': encoded_image, | |
| 'FirstPublishLocationId': record_id | |
| }) | |
| return f"/{cv['id']}" | |
| except Exception as e: | |
| logger.error(f"Salesforce image upload error: {e}") | |
| return None | |
| def get_registered_workers_info(self) -> str: | |
| if not self.sf: | |
| return "β Salesforce not connected." | |
| try: | |
| records = self.sf.query_all( | |
| "SELECT Name, Worker_ID__c FROM Worker__c ORDER BY Name" | |
| )['records'] | |
| if not records: | |
| return "No workers registered." | |
| worker_list = "\n".join( | |
| f"- **{w['Name']}** (ID: {w['Worker_ID__c']})" | |
| for w in records | |
| ) | |
| return f"**π₯ Registered Workers ({len(records)})**\n{worker_list}" | |
| except Exception as e: | |
| return f"Error: {e}" | |
| # --- GRADIO UI --- | |
| attendance_system = AttendanceSystem() | |
| def create_interface(): | |
| with gr.Blocks(theme=gr.themes.Soft(), title="Attendance System") as demo: | |
| gr.Markdown("# π― Advanced Face Recognition Attendance System") | |
| with gr.Tabs(): | |
| with gr.Tab("βοΈ Controls & Status"): | |
| gr.Markdown("### 1. Choose Input Source & Start Processing") | |
| with gr.Row(): | |
| with gr.Column(scale=1): | |
| selected_tab_index = gr.Number(value=0, visible=False) | |
| with gr.Tabs() as video_tabs: | |
| with gr.Tab("Live Camera", id=0): | |
| camera_source = gr.Number(label="Camera Source", value=0, precision=0) | |
| with gr.Tab("Upload Video", id=1): | |
| video_file = gr.Video(label="Upload Video File", sources=["upload"]) | |
| with gr.Column(scale=1): | |
| start_btn = gr.Button("βΆοΈ Start Processing", variant="primary") | |
| stop_btn = gr.Button("βΉοΈ Stop Processing", variant="stop") | |
| status_box = gr.Textbox(label="Status", interactive=False, value="System Ready.") | |
| gr.Markdown("### 2. View Results in the 'Output & Log' Tab") | |
| gr.Markdown("**π¨ Color Coding:** <font color='green'>Green</font> = Known, <font color='orange'>Orange</font> = New, <font color='red'>Red</font> = Unknown") | |
| with gr.Tab("π Output & Log"): | |
| with gr.Row(): | |
| with gr.Column(scale=2): | |
| video_output = gr.Image(label="Recognition Output", interactive=False) | |
| with gr.Column(scale=1): | |
| session_log_display = gr.Markdown(label="π Session Log", value="System is ready.") | |
| with gr.Tab("π€ Worker Management"): | |
| with gr.Row(): | |
| with gr.Column(): | |
| register_image = gr.Image(label="Upload Worker's Photo", type="pil") | |
| register_name = gr.Textbox(label="Worker's Full Name") | |
| register_btn = gr.Button("Register Worker", variant="primary") | |
| register_output = gr.Textbox(label="Registration Status", interactive=False) | |
| with gr.Column(): | |
| registered_workers_info = gr.Markdown(value=attendance_system.get_registered_workers_info()) | |
| refresh_workers_btn = gr.Button("π Refresh List") | |
| # --- Event Handlers --- | |
| def on_tab_select(evt: gr.SelectData): | |
| return evt.index | |
| video_tabs.select(fn=on_tab_select, inputs=None, outputs=[selected_tab_index]) | |
| def start_wrapper(tab_index, cam_src, vid_path): | |
| source = cam_src if tab_index == 0 else vid_path | |
| if source is None: | |
| return "Please provide an input source." | |
| return attendance_system.start_processing(source) | |
| start_btn.click( | |
| fn=start_wrapper, | |
| inputs=[selected_tab_index, camera_source, video_file], | |
| outputs=[status_box] | |
| ) | |
| stop_btn.click( | |
| fn=attendance_system.stop_processing, | |
| inputs=None, | |
| outputs=[status_box] | |
| ) | |
| register_btn.click( | |
| fn=attendance_system.register_worker_manual, | |
| inputs=[register_image, register_name], | |
| outputs=[register_output, registered_workers_info] | |
| ) | |
| refresh_workers_btn.click( | |
| fn=attendance_system.get_registered_workers_info, | |
| outputs=[registered_workers_info] | |
| ) | |
| def update_ui_generator(): | |
| while True: | |
| if attendance_system.error_message: | |
| yield None, attendance_system.error_message | |
| time.sleep(2) | |
| attendance_system.error_message = None | |
| continue | |
| if attendance_system.is_processing.is_set(): | |
| frame, log_md = None, "\n".join(reversed(attendance_system.session_log)) or "Processing..." | |
| try: | |
| if not attendance_system.frame_queue.empty(): | |
| frame = attendance_system.frame_queue.get_nowait() | |
| if frame is not None: | |
| frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
| except queue.Empty: | |
| pass | |
| yield frame, log_md | |
| else: | |
| if attendance_system.last_processed_frame is not None: | |
| final_frame = cv2.cvtColor(attendance_system.last_processed_frame, cv2.COLOR_BGR2RGB) | |
| final_log_md = "\n".join(reversed(attendance_system.final_log)) or "Processing complete. No log entries." | |
| yield final_frame, final_log_md | |
| else: | |
| yield None, "System stopped. Go to 'Controls & Status' to start." | |
| time.sleep(0.1) | |
| demo.load( | |
| fn=update_ui_generator, | |
| outputs=[video_output, session_log_display] | |
| ) | |
| return demo | |
| if __name__ == "__main__": | |
| app = create_interface() | |
| app.queue() | |
| app.launch(server_name="0.0.0.0", server_port=7860, show_error=True, debug=True) |