Spaces:
Sleeping
Sleeping
| import os | |
| import gc | |
| import asyncio | |
| import pandas as pd | |
| import numpy as np | |
| import pickle | |
| from contextlib import asynccontextmanager | |
| from fastapi import FastAPI | |
| from fastapi.responses import HTMLResponse | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.templating import Jinja2Templates | |
| from starlette.requests import Request | |
| import uvicorn | |
| # --- 🛡️ SHIELDS --- | |
| os.environ["CUDA_VISIBLE_DEVICES"] = "-1" | |
| os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3" | |
| # --- 🩹 Keras Monkey Patch --- | |
| import tensorflow as tf | |
| import keras | |
| import sys | |
| sys.modules['tensorflow.keras'] = keras | |
| sys.modules['tensorflow.keras.layers'] = keras.layers | |
| sys.modules['tensorflow.keras.models'] = keras.models | |
| from deepface import DeepFace | |
| # Custom Modules | |
| from core.config import LOG_FILE, MODELS, DETECTOR | |
| from core.state import attendance_memory, KNOWN_VECTORS | |
| from api.websocket import websocket_endpoint | |
| # --- Registration Service (Internal) --- | |
| def auto_register_faces(): | |
| """Generates embeddings.pickle on the fly if missing using direct 1-to-1 mapping.""" | |
| pickle_path = "core/embeddings.pickle" | |
| db_path = "faces_db" | |
| if os.path.exists(pickle_path): | |
| return | |
| if not os.path.exists(db_path) or not os.listdir(db_path): | |
| print("⚠️ faces_db is empty.") | |
| return | |
| print("🚀 Auto-Registering Faces on Cloud CPU...") | |
| identity_map = {} | |
| image_files = [f for f in os.listdir(db_path) if f.lower().endswith(('.jpg', '.jpeg', '.png'))] | |
| for filename in image_files: | |
| name = os.path.splitext(filename)[0] | |
| img_path = os.path.join(db_path, filename) | |
| print(f"📸 Cloud Encoding: {name}...") | |
| try: | |
| identity_map[name] = {} | |
| for model in MODELS: | |
| emb_objs = DeepFace.represent(img_path=img_path, model_name=model, | |
| enforce_detection=True, detector_backend=DETECTOR) | |
| if emb_objs: | |
| vector = np.array(emb_objs[0]["embedding"]) | |
| identity_map[name][model] = vector / np.linalg.norm(vector) | |
| print(f"✅ Encoded: {name}") | |
| except Exception as e: | |
| print(f"❌ Failed {name}: {e}") | |
| with open(pickle_path, "wb") as f: | |
| pickle.dump(identity_map, f) | |
| print(f"✨ Mathematical Brain Created Successfully!") | |
| async def lifespan(app: FastAPI): | |
| # --- STARTUP PHASE --- | |
| # 1. Run Auto-Registration if needed | |
| auto_register_faces() | |
| # 2. Load the Hard Pickle into RAM | |
| pickle_path = "core/embeddings.pickle" | |
| if os.path.exists(pickle_path): | |
| try: | |
| with open(pickle_path, "rb") as f: | |
| data = pickle.load(f) | |
| KNOWN_VECTORS.update(data) | |
| print(f"🧠 Mathematical Brain Loaded: Found {len(KNOWN_VECTORS)} identities.") | |
| except Exception as e: | |
| print(f"⚠️ Pickle Load Error: {e}") | |
| else: | |
| print("⚠️ WARNING: core/embeddings.pickle not found. Run register_faces.py first!") | |
| # 3. Load historical logs into RAM | |
| if os.path.exists(LOG_FILE): | |
| try: | |
| df = pd.read_csv(LOG_FILE) | |
| for _, row in df.iterrows(): | |
| attendance_memory.insert(0, {"name": row['Name'], "time": row['Time']}) | |
| print(f"✅ Loaded {len(attendance_memory)} records.") | |
| except: pass | |
| else: | |
| with open(LOG_FILE, "w") as f: f.write("Name,Time\n") | |
| yield | |
| # --- SHUTDOWN PHASE --- | |
| pass | |
| # --- AI Warmup Sequence --- | |
| print("🚀 Booting Dynamic Consensus AI Core...") | |
| try: | |
| for model in MODELS: | |
| DeepFace.build_model(model) | |
| # Note: RetinaFace is a detector, not a recognition model, so we don't build it via build_model() | |
| print(f"🧠 {', '.join(MODELS)} Networks Loaded.") | |
| gc.collect() | |
| except Exception as e: | |
| print(f"⚠️ AI Boot Error: {e}") | |
| # --- FastAPI Application --- | |
| app = FastAPI(title="Enterprise Attendance AI", version="5.0.0", lifespan=lifespan) | |
| app.mount("/static", StaticFiles(directory="static"), name="static") | |
| templates = Jinja2Templates(directory="templates") | |
| # Register the websocket route | |
| app.add_api_websocket_route("/ws", websocket_endpoint) | |
| async def read_root(request: Request): | |
| """Serves the main application dashboard.""" | |
| return templates.TemplateResponse("index.html", {"request": request}) | |
| if __name__ == "__main__": | |
| uvicorn.run(app, host="0.0.0.0", port=7860) | |