|
|
|
|
|
import os

# Disable TensorFlow's oneDNN custom ops (DeepFace pulls in TF) —
# presumably to avoid oneDNN's minor floating-point result variation and
# its startup warning; must be set BEFORE TensorFlow is first imported.
os.environ['TF_ENABLE_ONEDNN_OPTS'] = '0'
|
|
|
|
|
|
|
|
import base64 |
|
|
import json |
|
|
import logging |
|
|
import queue |
|
|
import threading |
|
|
import time |
|
|
from datetime import datetime, date |
|
|
from io import BytesIO |
|
|
from typing import Tuple, Optional, List, Dict |
|
|
import pickle |
|
|
from collections import OrderedDict |
|
|
|
|
|
|
|
|
import cv2 |
|
|
import gradio as gr |
|
|
import numpy as np |
|
|
from PIL import Image |
|
|
import requests |
|
|
from dotenv import load_dotenv |
|
|
from deepface import DeepFace |
|
|
from retrying import retry |
|
|
from simple_salesforce import Salesforce |
|
|
from scipy.spatial import distance as dist |
|
|
|
|
|
|
|
|
# Root logging config: timestamped, INFO-level messages for the whole app.
logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s")

logger = logging.getLogger(__name__)

# Load SF_* / HUGGINGFACE_* settings from a local .env file, if present.
load_dotenv()
|
|
|
|
|
|
|
|
# Hugging Face inference endpoint used for optional image captioning.
HF_API_URL = "https://api-inference.huggingface.co/models/Salesforce/blip-image-captioning-base"

HF_API_TOKEN = os.getenv("HUGGINGFACE_API_TOKEN")

# Salesforce credentials — environment only.
# SECURITY FIX: the original embedded a real username, password and security
# token as hard-coded fallbacks. Secrets must never live in source control;
# they now come exclusively from the environment / .env file, and the leaked
# credentials should be rotated.
SF_CREDENTIALS = {
    "username": os.getenv("SF_USERNAME", ""),
    "password": os.getenv("SF_PASSWORD", ""),
    "security_token": os.getenv("SF_SECURITY_TOKEN", ""),
    "domain": os.getenv("SF_DOMAIN", "login"),
}

# Maximum Facenet-embedding distance for two faces to count as the same person.
FACE_MATCH_THRESHOLD = 0.6

# Minimum confidence before auto-registering an unknown face.
# NOTE(review): currently referenced nowhere in this file — confirm intent.
AUTO_REGISTER_CONFIDENCE = 0.98

# Consecutive frames a tracked face may go undetected before its track is dropped.
MAX_DISAPPEARED_FRAMES = 20
|
|
|
|
|
|
|
|
class FaceTracker:
    """Lightweight multi-face tracker.

    Detects faces with a Haar cascade, associates detections to existing
    tracks by greedy nearest-centroid matching, and runs the expensive
    recognition step asynchronously on a background thread per new track.
    """

    def __init__(self, attendance_system):
        """attendance_system: owning AttendanceSystem (provides the current
        frame and the identify_and_log_face() recognition entry point)."""
        self.attendance_system = attendance_system
        self.next_object_id = 0
        # object_id -> {'centroid', 'bbox', 'name', 'worker_id', 'color'}
        self.objects = OrderedDict()
        # object_id -> consecutive frames without a matching detection
        self.disappeared = OrderedDict()
        self.face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')

    def register(self, centroid, bbox):
        """Start tracking a new face and kick off asynchronous recognition."""
        object_id = self.next_object_id
        self.objects[object_id] = {'centroid': centroid, 'bbox': bbox, 'name': "Identifying...", 'worker_id': None, 'color': (255, 255, 0)}
        self.disappeared[object_id] = 0
        self.next_object_id += 1

        # BUGFIX (race): the original handed only the bbox to the worker
        # thread, which then sliced whatever `current_frame` happened to be
        # by the time it ran — potentially a different frame. Crop NOW, from
        # the frame currently being processed, and pass the crop itself.
        frame = self.attendance_system.current_frame
        if frame is None:
            return
        (startX, startY, endX, endY) = bbox
        h, w = frame.shape[:2]
        # Clip to frame bounds and copy so the crop outlives this frame.
        face_image = frame[max(0, startY):min(h, endY), max(0, startX):min(w, endX)].copy()
        if face_image.size == 0:
            return
        # daemon=True so lingering recognition threads never block shutdown.
        threading.Thread(target=self.recognize_face, args=(object_id, face_image), daemon=True).start()

    def recognize_face(self, object_id, face_image):
        """Run expensive recognition on a pre-cropped face and update the track."""
        result = self.attendance_system.identify_and_log_face(face_image)
        # The track may have been deregistered while recognition was running.
        if result and object_id in self.objects:
            self.objects[object_id].update({
                'name': result['name'],
                'worker_id': result['worker_id'],
                'color': result['color'],
            })

    def deregister(self, object_id):
        """Forget a track that has been missing for too many frames."""
        del self.objects[object_id]
        del self.disappeared[object_id]

    @staticmethod
    def _greedy_assign(D):
        """Greedy nearest-centroid assignment on distance matrix D
        (rows = existing tracks, cols = new detections).

        Returns (matches, unused_rows, unused_cols) where matches is a list
        of (row, col) index pairs, processed in order of increasing best
        distance so closer pairs win ties.
        """
        rows = D.min(axis=1).argsort()
        cols = D.argmin(axis=1)[rows]
        matches = []
        used_rows, used_cols = set(), set()
        for row, col in zip(rows, cols):
            if row in used_rows or col in used_cols:
                continue
            matches.append((int(row), int(col)))
            used_rows.add(row)
            used_cols.add(col)
        unused_rows = set(range(D.shape[0])) - used_rows
        unused_cols = set(range(D.shape[1])) - used_cols
        return matches, unused_rows, unused_cols

    def _mark_disappeared(self, object_id):
        """Bump a track's missing-frame counter, deregistering on expiry."""
        self.disappeared[object_id] += 1
        if self.disappeared[object_id] > MAX_DISAPPEARED_FRAMES:
            self.deregister(object_id)

    def update(self, frame):
        """Detect faces in `frame`, update tracks, and draw annotations.

        Returns the annotated frame (drawn in place).
        """
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        rects = self.face_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30))

        input_centroids = np.zeros((len(rects), 2), dtype="int")
        bboxes = []
        for i, (x, y, w, h) in enumerate(rects):
            input_centroids[i] = (int(x + w / 2.0), int(y + h / 2.0))
            bboxes.append((x, y, x + w, y + h))

        if len(input_centroids) == 0:
            # BUGFIX: with tracks present but zero detections, the original
            # fed an empty matrix into cdist(...).min(axis=1), which raises
            # "zero-size array to reduction operation". Just age every track.
            for object_id in list(self.disappeared.keys()):
                self._mark_disappeared(object_id)
        elif len(self.objects) == 0:
            # No existing tracks: every detection starts a new one.
            for i in range(len(input_centroids)):
                self.register(input_centroids[i], bboxes[i])
        else:
            object_ids = list(self.objects.keys())
            object_centroids = [obj['centroid'] for obj in self.objects.values()]
            D = dist.cdist(np.array(object_centroids), input_centroids)
            matches, unused_rows, unused_cols = self._greedy_assign(D)

            for row, col in matches:
                object_id = object_ids[row]
                self.objects[object_id]['centroid'] = input_centroids[col]
                self.objects[object_id]['bbox'] = bboxes[col]
                self.disappeared[object_id] = 0

            # Tracks with no matching detection this frame.
            for row in unused_rows:
                self._mark_disappeared(object_ids[row])
            # Detections with no matching track become new tracks.
            for col in unused_cols:
                self.register(input_centroids[col], bboxes[col])

        # Draw every live track: box + current label in the track's color.
        for data in self.objects.values():
            (startX, startY, endX, endY) = data['bbox']
            cv2.rectangle(frame, (startX, startY), (endX, endY), data['color'], 2)
            cv2.putText(frame, data['name'], (startX, startY - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.7, data['color'], 2)

        return frame
|
|
|
|
|
|
|
|
@retry(stop_max_attempt_number=3, wait_fixed=2000)
def connect_to_salesforce() -> Optional[Salesforce]:
    """Open and sanity-check a Salesforce session.

    Retries up to 3 times, 2 s apart, via the @retry decorator (the
    exception is re-raised after logging precisely so @retry can fire).

    Returns:
        A connected Salesforce client.

    Raises:
        Exception: whatever simple_salesforce raised, after logging.
    """
    try:
        sf = Salesforce(**SF_CREDENTIALS)
        # Cheap metadata call to verify the session actually works.
        sf.describe()
        # BUGFIX: this message was a string literal broken across two source
        # lines (a syntax error); restored as a single line.
        logger.info("✅ Successfully connected to Salesforce.")
        return sf
    except Exception as e:
        logger.error(f"β Salesforce connection failed: {e}")
        raise
|
|
|
|
|
|
|
|
class AttendanceSystem: |
|
|
def __init__(self): |
|
|
self.processing_thread = None |
|
|
self.is_processing = threading.Event() |
|
|
self.frame_queue = queue.Queue(maxsize=30) |
|
|
self.error_message = None |
|
|
self.current_frame = None |
|
|
|
|
|
self.known_face_embeddings: List[np.ndarray] = [] |
|
|
self.known_face_names: List[str] = [] |
|
|
self.known_face_ids: List[str] = [] |
|
|
self.next_worker_id: int = 1 |
|
|
|
|
|
self.session_log: List[str] = [] |
|
|
self.session_attended_ids = set() |
|
|
|
|
|
self.sf = connect_to_salesforce() |
|
|
self._create_directories() |
|
|
self.load_worker_data() |
|
|
|
|
|
def _create_directories(self): |
|
|
os.makedirs("data/faces", exist_ok=True) |
|
|
|
|
|
def load_worker_data(self): |
|
|
|
|
|
logger.info("Loading worker data...") |
|
|
if self.sf: |
|
|
try: |
|
|
workers = self.sf.query_all("SELECT Worker_ID__c, Name, Face_Embedding__c FROM Worker__c")['records'] |
|
|
if not workers: |
|
|
self._load_local_worker_data() |
|
|
return |
|
|
|
|
|
temp_embeddings, temp_names, temp_ids, max_id = [], [], [], 0 |
|
|
for worker in workers: |
|
|
if worker.get('Face_Embedding__c'): |
|
|
temp_embeddings.append(np.array(json.loads(worker['Face_Embedding__c']))) |
|
|
temp_names.append(worker['Name']) |
|
|
temp_ids.append(worker['Worker_ID__c']) |
|
|
try: |
|
|
worker_num = int(''.join(filter(str.isdigit, worker['Worker_ID__c']))) |
|
|
if worker_num > max_id: |
|
|
max_id = worker_num |
|
|
except (ValueError, TypeError): |
|
|
continue |
|
|
|
|
|
self.known_face_embeddings = temp_embeddings |
|
|
self.known_face_names = temp_names |
|
|
self.known_face_ids = temp_ids |
|
|
self.next_worker_id = max_id + 1 |
|
|
self.save_local_worker_data() |
|
|
logger.info(f"β
Loaded {len(self.known_face_ids)} workers from Salesforce. Next ID: {self.next_worker_id}") |
|
|
except Exception as e: |
|
|
logger.error(f"β Error loading from Salesforce: {e}. Attempting local load.") |
|
|
self._load_local_worker_data() |
|
|
else: |
|
|
logger.warning("Salesforce not connected. Loading from local cache.") |
|
|
self._load_local_worker_data() |
|
|
|
|
|
def _load_local_worker_data(self): |
|
|
try: |
|
|
if os.path.exists("data/workers.pkl"): |
|
|
with open("data/workers.pkl", "rb") as f: data = pickle.load(f) |
|
|
self.known_face_embeddings = data.get("embeddings", []) |
|
|
self.known_face_names = data.get("names", []) |
|
|
self.known_face_ids = data.get("ids", []) |
|
|
self.next_worker_id = data.get("next_id", 1) |
|
|
logger.info(f"β
Loaded {len(self.known_face_ids)} workers from local cache. Next ID: {self.next_worker_id}") |
|
|
except Exception as e: |
|
|
logger.error(f"β Error loading local data: {e}") |
|
|
|
|
|
def save_local_worker_data(self): |
|
|
try: |
|
|
worker_data = {"embeddings": self.known_face_embeddings, "names": self.known_face_names, "ids": self.known_face_ids, "next_id": self.next_worker_id} |
|
|
with open("data/workers.pkl", "wb") as f: pickle.dump(worker_data, f) |
|
|
except Exception as e: |
|
|
logger.error(f"β Error saving local worker data: {e}") |
|
|
|
|
|
def register_worker_manual(self, image: Image.Image, name: str) -> Tuple[str, str]: |
|
|
|
|
|
if image is None or not name.strip(): |
|
|
return "β Please provide both image and name!", self.get_registered_workers_info() |
|
|
try: |
|
|
image_array = np.array(image) |
|
|
DeepFace.analyze(img_path=image_array, actions=['emotion'], enforce_detection=True) |
|
|
embedding = DeepFace.represent(img_path=image_array, model_name='Facenet', enforce_detection=False)[0]['embedding'] |
|
|
|
|
|
if self._is_duplicate_face(embedding): |
|
|
return f"β Face matches an existing worker!", self.get_registered_workers_info() |
|
|
|
|
|
worker_id = f"W{self.next_worker_id:04d}" |
|
|
name = name.strip().title() |
|
|
|
|
|
self._add_worker_to_system(worker_id, name, embedding, image_array) |
|
|
self.save_local_worker_data() |
|
|
return f"β
{name} registered with ID: {worker_id}!", self.get_registered_workers_info() |
|
|
except ValueError: |
|
|
return "β No face detected in the image!", self.get_registered_workers_info() |
|
|
except Exception as e: |
|
|
logger.error(f"Manual registration error: {e}") |
|
|
return f"β Registration error: {e}", self.get_registered_workers_info() |
|
|
|
|
|
def _register_worker_auto(self, face_image: np.ndarray) -> Optional[Dict]: |
|
|
try: |
|
|
embedding = DeepFace.represent(img_path=face_image, model_name='Facenet', enforce_detection=False)[0]['embedding'] |
|
|
if self._is_duplicate_face(embedding): return None |
|
|
|
|
|
worker_id = f"W{self.next_worker_id:04d}" |
|
|
worker_name = f"New Worker {self.next_worker_id}" |
|
|
self._add_worker_to_system(worker_id, worker_name, embedding, face_image) |
|
|
self.save_local_worker_data() |
|
|
|
|
|
self.mark_attendance(worker_id, worker_name, is_new=True) |
|
|
return {'name': worker_name, 'worker_id': worker_id} |
|
|
except Exception as e: |
|
|
logger.error(f"β Auto-registration error: {e}") |
|
|
return None |
|
|
|
|
|
def _add_worker_to_system(self, worker_id: str, name: str, embedding: List[float], image_array: np.ndarray): |
|
|
self.known_face_embeddings.append(np.array(embedding)) |
|
|
self.known_face_names.append(name) |
|
|
self.known_face_ids.append(worker_id) |
|
|
self.next_worker_id += 1 |
|
|
|
|
|
|
|
|
def _is_duplicate_face(self, embedding: List[float]) -> bool: |
|
|
if not self.known_face_embeddings: return False |
|
|
distances = [np.linalg.norm(np.array(embedding) - known) for known in self.known_face_embeddings] |
|
|
return min(distances) < FACE_MATCH_THRESHOLD |
|
|
|
|
|
def mark_attendance(self, worker_id: str, worker_name: str, is_new: bool = False): |
|
|
if worker_id in self.session_attended_ids: |
|
|
return |
|
|
|
|
|
self.session_attended_ids.add(worker_id) |
|
|
current_time_str = datetime.now().strftime('%H:%M:%S') |
|
|
log_msg = "" |
|
|
if is_new: |
|
|
log_msg = f"π [{current_time_str}] Registered: {worker_name} ({worker_id})" |
|
|
else: |
|
|
log_msg = f"β
[{current_time_str}] Present: {worker_name} ({worker_id})" |
|
|
|
|
|
self.session_log.insert(0, log_msg) |
|
|
|
|
|
|
|
|
|
|
|
def identify_and_log_face(self, face_image: np.ndarray) -> Optional[Dict]: |
|
|
"""Core recognition logic called by the tracker.""" |
|
|
try: |
|
|
embedding = DeepFace.represent(img_path=face_image, model_name='Facenet', enforce_detection=False)[0]['embedding'] |
|
|
|
|
|
if self.known_face_embeddings: |
|
|
distances = [np.linalg.norm(np.array(embedding) - known) for known in self.known_face_embeddings] |
|
|
min_dist = min(distances) |
|
|
if min_dist < FACE_MATCH_THRESHOLD: |
|
|
match_index = np.argmin(distances) |
|
|
worker_id = self.known_face_ids[match_index] |
|
|
worker_name = self.known_face_names[match_index] |
|
|
self.mark_attendance(worker_id, worker_name) |
|
|
return {'name': worker_name, 'worker_id': worker_id, 'color': (0, 255, 0)} |
|
|
|
|
|
|
|
|
new_worker = self._register_worker_auto(face_image) |
|
|
if new_worker: |
|
|
return {'name': new_worker['name'], 'worker_id': new_worker['worker_id'], 'color': (0, 165, 255)} |
|
|
|
|
|
except Exception as e: |
|
|
logger.error(f"Face identification failed: {e}") |
|
|
|
|
|
return {'name': 'Unknown', 'worker_id': None, 'color': (0, 0, 255)} |
|
|
|
|
|
def _processing_loop(self, source): |
|
|
video_capture = cv2.VideoCapture(source) |
|
|
if not video_capture.isOpened(): |
|
|
self.error_message = "β Error: Could not open video source." |
|
|
self.is_processing.clear() |
|
|
return |
|
|
|
|
|
tracker = FaceTracker(self) |
|
|
while self.is_processing.is_set(): |
|
|
ret, frame = video_capture.read() |
|
|
if not ret: |
|
|
break |
|
|
|
|
|
self.current_frame = frame.copy() |
|
|
annotated_frame = tracker.update(self.current_frame) |
|
|
|
|
|
if not self.frame_queue.full(): |
|
|
self.frame_queue.put(annotated_frame) |
|
|
else: |
|
|
|
|
|
try: self.frame_queue.get_nowait() |
|
|
except queue.Empty: pass |
|
|
self.frame_queue.put(annotated_frame) |
|
|
|
|
|
time.sleep(0.01) |
|
|
|
|
|
video_capture.release() |
|
|
self.is_processing.clear() |
|
|
logger.info("Processing finished.") |
|
|
|
|
|
def start_processing(self, source) -> str: |
|
|
if self.is_processing.is_set(): return "β οΈ Processing is already active." |
|
|
|
|
|
self.session_log.clear() |
|
|
self.session_attended_ids.clear() |
|
|
self.error_message = None |
|
|
while not self.frame_queue.empty(): self.frame_queue.get() |
|
|
|
|
|
self.is_processing.set() |
|
|
self.processing_thread = threading.Thread(target=self._processing_loop, args=(source,)) |
|
|
self.processing_thread.daemon = True |
|
|
self.processing_thread.start() |
|
|
return "β
Started processing..." |
|
|
|
|
|
def stop_processing(self) -> str: |
|
|
if not self.is_processing.is_set(): |
|
|
return "β οΈ Processing is not currently active." |
|
|
self.is_processing.clear() |
|
|
if self.processing_thread: |
|
|
self.processing_thread.join(timeout=2) |
|
|
return "β
Processing stopped by user." |
|
|
|
|
|
def get_registered_workers_info(self) -> str: |
|
|
|
|
|
self.load_worker_data() |
|
|
if not self.known_face_ids: return "No workers registered." |
|
|
|
|
|
info_list = [f"- **{name}** (ID: {id})" for name, id in sorted(zip(self.known_face_names, self.known_face_ids))] |
|
|
return f"**π₯ Registered Workers ({len(info_list)})**\n" + "\n".join(info_list) |
|
|
|
|
|
|
|
|
# Single shared backend instance used by all Gradio callbacks below.
attendance_system = AttendanceSystem()
|
|
|
|
|
def create_interface():
    """Build and return the Gradio Blocks UI.

    Three tabs: source selection/controls, live annotated output plus the
    session log, and worker registration / roster management. A generator
    attached to demo.load() continuously streams frames and log text to
    the output tab.
    """
    with gr.Blocks(theme=gr.themes.Soft(), title="Attendance System") as demo:
        gr.Markdown("# π High-Performance Face Recognition Attendance")
        with gr.Tabs():
            with gr.Tab("βοΈ Controls & Status"):
                gr.Markdown("### 1. Choose Input Source & Start Processing")
                with gr.Row():
                    with gr.Column(scale=1):
                        # Hidden state tracking the active source tab
                        # (0 = live camera, 1 = uploaded video).
                        selected_tab_index = gr.Number(value=0, visible=False)
                        with gr.Tabs() as video_tabs:
                            with gr.Tab("Live Camera", id=0):
                                camera_source = gr.Number(label="Camera Source Index", value=0, precision=0)
                            with gr.Tab("Upload Video", id=1):
                                video_file = gr.Video(label="Upload Video File", sources=["upload"])
                    with gr.Column(scale=1):
                        start_btn = gr.Button("βΆοΈ Start Processing", variant="primary")
                        stop_btn = gr.Button("βΉοΈ Stop Processing", variant="stop")
                        status_box = gr.Textbox(label="Status", interactive=False, value="System Ready.")
                gr.Markdown("### 2. View Results in the 'Output & Log' Tab")
                gr.Markdown("**π¨ Color Coding:** <font color='cyan'>Identifying</font>, <font color='green'>Present</font>, <font color='orange'>New</font>, <font color='red'>Unknown</font>")

            with gr.Tab("π Output & Log"):
                with gr.Row(equal_height=True):
                    with gr.Column(scale=2):
                        video_output = gr.Image(label="Recognition Output", interactive=False, type="numpy")
                    with gr.Column(scale=1):
                        session_log_display = gr.Markdown(label="π Session Log", value="System is ready.")

            with gr.Tab("π€ Worker Management"):
                with gr.Row():
                    with gr.Column():
                        gr.Markdown("### Register New Worker")
                        register_image = gr.Image(label="Upload Worker's Photo", type="pil", sources=["upload"])
                        register_name = gr.Textbox(label="Worker's Full Name")
                        register_btn = gr.Button("Register Worker", variant="primary")
                        register_output = gr.Textbox(label="Registration Status", interactive=False)
                    with gr.Column():
                        gr.Markdown("### Current Worker Roster")
                        registered_workers_info = gr.Markdown(value=attendance_system.get_registered_workers_info())
                        refresh_workers_btn = gr.Button("π Refresh List")

        # Keep the hidden tab-index state in sync with the selected tab.
        def on_tab_select(evt: gr.SelectData): return evt.index
        video_tabs.select(fn=on_tab_select, inputs=None, outputs=[selected_tab_index])

        # Resolve the active tab to a camera index (int) or a file path.
        def start_wrapper(tab_index, cam_src, vid_path):
            source = int(cam_src) if tab_index == 0 else vid_path
            return "Please provide an input source." if source is None else attendance_system.start_processing(source)

        start_btn.click(fn=start_wrapper, inputs=[selected_tab_index, camera_source, video_file], outputs=[status_box])
        stop_btn.click(fn=attendance_system.stop_processing, inputs=None, outputs=[status_box])

        register_btn.click(fn=attendance_system.register_worker_manual, inputs=[register_image, register_name], outputs=[register_output, registered_workers_info])
        refresh_workers_btn.click(fn=attendance_system.get_registered_workers_info, outputs=[registered_workers_info])

        # Infinite generator polled by Gradio: yields the latest annotated
        # frame (converted BGR -> RGB for display), the session-log markdown,
        # and a status string at roughly 30 updates per second.
        def update_ui_generator():
            last_frame = None
            while True:
                status_text = "Status: Processing..."
                log_md = "\n".join(attendance_system.session_log) or "Awaiting detections..."
                frame = None

                if attendance_system.is_processing.is_set():
                    try:
                        frame = attendance_system.frame_queue.get_nowait()
                        last_frame = frame
                    except queue.Empty:
                        # No new frame yet; keep showing the previous one.
                        frame = last_frame
                else:
                    status_text = "Status: Stopped."
                    log_md = "Processing stopped. Final Log:\n\n" + log_md

                if frame is not None:
                    yield cv2.cvtColor(frame, cv2.COLOR_BGR2RGB), log_md, status_text
                else:
                    yield None, log_md, status_text

                time.sleep(1/30)

        demo.load(fn=update_ui_generator, outputs=[video_output, session_log_display, status_box])

    return demo
|
|
|
|
|
if __name__ == "__main__":

    def _get_image_caption(self, image: Image.Image) -> str:
        """Caption `image` via the Hugging Face BLIP inference API.

        BUGFIX: this function is monkey-patched onto AttendanceSystem below
        and therefore called as a bound method — the original signature
        lacked `self`, so the instance would have been bound to the `image`
        parameter and every call would have failed. `self` is accepted (and
        unused) here.

        Returns a caption string, or a human-readable failure message.
        """
        if not HF_API_TOKEN:
            return "Hugging Face API token not configured."
        try:
            buffered = BytesIO()
            image.save(buffered, format="JPEG")
            img_data = buffered.getvalue()
            headers = {"Authorization": f"Bearer {HF_API_TOKEN}"}
            response = requests.post(HF_API_URL, headers=headers, data=img_data, timeout=10)
            response.raise_for_status()
            result = response.json()
            return result[0].get("generated_text", "No caption found.")
        except Exception as e:
            logger.error(f"Hugging Face API error: {e}")
            return "Caption generation failed."

    def _upload_image_to_salesforce(self, image: Image.Image, record_id: str, worker_id: str) -> Optional[str]:
        """Attach `image` to Salesforce record `record_id` as a ContentVersion.

        Returns the document download path, or None when there is no
        Salesforce session or the upload fails.
        """
        if not self.sf:
            return None
        try:
            buffered = BytesIO()
            image.save(buffered, format="JPEG")
            encoded_image = base64.b64encode(buffered.getvalue()).decode('utf-8')
            # FirstPublishLocationId links the new file to the record, so the
            # create() result itself is not needed (unused `cv` local removed).
            self.sf.ContentVersion.create({'Title': f'Image_{worker_id}', 'PathOnClient': f'{worker_id}.jpg', 'VersionData': encoded_image, 'FirstPublishLocationId': record_id})
            content_doc_link = self.sf.query(f"SELECT ContentDocumentId FROM ContentDocumentLink WHERE LinkedEntityId = '{record_id}'")['records'][0]
            content_doc_id = content_doc_link['ContentDocumentId']
            return f"/sfc/servlet.shepherd/document/download/{content_doc_id}"
        except Exception as e:
            logger.error(f"Salesforce image upload error: {e}")
            return None

    # Graft the helpers onto AttendanceSystem as instance methods.
    AttendanceSystem._get_image_caption = _get_image_caption
    AttendanceSystem._upload_image_to_salesforce = _upload_image_to_salesforce

    app = create_interface()
    app.queue()  # enable request queuing, required for the streaming generator
    app.launch(server_name="0.0.0.0", server_port=7860, show_error=True, debug=True)