Spaces:
Running on Zero
Running on Zero
| """Lightloom Gradio Server entrypoint. | |
| Parameter contribution: 0B. Runtime endpoints expose app health and private | |
| benchmark hooks; model loading stays inside explicit benchmark/pipeline calls. | |
| """ | |
| from __future__ import annotations | |
| import json | |
| import os | |
| import re | |
| from pathlib import Path | |
| import sys | |
| from typing import Any | |
| import uuid | |
| import gradio as gr | |
| import spaces | |
| from fastapi import Request | |
| from fastapi.responses import FileResponse, HTMLResponse, JSONResponse, StreamingResponse | |
| ROOT = Path(__file__).resolve().parent | |
| sys.path.insert(0, str(ROOT / "src")) | |
| from lightloom.compliance.params_ledger import entries, total_runtime_params | |
| from lightloom.core.config import CONFIG, LIGHTLOOM_PROFILE | |
| app = gr.Server(title="Lightloom", version="0.1.0") | |
def _run_g1_on_gpu() -> dict[str, Any]:
    """Run the Gate G1 benchmark for real (no dry run, local execution disallowed).

    The repetition count is read from ``LIGHTLOOM_G1_REPS`` (default ``"5"``).
    The benchmark module is imported lazily, only when this function is called.
    """
    from benchmarks.gate_g1 import run as run_gate_g1

    repetitions = int(os.getenv("LIGHTLOOM_G1_REPS", "5"))
    return run_gate_g1(dry_run=False, reps=repetitions, allow_local=False)
def _run_g2_g3_on_gpu() -> tuple[dict[str, Any], dict[str, Any]]:
    """Run the combined Gate G2/G3 benchmark for real and return its result pair.

    Gate G3 hypothesis (original author's note): the Director's ~1 tok/s came
    from the throttled persistent CPU. Running the llama.cpp Director here gives
    a real CPU allocation; if it still exceeds the 2.5 s/beat budget, escalate
    to GPU.
    """
    from benchmarks.gate_g2_g3 import run as run_gates

    results = run_gates(dry_run=False)
    return results
| _PROBE_BEAT = "Far away, a market woke under orange awnings and wet stone." | |
| def _director_probe_on_gpu(backend: str, n_threads: int) -> dict[str, Any]: | |
| # Bounded single-beat diagnostic. transformers = the GPU Director we now ship | |
| # (G3: llama.cpp CPU was ~1 tok/s). Soft errors are caught so we return JSON. | |
| import time as _time | |
| info: dict[str, Any] = {"backend": backend, "cpu_count_in_gpu": os.cpu_count()} | |
| t0 = _time.perf_counter() | |
| try: | |
| if backend == "transformers": | |
| from lightloom.director.director import generate_shot_transformers, load_director_transformers | |
| tokenizer, model = load_director_transformers() | |
| info["load_s"] = round(_time.perf_counter() - t0, 2) | |
| shot, meta = generate_shot_transformers(tokenizer, model, _PROBE_BEAT) | |
| else: | |
| from lightloom.director.director import generate_shot, load_director | |
| info["n_threads"] = n_threads | |
| llm = load_director(n_threads=n_threads) | |
| info["load_s"] = round(_time.perf_counter() - t0, 2) | |
| shot, meta = generate_shot(llm, _PROBE_BEAT) | |
| info.update( | |
| ok=True, | |
| elapsed_s=round(meta["elapsed_s"], 2), | |
| completion_tokens=meta["completion_tokens"], | |
| tok_s=round(meta["tok_s"], 2) if meta["tok_s"] else None, | |
| decision=shot.decision, | |
| coerced=meta["coerced"], | |
| attempts=meta.get("attempts"), | |
| ) | |
| except Exception as exc: # noqa: BLE001 - diagnostic must report, not crash. | |
| import traceback | |
| info.update(ok=False, error=f"{type(exc).__name__}: {exc!r}", trace=traceback.format_exc()[-900:]) | |
| return info | |
# Directories (next to this file) that the static-file helpers serve from.
FRONTEND_DIR = ROOT / "frontend"
ASSETS_DIR = ROOT / "assets"

# Minimal inline page returned by ``index`` when frontend/index.html is absent.
_PLACEHOLDER_HTML = """<!doctype html><html lang="en"><head><meta charset="utf-8"/>
<meta name="viewport" content="width=device-width, initial-scale=1"/><title>Lightloom</title>
<style>body{margin:0;min-height:100vh;display:grid;place-items:center;background:#0A0A0C;
color:#F5EFE6;font-family:Inter,system-ui,sans-serif}main{max-width:720px;padding:32px}
h1{font-family:Georgia,serif;font-size:clamp(42px,8vw,92px);margin:0 0 12px}a{color:#E8A33D}</style>
</head><body><main><h1>Lightloom</h1><p>Private build. The projector is being assembled.</p>
<p><a href="/health">health</a></p></main></body></html>"""
def _serve_under(base: Path, rel: str):
    """Serve the file ``rel`` from inside ``base``, refusing path traversal.

    Returns a 403 JSON response when the resolved target falls outside ``base``,
    a 404 JSON response when it is not a regular file, and otherwise a
    ``FileResponse`` carrying no-cache headers.
    """
    root = base.resolve()
    candidate = (base / rel).resolve()
    try:
        # relative_to raises ValueError when candidate is not under root.
        candidate.relative_to(root)
    except ValueError:
        return JSONResponse({"error": "forbidden"}, status_code=403)

    if candidate.is_file():
        # NO-CACHE: force revalidation on every load so freshly deployed JS/CSS
        # reaches the browser without a hard refresh (a stale controller.js
        # previously hid new UI until Ctrl+Shift+R). Per the original note,
        # FileResponse still sends ETag/Last-Modified, so unchanged files
        # return a cheap 304.
        no_cache = {"Cache-Control": "no-cache, max-age=0, must-revalidate"}
        return FileResponse(candidate, headers=no_cache)
    return JSONResponse({"error": "not found"}, status_code=404)
def index() -> Any:
    """Return ``frontend/index.html`` (no-cache) if it exists, else the inline placeholder page."""
    page = FRONTEND_DIR / "index.html"
    if not page.is_file():
        return HTMLResponse(_PLACEHOLDER_HTML)
    no_cache = {"Cache-Control": "no-cache, max-age=0, must-revalidate"}
    return FileResponse(page, media_type="text/html", headers=no_cache)
def serve_frontend(rel: str):
    """Serve ``rel`` from the frontend directory via the traversal-safe helper."""
    response = _serve_under(FRONTEND_DIR, rel)
    return response
def serve_assets(rel: str):
    """Serve ``rel`` from the assets directory via the traversal-safe helper."""
    response = _serve_under(ASSETS_DIR, rel)
    return response
def health() -> dict[str, Any]:
    """Return a JSON-able status snapshot: profile, selected config, parameter ledger, host info."""
    config_view = {
        "width": CONFIG.width,
        "height": CONFIG.height,
        "flux_dtype": CONFIG.flux_dtype,
        "flux_aot": CONFIG.flux_aot,
        "showcase_only": CONFIG.showcase_only,
    }
    params_total = total_runtime_params()
    ledger_rows = [entry.__dict__ for entry in entries()]
    # Privacy mode is on unless LIGHTLOOM_PRIVACY_MODE is set to something other than "1".
    privacy_on = os.getenv("LIGHTLOOM_PRIVACY_MODE", "1") == "1"

    snapshot: dict[str, Any] = {
        "app": "lightloom",
        "profile": LIGHTLOOM_PROFILE,
        "config": config_view,
        "params_total": params_total,
        "params_limit": 32_000_000_000,
        "ledger": ledger_rows,
        "privacy_mode": privacy_on,
        "cpu_count": os.cpu_count(),
        "director_backend": CONFIG.director_backend,
        "build": "color-styles-1",
    }
    return snapshot
def run_g1_endpoint() -> dict[str, Any]:
    """Run Gate G1 when internal benchmarks are enabled; otherwise report that they are disabled.

    Enabled only when ``LIGHTLOOM_ENABLE_INTERNAL_BENCH`` equals ``"1"``.
    """
    bench_enabled = os.getenv("LIGHTLOOM_ENABLE_INTERNAL_BENCH", "0") == "1"
    if not bench_enabled:
        return {"ok": False, "error": "internal benchmark endpoint disabled"}

    data = _run_g1_on_gpu()
    result_file = Path("benchmarks/results/g1.json")
    return {
        "ok": True,
        "data": data,
        "decision": data["decision"],
        "result_path": str(result_file),
    }
def run_g2_g3_endpoint() -> dict[str, Any]:
    """Run Gates G2/G3 when internal benchmarks are enabled; otherwise report that they are disabled."""
    bench_enabled = os.getenv("LIGHTLOOM_ENABLE_INTERNAL_BENCH", "0") == "1"
    if not bench_enabled:
        return {"ok": False, "error": "internal benchmark endpoint disabled"}

    g2_result, g3_result = _run_g2_g3_on_gpu()
    return {"ok": True, "g2": g2_result, "g3": g3_result}
def warm_endpoint(which: str = "director,asr") -> dict[str, Any]:
    """Pre-download model snapshots for the comma-separated keys in ``which``.

    Original rationale: download weights in the (CPU) persistent process, with
    no GPU time spent, so the later GPU probe/beat loads from cache. Keys that
    are not present in ``MODEL_REFS`` are silently skipped. Gated behind
    ``LIGHTLOOM_ENABLE_INTERNAL_BENCH == "1"``.
    """
    bench_enabled = os.getenv("LIGHTLOOM_ENABLE_INTERNAL_BENCH", "0") == "1"
    if not bench_enabled:
        return {"ok": False, "error": "internal benchmark endpoint disabled"}

    from huggingface_hub import snapshot_download
    from lightloom.core.config import MODEL_REFS

    cached: dict[str, str] = {}
    for model_key in (part.strip() for part in which.split(",")):
        ref = MODEL_REFS.get(model_key)
        if ref is not None:
            local_dir = snapshot_download(ref.repo_id, revision=ref.revision)
            cached[model_key] = str(local_dir)
    return {"ok": True, "cached": cached}
| def _asr_probe_on_gpu() -> dict[str, Any]: | |
| import time as _time | |
| from huggingface_hub import hf_hub_download | |
| from lightloom.audio_in.asr import load_asr, read_wav, transcribe | |
| from lightloom.core.config import MODEL_REFS | |
| info: dict[str, Any] = {} | |
| try: | |
| ref = MODEL_REFS["asr"] | |
| wav = hf_hub_download(ref.repo_id, "demo/voxpopuli_test_en_demo.wav", revision=ref.revision) | |
| audio, sr = read_wav(wav) | |
| info["wav_sr"] = sr | |
| t = _time.perf_counter() | |
| processor, model = load_asr() | |
| info["load_s"] = round(_time.perf_counter() - t, 2) | |
| t = _time.perf_counter() | |
| text = transcribe(processor, model, audio, sampling_rate=sr, language="en") | |
| info["transcribe_s"] = round(_time.perf_counter() - t, 2) | |
| info.update(ok=True, text=text[:300]) | |
| except Exception as exc: # noqa: BLE001 - diagnostic must report, not crash. | |
| import traceback | |
| info.update(ok=False, error=f"{type(exc).__name__}: {exc!r}", trace=traceback.format_exc()[-1100:]) | |
| return info | |
def asr_probe_endpoint() -> dict[str, Any]:
    """Run the ASR probe when internal benchmarks are enabled; otherwise report that they are disabled."""
    bench_enabled = os.getenv("LIGHTLOOM_ENABLE_INTERNAL_BENCH", "0") == "1"
    if not bench_enabled:
        return {"ok": False, "error": "internal benchmark endpoint disabled"}
    probe_report = _asr_probe_on_gpu()
    return {"ok": True, "data": probe_report}
| # --- VOICE input (the "speak" in "speak, and watch your story become a world"): | |
| # Cohere Transcribe turns the narrator's mic audio into text, which then drives the | |
| # same panorama pipeline. The browser captures a 16 kHz mono PCM WAV and sends it | |
| # base64-encoded; read_wav decodes it with the stdlib (no ffmpeg needed). --- | |
def _transcribe_on_gpu(wav_path: str, lang: str) -> dict[str, Any]:
    """Read the WAV at ``wav_path``, load the ASR model, and return ``{"text": transcript}``."""
    from lightloom.audio_in.asr import load_asr, read_wav, transcribe

    samples, sample_rate = read_wav(wav_path)
    processor, model = load_asr()
    transcript = transcribe(processor, model, samples, sampling_rate=sample_rate, language=lang)
    return {"text": transcript}
def transcribe_api(audio_b64: str = "", lang: str = "en") -> dict:
    """Stream transcription progress for a base64-encoded WAV.

    This is a generator (despite the ``dict`` annotation, kept for interface
    compatibility). It yields, in order:

    * ``{"stage": "transcribing"}`` immediately;
    * then exactly one of ``{"stage": "transcript", "text": ...}``,
      ``{"stage": "error", "error": ...}`` or
      ``{"stage": "quota_exceeded", "error": ...}``.

    ``audio_b64`` may carry a ``data:...;base64,`` prefix; only the part after
    the last comma is decoded. ``lang`` is forwarded to the transcriber.
    """
    import base64
    import tempfile

    yield {"stage": "transcribing"}
    path = None
    try:
        payload = audio_b64.split(",")[-1] if audio_b64 else ""
        if len(payload) > 8_000_000:  # ~6 MB decoded; reject BEFORE decode (OOM / /tmp-fill DoS guard)
            yield {"stage": "error", "error": "audio_too_large"}
            return
        raw = base64.b64decode(payload) if payload else b""
        if len(raw) < 64:  # nothing meaningful captured
            yield {"stage": "error", "error": "empty_audio"}
            return
        with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as handle:
            # Record the path BEFORE writing: the file already exists on disk
            # (delete=False), so a failed write must still be cleaned up below.
            path = handle.name
            handle.write(raw)
        result = _transcribe_on_gpu(path, lang)
        yield {"stage": "transcript", "text": (result.get("text") or "").strip()}
    except Exception as exc:  # noqa: BLE001 - quota/decoding guard.
        message = str(exc).lower()
        # Keyword heuristic: any of these substrings marks the failure as a quota problem.
        quota_hit = any(token in message for token in ("gpu", "quota", "zerogpu", "exceeded"))
        yield {"stage": "quota_exceeded" if quota_hit else "error", "error": str(exc)[:200]}
    finally:
        if path:  # never leave the temp WAV behind (would grow /tmp unbounded)
            try:
                os.unlink(path)
            except OSError:
                # Best-effort cleanup: the file may already be gone.
                pass
| # --- Panorama (Voice-Scroll-in-3D) verification: klein-base + the 360 ERP outpaint | |
| # LoRA produce a 2:1 equirectangular panorama via Flux2 inpaint. Code-only (diffusers | |
| # already ships Flux2KleinInpaintPipeline + load_lora_weights). --- | |
| _PANO_BASE = "black-forest-labs/FLUX.2-klein-base-4B" | |
| _PANO_LORA = "nomadoor/flux-2-klein-4B-360-erp-outpaint-lora" | |
| _PANO_TRIGGER = ( | |
| "Fill the green spaces according to the image. Outpaint as a seamless 360 " | |
| "equirectangular panorama (2:1). Keep the horizon level. Match left and right edges. " | |
| ) | |
def warm_panorama_endpoint() -> dict[str, Any]:
    """Pre-download the panorama base model and LoRA snapshots (internal-bench builds only).

    On success the response includes the last 40 characters of each local
    snapshot path.
    """
    bench_enabled = os.getenv("LIGHTLOOM_ENABLE_INTERNAL_BENCH", "0") == "1"
    if not bench_enabled:
        return {"ok": False, "error": "internal benchmark endpoint disabled"}

    from huggingface_hub import snapshot_download

    base_dir = snapshot_download(_PANO_BASE)
    lora_dir = snapshot_download(_PANO_LORA)
    return {"ok": True, "base": str(base_dir)[-40:], "lora": str(lora_dir)[-40:]}
| def _panorama_probe_on_gpu(prompt: str) -> dict[str, Any]: | |
| import time as _time | |
| import torch | |
| from diffusers import Flux2KleinInpaintPipeline | |
| from PIL import Image | |
| info: dict[str, Any] = {} | |
| try: | |
| t = _time.perf_counter() | |
| pipe = Flux2KleinInpaintPipeline.from_pretrained(_PANO_BASE, torch_dtype=torch.bfloat16).to("cuda") | |
| pipe.load_lora_weights(_PANO_LORA) | |
| info["load_s"] = round(_time.perf_counter() - t, 2) | |
| # Seed canvas: green right 60% (to outpaint) + a neutral horizon band on the | |
| # left 40% so the LoRA has structure to extend (keep-horizon-level training). | |
| w, h = 1024, 512 | |
| canvas = Image.new("RGB", (w, h), (0, 255, 0)) | |
| seed = Image.new("RGB", (int(w * 0.4), h), (120, 150, 190)) # sky-ish | |
| for y in range(h // 2, h): # ground band | |
| for x in range(seed.width): | |
| seed.putpixel((x, y), (90, 80, 70)) | |
| canvas.paste(seed, (0, 0)) | |
| mask = Image.new("L", (w, h), 0) | |
| mask.paste(Image.new("L", (w - seed.width, h), 255), (seed.width, 0)) # inpaint the green | |
| t = _time.perf_counter() | |
| out = pipe( | |
| prompt=_PANO_TRIGGER + prompt, | |
| image=canvas, | |
| mask_image=mask, | |
| height=h, | |
| width=w, | |
| num_inference_steps=20, | |
| guidance_scale=3.0, | |
| strength=1.0, | |
| ) | |
| pano = out.images[0] | |
| info["gen_s"] = round(_time.perf_counter() - t, 2) | |
| out_dir = Path("/tmp/lightloom/pano") | |
| out_dir.mkdir(parents=True, exist_ok=True) | |
| pano.save(out_dir / "pano.webp", "WEBP", quality=92) | |
| info.update(ok=True, size=list(pano.size), image="/frames-pano/pano.webp") | |
| except Exception as exc: # noqa: BLE001 - diagnostic must report, not crash. | |
| import traceback | |
| info.update(ok=False, error=f"{type(exc).__name__}: {exc!r}", trace=traceback.format_exc()[-1400:]) | |
| return info | |
def panorama_probe_endpoint(prompt: str = "an old lighthouse on a cliff at the edge of the world, stormy dusk") -> dict[str, Any]:
    """Run the panorama probe for ``prompt`` when internal benchmarks are enabled."""
    bench_enabled = os.getenv("LIGHTLOOM_ENABLE_INTERNAL_BENCH", "0") == "1"
    if not bench_enabled:
        return {"ok": False, "error": "internal benchmark endpoint disabled"}
    probe_report = _panorama_probe_on_gpu(prompt)
    return {"ok": True, "data": probe_report}
def pano_frame_endpoint(name: str):
    """Serve a probe panorama file by name from ``/tmp/lightloom/pano`` (404 if absent).

    Only the final path component of ``name`` is used, so directory parts in the
    request are discarded.
    """
    pano_dir = Path("/tmp/lightloom/pano")
    candidate = pano_dir / Path(name).name
    if candidate.is_file():
        return FileResponse(candidate)
    return JSONResponse({"error": "not found"}, status_code=404)
| # --- THE PANORAMA WORLD (Voice-Scroll-in-3D): narration -> one immersive | |
| # equirectangular world (klein-base + 360 LoRA) + its depth -> the browser renders | |
| # it as a sphere the camera lives inside. Streams via @app.api/@gradio/client. --- | |
def _panorama_world(text: str, session: str):
    """Generate one panorama world for ``text`` and stream progress events.

    Yields dict events in this order: ``warming``, ``directed``, ``painting``,
    ``painted``, ``depth``, ``done``. Output files are written under
    ``/tmp/lightloom/pano/<session>/`` and referenced by ``/frames-pano/...`` paths.
    Exceptions are not handled here; they propagate to the caller.
    """
    import numpy as np
    from PIL import Image
    from lightloom.depth.depth import estimate_depth, load_depth_pipeline
    from lightloom.director.director import generate_shot_transformers, load_director_transformers
    from lightloom.director.state import initial_state
    from lightloom.paint.panorama import generate_panorama, load_panorama_pipeline

    session_dir = Path("/tmp/lightloom/pano") / session
    session_dir.mkdir(parents=True, exist_ok=True)
    yield {"stage": "warming"}

    # Director turns the narration into one establishing scene: only the first
    # paragraph is used, capped at 300 characters, with a fixed fallback.
    tokenizer, model = load_director_transformers()
    opening = (text or "").strip().split("\n\n")[0][:300]
    if not opening:
        opening = "a world at the edge of the world"
    shot, _meta = generate_shot_transformers(tokenizer, model, opening, initial_state().model_dump())
    scene = f"{shot.image_prompt_en}, {shot.lighting}"
    yield {"stage": "directed", "shot": shot.model_dump(), "scene": scene[:160]}

    yield {"stage": "painting"}
    painter = load_panorama_pipeline()
    panorama = generate_panorama(painter, scene)
    panorama.save(session_dir / "world.webp", "WEBP", quality=92)
    yield {"stage": "painted", "panorama": f"/frames-pano/{session}/world.webp"}

    depth_estimator = load_depth_pipeline()
    depth_map = estimate_depth(depth_estimator, panorama)
    # assumes estimate_depth returns values in [0, 1] - TODO confirm (scaled by 255 to uint8 here)
    depth_image = Image.fromarray((depth_map * 255).astype(np.uint8))
    depth_image.save(session_dir / "world_depth.png")
    yield {"stage": "depth", "depth": f"/frames-pano/{session}/world_depth.png"}

    yield {"stage": "done", "session": session}
def panorama_api(text: str = "", lang: str = "en") -> dict:
    """Stream panorama-world events for ``text`` (generator; showcase/bench builds only).

    Legacy/showcase generator: per the original note it loads klein-BASE plus a
    third-party 360 LoRA that are NOT in the runtime ledger, so it is gated like
    the internal probes and the live experience is ``/scroll_live`` instead.
    ``lang`` is accepted but not used in this body.
    """
    bench_enabled = os.getenv("LIGHTLOOM_ENABLE_INTERNAL_BENCH", "0") == "1"
    if not bench_enabled:
        yield {"stage": "error", "error": "panorama endpoint disabled (showcase/bench only)"}
        return

    session = uuid.uuid4().hex[:8]
    try:
        # The fallback text resolves at call time, when _GOLDEN_RECITAL (defined
        # later in the module) exists.
        narration = (text or _GOLDEN_RECITAL).strip()
        for event in _panorama_world(narration, session):
            yield event
    except Exception as exc:  # noqa: BLE001 - quota guard.
        lowered = str(exc).lower()
        quota_hit = any(token in lowered for token in ("gpu", "quota", "zerogpu", "exceeded"))
        yield {"stage": "quota_exceeded" if quota_hit else "error", "error": str(exc)[:200]}
def pano_world_frame(session: str, name: str):
    """Serve one generated file of a panorama-world session.

    Files live at ``/tmp/lightloom/pano/<session>/<name>``. Returns 400 for a
    malformed ``session``/``name``, 404 when the file does not exist, otherwise a
    ``FileResponse``.
    """
    # Sessions are uuid4 hex[:8]. Taking ``Path(session).name`` alone is not
    # enough: ``Path("..").name`` is still "..", which would let a request step
    # out of the pano directory. Validate both parts, as ``world_frame`` does.
    if not re.fullmatch(r"[a-f0-9]{6,32}", session) or not re.fullmatch(r"[\w.-]{1,40}", name):
        return JSONResponse({"error": "bad request"}, status_code=400)
    path = Path("/tmp/lightloom/pano") / session / Path(name).name
    if not path.is_file():
        return JSONResponse({"error": "not found"}, status_code=404)
    return FileResponse(path)
| # --- THE INFINITE 3D CORRIDOR (the live experience). Each phrase the narrator | |
| # speaks becomes a vivid, forward-facing scene: the Director (MiniCPM) turns the | |
| # phrase into a shot, a content-driven style is chosen per scene, the fast 4-step | |
| # klein painter renders it, Depth-Anything gives it relief, and each scene streams | |
| # into the browser's corridor where the camera flies forward through the growing | |
| # world. "speak, and your world unrolls ahead of you." --- | |
| _CORRIDOR_QUALITY = ( | |
| "cinematic anamorphic still, shot on 35mm film, shallow depth of field, volumetric god rays, " | |
| "golden-hour rim light, rich saturated color, ultra detailed, sharp focus, subtle film grain, " | |
| "ARRI Alexa, breathtaking establishing shot, 8k" | |
| ) | |
| def _corridor_style(shot: Any) -> str: | |
| """Pick a per-scene art style from the Director's content/lighting so each scene | |
| has its own colorful look (the Director decides via what it described).""" | |
| text = f"{getattr(shot, 'lighting', '')} {getattr(shot, 'image_prompt_en', '')}".lower() | |
| def has(*ks: str) -> bool: | |
| return any(k in text for k in ks) | |
| if has("storm", "dusk", "night", "dark", "rain", "thunder"): | |
| return "dramatic moody cinematic concept art, deep shadows, glowing accents" | |
| if has("morning", "dawn", "sunrise", "golden", "warm", "sunset"): | |
| return "warm golden-hour illustration, glowing vibrant color" | |
| if has("snow", "ice", "winter", "cold", "frost"): | |
| return "crisp luminous winter concept art, cool blue-and-white palette" | |
| if has("market", "city", "street", "village", "town", "harbor"): | |
| return "vibrant colorful illustration, bustling saturated detail" | |
| if has("forest", "garden", "meadow", "jungle", "flower", "tree", "green"): | |
| return "lush verdant fantasy concept art, vivid greens, dappled light" | |
| if has("sea", "ocean", "water", "wave", "river", "lake"): | |
| return "cinematic seascape concept art, luminous water, dramatic sky" | |
| if has("desert", "sand", "dune"): | |
| return "surreal desert dreamscape, warm amber tones, vast scale" | |
| return "vivid cinematic concept art, rich saturated color, dramatic light" | |
| def _split_phrases(text: str, cap: int = 6) -> list[str]: | |
| import re | |
| text = (text or "").strip() | |
| if not text: | |
| return [] | |
| parts = re.split(r"(?<=[.!?…])\s+|\n+", text) | |
| phrases: list[str] = [] | |
| for raw in parts: | |
| p = raw.strip() | |
| if not p: | |
| continue | |
| if phrases and len(p) < 18: # glue stray fragments to the prior phrase | |
| phrases[-1] = (phrases[-1] + " " + p)[:240] | |
| else: | |
| phrases.append(p[:240]) | |
| return phrases[:cap] | |
def _corridor_world(text: str, session: str):
    """Turn narration into a sequence of painted scenes with depth maps, streaming events.

    Yields ``warming`` once, then per phrase ``directing`` -> ``painting`` ->
    ``scene`` -> ``depth`` (or a single ``scene_error`` if anything in that
    phrase fails), and finally ``done``. Files are written under
    ``/tmp/lightloom/world/<session>/`` and referenced by ``/frames-world/...``.
    """
    import numpy as np
    from PIL import Image
    from lightloom.depth.depth import estimate_depth, load_depth_pipeline
    from lightloom.director.director import generate_shot_transformers, load_director_transformers
    from lightloom.director.state import initial_state
    from lightloom.paint.klein import load_klein_pipeline, paint

    session_dir = Path("/tmp/lightloom/world") / session
    session_dir.mkdir(parents=True, exist_ok=True)

    phrases = _split_phrases(text)
    if not phrases:
        phrases = ["a luminous world at the edge of the world"]
    yield {"stage": "warming", "scenes": len(phrases)}

    tokenizer, model = load_director_transformers()
    painter = load_klein_pipeline()
    depth_estimator = load_depth_pipeline()

    # NEUTRAL state per scene (not the lighthouse-keeper seed in initial_state) so each
    # scene is driven by what the narrator actually said - no "lighthouses everywhere".
    from lightloom.director.state import SceneState

    neutral_state = SceneState(
        characters=[],
        setting="",
        lighting="",
        mood="",
        palette=("#202428", "#7d8794", "#e6ddc8"),
    ).model_dump()

    for index, phrase in enumerate(phrases):
        yield {"stage": "directing", "index": index, "text": phrase}
        try:
            # Each scene gets its own (shallow) copy of the neutral state.
            shot, _meta = generate_shot_transformers(tokenizer, model, phrase, dict(neutral_state))
            style = _corridor_style(shot)
            subject = (shot.image_prompt_en or phrase or "a vivid scene").strip()
            lighting = (shot.lighting or "cinematic light").strip()
            scene_prompt = f"{subject}, {lighting}. {style}, {_CORRIDOR_QUALITY}"
            yield {"stage": "painting", "index": index, "scene": scene_prompt[:150]}

            # Seed varies per scene index, so a given index always gets the same seed.
            image = paint(painter, scene_prompt, width=1024, height=576, steps=6, seed=1900 + index * 7)
            image.save(session_dir / f"scene-{index}.webp", "WEBP", quality=90)
            yield {
                "stage": "scene",
                "index": index,
                "image": f"/frames-world/{session}/scene-{index}.webp",
                "caption": phrase[:160],
                "shot": shot.model_dump(),
            }

            depth_map = estimate_depth(depth_estimator, image)
            # Robust percentile normalization so one outlier pixel can't blow out the
            # range -> cleaner, better-anchored parallax (research Tier-1 depth recipe).
            low = float(np.percentile(depth_map, 2))
            high = float(np.percentile(depth_map, 98))
            if high > low:
                depth_map = np.clip((depth_map - low) / (high - low), 0.0, 1.0)
            depth_image = Image.fromarray((depth_map * 255).astype(np.uint8))
            depth_image.save(session_dir / f"scene-{index}-depth.png")
            yield {
                "stage": "depth",
                "index": index,
                "depth": f"/frames-world/{session}/scene-{index}-depth.png",
            }
        except Exception as exc:  # noqa: BLE001 - one bad scene must not end the journey.
            yield {"stage": "scene_error", "index": index, "error": str(exc)[:160]}

    yield {"stage": "done", "session": session, "count": len(phrases)}
def world_api(text: str = "", lang: str = "en") -> dict:
    """Stream corridor-world events for ``text`` (generator; showcase/bench builds only).

    Gated behind ``LIGHTLOOM_ENABLE_INTERNAL_BENCH == "1"``; when disabled a
    single error event is yielded. ``lang`` is accepted but not used in this body.

    NOTE(review): the original comment here (identical to the one on
    ``panorama_api``) says this route loads klein-BASE + a third-party 360 LoRA
    that are not in the runtime ledger, with ``/scroll_live`` being the live
    path. The generator called below is ``_corridor_world`` - confirm that the
    gating rationale still applies to it.
    """
    bench_enabled = os.getenv("LIGHTLOOM_ENABLE_INTERNAL_BENCH", "0") == "1"
    if not bench_enabled:
        yield {"stage": "error", "error": "world endpoint disabled (showcase/bench only)"}
        return

    session = uuid.uuid4().hex[:8]
    try:
        # The fallback text resolves at call time, when _GOLDEN_RECITAL (defined
        # later in the module) exists.
        narration = (text or _GOLDEN_RECITAL).strip()
        for event in _corridor_world(narration, session):
            yield event
    except Exception as exc:  # noqa: BLE001 - quota guard.
        lowered = str(exc).lower()
        quota_hit = any(token in lowered for token in ("gpu", "quota", "zerogpu", "exceeded"))
        yield {"stage": "quota_exceeded" if quota_hit else "error", "error": str(exc)[:200]}
def world_frame(session: str, name: str):
    """Serve one generated file of a corridor-world session.

    Returns 400 when ``session`` or ``name`` fails validation, 404 when the file
    is missing, otherwise a ``FileResponse`` for
    ``/tmp/lightloom/world/<session>/<name>``.
    """
    # Sessions are uuid4 hex[:8]; reject anything else (defense-in-depth vs traversal).
    well_formed = re.fullmatch(r"[a-f0-9]{6,32}", session) and re.fullmatch(r"[\w.-]{1,40}", name)
    if not well_formed:
        return JSONResponse({"error": "bad request"}, status_code=400)

    candidate = Path("/tmp/lightloom/world") / session / Path(name).name
    if candidate.is_file():
        return FileResponse(candidate)
    return JSONResponse({"error": "not found"}, status_code=404)
| # --- SCROLL prototype: validate that klein outpaints SEAMLESS horizontal | |
| # continuations (the make-or-break for the Voice-Scroll engine). Generates a few | |
| # evolving sections, stitches them, serves the strip so we can eyeball the seams. --- | |
| _SCROLL_STYLES: dict[str, Any] = { | |
| "desert": ( | |
| ", golden hour warm light, cinematic painterly matte painting, soft haze, vivid color, consistent lighting", | |
| ["rolling golden sand dunes", "a lone caravan of camels crossing the dunes", | |
| "a green oasis with tall palms", "a sandstone desert city on the horizon", | |
| "the great carved gate of the sandstone city"], | |
| ), | |
| "bosch": ( | |
| ", in the style of Hieronymus Bosch The Garden of Earthly Delights, fantastical surreal " | |
| "medieval oil painting, densely detailed, dreamlike, whimsical translucent creatures, " | |
| "soft pastel light, consistent palette", | |
| ["a lush fantastical garden with a delicate crystal fountain", | |
| "strange translucent flowers and tiny pale figures wandering", | |
| "enormous ripe fruits and exotic jewel-coloured birds", | |
| "a pink bulbous domed pavilion rising from a still lake", | |
| "playful surreal creatures bathing by glassy pools"], | |
| ), | |
| "dali": ( | |
| ", in the style of Salvador Dali surrealism, dreamlike impossible landscape, melting soft " | |
| "forms, very long shadows, luminous pale sky, oil painting, hyper detailed, consistent light", | |
| ["a vast empty plain with a single melting clock draped over a bare branch", | |
| "impossible smooth floating rocks casting long shadows", | |
| "a distorted elephant on impossibly long stilt legs far away", | |
| "a perfect mirror lake reflecting a burning amber sky", | |
| "surreal stone staircases rising and dissolving into the clouds"], | |
| ), | |
| "fantasy": ( | |
| ", vibrant fantasy concept art, luminous magical world, glowing flora, rich saturated color, " | |
| "volumetric god rays, ethereal, highly detailed, consistent enchanted light", | |
| ["a glowing forest of giant bioluminescent mushrooms", | |
| "a crystal river winding past floating glowing lily pads", | |
| "an ancient tree-city with warm lantern windows", | |
| "a waterfall of light pouring into a glowing canyon", | |
| "a sky temple among drifting luminous clouds"], | |
| ), | |
| } | |
| def _scroll_test_on_gpu(style: str = "desert") -> dict[str, Any]: | |
| import time as _time | |
| from PIL import Image | |
| from lightloom.paint.scroll import outpaint_right | |
| info: dict[str, Any] = {} | |
| try: | |
| import numpy as np | |
| from lightloom.paint.panorama import load_panorama_pipeline | |
| pipe = load_panorama_pipeline() | |
| STYLE, subjects = _SCROLL_STYLES.get(style, _SCROLL_STYLES["desert"]) | |
| info["style"] = style | |
| h, overlap, seg = 512, 384, 384 | |
| sections = [] | |
| ctx = None | |
| t0 = _time.perf_counter() | |
| for i, p in enumerate(subjects): | |
| sec = outpaint_right(pipe, ctx, p + STYLE, seg_w=seg, height=h, overlap=overlap, steps=8, seed=100 + i) | |
| sections.append(sec) | |
| ctx = sec # the freshly generated section's right edge is the next context | |
| info["gen_s"] = round(_time.perf_counter() - t0, 1) | |
| # stitch left->right with a small FEATHER blend at each join (hides any micro-seam) | |
| blend = 40 | |
| total_w = sum(s.width for s in sections) - blend * (len(sections) - 1) | |
| strip = Image.new("RGB", (total_w, h)) | |
| x = 0 | |
| for i, s in enumerate(sections): | |
| arr = np.asarray(s.convert("RGB")).astype(np.float32) | |
| if i == 0: | |
| strip.paste(s, (x, 0)); x += s.width | |
| else: | |
| # overlap the previous by `blend` px and cross-fade s's left edge in | |
| px = x - blend | |
| base = np.asarray(strip.crop((px, 0, px + s.width, h)).convert("RGB")).astype(np.float32) | |
| a = np.ones((h, s.width, 1), np.float32) | |
| a[:, :blend, 0] = np.linspace(0, 1, blend)[None, :] | |
| merged = (arr * a + base * (1 - a)).astype(np.uint8) | |
| strip.paste(Image.fromarray(merged), (px, 0)) | |
| x = px + s.width | |
| out_dir = Path("/tmp/lightloom/scroll") | |
| out_dir.mkdir(parents=True, exist_ok=True) | |
| name = f"test-strip-{style}.webp" | |
| strip.save(out_dir / name, "WEBP", quality=92) | |
| info.update(ok=True, size=list(strip.size), strip=f"/frames-scroll/{name}", sections=len(sections)) | |
| except Exception as exc: # noqa: BLE001 | |
| import traceback | |
| info.update(ok=False, error=f"{type(exc).__name__}: {exc!r}", trace=traceback.format_exc()[-1400:]) | |
| return info | |
def scroll_test_endpoint(style: str = "desert") -> dict[str, Any]:
    """Run the scroll seam test for ``style`` when internal benchmarks are enabled."""
    bench_enabled = os.getenv("LIGHTLOOM_ENABLE_INTERNAL_BENCH", "0") == "1"
    if not bench_enabled:
        return {"ok": False, "error": "internal benchmark endpoint disabled"}
    test_report = _scroll_test_on_gpu(style)
    return {"ok": True, "data": test_report}
| def _scroll_fast_test_on_gpu(style: str = "fantasy", n: int = 8, seg: int = 256, steps: int = 4) -> dict[str, Any]: | |
| """Benchmark the DISTILLED klein-4B fast continuous loop: how many small strips/sec | |
| and how do the 4-step seams look. This is the make-or-break for real-time flow.""" | |
| import time as _time | |
| from PIL import Image | |
| from lightloom.paint.scroll import load_fast_pipeline, outpaint_right | |
| info: dict[str, Any] = {} | |
| try: | |
| import numpy as np | |
| STYLE, subjects = _SCROLL_STYLES.get(style, _SCROLL_STYLES["fantasy"]) | |
| t = _time.perf_counter() | |
| pipe = load_fast_pipeline() | |
| info["load_s"] = round(_time.perf_counter() - t, 2) | |
| h, overlap = 512, 256 | |
| sections, times = [], [] | |
| ctx = None | |
| for i in range(n): | |
| subj = subjects[i % len(subjects)] | |
| t = _time.perf_counter() | |
| sec = outpaint_right(pipe, ctx, subj + STYLE, seg_w=seg, height=h, overlap=overlap, steps=steps, seed=300 + i) | |
| times.append(round(_time.perf_counter() - t, 3)) | |
| sections.append(sec) | |
| ctx = sec | |
| info["per_strip_s"] = times | |
| info["mean_strip_s"] = round(sum(times[1:]) / max(1, len(times) - 1), 3) # skip first (warm) | |
| info["strips_per_sec"] = round(1.0 / info["mean_strip_s"], 2) if info["mean_strip_s"] else None | |
| # stitch with feather | |
| blend = 32 | |
| total_w = sum(s.width for s in sections) - blend * (len(sections) - 1) | |
| strip = Image.new("RGB", (total_w, h)) | |
| x = 0 | |
| for i, s in enumerate(sections): | |
| arr = np.asarray(s.convert("RGB")).astype(np.float32) | |
| if i == 0: | |
| strip.paste(s, (x, 0)); x += s.width | |
| else: | |
| px = x - blend | |
| base = np.asarray(strip.crop((px, 0, px + s.width, h)).convert("RGB")).astype(np.float32) | |
| a = np.ones((h, s.width, 1), np.float32); a[:, :blend, 0] = np.linspace(0, 1, blend)[None, :] | |
| strip.paste(Image.fromarray((arr * a + base * (1 - a)).astype(np.uint8)), (px, 0)); x = px + s.width | |
| out_dir = Path("/tmp/lightloom/scroll"); out_dir.mkdir(parents=True, exist_ok=True) | |
| name = f"fast-strip-{style}.webp" | |
| strip.save(out_dir / name, "WEBP", quality=92) | |
| info.update(ok=True, size=list(strip.size), strip=f"/frames-scroll/{name}") | |
| except Exception as exc: # noqa: BLE001 | |
| import traceback | |
| info.update(ok=False, error=f"{type(exc).__name__}: {exc!r}", trace=traceback.format_exc()[-1500:]) | |
| return info | |
def scroll_fast_endpoint(style: str = "fantasy", n: int = 8, seg: int = 256, steps: int = 4) -> dict[str, Any]:
    """Run the fast scroll benchmark with the given settings when internal benchmarks are enabled."""
    bench_enabled = os.getenv("LIGHTLOOM_ENABLE_INTERNAL_BENCH", "0") == "1"
    if not bench_enabled:
        return {"ok": False, "error": "internal benchmark endpoint disabled"}
    bench_report = _scroll_fast_test_on_gpu(style, n, seg, steps)
    return {"ok": True, "data": bench_report}
def scroll_frame(name: str):
    """Serve a stitched scroll strip by name from ``/tmp/lightloom/scroll`` (404 if absent).

    Only the final path component of ``name`` is used, so directory parts in the
    request are discarded.
    """
    scroll_dir = Path("/tmp/lightloom/scroll")
    candidate = scroll_dir / Path(name).name
    if candidate.is_file():
        return FileResponse(candidate)
    return JSONResponse({"error": "not found"}, status_code=404)
| # --- THE LIVING SCROLL (the experience): one continuous panorama that EXTENDS | |
| # seamlessly as you narrate. The Director fixes a coherent session style from the | |
| # story's tone; each phrase outpaints the next section that CONTINUES the previous | |
| # one; Depth-Anything gives each section relief for the 2.5D scroll. Streamed. --- | |
| # A rich palette of DISTINCT art styles — the session's visual "lens", chosen by the narration's | |
| # mood/content so different stories look genuinely different (anime, noir, Ghibli, ink, impressionist, | |
| # photoreal, sketch, synthwave, ...) instead of one uniform painterly look. Fixed per session so the | |
| # continuous panorama stays coherent; variety comes ACROSS stories. | |
# Style key -> prompt SUFFIX. Every value starts with ", " because it is concatenated directly
# after the scene prompt (e.g. ``strip_prompt + style``), never used on its own. The keys are the
# identifiers that _scroll_session_style and _director_session_style look up, so renaming a key
# means updating those functions too.
_ART_STYLES: dict[str, str] = {
    "anime": ", anime cel-shaded illustration, crisp clean linework, vibrant saturated colour, luminous soft lighting",
    "noir": ", film noir, high-contrast black and white, deep chiaroscuro shadows, 1940s mood, fine film grain",
    "ghibli": ", Studio Ghibli hand-painted style, soft watercolour rendering, gentle diffuse light, painterly warmth",
    "ink": ", Japanese sumi-e ink wash, minimalist, expressive black brushstrokes on rice paper, generous negative space",
    "impressionist": ", impressionist oil painting, broken visible brushstrokes, shimmering Monet light, soft colour",
    "photoreal": ", photorealistic cinematic, dramatic natural light, ultra-detailed, shallow depth of field, 8k",
    "sketch": ", loose graphite pencil sketch, cross-hatching, sketchbook study, expressive monochrome lines",
    "storybook": ", warm storybook illustration, painterly, soft rounded forms, children's-book charm",
    "synthwave": ", 1980s synthwave colour grade, saturated magenta-and-cyan neon palette, soft retro glow, high contrast",
    "oil": ", classical oil painting, old-master chiaroscuro, rich varnished colour, baroque drama",
    "watercolour": ", delicate watercolour wash, soft bleeding pigments, luminous paper-white, airy",
    "lowpoly": ", stylised low-poly 3D render, flat faceted geometry, soft gradient lighting, clean",
    "pixelart": ", 16-bit pixel art, crisp blocky pixels, limited retro palette, dithering, game sprite",
    "retro": ", vintage retro illustration, aged muted tones, halftone grain, mid-century travel-poster feel",
    "comic": ", bold comic-book pop-art, thick black ink outlines, halftone Ben-Day dots, dynamic panel",
    "ukiyoe": ", ukiyo-e Japanese woodblock print, flat bold colour, strong outlines, Edo-period, Hokusai",
    "gothic": ", dark gothic painterly style, ornate and dramatic rendering, deep chiaroscuro lighting, moody low-key palette",
    "artdeco": ", art deco, geometric gold-and-black elegance, 1920s glamour, symmetrical, sleek",
    "claymation": ", claymation stop-motion, soft sculpted clay textures, handcrafted, tactile, charming",
    "charcoal": ", expressive charcoal drawing, smudged deep blacks, dramatic monochrome, rough paper grain",
    "modern": ", modern flat-design illustration, bold simple shapes, clean vector look, minimal palette",
    "scifi": ", sleek sci-fi concept-art rendering, crisp hard-surface detail, cool cinematic palette, polished finish",
}
| # Function/stop words that carry NO paintable scene. A clause that is ONLY these (a VAD fragment the mic cut | |
| # mid-sentence: "is", "that's", "and then") gives klein nothing concrete, so it flows the smooth carried | |
| # sky/water into ABSTRACT iridescent colour-WAVES ("franjas de colores que aparecen de la nada") that then | |
| # propagate across strips. _phrase_is_paintable lets the live loop CONTINUE the previous real scene instead. | |
| _NONVISUAL_WORDS = frozenset( | |
| "a an the and or but is are was were be been being am i you he she it we they me him her them us this " | |
| "that these those of to in on at for with as by from up down out over off into onto about there here " | |
| "what who whom when where how why which whose so then now just very too also still yet do does did done " | |
| "will would shall should can could may might must have has had not no nor only thats its im ive id ill " | |
| "youre youve hes shes theyre dont cant wont isnt arent wasnt " | |
| # greetings / fillers / meta-talk that are NOT a visual scene (observed: 'hello', 'is working' -> abstract) | |
| "hello hi hey hiya bye goodbye ok okay yeah yep yes yup nope um uh huh hmm oh ah eh well alright lets let " | |
| "really kinda sorta gonna wanna gotta thing things stuff something anything nothing someone anyone everyone " | |
| "nobody everybody working testing test please thanks thank cool nice wow".split() | |
| ) | |
| def _phrase_is_paintable(phrase: str) -> bool: | |
| """True if the clause has >=1 CONTENT word (a paintable subject); False for a function-word-only VAD | |
| fragment ("is", "that's", "and then") that would otherwise make klein paint abstract colour-waves.""" | |
| import re | |
| words = [ | |
| w | |
| for w in re.findall(r"[a-z']+", (phrase or "").lower()) | |
| if w.replace("'", "") not in _NONVISUAL_WORDS and len(w) >= 3 | |
| ] | |
| return len(words) >= 1 | |
def _scroll_session_style(text: str) -> str:
    """Pick ONE art-style suffix for the session from keywords in the narration.

    The rules are tried in order and the first rule with a matching keyword wins,
    so different stories get visibly different looks. When nothing matches, a style
    is chosen deterministically from the first 24 characters of the text, so the
    default still varies between stories but is stable for the same text.

    Keywords match only at the START of a word (they must not be preceded by a
    letter). Stems therefore still work ("enchant" -> "enchanted", "cyber" ->
    "cyberpunk", "80s" inside "1980s"), but a keyword buried inside an unrelated
    word no longer fires ("king" in "walking", "ink" in "think", "alley" in
    "valley", "zen" in "frozen").

    Returns the style's prompt suffix (a value of ``_ART_STYLES``), not its key.
    """
    t = (text or "").lower()

    def has(*ks: str) -> bool:
        # (?<![^\W\d_]) == "not preceded by a letter": word-start match that still
        # lets a digit precede the keyword ("1980s" -> "80s").
        return any(re.search(r"(?<![^\W\d_])" + re.escape(k), t) for k in ks)

    # (style key, trigger keywords) — ORDER MATTERS: the first matching rule wins.
    rules = (
        ("noir", ("noir", "detective", "crime", "shadow", "smoke", "mystery", "murder", "alley", "rain-slick")),
        ("anime", ("anime", "neon", "cyber", "robot", "future", "mecha", "tokyo", "hologram", "android")),
        ("ghibli", ("spirit", "fairy", "child", "gentle", "wonder", "meadow", "totoro", "ghibli", "cottage")),
        ("ink", ("ink", "myth", "legend", "dragon", "samurai", "zen", "calligraphy", "temple")),
        ("impressionist", ("memory", "nostalgia", "garden", "afternoon", "impression", "monet", "pond", "blossom")),
        ("photoreal", ("epic", "war", "storm", "realistic", "battle", "cinematic", "ocean", "cliff", "mountain")),
        ("sketch", ("sketch", "idea", "draft", "blueprint", "study", "outline", "drawing")),
        ("synthwave", ("synth", "retro", "80s", "arcade", "vapor", "grid", "laser")),
        ("watercolour", ("watercolor", "watercolour", "calm", "quiet", "still", "mist", "drizzle")),
        ("oil", ("oil", "portrait", "classic", "baroque", "renaissance", "king", "palace", "throne")),
        ("storybook", ("magic", "glow", "enchant", "dream", "star", "crystal", "luminous")),
    )
    for style_key, triggers in rules:
        if has(*triggers):
            return _ART_STYLES[style_key]

    # default: VARY by the text so different stories get different looks (never always the same one)
    keys = ["storybook", "impressionist", "photoreal", "ghibli", "ink", "oil", "watercolour", "anime"]
    return _ART_STYLES[keys[sum(ord(c) for c in t[:24]) % len(keys)]]
def _director_session_style(tokenizer: Any, model: Any, phrase: str) -> str:
    """Choose the session's art-style suffix once, at world start.

    Default: a RANDOM style from the live-safe palette, so every new world looks
    different without the user saying a trigger word. With
    ``LIGHTLOOM_STYLE_FROM_NARRATION=1`` the style is inferred from the narration
    instead: first via ``pick_style_transformers(tokenizer, model, phrase)``, and if
    that fails or returns an unknown key, via the keyword heuristic
    ``_scroll_session_style(phrase)``.

    Monochrome/sparse styles (ink, sketch, charcoal, noir) are never returned. Per
    the original author's notes they degenerate in a continuous outpaint (negative
    space compounds into grey bands and choppy seams). Whichever path picks one, it
    is swapped for the SAME dense substitute: oil for ink/sketch/charcoal, gothic
    for noir (keeps the dark mood).

    Returns a value of ``_ART_STYLES`` (the prompt suffix), not a key.
    """
    import random

    # degenerate style -> dense substitute that preserves the mood. One table for BOTH the
    # model pick and the keyword fallback, so noir maps to gothic on either path.
    dense_substitute = {"ink": "oil", "sketch": "oil", "charcoal": "oil", "noir": "gothic"}
    live_keys = [k for k in _ART_STYLES if k not in dense_substitute]

    if os.getenv("LIGHTLOOM_STYLE_FROM_NARRATION", "0") == "1":
        try:
            from lightloom.director.director import pick_style_transformers

            key = pick_style_transformers(tokenizer, model, phrase)
            if key in live_keys:
                return _ART_STYLES[key]
            if key in dense_substitute:
                return _ART_STYLES[dense_substitute[key]]
        except Exception:  # noqa: BLE001 - never let style-picking break narration
            pass
        # Model pick unavailable/unknown -> keyword heuristic, with the same remap applied.
        fallback = _scroll_session_style(phrase)
        for degen_key, dense_key in dense_substitute.items():
            if fallback == _ART_STYLES[degen_key]:
                return _ART_STYLES[dense_key]
        return fallback

    # default: a RANDOM live-safe style per world. SystemRandom (OS entropy) is used instead of
    # random.choice: per the original notes, ZeroGPU forks a fresh worker per call that inherits
    # the parent's seeded RNG state, so plain random.choice returned the SAME style on every fork.
    return _ART_STYLES[random.SystemRandom().choice(live_keys)]
def _scroll_world(text: str, session: str):
    """Generate one continuous panorama for ``text`` and stream progress events.

    This is a generator. It splits the narration into at most 8 phrases, fixes one
    style suffix for the whole session, then for each phrase asks the Director for a
    scene prompt and outpaints ``strips_per_phrase`` strips to the right, each one
    conditioned on the previous strip. Every strip and its depth map are written to
    ``/tmp/lightloom/scroll/<session>/``.

    Yielded dicts, keyed by ``"stage"``:
      * ``warming``       - before the models load; ``sections`` = number of phrases
      * ``directing``     - a phrase is about to be sent to the Director
      * ``painting``      - the scene prompt chosen for the phrase (first 120 chars)
      * ``section``       - a strip was saved; carries its URL, caption and width
      * ``depth``         - the strip's depth PNG was saved
      * ``section_error`` - a phrase failed; the loop moves on to the next phrase
      * ``done``          - final event; ``count`` = number of strips produced
    """
    import numpy as np
    from PIL import Image
    from lightloom.depth.depth import estimate_depth, load_depth_pipeline
    from lightloom.director.director import generate_shot_transformers, load_director_transformers
    from lightloom.director.state import SceneState
    from lightloom.paint.scroll import load_fast_pipeline, outpaint_right
    out_dir = Path("/tmp/lightloom/scroll") / session
    out_dir.mkdir(parents=True, exist_ok=True)
    # Empty/unsplittable narration falls back to a single generic phrase so something is painted.
    phrases = _split_phrases(text, cap=8) or ["a luminous world unfolding"]
    # One style suffix for the whole session, appended to every strip prompt below.
    style = _scroll_session_style(text)
    yield {"stage": "warming", "sections": len(phrases)}
    tokenizer, model = load_director_transformers()
    pipe = load_fast_pipeline()  # DISTILLED klein-4B, 4-step — the fast continuous loop
    depth_pipe = load_depth_pipeline()
    # Blank scene state handed to the Director for every phrase (a fresh dict copy each call).
    neutral = SceneState(
        characters=[], setting="", lighting="", mood="", palette=("#202428", "#7d8794", "#e6ddc8")
    ).model_dump()
    # CONTINUOUS loop: many small fast strips per phrase => the world keeps flowing
    # (generation ~1.4 strips/s of 256px >> the browser's scroll, so it never freezes).
    ctx = None  # previous strip, passed to outpaint_right as context (None for the first strip)
    prev_subject = ""
    h, overlap, seg = 512, 256, 256  # overlap==seg: carry the whole previous strip as context
    strips_per_phrase = 4
    g = 0  # global strip index
    for i, phrase in enumerate(phrases):
        yield {"stage": "directing", "index": i, "text": phrase}
        try:
            shot, _m = generate_shot_transformers(tokenizer, model, phrase, dict(neutral))
            # Prefer the Director's English image prompt; fall back to the raw phrase, then a stock scene.
            subject = (shot.image_prompt_en or phrase or "a vivid scene").strip()
            yield {"stage": "painting", "index": i, "scene": subject[:120]}
            for k in range(strips_per_phrase):
                # first strip of a new scene morphs from the previous one (smooth panorama)
                is_change = k == 0 and prev_subject and prev_subject[:48].lower() != subject[:48].lower()
                strip_prompt = _scroll_transition_prompt(prev_subject, subject) if is_change else subject
                sec = outpaint_right(pipe, ctx, strip_prompt + style, seg_w=seg, height=h, overlap=overlap, steps=4, seed=200 + g * 3, morph_band=(96 if is_change else 0))
                ctx = sec
                sec.save(out_dir / f"section-{g}.webp", "WEBP", quality=92)
                # The section event is emitted BEFORE depth is estimated, so the image can be
                # shown while the depth map is still being computed.
                yield {
                    "stage": "section",
                    "index": g,
                    "image": f"/frames-scroll/{session}/section-{g}.webp",
                    "caption": phrase[:160],  # every strip carries the caption (onFocus pins it; no flicker)
                    "w": sec.width,
                }
                dmap = estimate_depth(depth_pipe, sec)
                # Per-strip robust normalisation to [0, 1] using the 2nd/98th percentiles;
                # a flat map (hi <= lo) becomes a uniform 0.5.
                lo, hi = float(np.percentile(dmap, 2)), float(np.percentile(dmap, 98))
                dmap = (dmap - lo) / (hi - lo) if hi > lo else np.full_like(dmap, 0.5)
                dmap = np.clip(dmap, 0.0, 1.0)  # constant/low-variance depth must not wrap mod 256
                Image.fromarray((dmap * 255).astype(np.uint8)).save(out_dir / f"section-{g}-depth.png")
                yield {"stage": "depth", "index": g, "depth": f"/frames-scroll/{session}/section-{g}-depth.png"}
                g += 1
            prev_subject = subject  # the next phrase morphs FROM this scene
        except Exception as exc:  # noqa: BLE001 - one bad section must not end the scroll.
            yield {"stage": "section_error", "index": g, "error": str(exc)[:160]}
    yield {"stage": "done", "session": session, "count": g}
def scroll_api(text: str = "", lang: str = "en") -> dict:
    """Stream a whole scroll for ``text`` under a freshly generated session id.

    Generator: re-yields every event produced by ``_scroll_world``. Empty ``text``
    falls back to ``_GOLDEN_RECITAL``. Any exception is converted into one final
    event whose stage is ``"quota_exceeded"`` when the message mentions
    gpu/quota/zerogpu/exceeded, and ``"error"`` otherwise. ``lang`` is accepted
    but not used here.
    """
    session = uuid.uuid4().hex[:8]
    try:
        events = _scroll_world((text or _GOLDEN_RECITAL).strip(), session)
        for event in events:
            yield event
    except Exception as exc:  # noqa: BLE001 - quota guard.
        lowered = str(exc).lower()
        quota_markers = ("gpu", "quota", "zerogpu", "exceeded")
        if any(marker in lowered for marker in quota_markers):
            stage = "quota_exceeded"
        else:
            stage = "error"
        yield {"stage": stage, "error": str(exc)[:200]}
def scroll_session_frame(session: str, name: str):
    """Serve one file from a session's scroll directory.

    ``session`` must be 6-32 lowercase hex characters and ``name`` 1-40 characters
    from word characters, dot and dash; anything else gets a JSON 400. A well-formed
    request for a file that does not exist gets a JSON 404.
    """
    well_formed = re.fullmatch(r"[a-f0-9]{6,32}", session) and re.fullmatch(r"[\w.-]{1,40}", name)
    if not well_formed:
        return JSONResponse({"error": "bad request"}, status_code=400)
    # Only the final component of ``name`` is joined onto the session directory.
    target = Path("/tmp/lightloom/scroll") / session / Path(name).name
    if target.is_file():
        return FileResponse(target)
    return JSONResponse({"error": "not found"}, status_code=404)
| # --- LIVE narration: the world builds AS YOU SPEAK. While you talk, the browser | |
| # VAD-cuts each phrase and streams it as a short voice segment; ONE GPU call per | |
| # segment transcribes it AND paints a few strips that CONTINUE the same panorama | |
| # (the last strip is kept per session as the outpaint context), so sections keep | |
| # flowing in with low delay — no "record, stop, then render". --- | |
# Root directory of all scroll output; each session writes into its own sub-directory here.
_SCROLL_ROOT = Path("/tmp/lightloom/scroll")
_LIVE_MAX = 24  # keep at most this many session dirs on disk; sweep the oldest
# Whisper hallucinates these short phrases from near-silent audio; treat them as silence so the
# world never paints an unrelated scene from a cough/breath. Entries are lowercase with no
# surrounding punctuation: the caller lowercases the transcript and strips " .,!?" before lookup.
_ASR_HALLUCINATIONS = frozenset({
    "", "you", "thank you", "thanks", "thanks for watching", "thank you for watching",
    "thanks for watching this video", "please subscribe", "subscribe", "subscribe to my channel",
    "see you next time", "see you in the next video", "i'll see you next time", "bye", "bye bye",
    "okay", "ok", "good halloween", "i am here", "i'm here", "music", "applause", "you're welcome",
    "the end", "hello everyone", "thanks for listening",
})
def _sweep_scroll_sessions(keep: int = _LIVE_MAX, protect: frozenset[str] = frozenset()) -> None:
    """Bound /tmp: keep only the ``keep`` most recently modified session directories.

    Args:
        keep: number of unprotected session directories to retain.
        protect: session names that must NEVER be swept (the live session being
            served — mkdir(exist_ok) does not freshen mtime, so a busy session could
            otherwise sweep its OWN dir mid-narration under multi-visitor load).
            Protected directories are not counted against ``keep``.

    Best-effort and never raises for filesystem problems: a missing root, a
    directory that vanishes while being inspected (another worker sweeping at the
    same time) and failed deletions are all tolerated. Works from the directory
    listing on disk rather than in-memory state, so it is independent of the
    GPU-worker lifecycle.
    """
    import shutil

    try:
        dirs = [p for p in _SCROLL_ROOT.iterdir() if p.is_dir() and p.name not in protect]
    except OSError:  # root missing/unreadable (FileNotFoundError is an OSError)
        return
    excess = len(dirs) - keep
    if excess <= 0:
        return

    def _mtime(d: Path) -> float:
        # A dir removed between the listing and this stat() must not abort the sweep;
        # sort it first so the (no-op) rmtree below simply skips over it.
        try:
            return d.stat().st_mtime
        except OSError:
            return float("-inf")

    for p in sorted(dirs, key=_mtime)[:excess]:
        shutil.rmtree(p, ignore_errors=True)
| def _strip_looks_corrupt(img: Any) -> bool: | |
| """Detect the saturated red/blue 'neon ribbon' artifact: once a 4-step strip overflows | |
| into near-pure-saturation colour, the ctx-carry propagates it forever (the user's | |
| long-session bug). If a strip is dominated by extreme-saturation/bright pixels, the | |
| caller drops the carry-over context and repaints fresh, breaking the chain. Legitimate | |
| vivid scenes sit well below sat>0.92, so this rarely false-positives.""" | |
| import numpy as np | |
| a = np.asarray(img.convert("RGB"), dtype=np.float32) / 255.0 | |
| mx = a.max(axis=2) | |
| mn = a.min(axis=2) | |
| sat = (mx - mn) / (mx + 1e-6) | |
| neon = (sat > 0.92) & (mx > 0.5) # near-pure saturated + bright | |
| if float(neon.mean()) > 0.20: | |
| return True | |
| # also catch a THIN saturated streak-LINE: a few % of total area (below 0.20) but ~100% of its | |
| # own row or column. Without this the ctx-carry propagates a neon ribbon down the whole panorama. | |
| return bool(neon.mean(axis=1).max() > 0.6 or neon.mean(axis=0).max() > 0.6) | |
# Edge-density threshold compared against _strip_structure_score() by _strip_looks_abstract().
STRUCTURE_FLOOR = 0.045  # below this edge-density the strip is dissolving into abstract colour
| def _strip_structure_score(img: Any) -> float: | |
| """Edge/gradient density in [0,1]: high for representational scenes (crisp foliage, | |
| horizon, objects), collapses toward ~0 for the smooth 'abstract neon-wave' drift that | |
| _strip_looks_corrupt (saturation-only) never catches. CPU/numpy, ~1 ms on a strip.""" | |
| import numpy as np | |
| a = np.asarray(img.convert("L"), dtype=np.float32) / 255.0 | |
| a = a[::2, ::2] # 2x downsample: faster + ignores single-pixel paint/depth noise | |
| gx = np.abs(np.diff(a, axis=1)) | |
| gy = np.abs(np.diff(a, axis=0)) | |
| g = np.maximum(gx[:-1, :], gy[:, :-1]) | |
| return float((g > 0.06).mean()) # fraction of 'edge' pixels | |
def _strip_looks_abstract(img: Any) -> float:
    """Return how far the strip's structure score falls below ``STRUCTURE_FLOOR``.

    0.0 means the strip has enough edges (score at or above the floor); the value
    rises linearly to 1.0 as the score drops to zero. This measures loss of
    STRUCTURE, whereas ``_strip_looks_corrupt`` measures SATURATION.
    """
    floor = STRUCTURE_FLOOR
    score = _strip_structure_score(img)
    if score >= floor:
        return 0.0
    deficit = (floor - score) / floor  # relative shortfall
    return min(1.0, deficit)
| def _live_next_index(out_dir: Path) -> int: | |
| """Next strip index = (max existing ``section-N.webp``) + 1. DISK-derived so the | |
| index is correct (and filenames never collide) even if the GPU worker recycled and | |
| lost all in-process state between phrase calls.""" | |
| mx = -1 | |
| for p in out_dir.glob("section-*.webp"): | |
| m = re.fullmatch(r"section-(\d+)", p.stem) | |
| if m: | |
| mx = max(mx, int(m.group(1))) | |
| return mx + 1 | |
| def _live_state(out_dir: Path) -> dict[str, Any]: | |
| """Per-session continuity state on disk (style + the previous phrase's subject, used | |
| to MORPH smoothly between scenes). Survives a GPU-worker recycle.""" | |
| try: | |
| s = out_dir / "state.json" | |
| if s.is_file(): | |
| return json.loads(s.read_text("utf-8")) | |
| except (OSError, ValueError): | |
| pass | |
| return {} | |
def _live_save_state(out_dir: Path, **updates: Any) -> None:
    """Merge ``updates`` into the session state and persist it to ``state.json``.

    The current state is re-read from disk, updated with the keyword arguments and
    written back. The write goes to a temporary file in the same directory and is
    then moved into place with ``os.replace``, so ``state.json`` is always either
    the old or the new complete document — a worker killed mid-write can no longer
    leave a truncated file that reads back as an empty state (which would make the
    session pick a new style). Filesystem errors are swallowed: persistence is
    best-effort and must not interrupt narration.
    """
    state = _live_state(out_dir)
    state.update(updates)
    target = out_dir / "state.json"
    # Per-process temp name so two workers saving at once do not share a temp file.
    scratch = out_dir / f"state.json.{os.getpid()}.tmp"
    try:
        scratch.write_text(json.dumps(state), "utf-8")
        os.replace(scratch, target)
    except OSError:
        # Best-effort cleanup of a half-written temp file; the old state.json is untouched.
        try:
            scratch.unlink()
        except OSError:
            pass
def _depth_global_norm(raw: Any, state: dict[str, Any], out_dir: Path) -> Any:
    """Normalise a raw depth map against ONE session-wide [lo, hi] range.

    The range is seeded by the first strip's 2nd/98th percentiles and afterwards
    eased 25% toward each new strip's percentiles. It is written both into the
    passed ``state`` dict and to disk via ``_live_save_state``, so every strip in the
    session shares the same depth scale instead of being flattened individually.

    Returns ``(raw - lo) / (hi - lo)`` clipped to [0, 1]. Per the original notes the
    input is disparity (near = high), so 1 = near and 0 = far with no inversion.
    """
    import numpy as np

    strip_lo = float(np.percentile(raw, 2))
    strip_hi = float(np.percentile(raw, 98))

    prev_lo = state.get("depth_lo")
    prev_hi = state.get("depth_hi")
    have_range = isinstance(prev_lo, (int, float)) and isinstance(prev_hi, (int, float)) and prev_hi > prev_lo
    if have_range:
        blend = 0.25  # ease the global range toward each new strip's percentiles
        lo = (1 - blend) * prev_lo + blend * strip_lo
        hi = (1 - blend) * prev_hi + blend * strip_hi
    else:
        # the first strip seeds the session-global scale
        lo, hi = strip_lo, strip_hi

    # Guard the division below against a degenerate (flat) range.
    if hi - lo < 1e-6:
        hi = lo + 1e-6

    state.update(depth_lo=lo, depth_hi=hi)
    _live_save_state(out_dir, depth_lo=lo, depth_hi=hi)

    # 1 = near, 0 = far — returned as-is, not flipped.
    return np.clip((raw - lo) / (hi - lo), 0.0, 1.0)
| def _scroll_transition_prompt(prev_subject: str, subject: str) -> str: | |
| """The boundary strip between two scenes: paint the FIRST as it DISSOLVES into the | |
| SECOND, so the panorama morphs instead of cutting hard.""" | |
| return f"a seamless gradual transition where {prev_subject} slowly dissolves and transforms into {subject}" | |
| # Stopwords for the Director content-word guard (function words carry no subject -> ignore them). | |
| _STOPWORDS = frozenset({ | |
| "the", "and", "was", "were", "with", "that", "this", "they", "them", "then", "there", "here", | |
| "have", "has", "had", "from", "into", "over", "under", "your", "you", "his", "her", "she", "him", | |
| "are", "for", "but", "not", "all", "out", "off", "one", "when", "what", "where", "which", "who", | |
| "why", "how", "little", "bit", "sorry", "describe", "very", "just", "like", "about", "would", | |
| "could", "should", "been", "being", "their", "some", "more", "than", "also", "okay", | |
| }) | |
| def _shares_content_word(phrase: str, cand: str) -> bool: | |
| """True iff the Director's candidate scene genuinely relates to the phrase. Requires TWO shared | |
| concrete (>=4-char, non-stopword) words for a normal phrase — ONE coincidental word ("blue whale" | |
| vs the few-shot's "blue apron") let the few-shot subject parrot through. A short phrase (<=3 | |
| concrete words) needs just one. If the phrase has no concrete word, returns False (paint the | |
| user's own words, never an invented/parroted scene).""" | |
| pw = {w for w in re.findall(r"[a-z]{4,}", phrase.lower())} - _STOPWORDS | |
| if not pw: | |
| return False | |
| cw = {w for w in re.findall(r"[a-z]{4,}", cand.lower())} | |
| n = len(pw & cw) | |
| return n >= 2 or (len(pw) <= 3 and n >= 1) | |
| def _scroll_live_phrase(session: str, text: str, audio_b64: str, lang: str): | |
| """Transcribe ONE spoken segment (or take ``text``) and paint a few strips that | |
| CONTINUE this session's panorama. Streams transcript/section/depth events and | |
| persists the last strip as the carry-over context, so the next segment flows on | |
| seamlessly. Loading the models first also serves as the warm-on-tap path.""" | |
| import base64 | |
| import tempfile | |
| import numpy as np | |
| from PIL import Image | |
| from lightloom.depth.depth import estimate_depth, load_depth_pipeline | |
| from lightloom.director.director import generate_shot_transformers, load_director_transformers | |
| from lightloom.director.state import SceneState | |
| from lightloom.paint.scroll import load_fast_pipeline, outpaint_right | |
| # warm (cached after first call): director + fast painter + depth | |
| tokenizer, model = load_director_transformers() | |
| pipe = load_fast_pipeline() | |
| depth_pipe = load_depth_pipeline() | |
| # COLD-START: the very first generation compiles CUDA kernels/cuBLAS heuristics (multi-second). The | |
| # client fires a '__warm__' beat on mic-tap so those kernels compile DURING the intro at the REAL | |
| # production shape -> the user's first spoken phrase paints at warm speed, not a cold stall. | |
| # The frontend sends "__warm__" through the AUDIO slot (streamScrollLive's 2nd arg), so accept it on | |
| # EITHER field — otherwise the warm fell through to the normal path, decoded "__warm__" as garbage | |
| # audio, returned 'silent', and NEVER compiled the kernels (verified: client-warm -> 'silent', the | |
| # first real phrase then paid the full cold-start), while still holding the GPU slot for ~8s. | |
| if (text or "").strip() == "__warm__" or (audio_b64 or "").strip() == "__warm__": | |
| try: | |
| from lightloom.paint.scroll import outpaint_right as _ow | |
| _warm = _ow(pipe, None, "a calm open landscape at dawn", seg_w=556, height=768, overlap=212, steps=4, seed=1) | |
| estimate_depth(depth_pipe, _warm, normalize=False) # warm the depth forward too | |
| except Exception: # noqa: BLE001 - warming is best-effort; never block the session. | |
| pass | |
| yield {"stage": "warmed"} | |
| return | |
| # 1) get the phrase text — transcribe the spoken segment, or use the typed text | |
| phrase = (text or "").strip() | |
| _asr_be = "text" # DEBUG: which ASR backend actually ran (verify Parakeet vs Whisper-fallback) | |
| if not phrase and audio_b64: | |
| path = None | |
| try: | |
| payload = audio_b64.split(",")[-1] | |
| if len(payload) > 8_000_000: # ~6 MB decoded; reject BEFORE decode (OOM / DoS guard) | |
| yield {"stage": "asr_error", "error": "audio_too_large"} | |
| return | |
| raw = base64.b64decode(payload) if payload else b"" | |
| if len(raw) >= 64: | |
| from lightloom.audio_in.asr import load_asr, read_wav | |
| from lightloom.audio_in.asr import transcribe as _asr | |
| with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as handle: | |
| handle.write(raw) | |
| path = handle.name | |
| audio, sr = read_wav(path) | |
| # Silence/room-noise gate: Whisper hallucinates short phrases from near-silent audio. | |
| # Only transcribe if the segment carries real energy; else leave phrase="" -> 'silent'. | |
| # Gate on the VOICED portion, NOT the whole-clip mean: a soft/short phrase plus the ~800ms | |
| # VAD silence tail averages below a fixed floor and is wrongly dropped (verified cause of | |
| # "only catches phrases"). Use the 90th-percentile of ~100ms window RMS (the loudest tenth | |
| # = the real speech) and match the browser VAD floor (recorder.js VAD_FLOOR=0.006), so we | |
| # only admit audio the client VAD already validated; _ASR_HALLUCINATIONS stays the net. | |
| _a = np.asarray(audio, dtype=np.float32).ravel() | |
| _win = max(1, int(sr * 0.1)) | |
| if _a.size >= _win: | |
| _n = (_a.size // _win) * _win | |
| _wr = np.sqrt((_a[:_n].reshape(-1, _win) ** 2).mean(axis=1)) | |
| _speech = float(np.percentile(_wr, 90)) | |
| _voiced_frac = float((_wr >= 0.006).mean()) # how much of the clip is actually voiced | |
| else: | |
| _speech = float(np.sqrt(np.mean(_a * _a))) if _a.size else 0.0 | |
| _voiced_frac = 1.0 if _speech >= 0.006 else 0.0 | |
| # require a loud-enough peak AND a SUSTAINED voiced fraction (>=10% of windows) — a lone | |
| # AGC-boosted transient (breath, keystroke) no longer passes as speech for Whisper to | |
| # hallucinate on; the downstream confidence gate (asr.py) is the second line of defence. | |
| # PARAKEET-AWARE: this strict gate exists ONLY because autoregressive Whisper HALLUCINATES a | |
| # phrase from near-silent/short audio. NVIDIA Parakeet (CTC) emits BLANKS on silence/noise — | |
| # it structurally CANNOT hallucinate — so the strict Whisper-era gate just DROPPED valid | |
| # quiet/short speech. CORRECTED (audit w368kb6kp): 0.0015 was ~4x BELOW the browser VAD floor | |
| # (recorder.js VAD_FLOOR=0.006) -> room tone/breath reached Parakeet, which (CTC argmax) still | |
| # emits a best-guess word from faint noise -> "palabras que nunca dije". Keep the floor at the | |
| # client VAD floor (0.006) but a LENIENT voiced-fraction (0.05 vs Whisper's 0.10) so genuinely | |
| # quiet/short real speech still paints while room tone does not. | |
| _is_parakeet = os.getenv("LIGHTLOOM_ASR", "whisper").strip().lower() == "parakeet" | |
| _gate_ok = (_speech >= 0.006 and _voiced_frac >= 0.05) if _is_parakeet else (_speech >= 0.006 and _voiced_frac >= 0.10) | |
| if _gate_ok: | |
| proc, amodel = load_asr() | |
| _asr_be = type(amodel).__name__ | |
| phrase = (_asr(proc, amodel, audio, sampling_rate=sr, language=lang) or "").strip() | |
| if phrase.lower().strip(" .,!?") in _ASR_HALLUCINATIONS: | |
| phrase = "" # a known Whisper-on-silence hallucination -> drop it | |
| except Exception as exc: # noqa: BLE001 - one bad segment must not end narration. | |
| yield {"stage": "asr_error", "error": str(exc)[:160]} | |
| finally: | |
| if path: | |
| try: | |
| os.unlink(path) | |
| except OSError: | |
| pass | |
| if not phrase: | |
| # nothing intelligible in this segment (or a pure warm call) — keep listening | |
| yield {"stage": "silent"} | |
| return | |
| # Moderation net at the SOURCE: scrub here so BOTH the caption (phrase[:160]) and the painted | |
| # subject (g>0 uses subject=phrase; g==0 derives it from phrase) are clean — no slur/NSFW ever | |
| # reaches FLUX or the on-screen caption in front of judges. Pure CPU, microseconds. | |
| from lightloom.paint.prompt import scrub_unsafe | |
| phrase, _flagged = scrub_unsafe(phrase) | |
| if _flagged: | |
| yield {"stage": "filtered"} | |
| yield {"stage": "transcript", "text": phrase, "asr": _asr_be} | |
| # 2) session continuity — DISK-DERIVED so it survives a GPU-worker recycle: the | |
| # strip index is the count of sections already on disk (no filename collisions), the | |
| # outpaint carry-over is the last saved strip, and the style is fixed by phrase #1. | |
| out_dir = _SCROLL_ROOT / session | |
| out_dir.mkdir(parents=True, exist_ok=True) | |
| _sweep_scroll_sessions(protect=frozenset({session})) # never sweep the session we are serving | |
| g = _live_next_index(out_dir) | |
| g0 = g # the strip index AT PHRASE START (the paint loop increments g) — used to pin the title to the FIRST phrase | |
| state = _live_state(out_dir) | |
| style = state.get("style") | |
| if not style: | |
| style = _director_session_style(tokenizer, model, phrase) | |
| _live_save_state(out_dir, style=style) | |
| prev_subject = (state.get("prev_subject") or "").strip() | |
| roll = (state.get("narration") or "").strip() # bounded rolling narration anchor (continuity thread) | |
| ctx = None | |
| if g > 0: | |
| prev = out_dir / f"section-{g - 1}.webp" | |
| if prev.is_file(): | |
| try: | |
| ctx = Image.open(prev).convert("RGB") | |
| except Exception: # noqa: BLE001 - missing/corrupt carry-over -> fresh start | |
| ctx = None | |
| # ABSTRACTION anchor: hold the first clean strip as a STRUCTURE reference. When the | |
| # autoregressive prior dissolves into smooth colour fields over a long session (drift the | |
| # saturation guard never catches), we re-inject its real edges into the conditioning | |
| # instead of cutting, so the world heals without a hard seam. | |
| anchor = None | |
| if g > 0: | |
| a0 = out_dir / "section-0.webp" | |
| if a0.is_file(): | |
| try: | |
| anchor = Image.open(a0).convert("RGB") | |
| except Exception: # noqa: BLE001 - no anchor -> abstraction guard simply no-ops | |
| anchor = None | |
| # SPEED: the Director sets the world's DIRECTION (a rich opening scene + the session | |
| # style/aesthetic) ONCE — on the very first phrase. Every later phrase paints the | |
| # user's WORDS DIRECTLY (+ that fixed style), with NO per-phrase Director call. That | |
| # is ~2 s faster per phrase AND more faithful to what was actually said. | |
| # FAITHFUL-SUBJECT design — the Director must never make the experience worse. The painted SUBJECT | |
| # is ALWAYS the user's own words. The Director (MiniCPM) contributes only bounded CINEMATOGRAPHY | |
| # (lighting + palette), NEVER a scene rewrite: its image_prompt_en parroted the few-shot subject | |
| # (a "fishmonger in a blue apron" / "lone traveler in a grey cloak") into unrelated narration — | |
| # verified by capturing its raw output. Lighting/palette can tint the mood but can never replace | |
| # "she opened the little blue door" with a fishmonger. | |
| subject = phrase.strip() or "a vivid scene" | |
| # Give the painter only the PAINTABLE words: drop non-visual sound words (echo/silence/whisper/...) | |
| # that the live path used to pass through verbatim and that confuse FLUX. The CAPTION still shows the | |
| # FULL phrase (phrase[:160] below), so the user sees everything they said. GUARDED: keep the sanitized | |
| # form only if it retains >= half the words; otherwise keep the original so we never paint LESS than | |
| # what was spoken (a sound-word-only phrase is left intact). | |
| from lightloom.paint.prompt import sanitize_image_prompt as _sanitize_subj | |
| _clean_subj = _sanitize_subj(subject) | |
| if _clean_subj and len(_clean_subj.split()) >= max(1, len(subject.split()) // 2): | |
| subject = _clean_subj | |
| # NON-PAINTABLE FRAGMENT -> continue the previous real scene (no abstract iridescent colour-wave band). | |
| if not _phrase_is_paintable(subject): | |
| if prev_subject: | |
| subject = prev_subject # extend the last real scene: subtle, seamless, one coherent image | |
| else: | |
| yield {"stage": "silent"} # no prior scene to continue -> skip rather than paint an abstract scene | |
| return | |
| # DEFINITIVE Director scope: its ONLY painter-facing job is choosing the session ART STYLE (above, | |
| # via _director_session_style, from _ART_STYLES — now PURE RENDER TECHNIQUE (medium/brushwork/palette), | |
| # scrubbed of scene NOUNS (the old "neon grids/chrome" synthwave + "lush whimsical nature" ghibli WERE | |
| # injecting a scene into every strip -> the world reverted to ONE concept regardless of the words). The | |
| # SUBJECT (user's words) now owns the scene; the style only owns the look. The old free-form | |
| # generate_shot_transformers cinematography call is REMOVED: its lighting/palette ("misty green" + | |
| # a forest hex palette) was session-locked into EVERY strip and forced a green forest onto "now we | |
| # are under the ocean" — the recurring "the Director locks the world to one scene" bug. The HUD still | |
| # shows a brief Director beat on phrase #0 to surface that it chose the style. | |
| if g == 0: | |
| yield {"stage": "directing", "text": phrase} | |
| yield {"stage": "painting", "scene": subject[:120]} | |
| # overlap == seg means each strip carries the WHOLE previous strip as context (the max | |
| # available, since the stored strip is seg-wide); smoothness comes from the morph prompt. | |
| # QUALITY: generate near klein's native band. 768x768 canvas (0.59 MP) vs the old 640x512 (0.33 MP) | |
| # ~doubles visible detail; the returned strip is 576x768. overlap=192 (25% of the canvas, down from | |
| # 40%) spends more of the pixel budget on FRESH sharp content. Verified ~646 ms/strip, ~1.29 s/phrase | |
| # 768x768 canvas (overlap 192 + seg 576), returns a 576x768 strip. seg=768 (canvas 960x768) was | |
| # MEASURED on the Space at ~12s/phrase (FLUX attention is O(latent_tokens^2): 0.59->0.74 MP cost ~1.9x, | |
| # NOT the sub-linear estimate) -> it broke real-time, so seg stays 576 here. CONTINUITY comes from the | |
| # rolling narration anchor below + the 192px overlap carry, NOT from widening the strip. | |
| # overlap 192, seg 576 (canvas stays 768x768): a BIGGER carry (192 = 25% vs the 128 = 17% that read as | |
| # DISJOINT scene-to-scene JUMPS) so each strip FLOWS from the EVOLVING previous strip — continuity that | |
| # still evolves (the new words drive the fresh 576px), unlike Cast Lock (image_reference=section-0) | |
| # which the user found CLONED one scene (the opening neon city, "pegada"). If this feels too sticky | |
| # ("se queda en la escena anterior"), dial overlap back toward 160. Coherence = carried previous strip | |
| # + fixed session style (TEXT), NOT a frozen scene-0 lock. | |
| # overlap 212 / seg 556: a slightly STRONGER carry than 192 (more carried context -> fewer grey transition | |
| # bands) but well short of the 256 that got STUCK on narrative prose. 212+556=768; the __warm__ call below | |
| # is matched to 212/556. The grey-band ROOT cause is also fixed in scroll.py (the fresh region is now seeded | |
| # from the carried edge's average colour, not the jarring grey 128 base). | |
| h, overlap, seg = 768, 212, 556 | |
| # 2 strips/phrase: 3 strips RE-PAINTED the same subject 3x ("dije shark una vez y aparece repetido" -> 3 | |
| # sharks), reading as incoherent repetition. 2 keeps a scene SUSTAINED (longer than 1) but with a subtler | |
| # change per phrase; the overlap=192 carry + the fragment-continue above hold the single-image coherence. | |
| strips_per_phrase = 2 | |
| from lightloom.paint.scroll import style_trigger # fused painterly-LoRA trigger ("" if LoRA off) | |
| # The painter gets ONLY: the user's words (subject) + the Director's cinematography (cine: lighting + | |
| # palette) + the fixed session style. NO previous-scene CONTENT is injected. A rolling "in the same | |
| # continuous world, <prev scene>" clause was re-injecting the FIRST scene ("a neon synthwave city") | |
| # into every prompt -> the painter kept repainting it regardless of the new words (the over-clone bug). | |
| # Continuity is carried VISUALLY by Cast Lock (image_reference) + the overlap, not by re-stating content. | |
| # NO cine in the per-strip prompt. VERIFIED root cause of the "world stays a forest" bug: the Director's | |
| # phrase-#0 cinematography (e.g. lighting="misty green" + a forest hex palette) is SESSION-LOCKED and was | |
| # appended to EVERY strip -> "now we are under the ocean, misty green, colour palette #forest..." forced a | |
| # green forest onto the ocean. The Director still gives the scene-NEUTRAL art STYLE (style, from | |
| # _director_session_style) which carries coherence; the SUBJECT (the user's words) now drives the scene so | |
| # the world EVOLVES when they say something new. The Director's contribution is the curated art STYLE. | |
| # QUALITY BOOSTERS: append tasteful FIDELITY words (technique only, NO scene nouns) so the 4-step distilled | |
| # klein-4B punches above its weight and reads like a far larger model (the user's goal). Kept short so it | |
| # raises detail/craft without over-baking or homogenising the scene. | |
| paint_prompt = style_trigger() + subject + style + ", richly detailed, intricate, masterful, beautifully rendered" | |
| # CAST LOCK (the innovation): feed the clean HERO anchor (section-0, downscaled to 256px to bound the | |
| # added condition-token attention cost) into klein's in-context image_reference channel so the world | |
| # keeps its identity/palette/architecture across the whole session — a VISUAL memory, not just the | |
| # rolling text anchor. None on phrase #0 (that strip IS the anchor) and if the anchor failed to load. | |
| # Env-gated: default OFF for the fast (~5.5s/phrase) production experience (the rolling text anchor | |
| # above already gives thematic continuity). Flip LIGHTLOOM_CAST_LOCK=1 for the BFL demo, where the | |
| # image_reference VISUAL identity-lock is the showcased innovation (costs ~+2.5s/phrase of attention). | |
| cast_ref = None | |
| if anchor is not None and os.getenv("LIGHTLOOM_CAST_LOCK", "0") == "1": | |
| try: | |
| cast_ref = [anchor.resize((256, max(1, round(256 * anchor.height / anchor.width))))] # 256px: the proven-good lock (192 drifted). Added attention cost is within ZeroGPU variance. | |
| except Exception: # noqa: BLE001 - reference is best-effort; the plain outpaint still works. | |
| cast_ref = None | |
| # CLEAN per-strip outpaint: paint each strip as the scene description, carrying the | |
| # previous strip as context. (No ctx=None re-anchor — it created a hard cut every N | |
| # strips; long-session drift is fixed properly by the klein KV-cache reference anchor.) | |
| try: | |
| for k in range(strips_per_phrase): | |
| # break the neon-ribbon corruption chain: if the carried strip overflowed into | |
| # saturated colour, drop the context and repaint fresh (no endless propagation). | |
| # Validated end-to-end: an injected AND a natural neon strip were each contained | |
| # to one strip, the next strip came back clean. | |
| # SATURATION guard: break the neon-ribbon corruption chain. SCOPE IT to the rightmost | |
| # `overlap` px — that is the ONLY region outpaint_right actually carries as context, so a | |
| # saturated SKY filling the strip's LEFT (which never reaches the next paint) must NOT trip a | |
| # needless fresh repaint (the false-positive cut that read as the world resetting). | |
| _ctx_edge = ctx.crop((ctx.width - overlap, 0, ctx.width, ctx.height)) if ctx is not None else None | |
| # DROP the carried context if its right EDGE is corrupt (saturated) OR has COLLAPSED into a flat, | |
| # near-structureless colour field (the green block / iridescent waves / dark smudge). Carrying such | |
| # an edge propagates the bad scene forever; starting the next strip fresh from the (concrete) prompt | |
| # breaks the chain. This RESTORES the long-session drift safety net the abstraction-heal used to give | |
| # — but by DROPPING the bad context, NOT blending toward section-0 (which caused the grey-fog smear). | |
| if _ctx_edge is not None and (_strip_looks_corrupt(_ctx_edge) or _strip_looks_abstract(_ctx_edge) > 0.5): | |
| use_ctx = None | |
| else: | |
| use_ctx = ctx | |
| # SMOOTH continuation: paint each strip as the current subject, carrying the previous strip | |
| # as context, with NO morph band. The morph_band dissolve (an 8px->128px stretch of the | |
| # carried edge) is what put a foggy blurry smear between every scene; the plain outpaint | |
| # already lets a new spoken subject emerge from the carried scene fluidly, like before. | |
| sec = outpaint_right(pipe, use_ctx, paint_prompt, seg_w=seg, height=h, overlap=overlap, steps=4, seed=200 + g * 3, image_reference=cast_ref) | |
| # CATCH a bad FRESH strip before it is shown/carried: saturated corruption OR a collapse into a flat | |
| # near-structureless colour field (the green block / iridescent waves / dark smudge the saturation | |
| # guard misses). Reroll it (new seed) carrying the already-validated context, so a bad ROLL is fixed | |
| # while continuity is kept; if use_ctx was dropped above, this is a clean fresh repaint instead. | |
| if _strip_looks_corrupt(sec) or _strip_looks_abstract(sec) > 0.5: | |
| sec = outpaint_right(pipe, use_ctx, paint_prompt, seg_w=seg, height=h, overlap=overlap, steps=4, seed=211 + g * 3, image_reference=cast_ref) | |
| ctx = sec | |
| sec.save(out_dir / f"section-{g}.webp", "WEBP", quality=92) | |
| yield { | |
| "stage": "section", | |
| "index": g, | |
| "image": f"/frames-scroll/{session}/section-{g}.webp", | |
| # stamp EVERY strip of the phrase (not just k==0): the caption is shown by onFocus when | |
| # the strip is under the view centre, so a blank on the 2nd strip made the words flicker | |
| # off mid-clause. Same phrase across both strips -> the caption stays pinned. | |
| "caption": phrase[:160], | |
| "prompt": paint_prompt[:300], # DEBUG: the exact text sent to the painter (verify the mix) | |
| "w": sec.width, | |
| } | |
| raw_d = estimate_depth(depth_pipe, sec, normalize=False) # RAW disparity (un-scaled) | |
| dmap = _depth_global_norm(raw_d, state, out_dir) # ONE session-global scale, 1=near | |
| Image.fromarray((dmap * 255).astype(np.uint8)).save(out_dir / f"section-{g}-depth.png") | |
| yield {"stage": "depth", "index": g, "depth": f"/frames-scroll/{session}/section-{g}-depth.png"} | |
| g += 1 | |
| except Exception as exc: # noqa: BLE001 - one bad strip must not end narration. | |
| yield {"stage": "section_error", "index": g, "error": str(exc)[:160]} | |
| # append the current subject to the rolling narration (keep ~last 3 phrases, cap 180 chars) so the next | |
| # phrase continues THIS evolving world; the current words always dominate the prompt above. | |
| roll = "; ".join([p for p in (roll.split("; ") + [subject]) if p.strip()][-3:])[:180] | |
| _live_save_state(out_dir, prev_subject=subject, narration=roll) | |
| # LIVE TITLE REMOVED (user decision): the text MiniCPM5-1B title (title_world_transformers) was unreliable | |
| # — it echoed the spoken opening ("We Are", "Shark Under The Ocean") and never read like a real world | |
| # name. The world is named instead by the MiniCPM-V-4.6 "Art Director" in the Director's Cut POST-PROCESS | |
| # (docs/DIRECTORS_CUT_SPEC.md), which reads the FINISHED painted pixels — this keeps the Best-MiniCPM | |
| # category without a flaky live title. NO 'title' event is emitted live, so #world-title stays empty. | |
| yield {"stage": "phrase_done", "index": g} | |
def scroll_live_api(session: str = "", text: str = "", audio_b64: str = "", lang: str = "en") -> dict:
    """Stream the events of one live phrase, tagging each with its session id.

    A session id that is not 6-32 lowercase hex characters is replaced by a
    fresh 8-character hex id. Events are produced by ``_scroll_live_phrase``;
    each one receives a ``"session"`` key unless it already has one.

    Any exception raised while producing events is converted into one final
    event instead of propagating. Its stage is ``"quota_exceeded"`` when the
    lower-cased message contains "gpu", "quota", "zerogpu" or "exceeded", and
    ``"error"`` otherwise.

    NOTE: despite the ``-> dict`` annotation this function is a generator that
    yields dicts.
    """
    session_id = (session or "").strip()
    if re.fullmatch(r"[a-f0-9]{6,32}", session_id) is None:
        session_id = uuid.uuid4().hex[:8]
    try:
        # Argument coercion stays inside the try so a malformed argument is
        # reported as an event rather than raised to the caller.
        phrase_events = _scroll_live_phrase(session_id, (text or "").strip(), audio_b64 or "", lang or "en")
        for update in phrase_events:
            update.setdefault("session", session_id)
            yield update
    except Exception as err:  # noqa: BLE001 - quota guard.
        lowered = str(err).lower()
        quota_markers = ("gpu", "quota", "zerogpu", "exceeded")
        if any(marker in lowered for marker in quota_markers):
            failure_stage = "quota_exceeded"
        else:
            failure_stage = "error"
        yield {"stage": failure_stage, "error": str(err)[:200], "session": session_id}
def _postprocess_world(session: str, lang: str = "en"):
    """Director's Cut post-process: stitch, title and film one finished session.

    Separate from the live painter: it never calls ``_scroll_live_phrase`` and only
    reads the finished ``section-*.webp`` strips from the session directory. This is
    a generator; it yields progress dicts in this order:

    * ``stitched``     - panorama size and, if the JPEG save worked, its URL
    * ``seer_skipped`` - only when the optional Art Director step raised
    * ``titled``       - title, caption and field notes
    * ``rendering``    - emitted on every 48th frame of the pan
    * ``encoding``     - just before the frames are encoded to ``world.mp4``
    * ``ready``        - final URLs and metadata

    A single ``postprocess_error`` event is yielded (and the generator ends) when the
    session has no strips or none of them can be read.

    Args:
        session: name of the session directory under ``_SCROLL_ROOT``.
        lang: language code forwarded to the world seer.
    """
    import time

    import numpy as np
    import torch
    from PIL import Image

    _t0 = time.monotonic()  # soft budget vs the @spaces.GPU(170s) hard kill (see the render loop)
    out_dir = _SCROLL_ROOT / session

    # Collect the strips in numeric order. A stray file that the glob matches but that
    # carries no strip index is ignored instead of crashing the sort key.
    numbered = []
    if out_dir.is_dir():
        for path in out_dir.glob("section-*.webp"):
            hit = re.match(r"section-(\d+)", path.stem)
            if hit is not None:
                numbered.append((int(hit.group(1)), path))
    strips = [path for _, path in sorted(numbered, key=lambda pair: pair[0])]
    if not strips:
        yield {"stage": "postprocess_error", "error": "no_strips"}
        return

    # 1) STITCH the finished strips into ONE panorama. Read each strip fail-soft: a
    # 0-byte/truncated WEBP must not sink the whole keepsake - skip it.
    imgs = []
    for p in strips:
        try:
            imgs.append(Image.open(p).convert("RGB"))
        except Exception:  # noqa: BLE001 - drop one corrupt/partial strip, never lose the film
            continue
    if not imgs:
        yield {"stage": "postprocess_error", "error": "no_readable_strips"}
        return
    h0 = max(i.height for i in imgs)
    w0 = sum(i.width for i in imgs)
    pano = Image.new("RGB", (w0, h0))
    x = 0
    for im in imgs:
        pano.paste(im, (x, 0))
        x += im.width
    # A long session is a VERY wide image and WEBP hard-caps at 16383px, so the keepsake
    # panorama is persisted as JPEG. Fail-soft: a save error must never sink the film.
    pano_url = ""
    try:
        pano.save(out_dir / "panorama.jpg", "JPEG", quality=90)
        pano_url = f"/frames-scroll/{session}/panorama.jpg"
    except Exception:  # noqa: BLE001 - the full-panorama download is a bonus; the MP4 is the keepsake
        pano_url = ""
    yield {"stage": "stitched", "w": w0, "h": h0, "panorama": pano_url}

    # 2) ART DIRECTOR (MiniCPM-V-4.6) - env-gated, with a centre-focal fallback so it can
    # never break the MP4.
    state = _live_state(out_dir)
    seer = {
        "title": (state.get("title") or "").strip(),
        "caption": "",
        "field_notes": [],
        "focal_points": [{"x": 0.5, "y": 0.5, "label": ""}],
    }
    if os.getenv("LIGHTLOOM_WORLD_SEER", "1") == "1":
        try:
            from lightloom.paint.prompt import scrub_unsafe
            from lightloom.vision.world_seer import load_world_seer, see_world

            proc, vmodel = load_world_seer()
            see_in = pano.copy()
            see_in.thumbnail((1536, 1536))  # bound the VLM token count
            seen = see_world(proc, vmodel, see_in, seed=len(strips), lang=lang)
            seen["title"] = scrub_unsafe(seen.get("title") or "")[0]
            seen["caption"] = scrub_unsafe(seen.get("caption") or "")[0]
            # Commit only after BOTH fields were scrubbed: if anything above raises, the
            # safe defaults stay in place and no unscrubbed model text reaches the film.
            seer = seen
        except Exception as exc:  # noqa: BLE001 - naming is optional; never block the keepsake MP4
            yield {"stage": "seer_skipped", "error": str(exc)[:160]}
    # The live text title was removed, so state has none; whenever the seer is off or
    # throws the title would be blank. Name it here so it is never blank.
    if not seer["title"].strip():
        from lightloom.vision.world_seer import fallback_title

        seer["title"] = fallback_title(len(strips))
    yield {
        "stage": "titled",
        "title": seer["title"],
        "caption": seer["caption"],
        # Same tolerant read as the final "ready" event, so a seer result without
        # field notes cannot raise here, outside any try block.
        "field_notes": seer.get("field_notes") or [],
    }

    # 3) RAW float32 depth on the (downscaled) panorama -> one near/far scale.
    from lightloom.depth.depth import estimate_depth, load_depth_pipeline
    from lightloom.export.ldi import encode_frames_ffmpeg, load_caption_fonts, make_caption_overlay, normalize_disparity

    depth_pipe = load_depth_pipeline()
    render_h = 540
    # Cap the working width so depth + render stay bounded for very wide worlds.
    max_render_w = 8192
    render_w = max(2, int(round(w0 * render_h / h0)))
    render_w = min(render_w, max_render_w)
    render_w -= render_w % 2
    render_pano = pano.resize((render_w, render_h))
    raw_d = estimate_depth(depth_pipe, render_pano, normalize=False)
    nd = normalize_disparity(np.asarray(raw_d, dtype=np.float32))

    # 4) RENDER a calm, slow, constant pan across the panorama.
    img_full = torch.from_numpy(np.asarray(render_pano, dtype=np.float32).transpose(2, 0, 1)[None] / 255.0).to("cuda")
    # Save render-sized, aligned colour + depth sidecars for the client-side explore view.
    # Fail-soft: a bonus that never blocks the MP4.
    explore_color, explore_depth = "", ""
    try:
        render_pano.save(out_dir / "panorama-explore.jpg", "JPEG", quality=90)
        Image.fromarray((np.clip(nd, 0, 1) * 255).astype(np.uint8)).save(out_dir / "panorama-depth.png")
        explore_color = f"/frames-scroll/{session}/panorama-explore.jpg"
        explore_depth = f"/frames-scroll/{session}/panorama-depth.png"
    except Exception:  # noqa: BLE001
        explore_color = explore_depth = ""
    win_w = min(render_w, int(round(render_h * 16 / 9)))
    win_w -= win_w % 2
    travel = max(0, render_w - win_w)
    fps = 24
    # Frame count from a ~130 px/s pan in render space, clamped to 240..720 frames
    # (10s..30s at 24 fps).
    n_frames = int(min(720, max(240, round(travel / 130 * fps))))
    # Pre-render the title + caption overlay ONCE and paste it per frame.
    # Fail-soft -> the film renders text-free.
    _ov = None
    try:
        _tf, _bf = load_caption_fonts(Path(__file__).resolve().parent / "assets" / "fonts", render_h)
        _ov = make_caption_overlay(win_w, render_h, seer["title"], seer["caption"], _tf, _bf)
    except Exception:  # noqa: BLE001
        _ov = None
    frames_dir = out_dir / "frames"
    frames_dir.mkdir(parents=True, exist_ok=True)

    def _purge_frames() -> None:
        """Delete every rendered frame file, ignoring files that cannot be removed."""
        for leftover in frames_dir.glob("frame-*.webp"):
            try:
                leftover.unlink()
            except OSError:
                pass

    # A previous run that was hard-killed never reached the `finally` below, so its frames
    # may still be on disk. The encoder is handed the whole directory, so start from a clean
    # one - otherwise a shorter re-render could be followed by stale frames.
    # NOTE(review): assumes encode_frames_ffmpeg reads every frame-*.webp present - confirm.
    _purge_frames()
    try:
        for k in range(n_frames):
            # SOFT time budget vs the hard kill: stop early (after a minimum of 96 frames)
            # and encode what was rendered -> a shorter but valid film instead of nothing.
            if k >= 96 and time.monotonic() - _t0 > 150.0:
                break
            u = k / (n_frames - 1) if n_frames > 1 else 0.0
            x0 = int(round(u * travel))  # constant-velocity drift
            frame = img_full[0, :, :, x0 : x0 + win_w].clamp(0, 1).mul(255).byte().cpu().numpy().transpose(1, 2, 0)
            frame_img = Image.fromarray(frame)
            if _ov is not None:
                frame_img.paste(_ov, (0, 0), _ov)  # burn the title + caption overlay
            frame_img.save(frames_dir / f"frame-{k:04d}.webp", "WEBP", quality=90)
            if k % 48 == 0:
                yield {"stage": "rendering", "frame": k, "total": n_frames}
        yield {"stage": "encoding"}
        encode_frames_ffmpeg(frames_dir, out_dir / "world.mp4", fps=fps, height=1080)
    finally:
        _purge_frames()
    yield {
        "stage": "ready",
        "video": f"/frames-scroll/{session}/world.mp4",
        "title": seer["title"],
        "caption": seer["caption"],
        "panorama": pano_url,
        "explore_color": explore_color,
        "explore_depth": explore_depth,
        "field_notes": seer.get("field_notes") or [],
        "focal_points": seer.get("focal_points") or [],
    }
def postprocess_api(session: str = "", lang: str = "en") -> dict:
    """Stream the Director's Cut post-process events for one session.

    The session id must be 6-32 lowercase hex characters; otherwise a single
    ``postprocess_error`` / ``bad_session`` event is yielded (without a
    ``"session"`` key). Events from ``_postprocess_world`` get a ``"session"``
    key unless they already carry one.

    An exception is converted into one final event whose stage depends on the
    lower-cased message:

    * ``render_timeout``    - contains "task aborted", "timeout" or "duration"
    * ``quota_exceeded``    - contains "quota" or "exceeded your"
    * ``postprocess_error`` - anything else

    NOTE: despite the ``-> dict`` annotation this function is a generator that
    yields dicts.
    """
    session_id = (session or "").strip()
    if re.fullmatch(r"[a-f0-9]{6,32}", session_id) is None:
        yield {"stage": "postprocess_error", "error": "bad_session"}
        return
    try:
        for update in _postprocess_world(session_id, lang or "en"):
            update.setdefault("session", session_id)
            yield update
    except Exception as err:  # noqa: BLE001 - quota guard; CANNOT affect the live painter (separate endpoint)
        lowered = str(err).lower()
        # A ZeroGPU duration overrun raises "GPU task aborted": that means this one
        # render was too long, not that the account quota is exhausted, so it is
        # checked first and reported separately from the real quota messages.
        if any(hint in lowered for hint in ("task aborted", "timeout", "duration")):
            failure_stage = "render_timeout"
        elif any(hint in lowered for hint in ("quota", "exceeded your")):
            failure_stage = "quota_exceeded"
        else:
            failure_stage = "postprocess_error"
        yield {"stage": failure_stage, "error": str(err)[:200], "session": session_id}
def _ask_world_gpu(session: str, question: str, lang: str = "en") -> dict[str, Any]:
    """Answer a free-form question about a session's finished panorama.

    Reads ``panorama-explore.jpg`` from the session directory, falling back to
    ``panorama.jpg``; the image is shrunk to fit 1536x1536 and handed to the
    world seer together with the question. Independent of live image creation:
    it only reads post-process artifacts.

    Args:
        session: session directory name under ``_SCROLL_ROOT``.
        question: the user's question, forwarded to ``ask_world`` as given.
        lang: language code forwarded to ``ask_world``.

    Returns:
        ``{"answer": text}`` with the scrubbed answer; ``{"answer": ""}`` when
        neither panorama file exists; ``{"answer": "", "error": message}``
        (message capped at 160 characters) when loading or answering fails.
    """
    from PIL import Image

    world_dir = _SCROLL_ROOT / session
    # Prefer the render-sized explore image; fall back to the full stitched panorama.
    candidates = (world_dir / "panorama-explore.jpg", world_dir / "panorama.jpg")
    picture_path = next((path for path in candidates if path.is_file()), None)
    if picture_path is None:
        return {"answer": ""}
    try:
        picture = Image.open(picture_path).convert("RGB")
        picture.thumbnail((1536, 1536))  # bound the VLM token count
        from lightloom.paint.prompt import scrub_unsafe
        from lightloom.vision.world_seer import ask_world, load_world_seer

        processor, vision_model = load_world_seer()
        reply = ask_world(processor, vision_model, picture, question, lang=lang)
        cleaned = scrub_unsafe(reply or "")
        return {"answer": cleaned[0]}
    except Exception as err:  # noqa: BLE001 - the keepsake/world is untouched if the Q&A fails
        return {"answer": "", "error": str(err)[:160]}
def ask_api(session: str = "", question: str = "", lang: str = "en") -> dict:
    """"Ask Your World": a short Q&A over the finished world of one session.

    Validates the inputs, then yields exactly one dict:

    * ``{"answer": "", "error": "bad_session"}`` when the session id is not
      6-32 lowercase hex characters,
    * ``{"answer": "", "error": "bad_question"}`` when the stripped question is
      empty or longer than 400 characters,
    * otherwise the result of ``_ask_world_gpu``,
    * or, if that call raises, ``{"answer": "", "error": message, "stage": stage}``
      with the message capped at 200 characters.

    ``stage`` is ``"quota_exceeded"`` only for real quota messages (the
    lower-cased text contains "quota" or "exceeded your") and ``"error"`` for
    everything else. This is the same quota test ``postprocess_api`` uses.

    NOTE: despite the ``-> dict`` annotation this function is a generator that
    yields a single dict.
    """
    sess = (session or "").strip()
    if not re.fullmatch(r"[a-f0-9]{6,32}", sess):
        yield {"answer": "", "error": "bad_session"}
        return
    q = (question or "").strip()
    if not q or len(q) > 400:
        yield {"answer": "", "error": "bad_question"}
        return
    try:
        yield _ask_world_gpu(sess, q, lang or "en")
    except Exception as exc:  # noqa: BLE001 - quota guard; cannot affect the live painter
        message = str(exc).lower()
        # Matching on a bare "gpu" / "exceeded" labelled every GPU-related failure
        # ("GPU task aborted", CUDA errors, ...) as account-quota exhaustion. Reserve
        # quota_exceeded for the actual quota messages.
        is_quota = "quota" in message or "exceeded your" in message
        stage = "quota_exceeded" if is_quota else "error"
        yield {"answer": "", "error": str(exc)[:200], "stage": stage}
def _introspect_klein_on_gpu() -> dict[str, Any]:
    """Report the klein pipeline's call signature and LoRA/embeds capabilities.

    Loads the fast pipeline and records its class name, the parameter names of
    ``__call__`` and ``encode_prompt``, which optional features it exposes, its
    component names and its text-encoder class. It then runs one sample
    ``encode_prompt`` call and records the type (and shape, when present) of
    each returned item.

    Returns:
        A dict with ``ok=True`` plus the collected fields. A failure of the
        sample encode alone is recorded under ``encode_test_err``; any other
        failure yields ``ok=False`` with ``error`` and a truncated ``trace``.
        This diagnostic reports errors, it does not raise them.
    """
    import inspect

    from lightloom.paint.scroll import load_fast_pipeline

    def describe(item: Any) -> str:
        """Return the item's type name, followed by its shape tuple when it has one."""
        label = type(item).__name__
        if hasattr(item, "shape"):
            label += str(tuple(item.shape))
        return label

    info: dict[str, Any] = {}
    try:
        pipe = load_fast_pipeline()
        call_params = list(inspect.signature(pipe.__call__).parameters)
        # Build the whole capability report first, then merge it in one step, so a
        # failure while probing leaves `info` without a partial report.
        capabilities = {
            "ok": True,
            "pipe_class": type(pipe).__name__,
            "call_params": call_params,
            "accepts_prompt_embeds": "prompt_embeds" in call_params,
            "accepts_mask_image": "mask_image" in call_params,
            "has_encode_prompt": hasattr(pipe, "encode_prompt"),
            "has_load_lora_weights": hasattr(pipe, "load_lora_weights"),
            "has_fuse_lora": hasattr(pipe, "fuse_lora"),
            "components": list(getattr(pipe, "components", {}).keys()),
            "text_encoder": type(getattr(pipe, "text_encoder", None)).__name__,
        }
        info.update(capabilities)
        # Probe encode_prompt's exact signature and return shapes.
        info["encode_sig"] = list(inspect.signature(pipe.encode_prompt).parameters)
        try:
            device = getattr(pipe, "_execution_device", "cuda")
            encoded = pipe.encode_prompt(prompt="an ancient glowing forest", device=device, num_images_per_prompt=1)
            if not isinstance(encoded, (tuple, list)):
                encoded = (encoded,)
            info["encode_returns"] = [describe(item) for item in encoded]
        except Exception as encode_err:  # noqa: BLE001
            info["encode_test_err"] = f"{type(encode_err).__name__}: {str(encode_err)[:240]}"
    except Exception as err:  # noqa: BLE001 - diagnostic must report, not crash.
        import traceback

        info.update(ok=False, error=f"{type(err).__name__}: {err!r}", trace=traceback.format_exc()[-800:])
    return info
def introspect_klein_endpoint() -> dict[str, Any]:
    """Run the klein introspection diagnostic when internal endpoints are enabled.

    Enabled only when the ``LIGHTLOOM_ENABLE_INTERNAL_BENCH`` environment
    variable is exactly ``"1"``; otherwise a disabled-error dict is returned
    and the pipeline is not touched.
    """
    enabled = os.getenv("LIGHTLOOM_ENABLE_INTERNAL_BENCH", "0") == "1"
    if enabled:
        return {"ok": True, "data": _introspect_klein_on_gpu()}
    return {"ok": False, "error": "internal endpoint disabled"}
def director_probe_endpoint(backend: str = "transformers", threads: int = 8) -> dict[str, Any]:
    """Run the single-beat Director probe when internal benchmarks are enabled.

    Args:
        backend: Director backend name, passed through to the probe.
        threads: thread count, passed through to the probe.

    Returns:
        ``{"ok": True, "data": <probe result>}`` when the
        ``LIGHTLOOM_ENABLE_INTERNAL_BENCH`` environment variable is exactly
        ``"1"``; otherwise a disabled-error dict.
    """
    flag = os.getenv("LIGHTLOOM_ENABLE_INTERNAL_BENCH", "0")
    if flag == "1":
        probe_result = _director_probe_on_gpu(backend, threads)
        return {"ok": True, "data": probe_result}
    return {"ok": False, "error": "internal benchmark endpoint disabled"}
def _beat_e2e_on_gpu(beat_text: str) -> dict[str, Any]:
    """Run one beat through Director -> plan -> klein paint -> depth, timing each stage.

    Validates the whole chain end to end for a single beat of text (no ASR).
    The painted image is saved to ``/tmp/lightloom/beat0.webp``.

    Args:
        beat_text: the beat handed to the Director.

    Returns:
        A dict of stage timings in seconds (rounded to 2 decimals), the
        Director's attempts and decision, the planned prompt, the image size
        and depth-map shape, and ``ok=True``. On any failure the fields
        collected so far are kept and ``ok=False``, ``error`` and a truncated
        ``trace`` are added; this diagnostic reports errors, it does not raise.
    """
    import time as _time

    from lightloom.core.pipeline import plan_shot
    from lightloom.depth.depth import estimate_depth, load_depth_pipeline
    from lightloom.director.director import generate_shot_transformers, load_director_transformers
    from lightloom.director.state import initial_state
    from lightloom.paint.klein import load_klein_pipeline, paint

    def seconds_since(started: float) -> float:
        """Return the elapsed wall time since ``started``, rounded to 2 decimals."""
        return round(_time.perf_counter() - started, 2)

    report: dict[str, Any] = {}
    try:
        # Director: load, then generate the shot for this beat.
        started = _time.perf_counter()
        tokenizer, model = load_director_transformers()
        report["director_load_s"] = seconds_since(started)

        scene_state = initial_state()
        shot, meta = generate_shot_transformers(tokenizer, model, beat_text, scene_state.model_dump())
        director_fields = {
            "director_s": round(meta["elapsed_s"], 2),
            "attempts": meta.get("attempts"),
            "decision": shot.decision,
        }
        report.update(director_fields)

        plan = plan_shot(scene_state, shot)
        report["prompt"] = plan.prompt

        # Painter: load, then paint the planned shot.
        started = _time.perf_counter()
        klein = load_klein_pipeline()
        report["klein_load_s"] = seconds_since(started)

        started = _time.perf_counter()
        image = paint(klein, plan.prompt, plan.references, width=plan.width, height=plan.height, seed=plan.seed)
        report["paint_s"] = seconds_since(started)
        report["image_size"] = list(image.size)

        # Depth: the timing covers both the pipeline load and the estimate.
        started = _time.perf_counter()
        depth_pipeline = load_depth_pipeline()
        depth_map = estimate_depth(depth_pipeline, image)
        report["depth_s"] = seconds_since(started)
        report["depth_shape"] = list(depth_map.shape)

        target_dir = Path("/tmp/lightloom")
        target_dir.mkdir(parents=True, exist_ok=True)
        image.save(target_dir / "beat0.webp", "WEBP", quality=90)
        report["ok"] = True
    except Exception as err:  # noqa: BLE001 - diagnostic must report, not crash.
        import traceback

        report.update(ok=False, error=f"{type(err).__name__}: {err!r}", trace=traceback.format_exc()[-1200:])
    return report
def beat_e2e_endpoint(text: str = "Far away, a market woke under orange awnings and wet stone.") -> dict[str, Any]:
    """Run the single-beat end-to-end diagnostic when internal benchmarks are enabled.

    Args:
        text: the beat to run through the pipeline.

    Returns:
        ``{"ok": True, "data": <diagnostic result>}`` when the
        ``LIGHTLOOM_ENABLE_INTERNAL_BENCH`` environment variable is exactly
        ``"1"``; otherwise a disabled-error dict.
    """
    bench_enabled = os.getenv("LIGHTLOOM_ENABLE_INTERNAL_BENCH", "0") == "1"
    if not bench_enabled:
        return {"ok": False, "error": "internal benchmark endpoint disabled"}
    outcome = _beat_e2e_on_gpu(text)
    return {"ok": True, "data": outcome}
# Sample four-paragraph recital text; paragraphs are separated by one blank line.
# The third paragraph is the same sentence used as the default single-beat text.
_GOLDEN_RECITAL = "\n\n".join(
    (
        "An old lighthouse keeper counted the waves at the edge of the world.",
        "Every night he climbed the spiral stairs, counting each step like a prayer.",
        "Far away, a market woke under orange awnings and wet stone.",
        "Years later, the same road lay buried under snow and silence.",
    )
)
def _summarize_recital(
    n_beats: int, frames: list[dict[str, Any]], gen_times: list[int], fogged: int
) -> dict[str, Any]:
    """Fold per-beat results into the Film summary dict returned to the caller."""
    import statistics

    cuts = sum(1 for frame in frames if frame.get("decision") == "cut")
    rendered = len(gen_times)
    return {
        "ok": True,
        "n_beats": n_beats,
        "rendered": rendered,
        "fogged": fogged,
        "cuts": cuts,
        "continuity": rendered - cuts,
        # True median: for an even number of rendered beats this averages the two
        # middle timings instead of reporting the upper one.
        "median_gen_ms": int(statistics.median(gen_times)) if gen_times else 0,
        "frames": frames,
    }


def _run_recital_on_gpu(text: str) -> dict[str, Any]:
    """Render a recital beat by beat and return a JSON-friendly Film summary.

    Models are loaded once, then every beat goes Director -> plan -> paint ->
    depth. The scene state and the previous frame path are threaded from one
    beat to the next. Frames and depth maps are written under
    ``/tmp/lightloom/recital``.

    Args:
        text: The recital; it is split into at most 8 beats by ``split_recital``.

    Returns:
        On success, the summary built by ``_summarize_recital`` (``ok`` is True).
        A beat that raises is recorded as a "fogged" frame and the loop goes on.
        Any failure outside the per-beat loop (including a failed import or
        model load) yields ``{"ok": False, "error": ..., "trace": ...}``.

    NOTE(review): comments elsewhere in this file imply this runs under a
    ZeroGPU decorator; no decorator is visible on this definition - confirm.
    """
    import time as _time
    import traceback

    try:
        # Heavy and project imports live inside the try so that a missing
        # dependency is reported as JSON rather than raised: this is a
        # diagnostic and must report, not crash.
        import numpy as np
        from PIL import Image

        from lightloom.core.pipeline import plan_shot
        from lightloom.core.recital import split_recital
        from lightloom.depth.depth import estimate_depth, load_depth_pipeline
        from lightloom.director.director import generate_shot_transformers, load_director_transformers
        from lightloom.director.state import initial_state
        from lightloom.paint.klein import load_klein_pipeline, paint

        beat_texts = split_recital(text, max_beats=8)
        tokenizer, model = load_director_transformers()
        klein = load_klein_pipeline()
        depth_pipeline = load_depth_pipeline()
        out_dir = Path("/tmp/lightloom/recital")
        out_dir.mkdir(parents=True, exist_ok=True)

        state = initial_state()
        prev_frame: str | None = None
        frames: list[dict[str, Any]] = []
        gen_times: list[int] = []
        fogged = 0
        for i, beat_text in enumerate(beat_texts):
            t0 = _time.perf_counter()
            try:
                shot, _meta = generate_shot_transformers(
                    tokenizer, model, beat_text, state.model_dump(), beat_id=i + 1
                )
                plan = plan_shot(state, shot, prev_frame=prev_frame)
                image = paint(
                    klein, plan.prompt, plan.references, width=plan.width, height=plan.height, seed=plan.seed
                )
                image_path = out_dir / f"beat{i}.webp"
                image.save(image_path, "WEBP", quality=90)
                depth_map = estimate_depth(depth_pipeline, image)
                # NOTE(review): assumes depth_map values lie in [0, 1]; values
                # outside that range would wrap in the uint8 cast - confirm.
                Image.fromarray((depth_map * 255).astype(np.uint8)).save(out_dir / f"beat{i}_depth.png")
                gen_ms = int((_time.perf_counter() - t0) * 1000)
                gen_times.append(gen_ms)
                frames.append(
                    {
                        "beat_id": i + 1,
                        "text": beat_text[:60],
                        "decision": shot.decision,
                        "shot_scale": shot.shot_scale,
                        "camera_move": shot.camera_move,
                        "transition": shot.transition,
                        "refs_used": len(plan.references),
                        "seed": plan.seed,
                        "gen_ms": gen_ms,
                        "image": str(image_path),
                    }
                )
                # Only a fully rendered beat advances the scene state and the
                # previous-frame reference.
                state = plan.new_state
                prev_frame = str(image_path)
            except Exception as beat_exc:  # noqa: BLE001 - one fogged beat must not end the film.
                fogged += 1
                frames.append(
                    {
                        "beat_id": i + 1,
                        "text": beat_text[:60],
                        "fogged": type(beat_exc).__name__,
                        # Keep a bounded detail so a fogged beat can be diagnosed.
                        "error": repr(beat_exc)[:200],
                    }
                )
        return _summarize_recital(len(beat_texts), frames, gen_times, fogged)
    except Exception as exc:  # noqa: BLE001 - diagnostic must report, not crash.
        return {"ok": False, "error": f"{type(exc).__name__}: {exc!r}", "trace": traceback.format_exc()[-1200:]}
def recital_bench_endpoint(text: str = _GOLDEN_RECITAL) -> dict[str, Any]:
    """Run the full recital benchmark behind the internal-bench gate.

    Args:
        text: The recital to render; defaults to the golden recital.

    Returns:
        ``{"ok": True, "data": <film summary>}`` when the environment variable
        ``LIGHTLOOM_ENABLE_INTERNAL_BENCH`` is exactly ``"1"``; otherwise
        ``{"ok": False, "error": ...}`` without touching the pipeline. The outer
        ``ok`` only reflects the gate; the run's own outcome is under ``data``.
    """
    bench_enabled = os.getenv("LIGHTLOOM_ENABLE_INTERNAL_BENCH", "0") == "1"
    if bench_enabled:
        return {"ok": True, "data": _run_recital_on_gpu(text)}
    return {"ok": False, "error": "internal benchmark endpoint disabled"}
# --- Real streaming product endpoint (the contract the frontend speaks to) ---
# Root directory for per-session output: the streaming pipeline writes each
# session's frames and depth maps into a sub-directory named after the session
# id, and the frame-serving endpoint reads files back from the same tree.
SESSIONS_DIR: Path = Path("/tmp/lightloom/sessions")
def _recital_stream(text: str, lang: str, session: str):
    """Yield one stage event (a dict) per pipeline step while rendering a recital.

    Event order: ``warming`` -> (models load) -> ``ready`` -> for every beat
    ``directing``, ``directed``, ``painting``, ``painted``, ``depth`` (or a
    single ``fogged`` event if that beat raises) -> ``done``. Frames and depth
    maps are written under ``SESSIONS_DIR / session`` and referenced in events
    as ``/frames/<session>/...`` URLs.

    Args:
        text: The recital; split into at most 8 beats by ``split_recital``.
        lang: Accepted for the endpoint contract; not read in this function.
        session: Session id used as the output sub-directory and URL segment.

    Exceptions raised outside the per-beat ``try`` (imports, directory
    creation, splitting, model loading) propagate to the caller.

    NOTE(review): the original comment says ``@spaces.GPU`` supports generator
    functions, implying this is GPU-decorated; no decorator is visible here.
    """
    import numpy as np
    from PIL import Image

    from lightloom.core.pipeline import plan_shot
    from lightloom.core.recital import split_recital
    from lightloom.depth.depth import estimate_depth, load_depth_pipeline
    from lightloom.director.director import generate_shot_transformers, load_director_transformers
    from lightloom.director.state import initial_state
    from lightloom.paint.klein import load_klein_pipeline, paint

    session_dir = SESSIONS_DIR / session
    session_dir.mkdir(parents=True, exist_ok=True)

    beat_texts = split_recital(text, max_beats=8)
    total_beats = len(beat_texts)
    yield {"stage": "warming", "n_beats": total_beats}

    # Models are loaded once, between the "warming" and "ready" events.
    tokenizer, model = load_director_transformers()
    painter = load_klein_pipeline()
    depth_estimator = load_depth_pipeline()
    yield {"stage": "ready", "n_beats": total_beats}

    scene = initial_state()
    last_frame: str | None = None
    for idx, beat_text in enumerate(beat_texts):
        frame_file = session_dir / f"beat{idx}.webp"
        depth_file = session_dir / f"beat{idx}_depth.png"
        try:
            yield {"stage": "directing", "beat": idx, "text": beat_text}
            shot, _meta = generate_shot_transformers(
                tokenizer, model, beat_text, scene.model_dump(), beat_id=idx + 1
            )
            plan = plan_shot(scene, shot, prev_frame=last_frame)
            directed_event = {
                "stage": "directed",
                "beat": idx,
                "shot": shot.model_dump(),
                # The plan's transition, not the raw shot's (the original comment
                # notes it is refined: continuity hard_cut -> crossfade).
                "transition": plan.transition,
                "scene_id": plan.new_state.scene_id,
                "refs": len(plan.references),
            }
            yield directed_event

            yield {"stage": "painting", "beat": idx}
            image = paint(
                painter, plan.prompt, plan.references, width=plan.width, height=plan.height, seed=plan.seed
            )
            image.save(frame_file, "WEBP", quality=90)
            yield {"stage": "painted", "beat": idx, "image": f"/frames/{session}/beat{idx}.webp"}

            depth_map = estimate_depth(depth_estimator, image)
            # NOTE(review): assumes depth_map values lie in [0, 1] before the
            # uint8 cast - confirm against estimate_depth.
            depth_u8 = (depth_map * 255).astype(np.uint8)
            Image.fromarray(depth_u8).save(depth_file)
            yield {"stage": "depth", "beat": idx, "depth": f"/frames/{session}/beat{idx}_depth.png"}

            # Only a beat that got this far advances the scene and the
            # previous-frame reference used by the next plan.
            scene = plan.new_state
            last_frame = str(frame_file)
        except Exception as exc:  # noqa: BLE001 - one fogged beat must not end the film.
            yield {"stage": "fogged", "beat": idx, "error": type(exc).__name__}

    yield {"stage": "done", "session": session}
async def recital_endpoint(request: Request) -> StreamingResponse:
    """Stream a recital render as Server-Sent Events from a JSON POST body.

    The body is expected to be a JSON object with optional ``text`` (string)
    and ``lang`` (defaults to ``"en"``). Missing, empty, or whitespace-only
    ``text`` falls back to the golden recital.

    Returns:
        A ``text/event-stream`` response in which every stage event from
        ``_recital_stream`` is one ``data: <json>`` frame. A pipeline exception
        becomes a final ``{"stage": "error"}`` frame. A malformed body is
        answered with HTTP 400 carrying a single ``error`` frame, instead of an
        unhandled exception.
    """

    def sse(event: dict[str, Any]) -> str:
        return f"data: {json.dumps(event)}\n\n"

    def reject(reason: str) -> StreamingResponse:
        # Same in-band error shape the stream uses, so the client needs one parser.
        frame = sse({"stage": "error", "error": reason})
        return StreamingResponse(iter([frame]), status_code=400, media_type="text/event-stream")

    try:
        body = await request.json()
    except ValueError:  # json.JSONDecodeError / UnicodeDecodeError: empty or malformed body.
        return reject("request body must be valid JSON")
    if not isinstance(body, dict):
        return reject("request body must be a JSON object")

    raw_text = body.get("text")
    if raw_text is not None and not isinstance(raw_text, str):
        return reject("text must be a string")
    # Strip first, then fall back: whitespace-only text must not reach the
    # pipeline as an empty recital.
    text = (raw_text or "").strip() or _GOLDEN_RECITAL
    lang = body.get("lang", "en")
    session = uuid.uuid4().hex[:8]

    def event_stream():
        try:
            for event in _recital_stream(text, lang, session):
                yield sse(event)
        except Exception as exc:  # noqa: BLE001 - report in-band; the stream has already started.
            yield sse({"stage": "error", "error": str(exc)[:200]})

    return StreamingResponse(event_stream(), media_type="text/event-stream")
def frame_endpoint(session: str, name: str):
    """Serve one file from a session's output directory, or a JSON 404.

    Args:
        session: Session id; only its final path component is used.
        name: File name inside the session directory; only its final path
            component is used.

    Returns:
        ``FileResponse`` for an existing regular file located inside
        ``SESSIONS_DIR``; otherwise ``JSONResponse({"error": "not found"})``
        with status 404.
    """

    def not_found():
        return JSONResponse({"error": "not found"}, status_code=404)

    safe_session = Path(session).name
    safe_name = Path(name).name
    # Taking ``.name`` is not sufficient on its own: Path("..").name is "..",
    # and "" / "." yield an empty name, so such inputs would address the parent
    # of SESSIONS_DIR or SESSIONS_DIR itself. Reject them explicitly.
    unsafe_components = ("", ".", "..")
    if safe_session in unsafe_components or safe_name in unsafe_components:
        return not_found()

    path = SESSIONS_DIR / safe_session / safe_name
    # Defence in depth: after resolving symlinks the target must still live
    # underneath the sessions root.
    if SESSIONS_DIR.resolve() not in path.resolve().parents:
        return not_found()
    if not path.is_file():
        return not_found()
    return FileResponse(path)
| # The frontend MUST reach the GPU pipeline through @gradio/client (Client.submit), | |
| # not raw fetch, or ZeroGPU cannot attribute quota. @app.api streams each yield of | |
| # the @spaces.GPU generator as an SSE event; pin all GPU endpoints to one | |
| # concurrency pool so they never run two GPU jobs at once. | |
def recital_api(text: str = _GOLDEN_RECITAL, lang: str = "en") -> dict:
    """Stream recital stage events, one dict per yield, for the Gradio API route.

    This is a generator even though it is annotated ``-> dict``: per the
    original note, the return hint is how Gradio's api() derives one streamed
    JSON output, and each yield becomes one SSE event. Do not "fix" the hint.

    Args:
        text: The recital. ``None``, empty, or whitespace-only text falls back
            to the golden recital.
        lang: Passed through to ``_recital_stream``.

    Yields:
        Every event from ``_recital_stream``. If it raises, one final event:
        ``{"stage": "quota_exceeded", ...}`` when the message mentions
        gpu/quota/exceeded, else ``{"stage": "error", ...}``.
    """
    session = uuid.uuid4().hex[:8]
    try:
        # Strip first, then fall back, so whitespace-only input does not reach
        # the pipeline as an empty recital.
        recital = (text or "").strip() or _GOLDEN_RECITAL
        for event in _recital_stream(recital, lang, session):
            yield event
    except Exception as exc:  # noqa: BLE001 - quota guard: degrade to Showcase, never a stack trace on stage.
        message = str(exc).lower()
        if any(token in message for token in ("gpu", "quota", "zerogpu", "exceeded")):
            yield {"stage": "quota_exceeded", "detail": str(exc)[:200]}
        else:
            yield {"stage": "error", "error": str(exc)[:200]}
# --- ZeroGPU cold-start: load the runtime models at MODULE SCOPE so the ZeroGPU
# backend disk-packs them at Space startup and streams them into VRAM per fork
# (fast) instead of paying a full from_pretrained inside the first @spaces.GPU
# call (~30-40s). Best-effort and env-gated: it runs only for the "space"
# profile with LIGHTLOOM_PRELOAD left at "1"; any failure is printed and the
# app falls back to lazy in-fork loading, so the Space always boots. ---
# NOTE(review): the paint loader preloaded here is load_fast_pipeline from
# lightloom.paint.scroll, while the recital pipeline in this file loads
# load_klein_pipeline - confirm both resolve to the same cached model.
if os.getenv("LIGHTLOOM_PRELOAD", "1") == "1" and LIGHTLOOM_PROFILE == "space":
    try:
        from lightloom.audio_in.asr import load_asr as _pl_asr
        from lightloom.depth.depth import load_depth_pipeline as _pl_depth
        from lightloom.director.director import load_director_transformers as _pl_dir
        from lightloom.paint.scroll import load_fast_pipeline as _pl_paint

        # Sequential on purpose: the first loader that raises aborts the rest.
        _pl_paint()
        _pl_dir()
        _pl_depth()
        _pl_asr()
        print("[lightloom] module-scope model preload complete")
    except Exception as _exc:  # noqa: BLE001 - lazy in-fork loading still works.
        print(f"[lightloom] module-scope preload skipped: {type(_exc).__name__}: {_exc}")
if __name__ == "__main__":
    # Script entry point: launch the server. The session output tree and the
    # repo's frontend/assets directories are passed as allowed paths.
    app.launch(
        allowed_paths=[str(served) for served in (SESSIONS_DIR, ROOT / "frontend", ROOT / "assets")],
        pwa=True,
        strict_cors=True,
    )