Spaces:
Sleeping
Sleeping
File size: 6,244 Bytes
"""Frame extraction: automatic (scene detection) and manual (browser capture).
A :class:`FrameRecord` is the common unit passed around the app and into the
guide builder. Automatic frames come from PySceneDetect scene midpoints (with a
uniform-sampling fallback for single-scene videos) and are de-duplicated with a
perceptual hash. Manual frames arrive as base64 data URLs from the browser.
"""
from __future__ import annotations
import base64
import binascii
from dataclasses import dataclass
from pathlib import Path
import imagehash
from PIL import Image
from . import config, video
@dataclass
class FrameRecord:
    """A single frame image on disk together with its position in the video."""

    # Location of the saved frame image.
    path: str
    # Position in the video, in seconds (``label`` renders it as MM:SS).
    timestamp: float
    # Origin of the frame: "auto" | "manual".
    source: str = "auto"
    # Optional caption text; unset by default.
    caption: str | None = None

    @property
    def label(self) -> str:
        """Short display tag: the source followed by the time as ``MM:SS``.

        Fractional seconds are truncated, and minutes are not wrapped into
        hours, so a frame at 3725 s is shown as ``62:05``.
        """
        whole_seconds = int(self.timestamp)
        minutes = whole_seconds // 60
        seconds = whole_seconds % 60
        return f"{self.source} @ {minutes:02d}:{seconds:02d}"
def detect_scenes(video_path: str | Path) -> list[tuple[float, float]]:
    """Run PySceneDetect on a video and return ``(start_sec, end_sec)`` spans.

    Any failure while importing or running the detector yields an empty list,
    so callers can treat "no scenes" and "detection unavailable" the same way.
    """
    try:
        # Imported lazily so that an import failure is handled like any other
        # detection failure.
        from scenedetect import ContentDetector, detect

        source = str(video_path)
        detector = ContentDetector(threshold=config.SCENE_THRESHOLD)
        scene_list = detect(source, detector)
    except Exception:
        # Best-effort: callers fall back to uniform sampling on an empty result.
        return []

    spans = []
    for scene_start, scene_end in scene_list:
        spans.append((scene_start.get_seconds(), scene_end.get_seconds()))
    return spans
def _scene_timestamps(
    video_path: str | Path,
    max_frames: int,
    spoken_intervals: list[tuple[float, float]] | None = None,
) -> list[float]:
    """Choose at most ``max_frames`` sorted timestamps (seconds) to sample.

    Candidates are the midpoints of the spans returned by ``detect_scenes``.
    When no scenes are returned, the video is sampled uniformly instead. If
    ``spoken_intervals`` is given, candidates outside the overall narrated
    range (plus a small pad) are discarded. If more candidates remain than
    ``max_frames``, an evenly spaced subset is kept.

    Args:
        video_path: Video to analyse.
        max_frames: Upper bound on the number of timestamps returned. A
            non-positive value yields an empty list.
        spoken_intervals: Optional ``(start, end)`` pairs of narrated time.

    Returns:
        Timestamps in ascending order, never more than ``max_frames``.
    """
    # Nothing can be selected with a non-positive cap. Returning early also
    # keeps the uniform-sampling arithmetic below away from ``count + 1 == 0``.
    if max_frames <= 0:
        return []

    scenes = detect_scenes(video_path)
    if scenes:
        timestamps = [(start + end) / 2.0 for start, end in scenes]
    else:
        # Single static scene (or detection failed): sample uniformly.
        # A falsy duration from the helper is replaced by a 60 s default.
        duration = video.get_duration(video_path) or 60.0
        count = min(max_frames, max(3, int(duration // 5)))
        # count + 1 gaps keep every sample strictly inside (0, duration).
        step = duration / (count + 1)
        timestamps = [step * (i + 1) for i in range(count)]

    # Anchor to the narration: keep only frames within the spoken time range.
    # This drops screen-recorder intro/outro and idle screens (no speech there),
    # which is the main source of irrelevant auto-frames.
    if spoken_intervals:
        lo = min(s for s, _ in spoken_intervals)
        hi = max(e for _, e in spoken_intervals)
        pad = 1.5
        gated = [t for t in timestamps if lo - pad <= t <= hi + pad]
        # If gating removed everything, fall back to the segment start times.
        timestamps = gated or [s for s, _ in spoken_intervals]

    timestamps = sorted(timestamps)

    # Keep an evenly spaced subset if we overshoot the cap.
    if len(timestamps) > max_frames:
        last = len(timestamps) - 1
        if max_frames == 1:
            # The spacing formula divides by (max_frames - 1); with a single
            # slot, take the middle candidate instead of dividing by zero.
            timestamps = [timestamps[last // 2]]
        else:
            picks = sorted(
                {round(i * last / (max_frames - 1)) for i in range(max_frames)}
            )
            timestamps = [timestamps[i] for i in picks]
    return timestamps
def _dedup_close(timestamps: list[float], min_gap: float = 1.5) -> list[float]:
"""Collapse timestamps that are closer than ``min_gap`` seconds."""
out: list[float] = []
for t in sorted(timestamps):
if not out or t - out[-1] >= min_gap:
out.append(t)
return out
def extract_auto_frames(
    video_path: str | Path,
    session_dir: str | Path,
    max_frames: int = 40,
    spoken_intervals: list[tuple[float, float]] | None = None,
    step_timestamps: list[float] | None = None,
) -> list[FrameRecord]:
    """Extract representative frames, then dedup.

    Priority of anchors:

    1. ``step_timestamps`` (from the LLM step draft) — extract at the exact
       moments the guide refers to, so the pool matches the steps.
    2. ``spoken_intervals`` (from the transcript) — scene frames gated to the
       narrated time range, dropping recorder intro/idle screens.
    3. Otherwise — scene midpoints (or uniform sampling).

    Args:
        video_path: Video to pull frames from.
        session_dir: Session directory; frames are written to its ``frames``
            subdirectory, which is created if missing.
        max_frames: Upper bound on the number of timestamps sampled. A
            non-positive value yields no frames.
        spoken_intervals: Optional ``(start, end)`` narrated spans, used only
            when ``step_timestamps`` is empty or not given.
        step_timestamps: Optional explicit timestamps; ``None`` and negative
            entries are ignored.

    Returns:
        The records that survive ``dedup_records``. Timestamps whose frame
        extraction raises are skipped.
    """
    frames_dir = Path(session_dir) / "frames"
    frames_dir.mkdir(parents=True, exist_ok=True)

    # A non-positive cap can never select a frame; stop before any sampling.
    if max_frames <= 0:
        return dedup_records([])

    if step_timestamps:
        timestamps = _dedup_close(
            [t for t in step_timestamps if t is not None and t >= 0]
        )
        # Keep an evenly spaced subset if the steps overshoot the cap.
        if len(timestamps) > max_frames:
            last = len(timestamps) - 1
            if max_frames == 1:
                # The spacing formula divides by (max_frames - 1); with a
                # single slot, take the middle candidate instead.
                timestamps = [timestamps[last // 2]]
            else:
                picks = sorted(
                    {round(i * last / (max_frames - 1)) for i in range(max_frames)}
                )
                timestamps = [timestamps[i] for i in picks]
    else:
        timestamps = _scene_timestamps(video_path, max_frames, spoken_intervals)

    records: list[FrameRecord] = []
    for i, ts in enumerate(timestamps):
        # File name carries the running index and the timestamp in milliseconds.
        out = frames_dir / f"auto_{i:03d}_{int(ts * 1000):08d}.png"
        try:
            video.extract_frame(video_path, ts, out)
        except Exception:
            # Best-effort: one failed grab must not abort the whole batch.
            continue
        records.append(FrameRecord(path=str(out), timestamp=ts, source="auto"))
    return dedup_records(records)
def dedup_records(
    records: list[FrameRecord], distance: int | None = None
) -> list[FrameRecord]:
    """Filter out frames that look the same as an earlier kept frame (pHash).

    Records are processed in order. A record is a duplicate when the
    difference between its perceptual hash and that of any previously kept
    record is at most ``distance`` (``config.DEDUP_HASH_DISTANCE`` when not
    given). The image file of a duplicate is deleted from disk. Records whose
    image cannot be opened or hashed are left out of the result, and their
    file is not touched.
    """
    if distance is None:
        distance = config.DEDUP_HASH_DISTANCE

    unique_records: list[FrameRecord] = []
    seen_hashes: list[imagehash.ImageHash] = []
    for record in records:
        try:
            with Image.open(record.path) as image:
                fingerprint = imagehash.phash(image)
        except Exception:
            continue

        # Scan earlier hashes, stopping at the first close-enough match.
        is_duplicate = False
        for earlier in seen_hashes:
            if (fingerprint - earlier) <= distance:
                is_duplicate = True
                break

        if is_duplicate:
            Path(record.path).unlink(missing_ok=True)  # remove the redundant file
        else:
            seen_hashes.append(fingerprint)
            unique_records.append(record)
    return unique_records
def save_manual_frame(
    data_url: str, timestamp: float, session_dir: str | Path
) -> FrameRecord | None:
    """Decode a base64 image data URL captured in the browser and save it.

    Args:
        data_url: Either a full ``data:...;base64,<payload>`` URL or the bare
            base64 payload.
        timestamp: Video position of the capture in seconds; a falsy value is
            treated as ``0.0``.
        session_dir: Session directory; the frame is written to its ``frames``
            subdirectory, which is created if missing.

    Returns:
        A ``FrameRecord`` with ``source="manual"``, or ``None`` when
        ``data_url`` is empty, is not valid base64, or decodes to no data.

    Note:
        The decoded bytes are written as-is under a ``.png`` name; they are
        not checked to be a valid image. Two captures at the same millisecond
        share a file name, so the later one overwrites the earlier.
    """
    if not data_url:
        return None

    # Keep only the payload after the "data:...;base64," header, if present.
    b64 = data_url.split(",", 1)[1] if "," in data_url else data_url
    try:
        raw = base64.b64decode(b64)
    except (binascii.Error, ValueError):
        return None
    # A header with no payload, or a payload of only non-base64 characters
    # (which the non-validating decoder discards), decodes to b"". Do not
    # persist a zero-byte file and report it as a frame.
    if not raw:
        return None

    ts = float(timestamp or 0.0)
    # Create the directory only once there is something to save.
    frames_dir = Path(session_dir) / "frames"
    frames_dir.mkdir(parents=True, exist_ok=True)
    out = frames_dir / f"manual_{int(ts * 1000):08d}.png"
    out.write_bytes(raw)
    return FrameRecord(path=str(out), timestamp=ts, source="manual")
|