from fastapi import FastAPI, File, UploadFile, Form, HTTPException, Depends, status from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from fastapi.responses import HTMLResponse, JSONResponse import numpy as np import cv2 import os import json import secrets from datetime import datetime, timedelta from typing import List, Dict, Optional, Tuple import requests app = FastAPI(title="AEFRS Face Recognition System", docs_url="/docs") security = HTTPBearer() # ========== إعداد المسارات ========== os.makedirs("artifacts/vector_index", exist_ok=True) os.makedirs("artifacts/metadata", exist_ok=True) os.makedirs("artifacts/models", exist_ok=True) # ========== تحميل النماذج ========== detection_session = None embedding_session = None def download_model(url: str, save_path: str) -> bool: """تحميل نموذج من رابط مباشر""" try: print(f"📥 Downloading model from {url}...") response = requests.get(url, stream=True, timeout=30) if response.status_code == 200: with open(save_path, 'wb') as f: for chunk in response.iter_content(chunk_size=8192): f.write(chunk) print(f"✅ Model saved to {save_path}") return True else: print(f"❌ Download failed: HTTP {response.status_code}") return False except Exception as e: print(f"❌ Download error: {e}") return False def load_onnx_models(): """تحميل نماذج ONNX""" global detection_session, embedding_session # محاولة تحميل RetinaFace (نموذج خفيف) retinaface_path = "artifacts/models/retinaface.onnx" if os.path.exists(retinaface_path): try: import onnxruntime as ort detection_session = ort.InferenceSession(retinaface_path) print("✅ RetinaFace model loaded from file") except Exception as e: print(f"⚠️ Failed to load RetinaFace: {e}") else: print(f"⚠️ RetinaFace not found at {retinaface_path}") # محاولة تحميل ArcFace arcface_path = "artifacts/models/arcface_iresnet100.onnx" # إذا لم يكن النموذج موجوداً، حاول تحميله if not os.path.exists(arcface_path): print("📥 ArcFace model not found, attempting to download...") # رابط نموذج MobileFaceNet (خفيف - 5MB) url = "https://github.com/leondgarse/Keras_insightface/releases/download/v1.0.0/mobilefacenet_128.onnx" if download_model(url, arcface_path): print("✅ ArcFace model downloaded successfully") # تحميل النموذج إذا كان موجوداً if os.path.exists(arcface_path): try: import onnxruntime as ort embedding_session = ort.InferenceSession(arcface_path) print("✅ ArcFace model loaded successfully") except Exception as e: print(f"⚠️ Failed to load ArcFace: {e}") else: print(f"⚠️ ArcFace model not available, using fallback mode") # تحميل النماذج load_onnx_models() # ========== تخزين البيانات ========== active_tokens = {} identities_db = {} vector_index = [] DATA_FILE = "artifacts/data.json" def load_data(): global identities_db, vector_index if os.path.exists(DATA_FILE): try: with open(DATA_FILE, 'r') as f: data = json.load(f) identities_db = data.get('identities', {}) vector_index = data.get('vector_index', []) print(f"✅ Loaded {len(identities_db)} identities") except Exception as e: print(f"Error loading data: {e}") def save_data(): try: with open(DATA_FILE, 'w') as f: json.dump({ 'identities': identities_db, 'vector_index': vector_index }, f, indent=2) print(f"✅ Saved {len(identities_db)} identities") except Exception as e: print(f"Error saving data: {e}") load_data() # ========== دوال التوكن ========== def generate_token(username: str) -> str: token = secrets.token_urlsafe(32) active_tokens[token] = { "username": username, "expires": datetime.now() + timedelta(hours=24) } return token def verify_token(credentials: HTTPAuthorizationCredentials = Depends(security)): token = credentials.credentials if token not in active_tokens: raise HTTPException(status_code=401, detail="Invalid token") token_data = active_tokens[token] if datetime.now() > token_data["expires"]: del active_tokens[token] raise HTTPException(status_code=401, detail="Token expired") return token_data["username"] # ========== استخراج الميزات ========== def extract_features_fallback(image_bytes) -> np.ndarray: """استخراج ميزات بديلة (تحسين للتمييز بين الأشخاص)""" img_array = np.frombuffer(image_bytes, np.uint8) img = cv2.imdecode(img_array, cv2.IMREAD_COLOR) if img is None: return None gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY) resized = cv2.resize(gray, (128, 128)) # استخدام HOG مع إعدادات أفضل للتمييز try: from skimage.feature import hog features = hog(resized, orientations=12, pixels_per_cell=(8, 8), cells_per_block=(2, 2), visualize=False) except: # بديل بسيط إذا لم تكن skimage متاحة features = resized.flatten() / 255.0 # تقليل الأبعاد step = len(features) // 512 if step > 0: features = features[::step][:512] else: features = np.pad(features, (0, 512 - len(features))) # تطبيع features = (features - features.mean()) / (features.std() + 1e-6) return features.astype(np.float32) def extract_face_embedding(image_bytes) -> np.ndarray: """استخراج embedding من الوجه""" global embedding_session # إذا كان ArcFace متاحاً، استخدمه if embedding_session is not None: try: img_array = np.frombuffer(image_bytes, np.uint8) img = cv2.imdecode(img_array, cv2.IMREAD_COLOR) if img is None: return extract_features_fallback(image_bytes) # تجهيز الصورة لـ ArcFace face = cv2.resize(img, (112, 112)) face = face.astype(np.float32) face = (face - 127.5) / 128.0 face = np.transpose(face, (2, 0, 1)) face = np.expand_dims(face, axis=0) # استخراج embedding input_name = embedding_session.get_inputs()[0].name embedding = embedding_session.run(None, {input_name: face})[0][0] return embedding.astype(np.float32) except Exception as e: print(f"ArcFace error: {e}") # استخدام الـ fallback return extract_features_fallback(image_bytes) def cosine_similarity(a, b): """حساب التشابه""" a = np.array(a) b = np.array(b) norm_a = np.linalg.norm(a) norm_b = np.linalg.norm(b) if norm_a == 0 or norm_b == 0: return 0.0 similarity = np.dot(a, b) / (norm_a * norm_b) similarity = (similarity + 1) / 2 return max(0.0, min(1.0, similarity)) # ========== API Endpoints ========== @app.get("/healthz") async def health(): return { "status": "ok", "system": "AEFRS", "timestamp": datetime.now().isoformat(), "identities_count": len(identities_db), "models": { "arcface": embedding_session is not None }, "mode": "production" if embedding_session else "fallback" } @app.post("/v1/token") async def login(username: str = Form(...), password: str = Form(...)): if username and password: token = generate_token(username) return { "access_token": token, "token_type": "bearer", "expires_in": 86400, "username": username } raise HTTPException(status_code=401, detail="Invalid credentials") @app.post("/v1/enroll") async def enroll( credentials: HTTPAuthorizationCredentials = Depends(security), identity_id: str = Form(...), name: str = Form(...), image: UploadFile = File(...) ): username = verify_token(credentials) try: contents = await image.read() embedding = extract_face_embedding(contents) if embedding is None: return JSONResponse( status_code=400, content={"status": "error", "message": "Could not extract face features"} ) identities_db[identity_id] = { "name": name, "embedding": embedding.tolist(), "created_at": datetime.now().isoformat(), "created_by": username } vector_index.append({ "identity_id": identity_id, "name": name, "embedding": embedding.tolist() }) save_data() return { "status": "success", "message": f"تم تسجيل {name} بنجاح", "identity_id": identity_id, "name": name, "embedding_dim": len(embedding), "mode": "production" if embedding_session else "fallback" } except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.post("/v1/search") async def search( credentials: HTTPAuthorizationCredentials = Depends(security), image: UploadFile = File(...), top_k: int = Form(5) ): username = verify_token(credentials) try: contents = await image.read() query_embedding = extract_face_embedding(contents) if query_embedding is None: return JSONResponse( status_code=400, content={"status": "error", "message": "Could not extract face features"} ) results = [] for item in vector_index: similarity = cosine_similarity(query_embedding, item["embedding"]) results.append({ "identity_id": item["identity_id"], "name": item["name"], "similarity": similarity, "similarity_percent": round(similarity * 100, 2) }) results.sort(key=lambda x: x["similarity"], reverse=True) results = results[:top_k] return { "status": "success", "message": "Search completed", "matches": results, "total_matches": len(results), "search_by": username, "mode": "production" if embedding_session else "fallback" } except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @app.get("/v1/identities") async def list_identities( credentials: HTTPAuthorizationCredentials = Depends(security), limit: int = 100, offset: int = 0 ): verify_token(credentials) identities_list = [ { "identity_id": k, "name": v["name"], "created_at": v["created_at"] } for k, v in identities_db.items() ] return { "total": len(identities_list), "identities": identities_list[offset:offset+limit] } @app.delete("/v1/identity/{identity_id}") async def delete_identity( identity_id: str, credentials: HTTPAuthorizationCredentials = Depends(security) ): verify_token(credentials) if identity_id not in identities_db: raise HTTPException(status_code=404, detail="Identity not found") del identities_db[identity_id] global vector_index vector_index = [item for item in vector_index if item["identity_id"] != identity_id] save_data() return {"message": f"Deleted {identity_id}"} @app.post("/v1/logout") async def logout(credentials: HTTPAuthorizationCredentials = Depends(security)): token = credentials.credentials if token in active_tokens: del active_tokens[token] return {"message": "Logged out"} raise HTTPException(status_code=400, detail="Invalid token") # ========== واجهة المستخدم ========== # (نفس الـ HTML السابق مع إضافة عرض وضع التشغيل) HTML_PAGE = """ AEFRS - نظام التعرف على الوجوه

🧠 AEFRS

نظام التعرف على الوجوه المتقدم

🔄 جاري الاتصال...

🔐 تسجيل الدخول

""" @app.get("/", response_class=HTMLResponse) async def ui(): return HTMLResponse(content=HTML_PAGE) if __name__ == "__main__": import uvicorn port = int(os.environ.get("PORT", 7860)) print(f"🚀 Server on http://0.0.0.0:{port}") print(f"📊 ArcFace mode: {'PRODUCTION' if embedding_session else 'FALLBACK (HOG)'}") uvicorn.run(app, host="0.0.0.0", port=port)