""" ESP32-CAM Face Detection API ──────────────────────────── POST /detect Form fields: image - JPEG image file mobile_no - registered mobile number password - user password Flow: 1. Validate mobile_no + password via checkuser.php 2. Extract fcm_token from response 3. Run YuNet face detection on the image 4. If face detected → send FCM push notification to user's device 5. Return detection result JSON """ from flask import Flask, request, jsonify import cv2 import numpy as np import requests import json import os from huggingface_hub import hf_hub_download # ── Firebase Admin SDK ──────────────────────────────────────────────────────── import firebase_admin from firebase_admin import credentials, messaging app = Flask(__name__) # ── Firebase initialisation ─────────────────────────────────────────────────── # Place your service account JSON file next to this script and set the env var, # OR just hardcode the filename here. SERVICE_ACCOUNT_FILE = os.environ.get( "GOOGLE_APPLICATION_CREDENTIALS", "service_account.json" # ← put your Firebase service account JSON here ) if not os.path.exists(SERVICE_ACCOUNT_FILE): raise FileNotFoundError( f"Firebase service account file not found: '{SERVICE_ACCOUNT_FILE}'. " "Download it from Firebase Console → Project Settings → Service Accounts." ) cred = credentials.Certificate(SERVICE_ACCOUNT_FILE) firebase_admin.initialize_app(cred) # ── Checkuser API ───────────────────────────────────────────────────────────── CHECKUSER_URL = "https://enliteapp.co.in/mobibell/checkuser.php" # ── YuNet face detector ─────────────────────────────────────────────────────── model_path = hf_hub_download( repo_id="opencv/face_detection_yunet", filename="face_detection_yunet_2023mar.onnx" ) face_detector = cv2.FaceDetectorYN_create( model_path, "", (320, 320), 0.87, # score threshold 0.3, # NMS threshold 5000 # top K ) # ═════════════════════════════════════════════════════════════════════════════ # Helpers # ═════════════════════════════════════════════════════════════════════════════ def verify_user(mobile_no: str, password: str) -> dict: """ Call checkuser.php with mobile_no + password. Returns the parsed JSON dict on success, raises ValueError on failure. """ try: resp = requests.post( CHECKUSER_URL, json={"mobile_no": mobile_no, "password": password}, timeout=10 ) except requests.RequestException as e: raise ValueError(f"Could not reach auth server: {e}") try: data = resp.json() except ValueError: raise ValueError("Auth server returned non-JSON response") if resp.status_code != 200 or not data.get("success"): raise ValueError(data.get("message", "Authentication failed")) return data def send_fcm_notification(fcm_token: str, face_count: int): """ Send a push notification via Firebase Cloud Messaging (FCM v1 API). Only called when at least one face is detected. """ if not fcm_token: print("[FCM] No FCM token available — skipping notification") return message = messaging.Message( notification=messaging.Notification( title="Motion Alert 🚨", body=( f"{face_count} face{'s' if face_count != 1 else ''} detected by your ESP32-CAM!" ), ), android=messaging.AndroidConfig( priority="high", notification=messaging.AndroidNotification( sound="default", channel_id="face_detection_alerts" ), ), apns=messaging.APNSConfig( payload=messaging.APNSPayload( aps=messaging.Aps(sound="default") ) ), token=fcm_token, ) try: response = messaging.send(message) print(f"[FCM] Notification sent successfully. Message ID: {response}") except Exception as e: # Log but don't crash the API — detection result still returns to client print(f"[FCM] Failed to send notification: {e}") # ═════════════════════════════════════════════════════════════════════════════ # Routes # ═════════════════════════════════════════════════════════════════════════════ @app.route("/", methods=["GET"]) def home(): return jsonify({ "status": "API is running", "usage": ( "POST to /detect with multipart/form-data: " "fields 'mobile_no', 'password', and file 'image' (JPEG)" ), "response": { "has_face": "true/false", "count": "number of faces", "message": "human-readable result" } }) @app.route("/detect", methods=["POST"]) def detect(): # ── 1. Validate required fields ────────────────────────────────────────── mobile_no = request.form.get("mobile_no", "").strip() password = request.form.get("password", "").strip() if not mobile_no or not password: return jsonify({ "success": False, "error": "Missing required fields: 'mobile_no' and 'password'" }), 400 if "image" not in request.files: return jsonify({ "success": False, "error": "No 'image' file in request" }), 400 image_file = request.files["image"] if image_file.filename == "": return jsonify({ "success": False, "error": "No file selected" }), 400 # ── 2. Authenticate user ───────────────────────────────────────────────── try: user_data = verify_user(mobile_no, password) except ValueError as e: return jsonify({ "success": False, "error": str(e) }), 401 fcm_token = user_data.get("fcm_token") or "" user_name = user_data.get("name", mobile_no) print(f"[AUTH] Verified user: {user_name} | FCM token present: {bool(fcm_token)}") # ── 3. Decode image ────────────────────────────────────────────────────── file_bytes = image_file.read() np_arr = np.frombuffer(file_bytes, np.uint8) img = cv2.imdecode(np_arr, cv2.IMREAD_COLOR) if img is None: return jsonify({ "success": False, "error": "Invalid or corrupted image" }), 400 # ── 4. Run face detection ──────────────────────────────────────────────── h, w = img.shape[:2] face_detector.setInputSize((w, h)) _, faces = face_detector.detect(img) has_face = faces is not None and len(faces) > 0 face_count = len(faces) if faces is not None else 0 print(f"[DETECT] has_face={has_face} count={face_count}") # ── 5. Send FCM notification if face detected ──────────────────────────── if has_face: send_fcm_notification(fcm_token, face_count) # ── 6. Return result ───────────────────────────────────────────────────── return jsonify({ "success": True, "has_face": has_face, "count": face_count, "message": f"Face detected ({face_count} face{'s' if face_count != 1 else ''})" if has_face else "No face detected" }) if __name__ == "__main__": app.run(host="0.0.0.0", port=7860, debug=True)