# Source: Hugging Face Space page (status shown at capture time: "Build error").
"""Space 6: Full Pipeline (simplified Space 5).

One-click: downloads models -> TTS -> image -> lip-sync -> video.
GPU: A100 (same as Space 5, with fewer controls).
"""
| import gc | |
| import json | |
| import logging | |
| import os | |
| import shutil | |
| import subprocess | |
| import sys | |
| import traceback | |
| from pathlib import Path | |
| import gradio as gr | |
| import numpy as np | |
| import soundfile as sf | |
| import torch | |
| from hub_utils import download_step, upload_step | |
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger(__name__)

# ── Config ──
# Use /data only when running as a Space (SPACE_ID set) and the directory
# exists and is writable; otherwise fall back to a relative ./data directory.
IS_HF_SPACE = os.environ.get("SPACE_ID") is not None
_data_path = Path("/data")
BASE_DIR = (
    _data_path
    if IS_HF_SPACE and _data_path.exists() and os.access(_data_path, os.W_OK)
    else Path("data")
)

# Working directories, all rooted at BASE_DIR.
VOICE_MODEL_DIR = BASE_DIR / "voice_model"
LORA_MODEL_DIR = BASE_DIR / "lora_model"
GENERATED_VIDEO_DIR = BASE_DIR / "generated"
TEMP_DIR = BASE_DIR / "temp"
HF_CACHE_DIR = BASE_DIR / "hf_cache"
for d in (VOICE_MODEL_DIR, LORA_MODEL_DIR, GENERATED_VIDEO_DIR, TEMP_DIR, HF_CACHE_DIR):
    d.mkdir(parents=True, exist_ok=True)

# NOTE(review): these variables are set after the third-party imports above.
# Libraries that read HF_HOME at import time may already have captured the
# default cache location - confirm, and move this before the imports if so.
os.environ["HF_HOME"] = str(HF_CACHE_DIR)
os.environ["TRANSFORMERS_CACHE"] = str(HF_CACHE_DIR)

# Fix invalid PYTHONHASHSEED that HF Spaces may set (crashes subprocesses).
# Anything that is not "random" or an integer in [0, 4294967295] is replaced.
_phs = os.environ.get("PYTHONHASHSEED", "")
if _phs and _phs != "random":
    try:
        val = int(_phs)
    except ValueError:
        os.environ["PYTHONHASHSEED"] = "random"
    else:
        if not 0 <= val <= 4294967295:
            os.environ["PYTHONHASHSEED"] = "random"

# Model identifiers.
FLUX_MODEL_ID = "black-forest-labs/FLUX.1-dev"
F5_SPANISH_MODEL_ID = "jpgallegoar/F5-Spanish"
MUSETALK_REPO_ID = "TMElyralab/MuseTalk"
LORA_TRIGGER_WORD = "alvaro_person"

# Image generation settings.
IMAGE_WIDTH = 1024
IMAGE_HEIGHT = 1024
IMAGE_STEPS = 30
IMAGE_GUIDANCE = 3.5

# Speech and lip-sync settings.
TTS_SPEED = 1.0
MUSETALK_FPS = 30
MUSETALK_BBOX_SHIFT = 5
CHUNK_DURATION_S = 10
CROSSFADE_DURATION_S = 0.5

DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
APP_VERSION = "1.0.0"

# Lazily loaded model handles (see the load/unload helpers below).
_f5_model = None
_flux_pipe = None
MUSETALK_DIR = Path("musetalk_repo")
def _clear_cache():
    """Run the garbage collector and, if CUDA is available, empty its cache."""
    gc.collect()
    if not torch.cuda.is_available():
        return
    torch.cuda.empty_cache()
    torch.cuda.synchronize()
def _unload_all():
    """Release the global TTS and FLUX model references and clear caches."""
    global _f5_model, _flux_pipe
    # Rebinding to None drops this module's reference to each model, which is
    # what the original del-then-assign sequence amounted to.
    _f5_model = None
    _flux_pipe = None
    _clear_cache()
# ── FFmpeg utils ──
def _ffmpeg_run(cmd, description):
    """Run *cmd* and raise if it exits with a non-zero status.

    Args:
        cmd: Argument list passed to ``subprocess.run``.
        description: Short label included in the error message.

    Raises:
        RuntimeError: On non-zero exit; carries the last 500 characters
            of the process's stderr.
    """
    completed = subprocess.run(cmd, capture_output=True, text=True)
    if completed.returncode == 0:
        return
    raise RuntimeError(f"FFmpeg failed ({description}): {completed.stderr[-500:]}")
def _get_duration(file_path):
    """Return the container duration of *file_path* in seconds, via ffprobe.

    Raises:
        subprocess.CalledProcessError: If ffprobe exits non-zero.
        ValueError: If ffprobe's output cannot be parsed as a float.
    """
    probe = subprocess.run(
        [
            "ffprobe", "-v", "error",
            "-show_entries", "format=duration",
            "-of", "default=noprint_wrappers=1:nokey=1",
            file_path,
        ],
        capture_output=True,
        text=True,
        check=True,
    )
    return float(probe.stdout.strip())
def _concat_videos(video_paths, output_path):
    """Concatenate videos with FFmpeg's concat demuxer using stream copy.

    Args:
        video_paths: Paths of the input videos, in playback order.
        output_path: Path of the video to write.

    Raises:
        RuntimeError: If FFmpeg exits with a non-zero status.
    """
    list_file = Path(output_path).parent / "concat_list.txt"
    entries = []
    for vp in video_paths:
        # The concat demuxer resolves relative entries against the list file's
        # directory, not the process CWD, so always write absolute paths.
        absolute = str(Path(vp).resolve())
        # Inside a single-quoted entry, a literal quote is written as '\''.
        escaped = absolute.replace("'", "'\\''")
        entries.append(f"file '{escaped}'\n")
    try:
        with open(list_file, "w", encoding="utf-8") as f:
            f.writelines(entries)
        _ffmpeg_run(
            ["ffmpeg", "-y", "-f", "concat", "-safe", "0",
             "-i", str(list_file), "-c", "copy", output_path],
            "concat",
        )
    finally:
        # Remove the list file even when FFmpeg fails.
        list_file.unlink(missing_ok=True)
def _crossfade_videos(v1, v2, output, duration=0.5):
    """Join two videos with a fade transition (FFmpeg ``xfade`` filter).

    Only the filtered video stream is mapped to the output and it is
    re-encoded with libx264 / yuv420p.

    Args:
        v1: Path of the first video.
        v2: Path of the second video.
        output: Path of the video to write.
        duration: Length of the fade in seconds.

    Raises:
        RuntimeError: If FFmpeg exits with a non-zero status.
    """
    dur1 = _get_duration(v1)
    # The fade starts `duration` seconds before the end of v1; clamp at zero
    # so a first clip shorter than the fade does not give a negative offset.
    offset = max(0.0, dur1 - duration)
    _ffmpeg_run([
        "ffmpeg", "-y", "-i", v1, "-i", v2,
        "-filter_complex", f"[0:v][1:v]xfade=transition=fade:duration={duration}:offset={offset}[v]",
        "-map", "[v]", "-c:v", "libx264", "-pix_fmt", "yuv420p", output,
    ], "crossfade")
def _mux_audio_video(video, audio, output):
    """Combine the video stream of *video* with the audio of *audio*.

    The video stream is copied, the audio is encoded as AAC at 192k, and
    ``-shortest`` is passed to FFmpeg.

    Raises:
        RuntimeError: If FFmpeg exits with a non-zero status.
    """
    inputs = ["-i", video, "-i", audio]
    codecs = ["-c:v", "copy", "-c:a", "aac", "-b:a", "192k"]
    mapping = ["-map", "0:v:0", "-map", "1:a:0", "-shortest"]
    _ffmpeg_run(["ffmpeg", "-y", *inputs, *codecs, *mapping, output], "mux")
# ── TTS ──
def _load_tts():
    """Load the F5-TTS model into the module-level ``_f5_model`` (no-op if loaded).

    Prefers ``model_last.pt`` in VOICE_MODEL_DIR, then any other ``*.pt`` /
    ``*.safetensors`` checkpoint there, and finally falls back to the base
    model named by F5_SPANISH_MODEL_ID. Other loaded models are unloaded first.
    """
    global _f5_model
    if _f5_model is not None:
        return
    _unload_all()
    from f5_tts.api import F5TTS

    finetuned_path = VOICE_MODEL_DIR / "model_last.pt"
    if not finetuned_path.exists():
        # glob() order is filesystem-dependent; sort so the same checkpoint is
        # picked on every run when several are present.
        checkpoints = sorted(VOICE_MODEL_DIR.glob("*.pt")) + sorted(VOICE_MODEL_DIR.glob("*.safetensors"))
        finetuned_path = checkpoints[0] if checkpoints else None

    # NOTE(review): confirm that `model_path` / `model_name` are keyword
    # arguments accepted by the installed f5_tts.api.F5TTS version.
    if finetuned_path is not None and finetuned_path.exists():
        _f5_model = F5TTS(model_path=str(finetuned_path), device=DEVICE)
    else:
        _f5_model = F5TTS(model_name=F5_SPANISH_MODEL_ID, device=DEVICE)
# Maps audio path -> ((mtime_ns, size), transcript).
_ref_text_cache = {}


def _get_ref_text(audio_path):
    """Return the Spanish transcription of the reference audio, cached.

    Pre-transcribing with an explicit language avoids Whisper auto-detecting
    the wrong one. The cache entry is tied to the file's modification time
    and size, so a reference file replaced at the same path (for example by
    a new Hub download) is transcribed again instead of reusing stale text.

    Args:
        audio_path: Path of the reference audio file, as a string.

    Returns:
        Whatever ``_f5_model.transcribe`` returns for the file.
    """
    try:
        stat = os.stat(audio_path)
        signature = (stat.st_mtime_ns, stat.st_size)
    except OSError:
        # Let transcribe() report the missing/unreadable file itself.
        signature = None

    entry = _ref_text_cache.get(audio_path)
    if entry is not None and signature is not None and entry[0] == signature:
        return entry[1]

    _load_tts()
    logger.info(f"Transcribing reference audio as Spanish: {audio_path}")
    ref_text = _f5_model.transcribe(audio_path, language="spanish")
    logger.info(f"Reference transcription: {ref_text}")
    _ref_text_cache[audio_path] = (signature, ref_text)
    return ref_text
def generate_speech(text):
    """Synthesize *text* with the voice model and return the WAV file path.

    Uses ``reference.wav`` in VOICE_MODEL_DIR and its transcription as the
    voice reference, and writes the result to ``tts_output.wav`` in TEMP_DIR.

    Raises:
        FileNotFoundError: If the reference audio file does not exist.
    """
    _load_tts()
    reference_wav = VOICE_MODEL_DIR / "reference.wav"
    if not reference_wav.exists():
        raise FileNotFoundError("No reference audio found.")

    reference_file = str(reference_wav)
    transcript = _get_ref_text(reference_file)
    output_path = str(TEMP_DIR / "tts_output.wav")
    # infer() returns a 3-tuple; the third element is not used here.
    waveform, sample_rate, _ = _f5_model.infer(
        ref_file=reference_file,
        ref_text=transcript,
        gen_text=text,
        speed=TTS_SPEED,
    )
    sf.write(output_path, waveform, sample_rate)
    return output_path
def _unload_tts():
    """Release the global TTS model reference and clear caches."""
    global _f5_model
    # Rebinding to None drops this module's reference to the model.
    _f5_model = None
    _clear_cache()
# ── Image generation ──
def _load_flux():
    """Load the FLUX pipeline into the module-level ``_flux_pipe`` (no-op if loaded).

    The TTS model is unloaded first. LoRA weights from LORA_MODEL_DIR are
    applied when present; a LoRA loading failure is logged and ignored.
    """
    global _flux_pipe
    if _flux_pipe is not None:
        return
    _unload_tts()
    from diffusers import FluxPipeline

    # Build into a local so the global is only set once setup has finished;
    # a failure part-way no longer leaves a half-configured pipeline behind.
    pipe = FluxPipeline.from_pretrained(
        FLUX_MODEL_ID,
        torch_dtype=torch.bfloat16,
        token=os.environ.get("HF_TOKEN"),
    )

    lora_weights = list(LORA_MODEL_DIR.glob("*.safetensors")) or list(LORA_MODEL_DIR.glob("adapter_model.*"))
    if lora_weights:
        try:
            pipe.load_lora_weights(str(LORA_MODEL_DIR))
        except Exception as e:
            logger.warning(f"Could not load LoRA: {e}")

    if DEVICE == "cuda":
        # Model CPU offload manages device placement itself, so the pipeline
        # is not moved to the GPU beforehand.
        pipe.enable_model_cpu_offload()
    else:
        # Offload needs an accelerator; on CPU just place the pipeline there.
        pipe = pipe.to(DEVICE)
    _flux_pipe = pipe
def _unload_flux():
    """Release the global FLUX pipeline reference and clear caches."""
    global _flux_pipe
    # Rebinding to None drops this module's reference to the pipeline.
    _flux_pipe = None
    _clear_cache()
def generate_image(prompt):
    """Generate the avatar image for *prompt* and return the PNG file path.

    The LoRA trigger word (from ``lora_config.json`` in LORA_MODEL_DIR when
    that file exists, otherwise LORA_TRIGGER_WORD) is prepended to the prompt
    unless the prompt already contains it. The image is written to
    ``generated_avatar.png`` in TEMP_DIR.
    """
    _load_flux()

    trigger = LORA_TRIGGER_WORD
    config_path = LORA_MODEL_DIR / "lora_config.json"
    if config_path.exists():
        with open(config_path) as f:
            lora_config = json.load(f)
        trigger = lora_config.get("trigger_word", LORA_TRIGGER_WORD)

    needs_trigger = bool(trigger) and trigger not in prompt
    if needs_trigger:
        prompt = f"{trigger}, {prompt}"

    output_path = str(TEMP_DIR / "generated_avatar.png")
    generation = _flux_pipe(
        prompt=prompt,
        width=IMAGE_WIDTH,
        height=IMAGE_HEIGHT,
        num_inference_steps=IMAGE_STEPS,
        guidance_scale=IMAGE_GUIDANCE,
    )
    first_image = generation.images[0]
    first_image.save(output_path)
    return output_path
# ── MuseTalk ──
def _run_pip(cmd_args, timeout=600):
    """Run ``pip install`` with *cmd_args* and report whether it succeeded.

    Args:
        cmd_args: Arguments appended after ``pip install``.
        timeout: Maximum run time in seconds.

    Returns:
        True if pip exited with status 0; False on a non-zero exit or when
        the timeout expires.
    """
    cmd = [sys.executable, "-m", "pip", "install", *cmd_args]
    # Log only the non-option arguments (package names).
    pkg_names = " ".join(a for a in cmd_args if not a.startswith("-"))
    try:
        r = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
    except subprocess.TimeoutExpired:
        # Callers treat this helper as best-effort; report failure, don't raise.
        logger.warning(f"pip install {pkg_names}: timed out after {timeout}s")
        return False
    logger.info(f"pip install {pkg_names}: rc={r.returncode}")
    if r.returncode != 0:
        logger.warning(f"STDERR: {r.stderr[-500:]}")
    return r.returncode == 0
def _ensure_mm_packages():
    """Install the OpenMMLab packages at runtime if ``mmpose`` is not importable.

    Each package is installed with ``mim``; if that fails or times out, a
    plain ``pip install --no-build-isolation --no-deps`` is attempted. The
    outcome is logged; nothing is raised if mmpose is still unavailable.
    """
    import importlib

    try:
        import mmpose  # noqa: F401
        return
    except ImportError:
        pass

    logger.info("Installing OpenMMLab packages at runtime via openmim...")
    _run_pip(["--upgrade", "setuptools", "pip", "wheel", "openmim"], timeout=120)

    for pkg in ["mmengine", "mmcv", "mmdet", "mmpose"]:
        try:
            r = subprocess.run(
                [sys.executable, "-m", "mim", "install", pkg],
                capture_output=True, text=True, timeout=900,
            )
        except subprocess.TimeoutExpired:
            # Treat a timeout like any other failure so the remaining
            # packages are still attempted.
            logger.warning(f"mim install {pkg} timed out")
            mim_ok = False
        else:
            logger.info(f"mim install {pkg}: rc={r.returncode}")
            mim_ok = r.returncode == 0
            if not mim_ok:
                logger.warning(f"mim install {pkg} failed: {r.stderr[-300:]}")
        if not mim_ok:
            _run_pip(["--no-build-isolation", "--no-deps", pkg], timeout=600)

    # Packages installed while the interpreter is running may be missed by
    # the import system's finder caches until they are invalidated.
    importlib.invalidate_caches()
    try:
        import mmpose
        logger.info(f"mmpose {mmpose.__version__} installed successfully")
    except ImportError as e:
        logger.error(f"mmpose still not available: {e}")
def _ensure_musetalk():
    """Make sure the MuseTalk checkout and its model files are present.

    Clones the GitHub repository into MUSETALK_DIR when that directory is
    missing. If the clone fails for any reason, falls back to downloading
    the Hugging Face repository snapshot. Model files are then fetched by
    ``_download_musetalk_models``.
    """
    _ensure_mm_packages()
    if not MUSETALK_DIR.exists():
        try:
            subprocess.run(
                ["git", "clone", "https://github.com/TMElyralab/MuseTalk.git", str(MUSETALK_DIR)],
                capture_output=True, text=True, timeout=300, check=True,
            )
        except Exception as e:
            logger.warning(f"git clone of MuseTalk failed: {e}; falling back to Hub snapshot")
            # A failed or timed-out clone can leave a partial directory that
            # later calls would mistake for a complete checkout.
            shutil.rmtree(MUSETALK_DIR, ignore_errors=True)
            from huggingface_hub import snapshot_download
            try:
                snapshot_download(repo_id=MUSETALK_REPO_ID, local_dir=str(MUSETALK_DIR), repo_type="model")
            except Exception:
                # Same reasoning: do not leave a half-downloaded directory.
                shutil.rmtree(MUSETALK_DIR, ignore_errors=True)
                raise
    _download_musetalk_models()
def _download_musetalk_models():
    """Download any missing model files into ``MUSETALK_DIR / "models"``.

    Every download is best-effort: a failure is logged as a warning and the
    remaining files are still attempted. Files that already exist are skipped.
    """
    from huggingface_hub import hf_hub_download
    import urllib.request

    models_dir = MUSETALK_DIR / "models"
    models_dir.mkdir(parents=True, exist_ok=True)

    def _fetch(repo_id, filename, local_dir, ok_msg, fail_msg, **extra):
        """Fetch one Hub file into *local_dir* unless it is already there."""
        target = local_dir / filename
        if target.exists():
            return
        target.parent.mkdir(parents=True, exist_ok=True)
        try:
            hf_hub_download(repo_id=repo_id, filename=filename,
                            local_dir=str(local_dir), **extra)
            logger.info(ok_msg)
        except Exception as e:
            logger.warning(f"{fail_msg}: {e}")

    # NOTE(review): confirm that these file and directory names match what
    # the checked-out MuseTalk code loads (e.g. the dwpose file extension and
    # the face-parse directory name).
    for filename in ["musetalk/musetalk.json", "musetalk/pytorch_model.bin"]:
        _fetch("TMElyralab/MuseTalk", filename, models_dir,
               f"Downloaded {filename}", f"Could not download {filename}")

    _fetch("yzd-v/DWPose", "dw-ll_ucoco_384.onnx", models_dir / "dwpose",
           "Downloaded dwpose model", "Could not download dwpose",
           local_dir_use_symlinks=False)

    for filename in ["config.json", "diffusion_pytorch_model.bin"]:
        _fetch("stabilityai/sd-vae-ft-mse", filename, models_dir / "sd-vae-ft-mse",
               f"Downloaded sd-vae-ft-mse/{filename}",
               f"Could not download sd-vae-ft-mse/{filename}",
               local_dir_use_symlinks=False)

    _fetch("camenduru/face-parse-bisenet", "79999_iter.pth",
           models_dir / "face-parse-bisenet",
           "Downloaded face-parse-bisenet model", "Could not download face-parse model",
           local_dir_use_symlinks=False)

    whisper_path = models_dir / "whisper" / "tiny.pt"
    if not whisper_path.exists():
        whisper_path.parent.mkdir(parents=True, exist_ok=True)
        # Download to a temporary name and rename on success, so an
        # interrupted transfer is never mistaken for a complete model file.
        partial_path = whisper_path.with_name(whisper_path.name + ".part")
        try:
            url = "https://openaipublic.azureedge.net/main/whisper/models/65147644a518d12f04e32d6f3b26facc3f8dd46e5390956a9424a650c0ce22b9/tiny.pt"
            urllib.request.urlretrieve(url, str(partial_path))
            os.replace(partial_path, whisper_path)
            logger.info("Downloaded whisper tiny.pt")
        except Exception as e:
            partial_path.unlink(missing_ok=True)
            logger.warning(f"Could not download whisper tiny: {e}")
def _generate_lipsync(image_path, audio_path, output_path, bbox_shift):
    """Produce a lip-synced video from an image and an audio file.

    First tries MuseTalk's Python API; if that raises or yields no file,
    falls back to running ``scripts.inference`` as a subprocess inside the
    MuseTalk checkout.

    Args:
        image_path: Path of the source image.
        audio_path: Path of the driving audio.
        output_path: Path where the resulting video is placed.
        bbox_shift: Value forwarded to MuseTalk as ``bbox_shift``.

    Returns:
        *output_path*.

    Raises:
        RuntimeError: If the CLI fallback fails or produces no ``.mp4``.
    """
    _unload_all()
    _ensure_musetalk()
    repo_dir = MUSETALK_DIR.resolve()

    try:
        # Insert once; this function runs per chunk and would otherwise add
        # a duplicate sys.path entry on every call.
        if str(repo_dir) not in sys.path:
            sys.path.insert(0, str(repo_dir))
        from musetalk.models.musetalk import MuseTalk
        model = MuseTalk()
        model.load_model(str(MUSETALK_DIR / "models"))
        result = model.inference(
            video_path=image_path, audio_path=audio_path,
            bbox_shift=bbox_shift, result_dir=str(Path(output_path).parent),
        )
        if result and Path(result).exists():
            if str(result) != output_path:
                shutil.move(result, output_path)
            return output_path
    except Exception as e:
        logger.warning(f"Python MuseTalk failed: {e}, trying CLI...")

    # The subprocess runs with cwd set to the MuseTalk checkout, so every
    # path handed to it must be absolute.
    result_dir = (TEMP_DIR / "musetalk_output").resolve()
    # Drop leftovers from an earlier failed run so they cannot be picked up.
    shutil.rmtree(result_dir, ignore_errors=True)
    result_dir.mkdir(parents=True, exist_ok=True)
    cmd = [
        sys.executable, "-m", "scripts.inference",
        "--video_path", str(Path(image_path).resolve()),
        "--audio_path", str(Path(audio_path).resolve()),
        "--bbox_shift", str(bbox_shift), "--result_dir", str(result_dir),
        "--fps", str(MUSETALK_FPS), "--batch_size", "8",
    ]
    env = os.environ.copy()
    # os.pathsep keeps this portable; skipping an empty PYTHONPATH avoids a
    # trailing separator.
    env["PYTHONPATH"] = os.pathsep.join(
        p for p in (str(repo_dir), env.get("PYTHONPATH", "")) if p
    )
    proc = subprocess.run(cmd, capture_output=True, text=True, cwd=str(repo_dir), env=env, timeout=1800)
    if proc.returncode != 0:
        raise RuntimeError(f"MuseTalk failed: {proc.stderr[-500:]}")

    # Take the most recently modified .mp4 anywhere under the result dir.
    outputs = sorted(result_dir.glob("**/*.mp4"), key=lambda p: p.stat().st_mtime, reverse=True)
    if not outputs:
        raise RuntimeError("MuseTalk did not produce output")
    shutil.move(str(outputs[0]), output_path)
    shutil.rmtree(result_dir, ignore_errors=True)
    return output_path
def compose_long_video(image_path, audio_path, output_path, bbox_shift, progress_callback=None):
    """Build a lip-synced video for audio of any length.

    Audio no longer than 1.5 x CHUNK_DURATION_S is handled in one pass.
    Longer audio is split into chunks of roughly CHUNK_DURATION_S seconds
    (preferring detected silences as split points), each chunk is lip-synced
    separately, the chunk videos are joined, and the original audio is muxed
    onto the result.

    Args:
        image_path: Path of the source image.
        audio_path: Path of the full audio track.
        output_path: Path of the final video.
        bbox_shift: Value forwarded to the lip-sync step.
        progress_callback: Optional ``callable(fraction, message)``.

    Returns:
        The path returned by the lip-sync step (single pass) or *output_path*.
    """
    audio, sr = sf.read(audio_path)
    if audio.ndim > 1:
        # Down-mix multi-channel audio by averaging the channels.
        audio = audio.mean(axis=1)
    total_duration = len(audio) / sr

    if total_duration <= CHUNK_DURATION_S * 1.5:
        if progress_callback:
            progress_callback(0.1, "Generando lip-sync...")
        return _generate_lipsync(image_path, audio_path, output_path, bbox_shift)

    work_dir = TEMP_DIR / "compose_work"
    if work_dir.exists():
        shutil.rmtree(work_dir)
    work_dir.mkdir(parents=True)

    try:
        from pydub import AudioSegment
        from pydub.silence import detect_silence

        # pydub reads from a file, so write the mono signal to a temp WAV.
        temp_path = TEMP_DIR / "_temp_silence.wav"
        try:
            sf.write(str(temp_path), audio, sr)
            sound = AudioSegment.from_wav(str(temp_path))
            silences = detect_silence(sound, min_silence_len=300, silence_thresh=-35)
        finally:
            temp_path.unlink(missing_ok=True)

        # Midpoints of the detected silences, converted from ms to seconds.
        silence_mids = [(start_ms + end_ms) / 2000.0 for start_ms, end_ms in silences]

        boundaries = [0.0]
        current = 0.0
        while current + CHUNK_DURATION_S < total_duration:
            target = current + CHUNK_DURATION_S
            # Candidate splits leave at least 3 s in this chunk and 1 s at
            # the end of the audio; with no candidate, split at the target.
            candidates = [m for m in silence_mids if current + 3.0 < m < total_duration - 1.0]
            best_split = min(candidates, key=lambda m: abs(m - target), default=target)
            boundaries.append(best_split)
            current = best_split

        # A split at the raw target can leave a tail of a few milliseconds;
        # fold a tail shorter than 1 s into the previous chunk instead.
        min_tail_s = 1.0
        if len(boundaries) > 1 and total_duration - boundaries[-1] < min_tail_s:
            boundaries.pop()
        boundaries.append(total_duration)

        n_chunks = len(boundaries) - 1
        chunk_videos = []
        for i, (start_s, end_s) in enumerate(zip(boundaries, boundaries[1:])):
            if progress_callback:
                progress_callback(0.1 + (i / n_chunks) * 0.7, f"Chunk {i+1}/{n_chunks}...")
            start_sample = int(start_s * sr)
            end_sample = int(end_s * sr)
            chunk_audio_path = str(work_dir / f"chunk_{i:03d}.wav")
            sf.write(chunk_audio_path, audio[start_sample:end_sample], sr)
            chunk_video_path = str(work_dir / f"chunk_{i:03d}.mp4")
            _generate_lipsync(image_path, chunk_audio_path, chunk_video_path, bbox_shift)
            chunk_videos.append(chunk_video_path)

        if len(chunk_videos) == 1:
            final_video = chunk_videos[0]
        elif CROSSFADE_DURATION_S > 0:
            # NOTE(review): each crossfade join overlaps the clips by
            # CROSSFADE_DURATION_S, so the joined video is shorter than the
            # sum of its chunks while the muxed audio keeps its full length.
            # Verify lip-sync on long inputs; overlapping the chunk audio by
            # the fade length would compensate.
            current_vid = chunk_videos[0]
            for i, next_vid in enumerate(chunk_videos[1:], start=1):
                merged = str(work_dir / f"merged_{i:03d}.mp4")
                try:
                    _crossfade_videos(current_vid, next_vid, merged, CROSSFADE_DURATION_S)
                except Exception as e:
                    logger.warning(f"Crossfade failed at join {i}: {e}; using plain concat")
                    _concat_videos([current_vid, next_vid], merged)
                current_vid = merged
            final_video = current_vid
        else:
            final_video = str(work_dir / "concat.mp4")
            _concat_videos(chunk_videos, final_video)

        _mux_audio_video(final_video, audio_path, output_path)
    finally:
        # Remove intermediate files on failure as well as on success.
        shutil.rmtree(work_dir, ignore_errors=True)
    return output_path
# ── Gradio handlers ──
def download_models_from_hub(project_name):
    """Download the voice and LoRA models of a project from the Hub.

    Each step is downloaded into a staging folder ``BASE_DIR / <name>`` and
    its files are then moved into the matching local model directory. The
    staging folder is removed afterwards.

    Args:
        project_name: Project name typed by the user.

    Returns:
        A status string starting with ``"OK"`` or ``"Error"``.
    """
    if not project_name or not project_name.strip():
        return "Error: Debes introducir un nombre de proyecto"
    name = project_name.strip()

    # The name becomes a path component that is later removed recursively,
    # so refuse anything that could point outside its own staging folder.
    if name in (".", "..") or any(ch in name for ch in ("/", "\\", "\0")):
        return "Error: Nombre de proyecto no valido"
    staging_dir = BASE_DIR / name
    # A name equal to one of the working folders would make the staging
    # cleanup delete that folder.
    if staging_dir in (VOICE_MODEL_DIR, LORA_MODEL_DIR, GENERATED_VIDEO_DIR, TEMP_DIR, HF_CACHE_DIR):
        return "Error: Nombre de proyecto no valido"

    try:
        status_parts = []
        try:
            for step, local_dir, label in [
                ("step3_voice", VOICE_MODEL_DIR, "voz"),
                ("step4_lora", LORA_MODEL_DIR, "LoRA"),
            ]:
                # Download before touching the local copy, so a failed
                # download does not destroy the models already on disk.
                download_step(name, step, str(BASE_DIR))
                src = staging_dir / step
                if local_dir.exists():
                    shutil.rmtree(local_dir)
                local_dir.mkdir(parents=True)
                if not src.exists():
                    # Nothing arrived for this step: leave the directory
                    # empty and do not report it as downloaded.
                    continue
                for f in src.iterdir():
                    shutil.move(str(f), str(local_dir / f.name))
                status_parts.append(label)
        finally:
            shutil.rmtree(staging_dir, ignore_errors=True)

        if not status_parts:
            return f"Error: No se encontraron modelos para el proyecto '{name}'"
        return f"OK - Descargados: {', '.join(status_parts)}"
    except Exception as e:
        return f"Error: {e}"
def full_pipeline_handler(project_name, text, scene_prompt, bbox_shift, progress=gr.Progress()):
    """Gradio handler: run TTS -> image -> lip-sync and return the video.

    Args:
        project_name: Project name; must be non-blank.
        text: Text to speak; must be non-blank.
        scene_prompt: Prompt for the avatar image.
        bbox_shift: Lip-sync bbox shift; converted with ``int()``.
        progress: Gradio progress tracker.

    Returns:
        ``(video_path, status)`` on success, ``(None, error_message)`` otherwise.
    """
    if not project_name or not project_name.strip():
        return None, "Error: Debes introducir un nombre de proyecto"
    # Same None-safe check as for project_name above.
    if not text or not text.strip():
        return None, "Error: Introduce texto para hablar"

    voice_ready = any(VOICE_MODEL_DIR.glob("*.pt")) or any(VOICE_MODEL_DIR.glob("*.safetensors"))
    lora_ready = any(LORA_MODEL_DIR.glob("*.safetensors")) or any(LORA_MODEL_DIR.glob("adapter_model.*"))
    if not voice_ready:
        return None, "Error: Modelo de voz no encontrado. Descarga desde el Hub primero."
    if not lora_ready:
        return None, "Error: LoRA no encontrado. Descarga desde el Hub primero."

    try:
        progress(0.0, desc="Generando voz...")
        audio_path = generate_speech(text)
        progress(0.2, desc="Generando imagen...")
        image_path = generate_image(scene_prompt)
        _unload_flux()
        progress(0.4, desc="Generando lip-sync...")
        output_path = str(GENERATED_VIDEO_DIR / "final_output.mp4")
        compose_long_video(
            image_path=image_path, audio_path=audio_path,
            output_path=output_path, bbox_shift=int(bbox_shift),
            # Map the composer's 0..1 progress onto the remaining 0.4..1.0.
            progress_callback=lambda p, m: progress(0.4 + p * 0.6, desc=m),
        )
        return output_path, "OK - Video generado!"
    except Exception as e:
        logger.error(f"Pipeline failed:\n{traceback.format_exc()}")
        # A failure can leave a model loaded; release it so the next request
        # starts clean. Never let the cleanup hide the original error.
        try:
            _unload_all()
        except Exception:
            logger.warning("Could not release models after pipeline failure")
        return None, f"Error: {e}"
def save_to_hub(project_name):
    """Gradio handler: upload the generated video folder to the Hub.

    Args:
        project_name: Project name; must be non-blank.

    Returns:
        The value returned by ``upload_step``, or an ``"Error: ..."`` string
        if the name is blank, no ``.mp4`` exists, or the upload raises.
    """
    name = project_name.strip() if project_name else ""
    if not name:
        return "Error: Debes introducir un nombre de proyecto"

    has_video = any(GENERATED_VIDEO_DIR.glob("*.mp4"))
    if not has_video:
        return "Error: No hay video para guardar."

    try:
        return upload_step(name, "step5_video", str(GENERATED_VIDEO_DIR))
    except Exception as e:
        return f"Error: {e}"
# ── UI ──
with gr.Blocks(title="Talking Head - Full Pipeline", theme=gr.themes.Soft()) as demo:
    gr.Markdown(f"# Talking Head - Pipeline Completo `v{APP_VERSION}`\nTexto -> Video final (todo en uno)")

    # Project name is shared by all three actions below.
    project_name = gr.Textbox(
        label="Nombre del proyecto",
        placeholder="mi_proyecto",
        info="Obligatorio. Se usa como carpeta en el Hub.",
    )

    # Step 1: fetch the voice and LoRA models.
    gr.Markdown("### 1. Descargar modelos del Hub")
    download_btn = gr.Button("Descargar modelos del Hub", variant="secondary")
    download_status = gr.Textbox(label="Estado", interactive=False)

    # Step 2: inputs on the left, resulting video and status on the right.
    gr.Markdown("### 2. Generar video")
    with gr.Row():
        with gr.Column():
            full_text = gr.Textbox(
                label="Texto a hablar",
                lines=6,
                placeholder="Escribe el texto aqui...",
            )
            full_scene = gr.Textbox(
                label="Prompt de escena",
                value="portrait photo, professional lighting, neutral background",
            )
            full_bbox = gr.Slider(
                -20, 20,
                value=MUSETALK_BBOX_SHIFT,
                step=1,
                label="Bbox Shift",
            )
            full_btn = gr.Button("Generar Video", variant="primary")
        with gr.Column():
            full_video = gr.Video(label="Video final")
            full_status = gr.Textbox(label="Estado", interactive=False)

    # Step 3: upload the generated video.
    gr.Markdown("### 3. Guardar video en Hub")
    save_btn = gr.Button("Guardar en Hub", variant="secondary")
    save_status = gr.Textbox(label="Estado guardado", interactive=False)

    # Event wiring.
    download_btn.click(
        download_models_from_hub,
        inputs=[project_name],
        outputs=[download_status],
    )
    full_btn.click(
        full_pipeline_handler,
        inputs=[project_name, full_text, full_scene, full_bbox],
        outputs=[full_video, full_status],
    )
    save_btn.click(
        save_to_hub,
        inputs=[project_name],
        outputs=[save_status],
    )

if __name__ == "__main__":
    # Enable the request queue, then serve on all interfaces at port 7860.
    demo.queue().launch(server_name="0.0.0.0", server_port=7860, ssr_mode=False)