import os
import shutil
import subprocess
import sys
import tempfile
import time
import base64
import html
import inspect

import cv2
import gradio as gr
import numpy as np
import open3d as o3d
import requests
import spaces
import torch
import trimesh
from huggingface_hub import hf_hub_download

BASE_DIR = os.path.dirname(os.path.abspath(__file__))
STATIC_DIR = os.path.join(BASE_DIR, "static")
EXAMPLES_DIR = os.path.join(BASE_DIR, "examples")


def prepare_local_assets():
    """Download the three.js viewer modules into STATIC_DIR if missing."""
    os.makedirs(STATIC_DIR, exist_ok=True)
    base_url = "https://registry.npmmirror.com/three/0.160.0/files"
    assets = {
        "three.module.js": f"{base_url}/build/three.module.js",
        "OrbitControls.js": f"{base_url}/examples/jsm/controls/OrbitControls.js",
        "GLTFLoader.js": f"{base_url}/examples/jsm/loaders/GLTFLoader.js",
        "BufferGeometryUtils.js": f"{base_url}/examples/jsm/utils/BufferGeometryUtils.js",
    }
    for name, url in assets.items():
        path = os.path.join(STATIC_DIR, name)
        if not os.path.exists(path):
            try:
                r = requests.get(url, verify=False, timeout=10)
                if r.status_code == 200:
                    with open(path, "wb") as f:
                        f.write(r.content)
            except Exception as e:
                print(f"Error downloading {name}: {e}")


prepare_local_assets()


def install_pytorch3d():
    try:
        import pytorch3d
        print("PyTorch3D already installed.")
        return
    except ImportError:
        print("PyTorch3D not found. Starting dynamic installation...")

    # Build the wheel tag from the running Python/CUDA/torch versions,
    # e.g. py310_cu121_pyt240.
    pyt_version_str = torch.__version__.split("+")[0].replace(".", "")
    version_str = "".join([
        f"py3{sys.version_info.minor}_",
        f"cu{torch.version.cuda.replace('.', '')}_",
        f"pyt{pyt_version_str}",
    ])
    whl_url = (
        f"https://dl.fbaipublicfiles.com/pytorch3d/packaging/wheels/{version_str}/"
        f"pytorch3d-0.7.8-cp3{sys.version_info.minor}-cp3{sys.version_info.minor}-linux_x86_64.whl"
    )
    print(f"Detected env: {version_str}")
    print(f"Attempting to install from wheel: {whl_url}")
    try:
        subprocess.run([sys.executable, "-m", "pip", "install", whl_url], check=True)
        print("PyTorch3D installed via wheel.")
    except subprocess.CalledProcessError:
        print("Wheel installation failed (maybe a version mismatch).")
        print("Falling back to source compilation (this will take a few minutes)...")
        subprocess.run(
            [
                sys.executable, "-m", "pip", "install", "--no-build-isolation",
                "git+https://github.com/facebookresearch/pytorch3d.git@stable",
            ],
            check=True,
        )
        print("PyTorch3D installed via source build.")


install_pytorch3d()


def install_mmcv():
    try:
        import mmcv
        print(f"mmcv {mmcv.__version__} is already installed.")
        return
    except ImportError:
        print("mmcv not found. Starting dynamic installation...")

    # 1. Detect environment versions to construct the URL dynamically.
    #    Example: CUDA 12.1 -> "121", Torch 2.4.0 -> "2.4"
    cuda_ver = torch.version.cuda.replace(".", "")
    torch_ver = ".".join(torch.__version__.split(".")[:2])

    # 2. Construct the find-links URL matching OpenMMLab's structure:
    #    https://download.openmmlab.com/mmcv/dist/cu{CUDA}/torch{TORCH}/index.html
    find_links_url = f"https://download.openmmlab.com/mmcv/dist/cu{cuda_ver}/torch{torch_ver}/index.html"
    print(f"Detected env: CUDA={cuda_ver}, Torch={torch_ver}")
    print(f"Installing mmcv==2.2.0 from: {find_links_url}")
    try:
        # 3. Run pip install with the specific version and dynamic link.
        subprocess.run(
            [
                sys.executable, "-m", "pip", "install",
                "mmcv==2.2.0", "--find-links", find_links_url,
            ],
            check=True,
        )
        print("mmcv installed successfully.")
    except subprocess.CalledProcessError:
        print("Installation failed. The specific version might not exist for this environment.")
        print("Attempting fallback using openmim (auto-resolve mode)...")
        # Fallback: install openmim and let it handle the resolution.
        subprocess.run([sys.executable, "-m", "pip", "install", "openmim"], check=True)
        subprocess.run(["mim", "install", "mmcv==2.2.0"], check=True)


def install_sam2():
    try:
        import sam2
    except ImportError:
        print("Installing SAM 2 with patch...")
        subprocess.run(
            ["git", "clone", "https://github.com/facebookresearch/segment-anything-2.git", "_tmp_sam2"],
            check=True,
        )
        # Relax the torch pin in setup.py so the build works against the
        # preinstalled torch version.
        setup_path = "_tmp_sam2/setup.py"
        with open(setup_path, "r") as f:
            content = f.read()
        content = content.replace("torch>=2.5.1", "torch>=2.4.1")
        with open(setup_path, "w") as f:
            f.write(content)
        subprocess.run(
            [sys.executable, "-m", "pip", "install", "--no-build-isolation", "--no-deps", "-v", "."],
            cwd="_tmp_sam2", check=True,
        )
        shutil.rmtree("_tmp_sam2")


install_sam2()

sys.path.append(BASE_DIR)
from unish.utils.inference_utils import (
    load_model,
    process_video,
    run_inference,
    generate_mixed_geometries_in_memory,
    save_smpl_meshes_per_frame,
)

MODEL = None
BODY_MODELS_PATH = "body_models/"


# ==========================================
# 4. Helper functions
# ==========================================
def download_smpl_assets(body_models_path):
    if 'smpl' not in body_models_path:
        model_path = os.path.join(body_models_path, 'smpl')
    else:
        model_path = body_models_path
    target_dir = os.path.join(model_path, 'smpl')
    os.makedirs(target_dir, exist_ok=True)

    files = ["SMPL_NEUTRAL.pkl", "SMPL_MALE.pkl", "SMPL_FEMALE.pkl"]
    repo_id = "Murphyyyy/UniSH-Private-Assets"  # <--- change this to your repo
    token = os.environ.get("SMPL_DOWNLOAD_TOKEN")
    if not token:
        print("CRITICAL ERROR: 'SMPL_DOWNLOAD_TOKEN' not found in environment variables!")
        print("Since 'UniSH-Private-Assets' is likely private, inference WILL fail without a token.")

    for filename in files:
        file_path = os.path.join(target_dir, filename)
        if not os.path.exists(file_path):
            try:
                print(f"Downloading {filename} from {repo_id}...")
                hf_hub_download(
                    repo_id=repo_id,
                    filename=filename,
                    token=token,
                    local_dir=target_dir,
                    local_dir_use_symlinks=False,
                )
                print(f"Downloaded to: {file_path}")
            except Exception as e:
                print(f"Failed to download {filename}: {e}")
                print(f"  (Check if your HF token has access to {repo_id})")


def pack_sequence_to_glb(base_dir, output_path, start_frame, end_frame, scene_rate=1.0):
    scene = trimesh.Scene()
    scene_cloud_dir = os.path.join(base_dir, "scene_clouds_per_frame")
    smpl_mesh_dir = os.path.join(base_dir, "smpl_meshes_per_frame")
    MAX_POINTS_PER_FRAME = 60000

    for i in range(start_frame, end_frame):
        # Prefer the combined multi-person mesh, fall back to the single mesh.
        candidates = [
            os.path.join(smpl_mesh_dir, f"combined_smpl_mesh_frame_{i:04d}.ply"),
            os.path.join(smpl_mesh_dir, f"smpl_mesh_frame_{i:04d}.ply"),
        ]
        target_human_path = None
        for p in candidates:
            if os.path.exists(p):
                target_human_path = p
                break
        if target_human_path:
            try:
                human_mesh = trimesh.load(target_human_path)
                node_name = f"frame_{i}_human"
                scene.add_geometry(human_mesh, node_name=node_name, geom_name=node_name)
            except Exception:
                pass

        scene_pcd_path = os.path.join(scene_cloud_dir, f"scene_frame_{i:04d}.ply")
        if os.path.exists(scene_pcd_path):
            try:
                scene_pc = trimesh.load(scene_pcd_path)
                if hasattr(scene_pc, 'vertices') and len(scene_pc.vertices) > 0:
                    num_points = len(scene_pc.vertices)
                    if num_points > MAX_POINTS_PER_FRAME:
                        # Randomly subsample dense clouds to keep the GLB small.
                        choice = np.random.choice(num_points, MAX_POINTS_PER_FRAME, replace=False)
                        scene_pc.vertices = scene_pc.vertices[choice]
                        if hasattr(scene_pc, 'colors') and len(scene_pc.colors) > 0:
                            scene_pc.colors = scene_pc.colors[choice]
                    node_name = f"frame_{i}_scene"
                    scene.add_geometry(scene_pc, node_name=node_name, geom_name=node_name)
            except Exception:
                pass

    if len(scene.geometry) == 0:
        # Export a tiny placeholder box so downstream viewers always get a valid GLB.
        dummy = trimesh.creation.box(extents=[0.01, 0.01, 0.01])
        scene.add_geometry(dummy, node_name='dummy')

    scene.export(output_path)
    if not os.path.exists(output_path):
        raise FileNotFoundError(f"Export failed: {output_path}")


def get_video_duration(video_path):
    if not video_path:
        return 10.0
    try:
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            return 10.0
        fps = cap.get(cv2.CAP_PROP_FPS)
        frame_count = cap.get(cv2.CAP_PROP_FRAME_COUNT)
        duration = frame_count / fps if fps > 0 else 10.0
        cap.release()
        return duration
    except Exception:
        return 10.0


def get_loading_html(message="Processing..."):
    return f"""
    {message}