Spaces:
Sleeping
Sleeping
| import os | |
| import cv2 | |
| import json | |
| import base64 | |
| import sqlite3 | |
| from datetime import datetime, timedelta | |
| import pytz | |
| from flask import Flask, render_template, request, jsonify, Response | |
| from deepface import DeepFace | |
| import numpy as np | |
| import threading | |
| import logging | |
| from werkzeug.utils import secure_filename | |
| # Configure logging | |
| logging.basicConfig(level=logging.INFO) | |
| logger = logging.getLogger(__name__) | |
| app = Flask(__name__) | |
| app.config['SECRET_KEY'] = 'your-secret-key-here' | |
| app.config['UPLOAD_FOLDER'] = 'static/known_faces' | |
| app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB max file size | |
| # Create upload directory if it doesn't exist | |
| os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True) | |
| # Indian Standard Time | |
| IST = pytz.timezone('Asia/Kolkata') | |
| class DatabaseManager: | |
| def __init__(self, db_path='attendance.db'): | |
| self.db_path = db_path | |
| self.init_database() | |
| def init_database(self): | |
| """Initialize the database with required tables""" | |
| conn = sqlite3.connect(self.db_path) | |
| cursor = conn.cursor() | |
| # Users table | |
| cursor.execute(''' | |
| CREATE TABLE IF NOT EXISTS users ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| name TEXT NOT NULL UNIQUE, | |
| face_encoding_path TEXT, | |
| created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP | |
| ) | |
| ''') | |
| # Attendance table | |
| cursor.execute(''' | |
| CREATE TABLE IF NOT EXISTS attendance ( | |
| id INTEGER PRIMARY KEY AUTOINCREMENT, | |
| user_id INTEGER, | |
| date DATE, | |
| check_in_time TIMESTAMP, | |
| check_out_time TIMESTAMP, | |
| status TEXT DEFAULT 'present', | |
| FOREIGN KEY (user_id) REFERENCES users (id), | |
| UNIQUE(user_id, date) | |
| ) | |
| ''') | |
| conn.commit() | |
| conn.close() | |
| def add_user(self, name, face_image_path): | |
| """Add a new user to the database""" | |
| try: | |
| conn = sqlite3.connect(self.db_path) | |
| cursor = conn.cursor() | |
| cursor.execute('INSERT INTO users (name, face_encoding_path) VALUES (?, ?)', | |
| (name, face_image_path)) | |
| user_id = cursor.lastrowid | |
| conn.commit() | |
| conn.close() | |
| return user_id | |
| except sqlite3.IntegrityError: | |
| return None | |
| def get_all_users(self): | |
| """Get all users from database""" | |
| conn = sqlite3.connect(self.db_path) | |
| cursor = conn.cursor() | |
| cursor.execute('SELECT id, name, face_encoding_path FROM users') | |
| users = cursor.fetchall() | |
| conn.close() | |
| return users | |
| def mark_attendance(self, user_id, attendance_type='check_in'): | |
| """Mark attendance for a user""" | |
| try: | |
| conn = sqlite3.connect(self.db_path) | |
| cursor = conn.cursor() | |
| # Get current date and time in IST | |
| now = datetime.now(IST) | |
| today = now.date() | |
| current_time = now | |
| if attendance_type == 'check_in': | |
| # Check if user already checked in today | |
| cursor.execute(''' | |
| SELECT id, check_in_time FROM attendance | |
| WHERE user_id = ? AND date = ? | |
| ''', (user_id, today)) | |
| existing = cursor.fetchone() | |
| if existing: | |
| conn.close() | |
| return {'success': False, 'message': 'Already checked in today'} | |
| # Insert new attendance record | |
| cursor.execute(''' | |
| INSERT INTO attendance (user_id, date, check_in_time, status) | |
| VALUES (?, ?, ?, 'present') | |
| ''', (user_id, today, current_time)) | |
| elif attendance_type == 'check_out': | |
| # Update existing record with check-out time | |
| cursor.execute(''' | |
| UPDATE attendance | |
| SET check_out_time = ? | |
| WHERE user_id = ? AND date = ? AND check_out_time IS NULL | |
| ''', (current_time, user_id, today)) | |
| if cursor.rowcount == 0: | |
| conn.close() | |
| return {'success': False, 'message': 'No check-in record found or already checked out'} | |
| conn.commit() | |
| conn.close() | |
| return {'success': True, 'message': f'Successfully {attendance_type.replace("_", "-")}'} | |
| except Exception as e: | |
| logger.error(f"Error marking attendance: {e}") | |
| return {'success': False, 'message': 'Database error'} | |
| def get_today_stats(self): | |
| """Get today's attendance statistics""" | |
| conn = sqlite3.connect(self.db_path) | |
| cursor = conn.cursor() | |
| today = datetime.now(IST).date() | |
| # Total users | |
| cursor.execute('SELECT COUNT(*) FROM users') | |
| total_users = cursor.fetchone()[0] | |
| # Present today (checked in) | |
| cursor.execute(''' | |
| SELECT COUNT(*) FROM attendance | |
| WHERE date = ? AND check_in_time IS NOT NULL | |
| ''', (today,)) | |
| present_today = cursor.fetchone()[0] | |
| # Absent today | |
| absent_today = total_users - present_today | |
| # Get today's attendance records with user names | |
| cursor.execute(''' | |
| SELECT u.name, a.check_in_time, a.check_out_time | |
| FROM attendance a | |
| JOIN users u ON a.user_id = u.id | |
| WHERE a.date = ? | |
| ORDER BY a.check_in_time DESC | |
| ''', (today,)) | |
| today_attendance = cursor.fetchall() | |
| conn.close() | |
| return { | |
| 'total_users': total_users, | |
| 'present_today': present_today, | |
| 'absent_today': absent_today, | |
| 'today_attendance': today_attendance | |
| } | |
| class FaceRecognizer: | |
| def __init__(self, known_faces_dir='static/known_faces'): | |
| self.known_faces_dir = known_faces_dir | |
| self.known_faces = {} | |
| self.load_known_faces() | |
| def load_known_faces(self): | |
| """Load known faces from the directory""" | |
| self.known_faces = {} | |
| if not os.path.exists(self.known_faces_dir): | |
| os.makedirs(self.known_faces_dir) | |
| return | |
| db_manager = DatabaseManager() | |
| users = db_manager.get_all_users() | |
| for user_id, name, face_path in users: | |
| full_path = os.path.join(self.known_faces_dir, face_path) if face_path else None | |
| if full_path and os.path.exists(full_path): | |
| self.known_faces[name] = { | |
| 'user_id': user_id, | |
| 'image_path': full_path | |
| } | |
| logger.info(f"Loaded {len(self.known_faces)} known faces") | |
| def recognize_face(self, frame): | |
| """Recognize face in the given frame""" | |
| try: | |
| if not self.known_faces: | |
| return None, 0 | |
| # Convert frame to RGB for DeepFace | |
| rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) | |
| # Save temporary frame for DeepFace | |
| temp_path = 'temp_frame.jpg' | |
| cv2.imwrite(temp_path, frame) | |
| best_match = None | |
| highest_confidence = 0 | |
| for name, face_data in self.known_faces.items(): | |
| try: | |
| # Use DeepFace to verify faces | |
| result = DeepFace.verify( | |
| img1_path=temp_path, | |
| img2_path=face_data['image_path'], | |
| model_name='VGG-Face', | |
| distance_metric='cosine', | |
| enforce_detection=False | |
| ) | |
| if result['verified']: | |
| confidence = (1 - result['distance']) * 100 | |
| if confidence > highest_confidence and confidence > 60: # 60% threshold | |
| highest_confidence = confidence | |
| best_match = { | |
| 'name': name, | |
| 'user_id': face_data['user_id'], | |
| 'confidence': confidence | |
| } | |
| except Exception as e: | |
| logger.debug(f"Recognition error for {name}: {e}") | |
| continue | |
| # Clean up temp file | |
| if os.path.exists(temp_path): | |
| os.remove(temp_path) | |
| if best_match: | |
| return best_match, highest_confidence | |
| return None, 0 | |
| except Exception as e: | |
| logger.error(f"Face recognition error: {e}") | |
| return None, 0 | |
| # Global instances | |
| db_manager = DatabaseManager() | |
| face_recognizer = FaceRecognizer() | |
| def index(): | |
| """Main page""" | |
| stats = db_manager.get_today_stats() | |
| return render_template('index.html', stats=stats) | |
| def get_stats(): | |
| """Get current statistics""" | |
| stats = db_manager.get_today_stats() | |
| # Format attendance records for display | |
| formatted_attendance = [] | |
| for name, check_in, check_out in stats['today_attendance']: | |
| record = { | |
| 'name': name, | |
| 'check_in': None, | |
| 'check_out': None | |
| } | |
| if check_in: | |
| check_in_dt = datetime.fromisoformat(check_in.replace('Z', '+00:00')) | |
| if check_in_dt.tzinfo is None: | |
| check_in_dt = pytz.utc.localize(check_in_dt) | |
| check_in_ist = check_in_dt.astimezone(IST) | |
| record['check_in'] = check_in_ist.strftime('%I:%M %p') | |
| if check_out: | |
| check_out_dt = datetime.fromisoformat(check_out.replace('Z', '+00:00')) | |
| if check_out_dt.tzinfo is None: | |
| check_out_dt = pytz.utc.localize(check_out_dt) | |
| check_out_ist = check_out_dt.astimezone(IST) | |
| record['check_out'] = check_out_ist.strftime('%I:%M %p') | |
| formatted_attendance.append(record) | |
| stats['today_attendance'] = formatted_attendance | |
| return jsonify(stats) | |
| def add_user(): | |
| """Add a new user with face image""" | |
| try: | |
| name = request.form.get('name') | |
| if not name: | |
| return jsonify({'success': False, 'message': 'Name is required'}) | |
| if 'face_image' not in request.files: | |
| return jsonify({'success': False, 'message': 'Face image is required'}) | |
| file = request.files['face_image'] | |
| if file.filename == '': | |
| return jsonify({'success': False, 'message': 'No file selected'}) | |
| # Save the uploaded file | |
| filename = secure_filename(f"{name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jpg") | |
| filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename) | |
| file.save(filepath) | |
| # Add user to database | |
| user_id = db_manager.add_user(name, filename) | |
| if user_id: | |
| # Reload known faces | |
| face_recognizer.load_known_faces() | |
| return jsonify({'success': True, 'message': f'User {name} added successfully'}) | |
| else: | |
| # Remove the uploaded file if database insertion failed | |
| if os.path.exists(filepath): | |
| os.remove(filepath) | |
| return jsonify({'success': False, 'message': 'User already exists'}) | |
| except Exception as e: | |
| logger.error(f"Error adding user: {e}") | |
| return jsonify({'success': False, 'message': 'Server error'}) | |
| def recognize_and_mark(): | |
| """Recognize face and mark attendance""" | |
| try: | |
| data = request.get_json() | |
| image_data = data.get('image') | |
| attendance_type = data.get('type', 'check_in') # 'check_in' or 'check_out' | |
| if not image_data: | |
| return jsonify({'success': False, 'message': 'No image data provided'}) | |
| # Decode base64 image | |
| image_data = image_data.split(',')[1] # Remove data:image/jpeg;base64, | |
| image_bytes = base64.b64decode(image_data) | |
| # Convert to numpy array | |
| nparr = np.frombuffer(image_bytes, np.uint8) | |
| frame = cv2.imdecode(nparr, cv2.IMREAD_COLOR) | |
| # Recognize face | |
| result, confidence = face_recognizer.recognize_face(frame) | |
| if result and confidence > 70: # Higher threshold for attendance marking | |
| # Mark attendance | |
| attendance_result = db_manager.mark_attendance(result['user_id'], attendance_type) | |
| if attendance_result['success']: | |
| current_time = datetime.now(IST).strftime('%I:%M %p') | |
| return jsonify({ | |
| 'success': True, | |
| 'name': result['name'], | |
| 'confidence': round(confidence, 2), | |
| 'time': current_time, | |
| 'type': attendance_type, | |
| 'message': attendance_result['message'] | |
| }) | |
| else: | |
| return jsonify({ | |
| 'success': False, | |
| 'name': result['name'], | |
| 'confidence': round(confidence, 2), | |
| 'message': attendance_result['message'] | |
| }) | |
| else: | |
| return jsonify({'success': False, 'message': 'Face not recognized or confidence too low'}) | |
| except Exception as e: | |
| logger.error(f"Recognition error: {e}") | |
| return jsonify({'success': False, 'message': 'Recognition failed'}) | |
| def reload_faces(): | |
| """Reload known faces""" | |
| try: | |
| face_recognizer.load_known_faces() | |
| return jsonify({'success': True, 'message': 'Faces reloaded successfully'}) | |
| except Exception as e: | |
| logger.error(f"Error reloading faces: {e}") | |
| return jsonify({'success': False, 'message': 'Failed to reload faces'}) | |
| if __name__ == '__main__': | |
| app.run(debug=True, host='0.0.0.0', port=7860) |