from fastapi import APIRouter, File, UploadFile, Form, Request from starlette.concurrency import run_in_threadpool from starlette.responses import JSONResponse from typing import List, Optional import numpy as np import cv2 import base64 import requests from app.services.face_recognition import face_engine from app.core.database import db from app.models.schemas import AttendanceResponse, LightingAnalysis, ImageRequest from datetime import datetime from app.core.config import settings from app.utils.anti_spoofing import anti_spoofing import logging logger = logging.getLogger("face-service-api") router = APIRouter() # --- HELPER FUNCTIONS FOR OFF-THREAD EXECUTION --- def decode_image(contents: bytes) -> Optional[np.ndarray]: """Decodes image bytes to a numpy array (OpenCV format).""" nparr = np.frombuffer(contents, np.uint8) return cv2.imdecode(nparr, cv2.IMREAD_COLOR) def encode_image_base64(img: np.ndarray, quality: int = 60) -> str: """Encodes a numpy image to base64 string.""" _, buffer = cv2.imencode('.jpg', img, [cv2.IMWRITE_JPEG_QUALITY, quality]) return base64.b64encode(buffer).decode('utf-8') def calculate_average_embedding(embeddings: List[np.ndarray]) -> Optional[np.ndarray]: """Calculates normalized average embedding from a list.""" if not embeddings: return None avg = np.mean(embeddings, axis=0) norm = np.linalg.norm(avg) return avg / norm if norm > 0 else avg def compare_embeddings(target: np.ndarray, stored: np.ndarray) -> float: """Calculates cosine similarity between two embeddings.""" return float(np.dot(target, stored)) async def log_spoof_attempt(student_id: str, img: np.ndarray, score: float, lighting_data: dict = None): """Log spoofing to core server without blocking.""" try: img_base64 = await run_in_threadpool(encode_image_base64, img) payload = { "studentId": student_id, "timestamp": datetime.utcnow().isoformat(), "livenessScore": float(score), "snapshot": img_base64, "lighting_quality": lighting_data.get("lighting_quality") if lighting_data else "UNKNOWN", "brightness": lighting_data.get("brightness") if lighting_data else 0 } await run_in_threadpool( requests.post, f"{settings.CORE_SERVER_URL}/api/v1/audit/spoof-attempt", json=payload, timeout=5, headers={"X-API-Key": getattr(settings, 'FACE_API_KEY', '')} ) except Exception as e: logger.error(f"Failed to log spoof attempt: {e}") # --- API ENDPOINTS --- @router.post("/register", status_code=201) async def register( studentId: str = Form(...), ownerAdmin: str = Form("SYSTEM"), files: List[UploadFile] = File(...) ): logger.info(f"Registering student {studentId} with {len(files)} images") embeddings = [] for file in files: contents = await file.read() img = await run_in_threadpool(decode_image, contents) if img is None: continue emb = await run_in_threadpool(face_engine.extract_embedding, img) if emb is not None: embeddings.append(emb) if not embeddings: return JSONResponse( status_code=400, content={"success": False, "error": "No faces detected in the provided images."} ) # Process embeddings (CPU heavy) avg_embedding = await run_in_threadpool(calculate_average_embedding, embeddings) # Store in MongoDB (Async IO) student_data = { "faceEmbedding": avg_embedding.tolist(), "last_updated": datetime.utcnow() } collection = db.get_collection() existing = await collection.find_one({"studentId": studentId, "ownerAdmin": ownerAdmin}) if existing: await collection.update_one({"studentId": studentId, "ownerAdmin": ownerAdmin}, {"$set": student_data}) msg = "Student face recorded updated." else: student_data["studentId"] = studentId student_data["ownerAdmin"] = ownerAdmin await collection.insert_one(student_data) msg = "New student face record created." return { "success": True, "message": msg, "studentId": studentId, "faces_processed": len(embeddings) } @router.post("/check-lighting", response_model=LightingAnalysis) async def check_lighting(request: ImageRequest): try: # Image is base64 _, encoded = request.image.split(",", 1) if "," in request.image else (None, request.image) contents = base64.b64decode(encoded) img = await run_in_threadpool(decode_image, contents) if img is None: return JSONResponse( status_code=400, content={"success": False, "error": "Invalid image data."} ) lighting_result = await run_in_threadpool(anti_spoofing.analyze_lighting, img) lighting_result["success"] = True return lighting_result except Exception as e: return JSONResponse( status_code=400, content={"success": False, "error": f"Error processing image: {str(e)}"} ) @router.post("/mark-attendance") async def mark_attendance( file: UploadFile = File(...), studentId: str = Form(...), ownerAdmin: str = Form("SYSTEM") ): contents = await file.read() img = await run_in_threadpool(decode_image, contents) if img is None: return JSONResponse( status_code=400, content={"success": False, "error": "Invalid image file."} ) # All heavy operations in threadpool lighting_result = await run_in_threadpool(anti_spoofing.analyze_lighting, img) face_data_full = await run_in_threadpool(face_engine.process_complete, img) target_embedding = face_data_full["embedding"] face_data = face_data_full["eye_data"] if face_data_full["face_detected"] else None # Liveness Detection (LBP is heavy) liveness_score, liveness_passed = await run_in_threadpool(anti_spoofing.check_liveness_lbp, img) if not liveness_passed: await log_spoof_attempt(studentId, img, liveness_score, lighting_result) return { "success": False, "status": "SPOOF_DETECTED", "error": "Possible photo or screen detected. Please use your real face.", "liveness_score": float(liveness_score), "liveness_passed": False, "lighting_quality": lighting_result["lighting_quality"] } # Blink Detection (EAR) blink_detected = False if face_data: left_ear = anti_spoofing.calculate_ear(face_data["left_eye"]) right_ear = anti_spoofing.calculate_ear(face_data["right_eye"]) avg_ear = (left_ear + right_ear) / 2.0 blink_detected = avg_ear < settings.BLINK_EAR_THRESHOLD if target_embedding is None: return { "success": False, "status": "failed", "error": "No face detected or processing failed.", "liveness_score": float(liveness_score), "liveness_passed": True } # Fetch stored embedding collection = db.get_collection() doc = await collection.find_one({"studentId": studentId, "ownerAdmin": ownerAdmin}) if not doc or "faceEmbedding" not in doc: return { "success": False, "status": "failed", "error": "Student not registered for face recognition.", "liveness_score": float(liveness_score), "liveness_passed": True } stored_embedding = await run_in_threadpool(np.array, doc["faceEmbedding"]) sim = await run_in_threadpool(compare_embeddings, target_embedding, stored_embedding) threshold = settings.FACE_SIMILARITY_THRESHOLD if sim > threshold: return { "success": True, "studentId": studentId, "confidence": float(sim), "status": "success", "message": "Face recognized successfully.", "liveness_score": float(liveness_score), "blink_detected": blink_detected, "lighting_quality": lighting_result["lighting_quality"] } else: return { "success": False, "confidence": float(sim), "status": "failed", "error": "Face mismatch. Please try again.", "liveness_score": float(liveness_score) } @router.post("/detect") async def detect_face(file: UploadFile = File(...)): contents = await file.read() img = await run_in_threadpool(decode_image, contents) if img is None: return JSONResponse( status_code=400, content={"success": False, "error": "Invalid image file."} ) detected, guidance = await run_in_threadpool(face_engine.detect_only, img) return { "success": True, "detected": detected, "guidance": guidance, "status": "normal" }