Spaces:
Sleeping
Sleeping
Complete app.py with all routes and error fixes
Browse files
app.py
CHANGED
|
@@ -26,7 +26,7 @@ except ImportError as e:
|
|
| 26 |
print(f"⚠️ ONNX Runtime not available: {e}")
|
| 27 |
print("🔄 Falling back to OpenCV-based alternatives")
|
| 28 |
|
| 29 |
-
# --- Evaluation Metrics Counters
|
| 30 |
total_attempts = 0
|
| 31 |
correct_recognitions = 0
|
| 32 |
false_accepts = 0
|
|
@@ -35,14 +35,12 @@ unauthorized_attempts = 0
|
|
| 35 |
inference_times = []
|
| 36 |
|
| 37 |
def get_client_ip():
|
| 38 |
-
# Handles proxy headers if any (like when using nginx or cloud services)
|
| 39 |
if request.headers.get('X-Forwarded-For'):
|
| 40 |
ip = request.headers.get('X-Forwarded-For').split(',')[0]
|
| 41 |
else:
|
| 42 |
ip = request.remote_addr
|
| 43 |
return ip
|
| 44 |
|
| 45 |
-
# ---------------------------------------------------
|
| 46 |
# Load environment variables
|
| 47 |
load_dotenv()
|
| 48 |
|
|
@@ -50,16 +48,14 @@ load_dotenv()
|
|
| 50 |
app = Flask(__name__, static_folder='app/static', template_folder='app/templates')
|
| 51 |
app.secret_key = os.getenv('SECRET_KEY', os.urandom(24))
|
| 52 |
|
| 53 |
-
# MongoDB Connection
|
| 54 |
def init_mongodb():
|
| 55 |
try:
|
| 56 |
-
# For Hugging Face Spaces, use the environment variable
|
| 57 |
mongo_uri = os.getenv('MONGO_URI', 'mongodb://localhost:27017/')
|
| 58 |
print(f"Connecting to MongoDB...")
|
| 59 |
|
| 60 |
client = MongoClient(mongo_uri, serverSelectionTimeoutMS=5000)
|
| 61 |
-
# Test
|
| 62 |
-
client.server_info()
|
| 63 |
|
| 64 |
db = client['face_attendance_system']
|
| 65 |
students_collection = db['students']
|
|
@@ -89,22 +85,18 @@ def init_mongodb():
|
|
| 89 |
# Initialize MongoDB
|
| 90 |
client, db, students_collection, teachers_collection, attendance_collection, metrics_events = init_mongodb()
|
| 91 |
|
| 92 |
-
#
|
| 93 |
def _get_providers():
|
| 94 |
-
"""Get ONNX Runtime providers with CPU-only fallback"""
|
| 95 |
if not ONNX_AVAILABLE:
|
| 96 |
return []
|
| 97 |
-
|
| 98 |
try:
|
| 99 |
-
# Force CPU-only to avoid executable stack issues
|
| 100 |
return ["CPUExecutionProvider"]
|
| 101 |
except Exception as e:
|
| 102 |
print(f"Error getting ONNX providers: {e}")
|
| 103 |
return []
|
| 104 |
|
| 105 |
-
#
|
| 106 |
def detect_faces_opencv(image):
|
| 107 |
-
"""OpenCV-based face detection fallback"""
|
| 108 |
try:
|
| 109 |
face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
|
| 110 |
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
|
|
@@ -114,7 +106,7 @@ def detect_faces_opencv(image):
|
|
| 114 |
for (x, y, w, h) in faces:
|
| 115 |
detections.append({
|
| 116 |
"bbox": [x, y, x+w, y+h],
|
| 117 |
-
"score": 0.9
|
| 118 |
})
|
| 119 |
return detections
|
| 120 |
except Exception as e:
|
|
@@ -122,33 +114,23 @@ def detect_faces_opencv(image):
|
|
| 122 |
return []
|
| 123 |
|
| 124 |
def simple_liveness_check(face_crop):
|
| 125 |
-
"""Simple liveness check without ONNX Runtime"""
|
| 126 |
try:
|
| 127 |
gray = cv2.cvtColor(face_crop, cv2.COLOR_BGR2GRAY)
|
| 128 |
-
|
| 129 |
-
# Check image sharpness (blurry might indicate photo of photo)
|
| 130 |
laplacian_var = cv2.Laplacian(gray, cv2.CV_64F).var()
|
| 131 |
-
|
| 132 |
-
# Check brightness distribution
|
| 133 |
mean_brightness = np.mean(gray)
|
| 134 |
|
| 135 |
-
# Simple heuristic scoring
|
| 136 |
sharpness_score = min(1.0, laplacian_var / 100.0)
|
| 137 |
brightness_score = 1.0 if 50 < mean_brightness < 200 else 0.5
|
| 138 |
-
|
| 139 |
-
# Combine scores
|
| 140 |
live_score = (sharpness_score + brightness_score) / 2.0
|
| 141 |
|
| 142 |
-
# Return a value between 0.3 and 0.9
|
| 143 |
return 0.3 + (live_score * 0.6)
|
| 144 |
-
|
| 145 |
except Exception as e:
|
| 146 |
print(f"Fallback liveness check error: {e}")
|
| 147 |
-
return 0.7
|
| 148 |
|
| 149 |
-
#
|
| 150 |
def _letterbox(image, new_shape=(640, 640), color=(114, 114, 114), auto=False, scaleFill=False, scaleup=True):
|
| 151 |
-
shape = image.shape[:2]
|
| 152 |
if isinstance(new_shape, int):
|
| 153 |
new_shape = (new_shape, new_shape)
|
| 154 |
r = min(new_shape[0] / shape[0], new_shape[1] / shape[1])
|
|
@@ -198,6 +180,7 @@ def _nms(boxes: np.ndarray, scores: np.ndarray, iou_threshold: float):
|
|
| 198 |
order = order[inds + 1]
|
| 199 |
return keep
|
| 200 |
|
|
|
|
| 201 |
class YoloV5FaceDetector:
|
| 202 |
def __init__(self, model_path: str, input_size: int = 640, conf_threshold: float = 0.3, iou_threshold: float = 0.45):
|
| 203 |
self.input_size = int(input_size)
|
|
@@ -220,10 +203,9 @@ class YoloV5FaceDetector:
|
|
| 220 |
print("⚠️ No ONNX providers available for YOLOv5")
|
| 221 |
except Exception as e:
|
| 222 |
print(f"⚠️ Failed to initialize YOLOv5 with ONNX Runtime: {e}")
|
| 223 |
-
print("🔄 Will use OpenCV fallback for face detection")
|
| 224 |
self.session = None
|
| 225 |
else:
|
| 226 |
-
print("⚠️ ONNX Runtime not available - using OpenCV fallback
|
| 227 |
|
| 228 |
@staticmethod
|
| 229 |
def _xywh2xyxy(x: np.ndarray) -> np.ndarray:
|
|
@@ -236,7 +218,6 @@ class YoloV5FaceDetector:
|
|
| 236 |
|
| 237 |
def detect(self, image_bgr: np.ndarray, max_det: int = 20):
|
| 238 |
if self.session is None:
|
| 239 |
-
# Fallback to OpenCV face detection
|
| 240 |
return detect_faces_opencv(image_bgr)
|
| 241 |
|
| 242 |
try:
|
|
@@ -247,13 +228,16 @@ class YoloV5FaceDetector:
|
|
| 247 |
img = np.transpose(img, (2, 0, 1))
|
| 248 |
img = np.expand_dims(img, 0)
|
| 249 |
preds = self.session.run(self.output_names, {self.input_name: img})[0]
|
|
|
|
| 250 |
if preds.ndim == 3 and preds.shape[0] == 1:
|
| 251 |
preds = preds[0]
|
| 252 |
if preds.ndim != 2:
|
| 253 |
raise RuntimeError(f"Unexpected YOLO output shape: {preds.shape}")
|
|
|
|
| 254 |
num_attrs = preds.shape[1]
|
| 255 |
has_landmarks = num_attrs >= 15
|
| 256 |
boxes_xywh = preds[:, 0:4]
|
|
|
|
| 257 |
if has_landmarks:
|
| 258 |
scores = preds[:, 4]
|
| 259 |
else:
|
|
@@ -264,11 +248,14 @@ class YoloV5FaceDetector:
|
|
| 264 |
else:
|
| 265 |
class_conf = cls_scores.max(axis=1, keepdims=True)
|
| 266 |
scores = (obj * class_conf).squeeze(-1)
|
|
|
|
| 267 |
keep = scores > self.conf_threshold
|
| 268 |
boxes_xywh = boxes_xywh[keep]
|
| 269 |
scores = scores[keep]
|
|
|
|
| 270 |
if boxes_xywh.shape[0] == 0:
|
| 271 |
return []
|
|
|
|
| 272 |
boxes_xyxy = self._xywh2xyxy(boxes_xywh)
|
| 273 |
boxes_xyxy[:, [0, 2]] -= dwdh[0]
|
| 274 |
boxes_xyxy[:, [1, 3]] -= dwdh[1]
|
|
@@ -277,9 +264,11 @@ class YoloV5FaceDetector:
|
|
| 277 |
boxes_xyxy[:, 1] = np.clip(boxes_xyxy[:, 1], 0, h0 - 1)
|
| 278 |
boxes_xyxy[:, 2] = np.clip(boxes_xyxy[:, 2], 0, w0 - 1)
|
| 279 |
boxes_xyxy[:, 3] = np.clip(boxes_xyxy[:, 3], 0, h0 - 1)
|
|
|
|
| 280 |
keep_inds = _nms(boxes_xyxy, scores, self.iou_threshold)
|
| 281 |
if len(keep_inds) > max_det:
|
| 282 |
keep_inds = keep_inds[:max_det]
|
|
|
|
| 283 |
dets = []
|
| 284 |
for i in keep_inds:
|
| 285 |
dets.append({"bbox": boxes_xyxy[i].tolist(), "score": float(scores[i])})
|
|
@@ -291,11 +280,8 @@ class YoloV5FaceDetector:
|
|
| 291 |
def _sigmoid(x: np.ndarray) -> np.ndarray:
|
| 292 |
return 1.0 / (1.0 + np.exp(-x))
|
| 293 |
|
|
|
|
| 294 |
class AntiSpoofBinary:
|
| 295 |
-
"""
|
| 296 |
-
Binary anti-spoof model wrapper (AntiSpoofing_bin_1.5_128.onnx).
|
| 297 |
-
Returns live probability in [0,1].
|
| 298 |
-
"""
|
| 299 |
def __init__(self, model_path: str, input_size: int = 128, rgb: bool = True, normalize: bool = True,
|
| 300 |
mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5), live_index: int = 1):
|
| 301 |
self.input_size = int(input_size)
|
|
@@ -318,7 +304,6 @@ class AntiSpoofBinary:
|
|
| 318 |
print("⚠️ No ONNX providers available for anti-spoofing")
|
| 319 |
except Exception as e:
|
| 320 |
print(f"⚠️ Failed to initialize anti-spoofing with ONNX Runtime: {e}")
|
| 321 |
-
print("🔄 Will use simple liveness check fallback")
|
| 322 |
self.session = None
|
| 323 |
else:
|
| 324 |
print("⚠️ ONNX Runtime not available - using simple liveness check fallback")
|
|
@@ -336,7 +321,6 @@ class AntiSpoofBinary:
|
|
| 336 |
|
| 337 |
def predict_live_prob(self, face_bgr: np.ndarray) -> float:
|
| 338 |
if self.session is None:
|
| 339 |
-
# Use simple fallback liveness check
|
| 340 |
return simple_liveness_check(face_bgr)
|
| 341 |
|
| 342 |
try:
|
|
@@ -357,6 +341,7 @@ class AntiSpoofBinary:
|
|
| 357 |
print(f"ONNX anti-spoofing error, using fallback: {e}")
|
| 358 |
return simple_liveness_check(face_bgr)
|
| 359 |
|
|
|
|
| 360 |
def expand_and_clip_box(bbox_xyxy, scale: float, w: int, h: int):
|
| 361 |
x1, y1, x2, y2 = bbox_xyxy
|
| 362 |
bw = x2 - x1
|
|
@@ -395,11 +380,9 @@ YOLO_FACE_MODEL_PATH = "models/yolov5s-face.onnx"
|
|
| 395 |
ANTI_SPOOF_BIN_MODEL_PATH = "models/anti_spoofing/AntiSpoofing_bin_1.5_128.onnx"
|
| 396 |
|
| 397 |
def ensure_models_exist():
|
| 398 |
-
"""Download and verify all required models"""
|
| 399 |
os.makedirs(DLIB_MODELS_DIR, exist_ok=True)
|
| 400 |
os.makedirs("models/anti_spoofing", exist_ok=True)
|
| 401 |
|
| 402 |
-
# Download dlib face recognition model if not present
|
| 403 |
if not os.path.exists(FACE_RECOGNITION_MODEL_PATH):
|
| 404 |
print("Downloading dlib_face_recognition_resnet_model_v1.dat.bz2...")
|
| 405 |
try:
|
|
@@ -416,7 +399,6 @@ def ensure_models_exist():
|
|
| 416 |
except Exception as e:
|
| 417 |
print(f"Failed to download dlib face recognition model: {e}")
|
| 418 |
|
| 419 |
-
# Check required models
|
| 420 |
required_models = [SHAPE_PREDICTOR_PATH, FACE_RECOGNITION_MODEL_PATH]
|
| 421 |
optional_models = [YOLO_FACE_MODEL_PATH, ANTI_SPOOF_BIN_MODEL_PATH]
|
| 422 |
|
|
@@ -434,9 +416,7 @@ def ensure_models_exist():
|
|
| 434 |
print("✅ All critical models are available!")
|
| 435 |
return True
|
| 436 |
|
| 437 |
-
# Initialize models
|
| 438 |
def init_models():
|
| 439 |
-
"""Initialize all ML models with robust error handling"""
|
| 440 |
global yolo_face, anti_spoof_bin, detector, shape_predictor, face_recognition_model
|
| 441 |
|
| 442 |
try:
|
|
@@ -444,21 +424,18 @@ def init_models():
|
|
| 444 |
print("❌ Cannot initialize critical models - some files are missing")
|
| 445 |
return False
|
| 446 |
|
| 447 |
-
# Initialize dlib models (required)
|
| 448 |
print("Loading dlib models...")
|
| 449 |
detector = dlib.get_frontal_face_detector()
|
| 450 |
shape_predictor = dlib.shape_predictor(SHAPE_PREDICTOR_PATH)
|
| 451 |
face_recognition_model = dlib.face_recognition_model_v1(FACE_RECOGNITION_MODEL_PATH)
|
| 452 |
print("✅ Dlib models loaded successfully!")
|
| 453 |
|
| 454 |
-
# Initialize ONNX models (with fallback)
|
| 455 |
print("Loading ONNX models...")
|
| 456 |
try:
|
| 457 |
yolo_face = YoloV5FaceDetector(YOLO_FACE_MODEL_PATH, input_size=640, conf_threshold=0.3, iou_threshold=0.45)
|
| 458 |
anti_spoof_bin = AntiSpoofBinary(ANTI_SPOOF_BIN_MODEL_PATH, input_size=128, rgb=True, normalize=True, live_index=1)
|
| 459 |
except Exception as e:
|
| 460 |
print(f"⚠️ ONNX models initialization had issues: {e}")
|
| 461 |
-
print("🔄 Fallback methods will be used")
|
| 462 |
|
| 463 |
print("✅ Model initialization complete!")
|
| 464 |
return True
|
|
@@ -467,10 +444,9 @@ def init_models():
|
|
| 467 |
print(f"❌ Error initializing models: {e}")
|
| 468 |
return False
|
| 469 |
|
| 470 |
-
# Initialize models
|
| 471 |
models_loaded = init_models()
|
| 472 |
|
| 473 |
-
# Face processing functions
|
| 474 |
def decode_image(base64_image):
|
| 475 |
if ',' in base64_image:
|
| 476 |
base64_image = base64_image.split(',')[1]
|
|
@@ -480,7 +456,6 @@ def decode_image(base64_image):
|
|
| 480 |
return image
|
| 481 |
|
| 482 |
def align_face(image, shape):
|
| 483 |
-
"""Align the face using eye landmarks"""
|
| 484 |
left_eye = (shape.part(36).x, shape.part(36).y)
|
| 485 |
right_eye = (shape.part(45).x, shape.part(45).y)
|
| 486 |
dx = right_eye[0] - left_eye[0]
|
|
@@ -492,7 +467,6 @@ def align_face(image, shape):
|
|
| 492 |
return aligned_image
|
| 493 |
|
| 494 |
def get_face_features(image):
|
| 495 |
-
"""Extract aligned face features using ResNet model"""
|
| 496 |
if not models_loaded:
|
| 497 |
return None
|
| 498 |
|
|
@@ -512,7 +486,6 @@ def get_face_features(image):
|
|
| 512 |
return np.array(face_descriptor)
|
| 513 |
|
| 514 |
def recognize_face(image, user_id, user_type='student'):
|
| 515 |
-
"""Face recognition function"""
|
| 516 |
global total_attempts, correct_recognitions, false_accepts, false_rejects, inference_times, unauthorized_attempts
|
| 517 |
|
| 518 |
if not models_loaded:
|
|
@@ -559,7 +532,7 @@ def recognize_face(image, user_id, user_type='student'):
|
|
| 559 |
except Exception as e:
|
| 560 |
return False, f"Error in face recognition: {str(e)}"
|
| 561 |
|
| 562 |
-
# Metrics functions
|
| 563 |
def log_metrics_event(event: dict):
|
| 564 |
if not metrics_events:
|
| 565 |
return
|
|
@@ -568,42 +541,30 @@ def log_metrics_event(event: dict):
|
|
| 568 |
except Exception as e:
|
| 569 |
print("Failed to log metrics event:", e)
|
| 570 |
|
| 571 |
-
def log_metrics_event_normalized(
|
| 572 |
-
|
| 573 |
-
|
| 574 |
-
attempt_type: str,
|
| 575 |
-
claimed_id: Optional[str],
|
| 576 |
-
recognized_id: Optional[str],
|
| 577 |
-
liveness_pass: bool,
|
| 578 |
-
distance: Optional[float],
|
| 579 |
-
live_prob: Optional[float],
|
| 580 |
-
latency_ms: Optional[float],
|
| 581 |
-
client_ip: Optional[str],
|
| 582 |
-
reason: Optional[str] = None
|
| 583 |
-
):
|
| 584 |
-
if not liveness_pass:
|
| 585 |
decision = "spoof_blocked"
|
| 586 |
else:
|
| 587 |
-
decision = "recognized" if event.startswith("accept") else "not_recognized"
|
| 588 |
|
| 589 |
doc = {
|
| 590 |
"ts": datetime.now(timezone.utc),
|
| 591 |
"event": event,
|
| 592 |
-
"attempt_type": attempt_type,
|
| 593 |
-
"claimed_id": claimed_id,
|
| 594 |
-
"recognized_id": recognized_id,
|
| 595 |
-
"liveness_pass": bool(liveness_pass),
|
| 596 |
-
"distance": distance,
|
| 597 |
-
"live_prob": live_prob,
|
| 598 |
-
"latency_ms": latency_ms,
|
| 599 |
-
"client_ip": client_ip,
|
| 600 |
-
"reason": reason,
|
| 601 |
"decision": decision,
|
| 602 |
}
|
| 603 |
log_metrics_event(doc)
|
| 604 |
|
| 605 |
def classify_event(ev: Dict[str, Any]) -> Tuple[Optional[str], Optional[str]]:
|
| 606 |
-
"""Returns (event, attempt_type)"""
|
| 607 |
if ev.get("event"):
|
| 608 |
e = ev.get("event")
|
| 609 |
at = ev.get("attempt_type")
|
|
@@ -634,7 +595,6 @@ def classify_event(ev: Dict[str, Any]) -> Tuple[Optional[str], Optional[str]]:
|
|
| 634 |
return None, None
|
| 635 |
|
| 636 |
def compute_metrics(limit: int = 10000):
|
| 637 |
-
"""Compute system metrics"""
|
| 638 |
if not metrics_events:
|
| 639 |
return {
|
| 640 |
"counts": {"trueAccepts": 0, "falseAccepts": 0, "trueRejects": 0, "falseRejects": 0,
|
|
@@ -696,7 +656,7 @@ def compute_latency_avg(limit: int = 300) -> Optional[float]:
|
|
| 696 |
return None
|
| 697 |
return sum(vals) / len(vals)
|
| 698 |
|
| 699 |
-
# Flask Routes
|
| 700 |
@app.route('/')
|
| 701 |
def home():
|
| 702 |
return render_template('home.html')
|
|
@@ -713,6 +673,7 @@ def register_page():
|
|
| 713 |
def metrics_dashboard():
|
| 714 |
return render_template('metrics.html')
|
| 715 |
|
|
|
|
| 716 |
@app.route('/register', methods=['POST'])
|
| 717 |
def register():
|
| 718 |
if not students_collection:
|
|
@@ -834,6 +795,62 @@ def face_login():
|
|
| 834 |
flash('Face not recognized. Please try again or contact admin.', 'danger')
|
| 835 |
return redirect(url_for('login_page'))
|
| 836 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 837 |
@app.route('/dashboard')
|
| 838 |
def dashboard():
|
| 839 |
if 'logged_in' not in session or session.get('user_type') != 'student':
|
|
@@ -856,6 +873,14 @@ def dashboard():
|
|
| 856 |
|
| 857 |
return render_template('dashboard.html', student=student, attendance_records=attendance_records)
|
| 858 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 859 |
@app.route('/mark-attendance', methods=['POST'])
|
| 860 |
def mark_attendance():
|
| 861 |
if 'logged_in' not in session or session.get('user_type') != 'student':
|
|
@@ -955,12 +980,169 @@ def mark_attendance():
|
|
| 955 |
else:
|
| 956 |
return jsonify({'success': False, 'message': message, 'overlay': overlay_data})
|
| 957 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 958 |
@app.route('/logout')
|
| 959 |
def logout():
|
| 960 |
session.clear()
|
| 961 |
flash('You have been logged out', 'info')
|
| 962 |
return redirect(url_for('login_page'))
|
| 963 |
|
|
|
|
| 964 |
@app.route('/metrics-json')
|
| 965 |
def metrics_json():
|
| 966 |
m = compute_metrics()
|
|
@@ -982,6 +1164,47 @@ def metrics_json():
|
|
| 982 |
'Unauthorized Attempts': counts["unauthorizedRejected"],
|
| 983 |
})
|
| 984 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 985 |
# Health check endpoint
|
| 986 |
@app.route('/health')
|
| 987 |
def health_check():
|
|
@@ -989,10 +1212,18 @@ def health_check():
|
|
| 989 |
'status': 'healthy',
|
| 990 |
'onnx_available': ONNX_AVAILABLE,
|
| 991 |
'models_loaded': models_loaded,
|
| 992 |
-
'database_connected': db is not None,
|
| 993 |
'timestamp': datetime.now().isoformat()
|
| 994 |
})
|
| 995 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 996 |
|
| 997 |
if __name__ == '__main__':
|
| 998 |
port = int(os.environ.get("PORT", 7860))
|
|
|
|
| 26 |
print(f"⚠️ ONNX Runtime not available: {e}")
|
| 27 |
print("🔄 Falling back to OpenCV-based alternatives")
|
| 28 |
|
| 29 |
+
# --- Evaluation Metrics Counters ---
|
| 30 |
total_attempts = 0
|
| 31 |
correct_recognitions = 0
|
| 32 |
false_accepts = 0
|
|
|
|
| 35 |
inference_times = []
|
| 36 |
|
| 37 |
def get_client_ip():
|
|
|
|
| 38 |
if request.headers.get('X-Forwarded-For'):
|
| 39 |
ip = request.headers.get('X-Forwarded-For').split(',')[0]
|
| 40 |
else:
|
| 41 |
ip = request.remote_addr
|
| 42 |
return ip
|
| 43 |
|
|
|
|
| 44 |
# Load environment variables
|
| 45 |
load_dotenv()
|
| 46 |
|
|
|
|
| 48 |
app = Flask(__name__, static_folder='app/static', template_folder='app/templates')
|
| 49 |
app.secret_key = os.getenv('SECRET_KEY', os.urandom(24))
|
| 50 |
|
| 51 |
+
# MongoDB Connection with robust error handling
|
| 52 |
def init_mongodb():
|
| 53 |
try:
|
|
|
|
| 54 |
mongo_uri = os.getenv('MONGO_URI', 'mongodb://localhost:27017/')
|
| 55 |
print(f"Connecting to MongoDB...")
|
| 56 |
|
| 57 |
client = MongoClient(mongo_uri, serverSelectionTimeoutMS=5000)
|
| 58 |
+
client.server_info() # Test connection
|
|
|
|
| 59 |
|
| 60 |
db = client['face_attendance_system']
|
| 61 |
students_collection = db['students']
|
|
|
|
| 85 |
# Initialize MongoDB
|
| 86 |
client, db, students_collection, teachers_collection, attendance_collection, metrics_events = init_mongodb()
|
| 87 |
|
| 88 |
+
# ONNX Runtime Provider Configuration
|
| 89 |
def _get_providers():
|
|
|
|
| 90 |
if not ONNX_AVAILABLE:
|
| 91 |
return []
|
|
|
|
| 92 |
try:
|
|
|
|
| 93 |
return ["CPUExecutionProvider"]
|
| 94 |
except Exception as e:
|
| 95 |
print(f"Error getting ONNX providers: {e}")
|
| 96 |
return []
|
| 97 |
|
| 98 |
+
# OpenCV Fallback Functions
|
| 99 |
def detect_faces_opencv(image):
|
|
|
|
| 100 |
try:
|
| 101 |
face_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
|
| 102 |
gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
|
|
|
|
| 106 |
for (x, y, w, h) in faces:
|
| 107 |
detections.append({
|
| 108 |
"bbox": [x, y, x+w, y+h],
|
| 109 |
+
"score": 0.9
|
| 110 |
})
|
| 111 |
return detections
|
| 112 |
except Exception as e:
|
|
|
|
| 114 |
return []
|
| 115 |
|
| 116 |
def simple_liveness_check(face_crop):
|
|
|
|
| 117 |
try:
|
| 118 |
gray = cv2.cvtColor(face_crop, cv2.COLOR_BGR2GRAY)
|
|
|
|
|
|
|
| 119 |
laplacian_var = cv2.Laplacian(gray, cv2.CV_64F).var()
|
|
|
|
|
|
|
| 120 |
mean_brightness = np.mean(gray)
|
| 121 |
|
|
|
|
| 122 |
sharpness_score = min(1.0, laplacian_var / 100.0)
|
| 123 |
brightness_score = 1.0 if 50 < mean_brightness < 200 else 0.5
|
|
|
|
|
|
|
| 124 |
live_score = (sharpness_score + brightness_score) / 2.0
|
| 125 |
|
|
|
|
| 126 |
return 0.3 + (live_score * 0.6)
|
|
|
|
| 127 |
except Exception as e:
|
| 128 |
print(f"Fallback liveness check error: {e}")
|
| 129 |
+
return 0.7
|
| 130 |
|
| 131 |
+
# YOLO and Anti-Spoofing Helper Functions
|
| 132 |
def _letterbox(image, new_shape=(640, 640), color=(114, 114, 114), auto=False, scaleFill=False, scaleup=True):
|
| 133 |
+
shape = image.shape[:2]
|
| 134 |
if isinstance(new_shape, int):
|
| 135 |
new_shape = (new_shape, new_shape)
|
| 136 |
r = min(new_shape[0] / shape[0], new_shape[1] / shape[1])
|
|
|
|
| 180 |
order = order[inds + 1]
|
| 181 |
return keep
|
| 182 |
|
| 183 |
+
# YOLO Face Detector Class
|
| 184 |
class YoloV5FaceDetector:
|
| 185 |
def __init__(self, model_path: str, input_size: int = 640, conf_threshold: float = 0.3, iou_threshold: float = 0.45):
|
| 186 |
self.input_size = int(input_size)
|
|
|
|
| 203 |
print("⚠️ No ONNX providers available for YOLOv5")
|
| 204 |
except Exception as e:
|
| 205 |
print(f"⚠️ Failed to initialize YOLOv5 with ONNX Runtime: {e}")
|
|
|
|
| 206 |
self.session = None
|
| 207 |
else:
|
| 208 |
+
print("⚠️ ONNX Runtime not available - using OpenCV fallback")
|
| 209 |
|
| 210 |
@staticmethod
|
| 211 |
def _xywh2xyxy(x: np.ndarray) -> np.ndarray:
|
|
|
|
| 218 |
|
| 219 |
def detect(self, image_bgr: np.ndarray, max_det: int = 20):
|
| 220 |
if self.session is None:
|
|
|
|
| 221 |
return detect_faces_opencv(image_bgr)
|
| 222 |
|
| 223 |
try:
|
|
|
|
| 228 |
img = np.transpose(img, (2, 0, 1))
|
| 229 |
img = np.expand_dims(img, 0)
|
| 230 |
preds = self.session.run(self.output_names, {self.input_name: img})[0]
|
| 231 |
+
|
| 232 |
if preds.ndim == 3 and preds.shape[0] == 1:
|
| 233 |
preds = preds[0]
|
| 234 |
if preds.ndim != 2:
|
| 235 |
raise RuntimeError(f"Unexpected YOLO output shape: {preds.shape}")
|
| 236 |
+
|
| 237 |
num_attrs = preds.shape[1]
|
| 238 |
has_landmarks = num_attrs >= 15
|
| 239 |
boxes_xywh = preds[:, 0:4]
|
| 240 |
+
|
| 241 |
if has_landmarks:
|
| 242 |
scores = preds[:, 4]
|
| 243 |
else:
|
|
|
|
| 248 |
else:
|
| 249 |
class_conf = cls_scores.max(axis=1, keepdims=True)
|
| 250 |
scores = (obj * class_conf).squeeze(-1)
|
| 251 |
+
|
| 252 |
keep = scores > self.conf_threshold
|
| 253 |
boxes_xywh = boxes_xywh[keep]
|
| 254 |
scores = scores[keep]
|
| 255 |
+
|
| 256 |
if boxes_xywh.shape[0] == 0:
|
| 257 |
return []
|
| 258 |
+
|
| 259 |
boxes_xyxy = self._xywh2xyxy(boxes_xywh)
|
| 260 |
boxes_xyxy[:, [0, 2]] -= dwdh[0]
|
| 261 |
boxes_xyxy[:, [1, 3]] -= dwdh[1]
|
|
|
|
| 264 |
boxes_xyxy[:, 1] = np.clip(boxes_xyxy[:, 1], 0, h0 - 1)
|
| 265 |
boxes_xyxy[:, 2] = np.clip(boxes_xyxy[:, 2], 0, w0 - 1)
|
| 266 |
boxes_xyxy[:, 3] = np.clip(boxes_xyxy[:, 3], 0, h0 - 1)
|
| 267 |
+
|
| 268 |
keep_inds = _nms(boxes_xyxy, scores, self.iou_threshold)
|
| 269 |
if len(keep_inds) > max_det:
|
| 270 |
keep_inds = keep_inds[:max_det]
|
| 271 |
+
|
| 272 |
dets = []
|
| 273 |
for i in keep_inds:
|
| 274 |
dets.append({"bbox": boxes_xyxy[i].tolist(), "score": float(scores[i])})
|
|
|
|
| 280 |
def _sigmoid(x: np.ndarray) -> np.ndarray:
|
| 281 |
return 1.0 / (1.0 + np.exp(-x))
|
| 282 |
|
| 283 |
+
# Anti-Spoofing Binary Class
|
| 284 |
class AntiSpoofBinary:
|
|
|
|
|
|
|
|
|
|
|
|
|
| 285 |
def __init__(self, model_path: str, input_size: int = 128, rgb: bool = True, normalize: bool = True,
|
| 286 |
mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5), live_index: int = 1):
|
| 287 |
self.input_size = int(input_size)
|
|
|
|
| 304 |
print("⚠️ No ONNX providers available for anti-spoofing")
|
| 305 |
except Exception as e:
|
| 306 |
print(f"⚠️ Failed to initialize anti-spoofing with ONNX Runtime: {e}")
|
|
|
|
| 307 |
self.session = None
|
| 308 |
else:
|
| 309 |
print("⚠️ ONNX Runtime not available - using simple liveness check fallback")
|
|
|
|
| 321 |
|
| 322 |
def predict_live_prob(self, face_bgr: np.ndarray) -> float:
|
| 323 |
if self.session is None:
|
|
|
|
| 324 |
return simple_liveness_check(face_bgr)
|
| 325 |
|
| 326 |
try:
|
|
|
|
| 341 |
print(f"ONNX anti-spoofing error, using fallback: {e}")
|
| 342 |
return simple_liveness_check(face_bgr)
|
| 343 |
|
| 344 |
+
# Helper Functions
|
| 345 |
def expand_and_clip_box(bbox_xyxy, scale: float, w: int, h: int):
|
| 346 |
x1, y1, x2, y2 = bbox_xyxy
|
| 347 |
bw = x2 - x1
|
|
|
|
| 380 |
ANTI_SPOOF_BIN_MODEL_PATH = "models/anti_spoofing/AntiSpoofing_bin_1.5_128.onnx"
|
| 381 |
|
| 382 |
def ensure_models_exist():
|
|
|
|
| 383 |
os.makedirs(DLIB_MODELS_DIR, exist_ok=True)
|
| 384 |
os.makedirs("models/anti_spoofing", exist_ok=True)
|
| 385 |
|
|
|
|
| 386 |
if not os.path.exists(FACE_RECOGNITION_MODEL_PATH):
|
| 387 |
print("Downloading dlib_face_recognition_resnet_model_v1.dat.bz2...")
|
| 388 |
try:
|
|
|
|
| 399 |
except Exception as e:
|
| 400 |
print(f"Failed to download dlib face recognition model: {e}")
|
| 401 |
|
|
|
|
| 402 |
required_models = [SHAPE_PREDICTOR_PATH, FACE_RECOGNITION_MODEL_PATH]
|
| 403 |
optional_models = [YOLO_FACE_MODEL_PATH, ANTI_SPOOF_BIN_MODEL_PATH]
|
| 404 |
|
|
|
|
| 416 |
print("✅ All critical models are available!")
|
| 417 |
return True
|
| 418 |
|
|
|
|
| 419 |
def init_models():
|
|
|
|
| 420 |
global yolo_face, anti_spoof_bin, detector, shape_predictor, face_recognition_model
|
| 421 |
|
| 422 |
try:
|
|
|
|
| 424 |
print("❌ Cannot initialize critical models - some files are missing")
|
| 425 |
return False
|
| 426 |
|
|
|
|
| 427 |
print("Loading dlib models...")
|
| 428 |
detector = dlib.get_frontal_face_detector()
|
| 429 |
shape_predictor = dlib.shape_predictor(SHAPE_PREDICTOR_PATH)
|
| 430 |
face_recognition_model = dlib.face_recognition_model_v1(FACE_RECOGNITION_MODEL_PATH)
|
| 431 |
print("✅ Dlib models loaded successfully!")
|
| 432 |
|
|
|
|
| 433 |
print("Loading ONNX models...")
|
| 434 |
try:
|
| 435 |
yolo_face = YoloV5FaceDetector(YOLO_FACE_MODEL_PATH, input_size=640, conf_threshold=0.3, iou_threshold=0.45)
|
| 436 |
anti_spoof_bin = AntiSpoofBinary(ANTI_SPOOF_BIN_MODEL_PATH, input_size=128, rgb=True, normalize=True, live_index=1)
|
| 437 |
except Exception as e:
|
| 438 |
print(f"⚠️ ONNX models initialization had issues: {e}")
|
|
|
|
| 439 |
|
| 440 |
print("✅ Model initialization complete!")
|
| 441 |
return True
|
|
|
|
| 444 |
print(f"❌ Error initializing models: {e}")
|
| 445 |
return False
|
| 446 |
|
|
|
|
| 447 |
models_loaded = init_models()
|
| 448 |
|
| 449 |
+
# Face processing functions
|
| 450 |
def decode_image(base64_image):
|
| 451 |
if ',' in base64_image:
|
| 452 |
base64_image = base64_image.split(',')[1]
|
|
|
|
| 456 |
return image
|
| 457 |
|
| 458 |
def align_face(image, shape):
|
|
|
|
| 459 |
left_eye = (shape.part(36).x, shape.part(36).y)
|
| 460 |
right_eye = (shape.part(45).x, shape.part(45).y)
|
| 461 |
dx = right_eye[0] - left_eye[0]
|
|
|
|
| 467 |
return aligned_image
|
| 468 |
|
| 469 |
def get_face_features(image):
|
|
|
|
| 470 |
if not models_loaded:
|
| 471 |
return None
|
| 472 |
|
|
|
|
| 486 |
return np.array(face_descriptor)
|
| 487 |
|
| 488 |
def recognize_face(image, user_id, user_type='student'):
|
|
|
|
| 489 |
global total_attempts, correct_recognitions, false_accepts, false_rejects, inference_times, unauthorized_attempts
|
| 490 |
|
| 491 |
if not models_loaded:
|
|
|
|
| 532 |
except Exception as e:
|
| 533 |
return False, f"Error in face recognition: {str(e)}"
|
| 534 |
|
| 535 |
+
# Metrics functions
|
| 536 |
def log_metrics_event(event: dict):
|
| 537 |
if not metrics_events:
|
| 538 |
return
|
|
|
|
| 541 |
except Exception as e:
|
| 542 |
print("Failed to log metrics event:", e)
|
| 543 |
|
| 544 |
+
def log_metrics_event_normalized(**kwargs):
|
| 545 |
+
event = kwargs.get('event')
|
| 546 |
+
if not kwargs.get('liveness_pass'):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 547 |
decision = "spoof_blocked"
|
| 548 |
else:
|
| 549 |
+
decision = "recognized" if event and event.startswith("accept") else "not_recognized"
|
| 550 |
|
| 551 |
doc = {
|
| 552 |
"ts": datetime.now(timezone.utc),
|
| 553 |
"event": event,
|
| 554 |
+
"attempt_type": kwargs.get('attempt_type'),
|
| 555 |
+
"claimed_id": kwargs.get('claimed_id'),
|
| 556 |
+
"recognized_id": kwargs.get('recognized_id'),
|
| 557 |
+
"liveness_pass": bool(kwargs.get('liveness_pass')),
|
| 558 |
+
"distance": kwargs.get('distance'),
|
| 559 |
+
"live_prob": kwargs.get('live_prob'),
|
| 560 |
+
"latency_ms": kwargs.get('latency_ms'),
|
| 561 |
+
"client_ip": kwargs.get('client_ip'),
|
| 562 |
+
"reason": kwargs.get('reason'),
|
| 563 |
"decision": decision,
|
| 564 |
}
|
| 565 |
log_metrics_event(doc)
|
| 566 |
|
| 567 |
def classify_event(ev: Dict[str, Any]) -> Tuple[Optional[str], Optional[str]]:
|
|
|
|
| 568 |
if ev.get("event"):
|
| 569 |
e = ev.get("event")
|
| 570 |
at = ev.get("attempt_type")
|
|
|
|
| 595 |
return None, None
|
| 596 |
|
| 597 |
def compute_metrics(limit: int = 10000):
|
|
|
|
| 598 |
if not metrics_events:
|
| 599 |
return {
|
| 600 |
"counts": {"trueAccepts": 0, "falseAccepts": 0, "trueRejects": 0, "falseRejects": 0,
|
|
|
|
| 656 |
return None
|
| 657 |
return sum(vals) / len(vals)
|
| 658 |
|
| 659 |
+
# Flask Routes
|
| 660 |
@app.route('/')
|
| 661 |
def home():
|
| 662 |
return render_template('home.html')
|
|
|
|
| 673 |
def metrics_dashboard():
|
| 674 |
return render_template('metrics.html')
|
| 675 |
|
| 676 |
+
# Student Routes
|
| 677 |
@app.route('/register', methods=['POST'])
|
| 678 |
def register():
|
| 679 |
if not students_collection:
|
|
|
|
| 795 |
flash('Face not recognized. Please try again or contact admin.', 'danger')
|
| 796 |
return redirect(url_for('login_page'))
|
| 797 |
|
| 798 |
+
@app.route('/auto-face-login', methods=['POST'])
|
| 799 |
+
def auto_face_login():
|
| 800 |
+
try:
|
| 801 |
+
data = request.json
|
| 802 |
+
face_image = data.get('face_image')
|
| 803 |
+
face_role = data.get('face_role', 'student')
|
| 804 |
+
|
| 805 |
+
if not face_image:
|
| 806 |
+
return jsonify({'success': False, 'message': 'No image received'})
|
| 807 |
+
|
| 808 |
+
image = decode_image(face_image)
|
| 809 |
+
test_features = get_face_features(image)
|
| 810 |
+
if test_features is None:
|
| 811 |
+
return jsonify({'success': False, 'message': 'No face detected'})
|
| 812 |
+
|
| 813 |
+
if face_role == 'teacher':
|
| 814 |
+
collection = teachers_collection
|
| 815 |
+
id_field = 'teacher_id'
|
| 816 |
+
dashboard_route = '/teacher_dashboard'
|
| 817 |
+
else:
|
| 818 |
+
collection = students_collection
|
| 819 |
+
id_field = 'student_id'
|
| 820 |
+
dashboard_route = '/dashboard'
|
| 821 |
+
|
| 822 |
+
if not collection:
|
| 823 |
+
return jsonify({'success': False, 'message': 'Database not available'})
|
| 824 |
+
|
| 825 |
+
users = collection.find({'face_image': {'$exists': True, '$ne': None}})
|
| 826 |
+
for user in users:
|
| 827 |
+
try:
|
| 828 |
+
ref_image_array = np.frombuffer(user['face_image'], np.uint8)
|
| 829 |
+
ref_image = cv2.imdecode(ref_image_array, cv2.IMREAD_COLOR)
|
| 830 |
+
ref_features = get_face_features(ref_image)
|
| 831 |
+
if ref_features is None:
|
| 832 |
+
continue
|
| 833 |
+
dist = np.linalg.norm(test_features - ref_features)
|
| 834 |
+
if dist < 0.6:
|
| 835 |
+
session['logged_in'] = True
|
| 836 |
+
session['user_type'] = face_role
|
| 837 |
+
session[id_field] = user[id_field]
|
| 838 |
+
session['name'] = user.get('name')
|
| 839 |
+
return jsonify({
|
| 840 |
+
'success': True,
|
| 841 |
+
'message': f'Welcome {user["name"]}! Redirecting...',
|
| 842 |
+
'redirect_url': dashboard_route,
|
| 843 |
+
'face_role': face_role
|
| 844 |
+
})
|
| 845 |
+
except Exception as e:
|
| 846 |
+
print(f"Error processing user {user.get(id_field)}: {e}")
|
| 847 |
+
continue
|
| 848 |
+
|
| 849 |
+
return jsonify({'success': False, 'message': f'Face not recognized in {face_role} database'})
|
| 850 |
+
except Exception as e:
|
| 851 |
+
print(f"Auto face login error: {e}")
|
| 852 |
+
return jsonify({'success': False, 'message': 'Login failed due to server error'})
|
| 853 |
+
|
| 854 |
@app.route('/dashboard')
|
| 855 |
def dashboard():
|
| 856 |
if 'logged_in' not in session or session.get('user_type') != 'student':
|
|
|
|
| 873 |
|
| 874 |
return render_template('dashboard.html', student=student, attendance_records=attendance_records)
|
| 875 |
|
| 876 |
+
@app.route('/attendance.html')
|
| 877 |
+
def attendance_page():
|
| 878 |
+
if 'logged_in' not in session or session.get('user_type') != 'student':
|
| 879 |
+
return redirect(url_for('login_page'))
|
| 880 |
+
student_id = session.get('student_id')
|
| 881 |
+
student = students_collection.find_one({'student_id': student_id}) if students_collection else None
|
| 882 |
+
return render_template('attendance.html', student=student)
|
| 883 |
+
|
| 884 |
@app.route('/mark-attendance', methods=['POST'])
|
| 885 |
def mark_attendance():
|
| 886 |
if 'logged_in' not in session or session.get('user_type') != 'student':
|
|
|
|
| 980 |
else:
|
| 981 |
return jsonify({'success': False, 'message': message, 'overlay': overlay_data})
|
| 982 |
|
| 983 |
+
@app.route('/liveness-preview', methods=['POST'])
|
| 984 |
+
def liveness_preview():
|
| 985 |
+
if 'logged_in' not in session or session.get('user_type') != 'student':
|
| 986 |
+
return jsonify({'success': False, 'message': 'Not logged in'})
|
| 987 |
+
|
| 988 |
+
try:
|
| 989 |
+
data = request.json or {}
|
| 990 |
+
face_image = data.get('face_image')
|
| 991 |
+
if not face_image:
|
| 992 |
+
return jsonify({'success': False, 'message': 'No image received'})
|
| 993 |
+
|
| 994 |
+
image = decode_image(face_image)
|
| 995 |
+
if image is None or image.size == 0:
|
| 996 |
+
return jsonify({'success': False, 'message': 'Invalid image data'})
|
| 997 |
+
|
| 998 |
+
h, w = image.shape[:2]
|
| 999 |
+
vis = image.copy()
|
| 1000 |
+
detections = yolo_face.detect(image, max_det=10)
|
| 1001 |
+
|
| 1002 |
+
if not detections:
|
| 1003 |
+
overlay_data = image_to_data_uri(vis)
|
| 1004 |
+
return jsonify({
|
| 1005 |
+
'success': True,
|
| 1006 |
+
'live': False,
|
| 1007 |
+
'live_prob': 0.0,
|
| 1008 |
+
'message': 'No face detected',
|
| 1009 |
+
'overlay': overlay_data
|
| 1010 |
+
})
|
| 1011 |
+
|
| 1012 |
+
best = max(detections, key=lambda d: d["score"])
|
| 1013 |
+
x1, y1, x2, y2 = [int(v) for v in best["bbox"]]
|
| 1014 |
+
x1e, y1e, x2e, y2e = expand_and_clip_box((x1, y1, x2, y2), scale=1.2, w=w, h=h)
|
| 1015 |
+
face_crop = image[y1e:y2e, x1e:x2e]
|
| 1016 |
+
|
| 1017 |
+
if face_crop.size == 0:
|
| 1018 |
+
overlay_data = image_to_data_uri(vis)
|
| 1019 |
+
return jsonify({
|
| 1020 |
+
'success': True,
|
| 1021 |
+
'live': False,
|
| 1022 |
+
'live_prob': 0.0,
|
| 1023 |
+
'message': 'Failed to crop face',
|
| 1024 |
+
'overlay': overlay_data
|
| 1025 |
+
})
|
| 1026 |
+
|
| 1027 |
+
live_prob = anti_spoof_bin.predict_live_prob(face_crop)
|
| 1028 |
+
threshold = 0.7
|
| 1029 |
+
label = "LIVE" if live_prob >= threshold else "SPOOF"
|
| 1030 |
+
color = (0, 200, 0) if label == "LIVE" else (0, 0, 255)
|
| 1031 |
+
|
| 1032 |
+
draw_live_overlay(vis, (x1e, y1e, x2e, y2e), label, live_prob, color)
|
| 1033 |
+
overlay_data = image_to_data_uri(vis)
|
| 1034 |
+
|
| 1035 |
+
return jsonify({
|
| 1036 |
+
'success': True,
|
| 1037 |
+
'live': bool(live_prob >= threshold),
|
| 1038 |
+
'live_prob': float(live_prob),
|
| 1039 |
+
'overlay': overlay_data
|
| 1040 |
+
})
|
| 1041 |
+
except Exception as e:
|
| 1042 |
+
print("liveness_preview error:", e)
|
| 1043 |
+
return jsonify({'success': False, 'message': 'Server error during preview'})
|
| 1044 |
+
|
| 1045 |
+
# Teacher Routes
|
| 1046 |
+
@app.route('/teacher_register.html')
|
| 1047 |
+
def teacher_register_page():
|
| 1048 |
+
return render_template('teacher_register.html')
|
| 1049 |
+
|
| 1050 |
+
@app.route('/teacher_login.html')
|
| 1051 |
+
def teacher_login_page():
|
| 1052 |
+
return render_template('teacher_login.html')
|
| 1053 |
+
|
| 1054 |
+
@app.route('/teacher_register', methods=['POST'])
|
| 1055 |
+
def teacher_register():
|
| 1056 |
+
if not teachers_collection:
|
| 1057 |
+
flash('Database not available. Please try again later.', 'danger')
|
| 1058 |
+
return redirect(url_for('teacher_register_page'))
|
| 1059 |
+
|
| 1060 |
+
try:
|
| 1061 |
+
teacher_data = {
|
| 1062 |
+
'teacher_id': request.form.get('teacher_id'),
|
| 1063 |
+
'name': request.form.get('name'),
|
| 1064 |
+
'email': request.form.get('email'),
|
| 1065 |
+
'department': request.form.get('department'),
|
| 1066 |
+
'designation': request.form.get('designation'),
|
| 1067 |
+
'mobile': request.form.get('mobile'),
|
| 1068 |
+
'dob': request.form.get('dob'),
|
| 1069 |
+
'gender': request.form.get('gender'),
|
| 1070 |
+
'password': request.form.get('password'),
|
| 1071 |
+
'created_at': datetime.now()
|
| 1072 |
+
}
|
| 1073 |
+
face_image = request.form.get('face_image')
|
| 1074 |
+
if face_image and ',' in face_image:
|
| 1075 |
+
image_data = face_image.split(',')[1]
|
| 1076 |
+
teacher_data['face_image'] = Binary(base64.b64decode(image_data))
|
| 1077 |
+
teacher_data['face_image_type'] = face_image.split(',')[0].split(':')[1].split(';')[0]
|
| 1078 |
+
else:
|
| 1079 |
+
flash('Face image is required for registration.', 'danger')
|
| 1080 |
+
return redirect(url_for('teacher_register_page'))
|
| 1081 |
+
|
| 1082 |
+
result = teachers_collection.insert_one(teacher_data)
|
| 1083 |
+
if result.inserted_id:
|
| 1084 |
+
flash('Registration successful! You can now login.', 'success')
|
| 1085 |
+
return redirect(url_for('teacher_login_page'))
|
| 1086 |
+
else:
|
| 1087 |
+
flash('Registration failed. Please try again.', 'danger')
|
| 1088 |
+
return redirect(url_for('teacher_register_page'))
|
| 1089 |
+
except pymongo.errors.DuplicateKeyError:
|
| 1090 |
+
flash('Teacher ID already exists. Please use a different ID.', 'danger')
|
| 1091 |
+
return redirect(url_for('teacher_register_page'))
|
| 1092 |
+
except Exception as e:
|
| 1093 |
+
flash(f'Registration failed: {str(e)}', 'danger')
|
| 1094 |
+
return redirect(url_for('teacher_register_page'))
|
| 1095 |
+
|
| 1096 |
+
@app.route('/teacher_login', methods=['POST'])
|
| 1097 |
+
def teacher_login():
|
| 1098 |
+
if not teachers_collection:
|
| 1099 |
+
flash('Database not available. Please try again later.', 'danger')
|
| 1100 |
+
return redirect(url_for('teacher_login_page'))
|
| 1101 |
+
|
| 1102 |
+
teacher_id = request.form.get('teacher_id')
|
| 1103 |
+
password = request.form.get('password')
|
| 1104 |
+
teacher = teachers_collection.find_one({'teacher_id': teacher_id})
|
| 1105 |
+
|
| 1106 |
+
if teacher and teacher['password'] == password:
|
| 1107 |
+
session['logged_in'] = True
|
| 1108 |
+
session['user_type'] = 'teacher'
|
| 1109 |
+
session['teacher_id'] = teacher_id
|
| 1110 |
+
session['name'] = teacher.get('name')
|
| 1111 |
+
flash('Login successful!', 'success')
|
| 1112 |
+
return redirect(url_for('teacher_dashboard'))
|
| 1113 |
+
else:
|
| 1114 |
+
flash('Invalid credentials. Please try again.', 'danger')
|
| 1115 |
+
return redirect(url_for('teacher_login_page'))
|
| 1116 |
+
|
| 1117 |
+
@app.route('/teacher_dashboard')
|
| 1118 |
+
def teacher_dashboard():
|
| 1119 |
+
if 'logged_in' not in session or session.get('user_type') != 'teacher':
|
| 1120 |
+
return redirect(url_for('teacher_login_page'))
|
| 1121 |
+
|
| 1122 |
+
teacher_id = session.get('teacher_id')
|
| 1123 |
+
teacher = teachers_collection.find_one({'teacher_id': teacher_id}) if teachers_collection else None
|
| 1124 |
+
|
| 1125 |
+
if teacher and 'face_image' in teacher and teacher['face_image']:
|
| 1126 |
+
face_image_base64 = base64.b64encode(teacher['face_image']).decode('utf-8')
|
| 1127 |
+
mime_type = teacher.get('face_image_type', 'image/jpeg')
|
| 1128 |
+
teacher['face_image_url'] = f"data:{mime_type};base64,{face_image_base64}"
|
| 1129 |
+
|
| 1130 |
+
return render_template('teacher_dashboard.html', teacher=teacher)
|
| 1131 |
+
|
| 1132 |
+
@app.route('/teacher_logout')
|
| 1133 |
+
def teacher_logout():
|
| 1134 |
+
session.clear()
|
| 1135 |
+
flash('You have been logged out', 'info')
|
| 1136 |
+
return redirect(url_for('teacher_login_page'))
|
| 1137 |
+
|
| 1138 |
+
# Common logout
|
| 1139 |
@app.route('/logout')
|
| 1140 |
def logout():
|
| 1141 |
session.clear()
|
| 1142 |
flash('You have been logged out', 'info')
|
| 1143 |
return redirect(url_for('login_page'))
|
| 1144 |
|
| 1145 |
+
# Metrics endpoints
|
| 1146 |
@app.route('/metrics-json')
|
| 1147 |
def metrics_json():
|
| 1148 |
m = compute_metrics()
|
|
|
|
| 1164 |
'Unauthorized Attempts': counts["unauthorizedRejected"],
|
| 1165 |
})
|
| 1166 |
|
| 1167 |
+
@app.route('/metrics-data', methods=['GET'])
|
| 1168 |
+
def metrics_data():
|
| 1169 |
+
data = compute_metrics()
|
| 1170 |
+
recent = []
|
| 1171 |
+
if metrics_events:
|
| 1172 |
+
recent = list(metrics_events.find({}, {"_id": 0}).sort("ts", -1).limit(200))
|
| 1173 |
+
for r in recent:
|
| 1174 |
+
if isinstance(r.get("ts"), datetime):
|
| 1175 |
+
r["ts"] = r["ts"].isoformat()
|
| 1176 |
+
event, attempt_type = classify_event(r)
|
| 1177 |
+
if event and not r.get("event"):
|
| 1178 |
+
r["event"] = event
|
| 1179 |
+
if attempt_type and not r.get("attempt_type"):
|
| 1180 |
+
r["attempt_type"] = attempt_type
|
| 1181 |
+
if "liveness_pass" not in r:
|
| 1182 |
+
if r.get("decision") == "spoof_blocked":
|
| 1183 |
+
r["liveness_pass"] = False
|
| 1184 |
+
elif isinstance(r.get("live_prob"), (int, float)):
|
| 1185 |
+
r["liveness_pass"] = bool(r["live_prob"] >= 0.7)
|
| 1186 |
+
else:
|
| 1187 |
+
r["liveness_pass"] = None
|
| 1188 |
+
|
| 1189 |
+
data["recent"] = recent
|
| 1190 |
+
data["avg_latency_ms"] = compute_latency_avg()
|
| 1191 |
+
return jsonify(data)
|
| 1192 |
+
|
| 1193 |
+
@app.route('/metrics-events')
|
| 1194 |
+
def metrics_events_api():
|
| 1195 |
+
if not metrics_events:
|
| 1196 |
+
return jsonify([])
|
| 1197 |
+
|
| 1198 |
+
limit = int(request.args.get("limit", 200))
|
| 1199 |
+
cursor = metrics_events.find({}, {"_id": 0}).sort("ts", -1).limit(limit)
|
| 1200 |
+
events = list(cursor)
|
| 1201 |
+
|
| 1202 |
+
for ev in events:
|
| 1203 |
+
if isinstance(ev.get("ts"), datetime):
|
| 1204 |
+
ev["ts"] = ev["ts"].isoformat()
|
| 1205 |
+
|
| 1206 |
+
return jsonify(events)
|
| 1207 |
+
|
| 1208 |
# Health check endpoint
|
| 1209 |
@app.route('/health')
|
| 1210 |
def health_check():
|
|
|
|
| 1212 |
'status': 'healthy',
|
| 1213 |
'onnx_available': ONNX_AVAILABLE,
|
| 1214 |
'models_loaded': models_loaded,
|
| 1215 |
+
'database_connected': db is not None,
|
| 1216 |
'timestamp': datetime.now().isoformat()
|
| 1217 |
})
|
| 1218 |
|
| 1219 |
+
# Error handlers
|
| 1220 |
+
@app.errorhandler(404)
|
| 1221 |
+
def not_found_error(error):
|
| 1222 |
+
return render_template('404.html'), 404
|
| 1223 |
+
|
| 1224 |
+
@app.errorhandler(500)
|
| 1225 |
+
def internal_error(error):
|
| 1226 |
+
return render_template('500.html'), 500
|
| 1227 |
|
| 1228 |
if __name__ == '__main__':
|
| 1229 |
port = int(os.environ.get("PORT", 7860))
|