Spaces:
Sleeping
Sleeping
| """Frame extraction: automatic (scene detection) and manual (browser capture). | |
| A :class:`FrameRecord` is the common unit passed around the app and into the | |
| guide builder. Automatic frames come from PySceneDetect scene midpoints (with a | |
| uniform-sampling fallback for single-scene videos) and are de-duplicated with a | |
| perceptual hash. Manual frames arrive as base64 data URLs from the browser. | |
| """ | |
| from __future__ import annotations | |
| import base64 | |
| import binascii | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| import imagehash | |
| from PIL import Image | |
| from . import config, video | |
@dataclass
class FrameRecord:
    """One extracted frame plus where it came from in the video.

    Instances are created with keyword arguments (``path=``, ``timestamp=``,
    ``source=``), so the class must be a dataclass for the generated
    ``__init__`` to exist.
    """

    path: str  # location of the saved image file
    timestamp: float  # position in the video, in seconds
    source: str = "auto"  # "auto" | "manual"
    caption: str | None = None  # optional text attached to the frame

    def label(self) -> str:
        """Return a short ``"<source> @ MM:SS"`` tag for this frame."""
        # Fractional seconds are truncated before splitting into min/sec.
        mm, ss = divmod(int(self.timestamp), 60)
        return f"{self.source} @ {mm:02d}:{ss:02d}"
def detect_scenes(video_path: str | Path) -> list[tuple[float, float]]:
    """Run PySceneDetect on a video and report its scene boundaries.

    Returns one ``(start_sec, end_sec)`` pair per detected scene. Any failure
    while importing or running the detector yields an empty list instead of
    an exception.
    """
    try:
        # Imported lazily so a missing/broken scenedetect install is handled
        # by the same best-effort path as a failed detection run.
        from scenedetect import ContentDetector, detect

        detector = ContentDetector(threshold=config.SCENE_THRESHOLD)
        scene_list = detect(str(video_path), detector)
    except Exception:
        # Best-effort only: an empty result tells callers to sample uniformly.
        return []

    spans: list[tuple[float, float]] = []
    for begin, finish in scene_list:
        spans.append((begin.get_seconds(), finish.get_seconds()))
    return spans
def _scene_timestamps(
    video_path: str | Path,
    max_frames: int,
    spoken_intervals: list[tuple[float, float]] | None = None,
) -> list[float]:
    """Choose candidate frame timestamps (in seconds) for a video.

    Args:
        video_path: Video to analyse.
        max_frames: Upper bound on the number of timestamps returned.
        spoken_intervals: Optional ``(start, end)`` spans of narration. When
            given, only timestamps inside the overall spoken range (plus a
            small pad) are kept.

    Returns:
        Sorted timestamps, at most ``max_frames`` of them (an empty list when
        ``max_frames`` is zero or negative).
    """
    scenes = detect_scenes(video_path)
    if scenes:
        timestamps = [(start + end) / 2.0 for start, end in scenes]
    else:
        # Single static scene (or detection failed): sample uniformly.
        # A falsy duration is replaced by a 60 s default.
        duration = video.get_duration(video_path) or 60.0
        count = min(max_frames, max(3, int(duration // 5)))
        # count + 1 gaps keep samples away from the very first/last instant.
        step = duration / (count + 1)
        timestamps = [step * (i + 1) for i in range(count)]

    # Anchor to the narration: keep only frames within the spoken time range.
    # This drops screen-recorder intro/outro and idle screens (no speech there),
    # which is the main source of irrelevant auto-frames.
    if spoken_intervals:
        lo = min(s for s, _ in spoken_intervals)
        hi = max(e for _, e in spoken_intervals)
        pad = 1.5
        gated = [t for t in timestamps if lo - pad <= t <= hi + pad]
        # If gating removed everything, fall back to the segment start times.
        timestamps = gated or [s for s, _ in spoken_intervals]

    timestamps = sorted(timestamps)

    # Keep an evenly spaced subset if we overshoot the cap.
    if len(timestamps) > max_frames:
        if max_frames <= 0:
            return []
        if max_frames == 1:
            # The spacing formula below divides by (max_frames - 1); with a
            # single slot, take the middle candidate instead of crashing.
            return [timestamps[len(timestamps) // 2]]
        last = len(timestamps) - 1
        # A set removes indices that round to the same position.
        picks = sorted({round(i * last / (max_frames - 1)) for i in range(max_frames)})
        timestamps = [timestamps[i] for i in picks]
    return timestamps
def _dedup_close(timestamps: list[float], min_gap: float = 1.5) -> list[float]:
    """Thin out timestamps so survivors are at least ``min_gap`` seconds apart.

    The input is processed in ascending order; each value is compared with
    the most recently kept one, and the result is returned sorted.
    """
    kept: list[float] = []
    for stamp in sorted(timestamps):
        far_enough = not kept or stamp - kept[-1] >= min_gap
        if far_enough:
            kept.append(stamp)
    return kept
def extract_auto_frames(
    video_path: str | Path,
    session_dir: str | Path,
    max_frames: int = 40,
    spoken_intervals: list[tuple[float, float]] | None = None,
    step_timestamps: list[float] | None = None,
) -> list[FrameRecord]:
    """Extract representative frames, then dedup.

    Priority of anchors:

    1. ``step_timestamps`` (from the LLM step draft) — extract at the exact moments
       the guide refers to, so the pool matches the steps.
    2. ``spoken_intervals`` (from the transcript) — scene frames gated to the
       narrated time range, dropping recorder intro/idle screens.
    3. Otherwise — scene midpoints (or uniform sampling).

    Args:
        video_path: Video to pull frames from.
        session_dir: Directory under which a ``frames`` folder is created.
        max_frames: Upper bound on the number of timestamps extracted.
        spoken_intervals: Optional ``(start, end)`` narration spans.
        step_timestamps: Optional timestamps in seconds; ``None`` and negative
            entries are ignored.

    Returns:
        The frames that were extracted successfully and survived perceptual
        de-duplication. Timestamps whose extraction raised are skipped.
    """
    frames_dir = Path(session_dir) / "frames"
    frames_dir.mkdir(parents=True, exist_ok=True)

    usable_steps = [t for t in (step_timestamps or []) if t is not None and t >= 0]
    if usable_steps:
        timestamps = _dedup_close(usable_steps)
        if len(timestamps) > max_frames:
            if max_frames <= 0:
                timestamps = []
            elif max_frames == 1:
                # The spacing formula divides by (max_frames - 1); with one
                # slot, take the middle anchor instead of crashing.
                timestamps = [timestamps[len(timestamps) // 2]]
            else:
                last = len(timestamps) - 1
                # A set removes indices that round to the same position.
                picks = sorted(
                    {round(i * last / (max_frames - 1)) for i in range(max_frames)}
                )
                timestamps = [timestamps[i] for i in picks]
    else:
        # No usable step anchors (none given, or every entry was None or
        # negative): fall through to the next priority rather than return
        # nothing.
        timestamps = _scene_timestamps(video_path, max_frames, spoken_intervals)

    records: list[FrameRecord] = []
    for i, ts in enumerate(timestamps):
        # File name carries the sequence index and the timestamp in ms.
        out = frames_dir / f"auto_{i:03d}_{int(ts * 1000):08d}.png"
        try:
            video.extract_frame(video_path, ts, out)
        except Exception:
            # Best-effort: one unreadable frame must not abort the batch.
            continue
        records.append(FrameRecord(path=str(out), timestamp=ts, source="auto"))
    return dedup_records(records)
def dedup_records(
    records: list[FrameRecord], distance: int | None = None
) -> list[FrameRecord]:
    """Filter out frames that look the same as an earlier kept frame.

    Each image is reduced to a perceptual hash (pHash). A record is a
    duplicate when its hash differs from any previously kept hash by at most
    ``distance`` (``config.DEDUP_HASH_DISTANCE`` when ``distance`` is None).
    Duplicate image files are deleted from disk. Records whose image cannot
    be opened or hashed are left out of the result, and their files are not
    touched.
    """
    threshold = distance if distance is not None else config.DEDUP_HASH_DISTANCE
    unique: list[FrameRecord] = []
    seen: list[imagehash.ImageHash] = []

    for record in records:
        try:
            with Image.open(record.path) as image:
                fingerprint = imagehash.phash(image)
        except Exception:
            continue

        is_duplicate = False
        for earlier in seen:
            if (fingerprint - earlier) <= threshold:
                is_duplicate = True
                break

        if is_duplicate:
            # remove the redundant file
            Path(record.path).unlink(missing_ok=True)
        else:
            seen.append(fingerprint)
            unique.append(record)
    return unique
def save_manual_frame(
    data_url: str, timestamp: float, session_dir: str | Path
) -> FrameRecord | None:
    """Decode a base64 image data URL captured in the browser and save it.

    Args:
        data_url: Either a full ``data:...;base64,<payload>`` URL or the bare
            base64 payload.
        timestamp: Capture position in seconds; a falsy value is treated as 0.
        session_dir: Directory under which a ``frames`` folder is created.

    Returns:
        A ``FrameRecord`` with ``source="manual"``, or ``None`` when the input
        is empty, is not valid base64, or decodes to zero bytes.
    """
    if not data_url:
        return None

    # Everything after the first comma is the payload; tolerate a bare payload.
    b64 = data_url.split(",", 1)[1] if "," in data_url else data_url
    try:
        raw = base64.b64decode(b64)
    except (binascii.Error, ValueError):
        return None
    if not raw:
        # An empty payload would otherwise be saved as a zero-byte "frame".
        return None

    # Create the directory only once there is something to write.
    frames_dir = Path(session_dir) / "frames"
    frames_dir.mkdir(parents=True, exist_ok=True)

    ts = float(timestamp or 0.0)
    # NOTE(review): bytes are written unmodified under a .png name whatever
    # MIME type the data URL declared — confirm the browser always sends PNG.
    out = frames_dir / f"manual_{int(ts * 1000):08d}.png"
    out.write_bytes(raw)
    return FrameRecord(path=str(out), timestamp=ts, source="manual")