# SmartLabour face-recognition attendance system (Gradio UI + DeepFace + Salesforce backend).
# Suppress TensorFlow oneDNN warnings (must be set before TensorFlow is imported).
import os
os.environ['TF_ENABLE_ONEDNN_OPTS'] = '0'

# Standard Library Imports
import base64
import json
import logging
import pickle
import queue
import threading
import time
from collections import OrderedDict
from datetime import datetime, date, timedelta
from io import BytesIO
from typing import Tuple, Optional, List, Dict

# Third-Party Imports
import cv2
import gradio as gr
import numpy as np
import requests
from deepface import DeepFace
from dotenv import load_dotenv
from PIL import Image
from retrying import retry
from scipy.spatial import distance as dist
from simple_salesforce import Salesforce
# --- CONFIGURATION ---
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")
logger = logging.getLogger(__name__)
load_dotenv()

# --- API & SALESFORCE CREDENTIALS ---
HF_API_URL = "https://api-inference.huggingface.co/models/Salesforce/blip-image-captioning-base"
HF_API_TOKEN = os.getenv("HUGGINGFACE_API_TOKEN")

# SECURITY FIX: the original shipped a real username, password and security
# token as os.getenv() fallbacks, leaking live credentials in source control.
# Secrets must come only from the environment (.env); a missing value stays
# None so the Salesforce client fails fast instead of using leaked defaults.
SF_CREDENTIALS = {
    "username": os.getenv("SF_USERNAME"),
    "password": os.getenv("SF_PASSWORD"),
    "security_token": os.getenv("SF_SECURITY_TOKEN"),
    "domain": os.getenv("SF_DOMAIN", "login"),
}

# --- PERFORMANCE & ACCURACY TUNING ---
FACE_MATCH_THRESHOLD = 0.6       # Euclidean distance cutoff on Facenet embeddings
AUTO_REGISTER_CONFIDENCE = 0.98  # Min DeepFace face_confidence to auto-register a face
MAX_DISAPPEARED_FRAMES = 20      # Frames to keep tracking a lost face
BATCH_SIZE = 5                   # Process faces in batches for efficiency
| # --- FACE TRACKER CLASS --- | |
# --- FACE TRACKER CLASS ---
class FaceTracker:
    """Centroid-based multi-face tracker.

    Detects faces with a Haar cascade, associates detections across frames
    by nearest-centroid matching, and launches asynchronous recognition for
    every newly registered face via the shared AttendanceSystem.
    """

    def __init__(self, attendance_system):
        # Back-reference used for frame access and recognition callbacks.
        self.attendance_system = attendance_system
        self.next_object_id = 0
        self.objects = OrderedDict()      # object_id -> tracking state dict
        self.disappeared = OrderedDict()  # object_id -> consecutive missed frames
        self.face_cascade = cv2.CascadeClassifier(
            cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')

    def register(self, centroid, bbox):
        """Start tracking a new face and resolve its identity in the background."""
        object_id = self.next_object_id
        self.objects[object_id] = {
            'centroid': centroid,
            'bbox': bbox,
            'name': "Identifying...",
            'worker_id': None,
            'color': (255, 255, 0),  # Cyan while identity is pending
        }
        self.disappeared[object_id] = 0
        self.next_object_id += 1
        # FIX: daemonize so pending recognitions never block interpreter exit.
        threading.Thread(target=self.recognize_face,
                         args=(object_id, bbox), daemon=True).start()

    def recognize_face(self, object_id, bbox):
        """Crop the face from the latest frame and update the track's identity."""
        frame = self.attendance_system.current_frame
        # FIX: the original dereferenced current_frame unconditionally; it is
        # None until the first frame has been captured.
        if frame is None:
            return
        (startX, startY, endX, endY) = bbox
        # Clamp to the frame: cascade boxes can touch or exceed the edges.
        startX, startY = max(0, startX), max(0, startY)
        endX, endY = min(frame.shape[1], endX), min(frame.shape[0], endY)
        face_image = frame[startY:endY, startX:endX]
        if face_image.size == 0:
            return
        result = self.attendance_system.identify_and_log_face(face_image)
        # The object may have been deregistered while recognition ran.
        if result and object_id in self.objects:
            self.objects[object_id].update({
                'name': result['name'],
                'worker_id': result['worker_id'],
                'color': result['color'],
            })

    def deregister(self, object_id):
        """Drop a track that has been missing for too many frames."""
        del self.objects[object_id]
        del self.disappeared[object_id]

    def update(self, frame):
        """Detect faces in `frame`, update all tracks, and draw annotations."""
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        rects = self.face_cascade.detectMultiScale(
            gray, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30))

        # BUG FIX: the original crashed in D.min(axis=1) (zero-size reduction)
        # whenever a frame had no detections while faces were still tracked.
        # Age out every existing track and return early instead.
        if len(rects) == 0:
            for object_id in list(self.disappeared.keys()):
                self.disappeared[object_id] += 1
                if self.disappeared[object_id] > MAX_DISAPPEARED_FRAMES:
                    self.deregister(object_id)
            return self._draw(frame)

        input_centroids = np.zeros((len(rects), 2), dtype="int")
        bboxes = []
        for (i, (x, y, w, h)) in enumerate(rects):
            input_centroids[i] = (int(x + w / 2.0), int(y + h / 2.0))
            bboxes.append((x, y, x + w, y + h))

        if len(self.objects) == 0:
            for i in range(len(input_centroids)):
                self.register(input_centroids[i], bboxes[i])
        else:
            object_ids = list(self.objects.keys())
            object_centroids = [obj['centroid'] for obj in self.objects.values()]
            # Greedy nearest-centroid assignment, smallest distances first.
            D = dist.cdist(np.array(object_centroids), input_centroids)
            rows = D.min(axis=1).argsort()
            cols = D.argmin(axis=1)[rows]
            used_rows, used_cols = set(), set()
            for (row, col) in zip(rows, cols):
                if row in used_rows or col in used_cols:
                    continue
                object_id = object_ids[row]
                self.objects[object_id]['centroid'] = input_centroids[col]
                self.objects[object_id]['bbox'] = bboxes[col]
                self.disappeared[object_id] = 0
                used_rows.add(row)
                used_cols.add(col)
            # Tracks with no matching detection age toward deregistration.
            for row in set(range(D.shape[0])).difference(used_rows):
                object_id = object_ids[row]
                self.disappeared[object_id] += 1
                if self.disappeared[object_id] > MAX_DISAPPEARED_FRAMES:
                    self.deregister(object_id)
            # Detections with no matching track become new objects.
            for col in set(range(D.shape[1])).difference(used_cols):
                self.register(input_centroids[col], bboxes[col])
        return self._draw(frame)

    def _draw(self, frame):
        """Draw a labeled, color-coded box for every tracked face; returns frame."""
        for data in self.objects.values():
            (startX, startY, endX, endY) = data['bbox']
            cv2.rectangle(frame, (startX, startY), (endX, endY), data['color'], 2)
            cv2.putText(frame, data['name'], (startX, startY - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.7, data['color'], 2)
        return frame
| # --- SALESFORCE CONNECTION --- | |
# --- SALESFORCE CONNECTION ---
def connect_to_salesforce() -> Optional["Salesforce"]:
    """Open a Salesforce session from SF_CREDENTIALS.

    Returns:
        The live client, or None when the connection fails.

    FIX: the original re-raised on failure, contradicting its declared
    Optional return type and aborting app startup — even though every
    consumer is guarded with `if self.sf:` and falls back to the local
    cache. Returning None makes the offline path actually reachable.
    """
    try:
        sf = Salesforce(**SF_CREDENTIALS)
        sf.describe()  # Cheap round-trip to validate the session.
        logger.info("β Successfully connected to Salesforce.")
        return sf
    except Exception as e:
        logger.error(f"β Salesforce connection failed: {e}")
        return None
| # --- CORE LOGIC --- | |
# --- CORE LOGIC ---
class AttendanceSystem:
    """Face-recognition attendance engine.

    Maintains the worker roster (a list of Facenet embeddings per worker),
    matches incoming face crops against it, registers workers manually or
    automatically, records attendance to Salesforce (with a local pickle
    cache as fallback), and runs the background video loop that feeds the
    Gradio UI through a bounded frame queue.

    NOTE(review): roster lists are mutated from recognition threads while
    being read elsewhere without a lock; CPython list appends are atomic
    enough for this usage, but confirm if stronger guarantees are needed.
    """

    def __init__(self):
        self.processing_thread = None
        self.is_processing = threading.Event()      # Set while the video loop runs.
        self.frame_queue = queue.Queue(maxsize=30)  # Annotated frames handed to the UI.
        self.error_message = None
        self.current_frame = None  # Latest raw BGR frame (read by FaceTracker threads).
        # One list of embeddings per worker, index-aligned with names/ids.
        self.known_face_embeddings: List[List[np.ndarray]] = []
        self.known_face_names: List[str] = []
        self.known_face_ids: List[str] = []
        self.next_worker_id: int = 1
        self.session_log_present: List[str] = []  # Workers marked present this session.
        self.session_log_new: List[str] = []      # Workers auto-registered this session.
        self.session_attended_ids: Dict[str, datetime] = {}  # worker_id -> last mark time.
        # ROBUSTNESS FIX: connect_to_salesforce() may raise; the original let
        # that abort startup even though every later use is None-guarded.
        try:
            self.sf = connect_to_salesforce()
        except Exception as e:
            logger.error(f"β Salesforce unavailable at startup: {e}")
            self.sf = None
        self._create_directories()
        self.load_worker_data()

    def _create_directories(self):
        """Ensure the local data directory tree exists."""
        os.makedirs("data/faces", exist_ok=True)

    def load_worker_data(self):
        """Load the worker roster from Salesforce, falling back to the local cache."""
        logger.info("Loading worker data...")
        if not self.sf:
            logger.warning("Salesforce not connected. Loading from local cache.")
            self._load_local_worker_data()
            return
        try:
            workers = self.sf.query_all(
                "SELECT Worker_ID__c, Name, Face_Embeddings__c FROM Worker__c")['records']
            if not workers:
                self._load_local_worker_data()
                return
            temp_embeddings, temp_names, temp_ids, max_id = [], [], [], 0
            for worker in workers:
                if not worker.get('Face_Embeddings__c'):
                    continue
                embeddings = [np.array(emb) for emb in json.loads(worker['Face_Embeddings__c'])]
                temp_embeddings.append(embeddings)
                temp_names.append(worker['Name'])
                temp_ids.append(worker['Worker_ID__c'])
                try:
                    # IDs look like "W0042"; track the highest numeric part so
                    # newly issued IDs never collide.
                    worker_num = int(''.join(filter(str.isdigit, worker['Worker_ID__c'])))
                    max_id = max(max_id, worker_num)
                except (ValueError, TypeError):
                    continue  # Malformed ID: keep the worker, skip the counter.
            self.known_face_embeddings = temp_embeddings
            self.known_face_names = temp_names
            self.known_face_ids = temp_ids
            self.next_worker_id = max_id + 1
            self.save_local_worker_data()  # Refresh the offline cache.
            logger.info(f"β Loaded {len(self.known_face_ids)} workers from Salesforce. Next ID: {self.next_worker_id}")
        except Exception as e:
            logger.error(f"β Error loading from Salesforce: {e}. Attempting local load.")
            self._load_local_worker_data()

    def _load_local_worker_data(self):
        """Restore the roster from data/workers.pkl.

        NOTE: pickle is acceptable here only because the file is written
        exclusively by save_local_worker_data(); never load untrusted pickles.
        """
        try:
            if os.path.exists("data/workers.pkl"):
                with open("data/workers.pkl", "rb") as f:
                    data = pickle.load(f)
                self.known_face_embeddings = data.get("embeddings", [])
                self.known_face_names = data.get("names", [])
                self.known_face_ids = data.get("ids", [])
                self.next_worker_id = data.get("next_id", 1)
                logger.info(f"β Loaded {len(self.known_face_ids)} workers from local cache. Next ID: {self.next_worker_id}")
        except Exception as e:
            logger.error(f"β Error loading local data: {e}")

    def save_local_worker_data(self):
        """Persist the in-memory roster to the local pickle cache."""
        try:
            worker_data = {
                "embeddings": self.known_face_embeddings,
                "names": self.known_face_names,
                "ids": self.known_face_ids,
                "next_id": self.next_worker_id,
            }
            with open("data/workers.pkl", "wb") as f:
                pickle.dump(worker_data, f)
        except Exception as e:
            logger.error(f"β Error saving local worker data: {e}")

    def register_worker_manual(self, image: "Image.Image", name: str) -> Tuple[str, str]:
        """Register a worker from an uploaded photo.

        Returns:
            (status message, refreshed roster markdown) for the Gradio UI.
        """
        if image is None or not name.strip():
            return "β Please provide both image and name!", self.get_registered_workers_info()
        try:
            image_array = np.array(image)
            # enforce_detection=True makes analyze() raise ValueError when no
            # face is present — caught below as the "no face" outcome.
            DeepFace.analyze(img_path=image_array, actions=['emotion'], enforce_detection=True)
            embedding = DeepFace.represent(img_path=image_array, model_name='Facenet', enforce_detection=False)[0]['embedding']
            if self._is_duplicate_face(embedding):
                return f"β Face matches an existing worker!", self.get_registered_workers_info()
            worker_id = f"W{self.next_worker_id:04d}"
            name = name.strip().title()
            self._add_worker_to_system(worker_id, name, [embedding], image_array)
            self.save_local_worker_data()
            return f"β {name} registered with ID: {worker_id}!", self.get_registered_workers_info()
        except ValueError:
            return "β No face detected in the image!", self.get_registered_workers_info()
        except Exception as e:
            logger.error(f"Manual registration error: {e}")
            return f"β Registration error: {e}", self.get_registered_workers_info()

    def _register_worker_auto(self, face_image: np.ndarray) -> Optional[Dict]:
        """Auto-register an unknown face when detection confidence is high enough.

        Returns a {'name', 'worker_id'} dict, or None when confidence is too
        low, the face duplicates an existing worker, or an error occurred.
        """
        try:
            analysis = DeepFace.analyze(img_path=face_image, actions=['emotion'], enforce_detection=False)
            if analysis[0]['face_confidence'] < AUTO_REGISTER_CONFIDENCE:
                return None
            embedding = DeepFace.represent(img_path=face_image, model_name='Facenet', enforce_detection=False)[0]['embedding']
            if self._is_duplicate_face(embedding):
                return None
            worker_id = f"W{self.next_worker_id:04d}"
            worker_name = f"New Worker {self.next_worker_id}"
            self._add_worker_to_system(worker_id, worker_name, [embedding], face_image)
            self.save_local_worker_data()
            self.mark_attendance(worker_id, worker_name, is_new=True)
            return {'name': worker_name, 'worker_id': worker_id}
        except Exception as e:
            logger.error(f"β Auto-registration error: {e}")
            return None

    def _add_worker_to_system(self, worker_id: str, name: str,
                              embeddings: List[np.ndarray], image_array: np.ndarray):
        """Append a worker to the in-memory roster and best-effort sync to Salesforce."""
        self.known_face_embeddings.append(embeddings)
        self.known_face_names.append(name)
        self.known_face_ids.append(worker_id)
        self.next_worker_id += 1
        if self.sf:
            try:
                # NOTE(review): assumes image_array is BGR (true for frames
                # from cv2 capture); Gradio PIL uploads arrive as RGB, so
                # manually registered photos may be color-swapped — confirm.
                image = Image.fromarray(cv2.cvtColor(image_array, cv2.COLOR_BGR2RGB))
                caption = self._get_image_caption(image)
                worker_record = self.sf.Worker__c.create({
                    'Worker_ID__c': worker_id,
                    'Name': name,
                    'Face_Embeddings__c': json.dumps([emb.tolist() for emb in embeddings]),
                    'Image_Caption__c': caption,
                })
                content_doc_id = self._upload_image_to_salesforce(image, worker_record['id'], worker_id)
                if content_doc_id:
                    self.sf.Worker__c.update(worker_record['id'], {'Image_URL__c': content_doc_id})
            except Exception as e:
                # Best-effort: local registration already succeeded.
                logger.error(f"β Salesforce worker sync error: {e}")

    def _is_duplicate_face(self, embedding: List[float]) -> bool:
        """Return True when `embedding` is within FACE_MATCH_THRESHOLD of any known face."""
        if not self.known_face_embeddings:
            return False
        query = np.array(embedding)  # Hoisted out of the inner loop.
        for worker_embeddings in self.known_face_embeddings:
            for known_emb in worker_embeddings:
                if np.linalg.norm(query - np.array(known_emb)) < FACE_MATCH_THRESHOLD:
                    return True
        return False

    def mark_attendance(self, worker_id: str, worker_name: str, is_new: bool = False):
        """Record attendance for a worker, deduplicated to once per 24h per session."""
        now = datetime.now()
        if worker_id in self.session_attended_ids:
            last_attendance = self.session_attended_ids[worker_id]
            if now - last_attendance < timedelta(hours=24):
                return  # Already marked within the window.
        self.session_attended_ids[worker_id] = now
        current_time_str = now.strftime('%H:%M:%S')
        log_msg = f"π [{current_time_str}] Registered: {worker_name} ({worker_id})" if is_new else f"β [{current_time_str}] Present: {worker_name} ({worker_id})"
        target_log = self.session_log_new if is_new else self.session_log_present
        target_log.insert(0, log_msg)  # Newest entry first for the UI.
        if self.sf:
            try:
                self.sf.Attendance__c.create({
                    'Worker_ID__c': worker_id,
                    'Name': worker_name,
                    'Attendance_Date__c': now.date().isoformat(),
                    'Attendance_Time__c': current_time_str,
                    'Status__c': 'Present',
                })
            except Exception as e:
                logger.error(f"β Salesforce attendance sync error: {e}")

    def identify_and_log_face(self, face_image: np.ndarray) -> Optional[Dict]:
        """Match a face crop against the roster; auto-register when unmatched.

        Returns a dict with 'name', 'worker_id' and a BGR 'color' for the
        tracker overlay: green = known, orange = newly registered,
        red = unknown/error.
        """
        try:
            embedding = DeepFace.represent(img_path=face_image, model_name='Facenet', enforce_detection=False)[0]['embedding']
            if self.known_face_embeddings:
                # Best (smallest) distance across every stored embedding per worker.
                min_dist = float('inf')
                match_index = -1
                query = np.array(embedding)  # Hoisted out of the inner loop.
                for i, worker_embeddings in enumerate(self.known_face_embeddings):
                    for known_emb in worker_embeddings:
                        distance = np.linalg.norm(query - np.array(known_emb))
                        if distance < min_dist and distance < FACE_MATCH_THRESHOLD:
                            min_dist = distance
                            match_index = i
                if match_index >= 0:
                    worker_id = self.known_face_ids[match_index]
                    worker_name = self.known_face_names[match_index]
                    self.mark_attendance(worker_id, worker_name)
                    return {'name': worker_name, 'worker_id': worker_id, 'color': (0, 255, 0)}  # Green
            new_worker = self._register_worker_auto(face_image)
            if new_worker:
                return {'name': new_worker['name'], 'worker_id': new_worker['worker_id'], 'color': (0, 165, 255)}  # Orange
        except Exception as e:
            logger.error(f"Face identification failed: {e}")
        return {'name': 'Unknown', 'worker_id': None, 'color': (0, 0, 255)}  # Red

    def _processing_loop(self, source):
        """Background thread: read frames, track/annotate, hand frames to the UI."""
        video_capture = cv2.VideoCapture(source)
        if not video_capture.isOpened():
            self.error_message = f"β Error: Could not open video source: {source}"
            logger.error(self.error_message)
            self.is_processing.clear()
            return
        tracker = FaceTracker(self)
        frame_count = 0
        while self.is_processing.is_set():
            ret, frame = video_capture.read()
            if not ret:
                logger.info("Video ended or failed to read frame.")
                break
            self.current_frame = frame.copy()
            annotated_frame = tracker.update(self.current_frame)
            logger.debug(f"Processed frame {frame_count}, queue size: {self.frame_queue.qsize()}")
            # BUG FIX: the original's full()/put() pair was a check-then-act
            # race — a blocking put() could stall the capture thread forever.
            # Use non-blocking puts and drop the oldest frame on overflow.
            try:
                self.frame_queue.put_nowait(annotated_frame)
            except queue.Full:
                try:
                    self.frame_queue.get_nowait()  # Discard the stalest frame.
                except queue.Empty:
                    pass
                try:
                    self.frame_queue.put_nowait(annotated_frame)
                except queue.Full:
                    pass  # UI will pick up the next frame instead.
            frame_count += 1
            time.sleep(0.01)  # Yield so the UI and recognition threads get CPU.
        video_capture.release()
        self.is_processing.clear()
        logger.info("Processing finished.")

    def start_processing(self, source) -> str:
        """Reset session state and launch the processing loop for `source`."""
        if self.is_processing.is_set():
            return "β οΈ Processing is already active."
        self.session_log_present.clear()
        self.session_log_new.clear()
        self.session_attended_ids.clear()
        self.error_message = None
        # Drain stale frames from any previous run.
        while not self.frame_queue.empty():
            self.frame_queue.get()
        self.is_processing.set()
        self.processing_thread = threading.Thread(target=self._processing_loop, args=(source,))
        self.processing_thread.daemon = True
        self.processing_thread.start()
        return "β Started processing..."

    def stop_processing(self) -> str:
        """Signal the processing loop to stop and wait briefly for it to exit."""
        if not self.is_processing.is_set():
            return "β οΈ Processing is not currently active."
        self.is_processing.clear()
        if self.processing_thread:
            self.processing_thread.join(timeout=2)
        return "β Processing stopped by user."

    def get_registered_workers_info(self) -> str:
        """Return the roster as markdown, refreshing from Salesforce/cache first.

        NOTE(review): this re-queries Salesforce on every UI refresh; fine for
        small rosters, but confirm acceptable at scale.
        """
        self.load_worker_data()
        if not self.known_face_ids:
            return "No workers registered."
        info_list = [f"- **{name}** (ID: {id})" for name, id in sorted(zip(self.known_face_names, self.known_face_ids))]
        return f"**π₯ Registered Workers ({len(info_list)})**\n" + "\n".join(info_list)
| # --- GRADIO UI --- | |
# --- GRADIO UI ---
attendance_system = AttendanceSystem()

def create_interface():
    """Build the Gradio Blocks UI wired to the module-level AttendanceSystem."""
    with gr.Blocks(theme=gr.themes.Soft(), title="Attendance System") as demo:
        gr.Markdown("# π High-Performance Face Recognition Attendance")
        with gr.Tabs():
            with gr.Tab("βοΈ Controls & Status"):
                gr.Markdown("### 1. Choose Input Source & Start Processing")
                with gr.Row():
                    with gr.Column(scale=1):
                        # Hidden state holding the currently selected source tab.
                        selected_tab_index = gr.Number(value=0, visible=False)
                        with gr.Tabs() as video_tabs:
                            with gr.Tab("Live Camera", id=0):
                                camera_source = gr.Number(label="Camera Source Index", value=0, precision=0)
                            with gr.Tab("Upload Video", id=1):
                                video_file = gr.Video(label="Upload Video File", sources=["upload"])
                    with gr.Column(scale=1):
                        start_btn = gr.Button("βΆοΈ Start Processing", variant="primary")
                        stop_btn = gr.Button("βΉοΈ Stop Processing", variant="stop")
                        status_box = gr.Textbox(label="Status", interactive=False, value="System Ready.")
                gr.Markdown("### 2. View Results in the 'Output & Log' Tab")
                gr.Markdown("**π¨ Color Coding:** <font color='cyan'>Identifying</font>, <font color='green'>Present</font>, <font color='orange'>New</font>, <font color='red'>Unknown</font>")
            with gr.Tab("π Output & Log"):
                with gr.Row(equal_height=True):
                    with gr.Column(scale=2):
                        video_output = gr.Image(label="Recognition Output", interactive=False, type="numpy")
                    with gr.Column(scale=1):
                        gr.Markdown("### π Present Workers")
                        session_log_present = gr.Markdown(label="Present Workers Log", value="Awaiting detections...")
                        gr.Markdown("### π Newly Registered Workers")
                        session_log_new = gr.Markdown(label="New Workers Log", value="Awaiting new registrations...")
            with gr.Tab("π€ Worker Management"):
                with gr.Row():
                    with gr.Column():
                        gr.Markdown("### Register New Worker")
                        register_image = gr.Image(label="Upload Worker's Photo", type="pil", sources=["upload"])
                        register_name = gr.Textbox(label="Worker's Full Name")
                        register_btn = gr.Button("Register Worker", variant="primary")
                        register_output = gr.Textbox(label="Registration Status", interactive=False)
                    with gr.Column():
                        gr.Markdown("### Current Worker Roster")
                        registered_workers_info = gr.Markdown(value=attendance_system.get_registered_workers_info())
                        refresh_workers_btn = gr.Button("π Refresh List")

        def on_tab_select(evt: gr.SelectData):
            # Track which source tab (camera=0 / upload=1) is active.
            return evt.index

        video_tabs.select(fn=on_tab_select, inputs=None, outputs=[selected_tab_index])

        def start_wrapper(tab_index, cam_src, vid_path):
            """Resolve and validate the chosen source, then start processing.

            FIX: validate the uploaded-video path before deriving the source,
            instead of computing first and checking afterwards.
            """
            if tab_index == 1:
                if vid_path is None:
                    return "β Please upload a video file."
                source = vid_path
            else:
                source = int(cam_src)
            return attendance_system.start_processing(source) if source is not None else "Please provide an input source."

        start_btn.click(fn=start_wrapper, inputs=[selected_tab_index, camera_source, video_file], outputs=[status_box])
        stop_btn.click(fn=attendance_system.stop_processing, inputs=None, outputs=[status_box])
        register_btn.click(fn=attendance_system.register_worker_manual, inputs=[register_image, register_name], outputs=[register_output, registered_workers_info])
        refresh_workers_btn.click(fn=attendance_system.get_registered_workers_info, outputs=[registered_workers_info])

        def update_ui_generator():
            """Stream the latest annotated frame and session logs to the UI (~30 fps)."""
            last_frame = None
            while True:
                status_text = "Status: Processing..." if attendance_system.is_processing.is_set() else "Status: Stopped."
                present_md = "\n".join(attendance_system.session_log_present) or "Awaiting detections..."
                new_md = "\n".join(attendance_system.session_log_new) or "Awaiting new registrations..."
                frame = None
                if attendance_system.is_processing.is_set():
                    try:
                        frame = attendance_system.frame_queue.get_nowait()
                        last_frame = frame
                        logger.debug("Retrieved frame from queue for UI update")
                    except queue.Empty:
                        # No fresh frame yet — keep showing the previous one.
                        frame = last_frame
                        logger.debug("Using last frame due to empty queue")
                else:
                    present_md = "Processing stopped. Final Present Log:\n\n" + present_md
                    new_md = "Processing stopped. Final New Workers Log:\n\n" + new_md
                if frame is not None:
                    # Frames are BGR internally; Gradio expects RGB.
                    yield cv2.cvtColor(frame, cv2.COLOR_BGR2RGB), present_md, new_md, status_text
                else:
                    yield None, present_md, new_md, status_text
                time.sleep(1 / 30)

        demo.load(fn=update_ui_generator, outputs=[video_output, session_log_present, session_log_new, status_box])
    return demo
if __name__ == "__main__":
    # NOTE(review): these helpers are only bound onto AttendanceSystem when the
    # module runs as a script; importing the module elsewhere leaves the class
    # without them (the Salesforce sync then logs an AttributeError). Consider
    # moving them into the class body.

    def _get_image_caption(self, image: "Image.Image") -> str:
        """Caption `image` via the Hugging Face BLIP inference API.

        BUG FIX: the original signature omitted `self`, so the bound-method
        call `self._get_image_caption(image)` passed the instance as `image`
        and always raised TypeError.
        """
        if not HF_API_TOKEN:
            return "Hugging Face API token not configured."
        try:
            buffered = BytesIO()
            image.save(buffered, format="JPEG")
            img_data = buffered.getvalue()
            headers = {"Authorization": f"Bearer {HF_API_TOKEN}"}
            response = requests.post(HF_API_URL, headers=headers, data=img_data, timeout=10)
            response.raise_for_status()
            result = response.json()
            return result[0].get("generated_text", "No caption found.")
        except Exception as e:
            logger.error(f"Hugging Face API error: {e}")
            return "Caption generation failed."

    def _upload_image_to_salesforce(self, image: "Image.Image", record_id: str, worker_id: str) -> Optional[str]:
        """Upload `image` as a ContentVersion linked to `record_id`.

        Returns a shepherd download path for the ContentDocument, or None
        when Salesforce is unavailable or the upload fails.
        """
        if not self.sf:
            return None
        try:
            buffered = BytesIO()
            image.save(buffered, format="JPEG")
            encoded_image = base64.b64encode(buffered.getvalue()).decode('utf-8')
            self.sf.ContentVersion.create({
                'Title': f'Image_{worker_id}',
                'PathOnClient': f'{worker_id}.jpg',
                'VersionData': encoded_image,
                'FirstPublishLocationId': record_id,
            })
            # NOTE(review): this takes the first ContentDocumentLink on the
            # record, which may not be the file just created when other files
            # are attached — confirm ordering assumptions. record_id comes
            # from a Salesforce create() call, not user input, so the f-string
            # SOQL is low risk here.
            content_doc_link = self.sf.query(
                f"SELECT ContentDocumentId FROM ContentDocumentLink WHERE LinkedEntityId = '{record_id}'")['records'][0]
            content_doc_id = content_doc_link['ContentDocumentId']
            return f"/sfc/servlet.shepherd/document/download/{content_doc_id}"
        except Exception as e:
            logger.error(f"Salesforce image upload error: {e}")
            return None

    # Attach the helpers so instance calls bind `self` correctly.
    AttendanceSystem._get_image_caption = _get_image_caption
    AttendanceSystem._upload_image_to_salesforce = _upload_image_to_salesforce

    app = create_interface()
    app.queue()
    app.launch(server_name="0.0.0.0", server_port=7860, show_error=True, debug=True)