Spaces:
Sleeping
Sleeping
Update app.py
Browse files
app.py
CHANGED
|
@@ -57,6 +57,11 @@ from datetime import datetime
|
|
| 57 |
app = Flask(__name__)
|
| 58 |
CORS(app)
|
| 59 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 60 |
# Initialize MTCNN detector and FaceNet model
|
| 61 |
detector = MTCNN()
|
| 62 |
embedder = FaceNet()
|
|
@@ -68,6 +73,22 @@ app.config["MONGO_URI"] = "mongodb+srv://nanduvinay719:76qqKRX4zC97yQun@travis.7
|
|
| 68 |
mongo = PyMongo(app)#initialize
|
| 69 |
haar_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
|
| 70 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 71 |
def create_cnn_embedding_model():
|
| 72 |
model = Sequential([
|
| 73 |
Conv2D(32, (3, 3), activation='relu', input_shape=(160, 160, 1)),
|
|
@@ -106,11 +127,15 @@ def cnnlogin():
|
|
| 106 |
|
| 107 |
# Fetch user data from MongoDB
|
| 108 |
# Collection name 'data' is already correctly used here
|
| 109 |
-
user_data = mongo.db.data.find_one(
|
|
|
|
|
|
|
|
|
|
| 110 |
if user_data is None:
|
| 111 |
return jsonify({"error": "User not found"}), 404
|
| 112 |
stored_cnn_embedding = np.array(user_data["CNN_embeddings"])# Convert stored embeddings to NumPy array
|
| 113 |
username = user_data["username"]
|
|
|
|
| 114 |
print(username)
|
| 115 |
# Decode image
|
| 116 |
image_data = file.read()
|
|
@@ -145,7 +170,9 @@ def cnnlogin():
|
|
| 145 |
# Set a threshold for recognition
|
| 146 |
recognition_threshold = 0.94 # Adjust this threshold as needed
|
| 147 |
if similarity > recognition_threshold:
|
| 148 |
-
return jsonify(
|
|
|
|
|
|
|
| 149 |
else:
|
| 150 |
return jsonify({"error": "Face not recognized", "probability": float(similarity)}), 401
|
| 151 |
|
|
@@ -175,13 +202,27 @@ def recognizeLogin():
|
|
| 175 |
{'$set': {today: True}}, # Mark as present for today
|
| 176 |
upsert=True
|
| 177 |
)
|
| 178 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 179 |
else:
|
| 180 |
return jsonify({'name':"user not recognised"})
|
| 181 |
|
| 182 |
@app.route('/register', methods=['POST'])
|
| 183 |
def register():
|
| 184 |
-
rollnumber = request.form["RollNumber"]
|
| 185 |
username = request.form['Username']
|
| 186 |
fathername = request.form["FatherName"]
|
| 187 |
phoneno = request.form["phoneNumber"]
|
|
@@ -191,7 +232,10 @@ def register():
|
|
| 191 |
print(username)
|
| 192 |
# Check if user already exists
|
| 193 |
# Collection name 'data' is already correctly used here
|
| 194 |
-
existing_user = mongo.db.data.find_one(
|
|
|
|
|
|
|
|
|
|
| 195 |
if existing_user:
|
| 196 |
return jsonify({"error": f"User '{username}' already exists"}), 400
|
| 197 |
|
|
@@ -263,6 +307,7 @@ def register():
|
|
| 263 |
'username': username,
|
| 264 |
'FatherName': fathername,
|
| 265 |
'phoneNumber': phoneno,
|
|
|
|
| 266 |
'embeddings': mean_facenet_embedding,
|
| 267 |
'CNN_embeddings': mean_cnn_embedding,
|
| 268 |
'stored_image': stored_image,
|
|
@@ -282,34 +327,48 @@ def register():
|
|
| 282 |
@app.route('/get_users', methods=['GET'])
|
| 283 |
def get_users():
|
| 284 |
# Collection name 'data' is already correctly used here
|
| 285 |
-
users = list(
|
|
|
|
|
|
|
| 286 |
user_count = len(users)
|
| 287 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 288 |
|
| 289 |
def load_embeddings_from_db():
|
| 290 |
if mongo is None:
|
| 291 |
print("MongoDB not connected - returning empty embeddings")
|
| 292 |
-
return [], [], {}
|
| 293 |
|
| 294 |
try:
|
| 295 |
users = list(mongo.db.data.find())
|
| 296 |
face_data = []# facenet embeddings
|
| 297 |
labels = [] # id 1,2,3,..
|
| 298 |
names = {} #dict of id and roll number
|
|
|
|
| 299 |
# {"1":vinay,"2":shahank}
|
| 300 |
for user in users:
|
| 301 |
face_data.append(user["embeddings"])
|
| 302 |
labels.append(user['id']) # Keep the ObjectId
|
| 303 |
names[user['id']] = user['RollNumber'] # Use ObjectId as key
|
|
|
|
| 304 |
|
| 305 |
print(f"Loaded {len(face_data)} user embeddings from database")
|
| 306 |
-
return (face_data, labels, names) if face_data else ([], [], {})
|
| 307 |
except Exception as e:
|
| 308 |
print(f"Error loading embeddings from database: {e}")
|
| 309 |
-
return [], [], {}
|
| 310 |
|
| 311 |
# Load face embeddings from MongoDB initially
|
| 312 |
-
face_data, labels, names = load_embeddings_from_db()
|
| 313 |
|
| 314 |
# Load CNN model weights with error handling
|
| 315 |
try:
|
|
@@ -327,7 +386,7 @@ except Exception as e:
|
|
| 327 |
|
| 328 |
# Reload embeddings to update after a new registration
|
| 329 |
def reload_embeddings():
|
| 330 |
-
global face_data, labels, names
|
| 331 |
try:
|
| 332 |
weights_path = os.path.join(app_data_path, 'cnn_model.weights.h5')
|
| 333 |
if os.path.exists(weights_path):
|
|
@@ -337,7 +396,7 @@ def reload_embeddings():
|
|
| 337 |
except Exception as e:
|
| 338 |
print(f"Warning: Could not reload CNN model weights: {e}")
|
| 339 |
|
| 340 |
-
face_data, labels, names = load_embeddings_from_db()
|
| 341 |
|
| 342 |
# Recognize faces using MongoDB-stored embeddings
|
| 343 |
model = YOLO('yolov5s.pt') # Replace with your YOLO model path
|
|
@@ -374,7 +433,7 @@ def upload_image():
|
|
| 374 |
|
| 375 |
def recognize_faces_in_image(image):
|
| 376 |
if len(face_data) == 0:
|
| 377 |
-
return [{"name": "No registered faces", "probability": 0.0}]
|
| 378 |
|
| 379 |
faces = detector.detect_faces(image)
|
| 380 |
results = []
|
|
@@ -394,11 +453,52 @@ def recognize_faces_in_image(image):
|
|
| 394 |
if best_match > 0.7:
|
| 395 |
recognized_id = labels[idx] # Get the ObjectId
|
| 396 |
recognized_name = names[recognized_id] # Use ObjectId to get the username
|
| 397 |
-
|
|
|
|
|
|
|
|
|
|
| 398 |
else:
|
| 399 |
-
results.append(
|
|
|
|
|
|
|
| 400 |
return results
|
| 401 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 402 |
@app.route('/users/<username>/images', methods=['GET'])
|
| 403 |
def get_user_images(username):
|
| 404 |
username = str(username).upper()
|
|
@@ -477,4 +577,4 @@ def get_user_attendance(username):
|
|
| 477 |
def get_attendance():
|
| 478 |
# Collection name 'attendance1' is already correctly used here
|
| 479 |
records = list(mongo.db.attendance1.find({}, {"_id": 0}))
|
| 480 |
-
return jsonify({"attendance": records})
|
|
|
|
| 57 |
app = Flask(__name__)
|
| 58 |
CORS(app)
|
| 59 |
|
| 60 |
+
# Role management settings
|
| 61 |
+
DEFAULT_ROLE = "user"
|
| 62 |
+
ALLOWED_ROLES = {"user", "admin"}
|
| 63 |
+
DEFAULT_ADMIN_ROLLNUMBER = os.getenv("DEFAULT_ADMIN_ROLLNUMBER", "23BD1A056D").upper()
|
| 64 |
+
|
| 65 |
# Initialize MTCNN detector and FaceNet model
|
| 66 |
detector = MTCNN()
|
| 67 |
embedder = FaceNet()
|
|
|
|
| 73 |
mongo = PyMongo(app)#initialize
|
| 74 |
haar_cascade = cv2.CascadeClassifier(cv2.data.haarcascades + 'haarcascade_frontalface_default.xml')
|
| 75 |
|
| 76 |
+
def normalize_role(role_value, rollnumber=None):
|
| 77 |
+
role = (role_value or "").strip().lower()
|
| 78 |
+
if role in ALLOWED_ROLES:
|
| 79 |
+
return role
|
| 80 |
+
if rollnumber and str(rollnumber).upper() == DEFAULT_ADMIN_ROLLNUMBER:
|
| 81 |
+
return "admin"
|
| 82 |
+
return DEFAULT_ROLE
|
| 83 |
+
|
| 84 |
+
def count_admins():
|
| 85 |
+
users = mongo.db.data.find({}, {"role": 1, "RollNumber": 1})
|
| 86 |
+
return sum(
|
| 87 |
+
1
|
| 88 |
+
for user in users
|
| 89 |
+
if normalize_role(user.get("role"), user.get("RollNumber")) == "admin"
|
| 90 |
+
)
|
| 91 |
+
|
| 92 |
def create_cnn_embedding_model():
|
| 93 |
model = Sequential([
|
| 94 |
Conv2D(32, (3, 3), activation='relu', input_shape=(160, 160, 1)),
|
|
|
|
| 127 |
|
| 128 |
# Fetch user data from MongoDB
|
| 129 |
# Collection name 'data' is already correctly used here
|
| 130 |
+
user_data = mongo.db.data.find_one(
|
| 131 |
+
{"RollNumber": rollnumber},
|
| 132 |
+
{"CNN_embeddings": 1, "username": 1, "role": 1, "RollNumber": 1},
|
| 133 |
+
)
|
| 134 |
if user_data is None:
|
| 135 |
return jsonify({"error": "User not found"}), 404
|
| 136 |
stored_cnn_embedding = np.array(user_data["CNN_embeddings"])# Convert stored embeddings to NumPy array
|
| 137 |
username = user_data["username"]
|
| 138 |
+
role = normalize_role(user_data.get("role"), user_data.get("RollNumber", rollnumber))
|
| 139 |
print(username)
|
| 140 |
# Decode image
|
| 141 |
image_data = file.read()
|
|
|
|
| 170 |
# Set a threshold for recognition
|
| 171 |
recognition_threshold = 0.94 # Adjust this threshold as needed
|
| 172 |
if similarity > recognition_threshold:
|
| 173 |
+
return jsonify(
|
| 174 |
+
{"name": username, "probability": float(similarity), "role": role}
|
| 175 |
+
), 200
|
| 176 |
else:
|
| 177 |
return jsonify({"error": "Face not recognized", "probability": float(similarity)}), 401
|
| 178 |
|
|
|
|
| 202 |
{'$set': {today: True}}, # Mark as present for today
|
| 203 |
upsert=True
|
| 204 |
)
|
| 205 |
+
user_data = mongo.db.data.find_one(
|
| 206 |
+
{"RollNumber": name},
|
| 207 |
+
{"role": 1, "RollNumber": 1},
|
| 208 |
+
)
|
| 209 |
+
role = normalize_role(
|
| 210 |
+
user_data.get("role") if user_data else None,
|
| 211 |
+
user_data.get("RollNumber") if user_data else name,
|
| 212 |
+
)
|
| 213 |
+
return jsonify(
|
| 214 |
+
{
|
| 215 |
+
"name": name,
|
| 216 |
+
"probability": results[0]["probability"],
|
| 217 |
+
"role": role,
|
| 218 |
+
}
|
| 219 |
+
), 200
|
| 220 |
else:
|
| 221 |
return jsonify({'name':"user not recognised"})
|
| 222 |
|
| 223 |
@app.route('/register', methods=['POST'])
|
| 224 |
def register():
|
| 225 |
+
rollnumber = request.form["RollNumber"].upper()
|
| 226 |
username = request.form['Username']
|
| 227 |
fathername = request.form["FatherName"]
|
| 228 |
phoneno = request.form["phoneNumber"]
|
|
|
|
| 232 |
print(username)
|
| 233 |
# Check if user already exists
|
| 234 |
# Collection name 'data' is already correctly used here
|
| 235 |
+
existing_user = mongo.db.data.find_one(
|
| 236 |
+
{"$or": [{"username": username}, {"RollNumber": rollnumber}]},
|
| 237 |
+
{"_id": 0},
|
| 238 |
+
)
|
| 239 |
if existing_user:
|
| 240 |
return jsonify({"error": f"User '{username}' already exists"}), 400
|
| 241 |
|
|
|
|
| 307 |
'username': username,
|
| 308 |
'FatherName': fathername,
|
| 309 |
'phoneNumber': phoneno,
|
| 310 |
+
'role': DEFAULT_ROLE,
|
| 311 |
'embeddings': mean_facenet_embedding,
|
| 312 |
'CNN_embeddings': mean_cnn_embedding,
|
| 313 |
'stored_image': stored_image,
|
|
|
|
| 327 |
@app.route('/get_users', methods=['GET'])
|
| 328 |
def get_users():
|
| 329 |
# Collection name 'data' is already correctly used here
|
| 330 |
+
users = list(
|
| 331 |
+
mongo.db.data.find({}, {"id": 1, "username": 1, "RollNumber": 1, "role": 1})
|
| 332 |
+
)
|
| 333 |
user_count = len(users)
|
| 334 |
+
formatted_users = []
|
| 335 |
+
for user in users:
|
| 336 |
+
roll = user.get("RollNumber", "")
|
| 337 |
+
formatted_users.append(
|
| 338 |
+
{
|
| 339 |
+
"rollNumber": roll,
|
| 340 |
+
"username": user.get("username", ""),
|
| 341 |
+
"role": normalize_role(user.get("role"), roll),
|
| 342 |
+
}
|
| 343 |
+
)
|
| 344 |
+
return jsonify({"users": formatted_users, "count": user_count})
|
| 345 |
|
| 346 |
def load_embeddings_from_db():
|
| 347 |
if mongo is None:
|
| 348 |
print("MongoDB not connected - returning empty embeddings")
|
| 349 |
+
return [], [], {}, {}
|
| 350 |
|
| 351 |
try:
|
| 352 |
users = list(mongo.db.data.find())
|
| 353 |
face_data = []# facenet embeddings
|
| 354 |
labels = [] # id 1,2,3,..
|
| 355 |
names = {} #dict of id and roll number
|
| 356 |
+
roles = {}
|
| 357 |
# {"1":vinay,"2":shahank}
|
| 358 |
for user in users:
|
| 359 |
face_data.append(user["embeddings"])
|
| 360 |
labels.append(user['id']) # Keep the ObjectId
|
| 361 |
names[user['id']] = user['RollNumber'] # Use ObjectId as key
|
| 362 |
+
roles[user['id']] = normalize_role(user.get("role"), user.get("RollNumber"))
|
| 363 |
|
| 364 |
print(f"Loaded {len(face_data)} user embeddings from database")
|
| 365 |
+
return (face_data, labels, names, roles) if face_data else ([], [], {}, {})
|
| 366 |
except Exception as e:
|
| 367 |
print(f"Error loading embeddings from database: {e}")
|
| 368 |
+
return [], [], {}, {}
|
| 369 |
|
| 370 |
# Load face embeddings from MongoDB initially
|
| 371 |
+
face_data, labels, names, roles = load_embeddings_from_db()
|
| 372 |
|
| 373 |
# Load CNN model weights with error handling
|
| 374 |
try:
|
|
|
|
| 386 |
|
| 387 |
# Reload embeddings to update after a new registration
|
| 388 |
def reload_embeddings():
|
| 389 |
+
global face_data, labels, names, roles
|
| 390 |
try:
|
| 391 |
weights_path = os.path.join(app_data_path, 'cnn_model.weights.h5')
|
| 392 |
if os.path.exists(weights_path):
|
|
|
|
| 396 |
except Exception as e:
|
| 397 |
print(f"Warning: Could not reload CNN model weights: {e}")
|
| 398 |
|
| 399 |
+
face_data, labels, names, roles = load_embeddings_from_db()
|
| 400 |
|
| 401 |
# Recognize faces using MongoDB-stored embeddings
|
| 402 |
model = YOLO('yolov5s.pt') # Replace with your YOLO model path
|
|
|
|
| 433 |
|
| 434 |
def recognize_faces_in_image(image):
|
| 435 |
if len(face_data) == 0:
|
| 436 |
+
return [{"name": "No registered faces", "probability": 0.0, "role": "unknown"}]
|
| 437 |
|
| 438 |
faces = detector.detect_faces(image)
|
| 439 |
results = []
|
|
|
|
| 453 |
if best_match > 0.7:
|
| 454 |
recognized_id = labels[idx] # Get the ObjectId
|
| 455 |
recognized_name = names[recognized_id] # Use ObjectId to get the username
|
| 456 |
+
role = roles.get(recognized_id, DEFAULT_ROLE)
|
| 457 |
+
results.append(
|
| 458 |
+
{"name": recognized_name, "probability": float(best_match), "role": role}
|
| 459 |
+
)
|
| 460 |
else:
|
| 461 |
+
results.append(
|
| 462 |
+
{"name": "Unknown", "probability": float(best_match), "role": "unknown"}
|
| 463 |
+
)
|
| 464 |
return results
|
| 465 |
|
| 466 |
+
@app.route('/users/<rollnumber>/role', methods=['PATCH'])
|
| 467 |
+
def update_user_role(rollnumber):
|
| 468 |
+
payload = request.get_json(silent=True) or {}
|
| 469 |
+
role = str(payload.get("role", "")).strip().lower()
|
| 470 |
+
if role not in ALLOWED_ROLES:
|
| 471 |
+
return jsonify({"error": "Invalid role"}), 400
|
| 472 |
+
|
| 473 |
+
rollnumber = str(rollnumber).upper()
|
| 474 |
+
user = mongo.db.data.find_one({"RollNumber": rollnumber})
|
| 475 |
+
if user is None:
|
| 476 |
+
return jsonify({"error": "User not found"}), 404
|
| 477 |
+
|
| 478 |
+
current_role = normalize_role(user.get("role"), user.get("RollNumber"))
|
| 479 |
+
if current_role == "admin" and role != "admin" and count_admins() <= 1:
|
| 480 |
+
return jsonify({"error": "Cannot remove the last admin"}), 400
|
| 481 |
+
|
| 482 |
+
mongo.db.data.update_one({"_id": user["_id"]}, {"$set": {"role": role}})
|
| 483 |
+
reload_embeddings()
|
| 484 |
+
return jsonify({"message": "Role updated", "role": role})
|
| 485 |
+
|
| 486 |
+
@app.route('/users/<rollnumber>', methods=['DELETE'])
|
| 487 |
+
def delete_user(rollnumber):
|
| 488 |
+
rollnumber = str(rollnumber).upper()
|
| 489 |
+
user = mongo.db.data.find_one({"RollNumber": rollnumber})
|
| 490 |
+
if user is None:
|
| 491 |
+
return jsonify({"error": "User not found"}), 404
|
| 492 |
+
|
| 493 |
+
current_role = normalize_role(user.get("role"), user.get("RollNumber"))
|
| 494 |
+
if current_role == "admin" and count_admins() <= 1:
|
| 495 |
+
return jsonify({"error": "Cannot delete the last admin"}), 400
|
| 496 |
+
|
| 497 |
+
mongo.db.data.delete_one({"_id": user["_id"]})
|
| 498 |
+
mongo.db.attendance1.delete_one({"username": rollnumber})
|
| 499 |
+
reload_embeddings()
|
| 500 |
+
return jsonify({"message": "User deleted"})
|
| 501 |
+
|
| 502 |
@app.route('/users/<username>/images', methods=['GET'])
|
| 503 |
def get_user_images(username):
|
| 504 |
username = str(username).upper()
|
|
|
|
| 577 |
def get_attendance():
|
| 578 |
# Collection name 'attendance1' is already correctly used here
|
| 579 |
records = list(mongo.db.attendance1.find({}, {"_id": 0}))
|
| 580 |
+
return jsonify({"attendance": records})
|