# DocuMaker / src/frames.py
# Commit 0e7c368 (vivekchakraverty): "Auto-extract: generate the LLM step
# outline first, anchor frames to step times"
"""Frame extraction: automatic (scene detection) and manual (browser capture).
A :class:`FrameRecord` is the common unit passed around the app and into the
guide builder. Automatic frames come from PySceneDetect scene midpoints (with a
uniform-sampling fallback for single-scene videos) and are de-duplicated with a
perceptual hash. Manual frames arrive as base64 data URLs from the browser.
"""
from __future__ import annotations
import base64
import binascii
from dataclasses import dataclass
from pathlib import Path
import imagehash
from PIL import Image
from . import config, video
@dataclass
class FrameRecord:
"""One extracted frame plus where it came from in the video."""
path: str
timestamp: float
source: str = "auto" # "auto" | "manual"
caption: str | None = None
@property
def label(self) -> str:
mm, ss = divmod(int(self.timestamp), 60)
return f"{self.source} @ {mm:02d}:{ss:02d}"
def detect_scenes(video_path: str | Path) -> list[tuple[float, float]]:
"""Return a list of (start_sec, end_sec) scene spans via PySceneDetect."""
try:
from scenedetect import ContentDetector, detect
scenes = detect(str(video_path), ContentDetector(threshold=config.SCENE_THRESHOLD))
except Exception:
# Detection is best-effort; callers fall back to uniform sampling.
return []
return [(start.get_seconds(), end.get_seconds()) for start, end in scenes]
def _scene_timestamps(
    video_path: str | Path,
    max_frames: int,
    spoken_intervals: list[tuple[float, float]] | None = None,
) -> list[float]:
    """Pick candidate timestamps (in seconds) for automatic frame extraction.

    Scene midpoints from :func:`detect_scenes` are used when any scenes are
    found; otherwise the video is sampled uniformly (a 60 s duration is assumed
    when ``video.get_duration`` returns a falsy value).

    Args:
        video_path: Video file to analyse.
        max_frames: Upper bound on the number of timestamps returned. Values
            of zero or less yield an empty list.
        spoken_intervals: Optional ``(start, end)`` pairs in seconds. When
            given, only timestamps inside the overall spoken range (padded by
            1.5 s on each side) are kept; if none survive, the interval start
            times are used instead.

    Returns:
        Sorted timestamps, at most ``max_frames`` of them.
    """
    if max_frames <= 0:
        # Nothing can be selected; also avoids a zero/negative sample count in
        # the uniform-sampling branch below.
        return []

    scenes = detect_scenes(video_path)
    if scenes:
        timestamps = [(start + end) / 2.0 for start, end in scenes]
    else:
        # Single static scene (or detection failed): sample uniformly, roughly
        # one frame per 5 s but at least 3, never more than ``max_frames``.
        duration = video.get_duration(video_path) or 60.0
        count = min(max_frames, max(3, int(duration // 5)))
        step = duration / (count + 1)
        timestamps = [step * (i + 1) for i in range(count)]

    # Anchor to the narration: keep only frames within the spoken time range.
    # This drops screen-recorder intro/outro and idle screens (no speech there),
    # which is the main source of irrelevant auto-frames.
    if spoken_intervals:
        lo = min(s for s, _ in spoken_intervals)
        hi = max(e for _, e in spoken_intervals)
        pad = 1.5
        gated = [t for t in timestamps if lo - pad <= t <= hi + pad]
        # If gating removed everything, fall back to the segment start times.
        timestamps = gated or [s for s, _ in spoken_intervals]

    timestamps = sorted(timestamps)

    # Keep an evenly spaced subset if we overshoot the cap.
    if len(timestamps) > max_frames:
        if max_frames == 1:
            # The spacing formula divides by ``max_frames - 1``; with a cap of
            # one, keep the middle candidate instead of dividing by zero.
            timestamps = [timestamps[len(timestamps) // 2]]
        else:
            last = len(timestamps) - 1
            picks = sorted(
                {round(i * last / (max_frames - 1)) for i in range(max_frames)}
            )
            timestamps = [timestamps[i] for i in picks]
    return timestamps
def _dedup_close(timestamps: list[float], min_gap: float = 1.5) -> list[float]:
    """Thin out timestamps so kept neighbours are at least ``min_gap`` apart.

    The input is sorted first. Each value is then compared with the most
    recently kept one and is retained only when the difference is at least
    ``min_gap`` seconds; the smallest value is always kept.
    """
    ordered = sorted(timestamps)
    if not ordered:
        return []

    kept = [ordered[0]]
    for candidate in ordered[1:]:
        gap = candidate - kept[-1]
        if gap >= min_gap:
            kept.append(candidate)
    return kept
def extract_auto_frames(
    video_path: str | Path,
    session_dir: str | Path,
    max_frames: int = 40,
    spoken_intervals: list[tuple[float, float]] | None = None,
    step_timestamps: list[float] | None = None,
) -> list[FrameRecord]:
    """Extract representative frames, then dedup.

    Priority of anchors:

    1. ``step_timestamps`` (from the LLM step draft) — extract at the exact
       moments the guide refers to, so the pool matches the steps.
    2. ``spoken_intervals`` (from the transcript) — scene frames gated to the
       narrated time range, dropping recorder intro/idle screens.
    3. Otherwise — scene midpoints (or uniform sampling).

    If ``step_timestamps`` is given but contains no usable value (every entry
    is ``None`` or negative), selection falls through to options 2/3 instead
    of producing no frames.

    Args:
        video_path: Video file to read frames from.
        session_dir: Session directory; frames are written to its ``frames``
            sub-directory, which is created if missing.
        max_frames: Upper bound on the number of frames extracted. Values of
            zero or less yield an empty list.
        spoken_intervals: Optional ``(start, end)`` pairs in seconds.
        step_timestamps: Optional step times in seconds; ``None`` and negative
            entries are ignored.

    Returns:
        The extracted frames as ``FrameRecord`` objects with ``source="auto"``,
        after perceptual-hash de-duplication. Timestamps whose extraction
        raised are skipped.
    """
    frames_dir = Path(session_dir) / "frames"
    frames_dir.mkdir(parents=True, exist_ok=True)

    if max_frames <= 0:
        return []

    anchors: list[float] = []
    if step_timestamps:
        anchors = _dedup_close(
            [t for t in step_timestamps if t is not None and t >= 0]
        )

    if anchors:
        timestamps = anchors
        # Keep an evenly spaced subset if we overshoot the cap.
        if len(timestamps) > max_frames:
            if max_frames == 1:
                # The spacing formula divides by ``max_frames - 1``; with a
                # cap of one, keep the middle anchor instead.
                timestamps = [timestamps[len(timestamps) // 2]]
            else:
                last = len(timestamps) - 1
                picks = sorted(
                    {round(i * last / (max_frames - 1)) for i in range(max_frames)}
                )
                timestamps = [timestamps[i] for i in picks]
    else:
        timestamps = _scene_timestamps(video_path, max_frames, spoken_intervals)

    records: list[FrameRecord] = []
    for i, ts in enumerate(timestamps):
        # The index keeps names unique; the millisecond stamp makes them readable.
        out = frames_dir / f"auto_{i:03d}_{int(ts * 1000):08d}.png"
        try:
            video.extract_frame(video_path, ts, out)
        except Exception:
            # Best-effort: one unreadable timestamp must not abort the batch.
            continue
        records.append(FrameRecord(path=str(out), timestamp=ts, source="auto"))
    return dedup_records(records)
def dedup_records(
    records: list[FrameRecord], distance: int | None = None
) -> list[FrameRecord]:
    """Drop near-identical frames using perceptual hashing (pHash).

    Records are processed in order; a record is dropped when its hash is
    within ``distance`` of the hash of any record already kept.

    Args:
        records: Frames to filter; each ``path`` is opened as an image.
        distance: Maximum hash difference still treated as a duplicate.
            Defaults to ``config.DEDUP_HASH_DISTANCE``.

    Returns:
        The kept records, in their original order. Records whose image cannot
        be opened or hashed are left out (their files are not removed).

    Side effects:
        The image file of each dropped duplicate is deleted, unless that same
        file is also referenced by a record that was kept.
    """
    distance = config.DEDUP_HASH_DISTANCE if distance is None else distance
    kept: list[FrameRecord] = []
    hashes: list[imagehash.ImageHash] = []
    kept_files: set[Path] = set()
    for rec in records:
        try:
            with Image.open(rec.path) as im:
                phash = imagehash.phash(im)
        except Exception:
            # Unreadable image: skip the record rather than fail the batch.
            continue
        file_path = Path(rec.path).resolve()
        if any((phash - kept_hash) <= distance for kept_hash in hashes):
            # Two records may point at the same file (it then matches itself
            # at distance 0); deleting it would orphan the kept record.
            if file_path not in kept_files:
                Path(rec.path).unlink(missing_ok=True)  # remove the redundant file
            continue
        hashes.append(phash)
        kept.append(rec)
        kept_files.add(file_path)
    return kept
def save_manual_frame(
    data_url: str, timestamp: float, session_dir: str | Path
) -> FrameRecord | None:
    """Decode a base64 image data URL captured in the browser and save it.

    Args:
        data_url: Either a full ``data:...;base64,<payload>`` URL or the bare
            base64 payload.
        timestamp: Capture position in seconds; a falsy value is treated as 0.
        session_dir: Session directory; the image is written to its ``frames``
            sub-directory as ``manual_<milliseconds>.png``.

    Returns:
        A ``FrameRecord`` with ``source="manual"``, or ``None`` when the input
        is empty, is not valid base64, or decodes to zero bytes. Nothing is
        written to disk in the ``None`` cases.
    """
    if not data_url:
        return None

    # Everything after the first comma is the payload; without a comma the
    # whole string is treated as the payload.
    _, sep, payload = data_url.partition(",")
    b64 = payload if sep else data_url
    try:
        raw = base64.b64decode(b64)
    except (binascii.Error, ValueError):
        return None
    if not raw:
        # An empty payload (or one made only of non-base64 characters, which
        # the decoder discards) would otherwise be saved as a zero-byte file.
        return None

    # Create the directory only once there is something valid to store.
    frames_dir = Path(session_dir) / "frames"
    frames_dir.mkdir(parents=True, exist_ok=True)

    ts = float(timestamp or 0.0)
    out = frames_dir / f"manual_{int(ts * 1000):08d}.png"
    out.write_bytes(raw)
    return FrameRecord(path=str(out), timestamp=ts, source="manual")