| import os, json |
| import numpy as np |
| import tensorflow as tf |
| import gradio as gr |
| import cv2 |
| import mediapipe as mp |
|
|
# Number of rows in every sequence handed to the model, and how many of the
# highest-probability classes are reported back to the UI.
SEQ_LEN = 60
TOPK = 5

# Artifacts expected in the current working directory.
MODEL_PATH = "best_model.keras"
MEAN_PATH = "global_mean.npy"
STD_PATH = "global_std.npy"
LABEL_PATH = "label_map.json"

# Per-feature normalisation statistics, forced to float32.
mean = np.load(MEAN_PATH).astype(np.float32)
std = np.load(STD_PATH).astype(np.float32)

# The length of the mean vector defines the per-frame feature width.
feature_dim = int(mean.shape[0])

with open(LABEL_PATH, "r", encoding="utf-8") as f:
    lm = json.load(f)

# JSON object keys are strings; convert them back to integer class ids.
id2label = {int(class_id): label for class_id, label in lm["id2label"].items()}
num_classes = len(id2label)
|
|
@tf.keras.utils.register_keras_serializable()
class AttnPool(tf.keras.layers.Layer):
    """Attention-weighted sum over axis 1 of the input.

    Every position along axis 1 is scored by a tanh ``Dense(units)`` layer
    followed by a ``Dense(1)`` layer. The scores are softmax-normalised along
    axis 1 and used as weights for a sum over that axis. Positions whose mask
    entry is false receive a -1e9 score offset before the softmax.

    NOTE(review): assumes ``x`` is (batch, steps, features) -- confirm against
    the layer that feeds this one in the saved model.
    """

    def __init__(self, units=128, **kwargs):
        super().__init__(**kwargs)
        self.units = units
        self.supports_masking = True
        # Attribute names and creation order are kept as-is on purpose:
        # presumably the saved model's weights are keyed on them -- verify
        # before renaming.
        self.d1 = tf.keras.layers.Dense(units, activation="tanh")
        self.d2 = tf.keras.layers.Dense(1)

    def build(self, input_shape):
        batch, steps = input_shape[0], input_shape[1]
        self.d1.build(input_shape)
        # d2 consumes d1's output, whose last axis has ``units`` entries.
        self.d2.build((batch, steps, self.units))
        super().build(input_shape)

    def call(self, x, mask=None):
        scores = tf.squeeze(self.d2(self.d1(x)), axis=-1)
        if mask is not None:
            keep = tf.cast(mask, tf.float32)
            # Push masked positions towards zero weight after the softmax.
            scores = scores + (1.0 - keep) * (-1e9)
        weights = tf.expand_dims(tf.nn.softmax(scores, axis=1), axis=-1)
        return tf.reduce_sum(x * weights, axis=1)

    def compute_mask(self, inputs, mask=None):
        # Axis 1 is reduced away in call(), so no mask is propagated.
        return None

    def get_config(self):
        config = super().get_config()
        config.update({"units": self.units})
        return config
|
|
# The saved model references the custom AttnPool layer, so it has to be
# supplied to the loader by name.
_CUSTOM_OBJECTS = {"AttnPool": AttnPool}
model = tf.keras.models.load_model(MODEL_PATH, custom_objects=_CUSTOM_OBJECTS)

# Short alias for the MediaPipe Holistic solution module.
mp_holistic = mp.solutions.holistic
|
|
# Landmark counts produced by MediaPipe Holistic. The face count of 468 holds
# for refine_face_landmarks=False, which is how this file configures Holistic.
_POSE_LANDMARKS = 33
_FACE_LANDMARKS = 468
_HAND_LANDMARKS = 21


def landmarks_to_vec(res):
    """Flatten one MediaPipe Holistic result into a fixed-length feature vector.

    The vector is the concatenation, in this order, of:
    pose (x, y, z, visibility), face (x, y, z), left hand (x, y, z) and
    right hand (x, y, z).

    A group that was not detected (``None``) contributes a zero block of the
    group's full width, so every group always starts at the same offset.
    NOTE(review): this assumes the training features used the same fixed
    layout -- confirm against the training extraction code.

    Args:
        res: object exposing ``pose_landmarks``, ``face_landmarks``,
            ``left_hand_landmarks`` and ``right_hand_landmarks``; each is
            either ``None`` or has a ``landmark`` iterable of points with
            ``x``, ``y``, ``z`` (and optionally ``visibility``).

    Returns:
        float32 array of shape ``(feature_dim,)``. If the concatenation is
        longer it is truncated; if shorter it is zero-padded at the end.
    """

    def flat_landmarks(lms, count, dims):
        if lms is None:
            # Zero-fill the whole group, not just a single landmark's worth,
            # otherwise every following group shifts to the wrong position.
            return np.zeros((count * dims,), dtype=np.float32)
        if dims == 4:
            rows = [
                (p.x, p.y, p.z, getattr(p, "visibility", 0.0))
                for p in lms.landmark
            ]
        else:
            rows = [(p.x, p.y, p.z) for p in lms.landmark]
        return np.asarray(rows, dtype=np.float32).reshape(-1)

    pose = flat_landmarks(res.pose_landmarks, _POSE_LANDMARKS, 4)
    face = flat_landmarks(res.face_landmarks, _FACE_LANDMARKS, 3)
    lh = flat_landmarks(res.left_hand_landmarks, _HAND_LANDMARKS, 3)
    rh = flat_landmarks(res.right_hand_landmarks, _HAND_LANDMARKS, 3)

    v = np.concatenate([pose, face, lh, rh], axis=0).astype(np.float32)

    # Last-resort reconciliation with the width of the normalisation stats.
    if v.shape[0] > feature_dim:
        v = v[:feature_dim]
    elif v.shape[0] < feature_dim:
        v = np.pad(v, (0, feature_dim - v.shape[0]))
    return v
|
|
def _pad_and_normalize(vecs, seq_len, feat_dim, mean_vec, std_vec):
    """Left-pad per-frame vectors to ``seq_len`` rows and z-score the real rows.

    Args:
        vecs: list of 1-D float vectors of length ``feat_dim``, oldest first.
        seq_len: number of rows in the returned sequence.
        feat_dim: width of every row.
        mean_vec: per-feature mean subtracted from real rows.
        std_vec: per-feature std that real rows are divided by.

    Returns:
        ``(X, mask)`` where ``X`` is float32 ``(seq_len, feat_dim)`` and
        ``mask`` is bool ``(seq_len,)``. Real rows sit at the end and are
        normalised; padding rows sit at the front, stay exactly zero and have
        ``mask == False``. With no vectors, ``X`` is all zeros and ``mask`` is
        all False.
    """
    X = np.zeros((seq_len, feat_dim), dtype=np.float32)
    mask = np.zeros((seq_len,), dtype=np.bool_)
    if not vecs:
        return X, mask

    # Keep at most the last seq_len vectors.
    real = np.stack(vecs[-seq_len:], axis=0).astype(np.float32)
    # The real-row count must be taken BEFORE padding; measuring it on the
    # padded array always yields seq_len and would flag padding as real data.
    n_real = real.shape[0]
    X[-n_real:] = (real - mean_vec) / std_vec
    mask[-n_real:] = True
    return X, mask


def build_sequence_from_video(video_path):
    """Turn a video file into a normalised landmark sequence for the model.

    Up to ``SEQ_LEN`` frame indices are chosen evenly across the frame count
    reported by OpenCV. Each chosen frame is run through MediaPipe Holistic
    and flattened with ``landmarks_to_vec``. The result is left-padded with
    zero rows to ``SEQ_LEN`` and the real rows are z-scored with the
    module-level ``mean`` / ``std``.

    Args:
        video_path: path handed to ``cv2.VideoCapture``.

    Returns:
        ``(X, mask)``: ``X`` is float32 ``(SEQ_LEN, feature_dim)``; ``mask`` is
        bool ``(SEQ_LEN,)`` and is True only for rows that came from a frame.
        If no frame could be read, ``X`` is all zeros and ``mask`` all False.
    """
    got = {}
    cap = cv2.VideoCapture(video_path)
    try:
        total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        if total <= 0:
            # No usable frame count reported; fall back to a fixed guess.
            total = 300

        # The descending-then-reversed linspace is kept so the sampled
        # indices stay identical to the previous implementation.
        idxs = np.linspace(max(0, total - 1), 0, num=SEQ_LEN, dtype=int)[::-1]
        want = {int(k) for k in idxs}
        last_wanted = max(want, default=-1)

        with mp_holistic.Holistic(
            static_image_mode=False,
            model_complexity=1,
            enable_segmentation=False,
            refine_face_landmarks=False,
        ) as holistic:
            i = 0
            # Nothing past the last wanted index is ever used, so stop there.
            while i <= last_wanted:
                ok, frame = cap.read()
                if not ok:
                    break
                if i in want:
                    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    got[i] = landmarks_to_vec(holistic.process(rgb))
                i += 1
    finally:
        # Release the capture even if decoding or MediaPipe raises.
        cap.release()

    vecs = [got[k] for k in sorted(got)]
    return _pad_and_normalize(vecs, SEQ_LEN, feature_dim, mean, std)
|
|
def predict_video(video_path):
    """Classify a video clip and return its TOPK most probable labels.

    The padding mask returned by ``build_sequence_from_video`` is not passed
    to the model.
    NOTE(review): presumably the model handles padded rows itself -- confirm.

    Args:
        video_path: path of the clip to classify.

    Returns:
        List of ``(label, probability)`` tuples, most probable first, with at
        most ``TOPK`` entries.
    """
    sequence, _mask = build_sequence_from_video(video_path)
    # The model expects a leading batch axis; predict on a batch of one.
    batch = sequence[None, ...]
    probs = model.predict(batch, verbose=0)[0]
    best_first = np.argsort(probs)[::-1][:TOPK]
    return [
        (id2label[class_id], float(probs[class_id]))
        for class_id in map(int, best_first)
    ]
|
|
def ui_predict(video):
    """Gradio callback: map a recorded clip to ``{label: probability}``.

    Args:
        video: path of the recorded clip, or ``None`` / empty when the user
            submitted without recording anything.

    Returns:
        Dict of label to probability for the top predictions, or an empty
        dict when there is no clip to analyse.
    """
    # Without this guard a missing clip is forwarded down to the video
    # reader and fails there instead of showing an empty result.
    if not video:
        return {}
    return dict(predict_video(video))
|
|
# Input is a webcam recording delivered as mp4; output is a label widget
# showing the TOPK classes.
_video_input = gr.Video(sources=["webcam"], format="mp4")
_label_output = gr.Label(num_top_classes=TOPK)

demo = gr.Interface(
    fn=ui_predict,
    inputs=_video_input,
    outputs=_label_output,
    title="MSL Real-time-ish Demo (Webcam Video -> Prediction)",
    description="Record a short clip (2-4s) and the model predicts the gloss.",
)


# Start the web UI only when the file is executed as a script.
if __name__ == "__main__":
    demo.launch()
|
|