Spaces:
Running
Running
| from __future__ import annotations | |
| import json | |
| import os | |
| import shutil | |
| import subprocess | |
| import sys | |
| import textwrap | |
| import time | |
| import urllib.parse | |
| import urllib.request | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| from typing import Iterable | |
| from PIL import Image, ImageDraw, ImageFont, ImageOps | |
| from selenium import webdriver | |
| from selenium.common.exceptions import TimeoutException | |
| from selenium.webdriver.chrome.options import Options | |
| from selenium.webdriver.common.by import By | |
| from selenium.webdriver.support.ui import WebDriverWait | |
# Directory one level above the folder that contains this file.
ROOT = Path(__file__).resolve().parents[1]

# Every generated artifact is written somewhere under OUTPUT_DIR.
OUTPUT_DIR = ROOT / "replicalab" / "outputs" / "demo_video"
SCREENS_DIR = OUTPUT_DIR / "screens"  # raw browser screenshots, one per scene
SLIDES_DIR = OUTPUT_DIR / "slides"  # screenshots with the caption overlay applied
AUDIO_DIR = OUTPUT_DIR / "audio"  # synthesized voiceover
TEXT_DIR = OUTPUT_DIR / "text"  # script, subtitles, shot list, concat list, build metadata

# Chrome binary handed to Selenium in build_driver(); this is a Windows install path.
CHROME_PATH = Path(r"C:\Program Files\Google\Chrome\Application\chrome.exe")
# Address of the locally running app that is health-checked and screenshotted.
BASE_URL = "http://127.0.0.1:7860"
# localStorage keys that capture_screens() sets to '1' and 'light' before capturing.
ONBOARDING_KEY = "replicalab-onboarded"
THEME_KEY = "replicalab-theme"
# Slide size in pixels as (width, height).
VIDEO_SIZE = (1920, 1080)

# Voiceover script: sent to text-to-speech, saved as voiceover.txt and split
# into subtitle cues by write_srt().
NARRATION = (
    "ReplicaLab starts from a research paper and turns it into a seeded replication benchmark. "
    "The Scientist proposes a protocol, the Lab Manager enforces budget, tools, and scheduling, "
    "and a deterministic Judge scores rigor, feasibility, and fidelity. In our first scenario, "
    "the agents agree immediately, so the paper looks replicable in this lab. In the second scenario, "
    "they negotiate across all six rounds, which creates a rich reinforcement learning signal. "
    "In the third, they never resolve the blockers, so the system rejects the paper for the current setup. "
    "Because every outcome is scored deterministically, we can train the Scientist with Unsloth and TRL, "
    "compare baseline versus trained runs, inspect real logs, and see exactly where more learning is still needed. "
    "The training page is intentionally honest: the live run reached positive rewards, but the held-out compare still "
    "shows that the trained Scientist has not beaten the deterministic baseline yet."
)
@dataclass(frozen=True)
class Scene:
    """One shot of the demo video: a page to capture plus its caption and timing.

    The class is a dataclass so that it can be built with keyword arguments
    (as the SCENES table does); without the decorator the annotations below
    would not generate an ``__init__``. Instances are frozen because scenes
    are treated as read-only configuration.
    """

    # Short identifier; also used as the file stem of the screenshot and slide.
    id: str
    # Page opened in the browser for this shot.
    url: str
    # Headline drawn on the slide.
    title: str
    # Caption drawn under the headline (word-wrapped when rendered).
    subtitle: str
    # Seconds the slide stays on screen.
    duration: float
    # Text that must appear in the page source before the screenshot is
    # taken; None skips that wait.
    expected_text: str | None = None
# Ordered shot list. The order here is the order of the slides in the final
# video, and the durations add up to the 60 seconds the video is named for.
SCENES: tuple[Scene, ...] = (
    Scene(
        id="dashboard",
        url=f"{BASE_URL}/",
        title="Paper to benchmark",
        subtitle="ReplicaLab turns a paper into a seeded replication benchmark.",
        duration=8.0,
        expected_text="ReplicaLab",
    ),
    Scene(
        id="fast_agreement",
        url=f"{BASE_URL}/episode?template=ml_benchmark&difficulty=medium&seed=101&demo=1&autoplay=1&demoCase=fast-agreement",
        title="Scenario 1: first-round agreement",
        subtitle="The agents converge quickly and the paper scores as a strong replication candidate.",
        duration=11.0,
        expected_text="Completed: First-round agreement",
    ),
    Scene(
        id="learning_opportunity",
        url=f"{BASE_URL}/episode?template=ml_benchmark&difficulty=medium&seed=202&demo=1&autoplay=1&demoCase=learning-opportunity",
        title="Scenario 2: multi-round learning",
        subtitle="Six rounds of disagreement create a rich RL signal before the final acceptance.",
        duration=13.0,
        expected_text="Completed: Multi-round learning opportunity",
    ),
    Scene(
        id="no_agreement",
        url=f"{BASE_URL}/episode?template=ml_benchmark&difficulty=medium&seed=303&demo=1&autoplay=1&demoCase=no-agreement",
        title="Scenario 3: no agreement",
        subtitle="The blockers remain unresolved, so the system rejects replication for this setup.",
        duration=12.0,
        expected_text="Completed: No agreement reached",
    ),
    Scene(
        id="training",
        url=f"{BASE_URL}/training",
        title="Artifact-backed training review",
        subtitle="The training page shows real checkpoints, real compare metrics, and what still needs improvement.",
        duration=16.0,
        expected_text="Training Logs And Analysis",
    ),
)
def load_env_value(key: str) -> str | None:
    """Return the value configured for *key*, or None when it is not set.

    A non-empty process environment variable wins. Otherwise the project's
    ``.env``, ``.env.local`` and ``frontend/.env`` files are scanned in that
    order and the first ``KEY=value`` line whose key matches is used, with
    surrounding whitespace and quote characters removed.
    """
    from_environment = os.getenv(key)
    if from_environment:
        return from_environment

    env_files = (ROOT / ".env", ROOT / ".env.local", ROOT / "frontend" / ".env")
    for env_file in env_files:
        if not env_file.exists():
            continue
        for raw_line in env_file.read_text(encoding="utf-8").splitlines():
            entry = raw_line.strip()
            # Blank lines, comments and lines without an assignment carry no value.
            if not entry or entry.startswith("#") or "=" not in entry:
                continue
            name, _, value = entry.partition("=")
            if name.strip() == key:
                return value.strip().strip('"').strip("'")
    return None
def ensure_backend() -> None:
    """Verify that the local app answers its health check.

    Raises:
        RuntimeError: if ``{BASE_URL}/health`` cannot be fetched or decoded,
            or if the decoded payload's ``status`` is not ``"ok"``.
    """
    health_url = f"{BASE_URL}/health"
    try:
        with urllib.request.urlopen(health_url, timeout=10) as response:
            body = response.read()
            data = json.loads(body.decode("utf-8"))
    except Exception as exc:  # pragma: no cover - user-facing failure
        hint = (
            f"Backend unavailable at {BASE_URL}. Start it with "
            "\"python -m uvicorn server.app:app --host 127.0.0.1 --port 7860\"."
        )
        raise RuntimeError(hint) from exc

    status = data.get("status")
    if status == "ok":
        return
    raise RuntimeError(f"Unexpected backend health payload: {data}")
def ensure_output_dirs() -> None:
    """Create the output directory tree; directories that already exist are left alone."""
    targets = (OUTPUT_DIR, SCREENS_DIR, SLIDES_DIR, AUDIO_DIR, TEXT_DIR)
    for target in targets:
        target.mkdir(parents=True, exist_ok=True)
def get_font(size: int, bold: bool = False) -> ImageFont.FreeTypeFont | ImageFont.ImageFont:
    """Load a UI font at *size*, preferring Segoe UI and then Arial.

    Only Windows font locations are probed. When neither file exists,
    Pillow's built-in default font is returned instead.
    """
    if bold:
        font_files = (r"C:\Windows\Fonts\segoeuib.ttf", r"C:\Windows\Fonts\arialbd.ttf")
    else:
        font_files = (r"C:\Windows\Fonts\segoeui.ttf", r"C:\Windows\Fonts\arial.ttf")

    for font_file in map(Path, font_files):
        if font_file.exists():
            return ImageFont.truetype(str(font_file), size=size)
    return ImageFont.load_default()
def pick_voice(api_key: str, preferred_voice_id: str | None) -> str:
    """Choose the ElevenLabs voice id used for the narration.

    An explicitly configured *preferred_voice_id* is returned as-is without
    contacting the API. Otherwise the account's voices are listed and the
    first match from a short list of preferred names is used, falling back to
    the first voice that has an id at all.

    Raises:
        RuntimeError: if the account exposes no voices, or none has a ``voice_id``.
    """
    if preferred_voice_id:
        return preferred_voice_id

    listing = urllib.request.Request(
        "https://api.elevenlabs.io/v1/voices",
        headers={"xi-api-key": api_key, "Accept": "application/json"},
        method="GET",
    )
    with urllib.request.urlopen(listing, timeout=30) as response:
        payload = json.loads(response.read().decode("utf-8"))

    voices = payload.get("voices", [])
    if not voices:
        raise RuntimeError("ElevenLabs returned no voices for the current API key.")

    # Preferred names are tried in priority order; within one name the first
    # listed voice that actually carries an id wins.
    for wanted in ("Rachel", "Aria", "Sarah", "Charlie", "George"):
        named_ids = (
            entry["voice_id"]
            for entry in voices
            if entry.get("name") == wanted and entry.get("voice_id")
        )
        match = next(named_ids, None)
        if match is not None:
            return match

    any_id = next((entry["voice_id"] for entry in voices if entry.get("voice_id")), None)
    if any_id is not None:
        return any_id
    raise RuntimeError("No usable ElevenLabs voice_id found.")
def synthesize_voiceover(api_key: str, voice_id: str, text: str, output_path: Path) -> None:
    """Render *text* to speech with ElevenLabs and write the audio to *output_path*.

    Two endpoint variants are tried in order: first with an explicit
    ``output_format`` query parameter, then without it. The first non-empty
    response is written to disk.

    Args:
        api_key: ElevenLabs API key, sent in the ``xi-api-key`` header.
        voice_id: Voice to synthesize with.
        text: Narration to speak.
        output_path: File that receives the returned audio bytes.

    Raises:
        RuntimeError: if every endpoint variant fails; the last underlying
            error is attached as ``__cause__``.
    """
    payload = {
        "text": text,
        "model_id": "eleven_multilingual_v2",
        "voice_settings": {
            "stability": 0.4,
            "similarity_boost": 0.8,
            "style": 0.25,
            "use_speaker_boost": True,
        },
    }
    body = json.dumps(payload).encode("utf-8")
    # The id may come from a hand-edited .env file, so it is percent-encoded
    # before being placed in the URL path. Plain alphanumeric ids are unchanged.
    voice_segment = urllib.parse.quote(voice_id, safe="")
    urls = (
        f"https://api.elevenlabs.io/v1/text-to-speech/{voice_segment}?output_format=mp3_44100_128",
        f"https://api.elevenlabs.io/v1/text-to-speech/{voice_segment}",
    )
    last_error: Exception | None = None
    for url in urls:
        request = urllib.request.Request(
            url,
            data=body,
            headers={
                "xi-api-key": api_key,
                "Accept": "audio/mpeg",
                "Content-Type": "application/json",
            },
            method="POST",
        )
        try:
            with urllib.request.urlopen(request, timeout=90) as response:
                audio = response.read()
            if not audio:
                # Raised inside the try on purpose: an empty body falls
                # through to the next endpoint variant like any other failure.
                raise RuntimeError("ElevenLabs returned an empty audio payload.")
            output_path.write_bytes(audio)
            return
        except Exception as exc:  # pragma: no cover - fallback path
            last_error = exc
    # Chain the last failure so its type and traceback are not lost.
    raise RuntimeError(f"Failed to synthesize ElevenLabs audio: {last_error}") from last_error
def build_driver() -> webdriver.Chrome:
    """Start a headless Chrome session with a 1600x1200 window for screenshots.

    The browser binary is taken from CHROME_PATH and the profile is stored in
    ``chrome_profile`` under OUTPUT_DIR.
    """
    chrome_flags = (
        "--headless=new",
        "--disable-gpu",
        "--window-size=1600,1200",
        "--force-device-scale-factor=1",
        "--hide-scrollbars",
        "--mute-audio",
        "--no-first-run",
        "--no-default-browser-check",
        f"--user-data-dir={OUTPUT_DIR / 'chrome_profile'}",
    )

    options = Options()
    options.binary_location = str(CHROME_PATH)
    for flag in chrome_flags:
        options.add_argument(flag)

    browser = webdriver.Chrome(options=options)
    browser.set_window_size(1600, 1200)
    return browser
def capture_screens() -> list[Path]:
    """Screenshot every scene in SCENES and return the PNG paths in scene order.

    The browser first opens BASE_URL so that localStorage can be seeded (the
    onboarding key is set to '1' and the theme key to 'light') before any
    scene page is loaded. For each scene the page is loaded, the scene's
    ``expected_text`` is awaited when one is configured, and a screenshot is
    saved to SCREENS_DIR as ``<scene.id>.png``.

    Waiting for ``expected_text`` is best-effort: on timeout a warning is
    printed to stderr and the page is captured as it is, so a missing marker
    no longer goes unnoticed but also does not abort the build.
    """

    def page_loaded(browser) -> bool:
        return browser.execute_script("return document.readyState") == "complete"

    driver = build_driver()
    files: list[Path] = []
    try:
        driver.get(BASE_URL)
        WebDriverWait(driver, 20).until(page_loaded)
        driver.execute_script(
            "window.localStorage.setItem(arguments[0], '1');"
            "window.localStorage.setItem(arguments[1], 'light');",
            ONBOARDING_KEY,
            THEME_KEY,
        )
        for scene in SCENES:
            driver.get(scene.url)
            WebDriverWait(driver, 30).until(page_loaded)
            if scene.expected_text:
                try:
                    # Bind the text as a default so the callback does not
                    # depend on the loop variable.
                    WebDriverWait(driver, 35).until(
                        lambda d, text=scene.expected_text: text in d.page_source
                    )
                except TimeoutException:
                    print(
                        f"WARNING: scene '{scene.id}' never showed "
                        f"{scene.expected_text!r}; capturing the page as-is.",
                        file=sys.stderr,
                    )
            # Short settle time before the screenshot is taken.
            time.sleep(1.5)
            output = SCREENS_DIR / f"{scene.id}.png"
            driver.save_screenshot(str(output))
            files.append(output)
    finally:
        driver.quit()
    return files
def wrap_text(draw: ImageDraw.ImageDraw, text: str, font: ImageFont.ImageFont, width: int) -> list[str]:
    """Greedily word-wrap *text* so each line measures at most *width*.

    Line length is measured with ``draw.textlength`` using *font*. Words are
    never split: a single word wider than *width* gets a line to itself.
    Returns the wrapped lines, or an empty list when *text* has no words.
    """
    wrapped: list[str] = []
    pending = ""
    for token in text.split():
        attempt = f"{pending} {token}".strip()
        fits = draw.textlength(attempt, font=font) <= width
        if fits:
            pending = attempt
            continue
        # The word does not fit: flush the line built so far and start a new one.
        if pending:
            wrapped.append(pending)
        pending = token
    if pending:
        wrapped.append(pending)
    return wrapped
def create_slides() -> list[Path]:
    """Turn each scene's screenshot into a captioned slide and return the slide paths.

    Every screenshot in SCREENS_DIR is fitted to VIDEO_SIZE, overlaid with a
    badge in the top-left corner and a translucent caption panel holding the
    scene title and its wrapped subtitle, then saved to SLIDES_DIR as
    ``<scene.id>.png``.
    """
    white = (255, 255, 255)
    heading_font = get_font(46, bold=True)
    caption_font = get_font(28)
    label_font = get_font(24, bold=True)

    slide_paths: list[Path] = []
    for scene in SCENES:
        screenshot = Image.open(SCREENS_DIR / f"{scene.id}.png").convert("RGB")
        background = ImageOps.fit(screenshot, VIDEO_SIZE, method=Image.Resampling.LANCZOS)

        # Captions are drawn on a transparent layer and composited afterwards
        # so the panel fills can be semi-transparent.
        overlay = Image.new("RGBA", VIDEO_SIZE, (0, 0, 0, 0))
        pen = ImageDraw.Draw(overlay)
        pen.rounded_rectangle((60, 780, 1860, 1020), radius=36, fill=(7, 13, 36, 190))
        pen.rounded_rectangle((60, 56, 520, 116), radius=30, fill=(99, 102, 241, 220))
        pen.text((92, 72), "ReplicaLab - 60 second demo", font=label_font, fill=white)
        pen.text((96, 820), scene.title, font=heading_font, fill=white)

        caption_rows = wrap_text(pen, scene.subtitle, caption_font, width=1620)
        for row, caption in enumerate(caption_rows):
            # Rows start at y=888 and are spaced 40 px apart.
            pen.text((96, 888 + 40 * row), caption, font=caption_font, fill=(226, 232, 240))

        composed = Image.alpha_composite(background.convert("RGBA"), overlay).convert("RGB")
        target = SLIDES_DIR / f"{scene.id}.png"
        composed.save(target, quality=95)
        slide_paths.append(target)
    return slide_paths
def write_concat_file(paths: Iterable[Path]) -> Path:
    """Write the ffmpeg concat list for the slides and return its path.

    Each slide is paired, in order, with the matching entry of SCENES and
    written as a ``file '<path>'`` line followed by a ``duration <seconds>``
    line. The list is saved as ``slides.txt`` in TEXT_DIR.

    Args:
        paths: Slide image paths, in the same order as SCENES.

    Raises:
        ValueError: if *paths* is empty (previously a bare IndexError).
    """
    ordered = list(paths)
    if not ordered:
        raise ValueError("write_concat_file() needs at least one slide path.")

    def file_entry(path: Path) -> str:
        # A single quote inside a quoted concat entry would end the quoting
        # early, so it is written as: close quote, escaped quote, reopen quote.
        escaped = path.as_posix().replace("'", "'\\''")
        return f"file '{escaped}'"

    concat_path = TEXT_DIR / "slides.txt"
    lines: list[str] = []
    for scene, path in zip(SCENES, ordered):
        lines.append(file_entry(path))
        lines.append(f"duration {scene.duration:.2f}")
    # The final slide is listed once more without a duration; presumably the
    # usual concat-demuxer workaround so the last duration is honored.
    lines.append(file_entry(ordered[-1]))
    concat_path.write_text("\n".join(lines), encoding="utf-8")
    return concat_path
def write_script_assets() -> None:
    """Save the narration text and a JSON shot list into TEXT_DIR.

    ``voiceover.txt`` receives NARRATION verbatim; ``shot_list.json`` receives
    one object per scene with its id, title, subtitle, url and duration.
    """
    script_path = TEXT_DIR / "voiceover.txt"
    script_path.write_text(NARRATION, encoding="utf-8")

    shots = []
    for scene in SCENES:
        shots.append(
            {
                "id": scene.id,
                "title": scene.title,
                "subtitle": scene.subtitle,
                "url": scene.url,
                "duration_seconds": scene.duration,
            }
        )
    shot_list_path = TEXT_DIR / "shot_list.json"
    shot_list_path.write_text(json.dumps(shots, indent=2), encoding="utf-8")
def seconds_to_srt(value: float) -> str:
    """Format a time offset in seconds as an SRT timestamp, ``HH:MM:SS,mmm``.

    The value is rounded to the nearest millisecond before it is split into
    fields.
    """
    total_millis = int(round(value * 1000))
    whole_seconds, millis = divmod(total_millis, 1000)
    whole_minutes, seconds = divmod(whole_seconds, 60)
    hours, minutes = divmod(whole_minutes, 60)
    return f"{hours:02}:{minutes:02}:{seconds:02},{millis:03}"
def write_srt() -> None:
    """Write ``voiceover.srt``: the narration split into evenly timed subtitle cues.

    NARRATION is wrapped with wrap_text() using a size-30 font and a width of
    72 * 18, and the total scene duration is divided equally between the
    resulting lines. Cue times are therefore an approximation and are not
    aligned to the synthesized audio.
    """
    # A throwaway 1x1 image provides the drawing context needed to measure text.
    scratch = ImageDraw.Draw(Image.new("RGB", (1, 1)))
    lines = wrap_text(scratch, NARRATION, get_font(30), 72 * 18)

    segment_count = max(1, len(lines))
    total_duration = sum(scene.duration for scene in SCENES)
    step = total_duration / segment_count

    cues: list[str] = []
    cursor = 0.0
    for number, caption in enumerate(lines, start=1):
        cue_end = min(total_duration, cursor + step)
        timing = f"{seconds_to_srt(cursor)} --> {seconds_to_srt(cue_end)}"
        cues.append(f"{number}\n{timing}\n{caption}\n")
        cursor = cue_end

    srt_path = TEXT_DIR / "voiceover.srt"
    srt_path.write_text("\n".join(cues), encoding="utf-8")
def ffprobe_duration(path: Path) -> float:
    """Return the duration in seconds that ``ffprobe`` reports for the media file at *path*.

    Raises:
        subprocess.CalledProcessError: if ffprobe exits with a non-zero status.
        ValueError: if ffprobe's output cannot be parsed as a float.
    """
    command = [
        "ffprobe",
        "-v",
        "error",
        "-show_entries",
        "format=duration",
        "-of",
        "default=noprint_wrappers=1:nokey=1",
        str(path),
    ]
    probe = subprocess.run(command, check=True, capture_output=True, text=True)
    reported = probe.stdout.strip()
    return float(reported)
def run_ffmpeg(audio_path: Path, concat_path: Path) -> Path:
    """Render the slideshow and add the voiceover; return the final video path.

    Two ffmpeg passes run in sequence: the first encodes the concat list into
    a silent H.264 video at 30 fps, the second copies that video stream and
    adds the audio encoded as AAC, stopping at the shorter of the two inputs.

    Raises:
        subprocess.CalledProcessError: if either ffmpeg pass fails.
    """
    silent_video = OUTPUT_DIR / "replicalab_demo_60s_silent.mp4"
    final_video = OUTPUT_DIR / "replicalab_demo_60s.mp4"

    render_slideshow = [
        "ffmpeg",
        "-y",
        "-f",
        "concat",
        "-safe",
        "0",
        "-i",
        str(concat_path),
        "-vf",
        "fps=30,format=yuv420p",
        "-c:v",
        "libx264",
        "-pix_fmt",
        "yuv420p",
        str(silent_video),
    ]
    add_voiceover = [
        "ffmpeg",
        "-y",
        "-i",
        str(silent_video),
        "-i",
        str(audio_path),
        "-c:v",
        "copy",
        "-c:a",
        "aac",
        "-b:a",
        "192k",
        "-shortest",
        str(final_video),
    ]
    for command in (render_slideshow, add_voiceover):
        subprocess.run(command, check=True)
    return final_video
def main() -> int:
    """Build the complete demo video and return the process exit code (0).

    Steps, in order: check the backend, create output directories, write the
    script and subtitle files, synthesize the voiceover, capture screenshots,
    render slides, encode the video, then write ``build_metadata.json`` and
    print a summary of the generated assets.

    Raises:
        RuntimeError: if ELEVENLABS_API_KEY cannot be found, or if one of the
            steps reports a failure of its own.
    """
    ensure_backend()
    ensure_output_dirs()
    write_script_assets()
    write_srt()

    api_key = load_env_value("ELEVENLABS_API_KEY")
    if not api_key:
        raise RuntimeError("ELEVENLABS_API_KEY was not found in the environment or .env file.")
    configured_voice = load_env_value("ELEVENLABS_VOICE_ID")
    voice_id = pick_voice(api_key, configured_voice)

    audio_path = AUDIO_DIR / "voiceover.mp3"
    synthesize_voiceover(api_key, voice_id, NARRATION, audio_path)

    # create_slides() reads the screenshots from disk, so the return value of
    # capture_screens() is not needed here.
    capture_screens()
    slides = create_slides()
    concat_path = write_concat_file(slides)
    final_video = run_ffmpeg(audio_path, concat_path)

    audio_seconds = round(ffprobe_duration(audio_path), 3)
    metadata = {
        "voice_id": voice_id,
        "audio_duration_seconds": audio_seconds,
        "video_path": str(final_video),
        "slides": [str(path) for path in slides],
    }
    metadata_path = TEXT_DIR / "build_metadata.json"
    metadata_path.write_text(json.dumps(metadata, indent=2), encoding="utf-8")

    summary = textwrap.dedent(f"""
        Built demo assets:
        audio: {audio_path}
        video: {final_video}
        script: {TEXT_DIR / 'voiceover.txt'}
        subtitles: {TEXT_DIR / 'voiceover.srt'}
    """).strip()
    print(summary)
    return 0
if __name__ == "__main__":
    # Script entry point: print a one-line error summary to stderr, then
    # re-raise so the full traceback is still shown.
    try:
        exit_code = main()
    except Exception as exc:  # pragma: no cover - CLI path
        print(f"ERROR: {exc}", file=sys.stderr)
        raise
    raise SystemExit(exit_code)