Spaces:
Running on Zero
Running on Zero
| """ | |
server.py — FastAPI backend for VideoVoice.
Endpoints:
  POST /api/jobs                          — Submit a video for translation (file upload or URL)
  GET  /api/jobs/{id}                     — SSE stream of pipeline progress
  GET  /api/jobs/{id}/result              — Download the translated video
  POST /api/jobs/{id}/select-model        — Select TTS model after preview
  GET  /api/jobs/{id}/preview/{model}     — Stream preview audio
  GET  /api/demo-videos                   — List available demo videos (outputs + data)
  GET  /api/demo-videos/{video_id}/stream — Stream demo video by ID
  GET  /api/showcase                      — Curated before/after showcase entries
| """ | |
| import asyncio | |
| import hashlib | |
| import json | |
| import os | |
| import subprocess | |
| import shutil | |
| import threading | |
| import time | |
| import uuid | |
| import re | |
| from pathlib import Path | |
| from urllib.parse import urlparse | |
| from typing import Optional | |
| from dotenv import load_dotenv | |
| from fastapi import FastAPI, APIRouter, File, Form, HTTPException, Request, UploadFile, Header | |
| from fastapi.middleware.cors import CORSMiddleware | |
| from fastapi import Request | |
| from fastapi.responses import FileResponse, JSONResponse | |
| from fastapi.staticfiles import StaticFiles | |
| from pydantic import BaseModel | |
| from slowapi import Limiter, _rate_limit_exceeded_handler | |
| from slowapi.errors import RateLimitExceeded | |
| from slowapi.middleware import SlowAPIMiddleware | |
| from slowapi.util import get_remote_address | |
| from sse_starlette.sse import EventSourceResponse | |
# Pull variables from a local .env file (if present) before reading config.
load_dotenv()

# ── TTS engine selection ───────────────────────────────
# TTS_ENGINE picks the single text-to-speech backend this Space serves.
# It is compared case-insensitively; unknown values fail at import time.
_SUPPORTED_TTS_ENGINES = ("chatterbox", "omnivoice", "qwen3", "dramabox")
TTS_ENGINE = os.getenv("TTS_ENGINE", "chatterbox").lower()
if TTS_ENGINE not in _SUPPORTED_TTS_ENGINES:
    raise ValueError(
        f"Invalid TTS_ENGINE: {TTS_ENGINE}. "
        "Use 'chatterbox', 'omnivoice', 'qwen3', or 'dramabox'."
    )

# ── Config ─────────────────────────────────────────────
PORT = int(os.getenv("PORT", "7860"))
MAX_FILE_SIZE_MB = 90   # upload ceiling, also reported by config()
MAX_DURATION_SEC = 90   # longest accepted input video, in seconds
MAX_UPLOAD_BYTES = MAX_FILE_SIZE_MB << 20  # MiB -> bytes
def _default_artifacts_root() -> Path:
    """Choose the directory that will hold per-job artifacts.

    ``/data/jobs`` is chosen when ``/data`` can be created (if missing) and is
    writable — e.g. Docker deploys or Spaces with persistent storage.
    Otherwise (for instance ZeroGPU / Gradio SDK Spaces without the paid
    persistent-storage add-on) ``/tmp/videovoice_jobs`` is returned. The
    ``jobs`` subdirectory itself is not created here.
    """
    persistent = Path("/data/jobs")
    fallback = Path("/tmp/videovoice_jobs")
    mount_point = persistent.parent
    try:
        mount_point.mkdir(parents=True, exist_ok=True)
        writable = os.access(mount_point, os.W_OK)
    except (PermissionError, OSError):
        writable = False
    return persistent if writable else fallback
# Root for all job artifacts. A non-empty ARTIFACTS_ROOT environment variable
# overrides the auto-detected default.
ARTIFACTS_ROOT = Path(os.getenv("ARTIFACTS_ROOT") or _default_artifacts_root())

# Exact host names yt-dlp may fetch from (subdomains of instagram.com,
# youtube.com and tiktok.com are accepted separately by the host check).
ALLOWED_YTDLP_HOSTS = {
    "instagram.com",
    "youtube.com",
    "youtu.be",
    "tiktok.com",
    "vm.tiktok.com",
}

# Top-level entries under ARTIFACTS_ROOT that the artifact reaper skips.
PERSISTENT_ARTIFACT_DIRS = {"uploads", "outputs", "data", "tmp", "tools"}

# Reaper cadence: wake every 10 minutes, delete anything older than 2 hours.
_MINUTE = 60
REAPER_INTERVAL_SECONDS = 10 * _MINUTE
REAPER_MAX_AGE_SECONDS = 120 * _MINUTE
def _parse_allowed_origins(value: str) -> list[str]:
    """Split a comma-separated CORS origin list into trimmed, non-empty items.

    Returns ``["http://localhost:5173"]`` when no usable entry remains.
    """
    cleaned = []
    for piece in value.split(","):
        piece = piece.strip()
        if piece:
            cleaned.append(piece)
    if not cleaned:
        cleaned = ["http://localhost:5173"]
    return cleaned
ALLOWED_ORIGINS = _parse_allowed_origins(
    os.getenv("ALLOWED_ORIGINS", "http://localhost:5173")
)

# ── App ────────────────────────────────────────────────
router = APIRouter()

# Rate limiting stays on unless DISABLE_RATE_LIMIT is "1", "true" or "yes"
# (case-insensitive). Clients are keyed by remote address.
_RATE_LIMIT_ENABLED = os.getenv("DISABLE_RATE_LIMIT", "").lower() not in ("1", "true", "yes")
limiter = Limiter(key_func=get_remote_address, enabled=_RATE_LIMIT_ENABLED)
# Note: app.state.limiter, exception handlers, and SlowAPIMiddleware
# are now configured on the main Server instance in app.py.

# ── In-memory job store ────────────────────────────────
# job_id -> job state dict. create_job sets: job_id, status, messages,
# result_path, error, created_at, voice_mode, preview_paths, preview_event,
# selected_model, session_id. The pipeline runner adds "_wait_status".
jobs: dict = {}

# ── GPU job queue ──────────────────────────────────────
# Only one GPU job runs at a time; the others wait in FIFO order.
gpu_semaphore = threading.Semaphore(1)
gpu_queue: list[str] = []  # job_ids waiting for the GPU, oldest first
# Live info about the job currently holding the GPU.
gpu_active: dict = dict(
    job_id=None,
    started_at=None,
    step=0,
    total_steps=6,
    step_label="",
)
# Rolling per-step timings {step_number: [seconds, ...]} used for queue ETAs.
step_durations: dict[int, list[float]] = {}
# session id -> job_id most recently submitted by that session.
session_active_jobs: dict[str, str] = {}
artifact_reaper_task: Optional[asyncio.Task] = None

UPLOAD_DIR = ARTIFACTS_ROOT / "uploads"
OUTPUT_DIR = ARTIFACTS_ROOT / "outputs"
_DATA_DIR = ARTIFACTS_ROOT / "data"
SHOWCASE_DIR = _DATA_DIR / "showcase"
SHOWCASE_FILE = _DATA_DIR / "showcase.json"
# Folder label -> directory scanned for demo MP4s.
DEMO_VIDEO_DIRS = {
    "outputs": OUTPUT_DIR,
    "data": _DATA_DIR,
    "showcase": SHOWCASE_DIR,
}

# ── Helpers ────────────────────────────────────────────
def _download_url(url: str, dest: str) -> str:
    """Download a video from a supported social platform with yt-dlp.

    Args:
        url: Source URL; callers are expected to vet it with
            ``_is_allowed_video_host`` first.
        dest: Output file path passed to yt-dlp's ``-o`` option.

    Returns:
        ``dest``, unchanged.

    Raises:
        RuntimeError: yt-dlp exited non-zero (first 300 chars of stderr
            are included in the message).
        subprocess.TimeoutExpired: the download ran longer than 120 s.
        FileNotFoundError: the ``yt-dlp`` executable is not on PATH.
    """
    result = subprocess.run(
        [
            "yt-dlp",
            "--no-playlist",
            # Same ceiling as direct uploads (previously a hard-coded 100M,
            # which let URL downloads exceed the advertised limit).
            "--max-filesize", f"{MAX_FILE_SIZE_MB}M",
            "--js-runtimes", "node",
            "--extractor-args", "youtube:player_client=android,ios,web_safari",
            "-f", "mp4/best[ext=mp4]/best",
            "-o", dest,
            # End of options: the URL can never be parsed as a yt-dlp flag.
            "--",
            url,
        ],
        capture_output=True,
        text=True,
        timeout=120,
    )
    if result.returncode != 0:
        raise RuntimeError(f"yt-dlp failed: {result.stderr[:300]}")
    return dest
def _is_allowed_video_host(url: str, allowed_hosts: Optional[set[str]] = None) -> bool:
    """Return True if *url* is an http(s) link to a trusted video platform.

    Accepted hosts are the exact names in *allowed_hosts* (defaulting to
    ``ALLOWED_YTDLP_HOSTS``) plus any subdomain of instagram.com,
    youtube.com or tiktok.com. Any scheme other than http/https
    (``file:``, ``ftp:``, ...) is rejected, so yt-dlp is only ever handed
    web URLs even when the host name looks legitimate.

    Args:
        url: Candidate URL supplied by the client.
        allowed_hosts: Optional override of the exact-match host set.
    """
    parsed = urlparse(url)
    if parsed.scheme.lower() not in ("http", "https"):
        return False
    host = (parsed.hostname or "").lower()
    if not host:
        return False
    hosts = ALLOWED_YTDLP_HOSTS if allowed_hosts is None else allowed_hosts
    return host in hosts or host.endswith((".instagram.com", ".youtube.com", ".tiktok.com"))
def _probe_duration_seconds(path: str) -> float:
    """Return the container duration of *path* in seconds, as read by ffprobe.

    Raises:
        RuntimeError: ffprobe exited non-zero (stderr excerpt included) or
            printed something that is not a number.
    """
    cmd = [
        "ffprobe",
        "-v", "error",
        "-show_entries", "format=duration",
        "-of", "csv=p=0",
        path,
    ]
    proc = subprocess.run(cmd, capture_output=True, text=True, timeout=30)
    if proc.returncode:
        raise RuntimeError(f"ffprobe failed: {proc.stderr[:300]}")
    raw = proc.stdout.strip()
    try:
        seconds = float(raw)
    except ValueError as exc:
        raise RuntimeError("ffprobe returned an invalid duration value") from exc
    return seconds
def _gpu_available() -> bool:
    """Return True when torch reports a usable CUDA or Apple MPS device.

    Any failure (torch missing, a backend probe raising) yields False.
    """
    try:
        import torch

        has_mps = hasattr(torch.backends, "mps") and torch.backends.mps.is_available()
        has_cuda = torch.cuda.is_available()
    except Exception:
        return False
    return bool(has_cuda or has_mps)
def _queue_depth() -> int:
    """Return queue pressure: waiting jobs plus one if a job holds the GPU."""
    return int(bool(gpu_active["job_id"])) + len(gpu_queue)
def _is_job_active(job_id: str) -> bool:
    """Return True when *job_id* is known and still "queued" or "running"."""
    state = jobs.get(job_id)
    return bool(state) and state.get("status") in {"queued", "running"}
def _release_session_lock(job: dict) -> None:
    """Free the session's active-job slot if *job* still occupies it.

    No-op for jobs without a session id, and for sessions whose slot now
    points at a different job.
    """
    session_id = job.get("session_id")
    if session_id and session_active_jobs.get(session_id) == job.get("job_id"):
        session_active_jobs.pop(session_id, None)
def _demo_video_id(folder: str, filename: str) -> str:
    """Return a stable, opaque 20-hex-digit ID for ``folder/filename``.

    The ID is the SHA-256 prefix of the UTF-8 encoded path, so clients can
    reference whitelisted demo files without ever sending a path.
    """
    digest = hashlib.sha256()
    digest.update(folder.encode("utf-8"))
    digest.update(b"/")
    digest.update(filename.encode("utf-8"))
    return digest.hexdigest()[:20]
def _collect_demo_videos():
    """Scan ``DEMO_VIDEO_DIRS`` for ``.mp4`` files (case-insensitive suffix).

    Returns:
        ``(videos, lookup)``: ``videos`` is a list of metadata dicts
        (id, name, url, folder, size_bytes, modified_at) sorted by
        lower-cased name, then folder, then URL; ``lookup`` maps each ID to
        its ``Path``. Folders that are missing or not directories are skipped.
    """
    entries = []
    lookup = {}
    for folder, directory in DEMO_VIDEO_DIRS.items():
        if not (directory.exists() and directory.is_dir()):
            continue
        mp4_files = (
            candidate
            for candidate in directory.iterdir()
            if candidate.is_file() and candidate.suffix.lower() == ".mp4"
        )
        for mp4 in mp4_files:
            info = mp4.stat()
            vid = _demo_video_id(folder, mp4.name)
            entries.append({
                "id": vid,
                "name": mp4.name,
                "url": f"/api/demo-videos/{vid}/stream",
                "folder": folder,
                "size_bytes": info.st_size,
                "modified_at": int(info.st_mtime),
            })
            lookup[vid] = mp4
    entries.sort(key=lambda e: (e["name"].lower(), e["folder"].lower(), e["url"].lower()))
    return entries, lookup
def _queue_status_for(job_id: str) -> str | None:
    """Build a human-readable queue status line for a job waiting on the GPU.

    Returns ``None`` when *job_id* is not in ``gpu_queue``. Otherwise reports
    how many jobs are ahead (the running job plus any queued earlier), the
    running job's current step and, once step timings exist, a rough ETA.

    ETA model: the running job still needs its current and later steps, and
    each of the ``pos - 1`` jobs queued ahead needs a full run (approximated
    with the running job's step count). A step costs the mean of its
    recorded durations, or 15 s when there is no history yet.
    """
    if job_id not in gpu_queue:
        return None
    pos = gpu_queue.index(job_id) + 1  # 1-based; also the number of jobs ahead
    active = gpu_active
    if not active["job_id"]:
        return f"Queue position: {pos} — GPU starting up..."
    step = active["step"]
    total = active["total_steps"]
    label = active["step_label"]

    def _step_cost(step_num: int) -> float:
        # Mean observed duration for this step, or a 15 s guess.
        hist = step_durations.get(step_num)
        return sum(hist) / len(hist) if hist else 15

    eta_part = ""
    if step > 0 and step_durations:
        # The current step counts in full: its elapsed time isn't tracked here.
        active_remaining = sum(_step_cost(s) for s in range(step, total + 1))
        full_run = sum(_step_cost(s) for s in range(1, total + 1))
        # Jobs queued ahead of us run in full after the active one finishes
        # (the old formula scaled only the active job's remainder by pos).
        remaining_secs = int(active_remaining + (pos - 1) * full_run)
        if remaining_secs > 0:
            if remaining_secs < 60:
                eta_part = f" — ~{remaining_secs}s remaining"
            else:
                m, s_ = divmod(remaining_secs, 60)
                eta_part = f" — ~{m}m {s_:02d}s remaining"
    jobs_word = "job" if pos == 1 else "jobs"
    if label:
        return f"{pos} {jobs_word} ahead (Step {step}/{total} — {label}){eta_part}"
    return f"{pos} {jobs_word} ahead (Step {step}/{total}){eta_part}"
def _config_languages() -> list[str]:
    """Return the language names the pipeline supports (the Chatterbox set)."""
    from pipeline import LANGUAGE_CODES

    return [*LANGUAGE_CODES]
def _chatterbox_language_options() -> list[dict]:
    """Return Chatterbox languages as ``{"name": ..., "code": ...}`` dicts, in table order."""
    from pipeline import LANGUAGE_CODES

    options = []
    for lang_name, lang_code in LANGUAGE_CODES.items():
        options.append({"name": lang_name, "code": lang_code})
    return options
def _omnivoice_language_options() -> list[dict]:
    """Return OmniVoice languages as ``{"name": ..., "code": ...}`` dicts, in table order."""
    from steps.lang.omnivoice_languages import OMNIVOICE_LANGUAGE_CODES

    return [dict(name=lang_name, code=lang_code) for lang_name, lang_code in OMNIVOICE_LANGUAGE_CODES.items()]
def _qwen3_language_options() -> list[dict]:
    """Return Qwen3 languages as ``{"name": ..., "code": ...}`` dicts, in table order."""
    from steps.lang.qwen3_languages import QWEN3_LANGUAGE_CODES

    options = []
    for lang_name, lang_code in QWEN3_LANGUAGE_CODES.items():
        options.append(dict(name=lang_name, code=lang_code))
    return options
async def _artifact_reaper_loop():
    """Periodically delete stale per-job artifacts and forget finished jobs.

    The first pass runs immediately, then one every
    ``REAPER_INTERVAL_SECONDS``. Each pass:

    * removes every directory directly under ``ARTIFACTS_ROOT`` whose mtime
      is older than ``REAPER_MAX_AGE_SECONDS``, except the shared
      ``PERSISTENT_ARTIFACT_DIRS`` and directories of jobs that are still
      queued or running;
    * drops ``"complete"``/``"error"`` jobs created more than
      ``REAPER_MAX_AGE_SECONDS`` ago from the in-memory ``jobs`` store.

    Errors are printed and never stop the loop; a problem with one
    directory does not abort the rest of the pass. Cancellation propagates.
    """
    while True:
        now = time.time()
        try:
            entries = list(ARTIFACTS_ROOT.iterdir())
        except Exception as exc:
            print(f"[reaper] cleanup error: {exc}")
            entries = []
        for path in entries:
            try:
                if not path.is_dir() or path.name in PERSISTENT_ARTIFACT_DIRS:
                    continue
                # Job dirs are named after job ids; never pull files out from
                # under a job that is still in flight.
                if _is_job_active(path.name):
                    continue
                if now - path.stat().st_mtime > REAPER_MAX_AGE_SECONDS:
                    # rmtree on large video outputs can be slow; keep the
                    # event loop responsive.
                    await asyncio.to_thread(shutil.rmtree, path, ignore_errors=True)
            except Exception as exc:
                print(f"[reaper] cleanup error for {path.name}: {exc}")
        try:
            stale_jobs = [
                job_id
                for job_id, state in jobs.items()
                if state.get("status") in {"complete", "error"}
                and (now - state.get("created_at", now)) > REAPER_MAX_AGE_SECONDS
            ]
            for job_id in stale_jobs:
                jobs.pop(job_id, None)
        except Exception as exc:
            print(f"[reaper] cleanup error: {exc}")
        await asyncio.sleep(REAPER_INTERVAL_SECONDS)
async def enforce_content_length_limit(request: Request, call_next):
    """HTTP middleware: reject oversized job uploads before the body is parsed.

    Only ``POST /api/jobs`` is inspected. A declared Content-Length above
    ``MAX_UPLOAD_BYTES`` gets 413; a non-integer one gets 400. Requests that
    send no Content-Length header pass through unchecked.
    """
    is_job_upload = request.method.upper() == "POST" and request.url.path == "/api/jobs"
    declared = request.headers.get("content-length") if is_job_upload else None
    if declared:
        try:
            too_large = int(declared) > MAX_UPLOAD_BYTES
        except ValueError:
            return JSONResponse(
                status_code=400,
                content={"detail": "Invalid Content-Length header."},
            )
        if too_large:
            return JSONResponse(
                status_code=413,
                content={"detail": f"File too large (max {MAX_FILE_SIZE_MB}MB)."},
            )
    return await call_next(request)
async def _run_pipeline_async(
    job_id: str,
    video_path: str,
    target_lang: str,
    source_lang: str,
    voice_mode: str,
    captions: bool = True,
    preserve_music: bool = True,
    video_link: Optional[str] = None,
):
    """Run the translation pipeline for *job_id* once the GPU is free.

    The job first waits in ``gpu_queue`` for ``gpu_semaphore`` without
    blocking the event loop, refreshing ``job["_wait_status"]`` every second.
    It then drives ``pipeline.run_pipeline`` on a worker thread. Progress,
    preview, completion and error entries are appended to ``job["messages"]``.
    The current step is mirrored into ``gpu_active`` for queue ETAs.

    Whatever happens after the GPU is acquired, the semaphore is released,
    ``gpu_active`` is reset and the session lock is dropped. A failure while
    freeing accelerator memory no longer skips the release, which would
    leave every later job waiting forever.
    """
    from pipeline import run_pipeline

    job = jobs[job_id]
    job["status"] = "queued"

    # Join the FIFO queue and poll the semaphore without blocking the event
    # loop, refreshing the live queue status shown to the client each tick.
    gpu_queue.append(job_id)
    job["_wait_status"] = _queue_status_for(job_id) or "Waiting for GPU..."
    while not gpu_semaphore.acquire(blocking=False):
        job["_wait_status"] = _queue_status_for(job_id) or "Waiting for GPU..."
        await asyncio.sleep(1)

    # Everything after a successful acquire sits inside try/finally so the
    # semaphore is always given back.
    try:
        if job_id in gpu_queue:
            gpu_queue.remove(job_id)
        job["_wait_status"] = None
        job["status"] = "running"

        # Progress lines are parsed as "Step N/<total_steps>: <label>": six
        # base steps, plus one when preserve_music adds music restoration.
        total_steps = 6 + (1 if preserve_music else 0)
        gpu_active.update(
            job_id=job_id,
            started_at=time.time(),
            step=0,
            total_steps=total_steps,
            step_label="",
        )
        job["messages"].append({"type": "progress", "message": "GPU acquired — starting pipeline...", "step": 0})

        start = time.time()
        step_start = time.time()

        data_dir = str(ARTIFACTS_ROOT / job_id)
        os.makedirs(data_dir, exist_ok=True)
        output_path = str(Path(data_dir) / "output.mp4")

        # preview_both mode is unavailable in single-engine Spaces (each one
        # serves only TTS_ENGINE), so no preview event is passed.
        preview_event = None
        gen = run_pipeline(
            video_path=video_path,
            target_language=target_lang,
            source_language=source_lang,
            output_path=output_path,
            voice_mode=voice_mode,
            preview_event=preview_event,
            job_state=job,
            captions=captions,
            preserve_music=preserve_music,
            data_dir=data_dir,
            video_link=video_link,
        )

        step = 0

        def _run_gen():
            """Drain the pipeline generator on a worker thread.

            Returns the generator's return value, or None if it raised (in
            which case the job has already been marked as errored).
            """
            nonlocal step, step_start
            output = None
            try:
                while True:
                    msg = next(gen)
                    # Preview-ready sentinel: publish preview URLs to the client.
                    if isinstance(msg, dict) and msg.get("__PREVIEW_READY__"):
                        preview_paths = msg["paths"]
                        job["preview_paths"] = preview_paths
                        preview_urls = {
                            model_name: f"/api/jobs/{job_id}/preview/{model_name}"
                            for model_name, path in preview_paths.items()
                            if path
                        }
                        job["messages"].append({
                            "type": "voice_preview",
                            "step": 4,
                            "previews": preview_urls,
                        })
                        continue
                    if isinstance(msg, str):
                        if "Step" in msg and f"/{total_steps}" in msg:
                            try:
                                new_step = int(msg.split("Step")[1].split("/")[0].strip())
                                # Close out the step that just ended.
                                if step > 0:
                                    _record_step_duration(step, time.time() - step_start)
                                step = new_step
                                step_start = time.time()
                                label = msg.split(":", 1)[1].strip() if ":" in msg else ""
                                # Drop any leading emoji/symbol/whitespace prefix,
                                # whichever glyph the pipeline used.
                                label = re.sub(r"^\W+", "", label)
                                gpu_active["step"] = step
                                gpu_active["step_label"] = label
                            except (ValueError, IndexError):
                                pass
                        job["messages"].append({
                            "type": "progress",
                            "message": msg.strip(),
                            "step": step,
                        })
            except StopIteration as e:
                output = e.value
            except Exception as e:
                # Report the crash from the worker thread itself so the
                # frontend sees it immediately, rather than relying on the
                # exception propagating through run_in_executor.
                import traceback

                tb = traceback.format_exc()
                print(f"[pipeline] CRASH in job {job_id}: {e}\n{tb}")
                job["status"] = "error"
                job["error"] = str(e)
                job["messages"].append({
                    "type": "error",
                    "message": f"Pipeline crashed: {e}",
                })
                return None
            # Record the final step's duration.
            if step > 0:
                _record_step_duration(step, time.time() - step_start)
            return output

        loop = asyncio.get_running_loop()
        result_path = await loop.run_in_executor(None, _run_gen)
        # _run_gen already reported pipeline crashes; only mark success here.
        if job["status"] != "error":
            elapsed = round(time.time() - start)
            job["status"] = "complete"
            job["result_path"] = result_path or output_path
            job["messages"].append({"type": "complete", "elapsed": elapsed})
    except Exception as e:
        job["status"] = "error"
        job["error"] = str(e)
        job["messages"].append({"type": "error", "message": str(e)})
    finally:
        try:
            _free_gpu_memory()
        finally:
            gpu_active["job_id"] = None
            gpu_active["started_at"] = None
            gpu_active["step"] = 0
            gpu_active["step_label"] = ""
            if job_id in gpu_queue:
                gpu_queue.remove(job_id)
            _release_session_lock(job)
            gpu_semaphore.release()


def _record_step_duration(step_num: int, duration: float) -> None:
    """Append *duration* to the history for *step_num*, keeping the last 10."""
    history = step_durations.setdefault(step_num, [])
    history.append(duration)
    if len(history) > 10:
        history.pop(0)


def _free_gpu_memory() -> None:
    """Best-effort release of cached accelerator memory between jobs.

    Runs ``gc.collect()`` and then empties the CUDA and/or MPS allocator
    caches for whichever backend reports itself available. Never raises:
    failures (including torch being absent) are printed instead.
    """
    import gc

    gc.collect()
    try:
        import torch

        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        if hasattr(torch, "mps") and torch.backends.mps.is_available():
            torch.mps.empty_cache()
    except Exception as exc:
        print(f"[pipeline] GPU memory cleanup failed: {exc}")
# ── Routes ─────────────────────────────────────────────
async def health():
    """Liveness probe reporting GPU availability and current queue state."""
    payload = {
        "status": "ok",
        "gpu_available": _gpu_available(),
        "queue_depth": _queue_depth(),
        "active_job_id": gpu_active["job_id"],
    }
    return JSONResponse(payload)
async def config():
    """Describe upload limits, supported languages and the TTS engine served."""
    return JSONResponse(
        dict(
            max_file_size_mb=MAX_FILE_SIZE_MB,
            max_duration_sec=MAX_DURATION_SEC,
            languages=_config_languages(),
            chatterbox_languages=_chatterbox_language_options(),
            omnivoice_languages=_omnivoice_language_options(),
            qwen3_languages=_qwen3_language_options(),
            tts_models=[TTS_ENGINE],
            tts_engine=TTS_ENGINE,
        )
    )
async def list_demo_videos():
    """Return metadata for every whitelisted demo MP4 found by the directory scan."""
    return JSONResponse({"videos": _collect_demo_videos()[0]})
async def stream_demo_video(video_id: str):
    """Serve a demo MP4 identified by its opaque ID.

    Clients never supply a path: the ID must match a file found by the
    whitelist scan, otherwise the response is 404.
    """
    lookup = _collect_demo_videos()[1]
    if video_id not in lookup:
        raise HTTPException(404, "Demo video not found.")
    target = lookup[video_id]
    return FileResponse(str(target), media_type="video/mp4", filename=target.name)
async def get_showcase():
    """Return curated before/after showcase entries with streaming URLs.

    Reads ``SHOWCASE_FILE``. For each entry, a ``their_dub``/``our_dub``
    object of ``{"type": "local", "filename": ...}`` gets a ``url`` pointing
    at the demo-video stream route for that showcase file. A missing,
    unreadable or malformed file — including one whose top level is not an
    object or whose ``showcases`` is not a list — yields an empty list
    instead of a 500. Entries that are not objects are skipped.
    """
    if not SHOWCASE_FILE.exists():
        return JSONResponse({"showcases": []})
    try:
        data = json.loads(SHOWCASE_FILE.read_text(encoding="utf-8"))
    except (ValueError, OSError):
        # ValueError covers both malformed JSON and non-UTF-8 file contents.
        return JSONResponse({"showcases": []})
    showcases = data.get("showcases", []) if isinstance(data, dict) else []
    if not isinstance(showcases, list):
        return JSONResponse({"showcases": []})
    # Ignore stray non-object entries rather than failing the whole response.
    showcases = [entry for entry in showcases if isinstance(entry, dict)]
    for entry in showcases:
        for key in ("their_dub", "our_dub"):
            dub = entry.get(key)
            if isinstance(dub, dict) and dub.get("type") == "local" and dub.get("filename"):
                video_id = _demo_video_id("showcase", dub["filename"])
                dub["url"] = f"/api/demo-videos/{video_id}/stream"
    return JSONResponse({"showcases": showcases})
async def create_job(
    request: Request,
    file: Optional[UploadFile] = File(None),
    url: Optional[str] = Form(None),
    target_language: str = Form("Spanish"),
    source_language: str = Form("auto"),
    voice_mode: str = Form("chatterbox"),
    captions: str = Form("true"),
    preserve_music: str = Form("false"),
    x_session_id: Optional[str] = Header(default=None, alias="X-Session-Id"),
):
    """Submit a video for translation and start its pipeline in the background.

    The source is an uploaded *file* (used when both are given) or a *url* on
    an allowed host, fetched with yt-dlp. The video must be at most
    ``MAX_DURATION_SEC`` long. ``voice_mode`` is always coerced to this
    Space's ``TTS_ENGINE``. ``captions``/``preserve_music`` are enabled only
    by the case-insensitive string ``"true"``.

    Returns:
        ``{"job_id": ..., "status": "queued"}``. If the ``X-Session-Id``
        session already has a queued/running job, returns a 409 response
        with ``existing_job_id`` instead.

    Raises:
        HTTPException: 400 for a missing source, unsupported host, failed
            download, unreadable duration or over-long video; 413 for an
            upload larger than ``MAX_UPLOAD_BYTES``.
    """
    if not file and not url:
        raise HTTPException(400, "Provide either a file upload or a URL.")

    conflict = _active_session_conflict(x_session_id)
    if conflict is not None:
        return conflict

    # Only this Space's engine is valid ("preview_both" is unavailable in
    # single-engine mode), so any other value silently falls back to it.
    voice_mode = TTS_ENGINE

    base_job_id = None
    if url:
        if not _is_allowed_video_host(url):
            raise HTTPException(400, "Unsupported URL host.")
        base_job_id = _job_id_from_url(url)
    job_id, job_dir = _allocate_job_dir(base_job_id or str(uuid.uuid4())[:12])

    try:
        if file:
            video_path = str(job_dir / f"input{_safe_upload_suffix(file.filename)}")
            await _save_upload(file, video_path)
        else:
            video_path = str(job_dir / "input.mp4")
            try:
                # yt-dlp may run for up to two minutes; keep the event loop free.
                await asyncio.to_thread(_download_url, url, video_path)
            except Exception as e:
                raise HTTPException(400, f"Failed to download video: {e}") from e

        try:
            duration_seconds = await asyncio.to_thread(_probe_duration_seconds, video_path)
        except Exception as exc:
            raise HTTPException(400, f"Could not validate video duration: {exc}") from exc
        if duration_seconds > MAX_DURATION_SEC:
            raise HTTPException(400, f"Video exceeds {MAX_DURATION_SEC} seconds limit.")

        # The same session may have registered another job while we awaited I/O.
        conflict = _active_session_conflict(x_session_id)
        if conflict is not None:
            shutil.rmtree(job_dir, ignore_errors=True)
            return conflict
    except BaseException:
        # Leave nothing behind for a rejected or aborted submission.
        shutil.rmtree(job_dir, ignore_errors=True)
        raise

    jobs[job_id] = {
        "job_id": job_id,
        "status": "queued",
        "messages": [],
        "result_path": None,
        "error": None,
        "created_at": time.time(),
        "voice_mode": voice_mode,
        "preview_paths": None,
        "preview_event": None,
        "selected_model": None,
        "session_id": x_session_id,
    }
    if x_session_id:
        session_active_jobs[x_session_id] = job_id

    enable_captions = captions.lower() == "true"
    enable_music = preserve_music.lower() == "true"
    task = asyncio.create_task(
        _run_pipeline_async(
            job_id, video_path, target_language, source_language,
            voice_mode, enable_captions, enable_music, url,
        )
    )
    # Hold a strong reference until the task finishes (see _background_tasks).
    _background_tasks.add(task)
    task.add_done_callback(_background_tasks.discard)
    return JSONResponse({"job_id": job_id, "status": "queued"})


# Strong references to in-flight pipeline tasks: the event loop keeps only
# weak references, so an unreferenced task could be garbage-collected
# before it finishes.
_background_tasks: set = set()

# Patterns that extract a readable, filesystem-safe job id from a video URL,
# tried in order: Instagram reel/post, YouTube, vm.tiktok.com short link,
# TikTok /video/<id>.
_URL_JOB_ID_PATTERNS = (
    re.compile(r'/(?:reel|reels|p)/([A-Za-z0-9_-]+)'),
    re.compile(r'(?:v=|youtu\.be/)([\w-]+)'),
    re.compile(r'vm\.tiktok\.com/([\w-]+)'),
    re.compile(r'/video/(\d+)'),
)

# Read size used when streaming an upload to disk.
_UPLOAD_CHUNK_BYTES = 1024 * 1024


def _active_session_conflict(session_id: Optional[str]) -> Optional[JSONResponse]:
    """Return a 409 response if *session_id* already owns an active job.

    A stale mapping (job finished or forgotten) is cleared and None returned.
    """
    if not session_id:
        return None
    existing_job_id = session_active_jobs.get(session_id)
    if not existing_job_id:
        return None
    if _is_job_active(existing_job_id):
        return JSONResponse(status_code=409, content={"existing_job_id": existing_job_id})
    session_active_jobs.pop(session_id, None)
    return None


def _job_id_from_url(url: str) -> Optional[str]:
    """Return the first id captured by ``_URL_JOB_ID_PATTERNS``, or None."""
    for pattern in _URL_JOB_ID_PATTERNS:
        match = pattern.search(url)
        if match:
            return match.group(1)
    return None


def _allocate_job_dir(base_job_id: str) -> tuple[str, Path]:
    """Create a fresh artifact directory and return ``(job_id, job_dir)``.

    ``base_job_id`` is used when free, else ``_1``, ``_2``, ... suffixes are
    tried. Shared top-level directory names and ids of jobs still held in
    memory count as taken; ``mkdir`` without ``exist_ok`` makes the claim atomic.
    """
    job_id = base_job_id
    counter = 1
    while True:
        if job_id not in PERSISTENT_ARTIFACT_DIRS and job_id not in jobs:
            job_dir = ARTIFACTS_ROOT / job_id
            try:
                job_dir.mkdir(parents=True)
                return job_id, job_dir
            except FileExistsError:
                pass
        job_id = f"{base_job_id}_{counter}"
        counter += 1


def _safe_upload_suffix(filename: Optional[str]) -> str:
    """Return the upload's extension if short and alphanumeric, else ".mp4".

    The filename is client-controlled, so anything unusual is replaced
    rather than copied into a server-side path.
    """
    ext = Path(filename or "video.mp4").suffix
    if re.fullmatch(r"\.[A-Za-z0-9]{1,10}", ext):
        return ext
    return ".mp4"


async def _save_upload(file: UploadFile, dest: str) -> None:
    """Stream *file* to *dest* in chunks, enforcing ``MAX_UPLOAD_BYTES``.

    Guards bodies sent without Content-Length (e.g. chunked transfer), which
    the header-based middleware check cannot see, and avoids holding the
    whole upload in memory.

    Raises:
        HTTPException: 413 once more than ``MAX_UPLOAD_BYTES`` have been read.
    """
    written = 0
    with open(dest, "wb") as out:
        while chunk := await file.read(_UPLOAD_CHUNK_BYTES):
            written += len(chunk)
            if written > MAX_UPLOAD_BYTES:
                raise HTTPException(413, f"File too large (max {MAX_FILE_SIZE_MB}MB).")
            out.write(chunk)
async def job_status_poll(request: Request, job_id: str, after: int = 0):
    """Return the job's messages from index *after* onward, plus live wait status.

    Response: ``{"messages": [...], "next": <cursor for the next poll>,
    "wait_status": <queue status string or None>}``, sent with no-cache
    headers. A negative *after* is treated as 0. Otherwise a slice like
    ``[-5:]`` would return tail messages and produce a bogus ``next``.

    Raises:
        HTTPException: 404 if the job is unknown.
    """
    job = jobs.get(job_id)
    if job is None:
        raise HTTPException(404, "Job not found.")
    after = max(after, 0)
    messages = job["messages"][after:]
    # Live wait ETA is updated in place by the runner, not queued as a message.
    wait_status = job.get("_wait_status")
    return JSONResponse(
        {"messages": messages, "next": after + len(messages), "wait_status": wait_status},
        headers={"Cache-Control": "no-cache, no-store"},
    )
class ModelSelection(BaseModel):
    # Request body for select_model: the TTS model the user picked after
    # previewing. select_model rejects any value other than TTS_ENGINE (400).
    model: str
async def select_model(job_id: str, selection: ModelSelection):
    """Record the user's TTS model choice and wake a pipeline waiting on it.

    Only this Space's ``TTS_ENGINE`` is accepted. If the job stored a
    ``preview_event``, it is set so a paused pipeline can continue.
    """
    job = jobs.get(job_id)
    if not job:
        raise HTTPException(404, "Job not found.")
    chosen = selection.model
    if chosen != TTS_ENGINE:
        raise HTTPException(400, f"Invalid model. This Space only serves {TTS_ENGINE}.")
    job["selected_model"] = chosen
    waiter = job.get("preview_event")
    if waiter:
        waiter.set()
    return JSONResponse({"status": "ok", "selected": chosen})
async def get_preview_audio(job_id: str, model_name: str):
    """Serve the WAV preview generated for *model_name* on this job.

    404 for an unknown job, previews not generated yet, or a preview file
    missing on disk; 400 when *model_name* is not this Space's engine.
    """
    job = jobs.get(job_id)
    if not job:
        raise HTTPException(404, "Job not found.")
    if model_name != TTS_ENGINE:
        raise HTTPException(400, f"Invalid model name. This Space serves {TTS_ENGINE} only.")
    previews = job.get("preview_paths")
    if not previews:
        raise HTTPException(404, "Previews not yet generated.")
    wav = previews.get(model_name)
    if not (wav and Path(wav).exists()):
        raise HTTPException(404, f"Preview for '{model_name}' not available.")
    return FileResponse(wav, media_type="audio/wav", filename=f"preview_{model_name}.wav")
async def job_result(job_id: str):
    """Send the finished, translated MP4 for *job_id* as a download.

    404 if the job or its result file is missing; 400 while the job is in
    any status other than "complete".
    """
    job = jobs.get(job_id)
    if not job:
        raise HTTPException(404, "Job not found.")
    status = job["status"]
    if status != "complete":
        raise HTTPException(400, f"Job is {status}, not complete.")
    result = job["result_path"]
    if not (result and Path(result).exists()):
        raise HTTPException(404, "Result file not found.")
    return FileResponse(result, media_type="video/mp4", filename=f"videovoice_{job_id}.mp4")
async def startup_event():
    """Create the artifact directory layout and start the artifact reaper.

    The reaper is skipped when DISABLE_CLEANUP is "1", "true" or "yes"
    (case-insensitive), and is not started again while a previous reaper
    task is still running.
    """
    global artifact_reaper_task
    for directory in (
        ARTIFACTS_ROOT,
        UPLOAD_DIR,
        OUTPUT_DIR,
        ARTIFACTS_ROOT / "data",
        ARTIFACTS_ROOT / "tmp",
    ):
        directory.mkdir(parents=True, exist_ok=True)

    if os.getenv("DISABLE_CLEANUP", "").lower() in ("1", "true", "yes"):
        print("[reaper] DISABLE_CLEANUP is set — artifact reaper will not run")
    elif artifact_reaper_task is None or artifact_reaper_task.done():
        artifact_reaper_task = asyncio.create_task(_artifact_reaper_loop())
async def shutdown_event():
    """Cancel the artifact reaper task, if one is running, and wait for it."""
    reaper = artifact_reaper_task
    if reaper is None or reaper.done():
        return
    reaper.cancel()
    try:
        await reaper
    except asyncio.CancelledError:
        # Expected: we just cancelled it.
        pass
# ── No-cache headers for dev/tunnel (ensures Cloudflare serves fresh files) ──
from starlette.middleware.base import BaseHTTPMiddleware

# Phase 1.7 marker: remove legacy static middleware when React FE fully owns UI.
_NO_CACHE_SUFFIXES = ('.css', '.js', '.html')
_NO_CACHE_HEADERS = (
    ('Cache-Control', 'no-cache, no-store, must-revalidate'),
    ('Pragma', 'no-cache'),
)


class NoCacheStaticMiddleware(BaseHTTPMiddleware):
    """Mark the root page and .css/.js/.html responses as non-cacheable."""

    async def dispatch(self, request: Request, call_next):
        response = await call_next(request)
        path = request.url.path
        if path.endswith(_NO_CACHE_SUFFIXES) or path == '/':
            for header, value in _NO_CACHE_HEADERS:
                response.headers[header] = value
        return response

# Standalone middleware and static mounts removed (now handled in app.py/main app)
# ── Local dev entrypoint ──────────────────────────────
# On HF Spaces `app.py` creates its own Server and imports this router, so
# the block below is skipped. Locally, `python server.py` builds a minimal
# FastAPI wrapper around the router so there's something for uvicorn to run.
if __name__ == "__main__":
    local_app = FastAPI(title="VideoVoice API (local)")
    # slowapi wiring: the shared limiter, its 429 handler and its middleware.
    local_app.state.limiter = limiter
    local_app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
    # NOTE(review): the relative order of these add_middleware calls is kept
    # on purpose — it presumably determines middleware nesting; confirm
    # against the Starlette docs before reordering.
    local_app.add_middleware(SlowAPIMiddleware)
    local_app.add_middleware(NoCacheStaticMiddleware)
    local_app.add_middleware(
        CORSMiddleware,
        allow_origins=ALLOWED_ORIGINS,
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )

    # Thin wrapper so the upload-size check can run as HTTP middleware.
    # NOTE(review): no registration of this function is visible here (e.g. a
    # `@local_app.middleware("http")` decorator); confirm it is installed,
    # otherwise the Content-Length limit is not enforced in local mode.
    async def _local_content_length(request: Request, call_next):
        return await enforce_content_length_limit(request, call_next)

    local_app.include_router(router)
    # Tools API — independent of pipeline; safe to include here too.
    from tools_api import router as tools_router
    local_app.include_router(tools_router)
    # Serve the legacy static frontend at / so `python server.py` keeps the
    # old dev UX (open http://localhost:8000 to hit frontend/index.html).
    # The React SPA in production is deployed separately to S3.
    frontend_dir = Path(__file__).parent / "frontend"
    if frontend_dir.exists():
        # Mounted last, after the API routers have been included.
        local_app.mount("/", StaticFiles(directory=str(frontend_dir), html=True), name="frontend")
    import uvicorn
    # Re-reads PORT with a default of 8000, unlike the module-level PORT
    # constant, which defaults to 7860.
    port = int(os.getenv("PORT", 8000))
    uvicorn.run(local_app, host="0.0.0.0", port=port)