| """ |
| Arabic Sign Language Interpreter - FastAPI Server |
| """ |
| import io |
| import base64 |
| import inspect |
| import sys |
| import os |
| import types |
| import shutil |
| from unittest.mock import MagicMock |
|
|
| import numpy as np |
| import cv2 |
| import torch |
| import joblib |
| import pandas as pd |
| from pathlib import Path |
| from scipy.spatial import distance |
| from torchvision import transforms |
| from PIL import Image |
| from contextlib import asynccontextmanager |
|
|
| from fastapi import FastAPI, File, UploadFile, HTTPException |
| from fastapi.middleware.cors import CORSMiddleware |
| from fastapi.responses import JSONResponse |
| import uvicorn |
| from huggingface_hub import hf_hub_download |
|
|
| |
# --- Import-time compatibility shims ---------------------------------------
# Everything below patches the interpreter environment before the model
# packages are imported lazily inside load_models().

# Make sure inspect.getargspec exists: when the attribute is already there it
# is assigned back to itself, otherwise getfullargspec stands in for it.
inspect.getargspec = getattr(inspect, "getargspec", inspect.getfullargspec)

# Restore the NumPy scalar aliases (np.int, np.float, ...) on NumPy builds
# that do not expose them; aliases that are present are left untouched.
for attr, typ in {
    "int": int,
    "float": float,
    "complex": complex,
    "bool": bool,
    "object": object,
    "str": str,
    "unicode": str,
}.items():
    if not hasattr(np, attr):
        setattr(np, attr, typ)

# Register a stand-in "pyrender" module whose public names all resolve to the
# MagicMock class, so `import pyrender` succeeds without the real package.
# NOTE(review): presumably only needed because a dependency imports pyrender
# for visualisation that this server never uses -- confirm.
pyrender_mock = types.ModuleType("pyrender")
for _attr in (
    "Scene",
    "Mesh",
    "Node",
    "PerspectiveCamera",
    "DirectionalLight",
    "PointLight",
    "SpotLight",
    "OffscreenRenderer",
    "RenderFlags",
    "Viewer",
    "MetallicRoughnessMaterial",
):
    setattr(pyrender_mock, _attr, MagicMock)
sys.modules["pyrender"] = pyrender_mock

# Empty placeholder modules for the OpenGL package tree; modules that are
# already registered in sys.modules are kept as they are.
for _mod in (
    "OpenGL",
    "OpenGL.GL",
    "OpenGL.GL.framebufferobjects",
    "OpenGL.platform",
    "OpenGL.error",
):
    sys.modules.setdefault(_mod, types.ModuleType(_mod))

# NOTE(review): presumably selects PyOpenGL's off-screen OSMesa backend for
# headless hosts -- confirm it is still needed with the stubs above.
os.environ.update(PYOPENGL_PLATFORM="osmesa")
|
|
| |
# Router output index -> sign category (index 1 is chosen when the router
# probability is at least 0.5, see run_two_stage).
CLASSES = dict(enumerate(("letter", "number")))

# Side length, in pixels, of the square grayscale image fed to the router.
IMG_SIZE = 64

# Hugging Face Hub repository that hosts every model artifact.
REPO_ID = "SondosM/api_GP"
|
|
def get_hf_file(filename, is_mano=False):
    """Download ``filename`` from the ``REPO_ID`` repository and return a local path.

    Args:
        filename: Path of the file inside the Hugging Face repository.
        is_mano: When true, the downloaded file is additionally copied into
            ``./mano_data`` (flattened to its base name) and that copy's path
            is returned instead of the download path.

    Returns:
        The path returned by ``hf_hub_download``, or the ``./mano_data`` copy
        when ``is_mano`` is set.
    """
    print(f"Downloading {filename} from {REPO_ID}...")
    cached_path = hf_hub_download(repo_id=REPO_ID, filename=filename)

    if not is_mano:
        return cached_path

    os.makedirs("./mano_data", exist_ok=True)
    target_path = os.path.join("./mano_data", os.path.basename(filename))
    # An existing copy is never overwritten, so the copy happens only once.
    if not os.path.exists(target_path):
        shutil.copy(cached_path, target_path)
        print(f"Copied {filename} to {target_path}")
    return target_path
|
|
| |
# --- Import-time artifact resolution ----------------------------------------
# Every get_hf_file() call below runs when the module is imported.
print("Initializing model file paths...")

# MANO assets are fetched only for their side effect (the copy into
# ./mano_data); the returned paths are not kept.
get_hf_file("mano_data/mano_data/mano_mean_params.npz", is_mano=True)
get_hf_file("mano_data/mano_data/MANO_LEFT.pkl", is_mano=True)
get_hf_file("mano_data/mano_data/MANO_RIGHT.pkl", is_mano=True)

# Directory pushed onto sys.path by load_models() so `wilor` can be imported.
WILOR_REPO_PATH = "./WiLoR"

# Local paths of the individual model artifacts.
WILOR_CKPT = get_hf_file("pretrained_models/pretrained_models/wilor_final.ckpt")
WILOR_CFG = get_hf_file("pretrained_models/pretrained_models/model_config.yaml")
DETECTOR_PATH = get_hf_file("pretrained_models/pretrained_models/detector.pt")
ROUTER_MODEL_PATH = get_hf_file("router_model.keras")
MLP_LETTERS_PATH = get_hf_file("MLP_letters.pkl")
MLP_NUMBERS_PATH = get_hf_file("MLP_numbers.pkl")

# Torch device string used for WiLoR and passed to the YOLO detector.
DEVICE = "cpu" if not torch.cuda.is_available() else "cuda"

# Preprocessing applied to every hand crop before it reaches WiLoR.
# NOTE(review): the mean/std values look like the usual ImageNet statistics --
# confirm they match what the checkpoint was trained with.
WILOR_TRANSFORM = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
])

# Model handles; they stay None until load_models() fills them in.
wilor_model = yolo_detector = router_model_keras = None
mlp_letters = mlp_numbers = None
|
|
|
|
def load_models():
    """Load every model used by the API into the module-level globals.

    Populates ``wilor_model``, ``yolo_detector``, ``router_model_keras``,
    ``mlp_letters`` and ``mlp_numbers`` from the artifact paths resolved at
    import time. Progress is reported with ``print``.
    """
    global wilor_model, yolo_detector, router_model_keras, mlp_letters, mlp_numbers

    # The heavy frameworks are imported here rather than at module level, after
    # the WiLoR checkout has been made importable.
    sys.path.insert(0, WILOR_REPO_PATH)
    from wilor.models import load_wilor
    from ultralytics import YOLO
    from tensorflow.keras.models import load_model

    print(f"Loading WiLoR on {DEVICE}...")
    # load_wilor returns a pair; only the model (first element) is kept.
    wilor_model, _ = load_wilor(checkpoint_path=WILOR_CKPT, cfg_path=WILOR_CFG)
    wilor_model.to(DEVICE)
    wilor_model.eval()

    print("Loading YOLO detector...")
    yolo_detector = YOLO(DETECTOR_PATH)

    print("Loading router model (Keras)...")
    router_model_keras = load_model(ROUTER_MODEL_PATH)

    print("Loading MLP classifiers...")
    # NOTE(review): joblib.load unpickles arbitrary objects; these files must
    # only ever come from a trusted repository.
    mlp_letters = joblib.load(MLP_LETTERS_PATH)
    mlp_numbers = joblib.load(MLP_NUMBERS_PATH)

    # The literal was previously split across two lines by a mis-decoded
    # check-mark emoji, which left the string unterminated.
    print("✅ All models loaded successfully!")
|
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan hook: load all models once before serving begins.

    Nothing runs after the ``yield``, so there is no shutdown step.
    """
    # Startup phase: fill the module-level model globals.
    load_models()
    yield None
|
|
|
|
# ASGI application; models are loaded through the lifespan hook.
app = FastAPI(
    title="Arabic Sign Language Interpreter",
    lifespan=lifespan,
)

# CORS is fully open: any origin, any method and any header is accepted.
app.add_middleware(
    CORSMiddleware,
    **{
        "allow_origins": ["*"],
        "allow_methods": ["*"],
        "allow_headers": ["*"],
    },
)
|
|
|
|
| |
| |
| |
|
|
def extract_features(crop_rgb: np.ndarray) -> np.ndarray | None:
    """Build the classifier feature vector for one hand crop.

    The crop is resized to 256x256, passed through ``WILOR_TRANSFORM`` and fed
    to the WiLoR model as a batch of one.

    Args:
        crop_rgb: Image of the detected hand (the endpoints pass an RGB crop).

    Returns:
        A 1-D array holding the flattened global orientation, the flattened
        hand pose, and seven joint-to-joint distances divided by a hand-size
        scale; ``None`` when the model output lacks ``pred_mano_params`` or
        ``pred_keypoints_3d``.
    """
    resized = cv2.resize(crop_rgb, (256, 256))
    batch = WILOR_TRANSFORM(resized).unsqueeze(0).to(DEVICE)

    with torch.no_grad():
        output = wilor_model({"img": batch})

    if not ("pred_mano_params" in output and "pred_keypoints_3d" in output):
        return None

    # Pose part of the vector: global orientation first, then the hand pose.
    mano = output["pred_mano_params"]
    global_orient = mano["global_orient"][0].cpu().numpy().flatten()
    hand_pose = mano["hand_pose"][0].cpu().numpy().flatten()
    theta = np.concatenate([global_orient, hand_pose])

    joints = output["pred_keypoints_3d"][0].cpu().numpy()
    # NOTE(review): presumably the five fingertip indices of a 21-joint hand
    # layout, with joints 0 and 9 spanning the palm -- confirm against WiLoR.
    tips = (4, 8, 12, 16, 20)
    # Epsilon keeps the division below finite for a degenerate hand.
    hand_scale = distance.euclidean(joints[0], joints[9]) + 1e-8

    # First tip against each of the other four, then each adjacent pair among
    # the remaining four tips: 4 + 3 = 7 distances.
    pairs = [(tips[0], tips[k]) for k in range(1, 5)]
    pairs += [(tips[k], tips[k + 1]) for k in range(1, 4)]
    dist_feats = [
        distance.euclidean(joints[a], joints[b]) / hand_scale
        for a, b in pairs
    ]

    return np.concatenate([theta, dist_feats])
|
|
|
|
def get_3d_joints(crop_rgb: np.ndarray) -> np.ndarray:
    """Return the 3D keypoints WiLoR predicts for one hand crop.

    The crop receives the same preprocessing as in ``extract_features``
    (resize to 256x256, ``WILOR_TRANSFORM``, batch of one).

    Args:
        crop_rgb: Image of the detected hand.

    Returns:
        The first batch entry of ``pred_keypoints_3d`` as a NumPy array.
    """
    resized = cv2.resize(crop_rgb, (256, 256))
    batch = WILOR_TRANSFORM(resized)
    batch = batch.unsqueeze(0).to(DEVICE)

    with torch.no_grad():
        output = wilor_model({"img": batch})

    keypoints = output["pred_keypoints_3d"][0]
    return keypoints.cpu().numpy()
|
|
|
|
def read_image_from_upload(file_bytes: bytes) -> np.ndarray:
    """Decode uploaded bytes into an OpenCV colour image.

    Args:
        file_bytes: Raw content of the uploaded file.

    Returns:
        The decoded image as produced by ``cv2.imdecode`` with
        ``cv2.IMREAD_COLOR``.

    Raises:
        HTTPException: 400 when the upload is empty or cannot be decoded as
            an image.
    """
    # cv2.imdecode raises on an empty buffer instead of returning None, which
    # would surface as an unhandled 500; reject empty uploads up front.
    if not file_bytes:
        raise HTTPException(status_code=400, detail="Invalid image format.")

    arr = np.frombuffer(file_bytes, np.uint8)
    img = cv2.imdecode(arr, cv2.IMREAD_COLOR)
    if img is None:
        raise HTTPException(status_code=400, detail="Invalid image format.")
    return img
|
|
|
|
| def _align_features(model, features: np.ndarray) -> np.ndarray: |
| if hasattr(model, "feature_names_in_"): |
| expected_cols = model.feature_names_in_ |
| vec = np.zeros(len(expected_cols)) |
| limit = min(len(features), len(vec)) |
| vec[:limit] = features[:limit] |
| return pd.DataFrame([vec], columns=expected_cols) |
| else: |
| n = model.n_features_in_ |
| vec = np.zeros(n) |
| limit = min(len(features), n) |
| vec[:limit] = features[:limit] |
| return vec.reshape(1, -1) |
|
|
def run_two_stage(features: np.ndarray, crop_rgb: np.ndarray) -> dict:
    """Classify a hand crop in two stages: category router, then sign MLP.

    Stage one feeds a 64x64 grayscale version of the crop to the Keras router
    to decide between "letter" and "number". Stage two runs the matching MLP
    on the aligned feature vector.

    Args:
        features: Feature vector produced by ``extract_features``.
        crop_rgb: RGB crop of the detected hand.

    Returns:
        Dict with ``sign``, ``sign_confidence``, ``category`` and
        ``category_confidence`` (confidences rounded to four decimals).
    """
    # Stage 1 input: grayscale, IMG_SIZE square, scaled to [0, 1], with a
    # leading batch axis and a trailing channel axis.
    gray = cv2.cvtColor(crop_rgb, cv2.COLOR_RGB2GRAY)
    small = cv2.resize(gray, (IMG_SIZE, IMG_SIZE))
    router_input = small[np.newaxis, ..., np.newaxis].astype("float32") / 255.0

    prob = float(router_model_keras.predict(router_input, verbose=0)[0][0])
    # The router emits the probability of class 1; threshold at 0.5.
    if prob >= 0.5:
        cls_idx, cat_conf = 1, prob
    else:
        cls_idx, cat_conf = 0, 1.0 - prob
    category = CLASSES[cls_idx]

    # Stage 2: pick the classifier that belongs to the routed category.
    model = mlp_numbers if category != "letter" else mlp_letters
    aligned = _align_features(model, features)
    label = str(model.predict(aligned)[0])
    conf = float(model.predict_proba(aligned)[0].max())

    return dict(
        sign=label,
        sign_confidence=round(conf, 4),
        category=category,
        category_confidence=round(cat_conf, 4),
    )
|
|
| |
| |
| |
|
|
@app.get("/")
def root():
    """Health-check endpoint: report that the server runs and on which device."""
    return dict(status="running", device=DEVICE)
|
|
|
|
@app.post("/predict")
async def predict(file: UploadFile = File(...)):
    """Detect a hand in the uploaded image and classify its sign.

    Args:
        file: Uploaded image file.

    Returns:
        JSON with the ``run_two_stage`` result plus ``hand_side`` and the
        clamped ``bbox`` as ``[x1, y1, x2, y2]``.

    Raises:
        HTTPException: 400 for an undecodable image (raised by
            ``read_image_from_upload``), 422 when no hand is detected or the
            crop is empty, 500 when feature extraction fails.
    """
    payload = await file.read()
    img_rgb = cv2.cvtColor(read_image_from_upload(payload), cv2.COLOR_BGR2RGB)

    results = yolo_detector.predict(img_rgb, conf=0.5, verbose=False, device=DEVICE)
    detections = results[0].boxes
    if not detections:
        raise HTTPException(status_code=422, detail="No hand detected.")

    # Only the first detection is used.
    # NOTE(review): presumably the highest-confidence box -- confirm ordering.
    box = detections.xyxy[0].cpu().numpy().astype(int)
    label_id = int(detections.cls[0].cpu().item())
    # Detector class 0 is reported as the left hand, anything else as right.
    hand_side = "right" if label_id != 0 else "left"

    # Clamp the box to the image bounds before slicing.
    h, w = img_rgb.shape[:2]
    x1 = max(0, box[0])
    y1 = max(0, box[1])
    x2 = min(w, box[2])
    y2 = min(h, box[3])
    crop = img_rgb[y1:y2, x1:x2]

    if crop.size == 0:
        raise HTTPException(status_code=422, detail="Empty hand crop.")

    features = extract_features(crop)
    if features is None:
        raise HTTPException(status_code=500, detail="Feature extraction failed.")

    body = dict(run_two_stage(features, crop))
    body["hand_side"] = hand_side
    body["bbox"] = [int(x1), int(y1), int(x2), int(y2)]
    return JSONResponse(body)
|
|
|
|
@app.post("/predict_with_skeleton")
async def predict_with_skeleton(file: UploadFile = File(...)):
    """Classify the sign and also return the 3D joints and the hand crop.

    Args:
        file: Uploaded image file.

    Returns:
        JSON with the ``run_two_stage`` result plus ``hand_side``, the clamped
        ``bbox`` as ``[x1, y1, x2, y2]``, ``joints_3d`` (nested lists) and
        ``crop_b64`` (the crop as a base64-encoded PNG).

    Raises:
        HTTPException: 400 for an undecodable image (raised by
            ``read_image_from_upload``), 422 when no hand is detected or the
            crop is empty, 500 when feature extraction or PNG encoding fails.
    """
    raw = await file.read()
    img_bgr = read_image_from_upload(raw)
    img_rgb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2RGB)

    results = yolo_detector.predict(img_rgb, conf=0.5, verbose=False, device=DEVICE)
    if not results[0].boxes:
        raise HTTPException(status_code=422, detail="No hand detected.")

    # Only the first detection is used.
    box = results[0].boxes.xyxy[0].cpu().numpy().astype(int)
    label_id = int(results[0].boxes.cls[0].cpu().item())
    # Detector class 0 is reported as the left hand, anything else as right.
    hand_side = "left" if label_id == 0 else "right"

    # Clamp the box to the image bounds before slicing.
    h, w = img_rgb.shape[:2]
    x1, y1, x2, y2 = max(0, box[0]), max(0, box[1]), min(w, box[2]), min(h, box[3])
    crop = img_rgb[y1:y2, x1:x2]

    # Same guards as /predict: without them an empty crop or a missing model
    # output falls through to cv2.resize / len(None) and becomes an opaque 500.
    if crop.size == 0:
        raise HTTPException(status_code=422, detail="Empty hand crop.")

    features = extract_features(crop)
    if features is None:
        raise HTTPException(status_code=500, detail="Feature extraction failed.")

    # NOTE(review): this runs a second WiLoR forward pass on the same crop;
    # extract_features already computes these keypoints internally.
    joints = get_3d_joints(crop)

    result = run_two_stage(features, crop)

    ok, buf = cv2.imencode(".png", cv2.cvtColor(crop, cv2.COLOR_RGB2BGR))
    if not ok:
        raise HTTPException(status_code=500, detail="Failed to encode hand crop.")
    crop_b64 = base64.b64encode(buf).decode("utf-8")

    return JSONResponse({
        **result,
        "hand_side": hand_side,
        "bbox": [int(x1), int(y1), int(x2), int(y2)],
        "joints_3d": joints.tolist(),
        "crop_b64": crop_b64,
    })
|
|
|
|
# Script entry point: serve the app on all interfaces, port 7860, without
# auto-reload.
if __name__ == "__main__":
    uvicorn.run(
        "app:app",
        host="0.0.0.0",
        port=7860,
        reload=False,
    )