# SignMotionGPT — HuggingFace Spaces app (page-scrape header "Spaces: Running" removed)
| """ | |
| SignMotionGPT - HuggingFace Spaces Demo | |
| Text-to-Sign Language Motion Generation | |
| Uses PyRender for high-quality avatar visualization | |
| """ | |
| # IMPORTANT: Set OpenGL platform BEFORE any OpenGL imports (for headless rendering) | |
| import os | |
| os.environ["PYOPENGL_PLATFORM"] = "egl" | |
| import sys | |
| import re | |
| import json | |
| import random | |
| import warnings | |
| import tempfile | |
| import uuid | |
| from pathlib import Path | |
| import torch | |
| import numpy as np | |
| warnings.filterwarnings("ignore") | |
# =====================================================================
# Configuration for HuggingFace Spaces
# =====================================================================
WORK_DIR = os.getcwd()
DATA_DIR = os.path.join(WORK_DIR, "data")
OUTPUT_DIR = os.path.join(WORK_DIR, "outputs")
os.makedirs(DATA_DIR, exist_ok=True)
os.makedirs(OUTPUT_DIR, exist_ok=True)
# Path definitions (all assets live under ./data)
DATASET_PATH = os.path.join(DATA_DIR, "motion_llm_dataset.json")
VQVAE_CHECKPOINT = os.path.join(DATA_DIR, "vqvae_model.pt")
STATS_PATH = os.path.join(DATA_DIR, "vqvae_stats.pt")
SMPLX_MODEL_DIR = os.path.join(DATA_DIR, "smplx_models")
# HuggingFace model config (overridable via environment variables for forks)
HF_REPO_ID = os.environ.get("HF_REPO_ID", "rdz-falcon/SignMotionGPTfit-archive")
HF_SUBFOLDER = os.environ.get("HF_SUBFOLDER", "stage2_v2/epoch-030")
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Generation parameters: special tokens used by the fine-tuned LLM
M_START = "<M_START>"
M_END = "<M_END>"
PAD_TOKEN = "<PAD>"
INFERENCE_TEMPERATURE = 0.7
INFERENCE_TOP_K = 50
INFERENCE_REPETITION_PENALTY = 1.2
# VQ-VAE parameters
SMPL_DIM = 182  # per-frame SMPL-X parameter vector size (equals sum(PARAM_DIMS))
CODEBOOK_SIZE = 512
CODE_DIM = 512
VQ_ARGS = dict(
    width=512, depth=3, down_t=2, stride_t=2,
    dilation_growth_rate=3, activation='relu', norm=None, quantizer="ema_reset"
)
# Slice widths of the flat 182-dim vector, aligned index-by-index with PARAM_NAMES.
PARAM_DIMS = [10, 63, 45, 45, 3, 10, 3, 3]
PARAM_NAMES = ["betas", "body_pose", "left_hand_pose", "right_hand_pose",
               "trans", "expression", "jaw_pose", "eye_pose"]
# Visualization defaults
AVATAR_COLOR = (0.36, 0.78, 0.36, 1.0)  # Green color as RGBA
VIDEO_FPS = 15
VIDEO_SLOWDOWN = 2  # each rendered frame is duplicated this many times
FRAME_WIDTH = 544  # Must be divisible by 16 for video codec compatibility
FRAME_HEIGHT = 720
# =====================================================================
# Install/Import Dependencies
# =====================================================================
# Optional packages use a lazy "pip install on ImportError" fallback so the
# Space still boots on an image that is missing them.
try:
    import gradio as gr
except ImportError:
    os.system("pip install -q gradio>=4.0.0")
    import gradio as gr
try:
    import smplx
except ImportError:
    os.system("pip install -q smplx==0.1.28")
    import smplx
# PyRender for high-quality rendering
PYRENDER_AVAILABLE = False
try:
    import trimesh
    import pyrender
    from PIL import Image, ImageDraw, ImageFont
    PYRENDER_AVAILABLE = True
except ImportError:
    # Deferred: ensure_pyrender() retries the install at initialization time.
    pass
try:
    import imageio
except ImportError:
    os.system("pip install -q imageio[ffmpeg]")
    import imageio
from transformers import AutoModelForCausalLM, AutoTokenizer
import torch.nn.functional as F
# =====================================================================
# Import VQ-VAE architecture
# =====================================================================
# Make both the app directory and its parent importable so the project's
# mGPT package resolves regardless of where the repo is checked out.
current_dir = os.path.dirname(os.path.abspath(__file__))
parent_dir = os.path.dirname(current_dir)
if parent_dir not in sys.path:
    sys.path.insert(0, parent_dir)
if current_dir not in sys.path:
    sys.path.insert(0, current_dir)
try:
    from mGPT.archs.mgpt_vq import VQVae
except ImportError as e:
    # Visualization is optional: VQVae stays None and decode paths bail out.
    print(f"Warning: Could not import VQVae: {e}")
    VQVae = None
# =====================================================================
# Global Cache
# =====================================================================
# Lazily-populated singletons shared across Gradio requests.
_model_cache = {
    "llm_model": None,      # AutoModelForCausalLM used for token generation
    "llm_tokenizer": None,  # matching AutoTokenizer
    "vqvae_model": None,    # MotionGPT_VQVAE_Wrapper used for token decoding
    "smplx_model": None,    # SMPL-X body model for mesh recovery
    "stats": (None, None),  # (mean, std) de-normalization stats
    "initialized": False    # guard so initialize_models() runs only once
}
# lowercased word -> sorted list of participant IDs present in the dataset
_word_pid_map = {}
# "word_pid" -> {"video_path", "tokens", "word", "pid"} for pre-computed examples
_example_cache = {}
| # ===================================================================== | |
| # PyRender Setup | |
| # ===================================================================== | |
def ensure_pyrender():
    """Install pyrender dependencies if not available.

    Rebinds trimesh/pyrender/PIL into the module globals (note the ``global``
    statement) so the rest of the file can use them after a successful lazy
    install. Returns True on success, False otherwise.
    """
    global PYRENDER_AVAILABLE, trimesh, pyrender, Image, ImageDraw, ImageFont
    if PYRENDER_AVAILABLE:
        return True
    print("Installing pyrender dependencies...")
    # On Debian-based images (e.g. HF Spaces) the EGL system libraries are
    # required for headless rendering.
    if os.path.exists("/etc/debian_version"):
        os.system("apt-get update -qq && apt-get install -qq -y libegl1-mesa-dev libgles2-mesa-dev > /dev/null 2>&1")
    os.system("pip install -q trimesh pyrender PyOpenGL PyOpenGL_accelerate Pillow")
    try:
        # These imports bind to the module-level names declared global above.
        import trimesh
        import pyrender
        from PIL import Image, ImageDraw, ImageFont
        PYRENDER_AVAILABLE = True
        return True
    except ImportError as e:
        print(f"Could not install pyrender: {e}")
        return False
| # ===================================================================== | |
| # Dataset Loading - Word to PID mapping | |
| # ===================================================================== | |
def load_word_pid_mapping():
    """Load the dataset JSON and build the global word -> sorted-PID-list map.

    Populates the module-level ``_word_pid_map`` with entries of the form
    ``{"word": ["P01", "P02", ...]}``. A missing dataset file or parse error
    is non-fatal: a message is printed and the map is left as-is.
    """
    global _word_pid_map
    if not os.path.exists(DATASET_PATH):
        print(f"Dataset not found: {DATASET_PATH}")
        return
    print(f"Loading dataset from: {DATASET_PATH}")
    try:
        with open(DATASET_PATH, 'r', encoding='utf-8') as f:
            data = json.load(f)
        for entry in data:
            word = entry.get('word', '').lower()
            pid = entry.get('participant_id', '')
            if word and pid:
                # setdefault replaces the manual "if word not in map" dance.
                _word_pid_map.setdefault(word, set()).add(pid)
        # Freeze each PID set into a sorted list for deterministic sampling/UI.
        for word, pids in _word_pid_map.items():
            _word_pid_map[word] = sorted(pids)
        print(f"Loaded {len(_word_pid_map)} unique words from dataset")
    except Exception as e:
        # Best-effort: the demo can still run (with an empty vocabulary).
        print(f"Error loading dataset: {e}")
def get_pids_for_word(word: str) -> list:
    """Return the sorted PID list recorded for `word`, or [] if unknown."""
    key = word.strip().lower()
    return _word_pid_map.get(key, [])
def get_random_pids_for_word(word: str, count: int = 2) -> list:
    """Pick up to `count` distinct PIDs for `word`, sampled uniformly at random."""
    candidates = get_pids_for_word(word)
    # Sampling is only needed when there are more candidates than requested;
    # otherwise return everything we have (possibly an empty list).
    if len(candidates) > count:
        return random.sample(candidates, count)
    return candidates
def get_example_words_with_pids(count: int = 3) -> list:
    """Get up to `count` (word, pid) example pairs with valid PIDs.

    Preferred demo words are tried first; if fewer than `count` of them exist
    in the dataset, the remainder is filled with random dataset words.
    """
    examples = []
    preferred = ['push', 'passport', 'library', 'send', 'college', 'help', 'thank', 'hello']
    for word in preferred:
        pids = get_pids_for_word(word)
        if pids:
            examples.append((word, pids[0]))
        if len(examples) >= count:
            break
    if len(examples) < count:
        # Build the chosen-word set once instead of re-scanning `examples`
        # inside the filter (was an O(n*m) nested list comprehension).
        chosen = {w for w, _ in examples}
        available = [w for w in _word_pid_map.keys() if w not in chosen]
        random.shuffle(available)
        for word in available[:count - len(examples)]:
            pids = _word_pid_map[word]
            examples.append((word, pids[0]))
    return examples
| # ===================================================================== | |
| # VQ-VAE Wrapper | |
| # ===================================================================== | |
class MotionGPT_VQVAE_Wrapper(torch.nn.Module):
    """Thin nn.Module wrapper that instantiates the project's VQVae backbone.

    Raises RuntimeError at construction time when the mGPT package (and thus
    VQVae) could not be imported, so failures surface early and clearly.
    """
    def __init__(self, smpl_dim=SMPL_DIM, codebook_size=CODEBOOK_SIZE, code_dim=CODE_DIM, **kwargs):
        super().__init__()
        if VQVae is None:
            raise RuntimeError("VQVae architecture not available")
        # Extra **kwargs (e.g. VQ_ARGS) are forwarded straight to VQVae.
        self.vqvae = VQVae(
            nfeats=smpl_dim, code_num=codebook_size, code_dim=code_dim,
            output_emb_width=code_dim, **kwargs
        )
| # ===================================================================== | |
| # Model Loading Functions | |
| # ===================================================================== | |
def load_llm_model():
    """Download and prepare the fine-tuned causal LM + tokenizer from the Hub.

    Uses HF_TOKEN/HUGGINGFACE_HUB_TOKEN for gated/private repos, fp16 on GPU,
    and guarantees a pad token exists (resizing embeddings if one was added).
    Returns (model, tokenizer) with the model on DEVICE in eval mode.
    """
    print(f"Loading LLM from: {HF_REPO_ID}/{HF_SUBFOLDER}")
    token = os.environ.get("HF_TOKEN") or os.environ.get("HUGGINGFACE_HUB_TOKEN")
    tokenizer = AutoTokenizer.from_pretrained(
        HF_REPO_ID, subfolder=HF_SUBFOLDER, trust_remote_code=True, token=token
    )
    model = AutoModelForCausalLM.from_pretrained(
        HF_REPO_ID, subfolder=HF_SUBFOLDER, trust_remote_code=True, token=token,
        # fp16 only when CUDA is available; CPU inference stays in fp32.
        torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32
    )
    if tokenizer.pad_token is None:
        tokenizer.add_special_tokens({"pad_token": PAD_TOKEN})
        # Embedding table must grow to cover the newly added pad token.
        model.resize_token_embeddings(len(tokenizer))
    model.config.pad_token_id = tokenizer.pad_token_id
    model.to(DEVICE)
    model.eval()
    print(f"LLM loaded (vocab size: {len(tokenizer)})")
    return model, tokenizer
def load_vqvae_model():
    """Load the VQ-VAE checkpoint; returns None when the file is absent."""
    if not os.path.exists(VQVAE_CHECKPOINT):
        print(f"VQ-VAE checkpoint not found: {VQVAE_CHECKPOINT}")
        return None
    print(f"Loading VQ-VAE from: {VQVAE_CHECKPOINT}")
    model = MotionGPT_VQVAE_Wrapper(smpl_dim=SMPL_DIM, codebook_size=CODEBOOK_SIZE, code_dim=CODE_DIM, **VQ_ARGS).to(DEVICE)
    # weights_only=False: the checkpoint is a trusted artifact shipped with the Space.
    ckpt = torch.load(VQVAE_CHECKPOINT, map_location=DEVICE, weights_only=False)
    # Accept both raw state dicts and {'model_state_dict': ...} training dumps.
    state_dict = ckpt.get('model_state_dict', ckpt)
    model.load_state_dict(state_dict, strict=False)  # strict=False tolerates extra/missing keys
    model.eval()
    print(f"VQ-VAE loaded")
    return model
def load_stats():
    """Load VQ-VAE normalization stats as numpy arrays.

    Returns (mean, std), or (None, None) when the stats file is missing.
    Scalar fallbacks (0, 1) are used for keys absent from the file.
    """
    if not os.path.exists(STATS_PATH):
        return None, None
    stats = torch.load(STATS_PATH, map_location='cpu', weights_only=False)
    mean = stats.get('mean', 0)
    std = stats.get('std', 1)
    # Convert tensors to numpy so downstream math is framework-free.
    if torch.is_tensor(mean):
        mean = mean.cpu().numpy()
    if torch.is_tensor(std):
        std = std.cpu().numpy()
    return mean, std
def load_smplx_model():
    """Instantiate the neutral SMPL-X body model; returns None if assets are missing."""
    if not os.path.exists(SMPLX_MODEL_DIR):
        print(f"SMPL-X directory not found: {SMPLX_MODEL_DIR}")
        return None
    print(f"Loading SMPL-X from: {SMPLX_MODEL_DIR}")
    model = smplx.SMPLX(
        # use_pca=False: hand poses are passed as full axis-angle vectors
        # (45-dim per hand here), not PCA coefficients.
        model_path=SMPLX_MODEL_DIR, model_type='smplx', gender='neutral', use_pca=False,
        create_global_orient=True, create_body_pose=True, create_betas=True,
        create_expression=True, create_jaw_pose=True, create_left_hand_pose=True,
        create_right_hand_pose=True, create_transl=True
    ).to(DEVICE)
    print(f"SMPL-X loaded")
    return model
def initialize_models():
    """One-time startup: load dataset mapping, LLM, and visualization models.

    The LLM is mandatory (exceptions propagate); VQ-VAE, stats, and SMPL-X are
    best-effort so the demo can still emit raw tokens even without rendering.
    """
    global _model_cache
    if _model_cache["initialized"]:
        return
    print("\n" + "="*60)
    print(" Initializing SignMotionGPT Models")
    print("="*60)
    load_word_pid_mapping()
    _model_cache["llm_model"], _model_cache["llm_tokenizer"] = load_llm_model()
    try:
        _model_cache["vqvae_model"] = load_vqvae_model()
        _model_cache["stats"] = load_stats()
        _model_cache["smplx_model"] = load_smplx_model()
    except Exception as e:
        # Visualization is optional: log and continue with token-only output.
        print(f"Could not load visualization models: {e}")
    # Ensure PyRender is available
    ensure_pyrender()
    _model_cache["initialized"] = True
    print("All models initialized")
    print("="*60)
def precompute_examples():
    """Pre-compute animations for example words at startup.

    Fills _example_cache keyed by "word_pid". Failures are cached too (with a
    None video path) so the UI does not retry them on every click.
    """
    global _example_cache
    if not _model_cache["initialized"]:
        return
    examples = get_example_words_with_pids(3)
    print(f"\nPre-computing {len(examples)} example animations...")
    for word, pid in examples:
        key = f"{word}_{pid}"
        print(f" Computing: {word} ({pid})...")
        try:
            video_path, tokens = generate_video_for_word(word, pid)
            _example_cache[key] = {"video_path": video_path, "tokens": tokens, "word": word, "pid": pid}
            print(f" Done: {word}")
        except Exception as e:
            print(f" Failed: {word} - {e}")
            _example_cache[key] = {"video_path": None, "tokens": "", "word": word, "pid": pid}
    print("Example pre-computation complete\n")
| # ===================================================================== | |
| # Motion Generation Functions | |
| # ===================================================================== | |
def generate_motion_tokens(word: str, variant: str) -> str:
    """Prompt the LLM to generate motion tokens for a word/variant pair.

    `variant` is the signer/participant ID (callers pass a PID). Returns the
    raw decoded text after the "Motion: " marker with special tokens kept, so
    parse_motion_tokens() can extract the <M..> IDs from it.

    Raises RuntimeError when the LLM has not been initialized.
    """
    model = _model_cache["llm_model"]
    tokenizer = _model_cache["llm_tokenizer"]
    if model is None or tokenizer is None:
        raise RuntimeError("LLM model not loaded")
    prompt = f"Instruction: Generate motion for word '{word}' with variant '{variant}'.\nMotion: "
    inputs = tokenizer(prompt, return_tensors="pt").to(DEVICE)
    with torch.no_grad():
        output = model.generate(
            **inputs, max_new_tokens=100, do_sample=True,
            temperature=INFERENCE_TEMPERATURE, top_k=INFERENCE_TOP_K,
            repetition_penalty=INFERENCE_REPETITION_PENALTY,
            pad_token_id=tokenizer.pad_token_id,
            # Stop as soon as the motion-end marker token is produced.
            eos_token_id=tokenizer.convert_tokens_to_ids(M_END),
            early_stopping=True
        )
    # Keep special tokens: the <M..> motion codes ARE special tokens.
    decoded = tokenizer.decode(output[0], skip_special_tokens=False)
    motion_part = decoded.split("Motion: ")[-1] if "Motion: " in decoded else decoded
    return motion_part.strip()
def parse_motion_tokens(token_str: str) -> list:
    """Extract motion-code IDs from a generated token string.

    Accepts either the ``<M123>`` or the ``<motion_123>`` token format and
    returns the IDs as a list of ints. A list/tuple/ndarray input is cast
    element-wise to int; any other non-string input yields [].
    """
    if isinstance(token_str, (list, tuple, np.ndarray)):
        return [int(v) for v in token_str]
    if not isinstance(token_str, str):
        return []
    # Try each known token pattern in priority order; first one that
    # matches anything wins.
    for pattern in (r'<M(\d+)>', r'<motion_(\d+)>'):
        found = re.findall(pattern, token_str)
        if found:
            return [int(v) for v in found]
    return []
def decode_tokens_to_params(tokens: list) -> np.ndarray:
    """Decode VQ codebook indices into a per-frame SMPL-X parameter sequence.

    Tries the quantizer's own dequantize() first, falling back to a direct
    codebook lookup. Returns an empty (0, SMPL_DIM) array when the VQ-VAE is
    unavailable or `tokens` is empty. De-normalizes with the loaded stats.
    Output shape is whatever the VQ-VAE decoder emits for the token count;
    downstream code treats it as (T, SMPL_DIM).
    """
    vqvae_model = _model_cache["vqvae_model"]
    mean, std = _model_cache["stats"]
    if vqvae_model is None or not tokens:
        return np.zeros((0, SMPL_DIM), dtype=np.float32)
    idx = torch.tensor(tokens, dtype=torch.long, device=DEVICE).unsqueeze(0)
    T_q = idx.shape[1]
    quantizer = vqvae_model.vqvae.quantizer
    if hasattr(quantizer, "codebook"):
        codebook = quantizer.codebook.to(DEVICE)
        code_dim = codebook.shape[1]
    else:
        # No accessible codebook: fall back to the configured code dim.
        code_dim = CODE_DIM
    x_quantized = None
    if hasattr(quantizer, "dequantize"):
        try:
            with torch.no_grad():
                dq = quantizer.dequantize(idx)
                if dq is not None:
                    dq = dq.contiguous()
                    # The decoder expects channels-first (B, code_dim, T);
                    # permute when dequantize returned (B, T, code_dim).
                    if dq.ndim == 3 and dq.shape[1] == code_dim:
                        x_quantized = dq
                    elif dq.ndim == 3 and dq.shape[1] == T_q:
                        x_quantized = dq.permute(0, 2, 1).contiguous()
        except Exception:
            # Fall through to the manual codebook lookup below.
            pass
    if x_quantized is None:
        if not hasattr(quantizer, "codebook"):
            # No dequantize() and no codebook: nothing we can decode.
            return np.zeros((0, SMPL_DIM), dtype=np.float32)
        with torch.no_grad():
            emb = codebook[idx]  # embedding lookup -> (B, T, code_dim)
            x_quantized = emb.permute(0, 2, 1).contiguous()  # -> (B, code_dim, T)
    with torch.no_grad():
        x_dec = vqvae_model.vqvae.decoder(x_quantized)
        smpl_out = vqvae_model.vqvae.postprocess(x_dec)
    params_np = smpl_out.squeeze(0).cpu().numpy()
    # Undo training-time normalization: x * std + mean (broadcast over frames).
    if (mean is not None) and (std is not None):
        params_np = (params_np * np.array(std).reshape(1, -1)) + np.array(mean).reshape(1, -1)
    return params_np
def params_to_vertices(params_seq: np.ndarray) -> tuple:
    """Run SMPL-X over a (T, 182) parameter sequence to recover mesh vertices.

    Each frame's flat vector is split into the named SMPL-X parameter groups
    (PARAM_NAMES/PARAM_DIMS). Frames are processed in batches of 32. Returns
    (vertices, faces) as numpy arrays, or (None, None) when the body model is
    missing or the sequence is empty.
    """
    smplx_model = _model_cache["smplx_model"]
    if smplx_model is None or params_seq.shape[0] == 0:
        return None, None
    # Start/end offsets of each parameter group inside the flat vector.
    starts = np.cumsum([0] + PARAM_DIMS[:-1])
    ends = starts + np.array(PARAM_DIMS)
    T = params_seq.shape[0]
    all_verts = []
    batch_size = 32  # frames per SMPL-X forward pass
    num_body_joints = getattr(smplx_model, "NUM_BODY_JOINTS", 21)
    with torch.no_grad():
        for s in range(0, T, batch_size):
            batch = params_seq[s:s+batch_size]
            B = batch.shape[0]
            np_parts = {name: batch[:, st:ed].astype(np.float32) for name, st, ed in zip(PARAM_NAMES, starts, ends)}
            tensor_parts = {name: torch.from_numpy(arr).to(DEVICE) for name, arr in np_parts.items()}
            # =================================================================
            # FIX: Neutralize Jaw Pose
            # =================================================================
            # The generated jaw rotation can be unstable, causing the mouth
            # to rotate backwards into the neck. We force it to 0 (closed)
            # to keep the face render clean.
            tensor_parts['jaw_pose'] = torch.zeros_like(tensor_parts['jaw_pose'])
            # =================================================================
            # The "body_pose" slice may or may not include a 3-dim global
            # orientation prefix; detect by width and split accordingly.
            body_t = tensor_parts['body_pose']
            L_body = body_t.shape[1]
            expected_no_go = num_body_joints * 3
            expected_with_go = (num_body_joints + 1) * 3
            if L_body == expected_with_go:
                global_orient = body_t[:, :3].contiguous()
                body_pose_only = body_t[:, 3:].contiguous()
            elif L_body == expected_no_go:
                global_orient = torch.zeros((B, 3), dtype=torch.float32, device=DEVICE)
                body_pose_only = body_t
            else:
                # Unexpected width: best-effort split, or zero-pad to fit.
                if L_body > expected_no_go:
                    global_orient = body_t[:, :3].contiguous()
                    body_pose_only = body_t[:, 3:].contiguous()
                else:
                    body_pose_only = F.pad(body_t, (0, max(0, expected_no_go - L_body)))
                    global_orient = torch.zeros((B, 3), dtype=torch.float32, device=DEVICE)
            out = smplx_model(
                betas=tensor_parts['betas'], global_orient=global_orient, body_pose=body_pose_only,
                left_hand_pose=tensor_parts['left_hand_pose'], right_hand_pose=tensor_parts['right_hand_pose'],
                expression=tensor_parts['expression'], jaw_pose=tensor_parts['jaw_pose'],
                # The single 3-dim "eye_pose" slice drives both eyes.
                leye_pose=tensor_parts['eye_pose'], reye_pose=tensor_parts['eye_pose'],
                transl=tensor_parts['trans'], return_verts=True
            )
            all_verts.append(out.vertices.detach().cpu().numpy())
    return np.concatenate(all_verts, axis=0), smplx_model.faces.astype(np.int32)
| # ===================================================================== | |
| # PyRender Visualization Functions | |
| # ===================================================================== | |
def render_single_frame(
    verts: np.ndarray,
    faces: np.ndarray,
    label: str = "",
    color: tuple = AVATAR_COLOR,
    fixed_center: np.ndarray = None,
    camera_distance: float = 3.5,
    focal_length: float = 2000,
    frame_width: int = FRAME_WIDTH,
    frame_height: int = FRAME_HEIGHT,
    bg_color: tuple = (0.95, 0.95, 0.97, 1.0)
) -> np.ndarray:
    """Render a single mesh frame using PyRender.

    Args:
        verts: (V, 3) vertex positions for one frame.
        faces: (F, 3) triangle indices.
        label: optional caption drawn in the top-left corner.
        color: RGBA base color of the avatar material.
        fixed_center: camera look-at point; defaults to the mesh centroid.
        camera_distance / focal_length: pinhole camera placement controls.
        frame_width / frame_height: output image size in pixels.
        bg_color: scene background RGBA.

    Returns:
        (H, W, 3) uint8 RGB image; a flat gray frame when `verts` has NaN/inf.
    """
    if not PYRENDER_AVAILABLE:
        raise RuntimeError("PyRender not available")
    # Check for invalid vertices — return a blank gray frame instead of crashing.
    if not np.isfinite(verts).all():
        blank = np.ones((frame_height, frame_width, 3), dtype=np.uint8) * 200
        return blank
    # Create scene
    scene = pyrender.Scene(bg_color=bg_color, ambient_light=[0.4, 0.4, 0.4])
    # Material
    material = pyrender.MetallicRoughnessMaterial(
        metallicFactor=0.0,
        roughnessFactor=0.4,
        alphaMode='OPAQUE',
        baseColorFactor=color
    )
    # Create mesh
    mesh = trimesh.Trimesh(vertices=verts, faces=faces)
    mesh_render = pyrender.Mesh.from_trimesh(mesh, material=material, smooth=True)
    scene.add(mesh_render)
    # Compute center for camera positioning
    mesh_center = verts.mean(axis=0)
    camera_target = fixed_center if fixed_center is not None else mesh_center
    # Camera setup
    camera = pyrender.IntrinsicsCamera(
        fx=focal_length, fy=focal_length,
        cx=frame_width / 2, cy=frame_height / 2,
        znear=0.1, zfar=20.0
    )
    # Camera pose: After 180-degree rotation around X-axis, coordinate system changes
    # Camera should be positioned in front (negative Z) with flipped orientation
    # This matches visualize.py and ensures proper face visibility
    camera_pose = np.eye(4)
    camera_pose[0, 3] = camera_target[0]  # Center X
    camera_pose[1, 3] = camera_target[1]  # Center Y (body center)
    camera_pose[2, 3] = camera_target[2] - camera_distance  # In front (negative Z)
    # Camera orientation: flip to look at subject (SOKE-style)
    # This rotation makes camera look toward +Z (at the subject)
    camera_pose[:3, :3] = np.array([
        [1, 0, 0],
        [0, -1, 0],
        [0, 0, -1]
    ])
    scene.add(camera, pose=camera_pose)
    # Lighting: key, fill, and rim directional lights (three-point setup).
    key_light = pyrender.DirectionalLight(color=[1.0, 1.0, 1.0], intensity=3.0)
    key_pose = np.eye(4)
    key_pose[:3, :3] = trimesh.transformations.euler_matrix(np.radians(-30), np.radians(-20), 0)[:3, :3]
    scene.add(key_light, pose=key_pose)
    fill_light = pyrender.DirectionalLight(color=[0.9, 0.9, 1.0], intensity=1.5)
    fill_pose = np.eye(4)
    fill_pose[:3, :3] = trimesh.transformations.euler_matrix(np.radians(-20), np.radians(30), 0)[:3, :3]
    scene.add(fill_light, pose=fill_pose)
    rim_light = pyrender.DirectionalLight(color=[1.0, 1.0, 1.0], intensity=2.0)
    rim_pose = np.eye(4)
    rim_pose[:3, :3] = trimesh.transformations.euler_matrix(np.radians(30), np.radians(180), 0)[:3, :3]
    scene.add(rim_light, pose=rim_pose)
    # Render (offscreen renderer is created per frame and freed immediately).
    renderer = pyrender.OffscreenRenderer(viewport_width=frame_width, viewport_height=frame_height, point_size=1.0)
    color_img, _ = renderer.render(scene)
    renderer.delete()
    # Add label overlay with PIL (falls back to the default bitmap font).
    if label:
        img = Image.fromarray(color_img)
        draw = ImageDraw.Draw(img)
        try:
            font = ImageFont.truetype("/usr/share/fonts/truetype/dejavu/DejaVuSans-Bold.ttf", 20)
        except:
            font = ImageFont.load_default()
        # Rough width estimate (~10 px per character) for the backdrop box.
        text_width = len(label) * 10 + 20
        draw.rectangle([10, 10, 10 + text_width, 35], fill=(0, 0, 0, 180))
        draw.text((15, 12), label, fill=(255, 255, 255), font=font)
        color_img = np.array(img)
    return color_img
def render_side_by_side_frame(
    verts_list: list,
    faces: np.ndarray,
    labels: list,
    fixed_centers: list = None,
    camera_distance: float = 3.5,
    focal_length: float = 2000,
    frame_width: int = FRAME_WIDTH,
    frame_height: int = FRAME_HEIGHT,
    bg_color: tuple = (0.95, 0.95, 0.97, 1.0)
) -> np.ndarray:
    """Render each mesh in `verts_list` and tile the images horizontally.

    Avatars cycle through a fixed green/blue/orange palette so the variants
    stay visually distinguishable in the comparison video.
    """
    if not PYRENDER_AVAILABLE:
        raise RuntimeError("PyRender not available")
    # Per-avatar RGBA colors, cycled when there are more meshes than colors.
    palette = [
        (0.3, 0.8, 0.4, 1.0),  # Green
        (0.3, 0.6, 0.9, 1.0),  # Blue
        (0.9, 0.5, 0.2, 1.0),  # Orange
    ]
    rendered = []
    for idx, verts in enumerate(verts_list):
        center = fixed_centers[idx] if fixed_centers else None
        caption = labels[idx] if idx < len(labels) else ""
        rendered.append(
            render_single_frame(
                verts, faces,
                label=caption,
                color=palette[idx % len(palette)],
                fixed_center=center,
                camera_distance=camera_distance,
                focal_length=focal_length,
                frame_width=frame_width,
                frame_height=frame_height,
                bg_color=bg_color,
            )
        )
    return np.concatenate(rendered, axis=1)
def render_video(
    verts: np.ndarray,
    faces: np.ndarray,
    output_path: str,
    label: str = "",
    fps: int = VIDEO_FPS,
    slowdown: int = VIDEO_SLOWDOWN,
    camera_distance: float = 3.5,
    focal_length: float = 2000,
    frame_width: int = FRAME_WIDTH,
    frame_height: int = FRAME_HEIGHT
) -> str:
    """Render single avatar animation to video.

    Flips the mesh into camera space, trims trailing artifact frames,
    duplicates each frame `slowdown` times to slow playback, and encodes with
    libx264 via imageio. Returns `output_path`.
    """
    if not ensure_pyrender():
        raise RuntimeError("PyRender not available")
    # Apply orientation fix: rotate 180 degrees around X-axis (negate Y and Z).
    verts = verts.copy()
    verts[..., 1:] *= -1
    # Trim last few frames to remove end-of-sequence artifacts
    # (up to 8 frames or 15% of the clip, always keeping at least 5 frames).
    T_total = verts.shape[0]
    trim_amount = min(8, int(T_total * 0.15))
    T = max(5, T_total - trim_amount)
    # Compute fixed camera target from first frame so the camera never drifts.
    fixed_center = verts[0].mean(axis=0)
    frames = []
    for t in range(T):
        frame = render_single_frame(
            verts[t], faces, label=label,
            fixed_center=fixed_center, camera_distance=camera_distance,
            focal_length=focal_length, frame_width=frame_width,
            frame_height=frame_height
        )
        # Duplicate frames to slow playback without re-rendering.
        for _ in range(slowdown):
            frames.append(frame)
    # Save video
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
    if len(frames) > 0:
        imageio.mimsave(output_path, frames, fps=fps, codec='libx264', quality=8)
    return output_path
def render_comparison_video(
    verts1: np.ndarray,
    faces1: np.ndarray,
    verts2: np.ndarray,
    faces2: np.ndarray,
    output_path: str,
    label1: str = "",
    label2: str = "",
    fps: int = VIDEO_FPS,
    slowdown: int = VIDEO_SLOWDOWN,
    camera_distance: float = 3.5,
    focal_length: float = 2000,
    frame_width: int = FRAME_WIDTH,
    frame_height: int = FRAME_HEIGHT
) -> str:
    """Render side-by-side comparison video.

    Both sequences are flipped into camera space, truncated to their common
    length minus an end-of-sequence trim, and rendered two-up per frame.

    NOTE(review): `faces2` is accepted but never used — both avatars render
    with `faces1`. Harmless while both meshes share SMPL-X topology; confirm
    before reusing this with mixed topologies.
    """
    if not ensure_pyrender():
        raise RuntimeError("PyRender not available")
    # Apply orientation fix (180-degree rotation around X: negate Y and Z).
    verts1 = verts1.copy()
    verts2 = verts2.copy()
    verts1[..., 1:] *= -1
    verts2[..., 1:] *= -1
    # Match lengths and trim trailing artifact frames (keep at least 5).
    T_total = min(verts1.shape[0], verts2.shape[0])
    trim_amount = min(8, int(T_total * 0.15))
    T = max(5, T_total - trim_amount)
    verts1 = verts1[:T]
    verts2 = verts2[:T]
    # Compute fixed camera targets so each camera stays steady over time.
    fixed_center1 = verts1[0].mean(axis=0)
    fixed_center2 = verts2[0].mean(axis=0)
    labels = [label1, label2]
    frames = []
    for t in range(T):
        frame = render_side_by_side_frame(
            [verts1[t], verts2[t]], faces1, labels,
            fixed_centers=[fixed_center1, fixed_center2],
            camera_distance=camera_distance, focal_length=focal_length,
            frame_width=frame_width, frame_height=frame_height
        )
        # Duplicate each frame to slow playback without re-rendering.
        for _ in range(slowdown):
            frames.append(frame)
    # Save video
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
    if len(frames) > 0:
        imageio.mimsave(output_path, frames, fps=fps, codec='libx264', quality=8)
    return output_path
| # ===================================================================== | |
| # Main Processing Functions | |
| # ===================================================================== | |
def generate_verts_for_word(word: str, pid: str) -> tuple:
    """Run the full text -> tokens -> params -> mesh pipeline for one (word, PID).

    Returns (verts, faces, raw_token_string); verts/faces are None whenever
    any stage of the pipeline cannot produce output.
    """
    raw_tokens = generate_motion_tokens(word, pid)
    token_ids = parse_motion_tokens(raw_tokens)
    failure = (None, None, raw_tokens)
    if not token_ids:
        return failure
    if _model_cache["vqvae_model"] is None or _model_cache["smplx_model"] is None:
        return failure
    params = decode_tokens_to_params(token_ids)
    if params.shape[0] == 0:
        return failure
    verts, faces = params_to_vertices(params)
    return verts, faces, raw_tokens
def generate_video_for_word(word: str, pid: str) -> tuple:
    """Generate and render a single-avatar video. Returns (video_path, tokens)."""
    verts, faces, tokens = generate_verts_for_word(word, pid)
    if verts is None:
        return None, tokens
    # Unique suffix so repeated requests never collide in OUTPUT_DIR.
    out_name = f"motion_{word}_{pid}_{uuid.uuid4().hex[:8]}.mp4"
    out_path = os.path.join(OUTPUT_DIR, out_name)
    render_video(verts, faces, out_path, label=f"{pid}")
    return out_path, tokens
def process_word(word: str):
    """Top-level Gradio handler: render a two-variant comparison for `word`.

    Returns (video_path_or_None, tokens_or_message) matching the UI outputs.
    Degrades to a single-avatar video when only one variant succeeds.
    """
    if not word or not word.strip():
        return None, ""
    word = word.strip().lower()
    pids = get_random_pids_for_word(word, 2)
    if not pids:
        return None, f"Word '{word}' not found in dataset"
    if len(pids) == 1:
        # Only one signer available: compare the word against itself.
        pids = [pids[0], pids[0]]
    try:
        verts1, faces1, tokens1 = generate_verts_for_word(word, pids[0])
        verts2, faces2, tokens2 = generate_verts_for_word(word, pids[1])
        if verts1 is None and verts2 is None:
            return None, tokens1 or tokens2 or "Failed to generate motion"
        # Unique filename so repeated requests never collide.
        video_path = os.path.join(OUTPUT_DIR, f"comparison_{word}_{uuid.uuid4().hex[:8]}.mp4")
        # Fall back to a single-avatar render when one variant failed.
        if verts1 is None:
            render_video(verts2, faces2, video_path, label=pids[1])
            return video_path, tokens2
        if verts2 is None:
            render_video(verts1, faces1, video_path, label=pids[0])
            return video_path, tokens1
        render_comparison_video(
            verts1, faces1, verts2, faces2, video_path,
            label1=pids[0], label2=pids[1]
        )
        return video_path, f"[{pids[0]}] {tokens1}\n\n[{pids[1]}] {tokens2}"
    except Exception as e:
        # Surface a short error string to the UI instead of crashing the Space.
        return None, f"Error: {str(e)[:100]}"
def get_example_video(word: str, pid: str):
    """Serve a pre-computed example video, generating on the fly on cache miss."""
    cached = _example_cache.get(f"{word}_{pid}")
    if cached is not None:
        return cached.get("video_path"), cached.get("tokens", "")
    return generate_video_for_word(word, pid)
| # ===================================================================== | |
| # Gradio Interface | |
| # ===================================================================== | |
def create_gradio_interface():
    """Build the Gradio Blocks UI: word input, comparison video, example gallery.

    Returns the Blocks app; event wiring connects the Generate button and
    textbox submit to process_word(), and each example row to get_example_video().
    """
    # Custom CSS for overall layout width and the pre-computed example cards.
    custom_css = """
    .gradio-container { max-width: 1400px !important; }
    .example-row { margin-top: 15px; padding: 12px; background: #f8f9fa; border-radius: 6px; }
    .example-word-label {
        text-align: center;
        font-size: 28px !important;
        font-weight: bold !important;
        color: #2c3e50 !important;
        margin: 10px 0 !important;
        padding: 10px !important;
    }
    .example-variant-label {
        text-align: center;
        font-size: 14px !important;
        color: #7f8c8d !important;
        margin-bottom: 10px !important;
    }
    """
    example_list = list(_example_cache.values()) if _example_cache else []
    with gr.Blocks(title="SignMotionGPT", css=custom_css, theme=gr.themes.Default()) as demo:
        gr.Markdown("# SignMotionGPT Demo")
        gr.Markdown("Text-to-Sign Language Motion Generation with Variant Comparison")
        gr.Markdown("*High-quality PyRender visualization with proper hand motion rendering*")
        with gr.Row():
            # Left column: word input + generated token display.
            with gr.Column(scale=1, min_width=280):
                gr.Markdown("### Input")
                word_input = gr.Textbox(
                    label="Word",
                    placeholder="Enter a word from the dataset...",
                    lines=1, max_lines=1
                )
                generate_btn = gr.Button("Generate Motion", variant="primary", size="lg")
                gr.Markdown("---")
                gr.Markdown("### Generated Tokens")
                tokens_output = gr.Textbox(
                    label="Motion Tokens (both variants)",
                    lines=8,
                    interactive=False,
                )
                # Hint at the available vocabulary when the dataset loaded.
                if _word_pid_map:
                    sample_words = list(_word_pid_map.keys())[:10]
                    gr.Markdown(f"**Available words:** {', '.join(sample_words)}, ...")
            # Right column: the rendered comparison video.
            with gr.Column(scale=2, min_width=700):
                gr.Markdown("### Motion Comparison (Two Signer Variants)")
                video_output = gr.Video(
                    label="Generated Motion",
                    autoplay=True,
                )
        # One row per pre-computed example (filled by precompute_examples()).
        if example_list:
            gr.Markdown("---")
            gr.Markdown("### Pre-computed Examples")
            for item in example_list:
                word, pid = item['word'], item['pid']
                with gr.Row(elem_classes="example-row"):
                    with gr.Column(scale=1, min_width=180):
                        gr.HTML(f'<div class="example-word-label">{word.upper()}</div>')
                        gr.HTML(f'<div class="example-variant-label">Variant: {pid}</div>')
                        example_btn = gr.Button("Load Example", size="sm", variant="secondary")
                    with gr.Column(scale=3, min_width=500):
                        example_video = gr.Video(
                            label=f"Example: {word}",
                            autoplay=False
                        )
                    # word/pid are bound as lambda defaults to avoid the
                    # classic late-binding-closure bug in this loop.
                    example_btn.click(
                        fn=lambda w=word, p=pid: get_example_video(w, p),
                        inputs=[],
                        outputs=[example_video, tokens_output]
                    )
        gr.Markdown("---")
        gr.Markdown("*SignMotionGPT: LLM-based sign language motion generation with PyRender visualization*")
        # Both the button click and pressing Enter in the textbox trigger generation.
        generate_btn.click(
            fn=process_word,
            inputs=[word_input],
            outputs=[video_output, tokens_output]
        )
        word_input.submit(
            fn=process_word,
            inputs=[word_input],
            outputs=[video_output, tokens_output]
        )
    return demo
| # ===================================================================== | |
| # Main Entry Point for HuggingFace Spaces | |
| # ===================================================================== | |
# Startup banner with the effective runtime configuration.
print("\n" + "="*60)
print(" SignMotionGPT - HuggingFace Spaces (PyRender)")
print("="*60)
print(f"Device: {DEVICE}")
print(f"Model: {HF_REPO_ID}/{HF_SUBFOLDER}")
print(f"Data Directory: {DATA_DIR}")
print(f"Output Directory: {OUTPUT_DIR}")
print(f"Dataset: {DATASET_PATH}")
print(f"PyRender Available: {PYRENDER_AVAILABLE}")
print("="*60 + "\n")
# Initialize models at startup
initialize_models()
# Pre-compute example animations
precompute_examples()
# Create and launch interface (`demo` is module-level so Spaces can import it).
demo = create_gradio_interface()
if __name__ == "__main__":
    # Launch with settings for HuggingFace Spaces
    demo.launch(
        server_name="0.0.0.0",
        server_port=7860,
        share=False
    )