import os import time import logging import requests import cv2 import numpy as np from datetime import datetime from typing import List, Optional from concurrent.futures import ThreadPoolExecutor, wait from dotenv import load_dotenv # Load environment variables from .env if present load_dotenv() from fastapi import FastAPI, UploadFile, File, Form, HTTPException, Depends, status, Request from fastapi.responses import Response from fastapi.security import APIKeyHeader from fastapi.middleware.cors import CORSMiddleware from fastapi.staticfiles import StaticFiles from huggingface_hub import HfApi from PIL import Image import io import time from collections import defaultdict from firebase_admin import firestore # Import local helpers from firebase_helper import ( get_firestore_db, check_firebase_connection, get_all_student_embeddings, load_all_embeddings, save_attendance, update_enrollment_status, save_face_embedding, update_camera_status, get_period_info, delete_student_embedding, IST ) from detector import FaceDetector, PhoneDetector from recognizer import FaceRecognizer from keep_alive import start_keep_alive # Configure logging logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s" ) logger = logging.getLogger(__name__) # Track uptime and stats START_TIME = time.time() _scans_today = 0 _last_scan_date = None _last_scan_time = "—" _last_scan_timestamp = 0.0 # unix time of last /scan completion # FIX 1 — Processing flag: set True while /scan is running # Used by GET /status so the capture client can skip uploading # when the server is already busy with a frame. _is_processing = False # FastAPI Setup app = FastAPI( title="Machub AI Face Recognition Attendance System Backend", version="1.1.0", description="ONNX-powered face detection and recognition FastAPI server for Hugging Face Spaces" ) # Configure CORS to allow only the dashboard domains and local development allowed_origins = [ "http://localhost:5173", "http://127.0.0.1:5173", "https://machub-6af39.web.app", "https://machub-6af39.firebaseapp.com" ] # Allow custom origins from environment if provided custom_origins = os.getenv("ALLOWED_ORIGINS") if custom_origins: allowed_origins.extend([o.strip() for o in custom_origins.split(",")]) app.add_middleware( CORSMiddleware, allow_origins=allowed_origins, allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # API Key Security Setup API_KEY_HEADER = APIKeyHeader(name="X-API-Key", auto_error=False) def verify_api_key(x_api_key: Optional[str] = Depends(API_KEY_HEADER)): """Enforces authentication on protected routes using the X-API-Key header.""" secret_key = os.getenv("API_SECRET_KEY") if not secret_key: logger.warning("API_SECRET_KEY is not configured in the server environment!") return None if not x_api_key or x_api_key != secret_key: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Unauthorized: Invalid or missing X-API-Key header." ) return x_api_key # Global Singletons detector = None recognizer = None phone_detector = None # Hugging Face Dataset Storage Configuration hf_token = os.getenv("HF_TOKEN") hf_api = HfApi(token=hf_token) hf_repo = os.getenv("HF_DATASET_REPO", "MrRayZer/machub-biometric-storage") if hf_token: try: logger.info(f"Ensuring private dataset repository exists: {hf_repo}") hf_api.create_repo(repo_id=hf_repo, repo_type="dataset", private=True, exist_ok=True) logger.info("Dataset repository verified/created successfully.") except Exception as e: logger.error(f"Failed to verify/create HF dataset repository {hf_repo}: {e}") else: logger.warning("HF_TOKEN environment variable is missing! Cannot manage private dataset repository.") # Rate Limiting configuration (10 enrollments per minute per IP) _enroll_rate_limits = defaultdict(list) def check_rate_limit(ip_address: str) -> bool: now = time.time() _enroll_rate_limits[ip_address] = [t for t in _enroll_rate_limits[ip_address] if now - t < 60.0] if len(_enroll_rate_limits[ip_address]) >= 10: return False _enroll_rate_limits[ip_address].append(now) return True # Hugging Face API call retry helper with exponential backoff def call_hf_with_retry(func, *args, **kwargs): retries = 3 backoff = 2.0 for attempt in range(retries): try: return func(*args, **kwargs) except Exception as e: err_str = str(e).lower() is_transient = "429" in err_str or "503" in err_str or "rate limit" in err_str or "connection" in err_str if is_transient and attempt < retries - 1: logger.warning(f"[HF_API] Call failed (attempt {attempt+1}/{retries}). Retrying in {backoff}s... Error: {e}") time.sleep(backoff) backoff *= 2.0 else: raise e raise Exception("Hugging Face API call failed after max retries") # Admin role and Firebase ID token verifier def verify_admin_user(id_token: str) -> str: if not id_token: raise HTTPException(status_code=403, detail="Unauthorized: Missing credentials token.") try: from firebase_admin import auth as firebase_auth decoded_token = firebase_auth.verify_id_token(id_token) email = decoded_token.get("email", "").lower() uid = decoded_token.get("uid") db_client = get_firestore_db() user_doc = db_client.collection("allowedUsers").document(email).get() is_admin = False if user_doc.exists: data = user_doc.to_dict() if data.get("role") == "admin": is_admin = True if not is_admin and email in ["admin@machub.in", "mrabensojan@gmail.com"]: is_admin = True if not is_admin: logger.warning(f"Access denied: User {email} is not authorized as admin.") raise HTTPException(status_code=403, detail="Unauthorized: User does not have admin privileges.") return uid except HTTPException: raise except Exception as e: logger.error(f"Firebase token validation error: {e}") raise HTTPException(status_code=403, detail=f"Unauthorized: Token verification failed ({str(e)}).") # Model URLs for automatic download on startup YOLO_URL = "https://huggingface.co/crj/dl-ws/resolve/main/yoloface_8n.onnx" ARCFACE_URL = "https://huggingface.co/FoivosPar/Arc2Face/resolve/main/arcface.onnx" def download_model(url: str, dest_path: str): """Downloads a model file with chunk progress reporting.""" os.makedirs(os.path.dirname(dest_path), exist_ok=True) logger.info(f"Downloading {url} to {dest_path}...") try: response = requests.get(url, stream=True) response.raise_for_status() total_size = int(response.headers.get("content-length", 0)) block_size = 1024 * 1024 # 1MB downloaded = 0 with open(dest_path, "wb") as f: for chunk in response.iter_content(chunk_size=block_size): if chunk: f.write(chunk) downloaded += len(chunk) if total_size > 0: percent = (downloaded / total_size) * 100 logger.info(f"Downloading {os.path.basename(dest_path)}: {percent:.1f}% ({downloaded}/{total_size} bytes)") else: logger.info(f"Downloading {os.path.basename(dest_path)}: {downloaded} bytes") logger.info(f"Finished downloading {dest_path}") except Exception as e: logger.error(f"Error downloading model {dest_path}: {e}") if os.path.exists(dest_path): os.remove(dest_path) raise e def pre_warm_models(): """Warms up model runtimes to eliminate initial inference delays (cold-start mitigation).""" global detector, recognizer, phone_detector try: logger.info("Pre-warming YOLOv8 detector, ArcFace recognizer, and Phone detector...") dummy_img = np.zeros((480, 640, 3), dtype=np.uint8) if detector and detector.loaded: _ = detector.detect_faces(dummy_img) if recognizer and recognizer.loaded: dummy_face = np.zeros((112, 112, 3), dtype=np.uint8) _ = recognizer.get_embedding(dummy_face) if phone_detector and phone_detector.loaded: _ = phone_detector.detect_phones(dummy_img) logger.info("Models successfully pre-warmed.") except Exception as e: logger.error(f"Failed to pre-warm models: {e}") @app.on_event("startup") def startup_event(): """Server initialization hook.""" global detector, recognizer, phone_detector logger.info("--- Starting Machub Backend Initialization ---") # 1. Initialize Firebase and Cache Embeddings try: get_firestore_db() logger.info("Firebase Firestore initialized.") load_all_embeddings(force_refresh=True) logger.info("In-memory student embeddings pre-loaded.") except Exception as e: logger.error(f"Firebase / Caching initialization failed: {e}") # 2. Ensure ONNX models exist (download if missing) models_dir = "models" yolo_path = os.path.join(models_dir, "yolov8n-face.onnx") arcface_path = os.path.join(models_dir, "arcface.onnx") yolo_coco_path = os.path.join(models_dir, "yolov8n.onnx") if not os.path.exists(yolo_path): download_model(YOLO_URL, yolo_path) if not os.path.exists(arcface_path): download_model(ARCFACE_URL, arcface_path) try: if not os.path.exists(yolo_coco_path): download_model("https://huggingface.co/webnn/yolov8n/resolve/main/onnx/yolov8n_fp16.onnx", yolo_coco_path) except Exception as ex: logger.warning(f"Gracefully skipped YOLOv8 COCO model download for phone detection: {ex}") # 3. Instantiate model singletons detector = FaceDetector(yolo_path) recognizer = FaceRecognizer(arcface_path) phone_detector = PhoneDetector(yolo_coco_path) # 4. Pre-warm models pre_warm_models() # 5. Start keep-alive loop start_keep_alive() logger.info("--- Machub Backend Ready ---") # In-Memory Cache to minimize duplicate attendance writes in the same class session # Key: (division, date, period) -> Value: set(roll_no) _marked_cache = {} # Stateful tracking registries for /identify _track_embeddings = {} # tid -> list of embeddings (numpy arrays) _track_identities = {} # tid -> { "rollNo": str, "name": str, "score": float, "consecutive_low_scores": int } _track_last_seen = {} # tid -> timestamp (float) def prune_stale_tracks(): """Prunes tracking states that have not been updated for 10 seconds.""" now = time.time() stale_ids = [tid for tid, last_seen in _track_last_seen.items() if now - last_seen > 10.0] for tid in stale_ids: logger.info(f"[TRACKER] Pruning stale track due to inactivity: {tid}") _track_embeddings.pop(tid, None) _track_identities.pop(tid, None) _track_last_seen.pop(tid, None) def get_marked_set(division: str, date: str, period: str) -> set: """Returns the set of students already marked in-memory for a period. Prunes older cached keys.""" key = (division, date, period) # Prune keys from other sessions to prevent memory leaks keys_to_remove = [k for k in _marked_cache.keys() if k != key] for k in keys_to_remove: del _marked_cache[k] if key not in _marked_cache: _marked_cache[key] = set() return _marked_cache[key] # Parallel processing helper def _process_single_face_crop(idx: int, box: List[int], img: np.ndarray, known_profiles: dict): """Crops a face, equalizes contrast, computes embedding, and matches against known database.""" x1, y1, x2, y2 = box crop = img[y1:y2, x1:x2] if crop.size == 0: return None embedding = recognizer.get_embedding(crop) if embedding is None: return None match_id, score = recognizer.match_student(embedding, known_profiles) return { "index": idx, "match_id": match_id, "score": score } # ----------------- ENDPOINTS ----------------- @app.get("/health") def health_check(): """Public health endpoint used for uptime monitoring, warmups, and diagnostics.""" firebase_ok = check_firebase_connection() return { "status": "online", "models": { "detector": detector.loaded if detector else False, "recognizer": recognizer.loaded if recognizer else False }, "firebase": firebase_ok, "version": "1.1.0" } # FIX 2 — Public /status endpoint (no auth required) # Called by the capture client with a 5 s timeout before every upload. # If busy=True the client skips the upload immediately. @app.get("/status") def server_status(): """Public endpoint: returns whether the server is currently busy processing a frame.""" global _is_processing, _last_scan_timestamp, START_TIME secs_ago = int(time.time() - _last_scan_timestamp) if _last_scan_timestamp > 0 else -1 return { "busy": _is_processing, "last_scan_seconds_ago": secs_ago, "uptime_seconds": int(time.time() - START_TIME), } # Core Biometric Enrollment and Upload Engine (RAM-only Processing) async def handle_enrollment_core( request: Request, id_token: str, roll_no: str, name: str, division: str, photos: List[UploadFile], is_re_enroll: bool ): client_ip = request.client.host if not check_rate_limit(client_ip): raise HTTPException( status_code=429, detail="Rate limit exceeded: Maximum 10 enrollments per minute per IP." ) admin_uid = verify_admin_user(id_token) roll_no = roll_no.strip().upper() name = name.strip() division = division.strip() db_client = get_firestore_db() student_ref = db_client.collection("students").document(roll_no) student_snap = student_ref.get() if not is_re_enroll and student_snap.exists: raise HTTPException( status_code=400, detail=f"Student {roll_no} is already enrolled. Please use re-enrollment route." ) elif is_re_enroll and not student_snap.exists: raise HTTPException( status_code=400, detail=f"Student {roll_no} does not exist. Use the new enrollment route." ) if not photos: raise HTTPException(status_code=400, detail="No photo files uploaded. Include photos list.") # Update queue to processing update_enrollment_status(roll_no, "processing") valid_embeddings = [] photos_skipped = 0 start_time = time.time() for photo in photos: try: contents = await photo.read() file_size = len(contents) if file_size > 5 * 1024 * 1024: photos_skipped += 1 logger.warning(f"File {photo.filename} skipped: exceeds 5MB size limit.") continue nparr = np.frombuffer(contents, np.uint8) img = cv2.imdecode(nparr, cv2.IMREAD_COLOR) if img is None: photos_skipped += 1 logger.warning(f"File {photo.filename} skipped: could not decode image.") continue faces = detector.detect_faces(img) if not faces: photos_skipped += 1 logger.warning(f"File {photo.filename} skipped: no face detected.") continue faces.sort(key=lambda box: (box[2]-box[0])*(box[3]-box[1]), reverse=True) x1, y1, x2, y2 = faces[0] crop = img[y1:y2, x1:x2] if crop.size == 0: photos_skipped += 1 logger.warning(f"File {photo.filename} skipped: empty face crop.") continue emb = recognizer.get_embedding(crop) if emb is not None: valid_embeddings.append(emb) else: photos_skipped += 1 logger.warning(f"File {photo.filename} skipped: failed to extract embedding.") except Exception as e: photos_skipped += 1 logger.error(f"Failed to process face extraction for {photo.filename}: {e}") if not valid_embeddings: raise HTTPException( status_code=400, detail="Enrollment failed: No clear faces could be extracted from any of the uploaded photos." ) avg_embedding = np.mean(valid_embeddings, axis=0) norm = np.linalg.norm(avg_embedding) master_embedding_arr = avg_embedding / norm if norm > 0 else avg_embedding master_embedding = master_embedding_arr.tolist() try: known_profiles = load_all_embeddings(force_refresh=True) if known_profiles: for existing_roll, profile in known_profiles.items(): if existing_roll != roll_no: similarity = float(np.dot(master_embedding_arr, profile["embedding"])) if similarity >= 0.70: existing_name = profile["name"] raise HTTPException( status_code=400, detail=f"Student Name: {existing_name} and already enrolled." ) except HTTPException: raise except Exception as e: logger.error(f"Error checking double enrollment: {e}") try: student_data = { "name": name, "rollNo": roll_no, "division": division, "photoCount": len(photos), "photoUrls": [], "photoURL": None, "faceEmbedding": master_embedding, "embeddingStatus": "complete", "faceEnrolled": True, "updatedAt": firestore.SERVER_TIMESTAMP } if not is_re_enroll: student_data["totalClasses"] = 0 student_data["attendedClasses"] = 0 student_data["enrolledAt"] = firestore.SERVER_TIMESTAMP student_data["enrolledBy"] = admin_uid else: student_data["enrolledAt"] = firestore.SERVER_TIMESTAMP student_data["enrolledBy"] = admin_uid student_ref.set(student_data, merge=True) queue_ref = db_client.collection("enrollmentQueue").document(roll_no) queue_ref.set({ "rollNo": roll_no, "division": division, "status": "complete", "photoCount": len(photos), "uploadedAt": firestore.SERVER_TIMESTAMP }, merge=True) except Exception as e: logger.error(f"Firestore transaction write failed for student {roll_no}: {e}") raise HTTPException( status_code=500, detail=f"Database ledger write failed: {str(e)}" ) try: load_all_embeddings(force_refresh=True) except Exception as cache_err: logger.warning(f"Failed to refresh embeddings cache: {cache_err}") processing_time_ms = int((time.time() - start_time) * 1000) return { "success": True, "roll_no": roll_no, "name": name, "photos_received": len(photos), "photos_processed": len(valid_embeddings), "photos_skipped": photos_skipped, "embedding_dimensions": len(master_embedding), "message": "Enrolled successfully" } @app.post("/enroll") async def enroll_student( request: Request, id_token: str = Form(...), roll_no: str = Form(...), name: str = Form(...), division: str = Form(...), photos: List[UploadFile] = File(...) ): return await handle_enrollment_core( request=request, id_token=id_token, roll_no=roll_no, name=name, division=division, photos=photos, is_re_enroll=False ) @app.post("/scan") async def scan_frame( frame: UploadFile = File(...), period: int = Form(...), division: Optional[str] = Form(None), date: Optional[str] = Form(None), auth: None = Depends(verify_api_key) ): """Processes a classroom frame, matches student embeddings, and logs attendance.""" global _scans_today, _last_scan_date, _last_scan_time, _is_processing, _last_scan_timestamp # FIX 1 — set busy flag immediately so /status reflects reality _is_processing = True now_ist = datetime.now(IST) local_date = now_ist.strftime("%Y-%m-%d") local_time = now_ist.strftime("%H:%M") # Setup stats tracking if _last_scan_date != local_date: _scans_today = 0 _last_scan_date = local_date _scans_today += 1 _last_scan_time = local_time logger.info(f"Processing classroom scan (Scan #{_scans_today} today)") # 1. Decode Frame Image try: contents = await frame.read() nparr = np.frombuffer(contents, np.uint8) img = cv2.imdecode(nparr, cv2.IMREAD_COLOR) if img is None: raise HTTPException(status_code=400, detail="Invalid frame file. Could not decode image.") except Exception as e: logger.error(f"Error decoding frame: {e}") raise HTTPException(status_code=400, detail=f"Error reading image: {str(e)}") # 2. Get Period Info & Division Fallbacks if not division: info = get_period_info() if info: division = info.get("division") if not division: division = "BCA-A" # Final fallback if not date: date = local_date period_str = f"period-{period}" # 3. Fetch Known Division Profiles from database known_profiles = get_all_student_embeddings(division) if not known_profiles: logger.warning(f"No enrolled student profiles found in database for division {division}.") # Empty classroom fallback update_camera_status(is_online=True, current_period=period, division=division) return { "success": True, "faces_detected": 0, "matches_found": 0, "students_marked": [], "already_marked": [], "skipped_ambiguous": 0, "period": period } # 4. Resize Frame to 640x480 for faster CPU execution h_orig, w_orig = img.shape[:2] if h_orig > 480 or w_orig > 640: img_resized = cv2.resize(img, (640, 480)) else: img_resized = img # 5. Face Detection detected_faces = detector.detect_faces(img_resized) logger.info(f"Faces detected: {len(detected_faces)}") if not detected_faces: update_camera_status(is_online=True, current_period=period, division=division) return { "success": True, "faces_detected": 0, "matches_found": 0, "students_marked": [], "already_marked": [], "skipped_ambiguous": 0, "period": period } # Drawback Fix - Capping at 10 faces max to maintain CPU performance if len(detected_faces) > 10: logger.info(f"Truncating {len(detected_faces)} detected faces to 10 for CPU speed.") detected_faces = detected_faces[:10] # 6. Parallel Recognition using ThreadPoolExecutor (8-second max timeout check) results = [] futures = [] with ThreadPoolExecutor(max_workers=2) as executor: for idx, box in enumerate(detected_faces): f = executor.submit(_process_single_face_crop, idx, box, img_resized, known_profiles) futures.append(f) # Limit processing time to 8 seconds to prevent HTTP gateway timeouts done, not_done = wait(futures, timeout=8.0) for f in done: try: res = f.result() if res is not None: results.append(res) except Exception as ex: logger.error(f"Worker thread error during face recognition: {ex}") # Cancel any slow hanging threads for f in not_done: f.cancel() logger.warning("A face recognition thread timed out and was cancelled.") # 7. Attendance Mapping & In-Memory Duplicate Check students_marked = [] already_marked = [] skipped_ambiguous = 0 marked_set = get_marked_set(division, date, period_str) for res in results: match_id = res["match_id"] if match_id == "AMBIGUOUS": skipped_ambiguous += 1 continue elif match_id is not None: # Check in-memory list first to prevent hammering Firestore if match_id in marked_set: already_marked.append(match_id) else: student_name = known_profiles[match_id]["name"] success = save_attendance( student_id=match_id, name=student_name, period=period, subject=None, division=division, date=date ) if success: marked_set.add(match_id) students_marked.append(match_id) # 8. Update Camera Last Scan Status update_camera_status(is_online=True, current_period=period, division=division) # FIX 1 — clear busy flag and record completion timestamp _is_processing = False _last_scan_timestamp = time.time() logger.info(f"Scan processed: Detected={len(detected_faces)}, New Marks={len(students_marked)}, Already Marked={len(already_marked)}, Ambiguous={skipped_ambiguous}") return { "success": True, "faces_detected": len(detected_faces), "matches_found": len(students_marked) + len(already_marked), "students_marked": students_marked, "already_marked": already_marked, "skipped_ambiguous": skipped_ambiguous, "period": period } @app.post("/re-enroll") async def re_enroll_student_route( request: Request, id_token: str = Form(...), roll_no: str = Form(...), name: str = Form(...), division: str = Form(...), photos: List[UploadFile] = File(...) ): return await handle_enrollment_core( request=request, id_token=id_token, roll_no=roll_no, name=name, division=division, photos=photos, is_re_enroll=True ) @app.get("/photo/{roll_no}/{filename}") async def get_student_photo(roll_no: str, filename: str): repo = os.getenv("HF_DATASET_REPO", "MrRayZer/machub-biometric-storage") token = os.getenv("HF_TOKEN") url = f"https://huggingface.co/datasets/{repo}/resolve/main/{roll_no}/{filename}" headers = {} if token: headers["Authorization"] = f"Bearer {token}" try: resp = requests.get(url, headers=headers, timeout=15) if resp.status_code == 200: return Response(content=resp.content, media_type=resp.headers.get("content-type", "image/jpeg")) else: raise HTTPException(status_code=resp.status_code, detail="Photo not found") except Exception as e: logger.error(f"Proxy photo download failed for {roll_no}/{filename}: {e}") raise HTTPException(status_code=500, detail=f"Proxy error downloading photo: {str(e)}") @app.get("/status/{roll_no}") def get_enrollment_status( roll_no: str, auth: None = Depends(verify_api_key) ): """Queries the database to report the enrollment status of a student.""" db_client = get_firestore_db() student_doc = db_client.collection("students").document(roll_no).get() queue_doc = db_client.collection("enrollmentQueue").document(roll_no).get() if student_doc.exists: data = student_doc.to_dict() return { "roll_no": roll_no, "enrolled": True, "embedding_ready": "faceEmbedding" in data, "photo_count": data.get("photoCount", 0), "status": "complete" } elif queue_doc.exists: data = queue_doc.to_dict() return { "roll_no": roll_no, "enrolled": False, "embedding_ready": False, "photo_count": 0, "status": data.get("status", "processing") } else: raise HTTPException( status_code=404, detail=f"Student {roll_no} has no profile or queue item." ) @app.delete("/student/{roll_no}") def delete_student( roll_no: str, auth: None = Depends(verify_api_key) ): """Deletes face embedding profile records for a student.""" success = delete_student_embedding(roll_no) if success: return {"success": True, "message": f"Successfully deleted face embeddings for {roll_no}"} else: raise HTTPException( status_code=500, detail="Failed to delete student embeddings." ) @app.get("/stats") def get_stats( auth: None = Depends(verify_api_key) ): """Returns analytics about the face recognition container instance.""" db_client = get_firestore_db() # Fetch total enrolled students total_enrolled = 0 try: snap = db_client.collection("students").where("embeddingStatus", "==", "complete").count().get() total_enrolled = snap[0][0].value except Exception: # Fallback if count query fails try: docs = db_client.collection("students").where("embeddingStatus", "==", "complete").stream() total_enrolled = sum(1 for _ in docs) except Exception: pass uptime = int(time.time() - START_TIME) space_url = os.getenv("SPACE_URL", "http://localhost:7860") return { "total_enrolled_students": total_enrolled, "scans_today": _scans_today, "last_scan": _last_scan_time, "uptime_seconds": uptime, "space_url": space_url } @app.post("/refresh") def refresh_embeddings_cache( auth: None = Depends(verify_api_key) ): """Purges the in-memory student embeddings cache and re-queries Firestore.""" try: logger.info("Retriggering embedding cache reload from admin request.") profiles = load_all_embeddings(force_refresh=True) return { "success": True, "message": f"Cached embeddings successfully rebuilt. Total profiles: {len(profiles)}" } except Exception as e: logger.error(f"Failed to manually refresh embeddings: {e}") raise HTTPException( status_code=500, detail=f"Rebuilding embedding cache failed: {str(e)}" ) @app.post("/identify") async def identify_faces( face_crops: List[UploadFile] = File(...), tracking_ids: Optional[List[str]] = Form(None), auth: None = Depends(verify_api_key) ): """Identifies students from a list of cropped face images using the global embeddings cache and stateful hysteresis tracking.""" # Prune stale tracking sessions first to prevent memory leak prune_stale_tracks() known_profiles = load_all_embeddings() if not known_profiles: logger.warning("No student profiles in cache.") return {"success": True, "faces": []} results = [] for idx, crop_file in enumerate(face_crops): tracking_id = None if tracking_ids and idx < len(tracking_ids): tracking_id = tracking_ids[idx] try: contents = await crop_file.read() nparr = np.frombuffer(contents, np.uint8) img = cv2.imdecode(nparr, cv2.IMREAD_COLOR) if img is None or img.size == 0: results.append({ "index": idx, "match": False, "confidence": 0.0, "name": "Invalid image" }) continue # Crop the face using the YOLOface detector (just like in enrollment) for perfect alignment if detector and detector.loaded: try: faces = detector.detect_faces(img) if faces: # Take the largest face found faces.sort(key=lambda b: (b[2]-b[0])*(b[3]-b[1]), reverse=True) fx1, fy1, fx2, fy2 = faces[0] crop = img[fy1:fy2, fx1:fx2] if crop.size > 0: img = crop except Exception as detect_err: logger.warning(f"Failed to recrop face crop in /identify: {detect_err}") # Compute embedding embedding = recognizer.get_embedding(img) if embedding is None: results.append({ "index": idx, "match": False, "confidence": 0.0, "name": "No face embedding" }) continue if tracking_id: # Update last seen timestamp for this track _track_last_seen[tracking_id] = time.time() # Append current frame embedding to the track's history if tracking_id not in _track_embeddings: _track_embeddings[tracking_id] = [] _track_embeddings[tracking_id].append(embedding) _track_embeddings[tracking_id] = _track_embeddings[tracking_id][-5:] # Compute smoothed master embedding mean_emb = np.mean(_track_embeddings[tracking_id], axis=0) norm = np.linalg.norm(mean_emb) smoothed_emb = mean_emb / norm if norm > 0 else mean_emb # Match master embedding against all known student profiles scores = [] for roll_no, profile in known_profiles.items(): similarity = float(np.dot(smoothed_emb, profile["embedding"])) scores.append((roll_no, similarity)) if not scores: results.append({ "index": idx, "match": False, "confidence": 0.0, "name": "Unknown" }) continue # Sort profiles by similarity score descending scores.sort(key=lambda x: x[1], reverse=True) top_roll, top_score = scores[0] # Check for wrong match ambiguity (margin < 0.05 at >= 0.60 similarity) is_ambiguous = False if len(scores) > 1: second_roll, second_score = scores[1] if (top_score - second_score) < 0.05 and top_score >= 0.60: is_ambiguous = True locked = _track_identities.get(tracking_id) if top_score < 0.60: # Current frame fails retention threshold if locked: locked["consecutive_low_scores"] += 1 if locked["consecutive_low_scores"] >= 3: logger.info(f"[TRACKER] Lock broken for {tracking_id} (identity {locked['rollNo']}) due to 3 consecutive low scores.") _track_identities.pop(tracking_id, None) results.append({ "index": idx, "match": False, "confidence": top_score, "name": "Unknown" }) else: logger.info(f"[TRACKER] Hysteresis retain: {tracking_id} stays locked to {locked['rollNo']} (score {top_score:.3f}, consecutive: {locked['consecutive_low_scores']})") results.append({ "index": idx, "match": True, "rollNo": locked["rollNo"], "name": locked["name"], "confidence": locked["score"] }) else: results.append({ "index": idx, "match": False, "confidence": top_score, "name": "Unknown" }) elif is_ambiguous: # Current frame is ambiguous if locked: locked["consecutive_low_scores"] += 1 if locked["consecutive_low_scores"] >= 3: logger.info(f"[TRACKER] Lock broken for {tracking_id} due to 3 consecutive ambiguous/low scores.") _track_identities.pop(tracking_id, None) results.append({ "index": idx, "match": False, "confidence": top_score, "name": "Ambiguous" }) else: logger.info(f"[TRACKER] Hysteresis retain: {tracking_id} stays locked to {locked['rollNo']} despite ambiguity.") results.append({ "index": idx, "match": True, "rollNo": locked["rollNo"], "name": locked["name"], "confidence": locked["score"] }) else: results.append({ "index": idx, "match": False, "confidence": top_score, "name": "Ambiguous" }) else: # top_score >= 0.60 and not ambiguous if locked: if top_roll == locked["rollNo"]: # Same identity: reset low score count and refresh locked score locked["consecutive_low_scores"] = 0 locked["score"] = top_score results.append({ "index": idx, "match": True, "rollNo": top_roll, "name": locked["name"], "confidence": top_score }) else: # Identity switched! Check if it exceeds Lock-In (0.72) if top_score >= 0.72: logger.info(f"[TRACKER] Identity switched for {tracking_id} from {locked['rollNo']} to {top_roll} (score {top_score:.3f} >= 0.72)") _track_identities[tracking_id] = { "rollNo": top_roll, "name": known_profiles[top_roll]["name"], "score": top_score, "consecutive_low_scores": 0 } results.append({ "index": idx, "match": True, "rollNo": top_roll, "name": known_profiles[top_roll]["name"], "confidence": top_score }) else: # Overwrite failed lock-in: count as low score for currently locked student locked["consecutive_low_scores"] += 1 if locked["consecutive_low_scores"] >= 3: logger.info(f"[TRACKER] Lock broken for {tracking_id} due to 3 consecutive identity switches/low scores.") _track_identities.pop(tracking_id, None) results.append({ "index": idx, "match": False, "confidence": top_score, "name": "Unknown" }) else: logger.info(f"[TRACKER] Hysteresis retain: {tracking_id} stays locked to {locked['rollNo']} (switch to {top_roll} rejected, score {top_score:.3f} < 0.72)") results.append({ "index": idx, "match": True, "rollNo": locked["rollNo"], "name": locked["name"], "confidence": locked["score"] }) else: # not locked yet # Must exceed Lock-In Threshold (0.72) to create new lock if top_score >= 0.72: logger.info(f"[TRACKER] Locked new identity for {tracking_id}: {top_roll} (score {top_score:.3f} >= 0.72)") _track_identities[tracking_id] = { "rollNo": top_roll, "name": known_profiles[top_roll]["name"], "score": top_score, "consecutive_low_scores": 0 } results.append({ "index": idx, "match": True, "rollNo": top_roll, "name": known_profiles[top_roll]["name"], "confidence": top_score }) else: results.append({ "index": idx, "match": False, "confidence": top_score, "name": "Unknown" }) else: # No tracking ID provided: fall back to frame-by-frame isolation matching match_id, score = recognizer.match_student(embedding, known_profiles) if match_id == "AMBIGUOUS": results.append({ "index": idx, "match": False, "confidence": score, "name": "Ambiguous" }) elif match_id is not None: results.append({ "index": idx, "match": True, "rollNo": match_id, "name": known_profiles[match_id]["name"], "confidence": score }) else: results.append({ "index": idx, "match": False, "confidence": score, "name": "Unknown" }) except Exception as e: logger.error(f"Error processing face crop {crop_file.filename}: {e}") results.append({ "index": idx, "match": False, "confidence": 0.0, "name": f"Error: {str(e)}" }) return {"success": True, "faces": results} @app.post("/detect-phone") async def detect_phone_in_frame( frame: UploadFile = File(None), photo: UploadFile = File(None), auth: None = Depends(verify_api_key) ): """ Detects phones in a classroom frame using YOLOv8 and correlates each phone with the nearest detected face centroid to identify the student holding it. v1.1.0 — Added 2026-06-23 """ start_time = time.time() file_to_process = frame if frame is not None else photo if file_to_process is None: raise HTTPException(status_code=400, detail="Missing frame or photo file parameter.") try: contents = await file_to_process.read() nparr = np.frombuffer(contents, np.uint8) img = cv2.imdecode(nparr, cv2.IMREAD_COLOR) if img is None: raise HTTPException(status_code=400, detail="Invalid frame: could not decode image.") except HTTPException: raise except Exception as e: raise HTTPException(status_code=400, detail=f"Error reading frame: {str(e)}") h_orig, w_orig = img.shape[:2] # Resize for faster processing if h_orig > 480 or w_orig > 640: img_resized = cv2.resize(img, (640, 480)) scale_x = w_orig / 640 scale_y = h_orig / 480 else: img_resized = img scale_x = 1.0 scale_y = 1.0 # 1. Detect faces to get face centroids face_boxes = detector.detect_faces(img_resized) if detector else [] face_centroids = [] for box in face_boxes: x1, y1, x2, y2 = box[:4] cx = (x1 + x2) / 2 cy = (y1 + y2) / 2 face_centroids.append({"cx": cx, "cy": cy, "bbox": [x1, y1, x2, y2]}) # 2. Detect phones using YOLO phone_detections = [] if phone_detector and phone_detector.loaded: try: phone_detections = phone_detector.detect_phones(img_resized) logger.info(f"[PHONE] Scanning frame ({w_orig}x{h_orig}) for phone objects. Faces: {len(face_centroids)} | Phones: {len(phone_detections)}") except Exception as e: logger.warning(f"[PHONE] Detection error: {e}") else: logger.warning("[PHONE] Phone detector not loaded or missing. Skipping inference.") elapsed_ms = int((time.time() - start_time) * 1000) # 3. Build response and map nearest faces result_phones = [] detections_compat = [] for phone in phone_detections: px1, py1, px2, py2 = phone["bbox"] phone_cx = (px1 + px2) / 2 phone_cy = (py1 + py2) / 2 # Find nearest face centroid nearest_student = None min_dist = float("inf") for i, fc in enumerate(face_centroids): dist = ((phone_cx - fc["cx"]) ** 2 + (phone_cy - fc["cy"]) ** 2) ** 0.5 if dist < min_dist: min_dist = dist nearest_student = i # Absolute pixel boxes scaled back to original dimensions result_phones.append({ "bbox": [int(px1 * scale_x), int(py1 * scale_y), int(px2 * scale_x), int(py2 * scale_y)], "type": "mobile_phone", "confidence": phone.get("confidence", 0.0), "nearest_face_index": nearest_student, "distance_to_face": round(min_dist * max(scale_x, scale_y), 1) }) # Normalized boxes (0.0 to 1.0) on the 640x480 resized dimensions detections_compat.append({ "box": [ max(0.0, min(1.0, px1 / 640.0)), max(0.0, min(1.0, py1 / 480.0)), max(0.0, min(1.0, px2 / 640.0)), max(0.0, min(1.0, py2 / 480.0)) ], "confidence": phone.get("confidence", 0.0) }) logger.info(f"[PHONE] Detection complete: {len(result_phones)} phones in {elapsed_ms}ms") return { "success": True, "detected": len(detections_compat) > 0, "detections": detections_compat, "phones": result_phones, "face_count": len(face_centroids), "processing_time_ms": elapsed_ms }