File size: 6,244 Bytes
85b485a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6e3fc02
 
 
 
 
85b485a
 
 
 
 
 
 
 
 
 
6e3fc02
 
 
 
 
 
 
 
 
 
 
 
85b485a
 
 
 
 
 
 
 
0e7c368
 
 
 
 
 
 
 
 
85b485a
6e3fc02
 
 
 
0e7c368
85b485a
0e7c368
 
 
 
 
 
 
 
6e3fc02
85b485a
 
 
0e7c368
 
 
 
 
 
 
 
 
85b485a
0e7c368
85b485a
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
"""Frame extraction: automatic (scene detection) and manual (browser capture).

A :class:`FrameRecord` is the common unit passed around the app and into the
guide builder. Automatic frames come from PySceneDetect scene midpoints (with a
uniform-sampling fallback for single-scene videos) and are de-duplicated with a
perceptual hash. Manual frames arrive as base64 data URLs from the browser.
"""
from __future__ import annotations

import base64
import binascii
from dataclasses import dataclass
from pathlib import Path

import imagehash
from PIL import Image

from . import config, video


@dataclass
class FrameRecord:
    """A single extracted frame together with its position in the video."""

    # Location of the saved image file.
    path: str
    # Position of the frame in the video, in seconds.
    timestamp: float
    # How the frame was obtained: "auto" or "manual".
    source: str = "auto"
    # Optional caption text; unset by default.
    caption: str | None = None

    @property
    def label(self) -> str:
        """Return a short display label: the source followed by ``MM:SS``."""
        whole_seconds = int(self.timestamp)
        minutes = whole_seconds // 60
        seconds = whole_seconds % 60
        return f"{self.source} @ {minutes:02d}:{seconds:02d}"


def detect_scenes(video_path: str | Path) -> list[tuple[float, float]]:
    """Run PySceneDetect on the video and return ``(start_sec, end_sec)`` spans.

    Detection is best-effort: if the library cannot be imported or the
    detection call raises, an empty list is returned so that callers can fall
    back to uniform sampling.
    """
    try:
        from scenedetect import ContentDetector, detect

        source = str(video_path)
        detector = ContentDetector(threshold=config.SCENE_THRESHOLD)
        found = detect(source, detector)
    except Exception:
        # Swallowed on purpose; an empty result signals "no scenes available".
        return []

    spans = []
    for begin, finish in found:
        spans.append((begin.get_seconds(), finish.get_seconds()))
    return spans


def _scene_timestamps(
    video_path: str | Path,
    max_frames: int,
    spoken_intervals: list[tuple[float, float]] | None = None,
) -> list[float]:
    """Choose candidate timestamps (in seconds) for automatic frame capture.

    Scene midpoints from :func:`detect_scenes` are used when any scene is
    found; otherwise the video is sampled uniformly. The candidates are then
    optionally restricted to the narrated time range and capped at
    ``max_frames`` entries.

    Args:
        video_path: Video to analyse.
        max_frames: Maximum number of timestamps to return.
        spoken_intervals: Optional ``(start_sec, end_sec)`` spans of speech.
            When given, only candidates between the earliest start and the
            latest end (with 1.5 s of padding on each side) are kept.

    Returns:
        The selected timestamps in ascending order.
    """
    scenes = detect_scenes(video_path)
    if scenes:
        timestamps = [(start + end) / 2.0 for start, end in scenes]
    else:
        # Single static scene (or detection failed): sample uniformly.
        # A falsy duration from video.get_duration is replaced by 60 seconds.
        duration = video.get_duration(video_path) or 60.0
        count = min(max_frames, max(3, int(duration // 5)))
        # Dividing by count + 1 keeps samples away from the very first and
        # very last instant of the video.
        step = duration / (count + 1)
        timestamps = [step * (i + 1) for i in range(count)]

    # Anchor to the narration: keep only frames within the spoken time range.
    # This drops screen-recorder intro/outro and idle screens (no speech there),
    # which is the main source of irrelevant auto-frames.
    if spoken_intervals:
        lo = min(s for s, _ in spoken_intervals)
        hi = max(e for _, e in spoken_intervals)
        pad = 1.5
        gated = [t for t in timestamps if lo - pad <= t <= hi + pad]
        # If gating removed everything, fall back to the segment start times.
        timestamps = gated or [s for s, _ in spoken_intervals]

    timestamps = sorted(timestamps)
    # Keep an evenly spaced subset if we overshoot the cap.
    if len(timestamps) > max_frames:
        if max_frames == 1:
            # The spacing formula below divides by (max_frames - 1); with a
            # single slot, keep the middle candidate instead of raising
            # ZeroDivisionError.
            timestamps = [timestamps[len(timestamps) // 2]]
        else:
            last = len(timestamps) - 1
            # The set drops indices that round to the same position.
            picks = sorted({round(i * last / (max_frames - 1)) for i in range(max_frames)})
            timestamps = [timestamps[i] for i in picks]
    return timestamps


def _dedup_close(timestamps: list[float], min_gap: float = 1.5) -> list[float]:
    """Thin out timestamps so neighbours are at least ``min_gap`` seconds apart.

    The input is sorted first; each value is kept only if it lies at least
    ``min_gap`` after the most recently kept value. The earliest timestamp is
    always kept.
    """
    ordered = sorted(timestamps)
    if not ordered:
        return []

    kept: list[float] = [ordered[0]]
    for current in ordered[1:]:
        gap = current - kept[-1]
        if gap >= min_gap:
            kept.append(current)
    return kept


def extract_auto_frames(
    video_path: str | Path,
    session_dir: str | Path,
    max_frames: int = 40,
    spoken_intervals: list[tuple[float, float]] | None = None,
    step_timestamps: list[float] | None = None,
) -> list[FrameRecord]:
    """Extract representative frames, then dedup.

    Priority of anchors:
    1. ``step_timestamps`` (from the LLM step draft) — extract at the exact moments
       the guide refers to, so the pool matches the steps.
    2. ``spoken_intervals`` (from the transcript) — scene frames gated to the
       narrated time range, dropping recorder intro/idle screens.
    3. Otherwise — scene midpoints (or uniform sampling).

    Args:
        video_path: Video to pull frames from.
        session_dir: Session directory; images are written to its ``frames``
            subdirectory, which is created if missing.
        max_frames: Maximum number of timestamps to sample.
        spoken_intervals: Optional ``(start_sec, end_sec)`` speech spans; only
            consulted when ``step_timestamps`` is empty or ``None``.
        step_timestamps: Optional explicit timestamps in seconds. ``None`` and
            negative entries are ignored, and entries closer together than
            the default gap of :func:`_dedup_close` are collapsed.

    Returns:
        Records for the frames that were extracted successfully and survived
        :func:`dedup_records`. Timestamps whose extraction raised are skipped.
    """
    frames_dir = Path(session_dir) / "frames"
    frames_dir.mkdir(parents=True, exist_ok=True)

    if step_timestamps:
        timestamps = _dedup_close([t for t in step_timestamps if t is not None and t >= 0])
        if len(timestamps) > max_frames:
            if max_frames == 1:
                # The spacing formula below divides by (max_frames - 1); with
                # a single slot, keep the middle timestamp instead of raising
                # ZeroDivisionError.
                timestamps = [timestamps[len(timestamps) // 2]]
            else:
                last = len(timestamps) - 1
                # Evenly spaced indices; the set drops any that coincide.
                picks = sorted({round(i * last / (max_frames - 1)) for i in range(max_frames)})
                timestamps = [timestamps[i] for i in picks]
    else:
        timestamps = _scene_timestamps(video_path, max_frames, spoken_intervals)

    records: list[FrameRecord] = []
    for i, ts in enumerate(timestamps):
        # File name carries the sample index and the timestamp in milliseconds.
        out = frames_dir / f"auto_{i:03d}_{int(ts * 1000):08d}.png"
        try:
            video.extract_frame(video_path, ts, out)
        except Exception:
            # Best-effort: one failed frame must not abort the whole batch.
            continue
        records.append(FrameRecord(path=str(out), timestamp=ts, source="auto"))
    return dedup_records(records)


def dedup_records(
    records: list[FrameRecord], distance: int | None = None
) -> list[FrameRecord]:
    """Filter out frames that look almost the same as an earlier kept frame.

    Each image is fingerprinted with a perceptual hash (pHash). A frame whose
    hash differs from any previously kept hash by at most ``distance`` is
    treated as a duplicate: its file is deleted and it is left out of the
    result. When ``distance`` is ``None`` the threshold comes from
    ``config.DEDUP_HASH_DISTANCE``. Frames whose image cannot be opened or
    hashed are also left out of the result, but their files are not removed.
    """
    threshold = distance if distance is not None else config.DEDUP_HASH_DISTANCE
    unique: list[FrameRecord] = []
    seen: list[imagehash.ImageHash] = []
    for record in records:
        try:
            with Image.open(record.path) as image:
                fingerprint = imagehash.phash(image)
        except Exception:
            # Unreadable frame: skip it without touching the file.
            continue

        is_duplicate = False
        for earlier in seen:
            if (fingerprint - earlier) <= threshold:
                is_duplicate = True
                break

        if is_duplicate:
            # Remove the redundant file from disk.
            Path(record.path).unlink(missing_ok=True)
        else:
            seen.append(fingerprint)
            unique.append(record)
    return unique


def save_manual_frame(
    data_url: str, timestamp: float, session_dir: str | Path
) -> FrameRecord | None:
    """Decode a base64 image data URL captured in the browser and save it.

    Args:
        data_url: Either a full data URL (everything after the first comma is
            taken as the base64 payload) or a bare base64 string.
        timestamp: Position of the capture in seconds; a falsy value is
            treated as ``0.0``.
        session_dir: Session directory; the image is written to its ``frames``
            subdirectory, which is created if missing.

    Returns:
        A ``FrameRecord`` with ``source="manual"`` for the saved file, or
        ``None`` when ``data_url`` is empty, is not valid base64, or decodes
        to zero bytes.
    """
    if not data_url:
        return None

    # Drop the "data:<mime>;base64," style prefix when one is present.
    b64 = data_url.split(",", 1)[1] if "," in data_url else data_url
    try:
        raw = base64.b64decode(b64)
    except (binascii.Error, ValueError):
        return None
    if not raw:
        # A URL such as "data:image/png;base64," decodes to nothing; writing
        # it would leave a zero-byte file registered as a frame.
        return None

    # Create the directory only once there is a payload to write.
    frames_dir = Path(session_dir) / "frames"
    frames_dir.mkdir(parents=True, exist_ok=True)

    ts = float(timestamp or 0.0)
    # File name carries the timestamp in milliseconds, zero-padded.
    out = frames_dir / f"manual_{int(ts * 1000):08d}.png"
    out.write_bytes(raw)
    return FrameRecord(path=str(out), timestamp=ts, source="manual")