MacHub / app.py
MrRayZer's picture
Upload app.py with huggingface_hub
d7f8f25 verified
Raw
History Blame Contribute Delete
48.9 kB
import os
import time
import logging
import requests
import cv2
import numpy as np
from datetime import datetime
from typing import List, Optional
from concurrent.futures import ThreadPoolExecutor, wait
from dotenv import load_dotenv
# Load environment variables from .env if present
load_dotenv()
from fastapi import FastAPI, UploadFile, File, Form, HTTPException, Depends, status, Request
from fastapi.responses import Response
from fastapi.security import APIKeyHeader
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from huggingface_hub import HfApi
from PIL import Image
import io
import time
from collections import defaultdict
from firebase_admin import firestore
# Import local helpers
from firebase_helper import (
get_firestore_db,
check_firebase_connection,
get_all_student_embeddings,
load_all_embeddings,
save_attendance,
update_enrollment_status,
save_face_embedding,
update_camera_status,
get_period_info,
delete_student_embedding,
IST
)
from detector import FaceDetector, PhoneDetector
from recognizer import FaceRecognizer
from keep_alive import start_keep_alive
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s"
)
logger = logging.getLogger(__name__)
# Track uptime and stats
START_TIME = time.time()
_scans_today = 0
_last_scan_date = None
_last_scan_time = "—"
_last_scan_timestamp = 0.0 # unix time of last /scan completion
# FIX 1 — Processing flag: set True while /scan is running
# Used by GET /status so the capture client can skip uploading
# when the server is already busy with a frame.
_is_processing = False
# FastAPI Setup
app = FastAPI(
title="Machub AI Face Recognition Attendance System Backend",
version="1.1.0",
description="ONNX-powered face detection and recognition FastAPI server for Hugging Face Spaces"
)
# Configure CORS to allow only the dashboard domains and local development
allowed_origins = [
"http://localhost:5173",
"http://127.0.0.1:5173",
"https://machub-6af39.web.app",
"https://machub-6af39.firebaseapp.com"
]
# Allow custom origins from environment if provided
custom_origins = os.getenv("ALLOWED_ORIGINS")
if custom_origins:
allowed_origins.extend([o.strip() for o in custom_origins.split(",")])
app.add_middleware(
CORSMiddleware,
allow_origins=allowed_origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# API Key Security Setup
API_KEY_HEADER = APIKeyHeader(name="X-API-Key", auto_error=False)
def verify_api_key(x_api_key: Optional[str] = Depends(API_KEY_HEADER)):
"""Enforces authentication on protected routes using the X-API-Key header."""
secret_key = os.getenv("API_SECRET_KEY")
if not secret_key:
logger.warning("API_SECRET_KEY is not configured in the server environment!")
return None
if not x_api_key or x_api_key != secret_key:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Unauthorized: Invalid or missing X-API-Key header."
)
return x_api_key
# Global Singletons
detector = None
recognizer = None
phone_detector = None
# Hugging Face Dataset Storage Configuration
hf_token = os.getenv("HF_TOKEN")
hf_api = HfApi(token=hf_token)
hf_repo = os.getenv("HF_DATASET_REPO", "MrRayZer/machub-biometric-storage")
if hf_token:
try:
logger.info(f"Ensuring private dataset repository exists: {hf_repo}")
hf_api.create_repo(repo_id=hf_repo, repo_type="dataset", private=True, exist_ok=True)
logger.info("Dataset repository verified/created successfully.")
except Exception as e:
logger.error(f"Failed to verify/create HF dataset repository {hf_repo}: {e}")
else:
logger.warning("HF_TOKEN environment variable is missing! Cannot manage private dataset repository.")
# Rate Limiting configuration (10 enrollments per minute per IP)
_enroll_rate_limits = defaultdict(list)
def check_rate_limit(ip_address: str) -> bool:
now = time.time()
_enroll_rate_limits[ip_address] = [t for t in _enroll_rate_limits[ip_address] if now - t < 60.0]
if len(_enroll_rate_limits[ip_address]) >= 10:
return False
_enroll_rate_limits[ip_address].append(now)
return True
# Hugging Face API call retry helper with exponential backoff
def call_hf_with_retry(func, *args, **kwargs):
retries = 3
backoff = 2.0
for attempt in range(retries):
try:
return func(*args, **kwargs)
except Exception as e:
err_str = str(e).lower()
is_transient = "429" in err_str or "503" in err_str or "rate limit" in err_str or "connection" in err_str
if is_transient and attempt < retries - 1:
logger.warning(f"[HF_API] Call failed (attempt {attempt+1}/{retries}). Retrying in {backoff}s... Error: {e}")
time.sleep(backoff)
backoff *= 2.0
else:
raise e
raise Exception("Hugging Face API call failed after max retries")
# Admin role and Firebase ID token verifier
def verify_admin_user(id_token: str) -> str:
if not id_token:
raise HTTPException(status_code=403, detail="Unauthorized: Missing credentials token.")
try:
from firebase_admin import auth as firebase_auth
decoded_token = firebase_auth.verify_id_token(id_token)
email = decoded_token.get("email", "").lower()
uid = decoded_token.get("uid")
db_client = get_firestore_db()
user_doc = db_client.collection("allowedUsers").document(email).get()
is_admin = False
if user_doc.exists:
data = user_doc.to_dict()
if data.get("role") == "admin":
is_admin = True
if not is_admin and email in ["admin@machub.in", "mrabensojan@gmail.com"]:
is_admin = True
if not is_admin:
logger.warning(f"Access denied: User {email} is not authorized as admin.")
raise HTTPException(status_code=403, detail="Unauthorized: User does not have admin privileges.")
return uid
except HTTPException:
raise
except Exception as e:
logger.error(f"Firebase token validation error: {e}")
raise HTTPException(status_code=403, detail=f"Unauthorized: Token verification failed ({str(e)}).")
# Model URLs for automatic download on startup
YOLO_URL = "https://huggingface.co/crj/dl-ws/resolve/main/yoloface_8n.onnx"
ARCFACE_URL = "https://huggingface.co/FoivosPar/Arc2Face/resolve/main/arcface.onnx"
def download_model(url: str, dest_path: str):
"""Downloads a model file with chunk progress reporting."""
os.makedirs(os.path.dirname(dest_path), exist_ok=True)
logger.info(f"Downloading {url} to {dest_path}...")
try:
response = requests.get(url, stream=True)
response.raise_for_status()
total_size = int(response.headers.get("content-length", 0))
block_size = 1024 * 1024 # 1MB
downloaded = 0
with open(dest_path, "wb") as f:
for chunk in response.iter_content(chunk_size=block_size):
if chunk:
f.write(chunk)
downloaded += len(chunk)
if total_size > 0:
percent = (downloaded / total_size) * 100
logger.info(f"Downloading {os.path.basename(dest_path)}: {percent:.1f}% ({downloaded}/{total_size} bytes)")
else:
logger.info(f"Downloading {os.path.basename(dest_path)}: {downloaded} bytes")
logger.info(f"Finished downloading {dest_path}")
except Exception as e:
logger.error(f"Error downloading model {dest_path}: {e}")
if os.path.exists(dest_path):
os.remove(dest_path)
raise e
def pre_warm_models():
"""Warms up model runtimes to eliminate initial inference delays (cold-start mitigation)."""
global detector, recognizer, phone_detector
try:
logger.info("Pre-warming YOLOv8 detector, ArcFace recognizer, and Phone detector...")
dummy_img = np.zeros((480, 640, 3), dtype=np.uint8)
if detector and detector.loaded:
_ = detector.detect_faces(dummy_img)
if recognizer and recognizer.loaded:
dummy_face = np.zeros((112, 112, 3), dtype=np.uint8)
_ = recognizer.get_embedding(dummy_face)
if phone_detector and phone_detector.loaded:
_ = phone_detector.detect_phones(dummy_img)
logger.info("Models successfully pre-warmed.")
except Exception as e:
logger.error(f"Failed to pre-warm models: {e}")
@app.on_event("startup")
def startup_event():
"""Server initialization hook."""
global detector, recognizer, phone_detector
logger.info("--- Starting Machub Backend Initialization ---")
# 1. Initialize Firebase and Cache Embeddings
try:
get_firestore_db()
logger.info("Firebase Firestore initialized.")
load_all_embeddings(force_refresh=True)
logger.info("In-memory student embeddings pre-loaded.")
except Exception as e:
logger.error(f"Firebase / Caching initialization failed: {e}")
# 2. Ensure ONNX models exist (download if missing)
models_dir = "models"
yolo_path = os.path.join(models_dir, "yolov8n-face.onnx")
arcface_path = os.path.join(models_dir, "arcface.onnx")
yolo_coco_path = os.path.join(models_dir, "yolov8n.onnx")
if not os.path.exists(yolo_path):
download_model(YOLO_URL, yolo_path)
if not os.path.exists(arcface_path):
download_model(ARCFACE_URL, arcface_path)
try:
if not os.path.exists(yolo_coco_path):
download_model("https://huggingface.co/webnn/yolov8n/resolve/main/onnx/yolov8n_fp16.onnx", yolo_coco_path)
except Exception as ex:
logger.warning(f"Gracefully skipped YOLOv8 COCO model download for phone detection: {ex}")
# 3. Instantiate model singletons
detector = FaceDetector(yolo_path)
recognizer = FaceRecognizer(arcface_path)
phone_detector = PhoneDetector(yolo_coco_path)
# 4. Pre-warm models
pre_warm_models()
# 5. Start keep-alive loop
start_keep_alive()
logger.info("--- Machub Backend Ready ---")
# In-Memory Cache to minimize duplicate attendance writes in the same class session
# Key: (division, date, period) -> Value: set(roll_no)
_marked_cache = {}
# Stateful tracking registries for /identify
_track_embeddings = {} # tid -> list of embeddings (numpy arrays)
_track_identities = {} # tid -> { "rollNo": str, "name": str, "score": float, "consecutive_low_scores": int }
_track_last_seen = {} # tid -> timestamp (float)
def prune_stale_tracks():
"""Prunes tracking states that have not been updated for 10 seconds."""
now = time.time()
stale_ids = [tid for tid, last_seen in _track_last_seen.items() if now - last_seen > 10.0]
for tid in stale_ids:
logger.info(f"[TRACKER] Pruning stale track due to inactivity: {tid}")
_track_embeddings.pop(tid, None)
_track_identities.pop(tid, None)
_track_last_seen.pop(tid, None)
def get_marked_set(division: str, date: str, period: str) -> set:
"""Returns the set of students already marked in-memory for a period. Prunes older cached keys."""
key = (division, date, period)
# Prune keys from other sessions to prevent memory leaks
keys_to_remove = [k for k in _marked_cache.keys() if k != key]
for k in keys_to_remove:
del _marked_cache[k]
if key not in _marked_cache:
_marked_cache[key] = set()
return _marked_cache[key]
# Parallel processing helper
def _process_single_face_crop(idx: int, box: List[int], img: np.ndarray, known_profiles: dict):
"""Crops a face, equalizes contrast, computes embedding, and matches against known database."""
x1, y1, x2, y2 = box
crop = img[y1:y2, x1:x2]
if crop.size == 0:
return None
embedding = recognizer.get_embedding(crop)
if embedding is None:
return None
match_id, score = recognizer.match_student(embedding, known_profiles)
return {
"index": idx,
"match_id": match_id,
"score": score
}
# ----------------- ENDPOINTS -----------------
@app.get("/health")
def health_check():
"""Public health endpoint used for uptime monitoring, warmups, and diagnostics."""
firebase_ok = check_firebase_connection()
return {
"status": "online",
"models": {
"detector": detector.loaded if detector else False,
"recognizer": recognizer.loaded if recognizer else False
},
"firebase": firebase_ok,
"version": "1.1.0"
}
# FIX 2 — Public /status endpoint (no auth required)
# Called by the capture client with a 5 s timeout before every upload.
# If busy=True the client skips the upload immediately.
@app.get("/status")
def server_status():
"""Public endpoint: returns whether the server is currently busy processing a frame."""
global _is_processing, _last_scan_timestamp, START_TIME
secs_ago = int(time.time() - _last_scan_timestamp) if _last_scan_timestamp > 0 else -1
return {
"busy": _is_processing,
"last_scan_seconds_ago": secs_ago,
"uptime_seconds": int(time.time() - START_TIME),
}
# Core Biometric Enrollment and Upload Engine (RAM-only Processing)
async def handle_enrollment_core(
request: Request,
id_token: str,
roll_no: str,
name: str,
division: str,
photos: List[UploadFile],
is_re_enroll: bool
):
client_ip = request.client.host
if not check_rate_limit(client_ip):
raise HTTPException(
status_code=429,
detail="Rate limit exceeded: Maximum 10 enrollments per minute per IP."
)
admin_uid = verify_admin_user(id_token)
roll_no = roll_no.strip().upper()
name = name.strip()
division = division.strip()
db_client = get_firestore_db()
student_ref = db_client.collection("students").document(roll_no)
student_snap = student_ref.get()
if not is_re_enroll and student_snap.exists:
raise HTTPException(
status_code=400,
detail=f"Student {roll_no} is already enrolled. Please use re-enrollment route."
)
elif is_re_enroll and not student_snap.exists:
raise HTTPException(
status_code=400,
detail=f"Student {roll_no} does not exist. Use the new enrollment route."
)
if not photos:
raise HTTPException(status_code=400, detail="No photo files uploaded. Include photos list.")
# Update queue to processing
update_enrollment_status(roll_no, "processing")
valid_embeddings = []
photos_skipped = 0
start_time = time.time()
for photo in photos:
try:
contents = await photo.read()
file_size = len(contents)
if file_size > 5 * 1024 * 1024:
photos_skipped += 1
logger.warning(f"File {photo.filename} skipped: exceeds 5MB size limit.")
continue
nparr = np.frombuffer(contents, np.uint8)
img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if img is None:
photos_skipped += 1
logger.warning(f"File {photo.filename} skipped: could not decode image.")
continue
faces = detector.detect_faces(img)
if not faces:
photos_skipped += 1
logger.warning(f"File {photo.filename} skipped: no face detected.")
continue
faces.sort(key=lambda box: (box[2]-box[0])*(box[3]-box[1]), reverse=True)
x1, y1, x2, y2 = faces[0]
crop = img[y1:y2, x1:x2]
if crop.size == 0:
photos_skipped += 1
logger.warning(f"File {photo.filename} skipped: empty face crop.")
continue
emb = recognizer.get_embedding(crop)
if emb is not None:
valid_embeddings.append(emb)
else:
photos_skipped += 1
logger.warning(f"File {photo.filename} skipped: failed to extract embedding.")
except Exception as e:
photos_skipped += 1
logger.error(f"Failed to process face extraction for {photo.filename}: {e}")
if not valid_embeddings:
raise HTTPException(
status_code=400,
detail="Enrollment failed: No clear faces could be extracted from any of the uploaded photos."
)
avg_embedding = np.mean(valid_embeddings, axis=0)
norm = np.linalg.norm(avg_embedding)
master_embedding_arr = avg_embedding / norm if norm > 0 else avg_embedding
master_embedding = master_embedding_arr.tolist()
try:
known_profiles = load_all_embeddings(force_refresh=True)
if known_profiles:
for existing_roll, profile in known_profiles.items():
if existing_roll != roll_no:
similarity = float(np.dot(master_embedding_arr, profile["embedding"]))
if similarity >= 0.70:
existing_name = profile["name"]
raise HTTPException(
status_code=400,
detail=f"Student Name: {existing_name} and already enrolled."
)
except HTTPException:
raise
except Exception as e:
logger.error(f"Error checking double enrollment: {e}")
try:
student_data = {
"name": name,
"rollNo": roll_no,
"division": division,
"photoCount": len(photos),
"photoUrls": [],
"photoURL": None,
"faceEmbedding": master_embedding,
"embeddingStatus": "complete",
"faceEnrolled": True,
"updatedAt": firestore.SERVER_TIMESTAMP
}
if not is_re_enroll:
student_data["totalClasses"] = 0
student_data["attendedClasses"] = 0
student_data["enrolledAt"] = firestore.SERVER_TIMESTAMP
student_data["enrolledBy"] = admin_uid
else:
student_data["enrolledAt"] = firestore.SERVER_TIMESTAMP
student_data["enrolledBy"] = admin_uid
student_ref.set(student_data, merge=True)
queue_ref = db_client.collection("enrollmentQueue").document(roll_no)
queue_ref.set({
"rollNo": roll_no,
"division": division,
"status": "complete",
"photoCount": len(photos),
"uploadedAt": firestore.SERVER_TIMESTAMP
}, merge=True)
except Exception as e:
logger.error(f"Firestore transaction write failed for student {roll_no}: {e}")
raise HTTPException(
status_code=500,
detail=f"Database ledger write failed: {str(e)}"
)
try:
load_all_embeddings(force_refresh=True)
except Exception as cache_err:
logger.warning(f"Failed to refresh embeddings cache: {cache_err}")
processing_time_ms = int((time.time() - start_time) * 1000)
return {
"success": True,
"roll_no": roll_no,
"name": name,
"photos_received": len(photos),
"photos_processed": len(valid_embeddings),
"photos_skipped": photos_skipped,
"embedding_dimensions": len(master_embedding),
"message": "Enrolled successfully"
}
@app.post("/enroll")
async def enroll_student(
request: Request,
id_token: str = Form(...),
roll_no: str = Form(...),
name: str = Form(...),
division: str = Form(...),
photos: List[UploadFile] = File(...)
):
return await handle_enrollment_core(
request=request,
id_token=id_token,
roll_no=roll_no,
name=name,
division=division,
photos=photos,
is_re_enroll=False
)
@app.post("/scan")
async def scan_frame(
frame: UploadFile = File(...),
period: int = Form(...),
division: Optional[str] = Form(None),
date: Optional[str] = Form(None),
auth: None = Depends(verify_api_key)
):
"""Processes a classroom frame, matches student embeddings, and logs attendance."""
global _scans_today, _last_scan_date, _last_scan_time, _is_processing, _last_scan_timestamp
# FIX 1 — set busy flag immediately so /status reflects reality
_is_processing = True
now_ist = datetime.now(IST)
local_date = now_ist.strftime("%Y-%m-%d")
local_time = now_ist.strftime("%H:%M")
# Setup stats tracking
if _last_scan_date != local_date:
_scans_today = 0
_last_scan_date = local_date
_scans_today += 1
_last_scan_time = local_time
logger.info(f"Processing classroom scan (Scan #{_scans_today} today)")
# 1. Decode Frame Image
try:
contents = await frame.read()
nparr = np.frombuffer(contents, np.uint8)
img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if img is None:
raise HTTPException(status_code=400, detail="Invalid frame file. Could not decode image.")
except Exception as e:
logger.error(f"Error decoding frame: {e}")
raise HTTPException(status_code=400, detail=f"Error reading image: {str(e)}")
# 2. Get Period Info & Division Fallbacks
if not division:
info = get_period_info()
if info:
division = info.get("division")
if not division:
division = "BCA-A" # Final fallback
if not date:
date = local_date
period_str = f"period-{period}"
# 3. Fetch Known Division Profiles from database
known_profiles = get_all_student_embeddings(division)
if not known_profiles:
logger.warning(f"No enrolled student profiles found in database for division {division}.")
# Empty classroom fallback
update_camera_status(is_online=True, current_period=period, division=division)
return {
"success": True,
"faces_detected": 0,
"matches_found": 0,
"students_marked": [],
"already_marked": [],
"skipped_ambiguous": 0,
"period": period
}
# 4. Resize Frame to 640x480 for faster CPU execution
h_orig, w_orig = img.shape[:2]
if h_orig > 480 or w_orig > 640:
img_resized = cv2.resize(img, (640, 480))
else:
img_resized = img
# 5. Face Detection
detected_faces = detector.detect_faces(img_resized)
logger.info(f"Faces detected: {len(detected_faces)}")
if not detected_faces:
update_camera_status(is_online=True, current_period=period, division=division)
return {
"success": True,
"faces_detected": 0,
"matches_found": 0,
"students_marked": [],
"already_marked": [],
"skipped_ambiguous": 0,
"period": period
}
# Drawback Fix - Capping at 10 faces max to maintain CPU performance
if len(detected_faces) > 10:
logger.info(f"Truncating {len(detected_faces)} detected faces to 10 for CPU speed.")
detected_faces = detected_faces[:10]
# 6. Parallel Recognition using ThreadPoolExecutor (8-second max timeout check)
results = []
futures = []
with ThreadPoolExecutor(max_workers=2) as executor:
for idx, box in enumerate(detected_faces):
f = executor.submit(_process_single_face_crop, idx, box, img_resized, known_profiles)
futures.append(f)
# Limit processing time to 8 seconds to prevent HTTP gateway timeouts
done, not_done = wait(futures, timeout=8.0)
for f in done:
try:
res = f.result()
if res is not None:
results.append(res)
except Exception as ex:
logger.error(f"Worker thread error during face recognition: {ex}")
# Cancel any slow hanging threads
for f in not_done:
f.cancel()
logger.warning("A face recognition thread timed out and was cancelled.")
# 7. Attendance Mapping & In-Memory Duplicate Check
students_marked = []
already_marked = []
skipped_ambiguous = 0
marked_set = get_marked_set(division, date, period_str)
for res in results:
match_id = res["match_id"]
if match_id == "AMBIGUOUS":
skipped_ambiguous += 1
continue
elif match_id is not None:
# Check in-memory list first to prevent hammering Firestore
if match_id in marked_set:
already_marked.append(match_id)
else:
student_name = known_profiles[match_id]["name"]
success = save_attendance(
student_id=match_id,
name=student_name,
period=period,
subject=None,
division=division,
date=date
)
if success:
marked_set.add(match_id)
students_marked.append(match_id)
# 8. Update Camera Last Scan Status
update_camera_status(is_online=True, current_period=period, division=division)
# FIX 1 — clear busy flag and record completion timestamp
_is_processing = False
_last_scan_timestamp = time.time()
logger.info(f"Scan processed: Detected={len(detected_faces)}, New Marks={len(students_marked)}, Already Marked={len(already_marked)}, Ambiguous={skipped_ambiguous}")
return {
"success": True,
"faces_detected": len(detected_faces),
"matches_found": len(students_marked) + len(already_marked),
"students_marked": students_marked,
"already_marked": already_marked,
"skipped_ambiguous": skipped_ambiguous,
"period": period
}
@app.post("/re-enroll")
async def re_enroll_student_route(
request: Request,
id_token: str = Form(...),
roll_no: str = Form(...),
name: str = Form(...),
division: str = Form(...),
photos: List[UploadFile] = File(...)
):
return await handle_enrollment_core(
request=request,
id_token=id_token,
roll_no=roll_no,
name=name,
division=division,
photos=photos,
is_re_enroll=True
)
@app.get("/photo/{roll_no}/{filename}")
async def get_student_photo(roll_no: str, filename: str):
repo = os.getenv("HF_DATASET_REPO", "MrRayZer/machub-biometric-storage")
token = os.getenv("HF_TOKEN")
url = f"https://huggingface.co/datasets/{repo}/resolve/main/{roll_no}/{filename}"
headers = {}
if token:
headers["Authorization"] = f"Bearer {token}"
try:
resp = requests.get(url, headers=headers, timeout=15)
if resp.status_code == 200:
return Response(content=resp.content, media_type=resp.headers.get("content-type", "image/jpeg"))
else:
raise HTTPException(status_code=resp.status_code, detail="Photo not found")
except Exception as e:
logger.error(f"Proxy photo download failed for {roll_no}/{filename}: {e}")
raise HTTPException(status_code=500, detail=f"Proxy error downloading photo: {str(e)}")
@app.get("/status/{roll_no}")
def get_enrollment_status(
roll_no: str,
auth: None = Depends(verify_api_key)
):
"""Queries the database to report the enrollment status of a student."""
db_client = get_firestore_db()
student_doc = db_client.collection("students").document(roll_no).get()
queue_doc = db_client.collection("enrollmentQueue").document(roll_no).get()
if student_doc.exists:
data = student_doc.to_dict()
return {
"roll_no": roll_no,
"enrolled": True,
"embedding_ready": "faceEmbedding" in data,
"photo_count": data.get("photoCount", 0),
"status": "complete"
}
elif queue_doc.exists:
data = queue_doc.to_dict()
return {
"roll_no": roll_no,
"enrolled": False,
"embedding_ready": False,
"photo_count": 0,
"status": data.get("status", "processing")
}
else:
raise HTTPException(
status_code=404,
detail=f"Student {roll_no} has no profile or queue item."
)
@app.delete("/student/{roll_no}")
def delete_student(
roll_no: str,
auth: None = Depends(verify_api_key)
):
"""Deletes face embedding profile records for a student."""
success = delete_student_embedding(roll_no)
if success:
return {"success": True, "message": f"Successfully deleted face embeddings for {roll_no}"}
else:
raise HTTPException(
status_code=500,
detail="Failed to delete student embeddings."
)
@app.get("/stats")
def get_stats(
auth: None = Depends(verify_api_key)
):
"""Returns analytics about the face recognition container instance."""
db_client = get_firestore_db()
# Fetch total enrolled students
total_enrolled = 0
try:
snap = db_client.collection("students").where("embeddingStatus", "==", "complete").count().get()
total_enrolled = snap[0][0].value
except Exception:
# Fallback if count query fails
try:
docs = db_client.collection("students").where("embeddingStatus", "==", "complete").stream()
total_enrolled = sum(1 for _ in docs)
except Exception:
pass
uptime = int(time.time() - START_TIME)
space_url = os.getenv("SPACE_URL", "http://localhost:7860")
return {
"total_enrolled_students": total_enrolled,
"scans_today": _scans_today,
"last_scan": _last_scan_time,
"uptime_seconds": uptime,
"space_url": space_url
}
@app.post("/refresh")
def refresh_embeddings_cache(
auth: None = Depends(verify_api_key)
):
"""Purges the in-memory student embeddings cache and re-queries Firestore."""
try:
logger.info("Retriggering embedding cache reload from admin request.")
profiles = load_all_embeddings(force_refresh=True)
return {
"success": True,
"message": f"Cached embeddings successfully rebuilt. Total profiles: {len(profiles)}"
}
except Exception as e:
logger.error(f"Failed to manually refresh embeddings: {e}")
raise HTTPException(
status_code=500,
detail=f"Rebuilding embedding cache failed: {str(e)}"
)
@app.post("/identify")
async def identify_faces(
face_crops: List[UploadFile] = File(...),
tracking_ids: Optional[List[str]] = Form(None),
auth: None = Depends(verify_api_key)
):
"""Identifies students from a list of cropped face images using the global embeddings cache and stateful hysteresis tracking."""
# Prune stale tracking sessions first to prevent memory leak
prune_stale_tracks()
known_profiles = load_all_embeddings()
if not known_profiles:
logger.warning("No student profiles in cache.")
return {"success": True, "faces": []}
results = []
for idx, crop_file in enumerate(face_crops):
tracking_id = None
if tracking_ids and idx < len(tracking_ids):
tracking_id = tracking_ids[idx]
try:
contents = await crop_file.read()
nparr = np.frombuffer(contents, np.uint8)
img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if img is None or img.size == 0:
results.append({
"index": idx,
"match": False,
"confidence": 0.0,
"name": "Invalid image"
})
continue
# Crop the face using the YOLOface detector (just like in enrollment) for perfect alignment
if detector and detector.loaded:
try:
faces = detector.detect_faces(img)
if faces:
# Take the largest face found
faces.sort(key=lambda b: (b[2]-b[0])*(b[3]-b[1]), reverse=True)
fx1, fy1, fx2, fy2 = faces[0]
crop = img[fy1:fy2, fx1:fx2]
if crop.size > 0:
img = crop
except Exception as detect_err:
logger.warning(f"Failed to recrop face crop in /identify: {detect_err}")
# Compute embedding
embedding = recognizer.get_embedding(img)
if embedding is None:
results.append({
"index": idx,
"match": False,
"confidence": 0.0,
"name": "No face embedding"
})
continue
if tracking_id:
# Update last seen timestamp for this track
_track_last_seen[tracking_id] = time.time()
# Append current frame embedding to the track's history
if tracking_id not in _track_embeddings:
_track_embeddings[tracking_id] = []
_track_embeddings[tracking_id].append(embedding)
_track_embeddings[tracking_id] = _track_embeddings[tracking_id][-5:]
# Compute smoothed master embedding
mean_emb = np.mean(_track_embeddings[tracking_id], axis=0)
norm = np.linalg.norm(mean_emb)
smoothed_emb = mean_emb / norm if norm > 0 else mean_emb
# Match master embedding against all known student profiles
scores = []
for roll_no, profile in known_profiles.items():
similarity = float(np.dot(smoothed_emb, profile["embedding"]))
scores.append((roll_no, similarity))
if not scores:
results.append({
"index": idx,
"match": False,
"confidence": 0.0,
"name": "Unknown"
})
continue
# Sort profiles by similarity score descending
scores.sort(key=lambda x: x[1], reverse=True)
top_roll, top_score = scores[0]
# Check for wrong match ambiguity (margin < 0.05 at >= 0.60 similarity)
is_ambiguous = False
if len(scores) > 1:
second_roll, second_score = scores[1]
if (top_score - second_score) < 0.05 and top_score >= 0.60:
is_ambiguous = True
locked = _track_identities.get(tracking_id)
if top_score < 0.60:
# Current frame fails retention threshold
if locked:
locked["consecutive_low_scores"] += 1
if locked["consecutive_low_scores"] >= 3:
logger.info(f"[TRACKER] Lock broken for {tracking_id} (identity {locked['rollNo']}) due to 3 consecutive low scores.")
_track_identities.pop(tracking_id, None)
results.append({
"index": idx,
"match": False,
"confidence": top_score,
"name": "Unknown"
})
else:
logger.info(f"[TRACKER] Hysteresis retain: {tracking_id} stays locked to {locked['rollNo']} (score {top_score:.3f}, consecutive: {locked['consecutive_low_scores']})")
results.append({
"index": idx,
"match": True,
"rollNo": locked["rollNo"],
"name": locked["name"],
"confidence": locked["score"]
})
else:
results.append({
"index": idx,
"match": False,
"confidence": top_score,
"name": "Unknown"
})
elif is_ambiguous:
# Current frame is ambiguous
if locked:
locked["consecutive_low_scores"] += 1
if locked["consecutive_low_scores"] >= 3:
logger.info(f"[TRACKER] Lock broken for {tracking_id} due to 3 consecutive ambiguous/low scores.")
_track_identities.pop(tracking_id, None)
results.append({
"index": idx,
"match": False,
"confidence": top_score,
"name": "Ambiguous"
})
else:
logger.info(f"[TRACKER] Hysteresis retain: {tracking_id} stays locked to {locked['rollNo']} despite ambiguity.")
results.append({
"index": idx,
"match": True,
"rollNo": locked["rollNo"],
"name": locked["name"],
"confidence": locked["score"]
})
else:
results.append({
"index": idx,
"match": False,
"confidence": top_score,
"name": "Ambiguous"
})
else: # top_score >= 0.60 and not ambiguous
if locked:
if top_roll == locked["rollNo"]:
# Same identity: reset low score count and refresh locked score
locked["consecutive_low_scores"] = 0
locked["score"] = top_score
results.append({
"index": idx,
"match": True,
"rollNo": top_roll,
"name": locked["name"],
"confidence": top_score
})
else:
# Identity switched! Check if it exceeds Lock-In (0.72)
if top_score >= 0.72:
logger.info(f"[TRACKER] Identity switched for {tracking_id} from {locked['rollNo']} to {top_roll} (score {top_score:.3f} >= 0.72)")
_track_identities[tracking_id] = {
"rollNo": top_roll,
"name": known_profiles[top_roll]["name"],
"score": top_score,
"consecutive_low_scores": 0
}
results.append({
"index": idx,
"match": True,
"rollNo": top_roll,
"name": known_profiles[top_roll]["name"],
"confidence": top_score
})
else:
# Overwrite failed lock-in: count as low score for currently locked student
locked["consecutive_low_scores"] += 1
if locked["consecutive_low_scores"] >= 3:
logger.info(f"[TRACKER] Lock broken for {tracking_id} due to 3 consecutive identity switches/low scores.")
_track_identities.pop(tracking_id, None)
results.append({
"index": idx,
"match": False,
"confidence": top_score,
"name": "Unknown"
})
else:
logger.info(f"[TRACKER] Hysteresis retain: {tracking_id} stays locked to {locked['rollNo']} (switch to {top_roll} rejected, score {top_score:.3f} < 0.72)")
results.append({
"index": idx,
"match": True,
"rollNo": locked["rollNo"],
"name": locked["name"],
"confidence": locked["score"]
})
else: # not locked yet
# Must exceed Lock-In Threshold (0.72) to create new lock
if top_score >= 0.72:
logger.info(f"[TRACKER] Locked new identity for {tracking_id}: {top_roll} (score {top_score:.3f} >= 0.72)")
_track_identities[tracking_id] = {
"rollNo": top_roll,
"name": known_profiles[top_roll]["name"],
"score": top_score,
"consecutive_low_scores": 0
}
results.append({
"index": idx,
"match": True,
"rollNo": top_roll,
"name": known_profiles[top_roll]["name"],
"confidence": top_score
})
else:
results.append({
"index": idx,
"match": False,
"confidence": top_score,
"name": "Unknown"
})
else:
# No tracking ID provided: fall back to frame-by-frame isolation matching
match_id, score = recognizer.match_student(embedding, known_profiles)
if match_id == "AMBIGUOUS":
results.append({
"index": idx,
"match": False,
"confidence": score,
"name": "Ambiguous"
})
elif match_id is not None:
results.append({
"index": idx,
"match": True,
"rollNo": match_id,
"name": known_profiles[match_id]["name"],
"confidence": score
})
else:
results.append({
"index": idx,
"match": False,
"confidence": score,
"name": "Unknown"
})
except Exception as e:
logger.error(f"Error processing face crop {crop_file.filename}: {e}")
results.append({
"index": idx,
"match": False,
"confidence": 0.0,
"name": f"Error: {str(e)}"
})
return {"success": True, "faces": results}
@app.post("/detect-phone")
async def detect_phone_in_frame(
frame: UploadFile = File(None),
photo: UploadFile = File(None),
auth: None = Depends(verify_api_key)
):
"""
Detects phones in a classroom frame using YOLOv8 and correlates each phone
with the nearest detected face centroid to identify the student holding it.
v1.1.0 — Added 2026-06-23
"""
start_time = time.time()
file_to_process = frame if frame is not None else photo
if file_to_process is None:
raise HTTPException(status_code=400, detail="Missing frame or photo file parameter.")
try:
contents = await file_to_process.read()
nparr = np.frombuffer(contents, np.uint8)
img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
if img is None:
raise HTTPException(status_code=400, detail="Invalid frame: could not decode image.")
except HTTPException:
raise
except Exception as e:
raise HTTPException(status_code=400, detail=f"Error reading frame: {str(e)}")
h_orig, w_orig = img.shape[:2]
# Resize for faster processing
if h_orig > 480 or w_orig > 640:
img_resized = cv2.resize(img, (640, 480))
scale_x = w_orig / 640
scale_y = h_orig / 480
else:
img_resized = img
scale_x = 1.0
scale_y = 1.0
# 1. Detect faces to get face centroids
face_boxes = detector.detect_faces(img_resized) if detector else []
face_centroids = []
for box in face_boxes:
x1, y1, x2, y2 = box[:4]
cx = (x1 + x2) / 2
cy = (y1 + y2) / 2
face_centroids.append({"cx": cx, "cy": cy, "bbox": [x1, y1, x2, y2]})
# 2. Detect phones using YOLO
phone_detections = []
if phone_detector and phone_detector.loaded:
try:
phone_detections = phone_detector.detect_phones(img_resized)
logger.info(f"[PHONE] Scanning frame ({w_orig}x{h_orig}) for phone objects. Faces: {len(face_centroids)} | Phones: {len(phone_detections)}")
except Exception as e:
logger.warning(f"[PHONE] Detection error: {e}")
else:
logger.warning("[PHONE] Phone detector not loaded or missing. Skipping inference.")
elapsed_ms = int((time.time() - start_time) * 1000)
# 3. Build response and map nearest faces
result_phones = []
detections_compat = []
for phone in phone_detections:
px1, py1, px2, py2 = phone["bbox"]
phone_cx = (px1 + px2) / 2
phone_cy = (py1 + py2) / 2
# Find nearest face centroid
nearest_student = None
min_dist = float("inf")
for i, fc in enumerate(face_centroids):
dist = ((phone_cx - fc["cx"]) ** 2 + (phone_cy - fc["cy"]) ** 2) ** 0.5
if dist < min_dist:
min_dist = dist
nearest_student = i
# Absolute pixel boxes scaled back to original dimensions
result_phones.append({
"bbox": [int(px1 * scale_x), int(py1 * scale_y), int(px2 * scale_x), int(py2 * scale_y)],
"type": "mobile_phone",
"confidence": phone.get("confidence", 0.0),
"nearest_face_index": nearest_student,
"distance_to_face": round(min_dist * max(scale_x, scale_y), 1)
})
# Normalized boxes (0.0 to 1.0) on the 640x480 resized dimensions
detections_compat.append({
"box": [
max(0.0, min(1.0, px1 / 640.0)),
max(0.0, min(1.0, py1 / 480.0)),
max(0.0, min(1.0, px2 / 640.0)),
max(0.0, min(1.0, py2 / 480.0))
],
"confidence": phone.get("confidence", 0.0)
})
logger.info(f"[PHONE] Detection complete: {len(result_phones)} phones in {elapsed_ms}ms")
return {
"success": True,
"detected": len(detections_compat) > 0,
"detections": detections_compat,
"phones": result_phones,
"face_count": len(face_centroids),
"processing_time_ms": elapsed_ms
}