small-cuts / src /small_cuts /engine /library.py
macayaven's picture
Upload folder using huggingface_hub
24e5b39 verified
Raw
History Blame Contribute Delete
15 kB
"""Engine-side scene library: filesystem media + sqlite index (D6).

The real `SceneSink`. Every successful narration is persisted — frame JPEG,
title card, voice WAV, an optional clip MP4 (written when at least two clip
frames are supplied), and one sqlite row — and fanned out to live SSE
subscribers (D7). Blocking writes run in a worker thread via
`asyncio.to_thread`; the publish happens back on the event loop. Stored
entries and live events share the same NarratedScene shape, per
docs/contracts/narrated-scene.schema.json.
"""
from __future__ import annotations
import asyncio
import contextlib
import json
import os
import sqlite3
import sys
import threading
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from PIL import Image
from small_cuts import narrator, tts
from small_cuts.frames import pick_key_frame
from small_cuts.title_card import TITLE_MAX_LEN, derive_title, render_title_card
from .session import CONTRACT_VERSION, _wav_bytes
# Library root used when neither the constructor argument nor the
# SMALL_CUTS_LIBRARY_DIR environment variable supplies one ("~" is expanded).
DEFAULT_ROOT = "~/.small-cuts/library"
# Fallback owner recorded on each row; SMALL_CUTS_ENGINE_OWNER overrides it.
OWNER = "carlos" # v1 engines are single-user; the field is reserved for multi-user
# The only values `SceneLibrary.set_visibility` accepts.
VISIBILITIES = ("private", "shared", "public")
# Allow-list of per-scene filenames `SceneLibrary.media_path` will resolve.
MEDIA_FILES = ("frame.jpg", "card.webp", "voice.wav", "clip.mp4")
# Seconds the sink waits for the blocking `store` call before reporting failure.
STORAGE_TIMEOUT_S = 30.0
# Capacity of each subscriber queue; a subscriber whose queue is full is dropped.
SUBSCRIBER_QUEUE_MAX = 256
# Default frame rate passed to the encoder by `_write_clip_mp4`.
CLIP_MP4_FPS = 12
# Default number of blended frames inserted between two consecutive clip frames.
CLIP_BLEND_STEPS = 1
# Lower bound for clip width/height after rounding down to an even number.
H264_MIN_DIMENSION = 2
# JPEG quality used when saving the poster `frame.jpg`.
POSTER_JPEG_QUALITY = 90
# Pillow mode every frame is converted to before saving or encoding.
RGB_MODE = "RGB"
# Pixel format assigned to the clip's video stream.
VIDEO_PIXEL_FORMAT = "yuv420p"
# Codec tried first for the clip; the fallback is used if adding it raises.
PRIMARY_VIDEO_CODEC = "libx264"
FALLBACK_VIDEO_CODEC = "h264"
# Index table: one row per stored scene. `seq` is UNIQUE and is allocated by
# `SceneLibrary.store` as MAX(seq) + 1 under the instance lock; `engine` holds
# a JSON-encoded object (written with json.dumps, read back with json.loads).
_SCHEMA = """\
CREATE TABLE IF NOT EXISTS scenes (
scene_id TEXT PRIMARY KEY,
seq INTEGER NOT NULL UNIQUE,
moment_id TEXT NOT NULL,
session_id TEXT NOT NULL,
captured_at TEXT NOT NULL,
created_at TEXT NOT NULL,
style_key TEXT NOT NULL,
title TEXT NOT NULL,
narration TEXT NOT NULL,
visibility TEXT NOT NULL DEFAULT 'private',
owner TEXT NOT NULL,
engine TEXT NOT NULL
)"""
# Parameterized insert; the twelve placeholders follow the column list order,
# which must match the tuple built in `SceneLibrary.store`.
_INSERT = """\
INSERT INTO scenes (scene_id, seq, moment_id, session_id, captured_at, created_at,
style_key, title, narration, visibility, owner, engine)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"""
class SceneLibrary:
    """Scene store + in-process pub/sub. The instance itself is the async SceneSink.

    Layout: `<root>/library.sqlite3` + `<root>/media/<scene_id>/{frame.jpg,
    card.webp, voice.wav}` plus an optional `clip.mp4` when the scene carried
    at least two clip frames. One sqlite connection, guarded by a lock: the
    sink writes from worker threads, queries come from request handlers.

    The lock is a plain (non-reentrant) `threading.Lock`, so no method may call
    another lock-taking method while holding it.
    """

    def __init__(self, root: str | Path | None = None) -> None:
        """Open (creating if needed) the library under `root`.

        Root resolution order: the `root` argument, then the
        SMALL_CUTS_LIBRARY_DIR environment variable, then DEFAULT_ROOT.
        Empty values fall through to the next candidate.
        """
        base = root or os.environ.get("SMALL_CUTS_LIBRARY_DIR") or DEFAULT_ROOT
        self.root = Path(base).expanduser().resolve()
        self.media_dir = self.root / "media"
        self.media_dir.mkdir(parents=True, exist_ok=True)
        self._lock = threading.Lock()  # guards the connection and seq allocation
        self._db = sqlite3.connect(self.root / "library.sqlite3", check_same_thread=False)
        self._db.row_factory = sqlite3.Row
        try:
            with self._lock, self._db:
                # WAL + busy_timeout: viewer reads don't block sink writes, and a
                # briefly locked database waits instead of raising immediately.
                self._db.execute("PRAGMA journal_mode=WAL")
                self._db.execute("PRAGMA busy_timeout=5000")
                self._db.execute(_SCHEMA)
        except Exception:
            # Do not leak the connection when the instance never finishes
            # constructing (nobody would be able to call close()).
            self._db.close()
            raise
        self._subscribers: list[asyncio.Queue[dict[str, Any]]] = []

    # -- the sink -----------------------------------------------------------------
    async def __call__(self, scene: dict[str, Any]) -> None:
        """SceneSink entry point: persist off the event loop, then publish.

        A failed store (disk full, sqlite error, timeout) must not be silent
        data loss: the mobile client already received its SceneAudio, so log to
        stderr and fan an error ControlFrame to the viewer stream — the timeline
        stays honest. `_hand_to_sink`'s suppression remains the last-resort
        backstop.

        NOTE: on timeout only the await is abandoned; the worker thread running
        `store` cannot be cancelled and may still complete the write later.
        """
        try:
            narrated = await asyncio.wait_for(
                asyncio.to_thread(self.store, scene), timeout=STORAGE_TIMEOUT_S
            )
        except Exception as exc:  # includes asyncio.TimeoutError
            # `.get` rather than indexing: the failure may itself be a missing
            # key, and the error path must never raise while reporting it.
            print(
                f"small_cuts.engine: library write failed for scene {scene.get('scene_id')}: {exc!r}",
                file=sys.stderr,
            )
            self.publish_event(
                {
                    "contract_version": CONTRACT_VERSION,
                    "kind": "error",
                    "moment_id": scene.get("moment_id"),
                    "error": {
                        "stage": "storage",
                        "code": "library_write_failed",
                        # Some exceptions (e.g. timeouts) stringify to "";
                        # fall back to repr so the message is never blank.
                        "message": (str(exc) or repr(exc))[:300],
                        "retryable": False,
                    },
                }
            )
            return
        self.publish_event(narrated)

    def publish_event(self, payload: dict[str, Any]) -> None:
        """Fan any event (stored scene or ControlFrame error) to live subscribers.

        Events without a seq (errors) are EPHEMERAL: not persisted, not in
        Last-Event-ID replay. A subscriber whose queue is full is dropped
        instead of blocking the publisher.
        """
        # Iterate over a copy: unsubscribe() mutates the list mid-loop.
        for queue in list(self._subscribers):
            try:
                queue.put_nowait(payload)
            except asyncio.QueueFull:
                self.unsubscribe(queue)

    def store(self, scene: dict[str, Any]) -> dict[str, Any]:
        """Persist media + index row (blocking); returns the stored NarratedScene.

        Media files are written first, the sqlite row last. A failure while
        writing the optional clip is logged and tolerated; any other failure
        propagates to the caller.

        Raises:
            RuntimeError: if the row cannot be read back right after insert.
        """
        scene_id: str = scene["scene_id"]
        narration: str = scene["narration"]
        style_key: str = scene["style_key"]
        title = _stored_title(scene.get("title"), narration)
        scene_dir = self.media_dir / scene_id
        scene_dir.mkdir(parents=True, exist_ok=True)

        clip_frames = scene.get("clip_frames") or []
        poster = pick_key_frame(clip_frames) if clip_frames else scene["image"]
        poster.convert(RGB_MODE).save(scene_dir / "frame.jpg", "JPEG", quality=POSTER_JPEG_QUALITY)
        if len(clip_frames) >= 2:
            # The clip is best-effort: a scene without one is still valid.
            try:
                _write_clip_mp4(scene_dir / "clip.mp4", clip_frames)
            except Exception as exc:
                print(
                    f"small_cuts.engine: clip write failed for scene {scene_id}: {exc!r}",
                    file=sys.stderr,
                )
        render_title_card(title, style_key).save(scene_dir / "card.webp", "WEBP")
        (scene_dir / "voice.wav").write_bytes(_wav_bytes(scene["audio"], scene["sample_rate"]))

        narrator_backend = narrator.get_backend()
        tts_backend = tts.get_tts_backend()
        engine = {
            "narrator_model": narrator_backend.model_id,
            "narrator_backend": narrator_backend.name,
            "tts_model": tts_backend.model_id,
            "latency_ms": scene["latency_ms"],
        }
        with self._lock, self._db:
            # max+1 under the lock: monotonic across the process AND across restarts.
            seq = self._db.execute("SELECT COALESCE(MAX(seq), -1) + 1 FROM scenes").fetchone()[0]
            self._db.execute(
                _INSERT,
                (
                    scene_id,
                    seq,
                    scene["moment_id"],
                    scene["session_id"],
                    _normalize_datetime(scene["captured_at"]),
                    _normalize_datetime(scene["created_at"]),
                    style_key,
                    title,
                    narration,
                    "private",
                    _owner(),
                    json.dumps(engine),
                ),
            )
        # Read back outside the `with`: get() takes the same non-reentrant lock.
        stored = self.get(scene_id)
        if stored is None:
            raise RuntimeError(f"scene {scene_id} missing immediately after insert")
        return stored

    # -- queries ---------------------------------------------------------------------
    def to_narrated_scene(self, row: sqlite3.Row) -> dict[str, Any]:
        """Contract-valid NarratedScene (1.1.0) for one stored row.

        `media.clip_url` is included only when the scene's `clip.mp4` exists
        on disk at call time.
        """
        scene_id = row["scene_id"]
        media = {
            "frame_url": f"/media/{scene_id}/frame.jpg",
            "card_url": f"/media/{scene_id}/card.webp",
            "audio_url": f"/media/{scene_id}/voice.wav",
        }
        if (self.media_dir / scene_id / "clip.mp4").is_file():
            media["clip_url"] = f"/media/{scene_id}/clip.mp4"
        return {
            "contract_version": CONTRACT_VERSION,
            "scene_id": scene_id,
            "moment_id": row["moment_id"],
            "session_id": row["session_id"],
            "captured_at": row["captured_at"],
            "created_at": row["created_at"],
            "style_key": row["style_key"],
            "title": row["title"],
            "narration": row["narration"],
            "visibility": row["visibility"],
            "seq": row["seq"],
            "owner": row["owner"],
            "media": media,
            "engine": json.loads(row["engine"]),
        }

    def list_scenes(
        self,
        session_id: str | None = None,
        visibility: str | None = None,
        limit: int = 100,
    ) -> list[dict[str, Any]]:
        """Newest bounded window, returned in scene chronology for the viewer.

        The inner query keeps the `limit` highest-seq rows matching the optional
        filters; the outer query re-orders that window by (captured_at, seq).

        NOTE(review): `limit` is passed to SQLite unchanged, and SQLite treats a
        negative LIMIT as "no limit" — confirm callers validate it.
        """
        clauses: list[str] = []
        params: list[Any] = []
        if session_id is not None:
            clauses.append("session_id = ?")
            params.append(session_id)
        if visibility is not None:
            clauses.append("visibility = ?")
            params.append(visibility)
        # Only fixed clause text is interpolated; every value is bound.
        where = f" WHERE {' AND '.join(clauses)}" if clauses else ""
        query = (
            "SELECT * FROM ("
            f"SELECT * FROM scenes{where} ORDER BY seq DESC LIMIT ?"
            ") ORDER BY captured_at, seq"
        )
        with self._lock:
            rows = self._db.execute(query, (*params, limit)).fetchall()
        return [self.to_narrated_scene(row) for row in rows]

    def get(self, scene_id: str) -> dict[str, Any] | None:
        """Return the stored NarratedScene for `scene_id`, or None if unknown."""
        with self._lock:
            row = self._db.execute(
                "SELECT * FROM scenes WHERE scene_id = ?", (scene_id,)
            ).fetchone()
        return self.to_narrated_scene(row) if row is not None else None

    def set_visibility(self, scene_id: str, visibility: str) -> dict[str, Any] | None:
        """The viewer's only write (D7). Returns the updated scene, or None if unknown.

        Raises:
            ValueError: if `visibility` is not one of VISIBILITIES.
        """
        if visibility not in VISIBILITIES:
            raise ValueError(f"Unknown visibility {visibility!r}; expected one of {VISIBILITIES}")
        with self._lock, self._db:
            updated = self._db.execute(
                "UPDATE scenes SET visibility = ? WHERE scene_id = ?", (visibility, scene_id)
            ).rowcount
        # Outside the `with`: get() re-acquires the non-reentrant lock.
        return self.get(scene_id) if updated else None

    def scenes_since(self, seq: int) -> list[dict[str, Any]]:
        """Scenes with seq > `seq`, ordered by seq — the SSE Last-Event-ID replay."""
        with self._lock:
            rows = self._db.execute(
                "SELECT * FROM scenes WHERE seq > ? ORDER BY seq", (seq,)
            ).fetchall()
        return [self.to_narrated_scene(row) for row in rows]

    def media_path(self, scene_id: str, filename: str) -> Path | None:
        """Resolve a media file, or None: unknown name, traversal, or missing file."""
        if filename not in MEDIA_FILES:
            return None
        # Compare resolved against resolved: if `<root>/media` is itself a
        # symlink, checking against the unresolved path would reject everything.
        media_root = self.media_dir.resolve()
        path = (self.media_dir / scene_id / filename).resolve()
        if not path.is_relative_to(media_root):  # traversal via scene_id
            return None
        return path if path.is_file() else None

    # -- pub/sub ----------------------------------------------------------------------
    def subscribe(self) -> asyncio.Queue[dict[str, Any]]:
        """New-scene feed for one SSE connection; pair with `unsubscribe`."""
        queue: asyncio.Queue[dict[str, Any]] = asyncio.Queue(maxsize=SUBSCRIBER_QUEUE_MAX)
        self._subscribers.append(queue)
        return queue

    def unsubscribe(self, queue: asyncio.Queue[dict[str, Any]]) -> None:
        """Detach `queue`; a queue that is not (or no longer) subscribed is ignored."""
        with contextlib.suppress(ValueError):
            self._subscribers.remove(queue)

    def close(self) -> None:
        """Close the sqlite connection, waiting for any in-flight statement."""
        with self._lock:
            self._db.close()
def _write_clip_mp4(
    path: Path,
    frames: list[Image.Image],
    fps: int = CLIP_MP4_FPS,
    blend_steps: int = CLIP_BLEND_STEPS,
) -> None:
    """Render a small browser-playable MP4 from sampled POV frames.

    Args:
        path: Destination file; written in place.
        frames: Source frames; the first frame's size fixes the clip size.
        fps: Frame rate handed to the encoder stream.
        blend_steps: Cross-dissolve frames inserted between consecutive frames.

    Raises:
        ValueError: if `frames` is empty.
        Exception: whatever the encoder raises. A partially written file at
            `path` is removed first, so a failed encode never leaves a corrupt
            clip behind for readers that only check that the file exists.
    """
    import av

    if not frames:
        raise ValueError("cannot encode a clip from zero frames")

    rgb_frames = [frame.convert(RGB_MODE) for frame in frames]
    width, height = rgb_frames[0].size
    # H.264/yuv420p expects even dimensions. Preserve portrait aspect and only
    # shave one pixel if needed; capture frames are already downscaled upstream.
    width = max(H264_MIN_DIMENSION, width - (width % 2))
    height = max(H264_MIN_DIMENSION, height - (height % 2))
    encode_frames = _smooth_clip_frames(rgb_frames, blend_steps=blend_steps, size=(width, height))

    try:
        container = av.open(str(path), "w")
        try:
            try:
                stream = container.add_stream(PRIMARY_VIDEO_CODEC, rate=fps)
            except Exception:
                # Primary encoder unavailable in this build: try the fallback name.
                stream = container.add_stream(FALLBACK_VIDEO_CODEC, rate=fps)
            stream.width = width
            stream.height = height
            stream.pix_fmt = VIDEO_PIXEL_FORMAT
            for image in encode_frames:
                frame = av.VideoFrame.from_image(image)
                for packet in stream.encode(frame):
                    container.mux(packet)
            # Calling encode() with no frame flushes the encoder's buffered packets.
            for packet in stream.encode():
                container.mux(packet)
        finally:
            container.close()
    except Exception:
        # Remove the truncated/unplayable output before propagating the error.
        with contextlib.suppress(OSError):
            path.unlink()
        raise
def _smooth_clip_frames(
    frames: list[Image.Image],
    blend_steps: int = CLIP_BLEND_STEPS,
    size: tuple[int, int] | None = None,
) -> list[Image.Image]:
    """Soften hard cuts by inserting cross-dissolve frames between neighbours.

    Every input frame is converted to RGB and, when `size` is given and
    differs, resized to it. With `blend_steps` <= 0 or fewer than two frames
    the normalized frames are returned as-is; otherwise `blend_steps` evenly
    spaced blends are placed between each consecutive pair.
    """
    if not frames:
        return []

    def _normalize(picture: Image.Image) -> Image.Image:
        # Bring one frame to the common mode and target size.
        rgb = picture.convert(RGB_MODE)
        if size is None or rgb.size == size:
            return rgb
        return rgb.resize(size, Image.Resampling.LANCZOS)

    ready = [_normalize(picture) for picture in frames]
    if len(ready) < 2 or blend_steps <= 0:
        return ready

    divisions = blend_steps + 1
    result = ready[:1]
    for index in range(1, len(ready)):
        before, after = ready[index - 1], ready[index]
        result.extend(
            Image.blend(before, after, step / divisions) for step in range(1, divisions)
        )
        result.append(after)
    return result
def _stored_title(raw_title: object, narration: str) -> str:
    """Pick the title to persist: the supplied one if usable, else from the narration.

    A supplied title counts only when it is a string with non-whitespace
    content; either way the text goes through `derive_title` capped at
    TITLE_MAX_LEN.
    """
    usable = isinstance(raw_title, str) and bool(raw_title.strip())
    source_text = raw_title if usable else narration
    return derive_title(source_text, max_len=TITLE_MAX_LEN)
def _owner() -> str:
return os.environ.get("SMALL_CUTS_ENGINE_OWNER", OWNER)
def _normalize_datetime(value: str) -> str:
raw = value.replace("Z", "+00:00")
parsed = datetime.fromisoformat(raw)
if parsed.tzinfo is None:
raise ValueError(f"timestamp must include timezone: {value}")
return parsed.astimezone(timezone.utc).isoformat()