"""Webcam sign-language demo: record a short clip, extract MediaPipe Holistic
landmarks, and classify the gloss with a pretrained Keras sequence model."""

import os
import json

import numpy as np
import tensorflow as tf
import gradio as gr
import cv2
import mediapipe as mp

SEQ_LEN = 60          # number of frames the model consumes per clip
TOPK = 5              # number of predictions shown in the UI
MODEL_PATH = "best_model.keras"
MEAN_PATH = "global_mean.npy"
STD_PATH = "global_std.npy"
LABEL_PATH = "label_map.json"

# Per-feature z-normalization statistics exported at training time.
# NOTE(review): assumes std has no zero entries — verify the training export.
mean = np.load(MEAN_PATH).astype(np.float32)
std = np.load(STD_PATH).astype(np.float32)

with open(LABEL_PATH, "r", encoding="utf-8") as f:
    lm = json.load(f)
id2label = {int(k): v for k, v in lm["id2label"].items()}
num_classes = len(id2label)
feature_dim = int(mean.shape[0])


@tf.keras.utils.register_keras_serializable()
class AttnPool(tf.keras.layers.Layer):
    """Additive attention pooling over the time axis.

    Scores each timestep with a two-layer MLP, suppresses masked steps,
    and returns the attention-weighted sum: (batch, time, dim) -> (batch, dim).
    """

    def __init__(self, units=128, **kwargs):
        super().__init__(**kwargs)
        self.units = units
        self.supports_masking = True
        self.d1 = tf.keras.layers.Dense(units, activation="tanh")
        self.d2 = tf.keras.layers.Dense(1)

    def build(self, input_shape):
        self.d1.build(input_shape)
        self.d2.build((input_shape[0], input_shape[1], self.units))
        super().build(input_shape)

    def call(self, x, mask=None):
        # Raw per-timestep attention logits: (batch, time).
        s = tf.squeeze(self.d2(self.d1(x)), axis=-1)
        if mask is not None:
            # Push masked timesteps toward -inf so softmax gives them ~0 weight.
            s = s + (1.0 - tf.cast(mask, tf.float32)) * (-1e9)
        a = tf.expand_dims(tf.nn.softmax(s, axis=1), axis=-1)
        return tf.reduce_sum(x * a, axis=1)

    def compute_mask(self, inputs, mask=None):
        # Pooling consumes the time axis; no mask propagates downstream.
        return None

    def get_config(self):
        c = super().get_config()
        c.update({"units": self.units})
        return c


model = tf.keras.models.load_model(MODEL_PATH, custom_objects={"AttnPool": AttnPool})
mp_holistic = mp.solutions.holistic

# MediaPipe Holistic landmark counts (refine_face_landmarks=False):
# 33 pose, 468 face, 21 per hand.
_POSE_POINTS, _FACE_POINTS, _HAND_POINTS = 33, 468, 21


def landmarks_to_vec(res):
    """Flatten a MediaPipe Holistic result into a 1-D float32 feature vector.

    Pose landmarks contribute (x, y, z, visibility); face and hand landmarks
    contribute (x, y, z). A missing group yields a correctly-sized zero span
    (bug fix: the old code emitted only 3-4 zeros for a missing group, which
    shifted every later feature against the normalization stats). The result
    is truncated / zero-padded to ``feature_dim`` as a last-resort safeguard.
    """
    def flat_landmarks(lms, n_points, dims):
        # One fixed-length span per landmark group, zeros when not detected.
        if lms is None:
            return np.zeros((n_points * dims,), dtype=np.float32)
        arr = []
        for p in lms.landmark:
            if dims == 4:
                arr.extend([p.x, p.y, p.z, getattr(p, "visibility", 0.0)])
            else:
                arr.extend([p.x, p.y, p.z])
        return np.array(arr, dtype=np.float32)

    pose = flat_landmarks(res.pose_landmarks, _POSE_POINTS, 4)
    face = flat_landmarks(res.face_landmarks, _FACE_POINTS, 3)
    lh = flat_landmarks(res.left_hand_landmarks, _HAND_POINTS, 3)
    rh = flat_landmarks(res.right_hand_landmarks, _HAND_POINTS, 3)
    v = np.concatenate([pose, face, lh, rh], axis=0).astype(np.float32)
    # Defensive: keep the vector length in sync with the training statistics.
    if v.shape[0] != feature_dim:
        if v.shape[0] > feature_dim:
            v = v[:feature_dim]
        else:
            v = np.pad(v, (0, feature_dim - v.shape[0]))
    return v


def build_sequence_from_video(video_path):
    """Extract a (SEQ_LEN, feature_dim) normalized landmark sequence.

    Samples up to SEQ_LEN frames evenly across the video, runs MediaPipe
    Holistic on each sampled frame, left-pads with zeros when fewer frames
    are available, and z-normalizes ONLY the valid (unpadded) frames.

    Returns:
        (X, mask): X is float32 (SEQ_LEN, feature_dim); mask[t] is True for
        real frames and False for left padding.
    """
    cap = cv2.VideoCapture(video_path)
    try:
        with mp_holistic.Holistic(
            static_image_mode=False,
            model_complexity=1,
            enable_segmentation=False,
            refine_face_landmarks=False,
        ) as holistic:
            total = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
            if total <= 0:
                # Some containers don't report a frame count; assume a bound.
                total = 300
            # SEQ_LEN indices spread evenly over [0, total-1].
            want = set(np.linspace(0, max(0, total - 1), num=SEQ_LEN, dtype=int).tolist())
            got = {}
            i = 0
            while True:
                ok, frame = cap.read()
                if not ok:
                    break
                if i in want:
                    rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    got[i] = landmarks_to_vec(holistic.process(rgb))
                i += 1
    finally:
        # Release the capture even if MediaPipe processing raises.
        cap.release()

    vecs = [got[k] for k in sorted(got)]
    if not vecs:
        # No decodable frames: all-padding sequence, nothing to normalize.
        return (np.zeros((SEQ_LEN, feature_dim), dtype=np.float32),
                np.zeros((SEQ_LEN,), dtype=np.bool_))

    X = np.stack(vecs, axis=0).astype(np.float32)
    if X.shape[0] >= SEQ_LEN:
        X = X[-SEQ_LEN:]
        mask = np.ones((SEQ_LEN,), dtype=np.bool_)
    else:
        # Bug fix: capture the valid-frame count BEFORE padding. The old code
        # used X.shape[0] AFTER vstack (== SEQ_LEN), which marked the padding
        # as valid and then z-normalized the zero rows.
        n_valid = X.shape[0]
        pad = np.zeros((SEQ_LEN - n_valid, feature_dim), dtype=np.float32)
        X = np.vstack([pad, X])
        mask = np.zeros((SEQ_LEN,), dtype=np.bool_)
        mask[-n_valid:] = True
    # Z-normalize only real frames so the zero padding stays exactly zero.
    X[mask] = (X[mask] - mean) / std
    return X, mask


def predict_video(video_path):
    """Run the model on a video file and return the top-K (label, prob) pairs,
    sorted by descending probability."""
    X, _ = build_sequence_from_video(video_path)
    prob = model.predict(X[None, ...], verbose=0)[0]
    top = np.argsort(prob)[::-1][:TOPK]
    return [(id2label[int(i)], float(prob[int(i)])) for i in top]


def ui_predict(video):
    """Gradio adapter: map a recorded clip to a {label: confidence} dict."""
    if video is None:
        # User submitted without recording; show nothing instead of crashing.
        return {}
    return dict(predict_video(video))


demo = gr.Interface(
    fn=ui_predict,
    inputs=gr.Video(sources=["webcam"], format="mp4"),
    outputs=gr.Label(num_top_classes=TOPK),
    title="MSL Real-time-ish Demo (Webcam Video -> Prediction)",
    description="Record a short clip (2-4s) and the model predicts the gloss.",
)

if __name__ == "__main__":
    demo.launch()