Spaces:
Sleeping
Sleeping
| print("DEBUG: Script started", flush=True) | |
| from flask import Flask, request, jsonify, send_from_directory | |
| from flask_cors import CORS | |
| print("DEBUG: CORS imported", flush=True) | |
| import numpy as np | |
| print("DEBUG: numpy imported", flush=True) | |
| import base64 | |
| print("DEBUG: base64 imported", flush=True) | |
| import cv2 | |
| print("DEBUG: cv2 imported", flush=True) | |
| import io | |
| import os | |
| import json | |
| import requests | |
| print("DEBUG: requests imported", flush=True) | |
| from PIL import Image | |
| print("DEBUG: PIL imported", flush=True) | |
| from datetime import datetime, timezone | |
| import time | |
| import time | |
| # Start of executable logic | |
| print("DEBUG: Executable logic started", flush=True) | |
| import firebase_admin | |
| from firebase_admin import credentials | |
| print("DEBUG: firebase_admin and credentials imported", flush=True) | |
# Helper for lazy loading storage
def get_firebase_storage():
    """Return the ``firebase_admin.storage`` module, importing it on demand.

    Returns:
        The imported ``storage`` module, or ``None`` when the import raises
        (the failure is printed instead of being propagated).
    """
    try:
        from firebase_admin import storage
    except Exception as e:
        print(f"ERROR: Failed to import firebase_admin.storage: {e}", flush=True)
        return None
    else:
        return storage
# Helper for lazy loading firestore
def get_firestore_client():
    """Return the shared Firestore client, creating it on the first call.

    The client is cached in the module-level ``db`` variable so later calls
    reuse it.

    Returns:
        The Firestore client, or ``None`` if creating it raised (the error is
        printed, not propagated).
    """
    global db
    if not db:
        try:
            from firebase_admin import firestore
            db = firestore.client()
            print("Firebase Admin SDK and Firestore initialized via lazy load", flush=True)
        except Exception as e:
            print(f"ERROR: Failed to initialize Firestore via lazy load: {e}", flush=True)
            return None
    return db
# Initialize Firebase Admin SDK (only once per process)
print("DEBUG: Initializing Firebase credentials...", flush=True)
if not firebase_admin._apps:
    print("DEBUG: Initializing firebase_admin app...", flush=True)
    storage_options = {'storageBucket': 'gatepass-49d43.firebasestorage.app'}
    if os.getenv("K_SERVICE"):
        # 1) Cloud Run: no explicit credential, rely on the attached
        #    service account (Application Default Credentials).
        firebase_app = firebase_admin.initialize_app(options=storage_options)
        print("DEBUG: firebase_admin initialized via ADC (Cloud Run)", flush=True)
    else:
        sa_json = os.getenv("FIREBASE_SERVICE_ACCOUNT_JSON")
        if not sa_json:
            # 3) Local development fallback: key file next to this module.
            cred_path = os.path.join(os.path.dirname(__file__), 'serviceAccountKey.json')
            cred = credentials.Certificate(cred_path)
            _init_note = f"DEBUG: firebase_admin initialized via local file: {cred_path}"
        else:
            # 2) Hugging Face/other hosted env: service account JSON in env var.
            cred = credentials.Certificate(json.loads(sa_json))
            _init_note = "DEBUG: firebase_admin initialized via FIREBASE_SERVICE_ACCOUNT_JSON"
        firebase_app = firebase_admin.initialize_app(cred, storage_options)
        print(_init_note, flush=True)

# Firestore client placeholder; get_firestore_client() fills it in lazily.
db = None
print("Firestore initialization deferred to lazy loading", flush=True)
def upload_to_firebase_storage(image_data, filename, content_type='image/jpeg'):
    """Store *image_data* under ``face_verification/<filename>`` and return its URL.

    Args:
        image_data: Payload handed to ``blob.upload_from_string``.
        filename: Object name placed below the ``face_verification/`` prefix.
        content_type: Content type recorded for the uploaded blob.

    Returns:
        The blob's ``public_url`` after it has been made public.

    Raises:
        Exception: If the Firebase storage module could not be loaded.
    """
    storage_module = get_firebase_storage()
    if not storage_module:
        raise Exception("Firebase Storage not available")
    target = storage_module.bucket().blob(f"face_verification/{filename}")
    target.upload_from_string(image_data, content_type=content_type)
    # The returned URL is only usable once the object is publicly readable.
    target.make_public()
    return target.public_url
# Flask application object used by every handler in this module.
app = Flask(__name__)
# Enable CORS for cross-origin requests from the app
CORS(app)
# NOTE(review): no route decorator is visible on this handler in this view --
# presumably lost during extraction; confirm how it is registered.
def root():
    """Return a small JSON banner with HTTP 200."""
    payload = {"message": "GatePass backend running"}
    return jsonify(payload), 200
# NOTE(review): no route decorator is visible on this handler in this view --
# presumably lost during extraction; confirm how it is registered.
def health_check():
    """Return ``{"status": "ok"}`` with HTTP 200."""
    payload = {"status": "ok"}
    return jsonify(payload), 200
# NOTE(review): presumably registered as a before-request hook; the decorator
# is not visible in this view -- confirm.
def log_request_info():
    """Print the method and path of the current request plus its JSON keys.

    Only the top-level keys of a JSON *object* body are printed (never the
    values). Bodies that are not JSON, or whose JSON is an array or scalar,
    are skipped so that logging can never fail a request.
    """
    print(f"Incoming Request: {request.method} {request.path}")
    json_data = request.get_json(silent=True)
    # A JSON array or scalar is truthy but has no .keys(); guard on dict.
    if isinstance(json_data, dict) and json_data:
        print(f"Payload keys: {list(json_data)}")
# Configuration
MODEL_PATH = 'blaze_face_short_range.tflite'
MODEL_URL = 'https://storage.googleapis.com/mediapipe-models/face_detector/blaze_face_short_range/float16/1/blaze_face_short_range.tflite'

# Local storage folder for verified faces (relative to the working directory).
UPLOAD_FOLDER = 'storage'
# exist_ok avoids the check-then-create race when several workers start at once.
os.makedirs(UPLOAD_FOLDER, exist_ok=True)

# Pass storage folder for IoT gate access.
# Must match the path in gate_server.py
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
PASS_STORAGE_FOLDER = os.path.join(BASE_DIR, 'app', 'src', 'services', 'storage', 'passes')
os.makedirs(PASS_STORAGE_FOLDER, exist_ok=True)
print(f"Pass storage folder: {PASS_STORAGE_FOLDER}", flush=True)

app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['PASS_STORAGE_FOLDER'] = PASS_STORAGE_FOLDER
def download_model():
    """Download the MediaPipe face-detector model to ``MODEL_PATH`` if missing.

    Does nothing when the file already exists. The download is best-effort:
    any failure is printed and swallowed. The payload is written to a
    temporary ``.part`` file and renamed into place only after a successful
    HTTP response, so a failed or interrupted download never leaves a bogus
    file at ``MODEL_PATH`` that later calls would mistake for the model.
    """
    if os.path.exists(MODEL_PATH):
        return
    print(f"Downloading MediaPipe model from {MODEL_URL}...")
    tmp_path = f"{MODEL_PATH}.part"
    try:
        # Bound the wait so a stalled connection cannot hang the caller.
        response = requests.get(MODEL_URL, timeout=60)
        # Without this an HTTP error page would be saved as the model file.
        response.raise_for_status()
        with open(tmp_path, 'wb') as f:
            f.write(response.content)
        os.replace(tmp_path, MODEL_PATH)
        print("Model downloaded successfully.")
    except Exception as e:
        print(f"Error downloading model: {e}")
        print("Warning: Model download failed, running without face detection")
        try:
            os.remove(tmp_path)
        except OSError:
            # Nothing was written, or cleanup is impossible; either is fine.
            pass
# Try to initialize OpenCV Haar Cascade face detector.
# Candidate locations are probed in order; the first file that exists and
# loads into a non-empty classifier wins.
_CASCADE_FILE = 'haarcascade_frontalface_default.xml'
CASCADE_PATHS = [
    os.path.join(os.path.dirname(__file__), _CASCADE_FILE),
    '/usr/share/opencv4/haarcascades/' + _CASCADE_FILE,
    '/usr/local/share/opencv4/haarcascades/' + _CASCADE_FILE,
]
face_cascade = None
for path in CASCADE_PATHS:
    if not os.path.exists(path):
        continue
    face_cascade = cv2.CascadeClassifier(path)
    if face_cascade.empty():
        # File was present but did not load; keep looking.
        continue
    print(f"OpenCV Haar Cascade face detector loaded: {path}", flush=True)
    break
if not (face_cascade is not None and not face_cascade.empty()):
    print("Could not load Haar Cascade face detector. Face detection will not work.", flush=True)
# NOTE(review): no route decorator is visible on this handler in this view --
# presumably lost during extraction; confirm how it is registered.
def serve_storage(filename):
    """Serve *filename* from the configured ``UPLOAD_FOLDER`` directory."""
    print(f"Serving file: {filename}", flush=True)
    storage_dir = app.config['UPLOAD_FOLDER']
    return send_from_directory(storage_dir, filename)
def _encode_jpeg(image, quality):
    """Return *image* encoded as optimized JPEG bytes at the given quality."""
    buffer = io.BytesIO()
    image.save(buffer, format='JPEG', quality=quality, optimize=True)
    return buffer.getvalue()


# NOTE(review): no route decorator is visible on this handler in this view --
# presumably lost during extraction; confirm how it is registered.
def verify_face():
    """Face verification endpoint for mobile app biometric authentication.

    Expects a JSON object with ``image`` (base64, optionally a data URL) and
    an optional ``userId``. The handler:

    - downsizes the image to at most 1024x1024 and re-encodes it as JPEG,
      lowering quality in steps while the result is larger than 500KB;
    - uploads the JPEG to Firebase Storage, falling back to the local
      ``UPLOAD_FOLDER`` when the upload fails;
    - detects faces on the full-resolution image with the OpenCV Haar cascade.

    Returns:
        A JSON response. ``verified`` is true when at least one face was
        found. Missing or undecodable image data yields HTTP 400; unexpected
        failures yield HTTP 500 with an ``error`` or ``message`` field.
    """
    try:
        data = request.json
        # A JSON array or scalar has no .get(); treat it as "no image".
        if not isinstance(data, dict) or 'image' not in data:
            return jsonify({"error": "No image data provided"}), 400
        user_id = data.get('userId', 'unknown_user')

        # Decode base64 image (accept a data-URL prefix like the other
        # upload endpoints do).
        image_base64 = data['image']
        if isinstance(image_base64, str) and 'base64,' in image_base64:
            image_base64 = image_base64.split('base64,', 1)[1]
        try:
            image_data = base64.b64decode(image_base64)
            # JPEG cannot store alpha or palette modes, so normalise to RGB
            # once; saving an RGBA/P image as JPEG would raise.
            image = Image.open(io.BytesIO(image_data)).convert('RGB')
        except (ValueError, TypeError, OSError) as decode_err:
            return jsonify({"error": f"Invalid image data: {decode_err}"}), 400

        # Detection runs on the full-resolution pixels, so snapshot them
        # before thumbnail() shrinks the image in place.
        rgb_image_np = np.array(image)

        # Compress image to under 500KB
        max_width = 1024
        max_height = 1024
        max_size_kb = 500
        img_width, img_height = image.size
        if img_width > max_width or img_height > max_height:
            image.thumbnail((max_width, max_height))

        quality = 85
        compressed_data = _encode_jpeg(image, quality)
        while len(compressed_data) > max_size_kb * 1024 and quality > 20:
            quality -= 10
            compressed_data = _encode_jpeg(image, quality)
        print(f"📸 Original size: {len(image_data) / 1024:.1f}KB, Compressed: {len(compressed_data) / 1024:.1f}KB (quality: {quality})")

        # Generate unique filename. userId is client-supplied, so strip path
        # separators to keep the local fallback inside UPLOAD_FOLDER.
        safe_user_id = str(user_id).replace('/', '_').replace('\\', '_')
        timestamp = int(time.time())
        filename = f"{safe_user_id}_{timestamp}.jpg"

        # Upload to Firebase Storage
        try:
            download_url = upload_to_firebase_storage(compressed_data, filename, 'image/jpeg')
            print(f"Image uploaded to Firebase: {download_url}", flush=True)
        except Exception as fb_error:
            print(f"Firebase upload failed: {fb_error}, saving locally", flush=True)
            # Fallback to local storage
            filepath = os.path.join(app.config['UPLOAD_FOLDER'], filename)
            with open(filepath, 'wb') as f:
                f.write(compressed_data)
            download_url = f"file://{filepath}"

        # Perform Face Detection for verification (OpenCV Haar Cascade)
        try:
            gray = cv2.cvtColor(rgb_image_np, cv2.COLOR_RGB2GRAY)
            faces = []
            if face_cascade is not None and not face_cascade.empty():
                faces = face_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30))
            if len(faces) > 0:
                return jsonify({
                    "verified": True,
                    "face_count": len(faces),
                    "message": "Face detected successfully (OpenCV Haar Cascade)",
                    "imageUrl": download_url,
                    "filename": filename,
                    "compressedSizeKB": round(len(compressed_data) / 1024, 1)
                })
            return jsonify({
                "verified": False,
                "message": "No face detected in the image"
            }), 200
        except Exception as e:
            print(f"Face detection error: {e}")
            return jsonify({
                "verified": False,
                "message": f"Face detection error: {str(e)}"
            }), 500
    except Exception as e:
        print(f"Error: {e}")
        return jsonify({"error": str(e)}), 500
def _save_base64_image(image_base64, path):
    """Decode *image_base64* (optionally a data URL) and write it to *path*."""
    if 'base64,' in image_base64:
        image_base64 = image_base64.split('base64,', 1)[1]
    # Decode before opening so a bad payload does not leave an empty file.
    raw = base64.b64decode(image_base64)
    with open(path, 'wb') as f:
        f.write(raw)


# NOTE(review): no route decorator is visible on this handler in this view --
# presumably lost during extraction; confirm how it is registered.
def save_pass():
    """Save pass files (person image + QR code) to shared storage for IoT gate access.

    Expects a JSON object with ``pass_id`` (required), an optional person id
    (``person_id`` / ``student_id`` / ``employee_id`` / ``userId``), and
    optional base64 images ``student_image`` / ``person_image`` and
    ``qr_image``. Images are written to ``PASS_STORAGE_FOLDER`` as
    ``<pass_id>_person.jpg`` and ``<pass_id>_qr.png``, and the entry is
    recorded in ``index.json`` in the same folder.

    Returns:
        JSON with ``success``, ``pass_id`` and the saved paths (``None`` for
        an image that was absent or failed to save); HTTP 400 for missing or
        invalid input; HTTP 500 on unexpected errors.
    """
    try:
        data = request.json
        if not data or not isinstance(data, dict):
            return jsonify({"error": "No data provided"}), 400
        pass_id = data.get('pass_id')
        person_id = data.get('person_id') or data.get('student_id') or data.get('employee_id') or data.get('userId')
        student_image_base64 = data.get('student_image') or data.get('person_image')
        qr_image_base64 = data.get('qr_image')
        if not pass_id:
            return jsonify({"error": "pass_id is required"}), 400

        # pass_id is client-supplied and becomes part of a file name; reject
        # path separators so writes cannot escape the pass folder.
        pass_key = str(pass_id)
        if '/' in pass_key or '\\' in pass_key or '\x00' in pass_key:
            return jsonify({"error": "pass_id contains invalid characters"}), 400

        pass_folder = app.config['PASS_STORAGE_FOLDER']

        # Save person image. The path is recorded only after a successful
        # write, so the index never points at a file that does not exist.
        student_image_path = None
        if student_image_base64:
            try:
                candidate_path = os.path.join(pass_folder, f"{pass_key}_person.jpg")
                _save_base64_image(student_image_base64, candidate_path)
                student_image_path = candidate_path
                print(f"💾 Person image saved: {student_image_path}")
            except Exception as img_err:
                print(f"⚠ Error saving student image: {img_err}")

        # Save QR code image (same best-effort handling).
        qr_image_path = None
        if qr_image_base64:
            try:
                candidate_path = os.path.join(pass_folder, f"{pass_key}_qr.png")
                _save_base64_image(qr_image_base64, candidate_path)
                qr_image_path = candidate_path
                print(f"💾 QR image saved: {qr_image_path}")
            except Exception as qr_err:
                print(f"⚠ Error saving QR image: {qr_err}")

        # Create/update index.json for IoT gate quick access
        index_file = os.path.join(pass_folder, 'index.json')
        index_data = {"passes": {}}
        if os.path.exists(index_file):
            try:
                with open(index_file, 'r') as f:
                    loaded = json.load(f)
            except (OSError, ValueError) as idx_err:
                # Unreadable or corrupt index: start over, but say so.
                print(f"⚠ Could not read pass index, recreating it: {idx_err}")
            else:
                if isinstance(loaded, dict):
                    index_data = loaded
        # An index file without a usable "passes" mapping must not crash us.
        if not isinstance(index_data.get('passes'), dict):
            index_data['passes'] = {}

        index_data['passes'][pass_key] = {
            "person_id": person_id,
            "person_image": student_image_path,
            "qr_image": qr_image_path,
            # NOTE(review): this is the pass folder's mtime, not the current
            # time -- kept as-is; confirm what gate_server.py expects.
            "stored_at": str(os.path.getmtime(pass_folder)) if os.path.exists(pass_folder) else None
        }

        # Write to a temp file and rename so a crash mid-write cannot leave a
        # truncated index behind.
        tmp_index_file = index_file + '.tmp'
        with open(tmp_index_file, 'w') as f:
            json.dump(index_data, f, indent=2)
        os.replace(tmp_index_file, index_file)

        return jsonify({
            "success": True,
            "pass_id": pass_id,
            "student_image": student_image_path,
            "qr_image": qr_image_path
        })
    except Exception as e:
        print(f"Error saving pass: {e}")
        return jsonify({"error": str(e)}), 500
def upload_profile_image():
    """Upload profile image to Firebase Storage.

    This endpoint receives an image from the React Native app and uploads it
    to Firebase Storage using the firebase-admin SDK. Two request shapes are
    accepted:

    * Raw binary body with ``Content-Type: image/jpeg``; the user id is read
      from the ``x-user-id`` header.
    * JSON body::

        {
            "image": "base64_encoded_image_data",
            "userId": "user_id_string",
            "contentType": "image/jpeg" (optional, defaults to image/jpeg)
        }

    Returns:
        {
            "success": true,
            "downloadURL": "https://storage.googleapis.com/...",
            "storagePath": "profiles/profile_userId_timestamp.jpg",
            "filename": "profile_userId_timestamp.jpg",
            "userId": "user_id",
            "uploadedAt": "2026-02-07T12:48:00.000000Z",
            "sizeKB": 12.3
        }

        HTTP 400 when no usable image is supplied, HTTP 500 on other errors.
    """
    try:
        print("\n--- 📥 Received request to /upload-profile-image ---")
        # Handle native binary upload (from FileSystem.uploadAsync)
        if request.content_type == 'image/jpeg':
            print("📦 Detected binary upload stream")
            image_data = request.data
            user_id = request.headers.get('x-user-id', 'unknown_user')
            content_type = 'image/jpeg'
            if not image_data:
                print("❌ Error: Empty binary data")
                return jsonify({'success': False, 'error': 'Empty binary data'}), 400
        else:
            # Handle standard JSON base64 upload
            print("📄 Detected JSON base64 upload")
            data = request.json
            # A JSON array or scalar has no .get(); treat it as "no image".
            if not isinstance(data, dict) or 'image' not in data:
                print("❌ Error: No image data provided")
                return jsonify({"error": "No image data provided"}), 400
            image_base64 = data['image']
            user_id = data.get('userId', 'unknown_user')
            # NOTE(review): contentType is client-controlled and is stored on
            # a publicly readable blob -- consider restricting it to image/*.
            content_type = data.get('contentType', 'image/jpeg')
            try:
                # Remove data URL prefix if present
                if 'base64,' in image_base64:
                    image_base64 = image_base64.split('base64,', 1)[1]
                image_data = base64.b64decode(image_base64)
            except (ValueError, TypeError) as decode_err:
                print(f"❌ Error: Invalid image data: {decode_err}")
                return jsonify({"error": f"Invalid image data: {decode_err}"}), 400

        print(f"👤 User: {user_id} | Size: {len(image_data) / 1024:.1f}KB")

        # Generate clean filename with proper naming convention
        timestamp = int(time.time())
        # isoformat() on an aware UTC datetime already ends in "+00:00";
        # appending "Z" produced an invalid "...+00:00Z" value.
        uploaded_at = datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')
        extension = 'png' if content_type == 'image/png' else 'jpg'
        # Keep the object directly under profiles/ even if the id has slashes.
        safe_user_id = str(user_id).replace('/', '_').replace('\\', '_')
        filename = f"profile_{safe_user_id}_{timestamp}.{extension}"
        storage_path = f"profiles/{filename}"

        # Upload to Firebase Storage
        storage_client = get_firebase_storage()
        if not storage_client:
            raise Exception("Firebase Storage not available")
        bucket = storage_client.bucket()
        blob = bucket.blob(storage_path)
        blob.upload_from_string(image_data, content_type=content_type)
        # Make the blob publicly accessible
        blob.make_public()
        download_url = blob.public_url
        print(f"✅ Upload successful: {download_url}", flush=True)

        return jsonify({
            "success": True,
            "downloadURL": download_url,
            "storagePath": storage_path,
            "filename": filename,
            "userId": user_id,
            "uploadedAt": uploaded_at,
            "sizeKB": round(len(image_data) / 1024, 1)
        })
    except Exception as e:
        print(f"❌ Error uploading profile image: {e}")
        return jsonify({"error": str(e)}), 500
# NOTE(review): no route decorator is visible on this handler in this view --
# presumably lost during extraction; confirm how it is registered.
def upload_gatepass():
    """Upload full gate pass image to Firebase Storage.

    Expects a JSON object with ``image`` (base64, optionally a data URL), an
    optional id (``userId`` / ``studentId`` / ``employeeId``) and an optional
    ``contentType`` (default ``image/png``).

    Objects are organized by UTC date:
    ``Pass_images/YYYY-MM-DD/<id>_<timestamp>.<png|jpg>``.

    Returns:
        JSON with ``success``, ``downloadURL``, ``storagePath``, ``filename``
        and ``uploadedAt`` (UTC, ISO-8601 with a ``Z`` suffix); HTTP 400 when
        no usable image is supplied; HTTP 500 on other errors.
    """
    try:
        data = request.json
        # A JSON array or scalar has no .get(); treat it as "no image".
        if not isinstance(data, dict) or 'image' not in data:
            return jsonify({"error": "No image data provided"}), 400
        user_id = data.get('userId') or data.get('studentId') or data.get('employeeId') or 'unknown_user'
        # NOTE(review): contentType is client-controlled and is stored on a
        # publicly readable blob -- consider restricting it to image/*.
        content_type = data.get('contentType', 'image/png')
        print(f"Uploading gatepass for user: {user_id}", flush=True)

        # Decode base64 image
        try:
            image_base64 = data['image']
            if 'base64,' in image_base64:
                image_base64 = image_base64.split('base64,', 1)[1]
            image_data = base64.b64decode(image_base64)
        except (ValueError, TypeError) as decode_err:
            return jsonify({"error": f"Invalid image data: {decode_err}"}), 400

        # Generate path: Pass_images/YYYY-MM-DD/STUDENTID_TIMESTAMP.png
        now = datetime.now(timezone.utc)
        date_folder = now.strftime('%Y-%m-%d')
        timestamp = int(now.timestamp())
        extension = 'png' if content_type == 'image/png' else 'jpg'
        # Keep the object inside the date folder even if the id has slashes.
        safe_user_id = str(user_id).replace('/', '_').replace('\\', '_')
        filename = f"{safe_user_id}_{timestamp}.{extension}"
        storage_path = f"Pass_images/{date_folder}/{filename}"
        print(f"📁 Storage path: {storage_path}")

        # Upload to Firebase Storage
        storage = get_firebase_storage()
        if not storage:
            raise Exception("Firebase Storage not available")
        bucket = storage.bucket()
        blob = bucket.blob(storage_path)
        blob.upload_from_string(image_data, content_type=content_type)
        blob.make_public()
        print(f"Gatepass uploaded: {blob.public_url}", flush=True)

        return jsonify({
            "success": True,
            "downloadURL": blob.public_url,
            "storagePath": storage_path,
            "filename": filename,
            # isoformat() on an aware UTC datetime already ends in "+00:00";
            # appending "Z" produced an invalid "...+00:00Z" value.
            "uploadedAt": now.isoformat().replace('+00:00', 'Z')
        })
    except Exception as e:
        print(f"Error uploading gatepass: {e}", flush=True)
        return jsonify({"error": str(e)}), 500
# NOTE(review): no route decorator is visible on this handler in this view --
# presumably lost during extraction; confirm how it is registered.
def upload_visitor_photo():
    """Upload visitor photo from IoT gate to Firebase Storage and create Firestore documents.

    Expects a JSON object with ``image`` (base64, optionally a data URL) and
    optional ``visitor_name``, ``destination``, ``meeting_person``,
    ``storage_path`` and ``pass_type`` (default ``"visitor"``).

    When ``storage_path`` is not supplied the object is organized by UTC
    date: ``visitor/YYYY-MM-DD/<visitor_name>_<timestamp>.jpg``.

    After the upload, one document is added to ``visitor_photos`` and one to
    ``visitor_registrations``. A Firestore failure does not fail the request:
    the response still reports ``success`` and carries ``firestore_error``.

    Returns:
        JSON with the download URL, storage path and (when Firestore
        succeeded) the two document ids; HTTP 400 when no usable image is
        supplied; HTTP 500 on other errors.
    """
    try:
        data = request.json
        # A JSON array or scalar has no .get(); treat it as "no image".
        if not isinstance(data, dict) or 'image' not in data:
            return jsonify({"error": "No image data provided"}), 400
        visitor_name = data.get('visitor_name', 'unknown_visitor')
        destination = data.get('destination', '')
        # NOTE(review): a client-supplied storage_path is used verbatim, so a
        # caller can choose (and overwrite) any object name in the bucket --
        # confirm this is intended for the IoT gate client.
        storage_path = data.get('storage_path', '')
        meeting_person = data.get('meeting_person', '')
        # Defaults to "visitor"; a pass_type sent by the client is honoured.
        pass_type = data.get('pass_type', 'visitor')
        print(f"Uploading visitor photo for: {visitor_name}", flush=True)

        # Decode base64 image
        try:
            image_base64 = data['image']
            if 'base64,' in image_base64:
                image_base64 = image_base64.split('base64,', 1)[1]
            image_data = base64.b64decode(image_base64)
        except (ValueError, TypeError) as decode_err:
            return jsonify({"error": f"Invalid image data: {decode_err}"}), 400

        # If storage_path not provided, generate it
        if not storage_path:
            now = datetime.now(timezone.utc)
            date_folder = now.strftime('%Y-%m-%d')
            timestamp = int(now.timestamp())
            # str() tolerates non-string names; separators are replaced so
            # the object stays inside the date folder.
            safe_name = (
                str(visitor_name)
                .replace(' ', '_')
                .replace('/', '_')
                .replace('\\', '_')
                .lower()
            )
            storage_path = f"visitor/{date_folder}/{safe_name}_{timestamp}.jpg"
        print(f"📁 Storage path: {storage_path}")

        # Upload to Firebase Storage
        storage = get_firebase_storage()
        if not storage:
            raise Exception("Firebase Storage not available")
        bucket = storage.bucket()
        blob = bucket.blob(storage_path)
        blob.upload_from_string(image_data, content_type='image/jpeg')
        blob.make_public()
        download_url = blob.public_url
        print(f"Visitor photo uploaded: {download_url}", flush=True)

        # Create Firestore documents for mobile app real-time updates
        try:
            firestore_client = get_firestore_client()
            if not firestore_client:
                raise Exception("Firestore client not available")
            # For SERVER_TIMESTAMP, we need firestore from firebase_admin
            from firebase_admin import firestore

            doc_ref = firestore_client.collection('visitor_photos').document()
            doc_ref.set({
                'visitor_name': visitor_name,
                'destination': destination,
                'meeting_person': meeting_person,
                'photo_url': download_url,
                'photo_storage_path': storage_path,
                'created_at': firestore.SERVER_TIMESTAMP,
                'verification_mode': 'entry',
                'pass_type': pass_type
            })
            doc_id = doc_ref.id
            print(f"Created visitor_photos document: {doc_id}", flush=True)

            # Also create a visitor registration log entry
            registration_ref = firestore_client.collection('visitor_registrations').document()
            registration_ref.set({
                'visitor_name': visitor_name,
                'destination': destination,
                'meeting_person': meeting_person,
                'photo_url': download_url,
                'photo_storage_path': storage_path,
                'status': 'photo_captured',
                'created_at': firestore.SERVER_TIMESTAMP,
                'updated_at': firestore.SERVER_TIMESTAMP,
                'source': 'iot_gate_verification',
                'pass_type': pass_type
            })
            reg_doc_id = registration_ref.id
            print(f"Created visitor_registrations document: {reg_doc_id}", flush=True)

            return jsonify({
                "success": True,
                "downloadURL": download_url,
                "storagePath": storage_path,
                "documentId": doc_id,
                "registrationDocId": reg_doc_id,
                "visitor_name": visitor_name,
                # isoformat() on an aware UTC datetime already ends in
                # "+00:00"; appending "Z" produced an invalid value.
                "uploadedAt": datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')
            })
        except Exception as firestore_error:
            # The photo is already stored; report the Firestore problem
            # without failing the whole request.
            print(f"Firestore document creation failed: {firestore_error}", flush=True)
            return jsonify({
                "success": True,
                "downloadURL": download_url,
                "storagePath": storage_path,
                "visitor_name": visitor_name,
                "firestore_error": str(firestore_error),
                "uploadedAt": datetime.now(timezone.utc).isoformat().replace('+00:00', 'Z')
            })
    except Exception as e:
        print(f"Error uploading visitor photo: {e}", flush=True)
        return jsonify({"error": str(e)}), 500
if __name__ == '__main__':
    print("DEBUG: Entering main block", flush=True)
    # Debug mode enables the interactive Werkzeug debugger, which allows
    # arbitrary code execution. The server binds to 0.0.0.0, so debug must be
    # opted into explicitly (FLASK_DEBUG=1) rather than hard-coded on.
    debug_mode = os.getenv('FLASK_DEBUG', '0').strip().lower() in ('1', 'true')
    # Honour a platform-provided PORT; default to the original 5000.
    port = int(os.getenv('PORT', '5000'))
    enable_push_dispatcher = os.getenv('ENABLE_PUSH_DISPATCHER', '1') == '1'
    # With debug on, the reloader runs this block in a parent and a child
    # process; only the child has WERKZEUG_RUN_MAIN == 'true'. Starting the
    # dispatcher only there (or when debug is off) avoids running it twice.
    is_reloader_child = os.environ.get('WERKZEUG_RUN_MAIN') == 'true'
    if enable_push_dispatcher and ((not debug_mode) or is_reloader_child):
        try:
            from push_dispatcher import start_push_dispatcher_in_background
            start_push_dispatcher_in_background()
            print("DEBUG: Push dispatcher started", flush=True)
        except Exception as push_err:
            # The dispatcher is optional; the HTTP server still starts.
            print(f"DEBUG: Push dispatcher failed to start: {push_err}", flush=True)
    print(f"Face Verification Server starting on port {port}...", flush=True)
    app.run(host='0.0.0.0', port=port, debug=debug_mode)