"""Face-recognition attendance system with a Gradio front end.

The module wires together:

* Supabase storage for the pickled face encodings and the registered images,
* a SQLAlchemy database holding users and attendance rows,
* a k-nearest-neighbours classifier over ``face_recognition`` encodings,
* SMTP notifications, and
* a Gradio UI with an attendance tab and a password-protected admin tab.

Importing the module has side effects: it creates the Supabase client, the
database engine and tables, and loads the classifier.
"""
import hmac
import os
import pickle
import smtplib
import tempfile
import time
from datetime import date, datetime
from email.message import EmailMessage

import cv2
import face_recognition
import gradio as gr
import numpy as np
from sklearn import neighbors
from sklearn.metrics import accuracy_score
from sqlalchemy import (
    Column,
    Date,
    Integer,
    String,
    Time,
    UniqueConstraint,
    create_engine,
    func,
)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from supabase import Client, create_client

# ------------------------------
# Configuration and Environment Variables
# ------------------------------
FRAME_RESIZE_FACTOR = 0.5
MODEL = "hog"
KNN_NEIGHBORS = 2
UNKNOWN_THRESHOLD = 0.5
REQUIRED_IDENTIFICATIONS = 3
LIVENESS_HISTORY_LENGTH = 3
LIVENESS_VARIATION_THRESHOLD = 0.03
MIN_REGISTRATION_IMAGES = 3
SMTP_TIMEOUT_SECONDS = 10

# Email & Admin Configuration
SMTP_SERVER = os.environ.get("SMTP_SERVER", "smtp.gmail.com")
SMTP_PORT = int(os.environ.get("SMTP_PORT", 587))
SMTP_USERNAME = os.environ.get("SMTP_USERNAME")
SMTP_PASSWORD = os.environ.get("SMTP_PASSWORD")
SENDER_EMAIL = os.environ.get("SENDER_EMAIL")
ADMIN_EMAIL = os.environ.get("ADMIN_EMAIL")
ADMIN_PASSWORD = os.environ.get("ADMIN_PASSWORD")

# Supabase Configuration
SUPABASE_URL = os.environ.get("SUPABASE_URL")
SUPABASE_KEY = os.environ.get("SUPABASE_KEY")
STORAGE_BUCKET = os.environ.get("STORAGE_BUCKET", "face-encodings")
STORAGE_PATH = os.environ.get("STORAGE_PATH", "encodings.pkl")
KNOWN_FACES_BUCKET = os.environ.get("KNOWN_FACES_BUCKET", "known-faces")

# Database Configuration
DATABASE_URL = os.environ.get("DATABASE_URL")

# Global State
recognition_counts = {}  # label -> consecutive live recognitions so far
liveness_history = {}    # face id -> most recent eye-aspect-ratio samples
knn_clf = None           # assigned at the bottom of the module / after registration
supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)

# ------------------------------
# Database Setup
# ------------------------------
engine = create_engine(DATABASE_URL, pool_size=5, max_overflow=10)
SessionLocal = sessionmaker(bind=engine)
Base = declarative_base()


class User(Base):
    """A registered person, keyed by roll number."""

    __tablename__ = "users"

    roll_number = Column(String, primary_key=True)
    name = Column(String)
    email = Column(String)
    parent_email = Column(String)


class Attendance(Base):
    """One attendance row; at most one per roll number per date."""

    __tablename__ = "attendance"

    id = Column(Integer, primary_key=True, autoincrement=True)
    roll_number = Column(String)
    name = Column(String)
    date = Column(Date)
    time = Column(Time)

    __table_args__ = (
        UniqueConstraint('roll_number', 'date', name='_roll_date_uc'),
    )


Base.metadata.create_all(engine)


# ------------------------------
# Storage Functions
# ------------------------------
def upload_to_supabase(local_file, bucket, remote_path, upsert=False):
    """Upload a local file to Supabase storage.

    Args:
        local_file: Path of the file to read.
        bucket: Name of the storage bucket.
        remote_path: Object path inside the bucket.
        upsert: When true, ask the storage API to overwrite an existing
            object instead of rejecting the upload. Defaults to False,
            which matches the previous behaviour.

    Returns:
        True on success, False if anything went wrong (the error is printed).
    """
    try:
        with open(local_file, "rb") as f:
            file_data = f.read()
        if upsert:
            res = supabase.storage.from_(bucket).upload(
                remote_path, file_data, file_options={"upsert": "true"}
            )
        else:
            res = supabase.storage.from_(bucket).upload(remote_path, file_data)
        # Only dict-shaped responses carry an "error" key; other response
        # objects signal failure by raising, which the except below handles.
        error = res.get("error") if isinstance(res, dict) else None
        if error:
            raise Exception(error)
        return True
    except Exception as e:
        print(f"Upload error: {e}")
        return False


def download_from_supabase(local_file, bucket, remote_path):
    """Download an object from Supabase storage into a local file.

    Args:
        local_file: Destination path; overwritten if it exists.
        bucket: Name of the storage bucket.
        remote_path: Object path inside the bucket.

    Returns:
        True on success, False if anything went wrong (the error is printed).
    """
    try:
        res = supabase.storage.from_(bucket).download(remote_path)
        if isinstance(res, (bytes, bytearray)):
            # The storage client hands back the raw object bytes.
            payload = res
        else:
            # Legacy dict-shaped response: {"data": ..., "error": ...}.
            if res.get("error"):
                raise Exception(res["error"])
            payload = res["data"]
        with open(local_file, "wb") as f:
            f.write(payload)
        return True
    except Exception as e:
        print(f"Download error: {e}")
        return False


def _fetch_encodings():
    """Download and unpickle the stored ``(encodings, names)`` pair.

    Raises:
        RuntimeError: If the encodings object could not be downloaded.
    """
    # A private temporary directory avoids two callers clobbering one shared
    # file name and guarantees cleanup even when unpickling fails.
    with tempfile.TemporaryDirectory() as tmp_dir:
        local_path = os.path.join(tmp_dir, "encodings.pkl")
        if not download_from_supabase(local_path, STORAGE_BUCKET, STORAGE_PATH):
            raise RuntimeError("Could not download face encodings")
        # SECURITY: pickle executes arbitrary code on load. This is only safe
        # while write access to the storage bucket is restricted to trusted
        # parties.
        with open(local_path, "rb") as f:
            encodings, names = pickle.load(f)
    return list(encodings), list(names)


def _store_encodings(encodings, names):
    """Pickle ``(encodings, names)`` and overwrite the stored object.

    Raises:
        RuntimeError: If the upload failed.
    """
    with tempfile.TemporaryDirectory() as tmp_dir:
        local_path = os.path.join(tmp_dir, "encodings.pkl")
        with open(local_path, "wb") as f:
            pickle.dump((encodings, names), f)
        if not upload_to_supabase(
            local_path, STORAGE_BUCKET, STORAGE_PATH, upsert=True
        ):
            raise RuntimeError("Could not upload updated face encodings")


# ------------------------------
# Face Recognition Functions
# ------------------------------
def compute_ear(eye):
    """Compute the Eye Aspect Ratio (EAR) from six eye landmark points.

    Args:
        eye: Sequence of six (x, y) points; indices 0 and 3 are used as the
            horizontal extent, pairs (1, 5) and (2, 4) as vertical extents.

    Returns:
        ``(A + B) / (2 * C)`` as a float, or 0.0 when the input is malformed
        or the horizontal extent is zero.
    """
    try:
        points = np.asarray(eye, dtype=float)
        vertical_a = np.linalg.norm(points[1] - points[5])
        vertical_b = np.linalg.norm(points[2] - points[4])
        horizontal = np.linalg.norm(points[0] - points[3])
    except (IndexError, TypeError, ValueError):
        return 0.0
    if horizontal == 0:
        # NumPy would yield inf/nan here rather than raising.
        return 0.0
    return float((vertical_a + vertical_b) / (2.0 * horizontal))


def process_liveness(face_id, ear):
    """Record an EAR sample for ``face_id`` and judge liveness.

    Keeps the last ``LIVENESS_HISTORY_LENGTH`` samples per face id. Until
    that many samples exist the face is treated as live; afterwards it is
    live only if the spread (max - min) of the samples exceeds
    ``LIVENESS_VARIATION_THRESHOLD``.

    Returns:
        True if the face is considered live, False otherwise.
    """
    samples = liveness_history.setdefault(face_id, [])
    samples.append(ear)
    if len(samples) > LIVENESS_HISTORY_LENGTH:
        samples = samples[-LIVENESS_HISTORY_LENGTH:]
        liveness_history[face_id] = samples
    if len(samples) < LIVENESS_HISTORY_LENGTH:
        return True
    return (max(samples) - min(samples)) > LIVENESS_VARIATION_THRESHOLD


def load_face_classifier():
    """Build a KNN classifier from the encodings stored in Supabase.

    Returns:
        A fitted ``KNeighborsClassifier``, or None if the encodings could
        not be loaded or are empty (the error is printed).
    """
    try:
        encodings, names = _fetch_encodings()
        if not encodings:
            raise Exception("No encodings found")
        clf = neighbors.KNeighborsClassifier(
            # n_neighbors may not exceed the number of training samples.
            n_neighbors=min(KNN_NEIGHBORS, len(encodings)),
            algorithm='ball_tree',
            weights='distance',
        )
        clf.fit(encodings, names)
        return clf
    except Exception as e:
        print(f"Classifier loading error: {e}")
        return None


# ------------------------------
# Attendance Processing
# ------------------------------
def process_attendance_frame(frame):
    """Detect, identify and annotate faces in one webcam frame.

    A recognised, live face increments its counter in
    ``recognition_counts``; once the counter reaches
    ``REQUIRED_IDENTIFICATIONS`` attendance is marked and the counter reset.

    Args:
        frame: Image array as delivered by the Gradio image component
            (RGB channel order), or None.

    Returns:
        The frame scaled by ``FRAME_RESIZE_FACTOR`` with boxes and labels
        drawn on it (green = live, red = not live), or None if ``frame``
        is None.
    """
    if frame is None:
        return None

    # Resize frame
    height, width = frame.shape[:2]
    frame = cv2.resize(
        frame,
        (int(width * FRAME_RESIZE_FACTOR), int(height * FRAME_RESIZE_FACTOR)),
    )

    if knn_clf is None:
        # No classifier available (loading failed); nothing can be identified.
        return frame

    # Gradio supplies RGB arrays, which is what face_recognition expects, so
    # no channel swap is needed. Annotations go on a copy so that drawing
    # for one face never pollutes the pixels analysed for the next.
    rgb_frame = frame
    annotated = frame.copy()

    # Detect faces
    face_locations = face_recognition.face_locations(rgb_frame, model=MODEL)
    if not face_locations:
        return frame

    # Get face encodings
    face_encodings = face_recognition.face_encodings(rgb_frame, face_locations)

    # Process each face
    for (top, right, bottom, left), face_encoding in zip(face_locations, face_encodings):
        try:
            # Predict identity; only the nearest distance is needed for the
            # unknown-face threshold.
            distances, _ = knn_clf.kneighbors([face_encoding], n_neighbors=1)
            name = "Unknown"
            if distances[0][0] < UNKNOWN_THRESHOLD:
                name = str(knn_clf.predict([face_encoding])[0])

            # Check liveness on a slightly padded crop around the face
            face_landmarks = face_recognition.face_landmarks(
                rgb_frame[
                    max(0, top - 20):min(bottom + 20, frame.shape[0]),
                    max(0, left - 20):min(right + 20, frame.shape[1]),
                ]
            )
            live = True
            if face_landmarks:
                landmarks = face_landmarks[0]
                if "left_eye" in landmarks and "right_eye" in landmarks:
                    left_ear = compute_ear(landmarks["left_eye"])
                    right_ear = compute_ear(landmarks["right_eye"])
                    ear = (left_ear + right_ear) / 2.0
                    # NOTE(review): the key embeds pixel coordinates, so a
                    # face that moves gets a fresh key every frame; its
                    # history then rarely fills and liveness_history keeps
                    # growing. Confirm whether keying by identity alone was
                    # intended.
                    face_id = f"{name}_{top}_{left}"
                    live = process_liveness(face_id, ear)

            # Mark attendance if conditions met
            if live and name != "Unknown":
                recognition_counts[name] = recognition_counts.get(name, 0) + 1
                if recognition_counts[name] >= REQUIRED_IDENTIFICATIONS:
                    mark_attendance(name)
                    recognition_counts[name] = 0

            # Draw results (colours are RGB)
            color = (0, 255, 0) if live else (255, 0, 0)
            cv2.rectangle(annotated, (left, top), (right, bottom), color, 2)
            cv2.putText(annotated, name, (left, top - 10),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1)
        except Exception as e:
            print(f"Face processing error: {e}")
            continue

    return annotated


def mark_attendance(roll_number):
    """Record today's attendance for ``roll_number`` and notify by email.

    Nothing is written if a row for today already exists or if no user
    with that roll number is found.

    Returns:
        True if no error occurred (including the two no-op cases above),
        False if an exception was raised (the error is printed).
    """
    try:
        with SessionLocal() as session:
            today = date.today()
            now = datetime.now().time()

            # Check if attendance already marked
            exists = session.query(Attendance)\
                .filter(Attendance.roll_number == roll_number,
                        Attendance.date == today)\
                .first()
            if exists is not None:
                return True

            # Get user details
            user = session.query(User)\
                .filter(User.roll_number == roll_number)\
                .first()
            if not user:
                return True

            # Read the attributes before commit so the notifications do not
            # trigger a reload of the expired instance.
            student_name = user.name
            student_email = user.email
            parent_email = user.parent_email

            # Mark attendance
            session.add(Attendance(
                roll_number=roll_number,
                name=student_name,
                date=today,
                time=now,
            ))
            session.commit()

        # Send notifications
        if student_email:
            send_email(
                student_email,
                "Attendance Marked",
                f"Your attendance for {today} has been marked at {now}"
            )
        if parent_email:
            send_email(
                parent_email,
                "Attendance Notification",
                f"Attendance for {student_name} was marked on {today} at {now}"
            )
        return True
    except Exception as e:
        print(f"Attendance marking error: {e}")
        return False


# ------------------------------
# Email Functions
# ------------------------------
def send_email(to_email, subject, body):
    """Send a plain-text email through the configured SMTP server.

    Returns:
        True if the message was handed to the server, False on any error
        (the error is printed).
    """
    try:
        msg = EmailMessage()
        msg.set_content(body)
        msg["Subject"] = subject
        msg["From"] = SENDER_EMAIL
        msg["To"] = to_email

        # The timeout keeps a stalled SMTP server from blocking the caller
        # (the frame-processing path) indefinitely.
        with smtplib.SMTP(SMTP_SERVER, SMTP_PORT,
                          timeout=SMTP_TIMEOUT_SECONDS) as server:
            server.starttls()
            server.login(SMTP_USERNAME, SMTP_PASSWORD)
            server.send_message(msg)
        return True
    except Exception as e:
        print(f"Email error: {e}")
        return False


# ------------------------------
# Admin Functions
# ------------------------------
def admin_login(password):
    """Return True if ``password`` matches the configured admin password.

    Login is always refused when ``ADMIN_PASSWORD`` is unset or empty. The
    comparison is constant-time to avoid leaking the password via timing.
    """
    if not ADMIN_PASSWORD or password is None:
        return False
    return hmac.compare_digest(
        str(password).encode("utf-8"), ADMIN_PASSWORD.encode("utf-8")
    )


def _load_image_bgr(item):
    """Return a BGR image array for an array, a path, or an uploaded file.

    Uploaded-file objects are resolved through their ``name`` attribute.
    Returns None when the item cannot be resolved or decoded.
    """
    if isinstance(item, np.ndarray):
        return item
    if isinstance(item, (str, os.PathLike)):
        path = item
    else:
        path = getattr(item, "name", None)
    if path is None:
        return None
    return cv2.imread(os.fspath(path))  # None when the file is unreadable


def register_face(roll_number, name, email, parent_email, images):
    """Register a new user together with their face images.

    Images without a detectable face are skipped. At least
    ``MIN_REGISTRATION_IMAGES`` usable images are required. On success the
    images are stored locally and in Supabase, the user row is created, the
    stored encodings are extended, and the classifier is reloaded.

    Args:
        roll_number: Unique identifier; also used as the classifier label
            and as a directory / object-path component.
        name: Display name.
        email: Student email address (may be empty).
        parent_email: Parent email address (may be empty).
        images: Iterable of BGR image arrays, file paths, or uploaded-file
            objects exposing ``.name``.

    Returns:
        A human-readable status string.
    """
    global knn_clf
    try:
        roll_number = (roll_number or "").strip()
        if not roll_number:
            return "Registration failed: roll number is required"
        # The roll number becomes part of local and remote paths.
        if "/" in roll_number or "\\" in roll_number or roll_number in {".", ".."}:
            return "Registration failed: roll number contains invalid characters"

        # Process images: (original index, BGR image, encoding)
        accepted = []
        for i, item in enumerate(images or []):
            img = _load_image_bgr(item)
            if img is None:
                continue

            # Convert and detect face
            rgb_img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
            face_locations = face_recognition.face_locations(rgb_img, model=MODEL)
            if not face_locations:
                continue

            # Reuse the detected locations so encoding cannot come back
            # empty after a successful detection pass.
            found = face_recognition.face_encodings(rgb_img, face_locations)
            if not found:
                continue
            accepted.append((i, img, found[0]))

        if len(accepted) < MIN_REGISTRATION_IMAGES:
            return f"Need at least {MIN_REGISTRATION_IMAGES} good quality face images"

        # Only write files once the registration is known to be viable.
        user_dir = os.path.join("known_faces", roll_number)
        os.makedirs(user_dir, exist_ok=True)
        for i, img, _ in accepted:
            filename = f"{roll_number}_{i}.jpg"
            filepath = os.path.join(user_dir, filename)
            cv2.imwrite(filepath, img)
            upload_to_supabase(
                filepath,
                KNOWN_FACES_BUCKET,
                f"{roll_number}/{filename}",
                upsert=True,
            )

        # Add user to database and update face encodings as one unit: the
        # row is committed only after the encodings were stored, so a
        # storage failure does not leave a user that can never be
        # re-registered.
        with SessionLocal() as session:
            session.add(User(
                roll_number=roll_number,
                name=name,
                email=email,
                parent_email=parent_email,
            ))
            session.flush()  # surfaces a duplicate roll number early

            # NOTE: the encodings object must already exist in storage; a
            # failed download is not treated as "empty" because that would
            # overwrite every existing encoding after a transient error.
            encodings, names = _fetch_encodings()
            encodings.extend(encoding for _, _, encoding in accepted)
            names.extend([roll_number] * len(accepted))
            _store_encodings(encodings, names)

            session.commit()

        # Reload classifier; keep the previous one if reloading fails.
        refreshed = load_face_classifier()
        if refreshed is not None:
            knn_clf = refreshed

        return "Registration successful"
    except Exception as e:
        print(f"Registration error: {e}")
        return f"Registration failed: {str(e)}"


def get_attendance_stats():
    """Return a text summary of the number of attendance rows per date."""
    try:
        with SessionLocal() as session:
            stats = session.query(
                Attendance.date,
                func.count(Attendance.id)
            ).group_by(Attendance.date).order_by(Attendance.date).all()

        if not stats:
            return "No attendance records found"

        lines = ["Attendance Statistics:\n"]
        lines.extend(f"{day}: {count} students\n" for day, count in stats)
        return "".join(lines)
    except Exception as e:
        return f"Error fetching statistics: {str(e)}"


# ------------------------------
# Gradio Interface
# ------------------------------
def create_interface():
    """Build the Gradio Blocks app (attendance tab and admin tab).

    The admin panel stays hidden until the password box holds a value
    accepted by ``admin_login``.

    Returns:
        The constructed ``gr.Blocks`` instance (not yet launched).
    """
    with gr.Blocks() as demo:
        gr.Markdown("# Face Recognition Attendance System")

        with gr.Tabs():
            # Attendance Tab
            with gr.Tab("Attendance"):
                attendance_in = gr.Image(
                    source="webcam",
                    streaming=True,
                    label="Webcam Feed",
                    height=400,
                    width=600
                )
                attendance_out = gr.Image(label="Processed Feed")
                attendance_in.change(
                    fn=process_attendance_frame,
                    inputs=attendance_in,
                    outputs=attendance_out,
                    batch=False
                )

            # Admin Tab
            with gr.Tab("Admin"):
                admin_pass = gr.Textbox(
                    type="password",
                    label="Admin Password"
                )

                with gr.Column(visible=False) as admin_panel:
                    gr.Markdown("## Register New User")
                    with gr.Row():
                        roll_number = gr.Textbox(label="Roll Number")
                        name = gr.Textbox(label="Name")
                    with gr.Row():
                        email = gr.Textbox(label="Student Email")
                        parent_email = gr.Textbox(label="Parent Email")
                    images = gr.File(
                        file_count="multiple",
                        label="Upload Face Images",
                        file_types=["image"]
                    )
                    register_btn = gr.Button("Register User")
                    register_output = gr.Textbox(label="Registration Status")

                    gr.Markdown("## Attendance Statistics")
                    stats_btn = gr.Button("View Statistics")
                    stats_output = gr.Textbox(label="Statistics")

                def check_password(password):
                    return gr.Column.update(visible=admin_login(password))

                admin_pass.change(
                    fn=check_password,
                    inputs=admin_pass,
                    outputs=admin_panel
                )
                register_btn.click(
                    fn=register_face,
                    inputs=[roll_number, name, email, parent_email, images],
                    outputs=register_output
                )
                stats_btn.click(
                    fn=get_attendance_stats,
                    inputs=None,
                    outputs=stats_output
                )

    return demo


# Initialize the classifier
knn_clf = load_face_classifier()

# Launch the interface
if __name__ == "__main__":
    demo = create_interface()
    # share=True requests a publicly reachable link for the app.
    demo.launch(share=True)