|
|
|
|
|
""" |
|
|
FastAPI Attendance System - Production Ready |
|
|
Run with: uvicorn main:app --host 0.0.0.0 --port 5001 --workers 4 |
|
|
""" |
|
|
|
|
|
import base64 |
|
|
import io |
|
|
import os |
|
|
import sys |
|
|
from datetime import datetime |
|
|
from typing import Optional |
|
|
|
|
|
import cv2 |
|
|
import numpy as np |
|
|
import face_recognition |
|
|
import mysql.connector |
|
|
from fastapi import FastAPI, File, Form, UploadFile, HTTPException, status |
|
|
from fastapi.middleware.cors import CORSMiddleware |
|
|
from fastapi.responses import JSONResponse |
|
|
from PIL import Image |
|
|
from pydantic import BaseModel |
|
|
from mysql.connector import Error |
|
|
from werkzeug.utils import secure_filename |
|
|
|
|
|
|
|
|
from attendance_marking import markAttendance |
|
|
|
|
|
|
|
|
DB_CONFIG = { |
|
|
'user': 'root', |
|
|
'password': 'redclaws', |
|
|
'host': 'localhost', |
|
|
'database': 'NewAttn', |
|
|
} |
|
|
|
|
|
PATH_KNOWN_DATA = 'known_data' |
|
|
PATH_TRAINING_IMAGES = 'Training_images' |
|
|
FACE_DISTANCE_THRESHOLD = 0.5 |
|
|
|
|
|
|
|
|
os.makedirs(PATH_KNOWN_DATA, exist_ok=True) |
|
|
os.makedirs(PATH_TRAINING_IMAGES, exist_ok=True) |
|
|
|
|
|
|
|
|
class AttendanceRequest(BaseModel): |
|
|
image: str |
|
|
|
|
|
class AttendanceResponse(BaseModel): |
|
|
status: str |
|
|
message: str |
|
|
emp_id: Optional[str] = None |
|
|
distance: Optional[str] = None |
|
|
|
|
|
class RegistrationResponse(BaseModel): |
|
|
status: str |
|
|
message: str |
|
|
emp_id: Optional[str] = None |
|
|
|
|
|
|
|
|
def load_known_data(): |
|
|
"""Loads encodings and emp_ids from the known_data directory.""" |
|
|
try: |
|
|
encodings = np.load( |
|
|
os.path.join(PATH_KNOWN_DATA, 'known_encodings.npy'), |
|
|
allow_pickle=True |
|
|
) |
|
|
with open(os.path.join(PATH_KNOWN_DATA, 'known_names.txt'), 'r') as f: |
|
|
emp_ids = [line.strip() for line in f.readlines()] |
|
|
print(f"✅ Loaded {len(encodings)} faces.") |
|
|
return encodings, emp_ids |
|
|
except FileNotFoundError: |
|
|
print("⚠️ No existing known data found. Starting fresh.") |
|
|
return np.array([]), [] |
|
|
def findEncodings(images): |
|
|
encodeList = [] |
|
|
for img in images: |
|
|
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) |
|
|
face_encodings = face_recognition.face_encodings(img) |
|
|
if face_encodings: |
|
|
encodeList.append(face_encodings[0]) |
|
|
else: |
|
|
print("Warning: No face found in one of the training images!") |
|
|
return encodeList |
|
|
|
|
|
|
|
|
def markAttendance(emp_id): |
|
|
now = datetime.now() |
|
|
dtString = now.strftime('%H:%M:%S') |
|
|
dateString = now.strftime('%Y-%m-%d') |
|
|
|
|
|
cursor = None |
|
|
try: |
|
|
record_count=0 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if record_count == 0: |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return True , "new" |
|
|
else: |
|
|
return False , "duplicate" |
|
|
|
|
|
except Error as e: |
|
|
|
|
|
|
|
|
return False , f"error: {e}" |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
app = FastAPI( |
|
|
title="Facial Attendance System API", |
|
|
description="Production-ready facial recognition attendance system", |
|
|
version="1.0.0" |
|
|
) |
|
|
|
|
|
|
|
|
app.add_middleware( |
|
|
CORSMiddleware, |
|
|
allow_origins=["*"], |
|
|
allow_credentials=True, |
|
|
allow_methods=["*"], |
|
|
allow_headers=["*"], |
|
|
) |
|
|
|
|
|
|
|
|
encodeListKnown = np.array([]) |
|
|
classNames = [] |
|
|
|
|
|
|
|
|
@app.on_event("startup") |
|
|
async def startup_event(): |
|
|
"""Load known face encodings on startup.""" |
|
|
global encodeListKnown, classNames |
|
|
encodeListKnown, classNames = load_known_data() |
|
|
print("🚀 FastAPI Attendance System Started") |
|
|
|
|
|
|
|
|
@app.get("/health") |
|
|
async def health_check(): |
|
|
"""Health check endpoint.""" |
|
|
return { |
|
|
"status": "healthy", |
|
|
"timestamp": datetime.now().isoformat(), |
|
|
"registered_employees": len(classNames) |
|
|
} |
|
|
|
|
|
|
|
|
@app.post("/api/mark_attendance", response_model=AttendanceResponse) |
|
|
async def mark_attendance_api(request: AttendanceRequest): |
|
|
""" |
|
|
Mark attendance using facial recognition. |
|
|
|
|
|
Args: |
|
|
request: AttendanceRequest containing base64 encoded image |
|
|
|
|
|
Returns: |
|
|
AttendanceResponse with status and details |
|
|
""" |
|
|
if not request.image: |
|
|
raise HTTPException( |
|
|
status_code=status.HTTP_400_BAD_REQUEST, |
|
|
detail="No image data provided" |
|
|
) |
|
|
|
|
|
|
|
|
try: |
|
|
|
|
|
image_bytes = base64.b64decode(request.image) |
|
|
img = Image.open(io.BytesIO(image_bytes)).convert('RGB') |
|
|
img_np = np.array(img) |
|
|
img_bgr = cv2.cvtColor(img_np, cv2.COLOR_RGB2BGR) |
|
|
|
|
|
|
|
|
imgS = cv2.resize(img_bgr, (0, 0), None, 0.25, 0.25) |
|
|
imgS = cv2.cvtColor(imgS, cv2.COLOR_BGR2RGB) |
|
|
|
|
|
|
|
|
facesCurFrame = face_recognition.face_locations(imgS) |
|
|
encodesCurFrame = face_recognition.face_encodings(imgS, facesCurFrame) |
|
|
|
|
|
if not facesCurFrame: |
|
|
return AttendanceResponse( |
|
|
status="failure", |
|
|
message="No face detected in the image." |
|
|
) |
|
|
|
|
|
|
|
|
encodeFace = encodesCurFrame[0] |
|
|
matches = face_recognition.compare_faces(encodeListKnown, encodeFace) |
|
|
faceDis = face_recognition.face_distance(encodeListKnown, encodeFace) |
|
|
|
|
|
if len(faceDis) == 0: |
|
|
return AttendanceResponse( |
|
|
status="failure", |
|
|
message="No registered employees in the system." |
|
|
) |
|
|
|
|
|
matchIndex = np.argmin(faceDis) |
|
|
min_distance = faceDis[matchIndex] |
|
|
|
|
|
|
|
|
if matches[matchIndex] and min_distance < FACE_DISTANCE_THRESHOLD: |
|
|
employee_id = classNames[matchIndex].upper() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
was_marked, status_msg = markAttendance(employee_id) |
|
|
|
|
|
if was_marked: |
|
|
return AttendanceResponse( |
|
|
status="success", |
|
|
message=f"Attendance marked for {employee_id}.", |
|
|
emp_id=employee_id, |
|
|
distance=f"{min_distance:.2f}" |
|
|
) |
|
|
elif status_msg == "duplicate": |
|
|
return AttendanceResponse( |
|
|
status="info", |
|
|
message=f"{employee_id} already logged today.", |
|
|
emp_id=employee_id, |
|
|
distance=f"{min_distance:.2f}" |
|
|
) |
|
|
else: |
|
|
return AttendanceResponse( |
|
|
status="error", |
|
|
message=f"Failed to mark attendance: {status_msg}" |
|
|
) |
|
|
else: |
|
|
return AttendanceResponse( |
|
|
status="failure", |
|
|
message=f"Unknown person detected. Min Distance: {min_distance:.2f}" |
|
|
) |
|
|
|
|
|
except Error as err: |
|
|
print(f"❌ Database Error: {db_err}") |
|
|
raise HTTPException( |
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, |
|
|
detail=f"Database error: {str(db_err)}" |
|
|
) |
|
|
except Exception as e: |
|
|
print(f"❌ Exception: {e}") |
|
|
raise HTTPException( |
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, |
|
|
detail=f"Internal server error: {str(e)}" |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.post("/api/register_employee", response_model=RegistrationResponse) |
|
|
async def register_employee( |
|
|
emp_id: str = Form(...), |
|
|
image: UploadFile = File(...) |
|
|
): |
|
|
""" |
|
|
Register a new employee with facial recognition. |
|
|
|
|
|
Args: |
|
|
emp_id: Employee ID |
|
|
image: Employee face image file |
|
|
|
|
|
Returns: |
|
|
RegistrationResponse with status and details |
|
|
""" |
|
|
global encodeListKnown, classNames |
|
|
|
|
|
db_conn = None |
|
|
cursor = None |
|
|
image_path = None |
|
|
|
|
|
try: |
|
|
|
|
|
if not emp_id or not image: |
|
|
raise HTTPException( |
|
|
status_code=status.HTTP_400_BAD_REQUEST, |
|
|
detail="Employee ID and image are required." |
|
|
) |
|
|
|
|
|
|
|
|
emp_id = emp_id.strip().upper() |
|
|
|
|
|
|
|
|
if emp_id in classNames: |
|
|
raise HTTPException( |
|
|
status_code=status.HTTP_400_BAD_REQUEST, |
|
|
detail=f"Employee {emp_id} already exists in the system." |
|
|
) |
|
|
|
|
|
|
|
|
if not image.content_type.startswith('image/'): |
|
|
raise HTTPException( |
|
|
status_code=status.HTTP_400_BAD_REQUEST, |
|
|
detail="Invalid file type. Please upload an image." |
|
|
) |
|
|
|
|
|
|
|
|
filename = secure_filename(f"{emp_id}.jpg") |
|
|
image_path = os.path.join(PATH_TRAINING_IMAGES, filename) |
|
|
|
|
|
|
|
|
contents = await image.read() |
|
|
with open(image_path, 'wb') as f: |
|
|
f.write(contents) |
|
|
|
|
|
|
|
|
img = cv2.imread(image_path) |
|
|
if img is None: |
|
|
if os.path.exists(image_path): |
|
|
os.remove(image_path) |
|
|
raise HTTPException( |
|
|
status_code=status.HTTP_400_BAD_REQUEST, |
|
|
detail="Failed to read the uploaded image." |
|
|
) |
|
|
|
|
|
|
|
|
img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) |
|
|
|
|
|
|
|
|
face_encodings = face_recognition.face_encodings(img_rgb) |
|
|
|
|
|
if not face_encodings: |
|
|
os.remove(image_path) |
|
|
raise HTTPException( |
|
|
status_code=status.HTTP_400_BAD_REQUEST, |
|
|
detail="No face detected in the image. Please upload a clear face photo." |
|
|
) |
|
|
|
|
|
if len(face_encodings) > 1: |
|
|
os.remove(image_path) |
|
|
raise HTTPException( |
|
|
status_code=status.HTTP_400_BAD_REQUEST, |
|
|
detail="Multiple faces detected. Please upload an image with only one face." |
|
|
) |
|
|
|
|
|
|
|
|
new_encoding = face_encodings[0] |
|
|
|
|
|
|
|
|
if len(encodeListKnown) == 0: |
|
|
encodeListKnown = np.array([new_encoding]) |
|
|
else: |
|
|
encodeListKnown = np.vstack([encodeListKnown, new_encoding]) |
|
|
|
|
|
classNames.append(emp_id) |
|
|
|
|
|
|
|
|
np.save( |
|
|
os.path.join(PATH_KNOWN_DATA, 'known_encodings.npy'), |
|
|
encodeListKnown |
|
|
) |
|
|
|
|
|
with open(os.path.join(PATH_KNOWN_DATA, 'known_names.txt'), 'w') as f: |
|
|
for name in classNames: |
|
|
f.write(f"{name}\n") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
print(f"✅ Successfully registered employee: {emp_id}") |
|
|
|
|
|
return RegistrationResponse( |
|
|
status="success", |
|
|
message=f"Employee {emp_id} registered successfully!", |
|
|
emp_id=emp_id |
|
|
) |
|
|
|
|
|
except HTTPException: |
|
|
|
|
|
raise |
|
|
except Exception as e: |
|
|
print(f"❌ Error during registration: {e}") |
|
|
if db_conn: |
|
|
db_conn.rollback() |
|
|
if image_path and os.path.exists(image_path): |
|
|
os.remove(image_path) |
|
|
raise HTTPException( |
|
|
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, |
|
|
detail=f"Registration failed: {str(e)}" |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@app.get("/api/employees") |
|
|
async def get_employees(): |
|
|
"""Get list of all registered employees.""" |
|
|
return { |
|
|
"status": "success", |
|
|
"count": len(classNames), |
|
|
"employees": classNames |
|
|
} |
|
|
|
|
|
|
|
|
@app.get("/") |
|
|
async def root(): |
|
|
"""Root endpoint.""" |
|
|
return { |
|
|
"message": "Facial Attendance System API", |
|
|
"version": "1.0.0", |
|
|
"docs": "/docs", |
|
|
"health": "/health" |
|
|
} |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
import uvicorn |
|
|
uvicorn.run( |
|
|
"app:app", |
|
|
host="0.0.0.0", |
|
|
port=5001, |
|
|
reload=False, |
|
|
workers=1 |
|
|
|
|
|
) |
|
|
|