| import os |
| import time |
| import logging |
| import requests |
| import cv2 |
| import numpy as np |
| from datetime import datetime |
| from typing import List, Optional |
| from concurrent.futures import ThreadPoolExecutor, wait |
|
|
| from dotenv import load_dotenv |
| |
| load_dotenv() |
|
|
| from fastapi import FastAPI, UploadFile, File, Form, HTTPException, Depends, status, Request |
| from fastapi.responses import Response |
| from fastapi.security import APIKeyHeader |
| from fastapi.middleware.cors import CORSMiddleware |
| from fastapi.staticfiles import StaticFiles |
| from huggingface_hub import HfApi |
| from PIL import Image |
| import io |
| import time |
| from collections import defaultdict |
| from firebase_admin import firestore |
|
|
| |
| from firebase_helper import ( |
| get_firestore_db, |
| check_firebase_connection, |
| get_all_student_embeddings, |
| load_all_embeddings, |
| save_attendance, |
| update_enrollment_status, |
| save_face_embedding, |
| update_camera_status, |
| get_period_info, |
| delete_student_embedding, |
| IST |
| ) |
| from detector import FaceDetector, PhoneDetector |
| from recognizer import FaceRecognizer |
| from keep_alive import start_keep_alive |
|
|
| |
# Root logging configuration shared by every module in the process.
_LOG_FORMAT = "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT)
logger = logging.getLogger(__name__)

# --- Process-wide runtime counters (reported by /status and /stats) ---
START_TIME = time.time()       # wall-clock time at import, used for uptime
_scans_today = 0               # number of /scan calls seen on _last_scan_date
_last_scan_date = None         # "YYYY-MM-DD" of the most recent scan, or None
_last_scan_time = "—"          # "HH:MM" of the most recent scan ("—" = never)
_last_scan_timestamp = 0.0     # epoch seconds of the last completed scan

# True while a /scan request is being processed.
_is_processing = False
|
|
| |
# The ASGI application object; routes and middleware are attached below.
_APP_METADATA = {
    "title": "Machub AI Face Recognition Attendance System Backend",
    "version": "1.1.0",
    "description": "ONNX-powered face detection and recognition FastAPI server for Hugging Face Spaces",
}
app = FastAPI(**_APP_METADATA)
|
|
| |
# Browser origins allowed to call this API with credentials.
allowed_origins = [
    "http://localhost:5173",
    "http://127.0.0.1:5173",
    "https://machub-6af39.web.app",
    "https://machub-6af39.firebaseapp.com",
]

# Extra origins may be supplied as a comma-separated ALLOWED_ORIGINS env var.
custom_origins = os.getenv("ALLOWED_ORIGINS")
if custom_origins:
    for _origin in custom_origins.split(","):
        _origin = _origin.strip()
        # Skip blanks produced by trailing/double commas and avoid duplicates.
        if _origin and _origin not in allowed_origins:
            allowed_origins.append(_origin)


app.add_middleware(
    CORSMiddleware,
    allow_origins=allowed_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
| |
# Header scheme for machine-to-machine calls; auto_error=False lets
# verify_api_key() produce its own 401 instead of FastAPI's default error.
API_KEY_HEADER = APIKeyHeader(name="X-API-Key", auto_error=False)


def verify_api_key(x_api_key: Optional[str] = Depends(API_KEY_HEADER)):
    """Enforce authentication on protected routes using the X-API-Key header.

    Returns:
        The supplied key when it matches ``API_SECRET_KEY``, or ``None`` when
        no secret is configured on the server.

    Raises:
        HTTPException: 401 when a secret is configured and the header is
            missing or does not match.
    """
    import hmac

    secret_key = os.getenv("API_SECRET_KEY")
    if not secret_key:
        # NOTE(review): this fails OPEN - with no secret configured every
        # protected route is reachable without a key. Kept for backward
        # compatibility; confirm this is acceptable for production.
        logger.warning("API_SECRET_KEY is not configured in the server environment!")
        return None

    # Constant-time comparison so response timing does not leak how much of
    # the key matched. Encoding to bytes keeps non-ASCII header values from
    # raising inside compare_digest.
    supplied = (x_api_key or "").encode("utf-8")
    if not hmac.compare_digest(supplied, secret_key.encode("utf-8")):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Unauthorized: Invalid or missing X-API-Key header."
        )
    return x_api_key
|
|
| |
# Model handles; populated by the startup hook once the ONNX files exist.
detector = None
recognizer = None
phone_detector = None


# Hugging Face Hub client and the dataset repository it manages.
hf_token = os.getenv("HF_TOKEN")
hf_api = HfApi(token=hf_token)
hf_repo = os.getenv("HF_DATASET_REPO", "MrRayZer/machub-biometric-storage")


if not hf_token:
    logger.warning("HF_TOKEN environment variable is missing! Cannot manage private dataset repository.")
else:
    # Best effort: a failure here is logged and does not stop the import.
    try:
        logger.info(f"Ensuring private dataset repository exists: {hf_repo}")
        hf_api.create_repo(repo_id=hf_repo, repo_type="dataset", private=True, exist_ok=True)
        logger.info("Dataset repository verified/created successfully.")
    except Exception as repo_err:
        logger.error(f"Failed to verify/create HF dataset repository {hf_repo}: {repo_err}")
|
|
| |
# Per-IP list of epoch timestamps of recent enrollment attempts.
_enroll_rate_limits = defaultdict(list)


def check_rate_limit(ip_address: str) -> bool:
    """Record an enrollment attempt and report whether it is allowed.

    Uses a sliding 60-second window with at most 10 accepted attempts per IP.
    Rejected attempts are not recorded.

    Returns:
        True if the attempt is within the limit, False otherwise.
    """
    now = time.time()
    recent = [stamp for stamp in _enroll_rate_limits[ip_address] if now - stamp < 60.0]
    allowed = len(recent) < 10
    if allowed:
        recent.append(now)
    _enroll_rate_limits[ip_address] = recent
    return allowed
|
|
| |
def call_hf_with_retry(func, *args, **kwargs):
    """Call ``func(*args, **kwargs)``, retrying transient failures.

    Up to three attempts are made. An error counts as transient when its
    lower-cased message contains "429", "503", "rate limit" or "connection";
    the wait between attempts starts at 2 seconds and doubles each time.
    Non-transient errors, and the error from the final attempt, propagate.
    """
    max_attempts = 3
    delay = 2.0
    transient_markers = ("429", "503", "rate limit", "connection")

    for attempt_no in range(1, max_attempts + 1):
        try:
            return func(*args, **kwargs)
        except Exception as exc:
            message = str(exc).lower()
            retryable = any(marker in message for marker in transient_markers)
            if not retryable or attempt_no == max_attempts:
                raise
            logger.warning(f"[HF_API] Call failed (attempt {attempt_no}/{max_attempts}). Retrying in {delay}s... Error: {exc}")
            time.sleep(delay)
            delay *= 2.0
    # Defensive: the loop always returns or raises before reaching this point.
    raise Exception("Hugging Face API call failed after max retries")
|
|
| |
def verify_admin_user(id_token: str) -> str:
    """Verify a Firebase ID token and require that its owner is an admin.

    A user is an admin when their ``allowedUsers/<email>`` document has
    ``role == "admin"``, or when their email is in the built-in fallback list.

    Args:
        id_token: Firebase ID token sent by the client.

    Returns:
        The ``uid`` claim of the verified token.

    Raises:
        HTTPException: 403 when the token is missing, cannot be verified,
            carries no email, or belongs to a non-admin user.
    """
    # Accounts that are always admins, even without an allowedUsers document.
    fallback_admins = ("admin@machub.in", "mrabensojan@gmail.com")

    if not id_token:
        raise HTTPException(status_code=403, detail="Unauthorized: Missing credentials token.")
    try:
        from firebase_admin import auth as firebase_auth
        decoded_token = firebase_auth.verify_id_token(id_token)
        # The email claim can be absent or None; normalise both to "" so the
        # guard below rejects the token instead of crashing on .lower().
        email = (decoded_token.get("email") or "").lower()
        uid = decoded_token.get("uid")

        if not email:
            # Without an email there is no allow-list entry to look up.
            logger.warning("Access denied: verified token carries no email claim.")
            raise HTTPException(status_code=403, detail="Unauthorized: User does not have admin privileges.")

        db_client = get_firestore_db()
        user_doc = db_client.collection("allowedUsers").document(email).get()

        is_admin = user_doc.exists and user_doc.to_dict().get("role") == "admin"
        if not is_admin and email in fallback_admins:
            is_admin = True

        if not is_admin:
            logger.warning(f"Access denied: User {email} is not authorized as admin.")
            raise HTTPException(status_code=403, detail="Unauthorized: User does not have admin privileges.")

        return uid
    except HTTPException:
        # Deliberate 403s raised above must not be re-wrapped below.
        raise
    except Exception as e:
        logger.error(f"Firebase token validation error: {e}")
        raise HTTPException(status_code=403, detail=f"Unauthorized: Token verification failed ({str(e)}).") from e
|
|
| |
# Download locations of the ONNX models fetched at startup.
YOLO_URL = "https://huggingface.co/crj/dl-ws/resolve/main/yoloface_8n.onnx"
ARCFACE_URL = "https://huggingface.co/FoivosPar/Arc2Face/resolve/main/arcface.onnx"


def download_model(url: str, dest_path: str):
    """Download a model file to ``dest_path`` with chunk progress logging.

    The payload is streamed to ``<dest_path>.part`` and moved into place only
    after the download finishes, so an interrupted download never leaves a
    truncated file at ``dest_path``.

    Args:
        url: HTTP(S) location of the model.
        dest_path: Final path of the file; its parent directory is created.

    Raises:
        Exception: Any network, HTTP-status or filesystem error is logged and
            re-raised after the partial file is removed.
    """
    dest_dir = os.path.dirname(dest_path)
    if dest_dir:
        # os.makedirs("") raises, so only create a directory when there is one.
        os.makedirs(dest_dir, exist_ok=True)
    logger.info(f"Downloading {url} to {dest_path}...")

    tmp_path = dest_path + ".part"
    file_label = os.path.basename(dest_path)
    block_size = 1024 * 1024
    try:
        # (connect, read) timeouts keep a stalled server from hanging startup;
        # the context manager releases the connection even on failure.
        with requests.get(url, stream=True, timeout=(10, 60)) as response:
            response.raise_for_status()
            total_size = int(response.headers.get("content-length", 0))

            downloaded = 0
            with open(tmp_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=block_size):
                    if not chunk:
                        continue
                    f.write(chunk)
                    downloaded += len(chunk)
                    if total_size > 0:
                        percent = (downloaded / total_size) * 100
                        logger.info(f"Downloading {file_label}: {percent:.1f}% ({downloaded}/{total_size} bytes)")
                    else:
                        logger.info(f"Downloading {file_label}: {downloaded} bytes")

        os.replace(tmp_path, dest_path)
        logger.info(f"Finished downloading {dest_path}")
    except Exception as e:
        logger.error(f"Error downloading model {dest_path}: {e}")
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
|
|
def pre_warm_models():
    """Run one dummy inference through each loaded model.

    Black dummy images are fed to the face detector, the recognizer and the
    phone detector so the first real request does not pay the cold-start
    cost. Any failure is logged and swallowed; later models are then skipped.
    """
    try:
        logger.info("Pre-warming YOLOv8 detector, ArcFace recognizer, and Phone detector...")
        blank_frame = np.zeros((480, 640, 3), dtype=np.uint8)

        if detector and detector.loaded:
            detector.detect_faces(blank_frame)

        if recognizer and recognizer.loaded:
            blank_face = np.zeros((112, 112, 3), dtype=np.uint8)
            recognizer.get_embedding(blank_face)

        if phone_detector and phone_detector.loaded:
            phone_detector.detect_phones(blank_frame)

        logger.info("Models successfully pre-warmed.")
    except Exception as warm_err:
        logger.error(f"Failed to pre-warm models: {warm_err}")
|
|
@app.on_event("startup")
def startup_event():
    """Initialise Firestore, fetch model files, build the models and warm them.

    Firestore/cache failures are logged and tolerated. A failed download of
    the face detector or ArcFace model propagates and aborts startup; the
    COCO model used for phone detection is optional and its download failure
    is only logged.
    """
    global detector, recognizer, phone_detector

    logger.info("--- Starting Machub Backend Initialization ---")

    # Step 1: database connection and in-memory embedding cache.
    try:
        get_firestore_db()
        logger.info("Firebase Firestore initialized.")
        load_all_embeddings(force_refresh=True)
        logger.info("In-memory student embeddings pre-loaded.")
    except Exception as init_err:
        logger.error(f"Firebase / Caching initialization failed: {init_err}")

    # Step 2: make sure the ONNX files are on disk.
    models_dir = "models"
    yolo_path = os.path.join(models_dir, "yolov8n-face.onnx")
    arcface_path = os.path.join(models_dir, "arcface.onnx")
    yolo_coco_path = os.path.join(models_dir, "yolov8n.onnx")

    required_models = ((YOLO_URL, yolo_path), (ARCFACE_URL, arcface_path))
    for model_url, model_path in required_models:
        if not os.path.exists(model_path):
            download_model(model_url, model_path)

    try:
        if not os.path.exists(yolo_coco_path):
            download_model("https://huggingface.co/webnn/yolov8n/resolve/main/onnx/yolov8n_fp16.onnx", yolo_coco_path)
    except Exception as coco_err:
        logger.warning(f"Gracefully skipped YOLOv8 COCO model download for phone detection: {coco_err}")

    # Step 3: construct the model wrappers.
    detector = FaceDetector(yolo_path)
    recognizer = FaceRecognizer(arcface_path)
    phone_detector = PhoneDetector(yolo_coco_path)

    # Step 4: warm-up inference, then start the keep-alive helper.
    pre_warm_models()
    start_keep_alive()

    logger.info("--- Machub Backend Ready ---")
|
|
| |
| |
# (division, date, period) -> set of student ids already marked in memory.
_marked_cache = {}


# Per-tracking-id state used by /identify:
_track_embeddings = {}   # tracking id -> list of recent embeddings
_track_identities = {}   # tracking id -> locked identity record
_track_last_seen = {}    # tracking id -> epoch seconds of last update


def prune_stale_tracks():
    """Drop all tracking state for ids not updated within the last 10 seconds."""
    now = time.time()
    expired = [track for track, seen_at in _track_last_seen.items() if now - seen_at > 10.0]
    for track in expired:
        logger.info(f"[TRACKER] Pruning stale track due to inactivity: {track}")
        for table in (_track_embeddings, _track_identities, _track_last_seen):
            table.pop(track, None)
|
|
def get_marked_set(division: str, date: str, period: str) -> set:
    """Return the mutable set of students already marked for one period.

    The cache keeps a single (division, date, period) entry at a time: every
    entry for a different key is evicted before the requested one is returned
    (and created empty if it did not exist).
    """
    wanted = (division, date, period)

    # Snapshot the stale keys first; the dict cannot shrink while iterated.
    for stale in [cached for cached in _marked_cache if cached != wanted]:
        del _marked_cache[stale]

    return _marked_cache.setdefault(wanted, set())
|
|
| |
def _process_single_face_crop(idx: int, box: List[int], img: np.ndarray, known_profiles: dict):
    """Crop one detected face, embed it and match it against known profiles.

    Args:
        idx: Position of the face in the detection list; echoed in the result.
        box: ``[x1, y1, x2, y2]`` pixel box of the face inside ``img``.
        img: Frame the box refers to.
        known_profiles: Profiles handed to ``recognizer.match_student``.

    Returns:
        ``{"index", "match_id", "score"}``, or ``None`` when the crop is
        empty or no embedding could be computed.
    """
    left, top, right, bottom = box
    face = img[top:bottom, left:right]
    if face.size == 0:
        return None

    vector = recognizer.get_embedding(face)
    if vector is None:
        return None

    matched, similarity = recognizer.match_student(vector, known_profiles)
    return {"index": idx, "match_id": matched, "score": similarity}
|
|
| |
|
|
@app.get("/health")
def health_check():
    """Public health endpoint: model load flags, Firestore reachability, version."""
    firebase_ok = check_firebase_connection()

    # A model that has not been constructed yet is reported as not loaded.
    model_flags = {
        "detector": detector.loaded if detector else False,
        "recognizer": recognizer.loaded if recognizer else False,
    }

    payload = {"status": "online"}
    payload["models"] = model_flags
    payload["firebase"] = firebase_ok
    payload["version"] = "1.1.0"
    return payload
|
|
| |
| |
| |
@app.get("/status")
def server_status():
    """Public endpoint: busy flag, seconds since the last scan, and uptime.

    ``last_scan_seconds_ago`` is -1 until a scan has completed.
    """
    if _last_scan_timestamp > 0:
        secs_ago = int(time.time() - _last_scan_timestamp)
    else:
        secs_ago = -1

    uptime = int(time.time() - START_TIME)
    return {
        "busy": _is_processing,
        "last_scan_seconds_ago": secs_ago,
        "uptime_seconds": uptime,
    }
|
|
| |
async def _embedding_from_photo(photo: UploadFile):
    """Return the embedding of the largest face in one upload, or None if unusable."""
    try:
        contents = await photo.read()
        if len(contents) > 5 * 1024 * 1024:
            logger.warning(f"File {photo.filename} skipped: exceeds 5MB size limit.")
            return None

        img = cv2.imdecode(np.frombuffer(contents, np.uint8), cv2.IMREAD_COLOR)
        if img is None:
            logger.warning(f"File {photo.filename} skipped: could not decode image.")
            return None

        faces = detector.detect_faces(img)
        if not faces:
            logger.warning(f"File {photo.filename} skipped: no face detected.")
            return None

        # Enrol on the largest face in the photo (first one wins on ties).
        x1, y1, x2, y2 = max(faces, key=lambda box: (box[2] - box[0]) * (box[3] - box[1]))
        crop = img[y1:y2, x1:x2]
        if crop.size == 0:
            logger.warning(f"File {photo.filename} skipped: empty face crop.")
            return None

        emb = recognizer.get_embedding(crop)
        if emb is None:
            logger.warning(f"File {photo.filename} skipped: failed to extract embedding.")
        return emb
    except Exception as e:
        logger.error(f"Failed to process face extraction for {photo.filename}: {e}")
        return None


async def handle_enrollment_core(
    request: Request,
    id_token: str,
    roll_no: str,
    name: str,
    division: str,
    photos: List[UploadFile],
    is_re_enroll: bool
):
    """Shared implementation of the /enroll and /re-enroll routes.

    Rate-limits by client IP, verifies the caller is an admin, extracts one
    embedding per usable photo, averages and L2-normalises them, rejects the
    enrollment if the result is too similar (>= 0.70) to another student's
    embedding, then writes the student and enrollment-queue documents and
    refreshes the in-memory embedding cache.

    Args:
        request: Incoming request; only the client address is used.
        id_token: Firebase ID token of the admin performing the enrollment.
        roll_no: Student roll number; stripped and upper-cased before use.
        name: Student display name.
        division: Student division.
        photos: Uploaded face photos.
        is_re_enroll: True to update an existing student, False to create one.

    Returns:
        A summary dict with the counts of received/processed/skipped photos,
        the embedding dimensionality and the processing time in milliseconds.

    Raises:
        HTTPException: 429 rate limit; 403 not an admin; 400 validation
            failure, no usable face, or duplicate face; 503 models not
            loaded; 500 Firestore write failure.
    """
    # request.client is None when the server supplies no peer address.
    client_ip = request.client.host if request.client else "unknown"
    if not check_rate_limit(client_ip):
        raise HTTPException(
            status_code=429,
            detail="Rate limit exceeded: Maximum 10 enrollments per minute per IP."
        )

    admin_uid = verify_admin_user(id_token)

    roll_no = roll_no.strip().upper()
    name = name.strip()
    division = division.strip()
    if not roll_no:
        # A blank roll number cannot be used as a document id.
        raise HTTPException(status_code=400, detail="Roll number must not be empty.")

    db_client = get_firestore_db()
    student_ref = db_client.collection("students").document(roll_no)
    student_snap = student_ref.get()

    if not is_re_enroll and student_snap.exists:
        raise HTTPException(
            status_code=400,
            detail=f"Student {roll_no} is already enrolled. Please use re-enrollment route."
        )
    elif is_re_enroll and not student_snap.exists:
        raise HTTPException(
            status_code=400,
            detail=f"Student {roll_no} does not exist. Use the new enrollment route."
        )

    if not photos:
        raise HTTPException(status_code=400, detail="No photo files uploaded. Include photos list.")

    if detector is None or recognizer is None:
        # Without this guard every photo fails and the caller gets a
        # misleading "no clear faces" 400.
        raise HTTPException(
            status_code=503,
            detail="Face recognition models are not loaded yet. Please retry shortly."
        )

    # NOTE(review): the failure paths below never move this status away from
    # "processing"; confirm whether a failure status should be written.
    update_enrollment_status(roll_no, "processing")

    start_time = time.time()
    valid_embeddings = []
    for photo in photos:
        emb = await _embedding_from_photo(photo)
        if emb is not None:
            valid_embeddings.append(emb)
    photos_skipped = len(photos) - len(valid_embeddings)

    if not valid_embeddings:
        raise HTTPException(
            status_code=400,
            detail="Enrollment failed: No clear faces could be extracted from any of the uploaded photos."
        )

    # Master embedding: mean of per-photo embeddings, L2-normalised so the dot
    # product below is computed on a unit-length vector.
    avg_embedding = np.mean(valid_embeddings, axis=0)
    norm = np.linalg.norm(avg_embedding)
    master_embedding_arr = avg_embedding / norm if norm > 0 else avg_embedding
    master_embedding = master_embedding_arr.tolist()

    # Reject a face that is already enrolled under a different roll number.
    # Best effort: if the check itself fails, enrollment proceeds.
    try:
        known_profiles = load_all_embeddings(force_refresh=True)
        if known_profiles:
            for existing_roll, profile in known_profiles.items():
                if existing_roll == roll_no:
                    continue
                similarity = float(np.dot(master_embedding_arr, profile["embedding"]))
                if similarity >= 0.70:
                    existing_name = profile["name"]
                    raise HTTPException(
                        status_code=400,
                        detail=f"Student Name: {existing_name} and already enrolled."
                    )
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error checking double enrollment: {e}")

    try:
        student_data = {
            "name": name,
            "rollNo": roll_no,
            "division": division,
            "photoCount": len(photos),
            "photoUrls": [],
            "photoURL": None,
            "faceEmbedding": master_embedding,
            "embeddingStatus": "complete",
            "faceEnrolled": True,
            "updatedAt": firestore.SERVER_TIMESTAMP,
            "enrolledAt": firestore.SERVER_TIMESTAMP,
            "enrolledBy": admin_uid,
        }

        if not is_re_enroll:
            # Attendance counters are initialised only for new students so a
            # re-enrollment (merge=True) keeps the existing totals.
            student_data["totalClasses"] = 0
            student_data["attendedClasses"] = 0

        student_ref.set(student_data, merge=True)

        queue_ref = db_client.collection("enrollmentQueue").document(roll_no)
        queue_ref.set({
            "rollNo": roll_no,
            "division": division,
            "status": "complete",
            "photoCount": len(photos),
            "uploadedAt": firestore.SERVER_TIMESTAMP
        }, merge=True)

    except Exception as e:
        logger.error(f"Firestore transaction write failed for student {roll_no}: {e}")
        raise HTTPException(
            status_code=500,
            detail=f"Database ledger write failed: {str(e)}"
        )

    try:
        load_all_embeddings(force_refresh=True)
    except Exception as cache_err:
        logger.warning(f"Failed to refresh embeddings cache: {cache_err}")

    processing_time_ms = int((time.time() - start_time) * 1000)

    return {
        "success": True,
        "roll_no": roll_no,
        "name": name,
        "photos_received": len(photos),
        "photos_processed": len(valid_embeddings),
        "photos_skipped": photos_skipped,
        "embedding_dimensions": len(master_embedding),
        "processing_time_ms": processing_time_ms,
        "message": "Enrolled successfully"
    }
|
|
@app.post("/enroll")
async def enroll_student(
    request: Request,
    id_token: str = Form(...),
    roll_no: str = Form(...),
    name: str = Form(...),
    division: str = Form(...),
    photos: List[UploadFile] = File(...)
):
    """Enroll a new student; delegates to the shared enrollment handler."""
    outcome = await handle_enrollment_core(
        request,
        id_token,
        roll_no,
        name,
        division,
        photos,
        is_re_enroll=False,
    )
    return outcome
|
|
def _empty_scan_result(period: int) -> dict:
    """Build the /scan response used when nothing could be matched."""
    return {
        "success": True,
        "faces_detected": 0,
        "matches_found": 0,
        "students_marked": [],
        "already_marked": [],
        "skipped_ambiguous": 0,
        "period": period
    }


async def _run_classroom_scan(frame, period, division, date, local_date):
    """Decode the frame, recognise faces and record attendance; returns the response dict."""
    try:
        contents = await frame.read()
        img = cv2.imdecode(np.frombuffer(contents, np.uint8), cv2.IMREAD_COLOR)
    except Exception as e:
        logger.error(f"Error decoding frame: {e}")
        raise HTTPException(status_code=400, detail=f"Error reading image: {str(e)}")
    if img is None:
        # Raised outside the try so the broad handler above cannot re-wrap it.
        logger.error("Error decoding frame: image could not be decoded.")
        raise HTTPException(status_code=400, detail="Invalid frame file. Could not decode image.")

    # Fall back to the current period's division, then to the default one.
    if not division:
        info = get_period_info()
        if info:
            division = info.get("division")
        if not division:
            division = "BCA-A"

    if not date:
        date = local_date

    period_str = f"period-{period}"

    known_profiles = get_all_student_embeddings(division)
    if not known_profiles:
        logger.warning(f"No enrolled student profiles found in database for division {division}.")
        update_camera_status(is_online=True, current_period=period, division=division)
        return _empty_scan_result(period)

    if detector is None or recognizer is None:
        raise HTTPException(
            status_code=503,
            detail="Face recognition models are not loaded yet. Please retry shortly."
        )

    # Large frames are resized to a fixed 640x480 before detection.
    h_orig, w_orig = img.shape[:2]
    if h_orig > 480 or w_orig > 640:
        img_resized = cv2.resize(img, (640, 480))
    else:
        img_resized = img

    detected_faces = detector.detect_faces(img_resized)
    logger.info(f"Faces detected: {len(detected_faces)}")

    if not detected_faces:
        update_camera_status(is_online=True, current_period=period, division=division)
        return _empty_scan_result(period)

    if len(detected_faces) > 10:
        logger.info(f"Truncating {len(detected_faces)} detected faces to 10 for CPU speed.")
        detected_faces = detected_faces[:10]

    results = []
    # NOTE(review): leaving the `with` block waits for running workers, so the
    # 8 s timeout only drops late results; it does not bound request time.
    with ThreadPoolExecutor(max_workers=2) as executor:
        futures = [
            executor.submit(_process_single_face_crop, idx, box, img_resized, known_profiles)
            for idx, box in enumerate(detected_faces)
        ]

        done, not_done = wait(futures, timeout=8.0)

        for f in done:
            try:
                res = f.result()
                if res is not None:
                    results.append(res)
            except Exception as ex:
                logger.error(f"Worker thread error during face recognition: {ex}")

        for f in not_done:
            f.cancel()
            logger.warning("A face recognition thread timed out and was cancelled.")

    # `done` is a set; restore detection order so the response is deterministic.
    results.sort(key=lambda res: res["index"])

    students_marked = []
    already_marked = []
    skipped_ambiguous = 0

    marked_set = get_marked_set(division, date, period_str)

    for res in results:
        match_id = res["match_id"]
        if match_id == "AMBIGUOUS":
            skipped_ambiguous += 1
            continue
        if match_id is None:
            continue

        if match_id in marked_set:
            already_marked.append(match_id)
            continue

        success = save_attendance(
            student_id=match_id,
            name=known_profiles[match_id]["name"],
            period=period,
            subject=None,
            division=division,
            date=date
        )
        if success:
            marked_set.add(match_id)
            students_marked.append(match_id)

    update_camera_status(is_online=True, current_period=period, division=division)

    logger.info(f"Scan processed: Detected={len(detected_faces)}, New Marks={len(students_marked)}, Already Marked={len(already_marked)}, Ambiguous={skipped_ambiguous}")
    return {
        "success": True,
        "faces_detected": len(detected_faces),
        "matches_found": len(students_marked) + len(already_marked),
        "students_marked": students_marked,
        "already_marked": already_marked,
        "skipped_ambiguous": skipped_ambiguous,
        "period": period
    }


@app.post("/scan")
async def scan_frame(
    frame: UploadFile = File(...),
    period: int = Form(...),
    division: Optional[str] = Form(None),
    date: Optional[str] = Form(None),
    auth: None = Depends(verify_api_key)
):
    """Process a classroom frame, match student embeddings and log attendance.

    Args:
        frame: Uploaded image of the classroom.
        period: Period number the attendance is recorded for.
        division: Division to match against; defaults to the current period's
            division, then to "BCA-A".
        date: Attendance date; defaults to today's date in IST.
        auth: Result of the API-key dependency (unused).

    Returns:
        Counts of detected faces and matches plus the ids newly marked,
        already marked, and the number of ambiguous faces skipped.

    Raises:
        HTTPException: 400 if the frame cannot be read or decoded; 503 if the
            models are not loaded.
    """
    global _scans_today, _last_scan_date, _last_scan_time, _is_processing, _last_scan_timestamp

    _is_processing = True
    try:
        now_ist = datetime.now(IST)
        local_date = now_ist.strftime("%Y-%m-%d")
        local_time = now_ist.strftime("%H:%M")

        # The daily counter restarts on the first scan of a new IST day.
        if _last_scan_date != local_date:
            _scans_today = 0
            _last_scan_date = local_date

        _scans_today += 1
        _last_scan_time = local_time

        logger.info(f"Processing classroom scan (Scan #{_scans_today} today)")

        result = await _run_classroom_scan(frame, period, division, date, local_date)
        _last_scan_timestamp = time.time()
        return result
    finally:
        # Always clear the busy flag; otherwise an early return or an error
        # leaves /status reporting "busy" forever.
        _is_processing = False
|
|
@app.post("/re-enroll")
async def re_enroll_student_route(
    request: Request,
    id_token: str = Form(...),
    roll_no: str = Form(...),
    name: str = Form(...),
    division: str = Form(...),
    photos: List[UploadFile] = File(...)
):
    """Re-enroll an existing student; delegates to the shared enrollment handler."""
    outcome = await handle_enrollment_core(
        request,
        id_token,
        roll_no,
        name,
        division,
        photos,
        is_re_enroll=True,
    )
    return outcome
|
|
@app.get("/photo/{roll_no}/{filename}")
async def get_student_photo(roll_no: str, filename: str):
    """Proxy a student photo from the Hugging Face dataset repository.

    The file ``<roll_no>/<filename>`` is fetched from the repository named by
    ``HF_DATASET_REPO``, using ``HF_TOKEN`` as a bearer token when it is set.

    Raises:
        HTTPException: 400 for dot-segment path parts; the upstream status
            code when the Hub does not answer 200; 500 when the request
            itself fails.
    """
    # NOTE(review): this route has no auth dependency yet serves files using
    # the server's own token - confirm that is intended.
    from urllib.parse import quote

    if roll_no in (".", "..") or filename in (".", ".."):
        raise HTTPException(status_code=400, detail="Invalid photo path.")

    repo = os.getenv("HF_DATASET_REPO", "MrRayZer/machub-biometric-storage")
    token = os.getenv("HF_TOKEN")

    # Percent-encode each segment so caller input cannot alter the URL shape.
    safe_roll = quote(roll_no, safe="")
    safe_name = quote(filename, safe="")
    url = f"https://huggingface.co/datasets/{repo}/resolve/main/{safe_roll}/{safe_name}"

    headers = {}
    if token:
        headers["Authorization"] = f"Bearer {token}"

    # NOTE(review): requests.get blocks the event loop in this async route.
    try:
        resp = requests.get(url, headers=headers, timeout=15)
    except Exception as e:
        logger.error(f"Proxy photo download failed for {roll_no}/{filename}: {e}")
        raise HTTPException(status_code=500, detail=f"Proxy error downloading photo: {str(e)}")

    # Checked outside the try so the upstream status is not turned into a 500.
    if resp.status_code != 200:
        raise HTTPException(status_code=resp.status_code, detail="Photo not found")
    return Response(content=resp.content, media_type=resp.headers.get("content-type", "image/jpeg"))
|
|
@app.get("/status/{roll_no}")
def get_enrollment_status(
    roll_no: str,
    auth: None = Depends(verify_api_key)
):
    """Report the enrollment status of a student.

    The ``students`` document wins when it exists; otherwise the
    ``enrollmentQueue`` document is consulted.

    Raises:
        HTTPException: 404 when neither document exists.
    """
    # NOTE(review): roll_no is used as given, while enrollment stores ids
    # stripped and upper-cased - confirm callers send the normalised form.
    db_client = get_firestore_db()

    student_doc = db_client.collection("students").document(roll_no).get()
    if student_doc.exists:
        data = student_doc.to_dict()
        return {
            "roll_no": roll_no,
            "enrolled": True,
            "embedding_ready": "faceEmbedding" in data,
            "photo_count": data.get("photoCount", 0),
            "status": "complete"
        }

    # Read the queue only when needed; this saves a Firestore read for every
    # student that is already enrolled.
    queue_doc = db_client.collection("enrollmentQueue").document(roll_no).get()
    if queue_doc.exists:
        data = queue_doc.to_dict()
        return {
            "roll_no": roll_no,
            "enrolled": False,
            "embedding_ready": False,
            "photo_count": 0,
            "status": data.get("status", "processing")
        }

    raise HTTPException(
        status_code=404,
        detail=f"Student {roll_no} has no profile or queue item."
    )
|
|
@app.delete("/student/{roll_no}")
def delete_student(
    roll_no: str,
    auth: None = Depends(verify_api_key)
):
    """Delete a student's face-embedding records.

    Raises:
        HTTPException: 500 when the deletion helper reports failure.
    """
    deleted = delete_student_embedding(roll_no)
    if not deleted:
        raise HTTPException(
            status_code=500,
            detail="Failed to delete student embeddings."
        )
    return {"success": True, "message": f"Successfully deleted face embeddings for {roll_no}"}
|
|
@app.get("/stats")
def get_stats(
    auth: None = Depends(verify_api_key)
):
    """Return analytics about this server instance.

    Reports the number of students whose ``embeddingStatus`` is "complete",
    the scans handled today (IST), the time of the last scan, uptime, and the
    configured ``SPACE_URL``.
    """
    db_client = get_firestore_db()

    total_enrolled = 0
    try:
        # Preferred: server-side count aggregation.
        snap = db_client.collection("students").where("embeddingStatus", "==", "complete").count().get()
        total_enrolled = snap[0][0].value
    except Exception as count_err:
        logger.warning(f"Count aggregation failed, falling back to streaming documents: {count_err}")
        try:
            docs = db_client.collection("students").where("embeddingStatus", "==", "complete").stream()
            total_enrolled = sum(1 for _ in docs)
        except Exception as stream_err:
            # Best effort: report 0 rather than failing the whole endpoint.
            logger.error(f"Could not count enrolled students: {stream_err}")

    # _scans_today is only reset by the first scan of a new day, so a counter
    # left over from an earlier date must not be reported as today's.
    today = datetime.now(IST).strftime("%Y-%m-%d")
    scans_today = _scans_today if _last_scan_date == today else 0

    uptime = int(time.time() - START_TIME)
    space_url = os.getenv("SPACE_URL", "http://localhost:7860")

    return {
        "total_enrolled_students": total_enrolled,
        "scans_today": scans_today,
        "last_scan": _last_scan_time,
        "uptime_seconds": uptime,
        "space_url": space_url
    }
|
|
@app.post("/refresh")
def refresh_embeddings_cache(
    auth: None = Depends(verify_api_key)
):
    """Force a reload of the in-memory student embeddings cache.

    Raises:
        HTTPException: 500 when the reload fails.
    """
    try:
        logger.info("Retriggering embedding cache reload from admin request.")
        reloaded = load_all_embeddings(force_refresh=True)
        summary = f"Cached embeddings successfully rebuilt. Total profiles: {len(reloaded)}"
        return {"success": True, "message": summary}
    except Exception as reload_err:
        logger.error(f"Failed to manually refresh embeddings: {reload_err}")
        raise HTTPException(
            status_code=500,
            detail=f"Rebuilding embedding cache failed: {str(reload_err)}"
        )
|
|
def _face_miss(idx, confidence, label):
    """Build the result entry for a face that was not identified."""
    return {"index": idx, "match": False, "confidence": confidence, "name": label}


def _face_hit(idx, roll_no, name, confidence):
    """Build the result entry for an identified face."""
    return {"index": idx, "match": True, "rollNo": roll_no, "name": name, "confidence": confidence}


def _count_strike(locked):
    """Add one failed confirmation to a lock; True when it reaches three."""
    locked["consecutive_low_scores"] += 1
    return locked["consecutive_low_scores"] >= 3


def _resolve_tracked_face(idx, tracking_id, embedding, known_profiles):
    """Identify one tracked face using embedding smoothing and identity locking.

    The last five embeddings of the track are averaged and normalised, then
    scored against every profile. A lock is created or switched only at a
    score of 0.72 or more; an existing lock survives up to two consecutive
    low, ambiguous, or rejected-switch frames and is dropped on the third.
    """
    _track_last_seen[tracking_id] = time.time()

    # Keep a sliding window of the five most recent embeddings.
    history = _track_embeddings.setdefault(tracking_id, [])
    history.append(embedding)
    history = history[-5:]
    _track_embeddings[tracking_id] = history

    mean_emb = np.mean(history, axis=0)
    norm = np.linalg.norm(mean_emb)
    smoothed_emb = mean_emb / norm if norm > 0 else mean_emb

    ranked = [
        (roll_no, float(np.dot(smoothed_emb, profile["embedding"])))
        for roll_no, profile in known_profiles.items()
    ]
    if not ranked:
        return _face_miss(idx, 0.0, "Unknown")

    ranked.sort(key=lambda pair: pair[1], reverse=True)
    top_roll, top_score = ranked[0]

    # Ambiguous: the runner-up is within 0.05 of an otherwise passing score.
    is_ambiguous = (
        len(ranked) > 1
        and (top_score - ranked[1][1]) < 0.05
        and top_score >= 0.60
    )

    locked = _track_identities.get(tracking_id)

    # --- Case 1: best score is below the match threshold ---
    if top_score < 0.60:
        if not locked:
            return _face_miss(idx, top_score, "Unknown")
        if _count_strike(locked):
            logger.info(f"[TRACKER] Lock broken for {tracking_id} (identity {locked['rollNo']}) due to 3 consecutive low scores.")
            _track_identities.pop(tracking_id, None)
            return _face_miss(idx, top_score, "Unknown")
        logger.info(f"[TRACKER] Hysteresis retain: {tracking_id} stays locked to {locked['rollNo']} (score {top_score:.3f}, consecutive: {locked['consecutive_low_scores']})")
        return _face_hit(idx, locked["rollNo"], locked["name"], locked["score"])

    # --- Case 2: two candidates are too close to call ---
    if is_ambiguous:
        if not locked:
            return _face_miss(idx, top_score, "Ambiguous")
        if _count_strike(locked):
            logger.info(f"[TRACKER] Lock broken for {tracking_id} due to 3 consecutive ambiguous/low scores.")
            _track_identities.pop(tracking_id, None)
            return _face_miss(idx, top_score, "Ambiguous")
        logger.info(f"[TRACKER] Hysteresis retain: {tracking_id} stays locked to {locked['rollNo']} despite ambiguity.")
        return _face_hit(idx, locked["rollNo"], locked["name"], locked["score"])

    # --- Case 3: a clear best candidate ---
    if locked and top_roll == locked["rollNo"]:
        # The lock is confirmed: clear the strikes and refresh its score.
        locked["consecutive_low_scores"] = 0
        locked["score"] = top_score
        return _face_hit(idx, top_roll, locked["name"], top_score)

    if top_score >= 0.72:
        if locked:
            logger.info(f"[TRACKER] Identity switched for {tracking_id} from {locked['rollNo']} to {top_roll} (score {top_score:.3f} >= 0.72)")
        else:
            logger.info(f"[TRACKER] Locked new identity for {tracking_id}: {top_roll} (score {top_score:.3f} >= 0.72)")
        _track_identities[tracking_id] = {
            "rollNo": top_roll,
            "name": known_profiles[top_roll]["name"],
            "score": top_score,
            "consecutive_low_scores": 0
        }
        return _face_hit(idx, top_roll, known_profiles[top_roll]["name"], top_score)

    if not locked:
        return _face_miss(idx, top_score, "Unknown")

    # A different candidate scored below the switch threshold.
    if _count_strike(locked):
        logger.info(f"[TRACKER] Lock broken for {tracking_id} due to 3 consecutive identity switches/low scores.")
        _track_identities.pop(tracking_id, None)
        return _face_miss(idx, top_score, "Unknown")
    logger.info(f"[TRACKER] Hysteresis retain: {tracking_id} stays locked to {locked['rollNo']} (switch to {top_roll} rejected, score {top_score:.3f} < 0.72)")
    return _face_hit(idx, locked["rollNo"], locked["name"], locked["score"])


@app.post("/identify")
async def identify_faces(
    face_crops: List[UploadFile] = File(...),
    tracking_ids: Optional[List[str]] = Form(None),
    auth: None = Depends(verify_api_key)
):
    """Identify students from cropped face images.

    Each crop is decoded, optionally re-cropped to the largest face the
    detector finds, embedded, and matched against the cached profiles. A
    crop with a tracking id at the same position in ``tracking_ids`` goes
    through the stateful tracker; other crops are matched statelessly.

    Returns:
        ``{"success": True, "faces": [...]}`` with one entry per crop. A
        failure on one crop is reported in its entry and does not abort the
        others.
    """
    prune_stale_tracks()

    known_profiles = load_all_embeddings()
    if not known_profiles:
        logger.warning("No student profiles in cache.")
        return {"success": True, "faces": []}

    results = []

    for idx, crop_file in enumerate(face_crops):
        has_tracking_id = bool(tracking_ids) and idx < len(tracking_ids)
        tracking_id = tracking_ids[idx] if has_tracking_id else None

        try:
            contents = await crop_file.read()
            img = cv2.imdecode(np.frombuffer(contents, np.uint8), cv2.IMREAD_COLOR)

            if img is None or img.size == 0:
                results.append(_face_miss(idx, 0.0, "Invalid image"))
                continue

            # Tighten the crop to the largest detected face when possible.
            if detector and detector.loaded:
                try:
                    faces = detector.detect_faces(img)
                    if faces:
                        fx1, fy1, fx2, fy2 = max(faces, key=lambda b: (b[2] - b[0]) * (b[3] - b[1]))
                        tighter = img[fy1:fy2, fx1:fx2]
                        if tighter.size > 0:
                            img = tighter
                except Exception as detect_err:
                    logger.warning(f"Failed to recrop face crop in /identify: {detect_err}")

            embedding = recognizer.get_embedding(img)
            if embedding is None:
                results.append(_face_miss(idx, 0.0, "No face embedding"))
                continue

            if tracking_id:
                results.append(_resolve_tracked_face(idx, tracking_id, embedding, known_profiles))
                continue

            # Stateless path: a single-shot match.
            match_id, score = recognizer.match_student(embedding, known_profiles)
            if match_id == "AMBIGUOUS":
                results.append(_face_miss(idx, score, "Ambiguous"))
            elif match_id is not None:
                results.append(_face_hit(idx, match_id, known_profiles[match_id]["name"], score))
            else:
                results.append(_face_miss(idx, score, "Unknown"))

        except Exception as e:
            logger.error(f"Error processing face crop {crop_file.filename}: {e}")
            results.append(_face_miss(idx, 0.0, f"Error: {str(e)}"))

    return {"success": True, "faces": results}
|
|
|
|
@app.post("/detect-phone")
async def detect_phone_in_frame(
    frame: UploadFile = File(None),
    photo: UploadFile = File(None),
    auth: None = Depends(verify_api_key)
):
    """
    Detects phones in a classroom frame using YOLOv8 and correlates each phone
    with the nearest detected face centroid to identify the student holding it.

    The upload is decoded with OpenCV and, when it is larger than 640x480 in
    either dimension, stretched to exactly 640x480 before being handed to the
    face detector and the phone detector.

    Args:
        frame: Uploaded image; takes precedence when both fields are sent.
        photo: Alternative upload field, used only when ``frame`` is absent.
        auth: Result of the ``verify_api_key`` dependency (not read here).

    Returns:
        A dict with the keys:
            success: always True on this path.
            detected: True when at least one phone was found.
            detections: per phone, ``box`` ([x1, y1, x2, y2] normalised to
                0..1 against the analysed image) and ``confidence``.
            phones: per phone, ``bbox`` in original-image pixels, ``type``,
                ``confidence``, ``nearest_face_index`` and
                ``distance_to_face`` (both None when no face was detected).
            face_count: number of faces detected in the frame.
            processing_time_ms: time spent decoding and running the
                detectors, in milliseconds.

    Raises:
        HTTPException: 400 when no file is supplied or it cannot be decoded.

    v1.1.0 — Added 2026-06-23
    """
    start_time = time.time()

    file_to_process = frame if frame is not None else photo
    if file_to_process is None:
        raise HTTPException(status_code=400, detail="Missing frame or photo file parameter.")

    try:
        contents = await file_to_process.read()
        nparr = np.frombuffer(contents, np.uint8)
        img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
        if img is None:
            raise HTTPException(status_code=400, detail="Invalid frame: could not decode image.")
    except HTTPException:
        # Let the explicit 400 above through untouched.
        raise
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Error reading frame: {str(e)}") from e

    h_orig, w_orig = img.shape[:2]

    # Cap the working resolution at 640x480. The resize ignores aspect ratio,
    # so the x and y scale factors back to the original image can differ.
    if h_orig > 480 or w_orig > 640:
        img_resized = cv2.resize(img, (640, 480))
        scale_x = w_orig / 640
        scale_y = h_orig / 480
    else:
        img_resized = img
        scale_x = 1.0
        scale_y = 1.0

    # Dimensions of the image the detectors actually see. Small uploads are
    # not resized, so this is not always 640x480.
    h_proc, w_proc = img_resized.shape[:2]

    # Face centroids, in analysed-image coordinates.
    face_boxes = detector.detect_faces(img_resized) if detector else []
    face_centroids = []
    for box in face_boxes:
        x1, y1, x2, y2 = box[:4]
        cx = (x1 + x2) / 2
        cy = (y1 + y2) / 2
        face_centroids.append({"cx": cx, "cy": cy, "bbox": [x1, y1, x2, y2]})

    # Phone detection is best-effort: a failure is logged and reported as
    # "no phones" rather than failing the whole request.
    phone_detections = []
    if phone_detector and phone_detector.loaded:
        try:
            phone_detections = phone_detector.detect_phones(img_resized)
            logger.info(f"[PHONE] Scanning frame ({w_orig}x{h_orig}) for phone objects. Faces: {len(face_centroids)} | Phones: {len(phone_detections)}")
        except Exception as e:
            logger.warning(f"[PHONE] Detection error: {e}")
    else:
        logger.warning("[PHONE] Phone detector not loaded or missing. Skipping inference.")

    elapsed_ms = int((time.time() - start_time) * 1000)

    # Distances are measured in analysed-image pixels and scaled by the larger
    # axis factor, which is an upper bound on the original-image distance.
    distance_scale = max(scale_x, scale_y)

    result_phones = []
    detections_compat = []

    for phone in phone_detections:
        px1, py1, px2, py2 = phone["bbox"]
        phone_cx = (px1 + px2) / 2
        phone_cy = (py1 + py2) / 2

        # Nearest face by Euclidean distance between centroids; the first
        # face wins on ties.
        nearest_student = None
        min_dist = float("inf")
        for i, fc in enumerate(face_centroids):
            dist = ((phone_cx - fc["cx"]) ** 2 + (phone_cy - fc["cy"]) ** 2) ** 0.5
            if dist < min_dist:
                min_dist = dist
                nearest_student = i

        # With no face to pair with, report None. Leaving the infinite
        # sentinel in the payload makes JSON serialisation fail, because
        # the response encoder rejects non-finite floats.
        if nearest_student is None:
            distance_to_face = None
        else:
            distance_to_face = round(min_dist * distance_scale, 1)

        # Detailed record, mapped back to original-image pixels.
        result_phones.append({
            "bbox": [int(px1 * scale_x), int(py1 * scale_y), int(px2 * scale_x), int(py2 * scale_y)],
            "type": "mobile_phone",
            "confidence": phone.get("confidence", 0.0),
            "nearest_face_index": nearest_student,
            "distance_to_face": distance_to_face
        })

        # Backward-compatible record: box normalised to 0..1 against the
        # analysed image, clamped in case a box spills past the edge.
        detections_compat.append({
            "box": [
                max(0.0, min(1.0, px1 / w_proc)),
                max(0.0, min(1.0, py1 / h_proc)),
                max(0.0, min(1.0, px2 / w_proc)),
                max(0.0, min(1.0, py2 / h_proc))
            ],
            "confidence": phone.get("confidence", 0.0)
        })

    logger.info(f"[PHONE] Detection complete: {len(result_phones)} phones in {elapsed_ms}ms")

    return {
        "success": True,
        "detected": len(detections_compat) > 0,
        "detections": detections_compat,
        "phones": result_phones,
        "face_count": len(face_centroids),
        "processing_time_ms": elapsed_ms
    }
|
|