"""FastAPI entry point for the attendance recognition service.

On import this module configures TensorFlow-related environment variables,
aliases ``tensorflow.keras`` to the standalone ``keras`` package, pre-builds
the DeepFace recognition models, and creates the FastAPI ``app``.  On
application startup (see ``lifespan``) it generates the embeddings pickle if
it is missing, loads it into ``KNOWN_VECTORS`` and loads the attendance log
into ``attendance_memory``.
"""
import os
import gc
import asyncio
import pickle
import sys
from contextlib import asynccontextmanager

import pandas as pd
import numpy as np
from fastapi import FastAPI
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from starlette.requests import Request
import uvicorn

# --- 🛡️ SHIELDS ---
# These must be set before TensorFlow is imported below.
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"

# --- 🩹 Keras Monkey Patch ---
# Make `tensorflow.keras*` imports resolve to the standalone `keras` package.
# The aliases are installed before `deepface` is imported.
import tensorflow as tf
import keras

sys.modules['tensorflow.keras'] = keras
sys.modules['tensorflow.keras.layers'] = keras.layers
sys.modules['tensorflow.keras.models'] = keras.models

from deepface import DeepFace

# Custom Modules
from core.config import LOG_FILE, MODELS, DETECTOR
from core.state import attendance_memory, KNOWN_VECTORS
from api.websocket import websocket_endpoint

# Default locations used by the registration and startup steps.
EMBEDDINGS_PATH = "core/embeddings.pickle"
FACES_DB_DIR = "faces_db"
IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png')


# --- Registration Service (Internal) ---
def _encode_face(img_path):
    """Return ``{model_name: unit-length embedding}`` for one image.

    Runs ``DeepFace.represent`` once per entry in ``MODELS`` and keeps the
    first embedding returned for each.  A model that returns no embedding, or
    an all-zero embedding (which cannot be normalised), is left out of the
    result.  Exceptions raised by DeepFace propagate to the caller.
    """
    vectors = {}
    for model in MODELS:
        emb_objs = DeepFace.represent(
            img_path=img_path,
            model_name=model,
            enforce_detection=True,
            detector_backend=DETECTOR,
        )
        if not emb_objs:
            continue
        vector = np.array(emb_objs[0]["embedding"])
        norm = np.linalg.norm(vector)
        if norm == 0:
            # Dividing by zero would store NaNs in the embeddings file.
            continue
        vectors[model] = vector / norm
    return vectors


def auto_register_faces(pickle_path=EMBEDDINGS_PATH, db_path=FACES_DB_DIR):
    """Generate the embeddings pickle from ``db_path`` if it does not exist.

    Each image file in ``db_path`` maps 1-to-1 to an identity named after the
    file stem.  Identities whose encoding fails are skipped entirely rather
    than stored as empty or partial entries.  If nothing could be encoded, no
    file is written, so the next start can retry instead of being blocked by
    an empty pickle.

    Args:
        pickle_path: Destination of the pickled ``{name: {model: vector}}`` map.
        db_path: Directory containing one reference image per identity.
    """
    if os.path.exists(pickle_path):
        return

    if not os.path.exists(db_path) or not os.listdir(db_path):
        print("⚠️ faces_db is empty.")
        return

    print("🚀 Auto-Registering Faces on Cloud CPU...")
    identity_map = {}
    image_files = [
        f for f in os.listdir(db_path) if f.lower().endswith(IMAGE_EXTENSIONS)
    ]

    for filename in image_files:
        name = os.path.splitext(filename)[0]
        img_path = os.path.join(db_path, filename)
        print(f"📸 Cloud Encoding: {name}...")
        try:
            vectors = _encode_face(img_path)
        except Exception as e:
            # Best-effort: one bad image must not abort the whole registration.
            print(f"❌ Failed {name}: {e}")
            continue
        if not vectors:
            print(f"❌ Failed {name}: no usable embedding returned")
            continue
        identity_map[name] = vectors
        print(f"✅ Encoded: {name}")

    if not identity_map:
        print("⚠️ No faces could be encoded; embeddings file was not written.")
        return

    with open(pickle_path, "wb") as f:
        pickle.dump(identity_map, f)
    print("✨ Mathematical Brain Created Successfully!")


def _load_known_vectors(pickle_path=EMBEDDINGS_PATH):
    """Merge the pickled identity map at ``pickle_path`` into ``KNOWN_VECTORS``."""
    if not os.path.exists(pickle_path):
        print("⚠️ WARNING: core/embeddings.pickle not found. Run register_faces.py first!")
        return
    try:
        # NOTE: pickle.load executes arbitrary code from the file; this is only
        # safe while the file is produced locally by auto_register_faces().
        with open(pickle_path, "rb") as f:
            data = pickle.load(f)
        KNOWN_VECTORS.update(data)
        print(f"🧠 Mathematical Brain Loaded: Found {len(KNOWN_VECTORS)} identities.")
    except Exception as e:
        print(f"⚠️ Pickle Load Error: {e}")


def _load_attendance_history(log_file=LOG_FILE):
    """Load ``log_file`` rows into ``attendance_memory``, or create the file.

    Each row is inserted at index 0, so the last row of the CSV ends up first
    in ``attendance_memory``.  A missing file is created with only the
    ``Name,Time`` header.  Read or parse errors are reported and startup
    continues.
    """
    if not os.path.exists(log_file):
        with open(log_file, "w") as f:
            f.write("Name,Time\n")
        return
    try:
        df = pd.read_csv(log_file)
        for record in df.itertuples(index=False):
            attendance_memory.insert(0, {"name": record.Name, "time": record.Time})
        print(f"✅ Loaded {len(attendance_memory)} records.")
    except Exception as e:
        # Previously a bare `except: pass`; keep startup best-effort but say why.
        print(f"⚠️ Attendance Log Load Error: {e}")


@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan handler: prepare in-memory state before serving.

    Startup runs, in order: auto-registration of faces (if the embeddings
    pickle is missing), loading the pickle into ``KNOWN_VECTORS``, and loading
    the attendance log into ``attendance_memory``.  Nothing is done on
    shutdown.
    """
    # --- STARTUP PHASE ---
    # 1. Run Auto-Registration if needed
    auto_register_faces()
    # 2. Load the Hard Pickle into RAM
    _load_known_vectors()
    # 3. Load historical logs into RAM
    _load_attendance_history()
    yield
    # --- SHUTDOWN PHASE ---


# --- AI Warmup Sequence ---
# Runs at import time so the recognition models are built before the first
# request is handled.
print("🚀 Booting Dynamic Consensus AI Core...")
try:
    for model in MODELS:
        DeepFace.build_model(model)
    # Note: RetinaFace is a detector, not a recognition model, so we don't
    # build it via build_model()
    print(f"🧠 {', '.join(MODELS)} Networks Loaded.")
    gc.collect()
except Exception as e:
    print(f"⚠️ AI Boot Error: {e}")

# --- FastAPI Application ---
app = FastAPI(title="Enterprise Attendance AI", version="5.0.0", lifespan=lifespan)
app.mount("/static", StaticFiles(directory="static"), name="static")
templates = Jinja2Templates(directory="templates")

# Register the websocket route
app.add_api_websocket_route("/ws", websocket_endpoint)


@app.get("/", response_class=HTMLResponse)
async def read_root(request: Request):
    """Serves the main application dashboard."""
    return templates.TemplateResponse("index.html", {"request": request})


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=7860)