"""Lightloom Gradio Server entrypoint.
Parameter contribution: 0B. Runtime endpoints expose app health and private
benchmark hooks; model loading stays inside explicit benchmark/pipeline calls.
"""
from __future__ import annotations
import json
import os
import re
from pathlib import Path
import sys
from typing import Any
import uuid
import gradio as gr
import spaces
from fastapi import Request
from fastapi.responses import FileResponse, HTMLResponse, JSONResponse, StreamingResponse
ROOT = Path(__file__).resolve().parent
sys.path.insert(0, str(ROOT / "src"))
from lightloom.compliance.params_ledger import entries, total_runtime_params
from lightloom.core.config import CONFIG, LIGHTLOOM_PROFILE
app = gr.Server(title="Lightloom", version="0.1.0")
@spaces.GPU(duration=120)
def _run_g1_on_gpu() -> dict[str, Any]:
from benchmarks.gate_g1 import run
return run(dry_run=False, reps=int(os.getenv("LIGHTLOOM_G1_REPS", "5")), allow_local=False)
@spaces.GPU(duration=int(os.getenv("LIGHTLOOM_G23_DURATION", "180")))
def _run_g2_g3_on_gpu() -> tuple[dict[str, Any], dict[str, Any]]:
# Gate G3 hypothesis: the Director's ~1 tok/s came from the throttled
# persistent CPU. Running the llama.cpp Director here gives a real CPU
# allocation. If this still exceeds the 2.5 s/beat budget, escalate to GPU.
from benchmarks.gate_g2_g3 import run
return run(dry_run=False)
_PROBE_BEAT = "Far away, a market woke under orange awnings and wet stone."
@spaces.GPU(duration=110)
def _director_probe_on_gpu(backend: str, n_threads: int) -> dict[str, Any]:
# Bounded single-beat diagnostic. transformers = the GPU Director we now ship
# (G3: llama.cpp CPU was ~1 tok/s). Soft errors are caught so we return JSON.
import time as _time
info: dict[str, Any] = {"backend": backend, "cpu_count_in_gpu": os.cpu_count()}
t0 = _time.perf_counter()
try:
if backend == "transformers":
from lightloom.director.director import generate_shot_transformers, load_director_transformers
tokenizer, model = load_director_transformers()
info["load_s"] = round(_time.perf_counter() - t0, 2)
shot, meta = generate_shot_transformers(tokenizer, model, _PROBE_BEAT)
else:
from lightloom.director.director import generate_shot, load_director
info["n_threads"] = n_threads
llm = load_director(n_threads=n_threads)
info["load_s"] = round(_time.perf_counter() - t0, 2)
shot, meta = generate_shot(llm, _PROBE_BEAT)
info.update(
ok=True,
elapsed_s=round(meta["elapsed_s"], 2),
completion_tokens=meta["completion_tokens"],
tok_s=round(meta["tok_s"], 2) if meta["tok_s"] else None,
decision=shot.decision,
coerced=meta["coerced"],
attempts=meta.get("attempts"),
)
except Exception as exc: # noqa: BLE001 - diagnostic must report, not crash.
import traceback
info.update(ok=False, error=f"{type(exc).__name__}: {exc!r}", trace=traceback.format_exc()[-900:])
return info
FRONTEND_DIR = ROOT / "frontend"
ASSETS_DIR = ROOT / "assets"
_PLACEHOLDER_HTML = """
Lightloom
Lightloom
Private build. The projector is being assembled.
health
"""
def _serve_under(base: Path, rel: str):
"""Serve a file under ``base`` with path-traversal protection."""
target = (base / rel).resolve()
try:
target.relative_to(base.resolve())
except ValueError:
return JSONResponse({"error": "forbidden"}, status_code=403)
if not target.is_file():
return JSONResponse({"error": "not found"}, status_code=404)
# NO-CACHE: revalidate every load so a deploy's fresh JS/CSS reaches the browser without a hard
# refresh. Without this the browser served a STALE controller.js (new UI like "Ask Your World" and
# the fast-transcript fix silently missing until Ctrl+Shift+R). FileResponse still sends ETag/
# Last-Modified, so unchanged files return a cheap 304.
return FileResponse(target, headers={"Cache-Control": "no-cache, max-age=0, must-revalidate"})
@app.get("/", response_class=HTMLResponse)
def index() -> Any:
html = FRONTEND_DIR / "index.html"
if html.is_file():
return FileResponse(
html, media_type="text/html",
headers={"Cache-Control": "no-cache, max-age=0, must-revalidate"},
)
return HTMLResponse(_PLACEHOLDER_HTML)
@app.get("/frontend/{rel:path}")
def serve_frontend(rel: str):
return _serve_under(FRONTEND_DIR, rel)
@app.get("/assets/{rel:path}")
def serve_assets(rel: str):
return _serve_under(ASSETS_DIR, rel)
@app.get("/health")
def health() -> dict[str, Any]:
return {
"app": "lightloom",
"profile": LIGHTLOOM_PROFILE,
"config": {
"width": CONFIG.width,
"height": CONFIG.height,
"flux_dtype": CONFIG.flux_dtype,
"flux_aot": CONFIG.flux_aot,
"showcase_only": CONFIG.showcase_only,
},
"params_total": total_runtime_params(),
"params_limit": 32_000_000_000,
"ledger": [entry.__dict__ for entry in entries()],
"privacy_mode": os.getenv("LIGHTLOOM_PRIVACY_MODE", "1") == "1",
"cpu_count": os.cpu_count(),
"director_backend": CONFIG.director_backend,
"build": "color-styles-1",
}
@app.post("/internal/bench/g1")
def run_g1_endpoint() -> dict[str, Any]:
if os.getenv("LIGHTLOOM_ENABLE_INTERNAL_BENCH", "0") != "1":
return {"ok": False, "error": "internal benchmark endpoint disabled"}
data = _run_g1_on_gpu()
return {
"ok": True,
"data": data,
"decision": data["decision"],
"result_path": str(Path("benchmarks/results/g1.json")),
}
@app.post("/internal/bench/g2-g3")
def run_g2_g3_endpoint() -> dict[str, Any]:
if os.getenv("LIGHTLOOM_ENABLE_INTERNAL_BENCH", "0") != "1":
return {"ok": False, "error": "internal benchmark endpoint disabled"}
g2, g3 = _run_g2_g3_on_gpu()
return {"ok": True, "g2": g2, "g3": g3}
@app.post("/internal/warm")
def warm_endpoint(which: str = "director,asr") -> dict[str, Any]:
# Download weights in the (CPU) persistent process — no GPU spent — so the GPU
# probe/beat loads from cache instead of downloading on GPU time.
if os.getenv("LIGHTLOOM_ENABLE_INTERNAL_BENCH", "0") != "1":
return {"ok": False, "error": "internal benchmark endpoint disabled"}
from huggingface_hub import snapshot_download
from lightloom.core.config import MODEL_REFS
cached: dict[str, str] = {}
for key in which.split(","):
key = key.strip()
ref = MODEL_REFS.get(key)
if ref is None:
continue
cached[key] = str(snapshot_download(ref.repo_id, revision=ref.revision))
return {"ok": True, "cached": cached}
@spaces.GPU(duration=120)
def _asr_probe_on_gpu() -> dict[str, Any]:
import time as _time
from huggingface_hub import hf_hub_download
from lightloom.audio_in.asr import load_asr, read_wav, transcribe
from lightloom.core.config import MODEL_REFS
info: dict[str, Any] = {}
try:
ref = MODEL_REFS["asr"]
wav = hf_hub_download(ref.repo_id, "demo/voxpopuli_test_en_demo.wav", revision=ref.revision)
audio, sr = read_wav(wav)
info["wav_sr"] = sr
t = _time.perf_counter()
processor, model = load_asr()
info["load_s"] = round(_time.perf_counter() - t, 2)
t = _time.perf_counter()
text = transcribe(processor, model, audio, sampling_rate=sr, language="en")
info["transcribe_s"] = round(_time.perf_counter() - t, 2)
info.update(ok=True, text=text[:300])
except Exception as exc: # noqa: BLE001 - diagnostic must report, not crash.
import traceback
info.update(ok=False, error=f"{type(exc).__name__}: {exc!r}", trace=traceback.format_exc()[-1100:])
return info
@app.post("/internal/bench/asr-probe")
def asr_probe_endpoint() -> dict[str, Any]:
if os.getenv("LIGHTLOOM_ENABLE_INTERNAL_BENCH", "0") != "1":
return {"ok": False, "error": "internal benchmark endpoint disabled"}
return {"ok": True, "data": _asr_probe_on_gpu()}
# --- VOICE input (the "speak" in "speak, and watch your story become a world"):
# Cohere Transcribe turns the narrator's mic audio into text, which then drives the
# same panorama pipeline. The browser captures a 16 kHz mono PCM WAV and sends it
# base64-encoded; read_wav decodes it with the stdlib (no ffmpeg needed). ---
@spaces.GPU(duration=60)
def _transcribe_on_gpu(wav_path: str, lang: str) -> dict[str, Any]:
from lightloom.audio_in.asr import load_asr, read_wav, transcribe
audio, sr = read_wav(wav_path)
processor, model = load_asr()
text = transcribe(processor, model, audio, sampling_rate=sr, language=lang)
return {"text": text}
@app.api(name="transcribe", concurrency_id="gpu", concurrency_limit=1, stream_every=0.05)
def transcribe_api(audio_b64: str = "", lang: str = "en") -> dict:
import base64
import tempfile
yield {"stage": "transcribing"}
path = None
try:
payload = audio_b64.split(",")[-1] if audio_b64 else ""
if len(payload) > 8_000_000: # ~6 MB decoded; reject BEFORE decode (OOM / /tmp-fill DoS guard)
yield {"stage": "error", "error": "audio_too_large"}
return
raw = base64.b64decode(payload) if payload else b""
if len(raw) < 64: # nothing meaningful captured
yield {"stage": "error", "error": "empty_audio"}
return
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as handle:
handle.write(raw)
path = handle.name
result = _transcribe_on_gpu(path, lang)
yield {"stage": "transcript", "text": (result.get("text") or "").strip()}
except Exception as exc: # noqa: BLE001 - quota/decoding guard.
message = str(exc).lower()
stage = "quota_exceeded" if any(t in message for t in ("gpu", "quota", "zerogpu", "exceeded")) else "error"
yield {"stage": stage, "error": str(exc)[:200]}
finally:
if path: # never leave the temp WAV behind (would grow /tmp unbounded)
try:
os.unlink(path)
except OSError:
pass
# --- Panorama (Voice-Scroll-in-3D) verification: klein-base + the 360 ERP outpaint
# LoRA produce a 2:1 equirectangular panorama via Flux2 inpaint. Code-only (diffusers
# already ships Flux2KleinInpaintPipeline + load_lora_weights). ---
_PANO_BASE = "black-forest-labs/FLUX.2-klein-base-4B"
_PANO_LORA = "nomadoor/flux-2-klein-4B-360-erp-outpaint-lora"
_PANO_TRIGGER = (
"Fill the green spaces according to the image. Outpaint as a seamless 360 "
"equirectangular panorama (2:1). Keep the horizon level. Match left and right edges. "
)
@app.post("/internal/warm-panorama")
def warm_panorama_endpoint() -> dict[str, Any]:
if os.getenv("LIGHTLOOM_ENABLE_INTERNAL_BENCH", "0") != "1":
return {"ok": False, "error": "internal benchmark endpoint disabled"}
from huggingface_hub import snapshot_download
base = snapshot_download(_PANO_BASE)
lora = snapshot_download(_PANO_LORA)
return {"ok": True, "base": str(base)[-40:], "lora": str(lora)[-40:]}
@spaces.GPU(duration=120)
def _panorama_probe_on_gpu(prompt: str) -> dict[str, Any]:
import time as _time
import torch
from diffusers import Flux2KleinInpaintPipeline
from PIL import Image
info: dict[str, Any] = {}
try:
t = _time.perf_counter()
pipe = Flux2KleinInpaintPipeline.from_pretrained(_PANO_BASE, torch_dtype=torch.bfloat16).to("cuda")
pipe.load_lora_weights(_PANO_LORA)
info["load_s"] = round(_time.perf_counter() - t, 2)
# Seed canvas: green right 60% (to outpaint) + a neutral horizon band on the
# left 40% so the LoRA has structure to extend (keep-horizon-level training).
w, h = 1024, 512
canvas = Image.new("RGB", (w, h), (0, 255, 0))
seed = Image.new("RGB", (int(w * 0.4), h), (120, 150, 190)) # sky-ish
for y in range(h // 2, h): # ground band
for x in range(seed.width):
seed.putpixel((x, y), (90, 80, 70))
canvas.paste(seed, (0, 0))
mask = Image.new("L", (w, h), 0)
mask.paste(Image.new("L", (w - seed.width, h), 255), (seed.width, 0)) # inpaint the green
t = _time.perf_counter()
out = pipe(
prompt=_PANO_TRIGGER + prompt,
image=canvas,
mask_image=mask,
height=h,
width=w,
num_inference_steps=20,
guidance_scale=3.0,
strength=1.0,
)
pano = out.images[0]
info["gen_s"] = round(_time.perf_counter() - t, 2)
out_dir = Path("/tmp/lightloom/pano")
out_dir.mkdir(parents=True, exist_ok=True)
pano.save(out_dir / "pano.webp", "WEBP", quality=92)
info.update(ok=True, size=list(pano.size), image="/frames-pano/pano.webp")
except Exception as exc: # noqa: BLE001 - diagnostic must report, not crash.
import traceback
info.update(ok=False, error=f"{type(exc).__name__}: {exc!r}", trace=traceback.format_exc()[-1400:])
return info
@app.post("/internal/bench/panorama-probe")
def panorama_probe_endpoint(prompt: str = "an old lighthouse on a cliff at the edge of the world, stormy dusk") -> dict[str, Any]:
if os.getenv("LIGHTLOOM_ENABLE_INTERNAL_BENCH", "0") != "1":
return {"ok": False, "error": "internal benchmark endpoint disabled"}
return {"ok": True, "data": _panorama_probe_on_gpu(prompt)}
@app.get("/frames-pano/{name}")
def pano_frame_endpoint(name: str):
path = Path("/tmp/lightloom/pano") / Path(name).name
if not path.is_file():
return JSONResponse({"error": "not found"}, status_code=404)
return FileResponse(path)
# --- THE PANORAMA WORLD (Voice-Scroll-in-3D): narration -> one immersive
# equirectangular world (klein-base + 360 LoRA) + its depth -> the browser renders
# it as a sphere the camera lives inside. Streams via @app.api/@gradio/client. ---
@spaces.GPU(duration=110)
def _panorama_world(text: str, session: str):
import numpy as np
from PIL import Image
from lightloom.depth.depth import estimate_depth, load_depth_pipeline
from lightloom.director.director import generate_shot_transformers, load_director_transformers
from lightloom.director.state import initial_state
from lightloom.paint.panorama import generate_panorama, load_panorama_pipeline
out_dir = Path("/tmp/lightloom/pano") / session
out_dir.mkdir(parents=True, exist_ok=True)
yield {"stage": "warming"}
# Director turns the narration into one vivid establishing scene for the world.
tokenizer, model = load_director_transformers()
first = (text or "").strip().split("\n\n")[0][:300] or "a world at the edge of the world"
shot, _m = generate_shot_transformers(tokenizer, model, first, initial_state().model_dump())
scene = f"{shot.image_prompt_en}, {shot.lighting}"
yield {"stage": "directed", "shot": shot.model_dump(), "scene": scene[:160]}
yield {"stage": "painting"}
pano_pipe = load_panorama_pipeline()
pano = generate_panorama(pano_pipe, scene)
pano.save(out_dir / "world.webp", "WEBP", quality=92)
yield {"stage": "painted", "panorama": f"/frames-pano/{session}/world.webp"}
depth_pipeline = load_depth_pipeline()
dmap = estimate_depth(depth_pipeline, pano)
Image.fromarray((dmap * 255).astype(np.uint8)).save(out_dir / "world_depth.png")
yield {"stage": "depth", "depth": f"/frames-pano/{session}/world_depth.png"}
yield {"stage": "done", "session": session}
@app.api(name="panorama", concurrency_id="gpu", concurrency_limit=1, stream_every=0.05)
def panorama_api(text: str = "", lang: str = "en") -> dict:
# Legacy/showcase generator — loads klein-BASE + a 3rd-party 360 LoRA that are NOT in the
# runtime ledger; gate it like the /internal probes so no LIVE judge-facing route can load an
# un-declared model. The live experience is /scroll_live (the four-model ledger path); this
# stays only for offline showcase/bench builds.
if os.getenv("LIGHTLOOM_ENABLE_INTERNAL_BENCH", "0") != "1":
yield {"stage": "error", "error": "panorama endpoint disabled (showcase/bench only)"}
return
# default resolves at call time (when _GOLDEN_RECITAL, defined later, exists)
session = uuid.uuid4().hex[:8]
try:
for event in _panorama_world((text or _GOLDEN_RECITAL).strip(), session):
yield event
except Exception as exc: # noqa: BLE001 - quota guard.
message = str(exc).lower()
stage = "quota_exceeded" if any(t in message for t in ("gpu", "quota", "zerogpu", "exceeded")) else "error"
yield {"stage": stage, "error": str(exc)[:200]}
@app.get("/frames-pano/{session}/{name}")
def pano_world_frame(session: str, name: str):
path = Path("/tmp/lightloom/pano") / Path(session).name / Path(name).name
if not path.is_file():
return JSONResponse({"error": "not found"}, status_code=404)
return FileResponse(path)
# --- THE INFINITE 3D CORRIDOR (the live experience). Each phrase the narrator
# speaks becomes a vivid, forward-facing scene: the Director (MiniCPM) turns the
# phrase into a shot, a content-driven style is chosen per scene, the fast 4-step
# klein painter renders it, Depth-Anything gives it relief, and each scene streams
# into the browser's corridor where the camera flies forward through the growing
# world. "speak, and your world unrolls ahead of you." ---
_CORRIDOR_QUALITY = (
"cinematic anamorphic still, shot on 35mm film, shallow depth of field, volumetric god rays, "
"golden-hour rim light, rich saturated color, ultra detailed, sharp focus, subtle film grain, "
"ARRI Alexa, breathtaking establishing shot, 8k"
)
def _corridor_style(shot: Any) -> str:
"""Pick a per-scene art style from the Director's content/lighting so each scene
has its own colorful look (the Director decides via what it described)."""
text = f"{getattr(shot, 'lighting', '')} {getattr(shot, 'image_prompt_en', '')}".lower()
def has(*ks: str) -> bool:
return any(k in text for k in ks)
if has("storm", "dusk", "night", "dark", "rain", "thunder"):
return "dramatic moody cinematic concept art, deep shadows, glowing accents"
if has("morning", "dawn", "sunrise", "golden", "warm", "sunset"):
return "warm golden-hour illustration, glowing vibrant color"
if has("snow", "ice", "winter", "cold", "frost"):
return "crisp luminous winter concept art, cool blue-and-white palette"
if has("market", "city", "street", "village", "town", "harbor"):
return "vibrant colorful illustration, bustling saturated detail"
if has("forest", "garden", "meadow", "jungle", "flower", "tree", "green"):
return "lush verdant fantasy concept art, vivid greens, dappled light"
if has("sea", "ocean", "water", "wave", "river", "lake"):
return "cinematic seascape concept art, luminous water, dramatic sky"
if has("desert", "sand", "dune"):
return "surreal desert dreamscape, warm amber tones, vast scale"
return "vivid cinematic concept art, rich saturated color, dramatic light"
def _split_phrases(text: str, cap: int = 6) -> list[str]:
import re
text = (text or "").strip()
if not text:
return []
parts = re.split(r"(?<=[.!?…])\s+|\n+", text)
phrases: list[str] = []
for raw in parts:
p = raw.strip()
if not p:
continue
if phrases and len(p) < 18: # glue stray fragments to the prior phrase
phrases[-1] = (phrases[-1] + " " + p)[:240]
else:
phrases.append(p[:240])
return phrases[:cap]
@spaces.GPU(duration=170)
def _corridor_world(text: str, session: str):
import numpy as np
from PIL import Image
from lightloom.depth.depth import estimate_depth, load_depth_pipeline
from lightloom.director.director import generate_shot_transformers, load_director_transformers
from lightloom.director.state import initial_state
from lightloom.paint.klein import load_klein_pipeline, paint
out_dir = Path("/tmp/lightloom/world") / session
out_dir.mkdir(parents=True, exist_ok=True)
phrases = _split_phrases(text) or ["a luminous world at the edge of the world"]
yield {"stage": "warming", "scenes": len(phrases)}
tokenizer, model = load_director_transformers()
klein = load_klein_pipeline()
depth_pipe = load_depth_pipeline()
# NEUTRAL state per scene (not the lighthouse-keeper seed in initial_state) so each
# scene is driven by what the narrator actually said — no "lighthouses everywhere".
from lightloom.director.state import SceneState
neutral = SceneState(
characters=[], setting="", lighting="", mood="",
palette=("#202428", "#7d8794", "#e6ddc8"),
).model_dump()
for i, phrase in enumerate(phrases):
yield {"stage": "directing", "index": i, "text": phrase}
try:
shot, _m = generate_shot_transformers(tokenizer, model, phrase, dict(neutral))
style = _corridor_style(shot)
subject = (shot.image_prompt_en or phrase or "a vivid scene").strip()
lighting = (shot.lighting or "cinematic light").strip()
scene_prompt = f"{subject}, {lighting}. {style}, {_CORRIDOR_QUALITY}"
yield {"stage": "painting", "index": i, "scene": scene_prompt[:150]}
img = paint(klein, scene_prompt, width=1024, height=576, steps=6, seed=1900 + i * 7)
img.save(out_dir / f"scene-{i}.webp", "WEBP", quality=90)
yield {
"stage": "scene",
"index": i,
"image": f"/frames-world/{session}/scene-{i}.webp",
"caption": phrase[:160],
"shot": shot.model_dump(),
}
dmap = estimate_depth(depth_pipe, img)
# robust percentile normalization so one outlier pixel can't blow out the
# range -> cleaner, better-anchored parallax (research Tier-1 depth recipe).
lo, hi = float(np.percentile(dmap, 2)), float(np.percentile(dmap, 98))
if hi > lo:
dmap = np.clip((dmap - lo) / (hi - lo), 0.0, 1.0)
Image.fromarray((dmap * 255).astype(np.uint8)).save(out_dir / f"scene-{i}-depth.png")
yield {"stage": "depth", "index": i, "depth": f"/frames-world/{session}/scene-{i}-depth.png"}
except Exception as exc: # noqa: BLE001 - one bad scene must not end the journey.
yield {"stage": "scene_error", "index": i, "error": str(exc)[:160]}
yield {"stage": "done", "session": session, "count": len(phrases)}
@app.api(name="world", concurrency_id="gpu", concurrency_limit=1, stream_every=0.05)
def world_api(text: str = "", lang: str = "en") -> dict:
# Legacy/showcase generator — loads klein-BASE + a 3rd-party 360 LoRA that are NOT in the
# runtime ledger; gate it like the /internal probes so no LIVE judge-facing route can load an
# un-declared model. The live experience is /scroll_live (the four-model ledger path); this
# stays only for offline showcase/bench builds.
if os.getenv("LIGHTLOOM_ENABLE_INTERNAL_BENCH", "0") != "1":
yield {"stage": "error", "error": "world endpoint disabled (showcase/bench only)"}
return
# default resolves at call time (when _GOLDEN_RECITAL, defined later, exists)
session = uuid.uuid4().hex[:8]
try:
for event in _corridor_world((text or _GOLDEN_RECITAL).strip(), session):
yield event
except Exception as exc: # noqa: BLE001 - quota guard.
message = str(exc).lower()
stage = "quota_exceeded" if any(t in message for t in ("gpu", "quota", "zerogpu", "exceeded")) else "error"
yield {"stage": stage, "error": str(exc)[:200]}
@app.get("/frames-world/{session}/{name}")
def world_frame(session: str, name: str):
# sessions are uuid4 hex[:8]; reject anything else (defense-in-depth vs traversal)
if not re.fullmatch(r"[a-f0-9]{6,32}", session) or not re.fullmatch(r"[\w.-]{1,40}", name):
return JSONResponse({"error": "bad request"}, status_code=400)
path = Path("/tmp/lightloom/world") / session / Path(name).name
if not path.is_file():
return JSONResponse({"error": "not found"}, status_code=404)
return FileResponse(path)
# --- SCROLL prototype: validate that klein outpaints SEAMLESS horizontal
# continuations (the make-or-break for the Voice-Scroll engine). Generates a few
# evolving sections, stitches them, serves the strip so we can eyeball the seams. ---
_SCROLL_STYLES: dict[str, Any] = {
"desert": (
", golden hour warm light, cinematic painterly matte painting, soft haze, vivid color, consistent lighting",
["rolling golden sand dunes", "a lone caravan of camels crossing the dunes",
"a green oasis with tall palms", "a sandstone desert city on the horizon",
"the great carved gate of the sandstone city"],
),
"bosch": (
", in the style of Hieronymus Bosch The Garden of Earthly Delights, fantastical surreal "
"medieval oil painting, densely detailed, dreamlike, whimsical translucent creatures, "
"soft pastel light, consistent palette",
["a lush fantastical garden with a delicate crystal fountain",
"strange translucent flowers and tiny pale figures wandering",
"enormous ripe fruits and exotic jewel-coloured birds",
"a pink bulbous domed pavilion rising from a still lake",
"playful surreal creatures bathing by glassy pools"],
),
"dali": (
", in the style of Salvador Dali surrealism, dreamlike impossible landscape, melting soft "
"forms, very long shadows, luminous pale sky, oil painting, hyper detailed, consistent light",
["a vast empty plain with a single melting clock draped over a bare branch",
"impossible smooth floating rocks casting long shadows",
"a distorted elephant on impossibly long stilt legs far away",
"a perfect mirror lake reflecting a burning amber sky",
"surreal stone staircases rising and dissolving into the clouds"],
),
"fantasy": (
", vibrant fantasy concept art, luminous magical world, glowing flora, rich saturated color, "
"volumetric god rays, ethereal, highly detailed, consistent enchanted light",
["a glowing forest of giant bioluminescent mushrooms",
"a crystal river winding past floating glowing lily pads",
"an ancient tree-city with warm lantern windows",
"a waterfall of light pouring into a glowing canyon",
"a sky temple among drifting luminous clouds"],
),
}
@spaces.GPU(duration=160)
def _scroll_test_on_gpu(style: str = "desert") -> dict[str, Any]:
import time as _time
from PIL import Image
from lightloom.paint.scroll import outpaint_right
info: dict[str, Any] = {}
try:
import numpy as np
from lightloom.paint.panorama import load_panorama_pipeline
pipe = load_panorama_pipeline()
STYLE, subjects = _SCROLL_STYLES.get(style, _SCROLL_STYLES["desert"])
info["style"] = style
h, overlap, seg = 512, 384, 384
sections = []
ctx = None
t0 = _time.perf_counter()
for i, p in enumerate(subjects):
sec = outpaint_right(pipe, ctx, p + STYLE, seg_w=seg, height=h, overlap=overlap, steps=8, seed=100 + i)
sections.append(sec)
ctx = sec # the freshly generated section's right edge is the next context
info["gen_s"] = round(_time.perf_counter() - t0, 1)
# stitch left->right with a small FEATHER blend at each join (hides any micro-seam)
blend = 40
total_w = sum(s.width for s in sections) - blend * (len(sections) - 1)
strip = Image.new("RGB", (total_w, h))
x = 0
for i, s in enumerate(sections):
arr = np.asarray(s.convert("RGB")).astype(np.float32)
if i == 0:
strip.paste(s, (x, 0)); x += s.width
else:
# overlap the previous by `blend` px and cross-fade s's left edge in
px = x - blend
base = np.asarray(strip.crop((px, 0, px + s.width, h)).convert("RGB")).astype(np.float32)
a = np.ones((h, s.width, 1), np.float32)
a[:, :blend, 0] = np.linspace(0, 1, blend)[None, :]
merged = (arr * a + base * (1 - a)).astype(np.uint8)
strip.paste(Image.fromarray(merged), (px, 0))
x = px + s.width
out_dir = Path("/tmp/lightloom/scroll")
out_dir.mkdir(parents=True, exist_ok=True)
name = f"test-strip-{style}.webp"
strip.save(out_dir / name, "WEBP", quality=92)
info.update(ok=True, size=list(strip.size), strip=f"/frames-scroll/{name}", sections=len(sections))
except Exception as exc: # noqa: BLE001
import traceback
info.update(ok=False, error=f"{type(exc).__name__}: {exc!r}", trace=traceback.format_exc()[-1400:])
return info
@app.post("/internal/bench/scroll-test")
def scroll_test_endpoint(style: str = "desert") -> dict[str, Any]:
if os.getenv("LIGHTLOOM_ENABLE_INTERNAL_BENCH", "0") != "1":
return {"ok": False, "error": "internal benchmark endpoint disabled"}
return {"ok": True, "data": _scroll_test_on_gpu(style)}
@spaces.GPU(duration=140)
def _scroll_fast_test_on_gpu(style: str = "fantasy", n: int = 8, seg: int = 256, steps: int = 4) -> dict[str, Any]:
"""Benchmark the DISTILLED klein-4B fast continuous loop: how many small strips/sec
and how do the 4-step seams look. This is the make-or-break for real-time flow."""
import time as _time
from PIL import Image
from lightloom.paint.scroll import load_fast_pipeline, outpaint_right
info: dict[str, Any] = {}
try:
import numpy as np
STYLE, subjects = _SCROLL_STYLES.get(style, _SCROLL_STYLES["fantasy"])
t = _time.perf_counter()
pipe = load_fast_pipeline()
info["load_s"] = round(_time.perf_counter() - t, 2)
h, overlap = 512, 256
sections, times = [], []
ctx = None
for i in range(n):
subj = subjects[i % len(subjects)]
t = _time.perf_counter()
sec = outpaint_right(pipe, ctx, subj + STYLE, seg_w=seg, height=h, overlap=overlap, steps=steps, seed=300 + i)
times.append(round(_time.perf_counter() - t, 3))
sections.append(sec)
ctx = sec
info["per_strip_s"] = times
info["mean_strip_s"] = round(sum(times[1:]) / max(1, len(times) - 1), 3) # skip first (warm)
info["strips_per_sec"] = round(1.0 / info["mean_strip_s"], 2) if info["mean_strip_s"] else None
# stitch with feather
blend = 32
total_w = sum(s.width for s in sections) - blend * (len(sections) - 1)
strip = Image.new("RGB", (total_w, h))
x = 0
for i, s in enumerate(sections):
arr = np.asarray(s.convert("RGB")).astype(np.float32)
if i == 0:
strip.paste(s, (x, 0)); x += s.width
else:
px = x - blend
base = np.asarray(strip.crop((px, 0, px + s.width, h)).convert("RGB")).astype(np.float32)
a = np.ones((h, s.width, 1), np.float32); a[:, :blend, 0] = np.linspace(0, 1, blend)[None, :]
strip.paste(Image.fromarray((arr * a + base * (1 - a)).astype(np.uint8)), (px, 0)); x = px + s.width
out_dir = Path("/tmp/lightloom/scroll"); out_dir.mkdir(parents=True, exist_ok=True)
name = f"fast-strip-{style}.webp"
strip.save(out_dir / name, "WEBP", quality=92)
info.update(ok=True, size=list(strip.size), strip=f"/frames-scroll/{name}")
except Exception as exc: # noqa: BLE001
import traceback
info.update(ok=False, error=f"{type(exc).__name__}: {exc!r}", trace=traceback.format_exc()[-1500:])
return info
@app.post("/internal/bench/scroll-fast")
def scroll_fast_endpoint(style: str = "fantasy", n: int = 8, seg: int = 256, steps: int = 4) -> dict[str, Any]:
if os.getenv("LIGHTLOOM_ENABLE_INTERNAL_BENCH", "0") != "1":
return {"ok": False, "error": "internal benchmark endpoint disabled"}
return {"ok": True, "data": _scroll_fast_test_on_gpu(style, n, seg, steps)}
@app.get("/frames-scroll/{name}")
def scroll_frame(name: str):
path = Path("/tmp/lightloom/scroll") / Path(name).name
if not path.is_file():
return JSONResponse({"error": "not found"}, status_code=404)
return FileResponse(path)
# --- THE LIVING SCROLL (the experience): one continuous panorama that EXTENDS
# seamlessly as you narrate. The Director fixes a coherent session style from the
# story's tone; each phrase outpaints the next section that CONTINUES the previous
# one; Depth-Anything gives each section relief for the 2.5D scroll. Streamed. ---
# A rich palette of DISTINCT art styles — the session's visual "lens", chosen by the narration's
# mood/content so different stories look genuinely different (anime, noir, Ghibli, ink, impressionist,
# photoreal, sketch, synthwave, ...) instead of one uniform painterly look. Fixed per session so the
# continuous panorama stays coherent; variety comes ACROSS stories.
_ART_STYLES: dict[str, str] = {
"anime": ", anime cel-shaded illustration, crisp clean linework, vibrant saturated colour, luminous soft lighting",
"noir": ", film noir, high-contrast black and white, deep chiaroscuro shadows, 1940s mood, fine film grain",
"ghibli": ", Studio Ghibli hand-painted style, soft watercolour rendering, gentle diffuse light, painterly warmth",
"ink": ", Japanese sumi-e ink wash, minimalist, expressive black brushstrokes on rice paper, generous negative space",
"impressionist": ", impressionist oil painting, broken visible brushstrokes, shimmering Monet light, soft colour",
"photoreal": ", photorealistic cinematic, dramatic natural light, ultra-detailed, shallow depth of field, 8k",
"sketch": ", loose graphite pencil sketch, cross-hatching, sketchbook study, expressive monochrome lines",
"storybook": ", warm storybook illustration, painterly, soft rounded forms, children's-book charm",
"synthwave": ", 1980s synthwave colour grade, saturated magenta-and-cyan neon palette, soft retro glow, high contrast",
"oil": ", classical oil painting, old-master chiaroscuro, rich varnished colour, baroque drama",
"watercolour": ", delicate watercolour wash, soft bleeding pigments, luminous paper-white, airy",
"lowpoly": ", stylised low-poly 3D render, flat faceted geometry, soft gradient lighting, clean",
"pixelart": ", 16-bit pixel art, crisp blocky pixels, limited retro palette, dithering, game sprite",
"retro": ", vintage retro illustration, aged muted tones, halftone grain, mid-century travel-poster feel",
"comic": ", bold comic-book pop-art, thick black ink outlines, halftone Ben-Day dots, dynamic panel",
"ukiyoe": ", ukiyo-e Japanese woodblock print, flat bold colour, strong outlines, Edo-period, Hokusai",
"gothic": ", dark gothic painterly style, ornate and dramatic rendering, deep chiaroscuro lighting, moody low-key palette",
"artdeco": ", art deco, geometric gold-and-black elegance, 1920s glamour, symmetrical, sleek",
"claymation": ", claymation stop-motion, soft sculpted clay textures, handcrafted, tactile, charming",
"charcoal": ", expressive charcoal drawing, smudged deep blacks, dramatic monochrome, rough paper grain",
"modern": ", modern flat-design illustration, bold simple shapes, clean vector look, minimal palette",
"scifi": ", sleek sci-fi concept-art rendering, crisp hard-surface detail, cool cinematic palette, polished finish",
}
# Function/stop words that carry NO paintable scene. A clause that is ONLY these (a VAD fragment the mic cut
# mid-sentence: "is", "that's", "and then") gives klein nothing concrete, so it flows the smooth carried
# sky/water into ABSTRACT iridescent colour-WAVES ("franjas de colores que aparecen de la nada") that then
# propagate across strips. _phrase_is_paintable lets the live loop CONTINUE the previous real scene instead.
_NONVISUAL_WORDS = frozenset(
"a an the and or but is are was were be been being am i you he she it we they me him her them us this "
"that these those of to in on at for with as by from up down out over off into onto about there here "
"what who whom when where how why which whose so then now just very too also still yet do does did done "
"will would shall should can could may might must have has had not no nor only thats its im ive id ill "
"youre youve hes shes theyre dont cant wont isnt arent wasnt "
# greetings / fillers / meta-talk that are NOT a visual scene (observed: 'hello', 'is working' -> abstract)
"hello hi hey hiya bye goodbye ok okay yeah yep yes yup nope um uh huh hmm oh ah eh well alright lets let "
"really kinda sorta gonna wanna gotta thing things stuff something anything nothing someone anyone everyone "
"nobody everybody working testing test please thanks thank cool nice wow".split()
)
def _phrase_is_paintable(phrase: str) -> bool:
"""True if the clause has >=1 CONTENT word (a paintable subject); False for a function-word-only VAD
fragment ("is", "that's", "and then") that would otherwise make klein paint abstract colour-waves."""
import re
words = [
w
for w in re.findall(r"[a-z']+", (phrase or "").lower())
if w.replace("'", "") not in _NONVISUAL_WORDS and len(w) >= 3
]
return len(words) >= 1
def _scroll_session_style(text: str) -> str:
"""Pick ONE distinct art style for the session from the narration's mood/content, so different
stories look genuinely different — not one uniform painterly look."""
t = (text or "").lower()
def has(*ks: str) -> bool:
return any(k in t for k in ks)
if has("noir", "detective", "crime", "shadow", "smoke", "mystery", "murder", "alley", "rain-slick"):
return _ART_STYLES["noir"]
if has("anime", "neon", "cyber", "robot", "future", "mecha", "tokyo", "hologram", "android"):
return _ART_STYLES["anime"]
if has("spirit", "fairy", "child", "gentle", "wonder", "meadow", "totoro", "ghibli", "cottage"):
return _ART_STYLES["ghibli"]
if has("ink", "myth", "legend", "dragon", "samurai", "zen", "calligraphy", "temple"):
return _ART_STYLES["ink"]
if has("memory", "nostalgia", "garden", "afternoon", "impression", "monet", "pond", "blossom"):
return _ART_STYLES["impressionist"]
if has("epic", "war", "storm", "realistic", "battle", "cinematic", "ocean", "cliff", "mountain"):
return _ART_STYLES["photoreal"]
if has("sketch", "idea", "draft", "blueprint", "study", "outline", "drawing"):
return _ART_STYLES["sketch"]
if has("synth", "retro", "80s", "arcade", "vapor", "grid", "laser"):
return _ART_STYLES["synthwave"]
if has("watercolor", "watercolour", "calm", "quiet", "still", "mist", "drizzle"):
return _ART_STYLES["watercolour"]
if has("oil", "portrait", "classic", "baroque", "renaissance", "king", "palace", "throne"):
return _ART_STYLES["oil"]
if has("magic", "glow", "enchant", "dream", "star", "crystal", "luminous"):
return _ART_STYLES["storybook"]
# default: VARY by the text so different stories get different looks (never always the same one)
keys = ["storybook", "impressionist", "photoreal", "ghibli", "ink", "oil", "watercolour", "anime"]
return _ART_STYLES[keys[sum(ord(c) for c in t[:24]) % len(keys)]]
def _director_session_style(tokenizer: Any, model: Any, phrase: str) -> str:
"""The DIRECTOR sets the session's art style at world-start. DEFAULT = a RANDOM style from the
full palette, so every new world looks different WITHOUT the user having to say a trigger word —
a first-timer just narrates and the visual lens surprises them. Set LIGHTLOOM_STYLE_FROM_NARRATION=1
to instead infer the style from the narration's mood (the MiniCPM classifier). Fixed for the
session so the continuous panorama stays coherent."""
import random
# LIVE-SAFE styles only: ink/sketch/charcoal/noir are monochrome/sparse and DEGENERATE in a continuous
# outpaint — their negative space compounds strip-to-strip into grey bands + VISIBLY CHOPPY seams
# (verified on the user's ink-mood poem). The Director may still pick from the narration's mood, but a
# degenerating pick is remapped to a DENSE style that preserves the mood, so transitions stay beautiful.
_DEGEN = ("ink", "sketch", "charcoal", "noir")
live_keys = [k for k in _ART_STYLES if k not in _DEGEN]
if os.getenv("LIGHTLOOM_STYLE_FROM_NARRATION", "0") == "1":
try:
from lightloom.director.director import pick_style_transformers
key = pick_style_transformers(tokenizer, model, phrase)
if key in live_keys:
return _ART_STYLES[key]
if key in ("ink", "sketch", "charcoal"):
return _ART_STYLES["oil"] # rich painterly substitute for monochrome moods
if key == "noir":
return _ART_STYLES["gothic"] # dense, dramatic substitute that keeps the dark mood
except Exception: # noqa: BLE001 - never let style-picking break narration
pass
s = _scroll_session_style(phrase)
degen_vals = {_ART_STYLES[k] for k in _DEGEN}
return _ART_STYLES["oil"] if s in degen_vals else s
# default: a RANDOM art style per world -> guaranteed variety + surprise, no trigger words needed.
# Use SystemRandom (os.urandom), NOT random.choice: ZeroGPU forks a fresh worker per call that
# INHERITS the parent process's seeded RNG state, so plain random.choice returns the SAME
# "random" style on every fork. SystemRandom reads fresh OS entropy each call -> genuinely
# different per world. (This was the bug: two worlds kept landing on the identical style.)
# ...but only from styles that hold up DENSELY + IN COLOUR across a continuous outpaint. The
# monochrome/sparse ones (ink, sketch, charcoal, noir) DEGENERATE to washed-out grey in the live
# scroll — their negative space compounds strip-to-strip into empty grey + dark bands. Now excluded
# from BOTH the random pool AND the narration-mood pick (live_keys above).
return _ART_STYLES[random.SystemRandom().choice(live_keys)]
@spaces.GPU(duration=170)
def _scroll_world(text: str, session: str):
import numpy as np
from PIL import Image
from lightloom.depth.depth import estimate_depth, load_depth_pipeline
from lightloom.director.director import generate_shot_transformers, load_director_transformers
from lightloom.director.state import SceneState
from lightloom.paint.scroll import load_fast_pipeline, outpaint_right
out_dir = Path("/tmp/lightloom/scroll") / session
out_dir.mkdir(parents=True, exist_ok=True)
phrases = _split_phrases(text, cap=8) or ["a luminous world unfolding"]
style = _scroll_session_style(text)
yield {"stage": "warming", "sections": len(phrases)}
tokenizer, model = load_director_transformers()
pipe = load_fast_pipeline() # DISTILLED klein-4B, 4-step — the fast continuous loop
depth_pipe = load_depth_pipeline()
neutral = SceneState(
characters=[], setting="", lighting="", mood="", palette=("#202428", "#7d8794", "#e6ddc8")
).model_dump()
# CONTINUOUS loop: many small fast strips per phrase => the world keeps flowing
# (generation ~1.4 strips/s of 256px >> the browser's scroll, so it never freezes).
ctx = None
prev_subject = ""
h, overlap, seg = 512, 256, 256 # overlap==seg: carry the whole previous strip as context
strips_per_phrase = 4
g = 0 # global strip index
for i, phrase in enumerate(phrases):
yield {"stage": "directing", "index": i, "text": phrase}
try:
shot, _m = generate_shot_transformers(tokenizer, model, phrase, dict(neutral))
subject = (shot.image_prompt_en or phrase or "a vivid scene").strip()
yield {"stage": "painting", "index": i, "scene": subject[:120]}
for k in range(strips_per_phrase):
# first strip of a new scene morphs from the previous one (smooth panorama)
is_change = k == 0 and prev_subject and prev_subject[:48].lower() != subject[:48].lower()
strip_prompt = _scroll_transition_prompt(prev_subject, subject) if is_change else subject
sec = outpaint_right(pipe, ctx, strip_prompt + style, seg_w=seg, height=h, overlap=overlap, steps=4, seed=200 + g * 3, morph_band=(96 if is_change else 0))
ctx = sec
sec.save(out_dir / f"section-{g}.webp", "WEBP", quality=92)
yield {
"stage": "section",
"index": g,
"image": f"/frames-scroll/{session}/section-{g}.webp",
"caption": phrase[:160], # every strip carries the caption (onFocus pins it; no flicker)
"w": sec.width,
}
dmap = estimate_depth(depth_pipe, sec)
lo, hi = float(np.percentile(dmap, 2)), float(np.percentile(dmap, 98))
dmap = (dmap - lo) / (hi - lo) if hi > lo else np.full_like(dmap, 0.5)
dmap = np.clip(dmap, 0.0, 1.0) # constant/low-variance depth must not wrap mod 256
Image.fromarray((dmap * 255).astype(np.uint8)).save(out_dir / f"section-{g}-depth.png")
yield {"stage": "depth", "index": g, "depth": f"/frames-scroll/{session}/section-{g}-depth.png"}
g += 1
prev_subject = subject # the next phrase morphs FROM this scene
except Exception as exc: # noqa: BLE001 - one bad section must not end the scroll.
yield {"stage": "section_error", "index": g, "error": str(exc)[:160]}
yield {"stage": "done", "session": session, "count": g}
@app.api(name="scroll", concurrency_id="gpu", concurrency_limit=1, stream_every=0.05)
def scroll_api(text: str = "", lang: str = "en") -> dict:
session = uuid.uuid4().hex[:8]
try:
for event in _scroll_world((text or _GOLDEN_RECITAL).strip(), session):
yield event
except Exception as exc: # noqa: BLE001 - quota guard.
message = str(exc).lower()
stage = "quota_exceeded" if any(t in message for t in ("gpu", "quota", "zerogpu", "exceeded")) else "error"
yield {"stage": stage, "error": str(exc)[:200]}
@app.get("/frames-scroll/{session}/{name}")
def scroll_session_frame(session: str, name: str):
if not re.fullmatch(r"[a-f0-9]{6,32}", session) or not re.fullmatch(r"[\w.-]{1,40}", name):
return JSONResponse({"error": "bad request"}, status_code=400)
path = Path("/tmp/lightloom/scroll") / session / Path(name).name
if not path.is_file():
return JSONResponse({"error": "not found"}, status_code=404)
return FileResponse(path)
# --- LIVE narration: the world builds AS YOU SPEAK. While you talk, the browser
# VAD-cuts each phrase and streams it as a short voice segment; ONE GPU call per
# segment transcribes it AND paints a few strips that CONTINUE the same panorama
# (the last strip is kept per session as the outpaint context), so sections keep
# flowing in with low delay — no "record, stop, then render". ---
_SCROLL_ROOT = Path("/tmp/lightloom/scroll")
_LIVE_MAX = 24 # keep at most this many session dirs on disk; sweep the oldest
# Whisper hallucinates these short phrases from near-silent audio; treat them as silence so the
# world never paints an unrelated scene from a cough/breath.
_ASR_HALLUCINATIONS = frozenset({
"", "you", "thank you", "thanks", "thanks for watching", "thank you for watching",
"thanks for watching this video", "please subscribe", "subscribe", "subscribe to my channel",
"see you next time", "see you in the next video", "i'll see you next time", "bye", "bye bye",
"okay", "ok", "good halloween", "i am here", "i'm here", "music", "applause", "you're welcome",
"the end", "hello everyone", "thanks for listening",
})
def _sweep_scroll_sessions(keep: int = _LIVE_MAX, protect: frozenset[str] = frozenset()) -> None:
"""Bound /tmp: keep only the most-recently-touched ``keep`` session dirs. ``protect`` names sessions
that must NEVER be swept (the live session being served — mkdir(exist_ok) does not freshen mtime, so
a busy session could otherwise sweep its OWN dir mid-narration under multi-visitor load).
Runs on disk (not an in-memory dict), so it works regardless of the GPU-worker lifecycle."""
import shutil
try:
dirs = [p for p in _SCROLL_ROOT.iterdir() if p.is_dir() and p.name not in protect]
except (OSError, FileNotFoundError):
return
if len(dirs) <= keep:
return
for p in sorted(dirs, key=lambda d: d.stat().st_mtime)[: len(dirs) - keep]:
shutil.rmtree(p, ignore_errors=True)
def _strip_looks_corrupt(img: Any) -> bool:
"""Detect the saturated red/blue 'neon ribbon' artifact: once a 4-step strip overflows
into near-pure-saturation colour, the ctx-carry propagates it forever (the user's
long-session bug). If a strip is dominated by extreme-saturation/bright pixels, the
caller drops the carry-over context and repaints fresh, breaking the chain. Legitimate
vivid scenes sit well below sat>0.92, so this rarely false-positives."""
import numpy as np
a = np.asarray(img.convert("RGB"), dtype=np.float32) / 255.0
mx = a.max(axis=2)
mn = a.min(axis=2)
sat = (mx - mn) / (mx + 1e-6)
neon = (sat > 0.92) & (mx > 0.5) # near-pure saturated + bright
if float(neon.mean()) > 0.20:
return True
# also catch a THIN saturated streak-LINE: a few % of total area (below 0.20) but ~100% of its
# own row or column. Without this the ctx-carry propagates a neon ribbon down the whole panorama.
return bool(neon.mean(axis=1).max() > 0.6 or neon.mean(axis=0).max() > 0.6)
STRUCTURE_FLOOR = 0.045 # below this edge-density the strip is dissolving into abstract colour
def _strip_structure_score(img: Any) -> float:
"""Edge/gradient density in [0,1]: high for representational scenes (crisp foliage,
horizon, objects), collapses toward ~0 for the smooth 'abstract neon-wave' drift that
_strip_looks_corrupt (saturation-only) never catches. CPU/numpy, ~1 ms on a strip."""
import numpy as np
a = np.asarray(img.convert("L"), dtype=np.float32) / 255.0
a = a[::2, ::2] # 2x downsample: faster + ignores single-pixel paint/depth noise
gx = np.abs(np.diff(a, axis=1))
gy = np.abs(np.diff(a, axis=0))
g = np.maximum(gx[:-1, :], gy[:, :-1])
return float((g > 0.06).mean()) # fraction of 'edge' pixels
def _strip_looks_abstract(img: Any) -> float:
"""How far below the structure floor the strip is (0.0 = fine .. 1.0 = fully abstract).
Distinct from _strip_looks_corrupt: that flags SATURATION, this flags loss of STRUCTURE —
the gradual drift into smooth colour fields that dominates long sessions."""
s = _strip_structure_score(img)
if s >= STRUCTURE_FLOOR:
return 0.0
return min(1.0, (STRUCTURE_FLOOR - s) / STRUCTURE_FLOOR)
def _live_next_index(out_dir: Path) -> int:
"""Next strip index = (max existing ``section-N.webp``) + 1. DISK-derived so the
index is correct (and filenames never collide) even if the GPU worker recycled and
lost all in-process state between phrase calls."""
mx = -1
for p in out_dir.glob("section-*.webp"):
m = re.fullmatch(r"section-(\d+)", p.stem)
if m:
mx = max(mx, int(m.group(1)))
return mx + 1
def _live_state(out_dir: Path) -> dict[str, Any]:
"""Per-session continuity state on disk (style + the previous phrase's subject, used
to MORPH smoothly between scenes). Survives a GPU-worker recycle."""
try:
s = out_dir / "state.json"
if s.is_file():
return json.loads(s.read_text("utf-8"))
except (OSError, ValueError):
pass
return {}
def _live_save_state(out_dir: Path, **updates: Any) -> None:
state = _live_state(out_dir)
state.update(updates)
try:
(out_dir / "state.json").write_text(json.dumps(state), "utf-8")
except OSError:
pass
def _depth_global_norm(raw: Any, state: dict[str, Any], out_dir: Path) -> Any:
"""Normalize a RAW depth/disparity map against ONE session-global [lo,hi] (an EMA of per-strip
2/98 percentiles, persisted across GPU-worker recycles), so EVERY strip shares the SAME depth
scale -> adjacent strips align (no seam) and 'near' is consistent (no per-strip flattening — the
structural cause of the 'flat 2.5D' + seam shear). Returns [0,1] with 1 = near (renderer convention)."""
import numpy as np
p2, p98 = float(np.percentile(raw, 2)), float(np.percentile(raw, 98))
lo, hi = state.get("depth_lo"), state.get("depth_hi")
if not (isinstance(lo, (int, float)) and isinstance(hi, (int, float)) and hi > lo):
lo, hi = p2, p98 # the first strip seeds the session-global scale
else:
a = 0.25 # ease the global range toward each new strip's percentiles
lo, hi = (1 - a) * lo + a * p2, (1 - a) * hi + a * p98
if hi - lo < 1e-6:
hi = lo + 1e-6
state["depth_lo"], state["depth_hi"] = lo, hi
_live_save_state(out_dir, depth_lo=lo, depth_hi=hi)
norm = np.clip((raw - lo) / (hi - lo), 0.0, 1.0)
# raw is DA-V2 DISPARITY (near=HIGH) -> norm already has 1=near/0=far (the renderer convention).
# The old `1.0 - norm` flipped it inside-out (near read as far) -> inverted parallax + aerial on
# the wrong regions + disocclusion streaks. Return norm as-is.
return norm # 1 = near, 0 = far
def _scroll_transition_prompt(prev_subject: str, subject: str) -> str:
"""The boundary strip between two scenes: paint the FIRST as it DISSOLVES into the
SECOND, so the panorama morphs instead of cutting hard."""
return f"a seamless gradual transition where {prev_subject} slowly dissolves and transforms into {subject}"
# Stopwords for the Director content-word guard (function words carry no subject -> ignore them).
_STOPWORDS = frozenset({
"the", "and", "was", "were", "with", "that", "this", "they", "them", "then", "there", "here",
"have", "has", "had", "from", "into", "over", "under", "your", "you", "his", "her", "she", "him",
"are", "for", "but", "not", "all", "out", "off", "one", "when", "what", "where", "which", "who",
"why", "how", "little", "bit", "sorry", "describe", "very", "just", "like", "about", "would",
"could", "should", "been", "being", "their", "some", "more", "than", "also", "okay",
})
def _shares_content_word(phrase: str, cand: str) -> bool:
"""True iff the Director's candidate scene genuinely relates to the phrase. Requires TWO shared
concrete (>=4-char, non-stopword) words for a normal phrase — ONE coincidental word ("blue whale"
vs the few-shot's "blue apron") let the few-shot subject parrot through. A short phrase (<=3
concrete words) needs just one. If the phrase has no concrete word, returns False (paint the
user's own words, never an invented/parroted scene)."""
pw = {w for w in re.findall(r"[a-z]{4,}", phrase.lower())} - _STOPWORDS
if not pw:
return False
cw = {w for w in re.findall(r"[a-z]{4,}", cand.lower())}
n = len(pw & cw)
return n >= 2 or (len(pw) <= 3 and n >= 1)
@spaces.GPU(duration=90)
def _scroll_live_phrase(session: str, text: str, audio_b64: str, lang: str):
"""Transcribe ONE spoken segment (or take ``text``) and paint a few strips that
CONTINUE this session's panorama. Streams transcript/section/depth events and
persists the last strip as the carry-over context, so the next segment flows on
seamlessly. Loading the models first also serves as the warm-on-tap path."""
import base64
import tempfile
import numpy as np
from PIL import Image
from lightloom.depth.depth import estimate_depth, load_depth_pipeline
from lightloom.director.director import generate_shot_transformers, load_director_transformers
from lightloom.director.state import SceneState
from lightloom.paint.scroll import load_fast_pipeline, outpaint_right
# warm (cached after first call): director + fast painter + depth
tokenizer, model = load_director_transformers()
pipe = load_fast_pipeline()
depth_pipe = load_depth_pipeline()
# COLD-START: the very first generation compiles CUDA kernels/cuBLAS heuristics (multi-second). The
# client fires a '__warm__' beat on mic-tap so those kernels compile DURING the intro at the REAL
# production shape -> the user's first spoken phrase paints at warm speed, not a cold stall.
# The frontend sends "__warm__" through the AUDIO slot (streamScrollLive's 2nd arg), so accept it on
# EITHER field — otherwise the warm fell through to the normal path, decoded "__warm__" as garbage
# audio, returned 'silent', and NEVER compiled the kernels (verified: client-warm -> 'silent', the
# first real phrase then paid the full cold-start), while still holding the GPU slot for ~8s.
if (text or "").strip() == "__warm__" or (audio_b64 or "").strip() == "__warm__":
try:
from lightloom.paint.scroll import outpaint_right as _ow
_warm = _ow(pipe, None, "a calm open landscape at dawn", seg_w=556, height=768, overlap=212, steps=4, seed=1)
estimate_depth(depth_pipe, _warm, normalize=False) # warm the depth forward too
except Exception: # noqa: BLE001 - warming is best-effort; never block the session.
pass
yield {"stage": "warmed"}
return
# 1) get the phrase text — transcribe the spoken segment, or use the typed text
phrase = (text or "").strip()
_asr_be = "text" # DEBUG: which ASR backend actually ran (verify Parakeet vs Whisper-fallback)
if not phrase and audio_b64:
path = None
try:
payload = audio_b64.split(",")[-1]
if len(payload) > 8_000_000: # ~6 MB decoded; reject BEFORE decode (OOM / DoS guard)
yield {"stage": "asr_error", "error": "audio_too_large"}
return
raw = base64.b64decode(payload) if payload else b""
if len(raw) >= 64:
from lightloom.audio_in.asr import load_asr, read_wav
from lightloom.audio_in.asr import transcribe as _asr
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as handle:
handle.write(raw)
path = handle.name
audio, sr = read_wav(path)
# Silence/room-noise gate: Whisper hallucinates short phrases from near-silent audio.
# Only transcribe if the segment carries real energy; else leave phrase="" -> 'silent'.
# Gate on the VOICED portion, NOT the whole-clip mean: a soft/short phrase plus the ~800ms
# VAD silence tail averages below a fixed floor and is wrongly dropped (verified cause of
# "only catches phrases"). Use the 90th-percentile of ~100ms window RMS (the loudest tenth
# = the real speech) and match the browser VAD floor (recorder.js VAD_FLOOR=0.006), so we
# only admit audio the client VAD already validated; _ASR_HALLUCINATIONS stays the net.
_a = np.asarray(audio, dtype=np.float32).ravel()
_win = max(1, int(sr * 0.1))
if _a.size >= _win:
_n = (_a.size // _win) * _win
_wr = np.sqrt((_a[:_n].reshape(-1, _win) ** 2).mean(axis=1))
_speech = float(np.percentile(_wr, 90))
_voiced_frac = float((_wr >= 0.006).mean()) # how much of the clip is actually voiced
else:
_speech = float(np.sqrt(np.mean(_a * _a))) if _a.size else 0.0
_voiced_frac = 1.0 if _speech >= 0.006 else 0.0
# require a loud-enough peak AND a SUSTAINED voiced fraction (>=10% of windows) — a lone
# AGC-boosted transient (breath, keystroke) no longer passes as speech for Whisper to
# hallucinate on; the downstream confidence gate (asr.py) is the second line of defence.
# PARAKEET-AWARE: this strict gate exists ONLY because autoregressive Whisper HALLUCINATES a
# phrase from near-silent/short audio. NVIDIA Parakeet (CTC) emits BLANKS on silence/noise —
# it structurally CANNOT hallucinate — so the strict Whisper-era gate just DROPPED valid
# quiet/short speech. CORRECTED (audit w368kb6kp): 0.0015 was ~4x BELOW the browser VAD floor
# (recorder.js VAD_FLOOR=0.006) -> room tone/breath reached Parakeet, which (CTC argmax) still
# emits a best-guess word from faint noise -> "palabras que nunca dije". Keep the floor at the
# client VAD floor (0.006) but a LENIENT voiced-fraction (0.05 vs Whisper's 0.10) so genuinely
# quiet/short real speech still paints while room tone does not.
_is_parakeet = os.getenv("LIGHTLOOM_ASR", "whisper").strip().lower() == "parakeet"
_gate_ok = (_speech >= 0.006 and _voiced_frac >= 0.05) if _is_parakeet else (_speech >= 0.006 and _voiced_frac >= 0.10)
if _gate_ok:
proc, amodel = load_asr()
_asr_be = type(amodel).__name__
phrase = (_asr(proc, amodel, audio, sampling_rate=sr, language=lang) or "").strip()
if phrase.lower().strip(" .,!?") in _ASR_HALLUCINATIONS:
phrase = "" # a known Whisper-on-silence hallucination -> drop it
except Exception as exc: # noqa: BLE001 - one bad segment must not end narration.
yield {"stage": "asr_error", "error": str(exc)[:160]}
finally:
if path:
try:
os.unlink(path)
except OSError:
pass
if not phrase:
# nothing intelligible in this segment (or a pure warm call) — keep listening
yield {"stage": "silent"}
return
# Moderation net at the SOURCE: scrub here so BOTH the caption (phrase[:160]) and the painted
# subject (g>0 uses subject=phrase; g==0 derives it from phrase) are clean — no slur/NSFW ever
# reaches FLUX or the on-screen caption in front of judges. Pure CPU, microseconds.
from lightloom.paint.prompt import scrub_unsafe
phrase, _flagged = scrub_unsafe(phrase)
if _flagged:
yield {"stage": "filtered"}
yield {"stage": "transcript", "text": phrase, "asr": _asr_be}
# 2) session continuity — DISK-DERIVED so it survives a GPU-worker recycle: the
# strip index is the count of sections already on disk (no filename collisions), the
# outpaint carry-over is the last saved strip, and the style is fixed by phrase #1.
out_dir = _SCROLL_ROOT / session
out_dir.mkdir(parents=True, exist_ok=True)
_sweep_scroll_sessions(protect=frozenset({session})) # never sweep the session we are serving
g = _live_next_index(out_dir)
g0 = g # the strip index AT PHRASE START (the paint loop increments g) — used to pin the title to the FIRST phrase
state = _live_state(out_dir)
style = state.get("style")
if not style:
style = _director_session_style(tokenizer, model, phrase)
_live_save_state(out_dir, style=style)
prev_subject = (state.get("prev_subject") or "").strip()
roll = (state.get("narration") or "").strip() # bounded rolling narration anchor (continuity thread)
ctx = None
if g > 0:
prev = out_dir / f"section-{g - 1}.webp"
if prev.is_file():
try:
ctx = Image.open(prev).convert("RGB")
except Exception: # noqa: BLE001 - missing/corrupt carry-over -> fresh start
ctx = None
# ABSTRACTION anchor: hold the first clean strip as a STRUCTURE reference. When the
# autoregressive prior dissolves into smooth colour fields over a long session (drift the
# saturation guard never catches), we re-inject its real edges into the conditioning
# instead of cutting, so the world heals without a hard seam.
anchor = None
if g > 0:
a0 = out_dir / "section-0.webp"
if a0.is_file():
try:
anchor = Image.open(a0).convert("RGB")
except Exception: # noqa: BLE001 - no anchor -> abstraction guard simply no-ops
anchor = None
# SPEED: the Director sets the world's DIRECTION (a rich opening scene + the session
# style/aesthetic) ONCE — on the very first phrase. Every later phrase paints the
# user's WORDS DIRECTLY (+ that fixed style), with NO per-phrase Director call. That
# is ~2 s faster per phrase AND more faithful to what was actually said.
# FAITHFUL-SUBJECT design — the Director must never make the experience worse. The painted SUBJECT
# is ALWAYS the user's own words. The Director (MiniCPM) contributes only bounded CINEMATOGRAPHY
# (lighting + palette), NEVER a scene rewrite: its image_prompt_en parroted the few-shot subject
# (a "fishmonger in a blue apron" / "lone traveler in a grey cloak") into unrelated narration —
# verified by capturing its raw output. Lighting/palette can tint the mood but can never replace
# "she opened the little blue door" with a fishmonger.
subject = phrase.strip() or "a vivid scene"
# Give the painter only the PAINTABLE words: drop non-visual sound words (echo/silence/whisper/...)
# that the live path used to pass through verbatim and that confuse FLUX. The CAPTION still shows the
# FULL phrase (phrase[:160] below), so the user sees everything they said. GUARDED: keep the sanitized
# form only if it retains >= half the words; otherwise keep the original so we never paint LESS than
# what was spoken (a sound-word-only phrase is left intact).
from lightloom.paint.prompt import sanitize_image_prompt as _sanitize_subj
_clean_subj = _sanitize_subj(subject)
if _clean_subj and len(_clean_subj.split()) >= max(1, len(subject.split()) // 2):
subject = _clean_subj
# NON-PAINTABLE FRAGMENT -> continue the previous real scene (no abstract iridescent colour-wave band).
if not _phrase_is_paintable(subject):
if prev_subject:
subject = prev_subject # extend the last real scene: subtle, seamless, one coherent image
else:
yield {"stage": "silent"} # no prior scene to continue -> skip rather than paint an abstract scene
return
# DEFINITIVE Director scope: its ONLY painter-facing job is choosing the session ART STYLE (above,
# via _director_session_style, from _ART_STYLES — now PURE RENDER TECHNIQUE (medium/brushwork/palette),
# scrubbed of scene NOUNS (the old "neon grids/chrome" synthwave + "lush whimsical nature" ghibli WERE
# injecting a scene into every strip -> the world reverted to ONE concept regardless of the words). The
# SUBJECT (user's words) now owns the scene; the style only owns the look. The old free-form
# generate_shot_transformers cinematography call is REMOVED: its lighting/palette ("misty green" +
# a forest hex palette) was session-locked into EVERY strip and forced a green forest onto "now we
# are under the ocean" — the recurring "the Director locks the world to one scene" bug. The HUD still
# shows a brief Director beat on phrase #0 to surface that it chose the style.
if g == 0:
yield {"stage": "directing", "text": phrase}
yield {"stage": "painting", "scene": subject[:120]}
# overlap == seg means each strip carries the WHOLE previous strip as context (the max
# available, since the stored strip is seg-wide); smoothness comes from the morph prompt.
# QUALITY: generate near klein's native band. 768x768 canvas (0.59 MP) vs the old 640x512 (0.33 MP)
# ~doubles visible detail; the returned strip is 576x768. overlap=192 (25% of the canvas, down from
# 40%) spends more of the pixel budget on FRESH sharp content. Verified ~646 ms/strip, ~1.29 s/phrase
# 768x768 canvas (overlap 192 + seg 576), returns a 576x768 strip. seg=768 (canvas 960x768) was
# MEASURED on the Space at ~12s/phrase (FLUX attention is O(latent_tokens^2): 0.59->0.74 MP cost ~1.9x,
# NOT the sub-linear estimate) -> it broke real-time, so seg stays 576 here. CONTINUITY comes from the
# rolling narration anchor below + the 192px overlap carry, NOT from widening the strip.
# overlap 192, seg 576 (canvas stays 768x768): a BIGGER carry (192 = 25% vs the 128 = 17% that read as
# DISJOINT scene-to-scene JUMPS) so each strip FLOWS from the EVOLVING previous strip — continuity that
# still evolves (the new words drive the fresh 576px), unlike Cast Lock (image_reference=section-0)
# which the user found CLONED one scene (the opening neon city, "pegada"). If this feels too sticky
# ("se queda en la escena anterior"), dial overlap back toward 160. Coherence = carried previous strip
# + fixed session style (TEXT), NOT a frozen scene-0 lock.
# overlap 212 / seg 556: a slightly STRONGER carry than 192 (more carried context -> fewer grey transition
# bands) but well short of the 256 that got STUCK on narrative prose. 212+556=768; the __warm__ call below
# is matched to 212/556. The grey-band ROOT cause is also fixed in scroll.py (the fresh region is now seeded
# from the carried edge's average colour, not the jarring grey 128 base).
h, overlap, seg = 768, 212, 556
# 2 strips/phrase: 3 strips RE-PAINTED the same subject 3x ("dije shark una vez y aparece repetido" -> 3
# sharks), reading as incoherent repetition. 2 keeps a scene SUSTAINED (longer than 1) but with a subtler
# change per phrase; the overlap=192 carry + the fragment-continue above hold the single-image coherence.
strips_per_phrase = 2
from lightloom.paint.scroll import style_trigger # fused painterly-LoRA trigger ("" if LoRA off)
# The painter gets ONLY: the user's words (subject) + the Director's cinematography (cine: lighting +
# palette) + the fixed session style. NO previous-scene CONTENT is injected. A rolling "in the same
# continuous world, " clause was re-injecting the FIRST scene ("a neon synthwave city")
# into every prompt -> the painter kept repainting it regardless of the new words (the over-clone bug).
# Continuity is carried VISUALLY by Cast Lock (image_reference) + the overlap, not by re-stating content.
# NO cine in the per-strip prompt. VERIFIED root cause of the "world stays a forest" bug: the Director's
# phrase-#0 cinematography (e.g. lighting="misty green" + a forest hex palette) is SESSION-LOCKED and was
# appended to EVERY strip -> "now we are under the ocean, misty green, colour palette #forest..." forced a
# green forest onto the ocean. The Director still gives the scene-NEUTRAL art STYLE (style, from
# _director_session_style) which carries coherence; the SUBJECT (the user's words) now drives the scene so
# the world EVOLVES when they say something new. The Director's contribution is the curated art STYLE.
# QUALITY BOOSTERS: append tasteful FIDELITY words (technique only, NO scene nouns) so the 4-step distilled
# klein-4B punches above its weight and reads like a far larger model (the user's goal). Kept short so it
# raises detail/craft without over-baking or homogenising the scene.
paint_prompt = style_trigger() + subject + style + ", richly detailed, intricate, masterful, beautifully rendered"
# CAST LOCK (the innovation): feed the clean HERO anchor (section-0, downscaled to 256px to bound the
# added condition-token attention cost) into klein's in-context image_reference channel so the world
# keeps its identity/palette/architecture across the whole session — a VISUAL memory, not just the
# rolling text anchor. None on phrase #0 (that strip IS the anchor) and if the anchor failed to load.
# Env-gated: default OFF for the fast (~5.5s/phrase) production experience (the rolling text anchor
# above already gives thematic continuity). Flip LIGHTLOOM_CAST_LOCK=1 for the BFL demo, where the
# image_reference VISUAL identity-lock is the showcased innovation (costs ~+2.5s/phrase of attention).
cast_ref = None
if anchor is not None and os.getenv("LIGHTLOOM_CAST_LOCK", "0") == "1":
try:
cast_ref = [anchor.resize((256, max(1, round(256 * anchor.height / anchor.width))))] # 256px: the proven-good lock (192 drifted). Added attention cost is within ZeroGPU variance.
except Exception: # noqa: BLE001 - reference is best-effort; the plain outpaint still works.
cast_ref = None
# CLEAN per-strip outpaint: paint each strip as the scene description, carrying the
# previous strip as context. (No ctx=None re-anchor — it created a hard cut every N
# strips; long-session drift is fixed properly by the klein KV-cache reference anchor.)
try:
for k in range(strips_per_phrase):
# break the neon-ribbon corruption chain: if the carried strip overflowed into
# saturated colour, drop the context and repaint fresh (no endless propagation).
# Validated end-to-end: an injected AND a natural neon strip were each contained
# to one strip, the next strip came back clean.
# SATURATION guard: break the neon-ribbon corruption chain. SCOPE IT to the rightmost
# `overlap` px — that is the ONLY region outpaint_right actually carries as context, so a
# saturated SKY filling the strip's LEFT (which never reaches the next paint) must NOT trip a
# needless fresh repaint (the false-positive cut that read as the world resetting).
_ctx_edge = ctx.crop((ctx.width - overlap, 0, ctx.width, ctx.height)) if ctx is not None else None
# DROP the carried context if its right EDGE is corrupt (saturated) OR has COLLAPSED into a flat,
# near-structureless colour field (the green block / iridescent waves / dark smudge). Carrying such
# an edge propagates the bad scene forever; starting the next strip fresh from the (concrete) prompt
# breaks the chain. This RESTORES the long-session drift safety net the abstraction-heal used to give
# — but by DROPPING the bad context, NOT blending toward section-0 (which caused the grey-fog smear).
if _ctx_edge is not None and (_strip_looks_corrupt(_ctx_edge) or _strip_looks_abstract(_ctx_edge) > 0.5):
use_ctx = None
else:
use_ctx = ctx
# SMOOTH continuation: paint each strip as the current subject, carrying the previous strip
# as context, with NO morph band. The morph_band dissolve (an 8px->128px stretch of the
# carried edge) is what put a foggy blurry smear between every scene; the plain outpaint
# already lets a new spoken subject emerge from the carried scene fluidly, like before.
sec = outpaint_right(pipe, use_ctx, paint_prompt, seg_w=seg, height=h, overlap=overlap, steps=4, seed=200 + g * 3, image_reference=cast_ref)
# CATCH a bad FRESH strip before it is shown/carried: saturated corruption OR a collapse into a flat
# near-structureless colour field (the green block / iridescent waves / dark smudge the saturation
# guard misses). Reroll it (new seed) carrying the already-validated context, so a bad ROLL is fixed
# while continuity is kept; if use_ctx was dropped above, this is a clean fresh repaint instead.
if _strip_looks_corrupt(sec) or _strip_looks_abstract(sec) > 0.5:
sec = outpaint_right(pipe, use_ctx, paint_prompt, seg_w=seg, height=h, overlap=overlap, steps=4, seed=211 + g * 3, image_reference=cast_ref)
ctx = sec
sec.save(out_dir / f"section-{g}.webp", "WEBP", quality=92)
yield {
"stage": "section",
"index": g,
"image": f"/frames-scroll/{session}/section-{g}.webp",
# stamp EVERY strip of the phrase (not just k==0): the caption is shown by onFocus when
# the strip is under the view centre, so a blank on the 2nd strip made the words flicker
# off mid-clause. Same phrase across both strips -> the caption stays pinned.
"caption": phrase[:160],
"prompt": paint_prompt[:300], # DEBUG: the exact text sent to the painter (verify the mix)
"w": sec.width,
}
raw_d = estimate_depth(depth_pipe, sec, normalize=False) # RAW disparity (un-scaled)
dmap = _depth_global_norm(raw_d, state, out_dir) # ONE session-global scale, 1=near
Image.fromarray((dmap * 255).astype(np.uint8)).save(out_dir / f"section-{g}-depth.png")
yield {"stage": "depth", "index": g, "depth": f"/frames-scroll/{session}/section-{g}-depth.png"}
g += 1
except Exception as exc: # noqa: BLE001 - one bad strip must not end narration.
yield {"stage": "section_error", "index": g, "error": str(exc)[:160]}
# append the current subject to the rolling narration (keep ~last 3 phrases, cap 180 chars) so the next
# phrase continues THIS evolving world; the current words always dominate the prompt above.
roll = "; ".join([p for p in (roll.split("; ") + [subject]) if p.strip()][-3:])[:180]
_live_save_state(out_dir, prev_subject=subject, narration=roll)
# LIVE TITLE REMOVED (user decision): the text MiniCPM5-1B title (title_world_transformers) was unreliable
# — it echoed the spoken opening ("We Are", "Shark Under The Ocean") and never read like a real world
# name. The world is named instead by the MiniCPM-V-4.6 "Art Director" in the Director's Cut POST-PROCESS
# (docs/DIRECTORS_CUT_SPEC.md), which reads the FINISHED painted pixels — this keeps the Best-MiniCPM
# category without a flaky live title. NO 'title' event is emitted live, so #world-title stays empty.
yield {"stage": "phrase_done", "index": g}
@app.api(name="scroll_live", concurrency_id="gpu", concurrency_limit=1, stream_every=0.05)
def scroll_live_api(session: str = "", text: str = "", audio_b64: str = "", lang: str = "en") -> dict:
sess = (session or "").strip()
if not re.fullmatch(r"[a-f0-9]{6,32}", sess):
sess = uuid.uuid4().hex[:8]
try:
for event in _scroll_live_phrase(sess, (text or "").strip(), audio_b64 or "", lang or "en"):
event.setdefault("session", sess)
yield event
except Exception as exc: # noqa: BLE001 - quota guard.
message = str(exc).lower()
stage = "quota_exceeded" if any(t in message for t in ("gpu", "quota", "zerogpu", "exceeded")) else "error"
yield {"stage": stage, "error": str(exc)[:200], "session": sess}
@spaces.GPU(duration=170)
def _postprocess_world(session: str, lang: str = "en"):
"""Director's Cut POST-PROCESS (Tier-1). 100% SEPARATE from the live painter: never calls
_scroll_live_phrase, only READS the finished section-*.webp off disk, fail-isolated. Stitches the panorama,
lets the MiniCPM-V-4.6 "Art Director" name it from the PIXELS (env-gated, try/except center-fallback to the
live text title), then renders a gentle clamped depth-parallax pan-through to world.mp4 (served by the
existing /frames-scroll route). torchvision is required by the seer's processor (in requirements.txt)."""
import math
import time
import numpy as np
import torch
from PIL import Image
_t0 = time.monotonic() # soft budget vs the @spaces.GPU(170s) hard kill (see the render loop)
out_dir = _SCROLL_ROOT / session
strips = (
sorted(out_dir.glob("section-*.webp"), key=lambda p: int(re.match(r"section-(\d+)", p.stem).group(1)))
if out_dir.is_dir()
else []
)
if not strips:
yield {"stage": "postprocess_error", "error": "no_strips"}
return
# 1) STITCH the finished strips into ONE panorama. Read each strip fail-soft: a 0-byte/truncated
# WEBP (a live paint killed mid-save by a worker recycle / OOM / full disk) must NOT sink the whole
# keepsake — skip it, mirroring the live carry-over read guards. One bad strip != lost film.
imgs = []
for p in strips:
try:
imgs.append(Image.open(p).convert("RGB"))
except Exception: # noqa: BLE001 - drop one corrupt/partial strip, never lose the film
continue
if not imgs:
yield {"stage": "postprocess_error", "error": "no_readable_strips"}
return
h0 = max(i.height for i in imgs)
w0 = sum(i.width for i in imgs)
pano = Image.new("RGB", (w0, h0))
x = 0
for im in imgs:
pano.paste(im, (x, 0))
x += im.width
# A long session is a VERY wide image (e.g. 46 strips x 556px ~= 25k px) and WEBP hard-caps at
# 16383px (the bug that made long worlds fail with "Couldn't save") — persist the keepsake
# panorama as JPEG (caps at 65535px) and fail-soft: a save error must NEVER sink the film.
pano_url = ""
try:
pano.save(out_dir / "panorama.jpg", "JPEG", quality=90)
pano_url = f"/frames-scroll/{session}/panorama.jpg"
except Exception: # noqa: BLE001 - the full-panorama download is a bonus; the MP4 is the keepsake
pano_url = ""
yield {"stage": "stitched", "w": w0, "h": h0, "panorama": pano_url}
# 2) ART DIRECTOR (MiniCPM-V-4.6) — env-gated, try/except CENTER fallback so it can NEVER break the MP4
state = _live_state(out_dir)
seer = {
"title": (state.get("title") or "").strip(),
"caption": "",
"field_notes": [],
"focal_points": [{"x": 0.5, "y": 0.5, "label": ""}],
}
if os.getenv("LIGHTLOOM_WORLD_SEER", "1") == "1":
try:
from lightloom.paint.prompt import scrub_unsafe
from lightloom.vision.world_seer import load_world_seer, see_world
proc, vmodel = load_world_seer()
see_in = pano.copy()
see_in.thumbnail((1536, 1536)) # bound the VLM token count
seer = see_world(proc, vmodel, see_in, seed=len(strips), lang=lang)
seer["title"] = scrub_unsafe(seer.get("title") or "")[0]
seer["caption"] = scrub_unsafe(seer.get("caption") or "")[0]
except Exception as exc: # noqa: BLE001 - naming is optional; never block the keepsake MP4
yield {"stage": "seer_skipped", "error": str(exc)[:160]}
# The live text title was removed, so state has none and the seer's own _coerce fallback is bypassed
# whenever the seer is OFF or throws -> a nameless keepsake. Name it here so it is NEVER blank.
if not seer["title"].strip():
from lightloom.vision.world_seer import fallback_title
seer["title"] = fallback_title(len(strips))
yield {
"stage": "titled",
"title": seer["title"],
"caption": seer["caption"],
"field_notes": seer["field_notes"],
}
# 3) RAW float32 depth on the (downscaled) panorama -> one near/far scale (NOT the live 8-bit PNG)
from lightloom.depth.depth import estimate_depth, load_depth_pipeline
from lightloom.export.ldi import encode_frames_ffmpeg, load_caption_fonts, make_caption_overlay, normalize_disparity
depth_pipe = load_depth_pipeline()
render_h = 540
# A long world can be ~25k px wide; cap the working width so depth + warp stay bounded (GPU
# memory, DA-V2 input) and the pan never becomes a dizzying blur across a huge panorama.
max_render_w = 8192
render_w = max(2, int(round(w0 * render_h / h0)))
render_w = min(render_w, max_render_w)
render_w -= render_w % 2
render_pano = pano.resize((render_w, render_h))
raw_d = estimate_depth(depth_pipe, render_pano, normalize=False)
nd = normalize_disparity(np.asarray(raw_d, dtype=np.float32))
# 4) RENDER a CALM, SLOW, constant pan across the panorama -> a keepsake you can actually FOLLOW and read.
# The dizzying motion is gone ON PURPOSE: no focal whip-glides (dead-still then a 60px/frame rush), no
# oscillating parallax sway, no zoom pulse. Just a gentle constant drift. The real, NAVIGABLE 3D is the
# interactive WebGL view; this MP4 is a calm pan, and the full stitched panorama is a separate strip download.
img_full = torch.from_numpy(np.asarray(render_pano, dtype=np.float32).transpose(2, 0, 1)[None] / 255.0).to("cuda")
# Save render-sized, ALIGNED color + depth sidecars (<=8192 wide) so the browser can build a navigable
# depth-displaced 3D mesh CLIENT-side (no server GPU at view time). Fail-soft: a bonus, never blocks the MP4.
explore_color, explore_depth = "", ""
try:
render_pano.save(out_dir / "panorama-explore.jpg", "JPEG", quality=90)
Image.fromarray((np.clip(nd, 0, 1) * 255).astype(np.uint8)).save(out_dir / "panorama-depth.png")
explore_color = f"/frames-scroll/{session}/panorama-explore.jpg"
explore_depth = f"/frames-scroll/{session}/panorama-depth.png"
except Exception: # noqa: BLE001
explore_color = explore_depth = ""
win_w = min(render_w, int(round(render_h * 16 / 9)))
win_w -= win_w % 2
travel = max(0, render_w - win_w)
fps = 24
# Constant CALM pan speed (~130 px/s in render space); clamp ~10s..30s. A very wide world pans no faster
# than the 30s cap (it shows a slow slice; the whole world is the strip download), never a nauseating race.
n_frames = int(min(720, max(240, round(travel / 130 * fps))))
# MiniCPM-V's reading of YOUR world, burned into the film: pre-render the title + caption lower-third
# ONCE (content-identical every frame) and paste it per frame. Fail-soft -> the film renders text-free.
_ov = None
try:
_tf, _bf = load_caption_fonts(Path(__file__).resolve().parent / "assets" / "fonts", render_h)
_ov = make_caption_overlay(win_w, render_h, seer["title"], seer["caption"], _tf, _bf)
except Exception: # noqa: BLE001
_ov = None
frames_dir = out_dir / "frames"
frames_dir.mkdir(parents=True, exist_ok=True)
try:
for k in range(n_frames):
# SOFT time budget vs the @spaces.GPU(170s) hard kill: stop early (after a minimum) and encode
# the frames rendered so far -> a shorter but valid film instead of nothing.
if k >= 96 and time.monotonic() - _t0 > 150.0:
break
u = k / (n_frames - 1) if n_frames > 1 else 0.0
x0 = int(round(u * travel)) # constant-velocity drift -> predictable, never a surge
frame = img_full[0, :, :, x0 : x0 + win_w].clamp(0, 1).mul(255).byte().cpu().numpy().transpose(1, 2, 0)
frame_img = Image.fromarray(frame)
if _ov is not None:
frame_img.paste(_ov, (0, 0), _ov) # burn MiniCPM-V's title + caption (lower third)
frame_img.save(frames_dir / f"frame-{k:04d}.webp", "WEBP", quality=90)
if k % 48 == 0:
yield {"stage": "rendering", "frame": k, "total": n_frames}
yield {"stage": "encoding"}
encode_frames_ffmpeg(frames_dir, out_dir / "world.mp4", fps=fps, height=1080)
finally:
for f in frames_dir.glob("frame-*.webp"):
try:
f.unlink()
except OSError:
pass
yield {
"stage": "ready",
"video": f"/frames-scroll/{session}/world.mp4",
"title": seer["title"],
"caption": seer["caption"],
"panorama": pano_url,
"explore_color": explore_color,
"explore_depth": explore_depth,
"field_notes": seer.get("field_notes") or [],
"focal_points": seer.get("focal_points") or [],
}
@app.api(name="postprocess", concurrency_id="gpu", concurrency_limit=1, stream_every=0.05)
def postprocess_api(session: str = "", lang: str = "en") -> dict:
sess = (session or "").strip()
if not re.fullmatch(r"[a-f0-9]{6,32}", sess):
yield {"stage": "postprocess_error", "error": "bad_session"}
return
try:
for event in _postprocess_world(sess, lang or "en"):
event.setdefault("session", sess)
yield event
except Exception as exc: # noqa: BLE001 - quota guard; CANNOT affect the live painter (separate endpoint)
message = str(exc).lower()
# A ZeroGPU duration overrun raises "GPU task aborted" — do NOT mislabel that as account-quota
# exhaustion ("out of GPU for today"); it means this one render was too long, so tell the user to
# retry/shorten. Reserve quota_exceeded for the real schedule-time quota messages.
if "task aborted" in message or "timeout" in message or "duration" in message:
stage = "render_timeout"
elif "quota" in message or "exceeded your" in message:
stage = "quota_exceeded"
else:
stage = "postprocess_error"
yield {"stage": stage, "error": str(exc)[:200], "session": sess}
@spaces.GPU(duration=60)
def _ask_world_gpu(session: str, question: str, lang: str = "en") -> dict[str, Any]:
"""View-time interactive Q&A: MiniCPM-V-4.6 reads the FINISHED panorama and answers a free-form question
about it, grounded in the pixels. 100% INDEPENDENT of live image creation (post-process artifact). Fail-soft."""
from PIL import Image
out_dir = _SCROLL_ROOT / session
src = out_dir / "panorama-explore.jpg"
if not src.is_file():
src = out_dir / "panorama.jpg"
if not src.is_file():
return {"answer": ""}
try:
img = Image.open(src).convert("RGB")
img.thumbnail((1536, 1536)) # bound the VLM token count
from lightloom.paint.prompt import scrub_unsafe
from lightloom.vision.world_seer import ask_world, load_world_seer
proc, vmodel = load_world_seer()
ans = ask_world(proc, vmodel, img, question, lang=lang)
return {"answer": scrub_unsafe(ans or "")[0]}
except Exception as exc: # noqa: BLE001 - the keepsake/world is untouched if the Q&A fails
return {"answer": "", "error": str(exc)[:160]}
@app.api(name="ask", concurrency_id="gpu", concurrency_limit=1, stream_every=0.05)
def ask_api(session: str = "", question: str = "", lang: str = "en") -> dict:
""""Ask Your World" — a short MiniCPM-V-4.6 Q&A over the finished world (the OpenBMB lever). Shares the
single GPU pool with the live painter + postprocess, so it never overlaps a paint on ZeroGPU."""
sess = (session or "").strip()
if not re.fullmatch(r"[a-f0-9]{6,32}", sess):
yield {"answer": "", "error": "bad_session"}
return
q = (question or "").strip()
if not q or len(q) > 400:
yield {"answer": "", "error": "bad_question"}
return
try:
yield _ask_world_gpu(sess, q, lang or "en")
except Exception as exc: # noqa: BLE001 - quota guard; cannot affect the live painter
message = str(exc).lower()
stage = "quota_exceeded" if any(t in message for t in ("gpu", "quota", "zerogpu", "exceeded")) else "error"
yield {"answer": "", "error": str(exc)[:200], "stage": stage}
@spaces.GPU(duration=120)
def _introspect_klein_on_gpu() -> dict[str, Any]:
"""Report the klein pipeline's call signature + LoRA/embeds capabilities, so we can
build embedding-SLERP (prompt_embeds) and the style-LoRA fuse correctly (CLAUDE rule 4:
read the real API instead of guessing)."""
import inspect
from lightloom.paint.scroll import load_fast_pipeline
info: dict[str, Any] = {}
try:
pipe = load_fast_pipeline()
sig = inspect.signature(pipe.__call__)
params = list(sig.parameters.keys())
info.update(
ok=True,
pipe_class=type(pipe).__name__,
call_params=params,
accepts_prompt_embeds="prompt_embeds" in params,
accepts_mask_image="mask_image" in params,
has_encode_prompt=hasattr(pipe, "encode_prompt"),
has_load_lora_weights=hasattr(pipe, "load_lora_weights"),
has_fuse_lora=hasattr(pipe, "fuse_lora"),
components=list(getattr(pipe, "components", {}).keys()),
text_encoder=type(getattr(pipe, "text_encoder", None)).__name__,
)
# probe encode_prompt's exact signature + return shapes so SLERP is built right
info["encode_sig"] = list(inspect.signature(pipe.encode_prompt).parameters.keys())
try:
dev = getattr(pipe, "_execution_device", "cuda")
res = pipe.encode_prompt(prompt="an ancient glowing forest", device=dev, num_images_per_prompt=1)
seq = res if isinstance(res, (tuple, list)) else (res,)
info["encode_returns"] = [
(type(x).__name__ + (str(tuple(x.shape)) if hasattr(x, "shape") else "")) for x in seq
]
except Exception as ee: # noqa: BLE001
info["encode_test_err"] = f"{type(ee).__name__}: {str(ee)[:240]}"
except Exception as exc: # noqa: BLE001 - diagnostic must report, not crash.
import traceback
info.update(ok=False, error=f"{type(exc).__name__}: {exc!r}", trace=traceback.format_exc()[-800:])
return info
@app.post("/internal/introspect-klein")
def introspect_klein_endpoint() -> dict[str, Any]:
if os.getenv("LIGHTLOOM_ENABLE_INTERNAL_BENCH", "0") != "1":
return {"ok": False, "error": "internal endpoint disabled"}
return {"ok": True, "data": _introspect_klein_on_gpu()}
@app.post("/internal/bench/director-probe")
def director_probe_endpoint(backend: str = "transformers", threads: int = 8) -> dict[str, Any]:
if os.getenv("LIGHTLOOM_ENABLE_INTERNAL_BENCH", "0") != "1":
return {"ok": False, "error": "internal benchmark endpoint disabled"}
return {"ok": True, "data": _director_probe_on_gpu(backend, threads)}
@spaces.GPU(duration=120)
def _beat_e2e_on_gpu(beat_text: str) -> dict[str, Any]:
# Full single-beat pipeline: Director -> plan -> klein paint -> depth. Validates
# the whole GPU chain end to end (Recital path; no ASR needed). Bounded to one beat.
import time as _time
from lightloom.core.pipeline import plan_shot
from lightloom.depth.depth import estimate_depth, load_depth_pipeline
from lightloom.director.director import generate_shot_transformers, load_director_transformers
from lightloom.director.state import initial_state
from lightloom.paint.klein import load_klein_pipeline, paint
info: dict[str, Any] = {}
try:
t = _time.perf_counter()
tokenizer, model = load_director_transformers()
info["director_load_s"] = round(_time.perf_counter() - t, 2)
state = initial_state()
shot, meta = generate_shot_transformers(tokenizer, model, beat_text, state.model_dump())
info.update(director_s=round(meta["elapsed_s"], 2), attempts=meta.get("attempts"), decision=shot.decision)
plan = plan_shot(state, shot)
info["prompt"] = plan.prompt
t = _time.perf_counter()
klein = load_klein_pipeline()
info["klein_load_s"] = round(_time.perf_counter() - t, 2)
t = _time.perf_counter()
image = paint(klein, plan.prompt, plan.references, width=plan.width, height=plan.height, seed=plan.seed)
info["paint_s"] = round(_time.perf_counter() - t, 2)
info["image_size"] = list(image.size)
t = _time.perf_counter()
depth_pipeline = load_depth_pipeline()
depth_map = estimate_depth(depth_pipeline, image)
info["depth_s"] = round(_time.perf_counter() - t, 2)
info["depth_shape"] = list(depth_map.shape)
out_dir = Path("/tmp/lightloom")
out_dir.mkdir(parents=True, exist_ok=True)
image.save(out_dir / "beat0.webp", "WEBP", quality=90)
info["ok"] = True
except Exception as exc: # noqa: BLE001 - diagnostic must report, not crash.
import traceback
info.update(ok=False, error=f"{type(exc).__name__}: {exc!r}", trace=traceback.format_exc()[-1200:])
return info
@app.post("/internal/bench/beat-e2e")
def beat_e2e_endpoint(text: str = "Far away, a market woke under orange awnings and wet stone.") -> dict[str, Any]:
if os.getenv("LIGHTLOOM_ENABLE_INTERNAL_BENCH", "0") != "1":
return {"ok": False, "error": "internal benchmark endpoint disabled"}
return {"ok": True, "data": _beat_e2e_on_gpu(text)}
_GOLDEN_RECITAL = (
"An old lighthouse keeper counted the waves at the edge of the world.\n\n"
"Every night he climbed the spiral stairs, counting each step like a prayer.\n\n"
"Far away, a market woke under orange awnings and wet stone.\n\n"
"Years later, the same road lay buried under snow and silence."
)
@spaces.GPU(duration=180)
def _run_recital_on_gpu(text: str) -> dict[str, Any]:
# First real product E2E: a poem -> a Film. Models load once, then beats stream
# through Director -> plan -> klein (KV continuity via prev frame) -> depth, with
# SceneState threaded across beats. Returns a Film summary; frames saved to /tmp.
import time as _time
import numpy as np
from PIL import Image
from lightloom.core.pipeline import plan_shot
from lightloom.core.recital import split_recital
from lightloom.director.director import generate_shot_transformers, load_director_transformers
from lightloom.director.state import initial_state
from lightloom.depth.depth import estimate_depth, load_depth_pipeline
from lightloom.paint.klein import load_klein_pipeline, paint
try:
beat_texts = split_recital(text, max_beats=8)
tokenizer, model = load_director_transformers()
klein = load_klein_pipeline()
depth_pipeline = load_depth_pipeline()
out_dir = Path("/tmp/lightloom/recital")
out_dir.mkdir(parents=True, exist_ok=True)
state = initial_state()
prev_frame: str | None = None
frames: list[dict[str, Any]] = []
gen_times: list[int] = []
fogged = 0
for i, beat_text in enumerate(beat_texts):
t0 = _time.perf_counter()
try:
shot, _meta = generate_shot_transformers(
tokenizer, model, beat_text, state.model_dump(), beat_id=i + 1
)
plan = plan_shot(state, shot, prev_frame=prev_frame)
image = paint(
klein, plan.prompt, plan.references, width=plan.width, height=plan.height, seed=plan.seed
)
image_path = out_dir / f"beat{i}.webp"
image.save(image_path, "WEBP", quality=90)
depth_map = estimate_depth(depth_pipeline, image)
Image.fromarray((depth_map * 255).astype(np.uint8)).save(out_dir / f"beat{i}_depth.png")
gen_ms = int((_time.perf_counter() - t0) * 1000)
gen_times.append(gen_ms)
frames.append(
{
"beat_id": i + 1,
"text": beat_text[:60],
"decision": shot.decision,
"shot_scale": shot.shot_scale,
"camera_move": shot.camera_move,
"transition": shot.transition,
"refs_used": len(plan.references),
"seed": plan.seed,
"gen_ms": gen_ms,
"image": str(image_path),
}
)
state = plan.new_state
prev_frame = str(image_path)
except Exception as beat_exc: # noqa: BLE001 - one fogged beat must not end the film.
fogged += 1
frames.append({"beat_id": i + 1, "text": beat_text[:60], "fogged": f"{type(beat_exc).__name__}"})
cuts = sum(1 for f in frames if f.get("decision") == "cut")
rendered = len(gen_times)
return {
"ok": True,
"n_beats": len(beat_texts),
"rendered": rendered,
"fogged": fogged,
"cuts": cuts,
"continuity": rendered - cuts,
"median_gen_ms": int(sorted(gen_times)[len(gen_times) // 2]) if gen_times else 0,
"frames": frames,
}
except Exception as exc: # noqa: BLE001 - diagnostic must report, not crash.
import traceback
return {"ok": False, "error": f"{type(exc).__name__}: {exc!r}", "trace": traceback.format_exc()[-1200:]}
@app.post("/internal/bench/recital")
def recital_bench_endpoint(text: str = _GOLDEN_RECITAL) -> dict[str, Any]:
if os.getenv("LIGHTLOOM_ENABLE_INTERNAL_BENCH", "0") != "1":
return {"ok": False, "error": "internal benchmark endpoint disabled"}
return {"ok": True, "data": _run_recital_on_gpu(text)}
# --- Real streaming product endpoint (the contract the frontend speaks to) ---
SESSIONS_DIR = Path("/tmp/lightloom/sessions")
@spaces.GPU(duration=200)
def _recital_stream(text: str, lang: str, session: str):
# @spaces.GPU supports generator functions (yield streams to the caller). Each
# yield is one stage event the frontend renders. Models load once, then beats
# stream Director -> paint (KV continuity) -> depth.
import numpy as np
from PIL import Image
from lightloom.core.pipeline import plan_shot
from lightloom.core.recital import split_recital
from lightloom.depth.depth import estimate_depth, load_depth_pipeline
from lightloom.director.director import generate_shot_transformers, load_director_transformers
from lightloom.director.state import initial_state
from lightloom.paint.klein import load_klein_pipeline, paint
out_dir = SESSIONS_DIR / session
out_dir.mkdir(parents=True, exist_ok=True)
beats = split_recital(text, max_beats=8)
yield {"stage": "warming", "n_beats": len(beats)}
tokenizer, model = load_director_transformers()
klein = load_klein_pipeline()
depth_pipeline = load_depth_pipeline()
yield {"stage": "ready", "n_beats": len(beats)}
state = initial_state()
prev_frame: str | None = None
for i, beat_text in enumerate(beats):
try:
yield {"stage": "directing", "beat": i, "text": beat_text}
shot, _meta = generate_shot_transformers(tokenizer, model, beat_text, state.model_dump(), beat_id=i + 1)
plan = plan_shot(state, shot, prev_frame=prev_frame)
yield {
"stage": "directed",
"beat": i,
"shot": shot.model_dump(),
"transition": plan.transition, # refined (continuity hard_cut -> crossfade)
"scene_id": plan.new_state.scene_id,
"refs": len(plan.references),
}
yield {"stage": "painting", "beat": i}
image = paint(klein, plan.prompt, plan.references, width=plan.width, height=plan.height, seed=plan.seed)
image.save(out_dir / f"beat{i}.webp", "WEBP", quality=90)
yield {"stage": "painted", "beat": i, "image": f"/frames/{session}/beat{i}.webp"}
depth_map = estimate_depth(depth_pipeline, image)
Image.fromarray((depth_map * 255).astype(np.uint8)).save(out_dir / f"beat{i}_depth.png")
yield {"stage": "depth", "beat": i, "depth": f"/frames/{session}/beat{i}_depth.png"}
state = plan.new_state
prev_frame = str(out_dir / f"beat{i}.webp")
except Exception as exc: # noqa: BLE001 - one fogged beat must not end the film.
yield {"stage": "fogged", "beat": i, "error": type(exc).__name__}
yield {"stage": "done", "session": session}
@app.post("/api/recital")
async def recital_endpoint(request: Request) -> StreamingResponse:
body = await request.json()
text = (body.get("text") or _GOLDEN_RECITAL).strip()
lang = body.get("lang", "en")
session = uuid.uuid4().hex[:8]
def event_stream():
try:
for event in _recital_stream(text, lang, session):
yield f"data: {json.dumps(event)}\n\n"
except Exception as exc: # noqa: BLE001
yield f"data: {json.dumps({'stage': 'error', 'error': str(exc)[:200]})}\n\n"
return StreamingResponse(event_stream(), media_type="text/event-stream")
@app.get("/frames/{session}/{name}")
def frame_endpoint(session: str, name: str):
safe_session = Path(session).name
safe_name = Path(name).name
path = SESSIONS_DIR / safe_session / safe_name
if not path.is_file():
return JSONResponse({"error": "not found"}, status_code=404)
return FileResponse(path)
# The frontend MUST reach the GPU pipeline through @gradio/client (Client.submit),
# not raw fetch, or ZeroGPU cannot attribute quota. @app.api streams each yield of
# the @spaces.GPU generator as an SSE event; pin all GPU endpoints to one
# concurrency pool so they never run two GPU jobs at once.
@app.api(name="recital", concurrency_id="gpu", concurrency_limit=1, stream_every=0.05)
def recital_api(text: str = _GOLDEN_RECITAL, lang: str = "en") -> dict:
# The `-> dict` return hint is how Gradio's api() derives one streamed JSON
# output (get_return_types reads the annotation); each yield is one SSE event.
session = uuid.uuid4().hex[:8]
try:
for event in _recital_stream((text or _GOLDEN_RECITAL).strip(), lang, session):
yield event
except Exception as exc: # noqa: BLE001 - quota guard: degrade to Showcase, never a stack trace on stage.
message = str(exc).lower()
if any(token in message for token in ("gpu", "quota", "zerogpu", "exceeded")):
yield {"stage": "quota_exceeded", "detail": str(exc)[:200]}
else:
yield {"stage": "error", "error": str(exc)[:200]}
# --- ZeroGPU cold-start: load the runtime models at MODULE SCOPE so the ZeroGPU backend
# disk-packs them at Space startup and streams them into VRAM per fork (fast) instead of
# paying a full from_pretrained inside the first @spaces.GPU call (~30-40s). Best-effort +
# env-gated: any failure (or LIGHTLOOM_PRELOAD=0) silently falls back to lazy in-fork loading,
# so the Space always boots. ---
if LIGHTLOOM_PROFILE == "space" and os.getenv("LIGHTLOOM_PRELOAD", "1") == "1":
try:
from lightloom.audio_in.asr import load_asr as _pl_asr
from lightloom.depth.depth import load_depth_pipeline as _pl_depth
from lightloom.director.director import load_director_transformers as _pl_dir
from lightloom.paint.scroll import load_fast_pipeline as _pl_paint
_pl_paint(); _pl_dir(); _pl_depth(); _pl_asr()
print("[lightloom] module-scope model preload complete")
except Exception as _exc: # noqa: BLE001 - lazy in-fork loading still works.
print(f"[lightloom] module-scope preload skipped: {type(_exc).__name__}: {_exc}")
if __name__ == "__main__":
app.launch(
allowed_paths=[str(SESSIONS_DIR), str(ROOT / "frontend"), str(ROOT / "assets")],
pwa=True,
strict_cors=True,
)