Spaces:
Build error
Build error
| import os | |
| import cv2 | |
| import time | |
| import pickle | |
| import numpy as np | |
| import face_recognition | |
| import smtplib | |
| from email.message import EmailMessage | |
| from datetime import datetime, date | |
| import gradio as gr | |
| from sklearn import neighbors | |
| from sklearn.metrics import accuracy_score | |
| from sqlalchemy import create_engine, Column, String, Integer, Date, Time, UniqueConstraint, func | |
| from sqlalchemy.ext.declarative import declarative_base | |
| from sqlalchemy.orm import sessionmaker | |
| from supabase import create_client, Client | |
| # ------------------------------ | |
| # Configuration and Environment Variables | |
| # ------------------------------ | |
| FRAME_RESIZE_FACTOR = 0.5 | |
| MODEL = "hog" | |
| KNN_NEIGHBORS = 2 | |
| UNKNOWN_THRESHOLD = 0.5 | |
| REQUIRED_IDENTIFICATIONS = 3 | |
| LIVENESS_HISTORY_LENGTH = 3 | |
| LIVENESS_VARIATION_THRESHOLD = 0.03 | |
| # Email & Admin Configuration | |
| SMTP_SERVER = os.environ.get("SMTP_SERVER", "smtp.gmail.com") | |
| SMTP_PORT = int(os.environ.get("SMTP_PORT", 587)) | |
| SMTP_USERNAME = os.environ.get("SMTP_USERNAME") | |
| SMTP_PASSWORD = os.environ.get("SMTP_PASSWORD") | |
| SENDER_EMAIL = os.environ.get("SENDER_EMAIL") | |
| ADMIN_EMAIL = os.environ.get("ADMIN_EMAIL") | |
| ADMIN_PASSWORD = os.environ.get("ADMIN_PASSWORD") | |
| # Supabase Configuration | |
| SUPABASE_URL = os.environ.get("SUPABASE_URL") | |
| SUPABASE_KEY = os.environ.get("SUPABASE_KEY") | |
| STORAGE_BUCKET = os.environ.get("STORAGE_BUCKET", "face-encodings") | |
| STORAGE_PATH = os.environ.get("STORAGE_PATH", "encodings.pkl") | |
| KNOWN_FACES_BUCKET = os.environ.get("KNOWN_FACES_BUCKET", "known-faces") | |
| # Database Configuration | |
| DATABASE_URL = os.environ.get("DATABASE_URL") | |
| # Global State | |
| recognition_counts = {} | |
| liveness_history = {} | |
| supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY) | |
| # ------------------------------ | |
| # Database Setup | |
| # ------------------------------ | |
| engine = create_engine(DATABASE_URL, pool_size=5, max_overflow=10) | |
| SessionLocal = sessionmaker(bind=engine) | |
| Base = declarative_base() | |
| class User(Base): | |
| __tablename__ = "users" | |
| roll_number = Column(String, primary_key=True) | |
| name = Column(String) | |
| email = Column(String) | |
| parent_email = Column(String) | |
| class Attendance(Base): | |
| __tablename__ = "attendance" | |
| id = Column(Integer, primary_key=True, autoincrement=True) | |
| roll_number = Column(String) | |
| name = Column(String) | |
| date = Column(Date) | |
| time = Column(Time) | |
| __table_args__ = (UniqueConstraint('roll_number', 'date', name='_roll_date_uc'),) | |
| Base.metadata.create_all(engine) | |
| # ------------------------------ | |
| # Storage Functions | |
| # ------------------------------ | |
| def upload_to_supabase(local_file, bucket, remote_path): | |
| """Generic upload function for Supabase storage.""" | |
| try: | |
| with open(local_file, "rb") as f: | |
| file_data = f.read() | |
| res = supabase.storage.from_(bucket).upload(remote_path, file_data) | |
| if res.get("error"): | |
| raise Exception(res["error"]) | |
| return True | |
| except Exception as e: | |
| print(f"Upload error: {e}") | |
| return False | |
| def download_from_supabase(local_file, bucket, remote_path): | |
| """Generic download function for Supabase storage.""" | |
| try: | |
| res = supabase.storage.from_(bucket).download(remote_path) | |
| if res.get("error"): | |
| raise Exception(res["error"]) | |
| with open(local_file, "wb") as f: | |
| f.write(res["data"]) | |
| return True | |
| except Exception as e: | |
| print(f"Download error: {e}") | |
| return False | |
| # ------------------------------ | |
| # Face Recognition Functions | |
| # ------------------------------ | |
| def compute_ear(eye): | |
| """Compute Eye Aspect Ratio for liveness detection.""" | |
| try: | |
| A = np.linalg.norm(np.array(eye[1]) - np.array(eye[5])) | |
| B = np.linalg.norm(np.array(eye[2]) - np.array(eye[4])) | |
| C = np.linalg.norm(np.array(eye[0]) - np.array(eye[3])) | |
| return (A + B) / (2.0 * C) | |
| except: | |
| return 0.0 | |
| def process_liveness(face_id, ear): | |
| """Process liveness detection using eye aspect ratio history.""" | |
| if face_id not in liveness_history: | |
| liveness_history[face_id] = [] | |
| liveness_history[face_id].append(ear) | |
| if len(liveness_history[face_id]) > LIVENESS_HISTORY_LENGTH: | |
| liveness_history[face_id] = liveness_history[face_id][-LIVENESS_HISTORY_LENGTH:] | |
| if len(liveness_history[face_id]) < LIVENESS_HISTORY_LENGTH: | |
| return True | |
| variation = max(liveness_history[face_id]) - min(liveness_history[face_id]) | |
| return variation > LIVENESS_VARIATION_THRESHOLD | |
| def load_face_classifier(): | |
| """Load and prepare the face recognition classifier.""" | |
| try: | |
| download_from_supabase("temp_encodings.pkl", STORAGE_BUCKET, STORAGE_PATH) | |
| with open("temp_encodings.pkl", 'rb') as f: | |
| encodings, names = pickle.load(f) | |
| os.remove("temp_encodings.pkl") | |
| if not encodings: | |
| raise Exception("No encodings found") | |
| clf = neighbors.KNeighborsClassifier( | |
| n_neighbors=KNN_NEIGHBORS, | |
| algorithm='ball_tree', | |
| weights='distance' | |
| ) | |
| clf.fit(encodings, names) | |
| return clf | |
| except Exception as e: | |
| print(f"Classifier loading error: {e}") | |
| return None | |
| # ------------------------------ | |
| # Attendance Processing | |
| # ------------------------------ | |
| def process_attendance_frame(frame): | |
| """Process a video frame for attendance marking.""" | |
| if frame is None: | |
| return None | |
| global recognition_counts | |
| # Resize frame | |
| height, width = frame.shape[:2] | |
| frame = cv2.resize(frame, (int(width * FRAME_RESIZE_FACTOR), | |
| int(height * FRAME_RESIZE_FACTOR))) | |
| # Convert to RGB | |
| rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
| # Detect faces | |
| face_locations = face_recognition.face_locations(rgb_frame, model=MODEL) | |
| if not face_locations: | |
| return frame | |
| # Get face encodings | |
| face_encodings = face_recognition.face_encodings(rgb_frame, face_locations) | |
| # Process each face | |
| for (top, right, bottom, left), face_encoding in zip(face_locations, face_encodings): | |
| try: | |
| # Predict identity | |
| distances, _ = knn_clf.kneighbors([face_encoding], n_neighbors=KNN_NEIGHBORS) | |
| name = "Unknown" | |
| if distances[0][0] < UNKNOWN_THRESHOLD: | |
| name = knn_clf.predict([face_encoding])[0] | |
| # Check liveness | |
| face_landmarks = face_recognition.face_landmarks( | |
| rgb_frame[max(0, top-20):min(bottom+20, frame.shape[0]), | |
| max(0, left-20):min(right+20, frame.shape[1])] | |
| ) | |
| live = True | |
| if face_landmarks: | |
| landmarks = face_landmarks[0] | |
| if "left_eye" in landmarks and "right_eye" in landmarks: | |
| left_ear = compute_ear(landmarks["left_eye"]) | |
| right_ear = compute_ear(landmarks["right_eye"]) | |
| ear = (left_ear + right_ear) / 2.0 | |
| face_id = f"{name}_{top}_{left}" | |
| live = process_liveness(face_id, ear) | |
| # Mark attendance if conditions met | |
| if live and name != "Unknown": | |
| recognition_counts[name] = recognition_counts.get(name, 0) + 1 | |
| if recognition_counts[name] >= REQUIRED_IDENTIFICATIONS: | |
| mark_attendance(name) | |
| recognition_counts[name] = 0 | |
| # Draw results | |
| color = (0, 255, 0) if live else (0, 0, 255) | |
| cv2.rectangle(frame, (left, top), (right, bottom), color, 2) | |
| cv2.putText(frame, name, (left, top - 10), | |
| cv2.FONT_HERSHEY_SIMPLEX, 0.5, color, 1) | |
| except Exception as e: | |
| print(f"Face processing error: {e}") | |
| continue | |
| return frame | |
| def mark_attendance(roll_number): | |
| """Mark attendance in database and send notifications.""" | |
| try: | |
| with SessionLocal() as session: | |
| today = date.today() | |
| now = datetime.now().time() | |
| # Check if attendance already marked | |
| exists = session.query(Attendance)\ | |
| .filter(Attendance.roll_number == roll_number, | |
| Attendance.date == today)\ | |
| .first() | |
| if exists is None: | |
| # Get user details | |
| user = session.query(User)\ | |
| .filter(User.roll_number == roll_number)\ | |
| .first() | |
| if user: | |
| # Mark attendance | |
| new_attendance = Attendance( | |
| roll_number=roll_number, | |
| name=user.name, | |
| date=today, | |
| time=now | |
| ) | |
| session.add(new_attendance) | |
| session.commit() | |
| # Send notifications | |
| if user.email: | |
| send_email( | |
| user.email, | |
| "Attendance Marked", | |
| f"Your attendance for {today} has been marked at {now}" | |
| ) | |
| if user.parent_email: | |
| send_email( | |
| user.parent_email, | |
| "Attendance Notification", | |
| f"Attendance for {user.name} was marked on {today} at {now}" | |
| ) | |
| return True | |
| except Exception as e: | |
| print(f"Attendance marking error: {e}") | |
| return False | |
| # ------------------------------ | |
| # Email Functions | |
| # ------------------------------ | |
| def send_email(to_email, subject, body): | |
| """Send email notification.""" | |
| try: | |
| msg = EmailMessage() | |
| msg.set_content(body) | |
| msg["Subject"] = subject | |
| msg["From"] = SENDER_EMAIL | |
| msg["To"] = to_email | |
| with smtplib.SMTP(SMTP_SERVER, SMTP_PORT) as server: | |
| server.starttls() | |
| server.login(SMTP_USERNAME, SMTP_PASSWORD) | |
| server.send_message(msg) | |
| return True | |
| except Exception as e: | |
| print(f"Email error: {e}") | |
| return False | |
| # ------------------------------ | |
| # Admin Functions | |
| # ------------------------------ | |
| def admin_login(password): | |
| """Verify admin login.""" | |
| return password == ADMIN_PASSWORD | |
| def register_face(roll_number, name, email, parent_email, images): | |
| """Register a new user with face images.""" | |
| try: | |
| # Create directory for user | |
| user_dir = os.path.join("known_faces", roll_number) | |
| os.makedirs(user_dir, exist_ok=True) | |
| # Process images | |
| new_encodings = [] | |
| new_names = [] | |
| saved_count = 0 | |
| for i, img in enumerate(images): | |
| # Convert and detect face | |
| rgb_img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) | |
| face_locations = face_recognition.face_locations(rgb_img, model=MODEL) | |
| if not face_locations: | |
| continue | |
| # Save image and compute encoding | |
| filename = f"{roll_number}_{i}.jpg" | |
| filepath = os.path.join(user_dir, filename) | |
| cv2.imwrite(filepath, img) | |
| # Upload to Supabase | |
| upload_to_supabase( | |
| filepath, | |
| KNOWN_FACES_BUCKET, | |
| f"{roll_number}/{filename}" | |
| ) | |
| # Get face encoding | |
| face_encoding = face_recognition.face_encodings(rgb_img)[0] | |
| new_encodings.append(face_encoding) | |
| new_names.append(roll_number) | |
| saved_count += 1 | |
| if saved_count < 3: | |
| return "Need at least 3 good quality face images" | |
| # Add user to database | |
| with SessionLocal() as session: | |
| new_user = User( | |
| roll_number=roll_number, | |
| name=name, | |
| email=email, | |
| parent_email=parent_email | |
| ) | |
| session.add(new_user) | |
| session.commit() | |
| # Update face encodings | |
| download_from_supabase("temp_encodings.pkl", STORAGE_BUCKET, STORAGE_PATH) | |
| with open("temp_encodings.pkl", 'rb') as f: | |
| encodings, names = pickle.load(f) | |
| encodings.extend(new_encodings) | |
| names.extend(new_names) | |
| with open("temp_encodings.pkl", 'wb') as f: | |
| pickle.dump((encodings, names), f) | |
| upload_to_supabase("temp_encodings.pkl", STORAGE_BUCKET, STORAGE_PATH) | |
| os.remove("temp_encodings.pkl") | |
| # Reload classifier | |
| global knn_clf | |
| knn_clf = load_face_classifier() | |
| return "Registration successful" | |
| except Exception as e: | |
| print(f"Registration error: {e}") | |
| return f"Registration failed: {str(e)}" | |
| def get_attendance_stats(): | |
| """Get attendance statistics.""" | |
| try: | |
| with SessionLocal() as session: | |
| stats = session.query( | |
| Attendance.date, | |
| func.count(Attendance.id) | |
| ).group_by(Attendance.date).all() | |
| if not stats: | |
| return "No attendance records found" | |
| result = "Attendance Statistics:\n" | |
| for date, count in stats: | |
| result += f"{date}: {count} students\n" | |
| return result | |
| except Exception as e: | |
| return f"Error fetching statistics: {str(e)}" | |
| # ------------------------------ | |
| # Gradio Interface | |
| # ------------------------------ | |
| def create_interface(): | |
| """Create the Gradio interface.""" | |
| with gr.Blocks() as demo: | |
| gr.Markdown("# Face Recognition Attendance System") | |
| with gr.Tabs(): | |
| # Attendance Tab | |
| with gr.Tab("Attendance"): | |
| attendance_in = gr.Image( | |
| source="webcam", | |
| streaming=True, | |
| label="Webcam Feed", | |
| height=400, | |
| width=600 | |
| ) | |
| attendance_out = gr.Image(label="Processed Feed") | |
| attendance_in.change( | |
| fn=process_attendance_frame, | |
| inputs=attendance_in, | |
| outputs=attendance_out, | |
| batch=False | |
| ) | |
| # Admin Tab | |
| with gr.Tab("Admin"): | |
| admin_pass = gr.Textbox( | |
| type="password", | |
| label="Admin Password" | |
| ) | |
| with gr.Column(visible=False) as admin_panel: | |
| gr.Markdown("## Register New User") | |
| with gr.Row(): | |
| roll_number = gr.Textbox(label="Roll Number") | |
| name = gr.Textbox(label="Name") | |
| with gr.Row(): | |
| email = gr.Textbox(label="Student Email") | |
| parent_email = gr.Textbox(label="Parent Email") | |
| images = gr.File( | |
| file_count="multiple", | |
| label="Upload Face Images", | |
| file_types=["image"] | |
| ) | |
| register_btn = gr.Button("Register User") | |
| register_output = gr.Textbox(label="Registration Status") | |
| gr.Markdown("## Attendance Statistics") | |
| stats_btn = gr.Button("View Statistics") | |
| stats_output = gr.Textbox(label="Statistics") | |
| def check_password(password): | |
| return gr.Column.update(visible=admin_login(password)) | |
| admin_pass.change( | |
| fn=check_password, | |
| inputs=admin_pass, | |
| outputs=admin_panel | |
| ) | |
| register_btn.click( | |
| fn=register_face, | |
| inputs=[roll_number, name, email, parent_email, images], | |
| outputs=register_output | |
| ) | |
| stats_btn.click( | |
| fn=get_attendance_stats, | |
| inputs=None, | |
| outputs=stats_output | |
| ) | |
| return demo | |
| # Initialize the classifier | |
| knn_clf = load_face_classifier() | |
| # Launch the interface | |
| if __name__ == "__main__": | |
| demo = create_interface() | |
| demo.launch(share=True) |