| from fastapi import APIRouter, File, UploadFile, Form, Request |
| from starlette.concurrency import run_in_threadpool |
| from starlette.responses import JSONResponse |
| from typing import List, Optional |
| import numpy as np |
| import cv2 |
| import base64 |
| import requests |
| from app.services.face_recognition import face_engine |
| from app.core.database import db |
| from app.models.schemas import AttendanceResponse, LightingAnalysis, ImageRequest |
| from datetime import datetime |
| from app.core.config import settings |
| from app.utils.anti_spoofing import anti_spoofing |
| import logging |
|
|
| logger = logging.getLogger("face-service-api") |
| router = APIRouter() |
|
|
| |
|
|
| def decode_image(contents: bytes) -> Optional[np.ndarray]: |
| """Decodes image bytes to a numpy array (OpenCV format).""" |
| nparr = np.frombuffer(contents, np.uint8) |
| return cv2.imdecode(nparr, cv2.IMREAD_COLOR) |
|
|
| def encode_image_base64(img: np.ndarray, quality: int = 60) -> str: |
| """Encodes a numpy image to base64 string.""" |
| _, buffer = cv2.imencode('.jpg', img, [cv2.IMWRITE_JPEG_QUALITY, quality]) |
| return base64.b64encode(buffer).decode('utf-8') |
|
|
| def calculate_average_embedding(embeddings: List[np.ndarray]) -> Optional[np.ndarray]: |
| """Calculates normalized average embedding from a list.""" |
| if not embeddings: |
| return None |
| avg = np.mean(embeddings, axis=0) |
| norm = np.linalg.norm(avg) |
| return avg / norm if norm > 0 else avg |
|
|
| def compare_embeddings(target: np.ndarray, stored: np.ndarray) -> float: |
| """Calculates cosine similarity between two embeddings.""" |
| return float(np.dot(target, stored)) |
|
|
| async def log_spoof_attempt(student_id: str, img: np.ndarray, score: float, lighting_data: dict = None): |
| """Log spoofing to core server without blocking.""" |
| try: |
| img_base64 = await run_in_threadpool(encode_image_base64, img) |
| |
| payload = { |
| "studentId": student_id, |
| "timestamp": datetime.utcnow().isoformat(), |
| "livenessScore": float(score), |
| "snapshot": img_base64, |
| "lighting_quality": lighting_data.get("lighting_quality") if lighting_data else "UNKNOWN", |
| "brightness": lighting_data.get("brightness") if lighting_data else 0 |
| } |
| |
| await run_in_threadpool( |
| requests.post, |
| f"{settings.CORE_SERVER_URL}/api/v1/audit/spoof-attempt", |
| json=payload, |
| timeout=5, |
| headers={"X-API-Key": getattr(settings, 'FACE_API_KEY', '')} |
| ) |
| except Exception as e: |
| logger.error(f"Failed to log spoof attempt: {e}") |
|
|
| |
|
|
| @router.post("/register", status_code=201) |
| async def register( |
| studentId: str = Form(...), |
| ownerAdmin: str = Form("SYSTEM"), |
| files: List[UploadFile] = File(...) |
| ): |
| logger.info(f"Registering student {studentId} with {len(files)} images") |
| |
| embeddings = [] |
| for file in files: |
| contents = await file.read() |
| img = await run_in_threadpool(decode_image, contents) |
| |
| if img is None: |
| continue |
| |
| emb = await run_in_threadpool(face_engine.extract_embedding, img) |
| if emb is not None: |
| embeddings.append(emb) |
|
|
| if not embeddings: |
| return JSONResponse( |
| status_code=400, |
| content={"success": False, "error": "No faces detected in the provided images."} |
| ) |
|
|
| |
| avg_embedding = await run_in_threadpool(calculate_average_embedding, embeddings) |
| |
| |
| student_data = { |
| "faceEmbedding": avg_embedding.tolist(), |
| "last_updated": datetime.utcnow() |
| } |
| |
| collection = db.get_collection() |
| existing = await collection.find_one({"studentId": studentId, "ownerAdmin": ownerAdmin}) |
| |
| if existing: |
| await collection.update_one({"studentId": studentId, "ownerAdmin": ownerAdmin}, {"$set": student_data}) |
| msg = "Student face recorded updated." |
| else: |
| student_data["studentId"] = studentId |
| student_data["ownerAdmin"] = ownerAdmin |
| await collection.insert_one(student_data) |
| msg = "New student face record created." |
|
|
| return { |
| "success": True, |
| "message": msg, |
| "studentId": studentId, |
| "faces_processed": len(embeddings) |
| } |
|
|
| @router.post("/check-lighting", response_model=LightingAnalysis) |
| async def check_lighting(request: ImageRequest): |
| try: |
| |
| _, encoded = request.image.split(",", 1) if "," in request.image else (None, request.image) |
| contents = base64.b64decode(encoded) |
| img = await run_in_threadpool(decode_image, contents) |
| |
| if img is None: |
| return JSONResponse( |
| status_code=400, |
| content={"success": False, "error": "Invalid image data."} |
| ) |
|
|
| lighting_result = await run_in_threadpool(anti_spoofing.analyze_lighting, img) |
| lighting_result["success"] = True |
| return lighting_result |
| except Exception as e: |
| return JSONResponse( |
| status_code=400, |
| content={"success": False, "error": f"Error processing image: {str(e)}"} |
| ) |
|
|
| @router.post("/mark-attendance") |
| async def mark_attendance( |
| file: UploadFile = File(...), |
| studentId: str = Form(...), |
| ownerAdmin: str = Form("SYSTEM") |
| ): |
| contents = await file.read() |
| img = await run_in_threadpool(decode_image, contents) |
| |
| if img is None: |
| return JSONResponse( |
| status_code=400, |
| content={"success": False, "error": "Invalid image file."} |
| ) |
|
|
| |
| lighting_result = await run_in_threadpool(anti_spoofing.analyze_lighting, img) |
| face_data_full = await run_in_threadpool(face_engine.process_complete, img) |
| |
| target_embedding = face_data_full["embedding"] |
| face_data = face_data_full["eye_data"] if face_data_full["face_detected"] else None |
|
|
| |
| liveness_score, liveness_passed = await run_in_threadpool(anti_spoofing.check_liveness_lbp, img) |
| |
| if not liveness_passed: |
| await log_spoof_attempt(studentId, img, liveness_score, lighting_result) |
| return { |
| "success": False, |
| "status": "SPOOF_DETECTED", |
| "error": "Possible photo or screen detected. Please use your real face.", |
| "liveness_score": float(liveness_score), |
| "liveness_passed": False, |
| "lighting_quality": lighting_result["lighting_quality"] |
| } |
|
|
| |
| blink_detected = False |
| if face_data: |
| left_ear = anti_spoofing.calculate_ear(face_data["left_eye"]) |
| right_ear = anti_spoofing.calculate_ear(face_data["right_eye"]) |
| avg_ear = (left_ear + right_ear) / 2.0 |
| blink_detected = avg_ear < settings.BLINK_EAR_THRESHOLD |
|
|
| if target_embedding is None: |
| return { |
| "success": False, |
| "status": "failed", |
| "error": "No face detected or processing failed.", |
| "liveness_score": float(liveness_score), |
| "liveness_passed": True |
| } |
|
|
| |
| collection = db.get_collection() |
| doc = await collection.find_one({"studentId": studentId, "ownerAdmin": ownerAdmin}) |
| |
| if not doc or "faceEmbedding" not in doc: |
| return { |
| "success": False, |
| "status": "failed", |
| "error": "Student not registered for face recognition.", |
| "liveness_score": float(liveness_score), |
| "liveness_passed": True |
| } |
|
|
| stored_embedding = await run_in_threadpool(np.array, doc["faceEmbedding"]) |
| sim = await run_in_threadpool(compare_embeddings, target_embedding, stored_embedding) |
| |
| threshold = settings.FACE_SIMILARITY_THRESHOLD |
| |
| if sim > threshold: |
| return { |
| "success": True, |
| "studentId": studentId, |
| "confidence": float(sim), |
| "status": "success", |
| "message": "Face recognized successfully.", |
| "liveness_score": float(liveness_score), |
| "blink_detected": blink_detected, |
| "lighting_quality": lighting_result["lighting_quality"] |
| } |
| else: |
| return { |
| "success": False, |
| "confidence": float(sim), |
| "status": "failed", |
| "error": "Face mismatch. Please try again.", |
| "liveness_score": float(liveness_score) |
| } |
|
|
| @router.post("/detect") |
| async def detect_face(file: UploadFile = File(...)): |
| contents = await file.read() |
| img = await run_in_threadpool(decode_image, contents) |
| |
| if img is None: |
| return JSONResponse( |
| status_code=400, |
| content={"success": False, "error": "Invalid image file."} |
| ) |
|
|
| detected, guidance = await run_in_threadpool(face_engine.detect_only, img) |
|
|
| return { |
| "success": True, |
| "detected": detected, |
| "guidance": guidance, |
| "status": "normal" |
| } |
|
|