Spaces:
Sleeping
Sleeping
| from flask import Flask, render_template, request, redirect, url_for, flash, session, jsonify | |
| from werkzeug.middleware.proxy_fix import ProxyFix | |
| import os | |
| import gc | |
| import logging | |
| import time | |
| import uuid | |
| import secrets | |
| import pymongo | |
| from pymongo import MongoClient | |
| from bson.binary import Binary | |
| import base64 | |
| from datetime import datetime, timezone, timedelta | |
| from dotenv import load_dotenv | |
| import numpy as np | |
| import cv2 | |
| from typing import Optional, Dict, Tuple, Any | |
| import tempfile | |
| import atexit | |
| import shutil | |
# Optimize memory usage and disable TensorFlow warnings:
# minimum TF C++ log level 3, no visible CUDA devices, one OpenMP thread.
os.environ.update({
    'TF_CPP_MIN_LOG_LEVEL': '3',
    'CUDA_VISIBLE_DEVICES': '-1',
    'OMP_NUM_THREADS': '1',
})

# Configure logging for production: WARNING by default, ERROR for TensorFlow.
logging.basicConfig(level=logging.WARNING)
_tensorflow_logger = logging.getLogger('tensorflow')
_tensorflow_logger.setLevel(logging.ERROR)
# --- Evaluation Metrics Counters (legacy, kept for compatibility display) ---
# Process-local tallies; they start at zero every time the module is loaded.
total_attempts = correct_recognitions = 0
false_accepts = false_rejects = 0
unauthorized_attempts = 0
inference_times = []  # one duration in seconds per verification
# ---------------------------------------------------
# Load environment variables
load_dotenv()

# Initialize Flask app
app = Flask(__name__, static_folder='app/static', template_folder='app/templates')

# Signing key for Flask's cookie session.
# A constant fallback committed to the source would let anyone forge signed
# cookies, so when SECRET_KEY is not configured a random key is generated
# instead. That key changes on every restart and differs between worker
# processes, so set SECRET_KEY in the environment for stable cookies.
_configured_secret = os.environ.get('SECRET_KEY')
if not _configured_secret:
    _configured_secret = secrets.token_hex(32)
    print("WARNING: SECRET_KEY is not set; using a random per-process key. "
          "Set SECRET_KEY in the environment for stable sessions.")
app.secret_key = _configured_secret

# Session cookie configuration for Hugging Face Spaces.
# SESSION_COOKIE_SAMESITE is the Python value None (attribute left unset),
# not the string 'None'.
app.config.update({
    'PERMANENT_SESSION_LIFETIME': timedelta(hours=2),
    'SESSION_COOKIE_NAME': 'face_app_session',
    'SESSION_COOKIE_HTTPONLY': True,
    'SESSION_COOKIE_SECURE': False,
    'SESSION_COOKIE_SAMESITE': None,
    'SESSION_REFRESH_EACH_REQUEST': False,
    'SESSION_COOKIE_DOMAIN': None,
    'SESSION_COOKIE_PATH': '/',
    'SEND_FILE_MAX_AGE_DEFAULT': 0
})

# Add ProxyFix middleware for the Hugging Face reverse proxy: trust one hop
# of X-Forwarded-For / -Proto / -Host and ignore X-Forwarded-Prefix.
app.wsgi_app = ProxyFix(
    app.wsgi_app,
    x_for=1,
    x_proto=1,
    x_host=1,
    x_prefix=0
)
print("Flask app initialized with ProxyFix middleware and token-based sessions")
# Create temporary directory for image processing
TEMP_DIR = tempfile.mkdtemp()


@atexit.register
def cleanup_temp_dir():
    """Remove the scratch directory at interpreter exit.

    Best-effort: the directory is deleted only if it still exists, a garbage
    collection pass follows, and any failure is printed rather than raised.
    """
    try:
        scratch_present = os.path.exists(TEMP_DIR)
        if scratch_present:
            shutil.rmtree(TEMP_DIR)
        gc.collect()
    except Exception as exc:
        print(f"Error cleaning up temp directory: {exc}")
# MongoDB connection with connection pooling.
# The handles are pre-declared so these module-level names always exist,
# even when the client cannot be constructed (for example a malformed
# MONGO_URI).
client = None
db = None
students_collection = None
teachers_collection = None
attendance_collection = None
metrics_events = None
sessions_collection = None

try:
    mongo_uri = os.getenv('MONGO_URI', 'mongodb://localhost:27017/')
    client = MongoClient(
        mongo_uri,
        maxPoolSize=10,
        connectTimeoutMS=5000,
        socketTimeoutMS=5000,
        serverSelectionTimeoutMS=5000
    )
    db = client['face_attendance_system']
    students_collection = db['students']
    teachers_collection = db['teachers']
    attendance_collection = db['attendance']
    metrics_events = db['metrics_events']
    sessions_collection = db['user_sessions']
except Exception as e:
    print(f"MongoDB connection error: {e}")


def _ensure_indexes():
    """Create the indexes the app relies on, one at a time.

    Each index is created in its own try block so that one failure (for
    example existing duplicate IDs blocking a unique index) does not skip
    the remaining ones, notably the TTL index that expires sessions.
    """
    index_specs = (
        (students_collection, [("student_id", pymongo.ASCENDING)], {"unique": True}),
        (teachers_collection, [("teacher_id", pymongo.ASCENDING)], {"unique": True}),
        (attendance_collection, [
            ("student_id", pymongo.ASCENDING),
            ("date", pymongo.ASCENDING),
            ("subject", pymongo.ASCENDING)
        ], {}),
        (metrics_events, [("ts", pymongo.DESCENDING)], {}),
        (metrics_events, [("event", pymongo.ASCENDING)], {}),
        (metrics_events, [("attempt_type", pymongo.ASCENDING)], {}),
        (sessions_collection, [("token", pymongo.ASCENDING)], {"unique": True}),
        (sessions_collection, [("expires_at", pymongo.ASCENDING)], {"expireAfterSeconds": 0}),
    )
    for collection, keys, options in index_specs:
        try:
            collection.create_index(keys, **options)
        except Exception as e:
            print(f"MongoDB index creation error on {collection.name}: {e}")


if client is not None:
    try:
        # A single round trip up front: an unreachable server then costs one
        # server-selection timeout instead of one per index.
        client.admin.command('ping')
    except Exception as e:
        print(f"MongoDB connection error: {e}")
    else:
        _ensure_indexes()
        print("MongoDB connection successful")
# Initialize face detection cascades
def _load_cascade(filename, success_message, error_prefix):
    """Load a bundled Haar cascade, returning None when it cannot be used.

    ``cv2.CascadeClassifier`` does not raise when the XML file is missing or
    unreadable; it returns an empty classifier. The ``empty()`` check turns
    that into a None result so the ``is None`` guards elsewhere in this
    module take effect.
    """
    try:
        cascade = cv2.CascadeClassifier(cv2.data.haarcascades + filename)
        if cascade.empty():
            raise RuntimeError(f"cascade file could not be loaded: {filename}")
    except Exception as e:
        print(f"{error_prefix}: {e}")
        return None
    print(success_message)
    return cascade


face_detector = _load_cascade(
    'haarcascade_frontalface_default.xml',
    "Haar cascade face detector initialized successfully",
    "Error initializing face detector",
)
eye_cascade = _load_cascade(
    'haarcascade_eye.xml',
    "Eye cascade classifier initialized successfully",
    "Error initializing eye cascade",
)
def get_unique_temp_path(prefix="temp", suffix=".jpg"):
    """Return a unique file path inside ``TEMP_DIR``.

    Callers build ``prefix`` from user-supplied IDs, so every character that
    is not an ASCII letter, digit, ``-`` or ``_`` is replaced with ``_``.
    This keeps path separators and ``..`` out of the file name; uniqueness
    comes from the UUID and timestamp, not from the prefix.

    Args:
        prefix: Label placed at the start of the file name.
        suffix: File extension, including the leading dot.

    Returns:
        Path of a not-yet-created file under ``TEMP_DIR``.
    """
    safe_prefix = "".join(
        ch if (ch.isascii() and ch.isalnum()) or ch in "-_" else "_"
        for ch in str(prefix)
    )
    unique_id = str(uuid.uuid4())
    filename = f"{safe_prefix}_{unique_id}_{int(time.time())}{suffix}"
    return os.path.join(TEMP_DIR, filename)
def detect_faces_haar(image):
    """Detect faces with the module-level Haar cascade.

    Args:
        image: Image array accepted by ``cv2.cvtColor`` with
            ``COLOR_BGR2GRAY``.

    Returns:
        A list of ``{"bbox": [x1, y1, x2, y2], "score": 0.9}`` dicts, one per
        detection (the score is a fixed placeholder). An empty list is
        returned when nothing is found or when detection raises.
    """
    try:
        grayscale = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        rects = face_detector.detectMultiScale(
            grayscale,
            scaleFactor=1.1,
            minNeighbors=5,
            minSize=(30, 30),
        )
        found = [
            {"bbox": [x, y, x + w, y + h], "score": 0.9}
            for (x, y, w, h) in rects
        ]
        # Release the grayscale copy eagerly to keep memory usage low.
        del grayscale
        gc.collect()
        return found
    except Exception as e:
        print(f"Error in Haar cascade detection: {e}")
        return []
def detect_faces_yunet(image):
    """Unified face-detection entry point.

    Despite the name, no YuNet model is used here. The call is delegated to
    ``detect_faces_haar`` when the shared detector is available; otherwise a
    frontal-face Haar cascade is built on the fly.

    Returns:
        A list of ``{"bbox": [x1, y1, x2, y2], "score": ...}`` dicts. The
        fallback path uses a fixed score of 0.8 and returns an empty list
        on any error.
    """
    if face_detector is not None:
        return detect_faces_haar(image)

    try:
        grayscale = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        fallback_cascade = cv2.CascadeClassifier(
            cv2.data.haarcascades + 'haarcascade_frontalface_default.xml'
        )
        rects = fallback_cascade.detectMultiScale(grayscale, 1.3, 5)
        found = [
            {"bbox": [x, y, x + w, y + h], "score": 0.8}
            for (x, y, w, h) in rects
        ]
        del grayscale
        gc.collect()
        return found
    except Exception as e:
        print(f"Error in fallback detection: {e}")
        return []
def recognize_face_deepface(image, user_id, user_type='student'):
    """Verify that ``image`` matches the stored reference face of ``user_id``.

    The probe image and the stored reference image are written to temporary
    files and compared with ``DeepFace.verify`` (Facenet model, OpenCV
    detector backend, cosine distance). The decision uses a custom distance
    threshold of 0.55; DeepFace's own ``verified`` flag is not consulted.

    Args:
        image: Probe image as a non-empty numpy array.
        user_id: Value of ``student_id`` / ``teacher_id`` to look up.
        user_type: ``'student'`` selects the students collection; any other
            value selects the teachers collection.

    Returns:
        ``(success, message)``. Messages for completed comparisons contain
        ``distance=<value>``, which ``mark_attendance`` parses.

    Side effects:
        Updates the legacy module-level counters and appends the elapsed
        time to ``inference_times``, which is trimmed to the most recent
        1000 samples so it cannot grow without bound in a long-running
        process.
    """
    global total_attempts, correct_recognitions, unauthorized_attempts
    max_timing_samples = 1000
    custom_threshold = 0.55  # more lenient than the library default
    temp_files = []
    try:
        from deepface import DeepFace
        start_time = time.time()

        # Reject anything that is not a usable image array.
        if not isinstance(image, np.ndarray):
            print("Invalid image type provided to recognition")
            return False, "Invalid image format"
        if image.size == 0:
            print("Empty image provided to recognition")
            return False, "Empty image provided"

        # Write the probe image to disk.
        temp_img_path = get_unique_temp_path(f"current_{user_id}")
        temp_files.append(temp_img_path)
        try:
            if not cv2.imwrite(temp_img_path, image):
                return False, "Failed to save input image"
        except Exception as e:
            print(f"Error saving input image: {e}")
            return False, f"Image save error: {str(e)}"

        # Fetch the user's reference image.
        if user_type == 'student':
            user = students_collection.find_one({'student_id': user_id})
        else:
            user = teachers_collection.find_one({'teacher_id': user_id})
        if not user or 'face_image' not in user:
            unauthorized_attempts += 1
            return False, f"No reference face found for {user_type} ID {user_id}"

        ref_image_array = np.frombuffer(user['face_image'], np.uint8)
        ref_image = cv2.imdecode(ref_image_array, cv2.IMREAD_COLOR)
        if ref_image is None:
            return False, "Failed to decode reference image"

        temp_ref_path = get_unique_temp_path(f"ref_{user_id}")
        temp_files.append(temp_ref_path)
        try:
            if not cv2.imwrite(temp_ref_path, ref_image):
                return False, "Failed to save reference image"
        except Exception as e:
            print(f"Error saving reference image: {e}")
            return False, f"Reference image save error: {str(e)}"

        # The decoded arrays are no longer needed once they are on disk.
        del ref_image_array, ref_image

        try:
            result = DeepFace.verify(
                img1_path=temp_img_path,
                img2_path=temp_ref_path,
                model_name="Facenet",
                detector_backend="opencv",  # opencv rather than retinaface for stability
                enforce_detection=False,
                align=True,
                distance_metric="cosine"
            )
            distance = result["distance"]

            inference_time = time.time() - start_time
            inference_times.append(inference_time)
            # Keep only the newest samples; trimming in place preserves the
            # list object that other code may hold a reference to.
            del inference_times[:-max_timing_samples]
            total_attempts += 1

            if distance < custom_threshold:
                correct_recognitions += 1
                return True, f"Face recognized (distance={distance:.3f}, threshold={custom_threshold}, time={inference_time:.2f}s)"
            unauthorized_attempts += 1
            return False, f"Face not recognized (distance={distance:.3f}, required < {custom_threshold})"
        except Exception as e:
            print(f"DeepFace verification error: {e}")
            return False, f"DeepFace verification error: {str(e)}"
    except Exception as e:
        print(f"Error in face recognition: {e}")
        return False, f"Error in face recognition: {str(e)}"
    finally:
        # Always remove the temporary files, whatever the outcome.
        for temp_file in temp_files:
            try:
                if os.path.exists(temp_file):
                    os.remove(temp_file)
            except Exception as e:
                print(f"Error cleaning up temp file {temp_file}: {e}")
        gc.collect()
def simple_liveness_check(image):
    """Heuristic liveness score for a face crop.

    The score is the sum of three components:
      * eyes found by the eye cascade: 0.4 (two or more), 0.25 (one), 0.1 (none)
      * sharpness (variance of the Laplacian): 0.3 (>100), 0.2 (>50), 0.1
      * crop area in pixels (height * width): 0.3 (>10000), 0.2 (>5000), 0.1

    Args:
        image: Face crop accepted by ``cv2.cvtColor`` with ``COLOR_BGR2GRAY``.

    Returns:
        A float clamped to [0, 1]; 0.65 when the eye cascade is unavailable
        and 0.6 when any step raises.
    """
    if eye_cascade is None:
        return 0.65  # Default score if cascade not available
    try:
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

        # Factor 1: eye detection (40% weight)
        eye_count = len(eye_cascade.detectMultiScale(gray, 1.3, 5))
        if eye_count >= 2:
            eye_score = 0.4
        elif eye_count == 1:
            eye_score = 0.25
        else:
            eye_score = 0.1  # base score even with no eyes detected

        # Factor 2: image sharpness (30% weight)
        laplacian_var = cv2.Laplacian(gray, cv2.CV_64F).var()
        if laplacian_var > 100:
            sharpness_score = 0.3
        elif laplacian_var > 50:
            sharpness_score = 0.2
        else:
            sharpness_score = 0.1

        # Factor 3: crop size (30% weight)
        crop_area = image.shape[0] * image.shape[1]
        if crop_area > 10000:
            size_score = 0.3
        elif crop_area > 5000:
            size_score = 0.2
        else:
            size_score = 0.1

        del gray
        liveness_score = eye_score + sharpness_score + size_score
        return min(1.0, max(0.0, liveness_score))
    except Exception as e:
        print(f"Error in liveness check: {e}")
        return 0.6  # Return neutral score on error
    finally:
        # Single collection per call; the original also collected just
        # before returning, which doubled the cost for no benefit.
        gc.collect()
def expand_and_clip_box(bbox_xyxy, scale: float, w: int, h: int):
    """Scale a box about its centre and clip it to the image bounds.

    Args:
        bbox_xyxy: ``(x1, y1, x2, y2)`` corner coordinates.
        scale: Multiplier applied to both the box width and height.
        w: Image width; x coordinates are clipped to ``[0, w - 1]``.
        h: Image height; y coordinates are clipped to ``[0, h - 1]``.

    Returns:
        ``(x1, y1, x2, y2)`` as ints (truncated toward zero).
    """
    left, top, right, bottom = bbox_xyxy
    box_w, box_h = right - left, bottom - top
    center_x = left + box_w / 2.0
    center_y = top + box_h / 2.0
    half_w = (box_w * scale) / 2.0
    half_h = (box_h * scale) / 2.0
    return (
        int(max(0, center_x - half_w)),
        int(max(0, center_y - half_h)),
        int(min(w - 1, center_x + half_w)),
        int(min(h - 1, center_y + half_h)),
    )
def draw_live_overlay(img_bgr: np.ndarray, bbox, label: str, prob: float, color):
    """Draw a labelled box on ``img_bgr`` in place.

    A 2-pixel rectangle is drawn around ``bbox`` and a filled banner holding
    ``"<label> <prob to 2 decimals>"`` in black text is placed above the
    box's top-left corner (clamped so it never starts above row 0).
    """
    font = cv2.FONT_HERSHEY_SIMPLEX
    font_scale = 0.7
    thickness = 2

    left, top, right, bottom = (int(v) for v in bbox)
    cv2.rectangle(img_bgr, (left, top), (right, bottom), color, 2)

    caption = f"{label} {prob:.2f}"
    (text_w, text_h), _ = cv2.getTextSize(caption, font, font_scale, thickness)
    banner_top = max(0, top - text_h - 8)
    banner_end = (left + text_w + 6, banner_top + text_h + 6)
    cv2.rectangle(img_bgr, (left, banner_top), banner_end, color, -1)
    cv2.putText(img_bgr, caption, (left + 3, banner_top + text_h), font, font_scale, (0, 0, 0), thickness, cv2.LINE_AA)
def image_to_data_uri(img_bgr: np.ndarray) -> Optional[str]:
    """Encode an image as a JPEG (quality 85) ``data:`` URI.

    Returns:
        ``"data:image/jpeg;base64,<payload>"``, or None when JPEG encoding
        fails.
    """
    encode_params = [int(cv2.IMWRITE_JPEG_QUALITY), 85]
    ok, jpeg_buffer = cv2.imencode(".jpg", img_bgr, encode_params)
    if ok:
        payload = base64.b64encode(jpeg_buffer.tobytes()).decode("utf-8")
        return f"data:image/jpeg;base64,{payload}"
    return None
def decode_image(base64_image):
    """Decode a base64 string or ``data:`` URI into an OpenCV colour image.

    Callers treat a None result as "invalid image", so every form of bad
    input is reported that way instead of raising: a non-string or empty
    value, malformed base64, or a payload that decodes to zero bytes.

    Args:
        base64_image: Raw base64 text, optionally prefixed by a data-URI
            header ending in a comma.

    Returns:
        The array produced by ``cv2.imdecode`` with ``IMREAD_COLOR``, or
        None when the input cannot be decoded.
    """
    if not isinstance(base64_image, str) or not base64_image:
        return None

    # Drop the "data:<mime>;base64," header when present.
    _, comma, after_comma = base64_image.partition(',')
    encoded = after_comma if comma else base64_image

    try:
        image_bytes = base64.b64decode(encoded)
    except (ValueError, TypeError):  # binascii.Error is a ValueError
        return None
    if not image_bytes:
        # An empty buffer would make cv2.imdecode raise.
        return None

    np_array = np.frombuffer(image_bytes, np.uint8)
    image = cv2.imdecode(np_array, cv2.IMREAD_COLOR)
    del image_bytes, np_array
    gc.collect()
    return image
# Token-based session helpers
def _utc_now():
    """Return the current UTC time as a naive datetime.

    PyMongo stores naive datetimes as if they were UTC and, by default,
    returns naive datetimes. Using UTC on both the write and the read side
    keeps the in-process expiry check and the server-side TTL index on
    ``expires_at`` in agreement regardless of the host's local timezone.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


def validate_session_token(token):
    """Look up a session by token and return it if it is still valid.

    Args:
        token: Session token string (may be None or empty).

    Returns:
        The stored session document, or None when the token is missing,
        unknown or expired. An expired session, or one without a usable
        ``expires_at`` value, is deleted.
    """
    if not token:
        return None
    session_data = sessions_collection.find_one({'token': token})
    if not session_data:
        return None

    expires_at = session_data.get('expires_at')
    if isinstance(expires_at, datetime) and expires_at.tzinfo is not None:
        # Only happens with a tz-aware client; normalise to naive UTC so
        # the comparison below cannot raise.
        expires_at = expires_at.astimezone(timezone.utc).replace(tzinfo=None)

    # A session without an expiry is treated as expired rather than eternal.
    if not isinstance(expires_at, datetime) or _utc_now() > expires_at:
        sessions_collection.delete_one({'token': token})
        return None
    return session_data


def create_session_token(user_id, user_type):
    """Create a new two-hour session for a user and return its token.

    Existing sessions for the same user are removed first.

    Args:
        user_id: The student or teacher identifier.
        user_type: ``'student'`` stores the id under ``student_id``; any
            other value stores it under ``teacher_id``.

    Returns:
        The newly generated URL-safe token.
    """
    token = secrets.token_urlsafe(32)
    now = _utc_now()
    session_data = {
        'token': token,
        'user_id': user_id,
        'user_type': user_type,
        'created_at': now,
        'expires_at': now + timedelta(hours=2)
    }
    id_field = 'student_id' if user_type == 'student' else 'teacher_id'
    # Clear old sessions for this user
    sessions_collection.delete_many({id_field: user_id})
    session_data[id_field] = user_id
    sessions_collection.insert_one(session_data)
    return token
# Legacy function for backward compatibility
def get_face_features(image):
    """Legacy stub: accepts an image and always yields None."""
    features = None
    return features
def recognize_face(image, user_id, user_type='student'):
    """Backward-compatible alias that forwards to ``recognize_face_deepface``."""
    outcome = recognize_face_deepface(image, user_id, user_type)
    return outcome
# Metrics helpers (keeping existing implementation)
def log_metrics_event(event: dict):
    """Insert one metrics document, best-effort.

    Any failure is printed and swallowed so that metrics logging can never
    break the calling request.
    """
    try:
        metrics_events.insert_one(event)
    except Exception as exc:
        print("Failed to log metrics event:", exc)
    return None
def log_metrics_event_normalized(*, event: str, attempt_type: str, claimed_id: Optional[str],
                                 recognized_id: Optional[str], liveness_pass: bool, distance: Optional[float],
                                 live_prob: Optional[float], latency_ms: Optional[float], client_ip: Optional[str],
                                 reason: Optional[str] = None):
    """Build a metrics document with a derived ``decision`` and log it.

    ``decision`` is ``"spoof_blocked"`` when liveness failed; otherwise it
    is ``"recognized"`` for events whose name starts with ``"accept"`` and
    ``"not_recognized"`` for everything else. The document is timestamped
    with the current UTC time and handed to ``log_metrics_event``.
    """
    if liveness_pass:
        decision = "recognized" if event.startswith("accept") else "not_recognized"
    else:
        decision = "spoof_blocked"

    log_metrics_event({
        "ts": datetime.now(timezone.utc),
        "event": event,
        "attempt_type": attempt_type,
        "claimed_id": claimed_id,
        "recognized_id": recognized_id,
        "liveness_pass": bool(liveness_pass),
        "distance": distance,
        "live_prob": live_prob,
        "latency_ms": latency_ms,
        "client_ip": client_ip,
        "reason": reason,
        "decision": decision,
    })
def classify_event(ev: Dict[str, Any]) -> Tuple[Optional[str], Optional[str]]:
    """Map a metrics document to ``(event, attempt_type)``.

    Current documents carry an ``event`` field; a missing ``attempt_type``
    is inferred from it where possible. Legacy documents are classified
    from ``decision`` / ``success`` / ``reason``. ``(None, None)`` means
    the document could not be classified.
    """
    event_name = ev.get("event")
    if event_name:
        attempt = ev.get("attempt_type")
        if not attempt:
            if event_name in ("accept_true", "reject_false"):
                attempt = "genuine"
            elif event_name in ("accept_false", "reject_true"):
                attempt = "impostor"
        return event_name, attempt

    # Legacy documents: no "event" field.
    decision = ev.get("decision")
    success = ev.get("success")
    reason = ev.get("reason")

    if decision == "recognized" and (success is True or success is None):
        return "accept_true", "genuine"
    if decision == "spoof_blocked":
        return "reject_true", "impostor"
    if decision == "not_recognized":
        # Only an explicit false reject counts as a genuine attempt; every
        # other reason (or none at all) is a correctly rejected impostor.
        if reason == "false_reject":
            return "reject_false", "genuine"
        return "reject_true", "impostor"
    return None, None
def compute_metrics(limit: int = 10000):
    """Aggregate accept/reject counts and rates over recent metrics events.

    Reads up to ``limit`` of the newest documents, classifies each with
    ``classify_event`` and skips those that cannot be classified.

    Returns:
        ``{"counts": {...}, "rates": {"FAR", "FRR", "accuracy"},
        "totals": {"totalAttempts"}}``. Rate denominators are floored at 1
        to avoid division by zero. On any error an all-zero structure is
        returned.
    """
    counter_names = (
        "trueAccepts", "falseAccepts", "trueRejects", "falseRejects",
        "genuineAttempts", "impostorAttempts",
        "unauthorizedRejected", "unauthorizedAccepted",
    )
    try:
        recent = metrics_events.find({}, {"_id": 0}).sort("ts", -1).limit(limit)
        counts = dict.fromkeys(counter_names, 0)
        classified = 0

        for doc in recent:
            event_name, attempt = classify_event(doc)
            if not event_name:
                continue
            classified += 1

            if event_name == "accept_true":
                counts["trueAccepts"] += 1
            elif event_name == "accept_false":
                counts["falseAccepts"] += 1
                counts["unauthorizedAccepted"] += 1
            elif event_name == "reject_true":
                counts["trueRejects"] += 1
                counts["unauthorizedRejected"] += 1
            elif event_name == "reject_false":
                counts["falseRejects"] += 1

            if attempt == "genuine":
                counts["genuineAttempts"] += 1
            elif attempt == "impostor":
                counts["impostorAttempts"] += 1

        far = counts["falseAccepts"] / max(counts["impostorAttempts"], 1)
        frr = counts["falseRejects"] / max(counts["genuineAttempts"], 1)
        correct = counts["trueAccepts"] + counts["trueRejects"]
        accuracy = correct / max(classified, 1)

        return {
            "counts": counts,
            "rates": {"FAR": far, "FRR": frr, "accuracy": accuracy},
            "totals": {"totalAttempts": classified},
        }
    except Exception as e:
        print(f"Error computing metrics: {e}")
        return {
            "counts": dict.fromkeys(counter_names, 0),
            "rates": {"FAR": 0, "FRR": 0, "accuracy": 0},
            "totals": {"totalAttempts": 0},
        }
def compute_latency_avg(limit: int = 300) -> Optional[float]:
    """Average ``latency_ms`` over the newest ``limit`` events that have it.

    Non-numeric latency values are ignored.

    Returns:
        The mean latency, or None when there are no usable samples or the
        query fails.
    """
    try:
        query = {"latency_ms": {"$exists": True}}
        projection = {"latency_ms": 1, "_id": 0}
        docs = metrics_events.find(query, projection).sort("ts", -1).limit(limit)

        samples = []
        for doc in docs:
            if isinstance(doc.get("latency_ms"), (int, float)):
                samples.append(float(doc["latency_ms"]))

        if samples:
            return sum(samples) / len(samples)
        return None
    except Exception as e:
        print(f"Error computing latency average: {e}")
        return None
# --------- STUDENT ROUTES ---------
def home():
    """Render the landing page (``home.html``)."""
    page = render_template('home.html')
    return page
def login_page():
    """Render the login form (``login.html``)."""
    page = render_template('login.html')
    return page
def register_page():
    """Render the registration form (``register.html``)."""
    page = render_template('register.html')
    return page
def metrics_dashboard():
    """Render the metrics dashboard (``metrics.html``)."""
    page = render_template('metrics.html')
    return page
def register():
    """Handle the student registration form.

    Reads the profile fields and a ``face_image`` data URI from the form,
    stores the student document and redirects to the login page on success.
    Validation failures, duplicate IDs and unexpected errors flash a
    message and redirect back to the registration page.
    """
    profile_fields = (
        'student_id', 'name', 'email', 'department', 'course', 'year',
        'division', 'mobile', 'dob', 'gender', 'password',
    )
    try:
        form = request.form
        student_data = {field: form.get(field) for field in profile_fields}
        student_data['created_at'] = datetime.now()

        # Without these two values the account could never be logged into.
        if not student_data['student_id'] or not student_data['password']:
            flash('Student ID and password are required.', 'danger')
            return redirect(url_for('register_page'))

        # NOTE(review): the password is stored as received (plain text) and
        # login() compares it directly. Hashing it requires changing both
        # ends and migrating existing records.

        # Expected shape: "data:<mime>;base64,<payload>".
        face_image = form.get('face_image') or ''
        header, comma, payload = face_image.partition(',')
        if not comma:
            flash('Face image is required for registration.', 'danger')
            return redirect(url_for('register_page'))

        try:
            image_bytes = base64.b64decode(payload)
        except ValueError:  # binascii.Error is a ValueError
            image_bytes = b''
        if not image_bytes:
            flash('Invalid face image data. Please capture your photo again.', 'danger')
            return redirect(url_for('register_page'))

        student_data['face_image'] = Binary(image_bytes)
        # Fall back to JPEG when the header carries no MIME type; this is
        # the same default dashboard() applies when rendering the image.
        student_data['face_image_type'] = header.partition(':')[2].partition(';')[0] or 'image/jpeg'

        result = students_collection.insert_one(student_data)
        if result.inserted_id:
            flash('Registration successful! You can now login.', 'success')
            return redirect(url_for('login_page'))
        flash('Registration failed. Please try again.', 'danger')
        return redirect(url_for('register_page'))
    except pymongo.errors.DuplicateKeyError:
        flash('Student ID already exists. Please use a different ID.', 'danger')
        return redirect(url_for('register_page'))
    except Exception as e:
        flash(f'Registration failed: {str(e)}', 'danger')
        return redirect(url_for('register_page'))
def login():
    """Handle the student ID + password login form.

    On success a session token is created and the user is redirected to the
    dashboard with the token in the query string. Missing fields, bad
    credentials and unexpected errors flash a message and redirect to the
    login page.
    """
    try:
        student_id = request.form.get('student_id')
        password = request.form.get('password')
        print(f"=== TOKEN-BASED LOGIN DEBUG ===")
        print(f"Student ID: {student_id}")
        if not student_id or not password:
            flash('Student ID and password are required.', 'danger')
            return redirect(url_for('login_page'))

        student = students_collection.find_one({'student_id': student_id})
        print(f"Student found: {bool(student)}")

        stored_password = student.get('password') if student else None
        # Constant-time comparison so response timing does not reveal how
        # much of the password matched. Encoding to bytes lets
        # compare_digest accept non-ASCII passwords.
        credentials_ok = isinstance(stored_password, str) and secrets.compare_digest(
            stored_password.encode('utf-8'), password.encode('utf-8')
        )

        if credentials_ok:
            # Create session token. No part of the token is printed: it is
            # a bearer credential and must not end up in the logs.
            token = create_session_token(student_id, 'student')
            print("Session token created")
            flash('Login successful!', 'success')
            # Redirect with token in URL
            return redirect(url_for('dashboard', token=token))

        print("Invalid credentials")
        flash('Invalid credentials. Please try again.', 'danger')
        return redirect(url_for('login_page'))
    except Exception as e:
        print(f"Login error: {e}")
        flash(f'Login failed: {str(e)}', 'danger')
        return redirect(url_for('login_page'))
def _reference_distance(deepface_module, probe_path, user, id_field):
    """Return the DeepFace distance between the probe image and one user.

    The user's stored face image is decoded and written to a temporary
    file for the comparison; that file is always removed afterwards, even
    when verification raises. Returns None when the stored image cannot be
    decoded or written.
    """
    ref_image_array = np.frombuffer(user['face_image'], np.uint8)
    ref_image = cv2.imdecode(ref_image_array, cv2.IMREAD_COLOR)
    if ref_image is None:
        return None

    temp_ref_path = get_unique_temp_path(f"ref_{user[id_field]}")
    try:
        if not cv2.imwrite(temp_ref_path, ref_image):
            return None
        result = deepface_module.verify(
            img1_path=probe_path,
            img2_path=temp_ref_path,
            model_name="Facenet",
            detector_backend="opencv",  # opencv rather than retinaface
            enforce_detection=False,
            align=True
        )
        return result["distance"]
    finally:
        if os.path.exists(temp_ref_path):
            os.remove(temp_ref_path)


def face_login():
    """Log a user in by matching a submitted face against enrolled faces.

    The submitted image is compared with every enrolled face for the chosen
    role. The closest match is accepted only when its distance is below
    0.6. Choosing the closest candidate, rather than the first one under
    the threshold, avoids logging someone in as whichever similar-looking
    user happens to come first in the query result.

    On success a session token is created and the user is redirected to
    the role's dashboard; otherwise a message is flashed and the user is
    sent back to the login page.
    """
    max_login_distance = 0.6  # lenient threshold for face login
    try:
        face_image = request.form.get('face_image')
        face_role = request.form.get('face_role')
        print(f"Face login attempt for role: {face_role}")
        if not face_image or not face_role:
            flash('Face image and role are required for face login.', 'danger')
            return redirect(url_for('login_page'))

        image = decode_image(face_image)
        if image is None:
            flash('Invalid image format received.', 'danger')
            return redirect(url_for('login_page'))

        if face_role == 'student':
            collection = students_collection
            id_field = 'student_id'
            dashboard_route = 'dashboard'
        elif face_role == 'teacher':
            collection = teachers_collection
            id_field = 'teacher_id'
            dashboard_route = 'teacher_dashboard'
        else:
            flash('Invalid role selected for face login.', 'danger')
            return redirect(url_for('login_page'))

        users = collection.find({'face_image': {'$exists': True, '$ne': None}})
        best_match = None
        best_distance = float('inf')

        temp_login_path = get_unique_temp_path("login_image")
        try:
            try:
                saved = cv2.imwrite(temp_login_path, image)
            except Exception as e:
                print(f"Error saving login image: {e}")
                flash('Error processing face image. Please try again.', 'danger')
                return redirect(url_for('login_page'))
            if not saved:
                flash('Failed to process face image. Please try again.', 'danger')
                return redirect(url_for('login_page'))

            try:
                from deepface import DeepFace
                for user in users:
                    try:
                        distance = _reference_distance(DeepFace, temp_login_path, user, id_field)
                    except Exception as e:
                        # One unreadable reference must not abort the scan.
                        print(f"Face verification error: {e}")
                        continue
                    if distance is not None and distance < best_distance:
                        best_distance = distance
                        best_match = user
            except Exception as e:
                print(f"DeepFace import/processing error: {e}")
        finally:
            # The probe image is removed on every path out of this section.
            if os.path.exists(temp_login_path):
                os.remove(temp_login_path)
            gc.collect()

        if best_match is not None and best_distance < max_login_distance:
            token = create_session_token(best_match[id_field], face_role)
            print(f"Face login successful for {best_match.get('name')}, distance: {best_distance:.3f}")
            flash('Face login successful!', 'success')
            return redirect(url_for(dashboard_route, token=token))

        # Provide better error message with best match info
        if best_match:
            print(f"Closest match was {best_match.get('name')} with distance {best_distance:.3f}")
            flash(f'Face recognition failed. Closest match distance: {best_distance:.3f}. Please try again with better lighting.', 'warning')
        else:
            flash('No face detected or face not found in database. Please try again.', 'danger')
        return redirect(url_for('login_page'))
    except Exception as e:
        print(f"Face login error: {e}")
        flash(f'Face login failed: {str(e)}', 'danger')
        return redirect(url_for('login_page'))
def dashboard():
    """Render the student dashboard for a token-authenticated session.

    The ``token`` query parameter must map to a live student session whose
    student record exists. Otherwise a message is flashed and the user is
    redirected to the login page. On success the student's profile (with an
    inline face image URL when one is stored) and attendance records,
    newest date first, are rendered with the token passed to the template.
    """
    def back_to_login(message, category):
        # Shared exit path: flash the reason, then send the user to login.
        flash(message, category)
        return redirect(url_for('login_page'))

    token = request.args.get('token')
    print(f"=== TOKEN-BASED DASHBOARD DEBUG ===")
    print(f"Token received: {token[:10] if token else 'None'}...")
    if not token:
        print("No token provided")
        return back_to_login('Please log in to access the dashboard.', 'info')

    try:
        # Validate session token
        session_data = validate_session_token(token)
        if not session_data:
            print("Invalid or expired token")
            return back_to_login('Session expired. Please log in again.', 'info')
        if session_data.get('user_type') != 'student':
            print("Invalid user type")
            return back_to_login('Please log in as a student.', 'info')

        student_id = session_data.get('student_id')
        print(f"Loading dashboard for student: {student_id}")
        student = students_collection.find_one({'student_id': student_id})
        if not student:
            print("Student not found in database")
            # The session points at a record that no longer exists.
            sessions_collection.delete_one({'token': token})
            return back_to_login('Student record not found. Please log in again.', 'danger')

        # Process face image if exists
        stored_face = student.get('face_image')
        if stored_face:
            encoded_face = base64.b64encode(stored_face).decode('utf-8')
            mime_type = student.get('face_image_type', 'image/jpeg')
            student['face_image_url'] = f"data:{mime_type};base64,{encoded_face}"

        # Get attendance records
        attendance_cursor = attendance_collection.find({'student_id': student_id}).sort('date', -1)
        attendance_records = list(attendance_cursor)
        print(f"Dashboard loaded successfully for {student.get('name')}")

        # Pass token to template for subsequent requests
        return render_template(
            'dashboard.html',
            student=student,
            attendance_records=attendance_records,
            session_token=token,
        )
    except Exception as e:
        print(f"Dashboard error: {e}")
        return back_to_login(f'Error loading dashboard: {str(e)}', 'danger')
| def mark_attendance(): | |
| data = request.json | |
| token = data.get('session_token') | |
| if not token: | |
| return jsonify({'success': False, 'message': 'Not authenticated'}) | |
| # Validate token | |
| session_data = validate_session_token(token) | |
| if not session_data: | |
| return jsonify({'success': False, 'message': 'Session expired'}) | |
| if session_data.get('user_type') != 'student': | |
| return jsonify({'success': False, 'message': 'Invalid user type'}) | |
| logged_in_student_id = session_data.get('student_id') | |
| program = data.get('program') | |
| semester = data.get('semester') | |
| course = data.get('course') | |
| face_image = data.get('face_image') | |
| if not all([logged_in_student_id, program, semester, course, face_image]): | |
| return jsonify({'success': False, 'message': 'Missing required data'}) | |
| client_ip = request.remote_addr | |
| t0 = time.time() | |
| # Decode image | |
| image = decode_image(face_image) | |
| if image is None or image.size == 0: | |
| return jsonify({'success': False, 'message': 'Invalid image data'}) | |
| h, w = image.shape[:2] | |
| vis = image.copy() | |
| # 1) Face detection using reliable methods | |
| detections = detect_faces_yunet(image) | |
| if not detections: | |
| overlay = image_to_data_uri(vis) | |
| log_metrics_event_normalized( | |
| event="reject_true", | |
| attempt_type="impostor", | |
| claimed_id=logged_in_student_id, | |
| recognized_id=None, | |
| liveness_pass=False, | |
| distance=None, | |
| live_prob=None, | |
| latency_ms=round((time.time() - t0) * 1000.0, 2), | |
| client_ip=client_ip, | |
| reason="no_face_detected" | |
| ) | |
| return jsonify({'success': False, 'message': 'No face detected', 'overlay': overlay}) | |
| # Pick highest-score detection | |
| best = max(detections, key=lambda d: d["score"]) | |
| x1, y1, x2, y2 = [int(v) for v in best["bbox"]] | |
| x1e, y1e, x2e, y2e = expand_and_clip_box((x1, y1, x2, y2), scale=1.2, w=w, h=h) | |
| face_crop = image[y1e:y2e, x1e:x2e] | |
| if face_crop.size == 0: | |
| overlay = image_to_data_uri(vis) | |
| log_metrics_event_normalized( | |
| event="reject_true", | |
| attempt_type="impostor", | |
| claimed_id=logged_in_student_id, | |
| recognized_id=None, | |
| liveness_pass=False, | |
| distance=None, | |
| live_prob=None, | |
| latency_ms=round((time.time() - t0) * 1000.0, 2), | |
| client_ip=client_ip, | |
| reason="failed_crop" | |
| ) | |
| return jsonify({'success': False, 'message': 'Failed to crop face', 'overlay': overlay}) | |
| # 2) Liveness check with lower threshold | |
| live_prob = simple_liveness_check(face_crop) | |
| liveness_threshold = 0.4 # More lenient threshold | |
| is_live = live_prob >= liveness_threshold | |
| label = "LIVE" if is_live else "SPOOF" | |
| color = (0, 200, 0) if is_live else (0, 0, 255) | |
| draw_live_overlay(vis, (x1e, y1e, x2e, y2e), label, live_prob, color) | |
| overlay_data = image_to_data_uri(vis) | |
| print(f"Liveness check - Score: {live_prob:.3f}, Threshold: {liveness_threshold}, Result: {label}") | |
| if not is_live: | |
| log_metrics_event_normalized( | |
| event="reject_true", | |
| attempt_type="impostor", | |
| claimed_id=logged_in_student_id, | |
| recognized_id=None, | |
| liveness_pass=False, | |
| distance=None, | |
| live_prob=float(live_prob), | |
| latency_ms=round((time.time() - t0) * 1000.0, 2), | |
| client_ip=client_ip, | |
| reason="liveness_fail" | |
| ) | |
| return jsonify({ | |
| 'success': False, | |
| 'message': f'Liveness check failed (score={live_prob:.2f}, need>={liveness_threshold}). Ensure good lighting and face visibility.', | |
| 'overlay': overlay_data | |
| }) | |
| # 3) CRITICAL SECURITY FIX: Verify face belongs to logged-in user | |
| success, message = recognize_face_deepface(image, logged_in_student_id, user_type='student') | |
| total_latency_ms = round((time.time() - t0) * 1000.0, 2) | |
| # Parse distance from message if available | |
| distance_val = None | |
| try: | |
| if "distance=" in message: | |
| part = message.split("distance=")[1] | |
| distance_val = float(part.split(",")[0].strip(") ")) | |
| except Exception: | |
| pass | |
| # CRITICAL SECURITY CHECK: Only allow attendance if recognized face matches logged-in user | |
| if not success: | |
| reason = "face_mismatch_with_logged_user" | |
| log_metrics_event_normalized( | |
| event="reject_true", | |
| attempt_type="impostor", | |
| claimed_id=logged_in_student_id, | |
| recognized_id=None, | |
| liveness_pass=True, | |
| distance=distance_val, | |
| live_prob=float(live_prob), | |
| latency_ms=total_latency_ms, | |
| client_ip=client_ip, | |
| reason=reason | |
| ) | |
| return jsonify({ | |
| 'success': False, | |
| 'message': f'SECURITY ALERT: Face does not match logged-in student {logged_in_student_id}. Please ensure you are the correct person marking attendance.', | |
| 'overlay': overlay_data | |
| }) | |
| # Log successful verification | |
| log_metrics_event_normalized( | |
| event="accept_true", | |
| attempt_type="genuine", | |
| claimed_id=logged_in_student_id, | |
| recognized_id=logged_in_student_id, | |
| liveness_pass=True, | |
| distance=distance_val, | |
| live_prob=float(live_prob), | |
| latency_ms=total_latency_ms, | |
| client_ip=client_ip, | |
| reason=None | |
| ) | |
| # Save attendance for the LOGGED-IN user (not whoever's face was recognized) | |
| attendance_data = { | |
| 'student_id': logged_in_student_id, # FIXED: Use logged-in user ID | |
| 'program': program, | |
| 'semester': semester, | |
| 'subject': course, | |
| 'date': datetime.now().date().isoformat(), | |
| 'time': datetime.now().time().strftime('%H:%M:%S'), | |
| 'status': 'present', | |
| 'created_at': datetime.now() | |
| } | |
| try: | |
| existing_attendance = attendance_collection.find_one({ | |
| 'student_id': logged_in_student_id, | |
| 'subject': course, | |
| 'date': datetime.now().date().isoformat() | |
| }) | |
| if existing_attendance: | |
| return jsonify({ | |
| 'success': False, | |
| 'message': 'Attendance already marked for this course today', | |
| 'overlay': overlay_data | |
| }) | |
| attendance_collection.insert_one(attendance_data) | |
| gc.collect() | |
| return jsonify({ | |
| 'success': True, | |
| 'message': f'Attendance marked successfully for {logged_in_student_id}', | |
| 'overlay': overlay_data | |
| }) | |
| except Exception as e: | |
| return jsonify({ | |
| 'success': False, | |
| 'message': f'Database error: {str(e)}', | |
| 'overlay': overlay_data | |
| }) | |
def liveness_preview():
    """Score a submitted frame for liveness and return an annotated preview.

    Reads a JSON object from the request body with two keys:
    ``session_token`` (checked with ``validate_session_token``) and
    ``face_image`` (passed to ``decode_image``). This handler only detects,
    scores and draws; it does not write any attendance record.

    Returns:
        A JSON response. On authentication, input or server failure the body
        is ``{'success': False, 'message': ...}``. Otherwise ``success`` is
        True and the body carries ``live`` (bool), ``live_prob`` (float) and
        ``overlay`` (the value produced by ``image_to_data_uri``); a
        ``message`` is included when no usable face region was found.
    """
    # get_json(silent=True) yields None for a missing, non-JSON or malformed
    # body instead of raising outside the try block below, so the client
    # always receives this endpoint's JSON failure shape.
    payload = request.get_json(silent=True)
    # A JSON list/string/number has no .get(); treat it like an empty body.
    data = payload if isinstance(payload, dict) else {}

    token = data.get('session_token')
    if not token or not validate_session_token(token):
        return jsonify({'success': False, 'message': 'Not authenticated'})

    try:
        face_image = data.get('face_image')
        if not face_image:
            return jsonify({'success': False, 'message': 'No image received'})

        image = decode_image(face_image)
        if image is None or image.size == 0:
            return jsonify({'success': False, 'message': 'Invalid image data'})

        h, w = image.shape[:2]
        vis = image.copy()  # drawing happens on the copy; `image` stays clean

        detections = detect_faces_yunet(image)
        if not detections:
            return _liveness_preview_miss(vis, 'No face detected')

        # Only the highest-scoring detection is evaluated.
        best = max(detections, key=lambda d: d["score"])
        x1, y1, x2, y2 = [int(v) for v in best["bbox"]]
        x1e, y1e, x2e, y2e = expand_and_clip_box((x1, y1, x2, y2), scale=1.2, w=w, h=h)
        face_crop = image[y1e:y2e, x1e:x2e]
        if face_crop.size == 0:
            return _liveness_preview_miss(vis, 'Failed to crop face')

        live_prob = simple_liveness_check(face_crop)
        threshold = 0.4  # Match attendance marking threshold
        is_live = bool(live_prob >= threshold)
        label = "LIVE" if is_live else "SPOOF"
        color = (0, 200, 0) if is_live else (0, 0, 255)
        draw_live_overlay(vis, (x1e, y1e, x2e, y2e), label, live_prob, color)
        overlay_data = image_to_data_uri(vis)

        # Drop the image buffers before building the response.
        del image, vis, face_crop
        gc.collect()

        return jsonify({
            'success': True,
            'live': is_live,
            'live_prob': float(live_prob),
            'overlay': overlay_data
        })
    except Exception as e:
        # Top-level boundary for the preview pipeline: report a generic
        # failure to the client and keep the detail in the server output.
        print("liveness_preview error:", e)
        return jsonify({'success': False, 'message': 'Server error during preview'})


def _liveness_preview_miss(vis, message):
    """Build the preview response used when no usable face region was found."""
    overlay_data = image_to_data_uri(vis)
    return jsonify({
        'success': True,
        'live': False,
        'live_prob': 0.0,
        'message': message,
        'overlay': overlay_data
    })
| # --------- LOGOUT ROUTES --------- | |
def logout():
    """Invalidate the supplied session token and send the user to the login page.

    The token is read from the ``token`` query parameter. When present, the
    matching entry is deleted from ``sessions_collection`` and the first ten
    characters of the token are printed. A flash message is queued in every
    case, whether or not a token was supplied.

    Returns:
        A redirect response to the ``login_page`` endpoint.
    """
    token = request.args.get('token')

    if token:
        token_filter = {'token': token}
        sessions_collection.delete_one(token_filter)
        print(f"Token {token[:10]}... invalidated")

    flash('You have been logged out', 'info')
    login_url = url_for('login_page')
    return redirect(login_url)
| # --------- DEBUG ROUTES --------- | |
def health_check():
    """Return a fixed service descriptor with the current server time.

    Every field except ``timestamp`` is a hard-coded literal; nothing is
    probed at request time.

    Returns:
        A ``(response, 200)`` tuple whose JSON body holds the descriptor.
    """
    report = {
        'status': 'healthy',
        'platform': 'hugging_face',
        'session_type': 'token_based',
        'proxy_fix': 'enabled',
        'liveness_threshold': 0.4,
        'face_detector': 'opencv_stable',
    }
    report['timestamp'] = datetime.now().isoformat()
    return jsonify(report), 200
def debug_session():
    """Return diagnostics about the caller's token, headers and cookies.

    The ``token`` query parameter, when present, is checked with
    ``validate_session_token``; its result is echoed back as ``session_data``
    (or ``None`` when falsy).

    Returns:
        A JSON response describing the token state and the incoming request.
    """
    # NOTE(review): this echoes request headers, cookies and session contents
    # back to the caller — confirm it is not exposed in production.
    token = request.args.get('token')
    if token:
        session_data = validate_session_token(token)
    else:
        session_data = None

    diagnostics = {
        'token_provided': bool(token),
        'session_valid': bool(session_data),
        'session_data': session_data or None,
        'headers': dict(request.headers),
        'cookies': dict(request.cookies),
        'session_type': 'token_based',
        'proxy_fix': 'enabled',
    }
    return jsonify(diagnostics)
def manual_cleanup():
    """Trigger a garbage-collection pass on demand.

    Returns:
        ``(response, 200)`` with ``{'status': 'cleanup completed'}`` on
        success, or ``(response, 500)`` carrying the error text if the
        collection (or building the success response) raises.
    """
    try:
        gc.collect()
        outcome = jsonify({'status': 'cleanup completed'}), 200
    except Exception as exc:
        outcome = jsonify({'status': 'cleanup failed', 'error': str(exc)}), 500
    return outcome
# MAIN APPLICATION ENTRY POINT
# Runs only when the file is executed directly. The listening port comes from
# the PORT environment variable and falls back to 7860 when it is unset.
if __name__ == '__main__':
    port = int(os.getenv('PORT', 7860))
    print(f"Starting Flask app on port {port} with secure token-based authentication")
    # Bind on all interfaces with the debugger disabled.
    app.run(debug=False, port=port, host='0.0.0.0')