PrashanthB461 commited on
Commit
fbb3bfc
·
verified ·
1 Parent(s): d4f611f

Update utils.py

Browse files
Files changed (1) hide show
  1. utils.py +152 -145
utils.py CHANGED
@@ -1,161 +1,168 @@
1
- """
2
- Utility functions for the Attendance Analyzer application
3
- """
4
-
5
  import cv2
6
  import numpy as np
7
- from datetime import datetime, date
8
- import os
9
  import logging
 
 
 
 
 
10
 
11
- # Setup logging
12
- logging.basicConfig(level=logging.INFO)
13
  logger = logging.getLogger(__name__)
14
 
15
- def setup_logging():
16
- """Setup logging configuration"""
17
- log_format = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
18
- logging.basicConfig(
19
- level=logging.INFO,
20
- format=log_format,
21
- handlers=[
22
- logging.FileHandler('attendance_analyzer.log'),
23
- logging.StreamHandler()
24
- ]
25
- )
 
 
 
 
 
 
 
 
 
26
 
27
- def validate_image(image):
28
- """Validate uploaded image"""
29
- if image is None:
30
- return False, "No image provided"
31
-
32
- try:
33
- if isinstance(image, np.ndarray):
34
- if len(image.shape) != 3:
35
- return False, "Invalid image format"
36
- return True, "Valid image"
37
- else:
38
- return True, "Valid image"
39
- except Exception as e:
40
- return False, f"Image validation error: {str(e)}"
 
 
 
 
41
 
42
- def resize_image(image, max_size=(800, 600)):
43
- """Resize image while maintaining aspect ratio"""
44
- try:
45
- height, width = image.shape[:2]
46
- max_width, max_height = max_size
47
-
48
- if width <= max_width and height <= max_height:
49
- return image
50
-
51
- # Calculate scaling factor
52
- scale = min(max_width / width, max_height / height)
53
- new_width = int(width * scale)
54
- new_height = int(height * scale)
55
-
56
- resized = cv2.resize(image, (new_width, new_height), interpolation=cv2.INTER_AREA)
57
- return resized
58
- except Exception as e:
59
- logger.error(f"Error resizing image: {e}")
60
- return image
61
 
62
- def format_datetime(dt=None):
63
- """Format datetime for database storage"""
64
- if dt is None:
65
- dt = datetime.now()
66
- return dt.strftime("%Y-%m-%d %H:%M:%S")
 
67
 
68
- def format_date(d=None):
69
- """Format date for database storage"""
70
- if d is None:
71
- d = date.today()
72
- return d.strftime("%Y-%m-%d")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
73
 
74
- def create_directory_if_not_exists(directory_path):
75
- """Create directory if it doesn't exist"""
76
- try:
77
- if not os.path.exists(directory_path):
78
- os.makedirs(directory_path)
79
- logger.info(f"Created directory: {directory_path}")
80
- return True
81
- except Exception as e:
82
- logger.error(f"Error creating directory {directory_path}: {e}")
83
- return False
 
 
 
 
84
 
85
- def cleanup_old_files(directory, days_old=30):
86
- """Clean up old files from specified directory"""
87
- try:
88
- current_time = datetime.now()
89
- for filename in os.listdir(directory):
90
- file_path = os.path.join(directory, filename)
91
- if os.path.isfile(file_path):
92
- file_time = datetime.fromtimestamp(os.path.getctime(file_path))
93
- if (current_time - file_time).days > days_old:
94
- os.remove(file_path)
95
- logger.info(f"Removed old file: {filename}")
96
- except Exception as e:
97
- logger.error(f"Error cleaning up old files: {e}")
 
 
 
 
 
 
 
 
 
 
98
 
99
- def validate_name(name):
100
- """Validate person name"""
101
- if not name or not name.strip():
102
- return False, "Name cannot be empty"
103
-
104
- if len(name.strip()) < 2:
105
- return False, "Name must be at least 2 characters long"
106
-
107
- if len(name.strip()) > 50:
108
- return False, "Name cannot exceed 50 characters"
109
-
110
- # Check for valid characters (letters, spaces, hyphens, apostrophes)
111
- import re
112
- if not re.match(r"^[a-zA-Z\s\-'\.]+$", name.strip()):
113
- return False, "Name contains invalid characters"
114
-
115
- return True, "Valid name"
116
 
117
- def validate_email(email):
118
- """Validate email address"""
119
- if not email:
120
- return True, "Email is optional"
121
-
122
- import re
123
- email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
124
- if re.match(email_pattern, email):
125
- return True, "Valid email"
126
- else:
127
- return False, "Invalid email format"
128
 
129
- def get_system_info():
130
- """Get system information for debugging"""
131
- import platform
132
- import psutil
133
-
134
- info = {
135
- "platform": platform.platform(),
136
- "python_version": platform.python_version(),
137
- "cpu_count": psutil.cpu_count(),
138
- "memory_total": f"{psutil.virtual_memory().total / (1024**3):.2f} GB",
139
- "memory_available": f"{psutil.virtual_memory().available / (1024**3):.2f} GB",
140
- }
141
- return info
142
 
143
- def benchmark_face_detection(image, iterations=5):
144
- """Benchmark face detection performance"""
145
- import time
146
- import face_recognition
147
-
148
- times = []
149
- for _ in range(iterations):
150
- start_time = time.time()
151
- face_locations = face_recognition.face_locations(image)
152
- end_time = time.time()
153
- times.append(end_time - start_time)
154
-
155
- avg_time = sum(times) / len(times)
156
- return {
157
- "average_time": avg_time,
158
- "min_time": min(times),
159
- "max_time": max(times),
160
- "iterations": iterations
161
- }
 
 
 
 
 
 
1
  import cv2
2
  import numpy as np
3
+ import base64
4
+ import requests
5
  import logging
6
+ from datetime import datetime
7
+ from deepface import DeepFace
8
+ from mtcnn import MTCNN
9
+ import os
10
+ from dotenv import load_dotenv
11
 
12
+ # Configure logging
13
+ logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
14
  logger = logging.getLogger(__name__)
15
 
16
+ # Load environment variables
17
+ load_dotenv()
18
+ SALESFORCE_TOKEN = os.getenv("SALESFORCE_TOKEN", "your_salesforce_oauth_token")
19
+ SALESFORCE_BASE_URL = "https://construction-site.secure.force.com/WorkerRecognition/services/apexrest"
20
+ SALESFORCE_ENDPOINT = f"{SALESFORCE_BASE_URL}/ProcessImage"
21
+
22
+ class WorkerRecognitionSystem:
23
+ def __init__(self):
24
+ """Initialize the face recognition system."""
25
+ self.frame_rate = 10 # Process every 10th frame
26
+ self.confidence_threshold = 0.90 # Fine-tuned for high accuracy
27
+ self.image_size = (224, 224)
28
+
29
+ # Initialize MTCNN for face detection
30
+ self.detector = MTCNN()
31
+ logger.info("Initialized MTCNN face detector")
32
+
33
+ # Initialize worker database
34
+ self.worker_db = {} # {worker_id: [embeddings]}
35
+ self.processed_workers = set() # Track processed workers
36
 
37
+ def load_worker_database(self):
38
+ """Load worker facial embeddings from Salesforce (simulated)."""
39
+ try:
40
+ response = requests.get(
41
+ f"{SALESFORCE_BASE_URL}/WorkerData",
42
+ headers={"Authorization": f"Bearer {SALESFORCE_TOKEN}"}
43
+ )
44
+ if response.status_code == 200:
45
+ workers = response.json()
46
+ for worker in workers:
47
+ embeddings = worker.get("Facial_Features__c", [])
48
+ if len(embeddings) >= 3: # Require at least 3 embeddings
49
+ self.worker_db[worker["Worker_ID__c"]] = [np.array(emb) for emb in embeddings]
50
+ logger.info(f"Loaded {len(self.worker_db)} workers from Salesforce")
51
+ else:
52
+ logger.error(f"Failed to load worker database: {response.status_code}")
53
+ except Exception as e:
54
+ logger.error(f"Error loading worker database: {str(e)}")
55
 
56
+ def preprocess_frame(self, frame):
57
+ """Preprocess frame to improve quality."""
58
+ frame = cv2.convertScaleAbs(frame, alpha=1.2, beta=10) # Enhance contrast
59
+ frame = cv2.GaussianBlur(frame, (5, 5), 0) # Reduce noise
60
+ frame = cv2.resize(frame, (1280, 720)) # Ensure sufficient resolution
61
+ return frame
 
 
 
 
 
 
 
 
 
 
 
 
 
62
 
63
+ def detect_faces(self, frame):
64
+ """Detect faces using MTCNN."""
65
+ rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
66
+ faces = self.detector.detect_faces(rgb_frame)
67
+ return [(face['box'][0], face['box'][1], face['box'][2], face['box'][3])
68
+ for face in faces if face['confidence'] > 0.9]
69
 
70
+ def extract_features(self, face_image):
71
+ """Extract facial features using ensemble of Facenet512 and ArcFace."""
72
+ try:
73
+ face_image = cv2.resize(face_image, self.image_size)
74
+ face_image = cv2.cvtColor(face_image, cv2.COLOR_BGR2RGB)
75
+ embedding_facenet = DeepFace.represent(
76
+ img_path=face_image,
77
+ model_name="Facenet512",
78
+ enforce_detection=False
79
+ )
80
+ embedding_arcface = DeepFace.represent(
81
+ img_path=face_image,
82
+ model_name="ArcFace",
83
+ enforce_detection=False
84
+ )
85
+ embedding = (np.array(embedding_facenet) + np.array(embedding_arcface)) / 2
86
+ return embedding
87
+ except Exception as e:
88
+ logger.error(f"Error extracting features: {str(e)}")
89
+ return None
90
 
91
+ def recognize_worker(self, embedding):
92
+ """Compare embedding with worker database."""
93
+ max_confidence = 0
94
+ worker_id = None
95
+ for wid, stored_embeddings in self.worker_db.items():
96
+ confidences = [
97
+ np.dot(embedding, stored_emb) / (np.linalg.norm(embedding) * np.linalg.norm(stored_emb))
98
+ for stored_emb in stored_embeddings
99
+ ]
100
+ confidence = np.mean([(sim + 1) / 2 for sim in confidences])
101
+ if confidence > max_confidence:
102
+ max_confidence = confidence
103
+ worker_id = wid
104
+ return worker_id, max_confidence
105
 
106
+ def send_to_salesforce(self, worker_id, confidence, image_base64, entry_time):
107
+ """Send recognition result to Salesforce."""
108
+ payload = {
109
+ "workerId": worker_id,
110
+ "confidence": confidence,
111
+ "image": image_base64,
112
+ "entryTime": entry_time.isoformat(),
113
+ "needsReview": confidence < self.confidence_threshold
114
+ }
115
+ headers = {
116
+ "Authorization": f"Bearer {SALESFORCE_TOKEN}",
117
+ "Content-Type": "application/json"
118
+ }
119
+ try:
120
+ response = requests.post(SALESFORCE_ENDPOINT, json=payload, headers=headers)
121
+ if response.status_code == 200:
122
+ logger.info(f"Successfully sent data to Salesforce for worker {worker_id or 'unknown'}")
123
+ return response.json()
124
+ else:
125
+ logger.error(f"Failed to send to Salesforce: {response.status_code} - {response.text}")
126
+ except Exception as e:
127
+ logger.error(f"Error sending to Salesforce: {str(e)}")
128
+ return None
129
 
130
+ def process_frame(self, frame):
131
+ """Process a single frame for recognition."""
132
+ faces = self.detect_faces(frame)
133
+ results = []
134
+ for (x, y, w, h) in faces:
135
+ face_image = frame[y:y+h, x:x+w]
136
+ embedding = self.extract_features(face_image)
137
+ if embedding is None:
138
+ continue
 
 
 
 
 
 
 
 
139
 
140
+ worker_id, confidence = self.recognize_worker(embedding)
141
+ entry_time = datetime.now()
142
+ worker_key = f"{worker_id}_{entry_time.date().isoformat()}"
143
+ if worker_key in self.processed_workers:
144
+ continue
 
 
 
 
 
 
145
 
146
+ _, buffer = cv2.imencode('.jpg', face_image)
147
+ image_base64 = base64.b64encode(buffer).decode('utf-8')
 
 
 
 
 
 
 
 
 
 
 
148
 
149
+ if confidence >= self.confidence_threshold and worker_id:
150
+ logger.info(f"Recognized worker {worker_id} with confidence {confidence:.2f}")
151
+ self.processed_workers.add(worker_key)
152
+ self.send_to_salesforce(worker_id, confidence, image_base64, entry_time)
153
+ results.append({
154
+ "worker_id": worker_id,
155
+ "confidence": confidence,
156
+ "entry_time": entry_time,
157
+ "image_base64": image_base64
158
+ })
159
+ else:
160
+ logger.info(f"Unrecognized face, confidence {confidence:.2f}, flagging for verification")
161
+ self.send_to_salesforce(None, confidence, image_base64, entry_time)
162
+ results.append({
163
+ "worker_id": "Unknown",
164
+ "confidence": confidence,
165
+ "entry_time": entry_time,
166
+ "image_base64": image_base64
167
+ })
168
+ return results