AttendifyAI / app.py
Manish-2458's picture
Update app.py
e94b006 verified
import os
import cv2
import time
import pickle
import numpy as np
import face_recognition
import smtplib
from email.message import EmailMessage
from datetime import datetime, date
import gradio as gr
from sklearn import neighbors
from sklearn.metrics import accuracy_score
from sqlalchemy import create_engine, Column, String, Integer, Date, Time, UniqueConstraint, func
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from supabase import create_client, Client
# ------------------------------
# Configuration and Environment Variables
# ------------------------------
FRAME_RESIZE_FACTOR = 0.5
MODEL = "hog"
KNN_NEIGHBORS = 2
UNKNOWN_THRESHOLD = 0.5
REQUIRED_IDENTIFICATIONS = 3
LIVENESS_HISTORY_LENGTH = 3
LIVENESS_VARIATION_THRESHOLD = 0.03
# Email & Admin Configuration
SMTP_SERVER = os.environ.get("SMTP_SERVER", "smtp.gmail.com")
SMTP_PORT = int(os.environ.get("SMTP_PORT", 587))
SMTP_USERNAME = os.environ.get("SMTP_USERNAME")
SMTP_PASSWORD = os.environ.get("SMTP_PASSWORD")
SENDER_EMAIL = os.environ.get("SENDER_EMAIL")
ADMIN_EMAIL = os.environ.get("ADMIN_EMAIL")
ADMIN_PASSWORD = os.environ.get("ADMIN_PASSWORD")
# Supabase Configuration
SUPABASE_URL = os.environ.get("SUPABASE_URL")
SUPABASE_KEY = os.environ.get("SUPABASE_KEY")
STORAGE_BUCKET = os.environ.get("STORAGE_BUCKET", "face-encodings")
STORAGE_PATH = os.environ.get("STORAGE_PATH", "encodings.pkl")
KNOWN_FACES_BUCKET = os.environ.get("KNOWN_FACES_BUCKET", "known-faces")
# Database Configuration
DATABASE_URL = os.environ.get("DATABASE_URL")
# Global State
recognition_counts = {}
liveness_history = {}
supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
# ------------------------------
# Database Setup
# ------------------------------
engine = create_engine(DATABASE_URL, pool_size=5, max_overflow=10)
SessionLocal = sessionmaker(bind=engine)
Base = declarative_base()
class User(Base):
__tablename__ = "users"
roll_number = Column(String, primary_key=True)
name = Column(String)
email = Column(String)
parent_email = Column(String)
class Attendance(Base):
__tablename__ = "attendance"
id = Column(Integer, primary_key=True, autoincrement=True)
roll_number = Column(String)
name = Column(String)
date = Column(Date)
time = Column(Time)
__table_args__ = (UniqueConstraint('roll_number', 'date', name='_roll_date_uc'),)
Base.metadata.create_all(engine)
# ------------------------------
# Storage Functions
# ------------------------------
def upload_to_supabase(local_file, bucket, remote_path):
"""Generic upload function for Supabase storage."""
try:
with open(local_file, "rb") as f:
file_data = f.read()
res = supabase.storage.from_(bucket).upload(remote_path, file_data)
if res.get("error"):
raise Exception(res["error"])
return True
except Exception as e:
print(f"Upload error: {e}")
return False
def download_from_supabase(local_file, bucket, remote_path):
"""Generic download function for Supabase storage."""
try:
res = supabase.storage.from_(bucket).download(remote_path)
if res.get("error"):
raise Exception(res["error"])
with open(local_file, "wb") as f:
f.write(res["data"])
return True
except Exception as e:
print(f"Download error: {e}")
return False
# ------------------------------
# Face Recognition Functions
# ------------------------------
def compute_ear(eye):
"""Compute Eye Aspect Ratio for liveness detection."""
try:
A = np.linalg.norm(np.array(eye[1]) - np.array(eye[5]))
B = np.linalg.norm(np.array(eye[2]) - np.array(eye[4]))
C = np.linalg.norm(np.array(eye[0]) - np.array(eye[3]))
return (A + B) / (2.0 * C)
except:
return 0.0
def process_liveness(face_id, ear):
"""Process liveness detection using eye aspect ratio history."""
if face_id not in liveness_history:
liveness_history[face_id] = []
liveness_history[face_id].append(ear)
if len(liveness_history[face_id]) > LIVENESS_HISTORY_LENGTH:
liveness_history[face_id] = liveness_history[face_id][-LIVENESS_HISTORY_LENGTH:]
if len(liveness_history[face_id]) < LIVENESS_HISTORY_LENGTH:
return True
variation = max(liveness_history[face_id]) - min(liveness_history[face_id])
return variation > LIVENESS_VARIATION_THRESHOLD
def load_face_classifier():
"""Load and prepare the face recognition classifier."""
try:
download_from_supabase("temp_encodings.pkl", STORAGE_BUCKET, STORAGE_PATH)
with open("temp_encodings.pkl", 'rb') as f:
encodings, names = pickle.load(f)
os.remove("temp_encodings.pkl")
if not encodings:
raise Exception("No encodings found")
clf = neighbors.KNeighborsClassifier(
n_neighbors=KNN_NEIGHBORS,
algorithm='ball_tree',
weights='distance'
)
clf.fit(encodings, names)
return clf
except Exception as e:
print(f"Classifier loading error: {e}")
return None
# ------------------------------
# Attendance Processing
# ------------------------------
def process_attendance_frame(frame):
"""Process a video frame for attendance marking."""
if frame is None:
return None
global recognition_counts
# Resize frame
height, width = frame.shape[:2]
frame = cv2.resize(frame, (int(width * FRAME_RESIZE_FACTOR),
int(height * FRAME_RESIZE_FACTOR)))
# Convert to RGB
rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
# Detect faces
face_locations = face_recognition.face_locations(rgb_frame, model=MODEL)
if not face_locations:
return frame
# Get face encodings
face_encodings = face_recognition.face_encodings(rgb_frame, face_locations)
# Process each face
for (top, right, bottom, left), face_encoding in zip(face_locations, face_encodings):
try:
# Predict identity
distances, _ = knn_clf.kneighbors([face_encoding], n_neighbors=KNN_NEIGHBORS)
name = "Unknown"
if distances[0][0] < UNKNOWN_THRESHOLD:
name = knn_clf.predict([face_encoding])[0]
# Check liveness
face_landmarks = face_recognition.face_landmarks(
rgb_frame[max(0, top-20):min(bottom+20, frame.shape[0]),
max(0, left-20):min(right+20, frame.shape[1])]
)
live = True
if face_landmarks:
landmarks = face_landmarks[0]
if "left_eye" in landmarks and "right_eye" in landmarks:
left_ear = compute_ear(landmarks["left_eye"])
right_ear = compute_ear(landmarks["right_eye"])
ear = (left_ear + right_ear) / 2.0
face_id = f"{name}_{top}_{left}"
live = process_liveness(face_id, ear)
# Mark attendance if conditions met
if live and name != "Unknown":
recognition_counts[name] = recognition_counts.get(name, 0) + 1
if recognition_counts[name] >= REQUIRED_IDENTIFICATIONS:
mark_attendance(name)
recognition_counts[name] = 0
# Draw results
color = (0, 255, 0) if live else (0, 0, 255)
cv2.rectangle(frame, (left, top), (right, bottom), color, 2)
cv2.putText(frame, name, (left, top - 10),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1)
except Exception as e:
print(f"Face processing error: {e}")
continue
return frame
def mark_attendance(roll_number):
"""Mark attendance in database and send notifications."""
try:
with SessionLocal() as session:
today = date.today()
now = datetime.now().time()
# Check if attendance already marked
exists = session.query(Attendance)\
.filter(Attendance.roll_number == roll_number,
Attendance.date == today)\
.first()
if exists is None:
# Get user details
user = session.query(User)\
.filter(User.roll_number == roll_number)\
.first()
if user:
# Mark attendance
new_attendance = Attendance(
roll_number=roll_number,
name=user.name,
date=today,
time=now
)
session.add(new_attendance)
session.commit()
# Send notifications
if user.email:
send_email(
user.email,
"Attendance Marked",
f"Your attendance for {today} has been marked at {now}"
)
if user.parent_email:
send_email(
user.parent_email,
"Attendance Notification",
f"Attendance for {user.name} was marked on {today} at {now}"
)
return True
except Exception as e:
print(f"Attendance marking error: {e}")
return False
# ------------------------------
# Email Functions
# ------------------------------
def send_email(to_email, subject, body):
"""Send email notification."""
try:
msg = EmailMessage()
msg.set_content(body)
msg["Subject"] = subject
msg["From"] = SENDER_EMAIL
msg["To"] = to_email
with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server:
server.starttls()
server.login(SMTP_USERNAME, SMTP_PASSWORD)
server.send_message(msg)
return True
except Exception as e:
print(f"Email error: {e}")
return False
# ------------------------------
# Admin Functions
# ------------------------------
def admin_login(password):
"""Verify admin login."""
return password == ADMIN_PASSWORD
def register_face(roll_number, name, email, parent_email, images):
"""Register a new user with face images."""
try:
# Create directory for user
user_dir = os.path.join("known_faces", roll_number)
os.makedirs(user_dir, exist_ok=True)
# Process images
new_encodings = []
new_names = []
saved_count = 0
for i, img in enumerate(images):
# Convert and detect face
rgb_img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
face_locations = face_recognition.face_locations(rgb_img, model=MODEL)
if not face_locations:
continue
# Save image and compute encoding
filename = f"{roll_number}_{i}.jpg"
filepath = os.path.join(user_dir, filename)
cv2.imwrite(filepath, img)
# Upload to Supabase
upload_to_supabase(
filepath,
KNOWN_FACES_BUCKET,
f"{roll_number}/{filename}"
)
# Get face encoding
face_encoding = face_recognition.face_encodings(rgb_img)[0]
new_encodings.append(face_encoding)
new_names.append(roll_number)
saved_count += 1
if saved_count < 3:
return "Need at least 3 good quality face images"
# Add user to database
with SessionLocal() as session:
new_user = User(
roll_number=roll_number,
name=name,
email=email,
parent_email=parent_email
)
session.add(new_user)
session.commit()
# Update face encodings
download_from_supabase("temp_encodings.pkl", STORAGE_BUCKET, STORAGE_PATH)
with open("temp_encodings.pkl", 'rb') as f:
encodings, names = pickle.load(f)
encodings.extend(new_encodings)
names.extend(new_names)
with open("temp_encodings.pkl", 'wb') as f:
pickle.dump((encodings, names), f)
upload_to_supabase("temp_encodings.pkl", STORAGE_BUCKET, STORAGE_PATH)
os.remove("temp_encodings.pkl")
# Reload classifier
global knn_clf
knn_clf = load_face_classifier()
return "Registration successful"
except Exception as e:
print(f"Registration error: {e}")
return f"Registration failed: {str(e)}"
def get_attendance_stats():
"""Get attendance statistics."""
try:
with SessionLocal() as session:
stats = session.query(
Attendance.date,
func.count(Attendance.id)
).group_by(Attendance.date).all()
if not stats:
return "No attendance records found"
result = "Attendance Statistics:\n"
for date, count in stats:
result += f"{date}: {count} students\n"
return result
except Exception as e:
return f"Error fetching statistics: {str(e)}"
# ------------------------------
# Gradio Interface
# ------------------------------
def create_interface():
"""Create the Gradio interface."""
with gr.Blocks() as demo:
gr.Markdown("# Face Recognition Attendance System")
with gr.Tabs():
# Attendance Tab
with gr.Tab("Attendance"):
attendance_in = gr.Image(
source="webcam",
streaming=True,
label="Webcam Feed",
height=400,
width=600
)
attendance_out = gr.Image(label="Processed Feed")
attendance_in.change(
fn=process_attendance_frame,
inputs=attendance_in,
outputs=attendance_out,
batch=False
)
# Admin Tab
with gr.Tab("Admin"):
admin_pass = gr.Textbox(
type="password",
label="Admin Password"
)
with gr.Column(visible=False) as admin_panel:
gr.Markdown("## Register New User")
with gr.Row():
roll_number = gr.Textbox(label="Roll Number")
name = gr.Textbox(label="Name")
with gr.Row():
email = gr.Textbox(label="Student Email")
parent_email = gr.Textbox(label="Parent Email")
images = gr.File(
file_count="multiple",
label="Upload Face Images",
file_types=["image"]
)
register_btn = gr.Button("Register User")
register_output = gr.Textbox(label="Registration Status")
gr.Markdown("## Attendance Statistics")
stats_btn = gr.Button("View Statistics")
stats_output = gr.Textbox(label="Statistics")
def check_password(password):
return gr.Column.update(visible=admin_login(password))
admin_pass.change(
fn=check_password,
inputs=admin_pass,
outputs=admin_panel
)
register_btn.click(
fn=register_face,
inputs=[roll_number, name, email, parent_email, images],
outputs=register_output
)
stats_btn.click(
fn=get_attendance_stats,
inputs=None,
outputs=stats_output
)
return demo
# Initialize the classifier
knn_clf = load_face_classifier()
# Launch the interface
if __name__ == "__main__":
demo = create_interface()
demo.launch(share=True)