Spaces:
Sleeping
Sleeping
| import cv2 | |
| import numpy as np | |
| import base64 | |
| import requests | |
| import logging | |
| from datetime import datetime | |
| import os | |
| from dotenv import load_dotenv | |
| import warnings | |
| # Suppress warnings | |
| os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3' | |
| warnings.filterwarnings('ignore') | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') | |
| logger = logging.getLogger(__name__) | |
| # Load environment variables | |
| load_dotenv() | |
| SALESFORCE_TOKEN = os.getenv("SALESFORCE_TOKEN", "your_salesforce_oauth_token") | |
| SALESFORCE_BASE_URL = "https://construction-site.secure.force.com/WorkerRecognition/services/apexrest" | |
| SALESFORCE_ENDPOINT = f"{SALESFORCE_BASE_URL}/ProcessImage" | |
| # Import DeepFace after environment setup | |
| from deepface import DeepFace | |
| from mtcnn import MTCNN | |
| class WorkerRecognitionSystem: | |
| def __init__(self): | |
| """Initialize with original parameters from document""" | |
| self.frame_rate = 10 | |
| self.confidence_threshold = 0.90 | |
| self.image_size = (224, 224) | |
| # Initialize components | |
| self.detector = MTCNN() | |
| self.worker_db = {} | |
| self.processed_workers = set() | |
| logger.info("System initialized with MTCNN detector") | |
| def load_worker_database(self): | |
| """Load worker data as per original document""" | |
| try: | |
| # Simulated Salesforce response | |
| workers = [ | |
| { | |
| "Worker_ID__c": "W001", | |
| "Facial_Features__c": [np.random.rand(128).tolist() for _ in range(3)] | |
| }, | |
| { | |
| "Worker_ID__c": "W002", | |
| "Facial_Features__c": [np.random.rand(128).tolist() for _ in range(3)] | |
| } | |
| ] | |
| for worker in workers: | |
| embeddings = worker.get("Facial_Features__c", []) | |
| if len(embeddings) >= 3: | |
| self.worker_db[worker["Worker_ID__c"]] = [np.array(emb) for emb in embeddings] | |
| logger.info(f"Loaded {len(self.worker_db)} workers") | |
| except Exception as e: | |
| logger.error(f"Database loading failed: {str(e)}") | |
| raise | |
| def preprocess_frame(self, frame): | |
| """Original preprocessing from document""" | |
| frame = cv2.convertScaleAbs(frame, alpha=1.2, beta=10) | |
| frame = cv2.GaussianBlur(frame, (5, 5), 0) | |
| return cv2.resize(frame, (1280, 720)) | |
| def detect_faces(self, frame): | |
| """MTCNN detection as per document""" | |
| rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
| faces = self.detector.detect_faces(rgb_frame) | |
| return [(face['box'][0], face['box'][1], face['box'][2], face['box'][3]) | |
| for face in faces if face['confidence'] > 0.9] | |
| def extract_features(self, face_image): | |
| """Ensemble method from document""" | |
| try: | |
| face_image = cv2.resize(face_image, self.image_size) | |
| face_image = cv2.cvtColor(face_image, cv2.COLOR_BGR2RGB) | |
| # Facenet512 + ArcFace ensemble | |
| embedding_facenet = DeepFace.represent( | |
| img_path=face_image, | |
| model_name="Facenet512", | |
| enforce_detection=False | |
| ) | |
| embedding_arcface = DeepFace.represent( | |
| img_path=face_image, | |
| model_name="ArcFace", | |
| enforce_detection=False | |
| ) | |
| return (np.array(embedding_facenet) + np.array(embedding_arcface)) / 2 | |
| except Exception as e: | |
| logger.error(f"Feature extraction failed: {str(e)}") | |
| return None | |
| def recognize_worker(self, embedding): | |
| """Original recognition logic""" | |
| max_confidence = 0 | |
| worker_id = None | |
| for wid, stored_embeddings in self.worker_db.items(): | |
| confidences = [ | |
| np.dot(embedding, stored_emb) / (np.linalg.norm(embedding) * np.linalg.norm(stored_emb)) | |
| for stored_emb in stored_embeddings | |
| ] | |
| confidence = np.mean([(sim + 1) / 2 for sim in confidences]) | |
| if confidence > max_confidence: | |
| max_confidence = confidence | |
| worker_id = wid | |
| return worker_id, max_confidence | |
| def process_frame(self, frame): | |
| """Original frame processing logic""" | |
| faces = self.detect_faces(frame) | |
| results = [] | |
| for (x, y, w, h) in faces: | |
| face_image = frame[y:y+h, x:x+w] | |
| embedding = self.extract_features(face_image) | |
| if embedding is None: | |
| continue | |
| worker_id, confidence = self.recognize_worker(embedding) | |
| entry_time = datetime.now() | |
| _, buffer = cv2.imencode('.jpg', face_image) | |
| image_base64 = base64.b64encode(buffer).decode('utf-8') | |
| if confidence >= self.confidence_threshold and worker_id: | |
| results.append({ | |
| "worker_id": worker_id, | |
| "confidence": confidence, | |
| "entry_time": entry_time, | |
| "image_base64": image_base64 | |
| }) | |
| else: | |
| results.append({ | |
| "worker_id": "Unknown", | |
| "confidence": confidence, | |
| "entry_time": entry_time, | |
| "image_base64": image_base64 | |
| }) | |
| return results |