edusync / app /api /endpoints.py
sakshusat's picture
feat: implement initial face recognition service with API endpoints for registration, attendance, and anti-spoofing capabilities.
e51f7f4
from fastapi import APIRouter, File, UploadFile, Form, Request
from starlette.concurrency import run_in_threadpool
from starlette.responses import JSONResponse
from typing import List, Optional
import numpy as np
import cv2
import base64
import requests
from app.services.face_recognition import face_engine
from app.core.database import db
from app.models.schemas import AttendanceResponse, LightingAnalysis, ImageRequest
from datetime import datetime
from app.core.config import settings
from app.utils.anti_spoofing import anti_spoofing
import logging
logger = logging.getLogger("face-service-api")
router = APIRouter()
# --- HELPER FUNCTIONS FOR OFF-THREAD EXECUTION ---
def decode_image(contents: bytes) -> Optional[np.ndarray]:
"""Decodes image bytes to a numpy array (OpenCV format)."""
nparr = np.frombuffer(contents, np.uint8)
return cv2.imdecode(nparr, cv2.IMREAD_COLOR)
def encode_image_base64(img: np.ndarray, quality: int = 60) -> str:
"""Encodes a numpy image to base64 string."""
_, buffer = cv2.imencode('.jpg', img, [cv2.IMWRITE_JPEG_QUALITY, quality])
return base64.b64encode(buffer).decode('utf-8')
def calculate_average_embedding(embeddings: List[np.ndarray]) -> Optional[np.ndarray]:
"""Calculates normalized average embedding from a list."""
if not embeddings:
return None
avg = np.mean(embeddings, axis=0)
norm = np.linalg.norm(avg)
return avg / norm if norm > 0 else avg
def compare_embeddings(target: np.ndarray, stored: np.ndarray) -> float:
"""Calculates cosine similarity between two embeddings."""
return float(np.dot(target, stored))
async def log_spoof_attempt(student_id: str, img: np.ndarray, score: float, lighting_data: dict = None):
"""Log spoofing to core server without blocking."""
try:
img_base64 = await run_in_threadpool(encode_image_base64, img)
payload = {
"studentId": student_id,
"timestamp": datetime.utcnow().isoformat(),
"livenessScore": float(score),
"snapshot": img_base64,
"lighting_quality": lighting_data.get("lighting_quality") if lighting_data else "UNKNOWN",
"brightness": lighting_data.get("brightness") if lighting_data else 0
}
await run_in_threadpool(
requests.post,
f"{settings.CORE_SERVER_URL}/api/v1/audit/spoof-attempt",
json=payload,
timeout=5,
headers={"X-API-Key": getattr(settings, 'FACE_API_KEY', '')}
)
except Exception as e:
logger.error(f"Failed to log spoof attempt: {e}")
# --- API ENDPOINTS ---
@router.post("/register", status_code=201)
async def register(
studentId: str = Form(...),
ownerAdmin: str = Form("SYSTEM"),
files: List[UploadFile] = File(...)
):
logger.info(f"Registering student {studentId} with {len(files)} images")
embeddings = []
for file in files:
contents = await file.read()
img = await run_in_threadpool(decode_image, contents)
if img is None:
continue
emb = await run_in_threadpool(face_engine.extract_embedding, img)
if emb is not None:
embeddings.append(emb)
if not embeddings:
return JSONResponse(
status_code=400,
content={"success": False, "error": "No faces detected in the provided images."}
)
# Process embeddings (CPU heavy)
avg_embedding = await run_in_threadpool(calculate_average_embedding, embeddings)
# Store in MongoDB (Async IO)
student_data = {
"faceEmbedding": avg_embedding.tolist(),
"last_updated": datetime.utcnow()
}
collection = db.get_collection()
existing = await collection.find_one({"studentId": studentId, "ownerAdmin": ownerAdmin})
if existing:
await collection.update_one({"studentId": studentId, "ownerAdmin": ownerAdmin}, {"$set": student_data})
msg = "Student face recorded updated."
else:
student_data["studentId"] = studentId
student_data["ownerAdmin"] = ownerAdmin
await collection.insert_one(student_data)
msg = "New student face record created."
return {
"success": True,
"message": msg,
"studentId": studentId,
"faces_processed": len(embeddings)
}
@router.post("/check-lighting", response_model=LightingAnalysis)
async def check_lighting(request: ImageRequest):
try:
# Image is base64
_, encoded = request.image.split(",", 1) if "," in request.image else (None, request.image)
contents = base64.b64decode(encoded)
img = await run_in_threadpool(decode_image, contents)
if img is None:
return JSONResponse(
status_code=400,
content={"success": False, "error": "Invalid image data."}
)
lighting_result = await run_in_threadpool(anti_spoofing.analyze_lighting, img)
lighting_result["success"] = True
return lighting_result
except Exception as e:
return JSONResponse(
status_code=400,
content={"success": False, "error": f"Error processing image: {str(e)}"}
)
@router.post("/mark-attendance")
async def mark_attendance(
file: UploadFile = File(...),
studentId: str = Form(...),
ownerAdmin: str = Form("SYSTEM")
):
contents = await file.read()
img = await run_in_threadpool(decode_image, contents)
if img is None:
return JSONResponse(
status_code=400,
content={"success": False, "error": "Invalid image file."}
)
# All heavy operations in threadpool
lighting_result = await run_in_threadpool(anti_spoofing.analyze_lighting, img)
face_data_full = await run_in_threadpool(face_engine.process_complete, img)
target_embedding = face_data_full["embedding"]
face_data = face_data_full["eye_data"] if face_data_full["face_detected"] else None
# Liveness Detection (LBP is heavy)
liveness_score, liveness_passed = await run_in_threadpool(anti_spoofing.check_liveness_lbp, img)
if not liveness_passed:
await log_spoof_attempt(studentId, img, liveness_score, lighting_result)
return {
"success": False,
"status": "SPOOF_DETECTED",
"error": "Possible photo or screen detected. Please use your real face.",
"liveness_score": float(liveness_score),
"liveness_passed": False,
"lighting_quality": lighting_result["lighting_quality"]
}
# Blink Detection (EAR)
blink_detected = False
if face_data:
left_ear = anti_spoofing.calculate_ear(face_data["left_eye"])
right_ear = anti_spoofing.calculate_ear(face_data["right_eye"])
avg_ear = (left_ear + right_ear) / 2.0
blink_detected = avg_ear < settings.BLINK_EAR_THRESHOLD
if target_embedding is None:
return {
"success": False,
"status": "failed",
"error": "No face detected or processing failed.",
"liveness_score": float(liveness_score),
"liveness_passed": True
}
# Fetch stored embedding
collection = db.get_collection()
doc = await collection.find_one({"studentId": studentId, "ownerAdmin": ownerAdmin})
if not doc or "faceEmbedding" not in doc:
return {
"success": False,
"status": "failed",
"error": "Student not registered for face recognition.",
"liveness_score": float(liveness_score),
"liveness_passed": True
}
stored_embedding = await run_in_threadpool(np.array, doc["faceEmbedding"])
sim = await run_in_threadpool(compare_embeddings, target_embedding, stored_embedding)
threshold = settings.FACE_SIMILARITY_THRESHOLD
if sim > threshold:
return {
"success": True,
"studentId": studentId,
"confidence": float(sim),
"status": "success",
"message": "Face recognized successfully.",
"liveness_score": float(liveness_score),
"blink_detected": blink_detected,
"lighting_quality": lighting_result["lighting_quality"]
}
else:
return {
"success": False,
"confidence": float(sim),
"status": "failed",
"error": "Face mismatch. Please try again.",
"liveness_score": float(liveness_score)
}
@router.post("/detect")
async def detect_face(file: UploadFile = File(...)):
contents = await file.read()
img = await run_in_threadpool(decode_image, contents)
if img is None:
return JSONResponse(
status_code=400,
content={"success": False, "error": "Invalid image file."}
)
detected, guidance = await run_in_threadpool(face_engine.detect_only, img)
return {
"success": True,
"detected": detected,
"guidance": guidance,
"status": "normal"
}