File size: 9,338 Bytes
f60a6c1
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
import asyncio
from collections import OrderedDict
from collections.abc import Iterator
from contextlib import contextmanager
from logging import getLogger
from pathlib import Path
from tempfile import NamedTemporaryFile
from threading import RLock

from cv2 import (
    CAP_PROP_FRAME_COUNT,
    CAP_PROP_POS_FRAMES,
    COLOR_BGR2GRAY,
    COLOR_HSV2BGR,
    NORM_MINMAX,
    VideoCapture,
    calcOpticalFlowFarneback,
    cartToPolar,
    cvtColor,
    normalize,
)
from numpy import ndarray, pi, zeros_like

from scorevision.utils.async_clients import get_async_client
from scorevision.utils.settings import get_settings

logger = getLogger(__name__)


@contextmanager
def open_video(path: Path) -> Iterator[VideoCapture]:
    """Open ``path`` with OpenCV and yield the capture, releasing it on exit.

    Args:
        path: Location of the video file on disk.

    Yields:
        An opened ``VideoCapture``; it is released when the ``with`` block exits,
        including when the block raises.

    Raises:
        FileNotFoundError: If ``path`` does not exist.
        ValueError: If ``path`` is not a regular file or OpenCV cannot open it.
    """
    logger.info("Attempting to open video: %s", path)
    if not path.exists():
        raise FileNotFoundError(f"Video not found: {path}")
    if not path.is_file():
        raise ValueError("Path is not a file")
    video = VideoCapture(str(path))
    if not video.isOpened():
        # Release even a capture that failed to open so no backend handle leaks.
        video.release()
        raise ValueError("Could not open video")
    try:
        yield video
    finally:
        video.release()


def background_temporal_differencing(
    video_path: Path, frame_numbers: list[int]
) -> tuple[dict[int, ndarray], dict[int, ndarray]]:
    """Decode every frame of a video and render dense optical flow for chosen frames.

    Flow for frame ``n`` is estimated with OpenCV's Farneback method between the
    previously decoded frame and frame ``n``, then rendered as a BGR image whose
    hue encodes flow direction and whose value encodes min-max normalised
    magnitude.

    Args:
        video_path: Path to the video file.
        frame_numbers: Frame indices for which a flow image is wanted. A frame
            with no earlier successfully-read frame (e.g. frame 0) gets none.

    Returns:
        ``(images, flow_images)``: every successfully read frame keyed by its
        index, and the flow visualisations keyed by the requested indices.

    Raises:
        FileNotFoundError: Propagated from ``open_video`` if the file is missing.
        ValueError: Propagated from ``open_video`` if the file cannot be opened.
    """
    logger.info(
        "Computing Background Temporal Differencing for frame_numbers %s "
        "using Dense Optical Flow...",
        frame_numbers,
    )
    # Build the lookup once: a list membership test on every decoded frame is
    # O(len(frame_numbers)) per frame.
    wanted = set(frame_numbers)
    images: dict[int, ndarray] = {}
    flow_images: dict[int, ndarray] = {}
    # open_video already raises if the capture cannot be opened.
    with open_video(path=video_path) as video:
        max_frame_number = int(video.get(CAP_PROP_FRAME_COUNT))
        prev_frame, prev_gray = None, None
        for frame_number in range(max_frame_number):
            ok, frame = video.read()
            if not ok:
                logger.error("Error reading frame %d", frame_number)
                # prev_* are kept, so the next flow is measured against the
                # last frame that was read successfully.
                continue
            images[frame_number] = frame

            gray = cvtColor(frame, COLOR_BGR2GRAY)
            if frame_number in wanted and prev_gray is not None:
                flow = calcOpticalFlowFarneback(
                    prev_gray,
                    gray,
                    None,
                    pyr_scale=0.5,
                    levels=3,
                    winsize=15,
                    iterations=3,
                    poly_n=5,
                    poly_sigma=1.2,
                    flags=0,
                )
                mag, ang = cartToPolar(flow[..., 0], flow[..., 1])
                # Hue <- angle (radians mapped to [0, 180)), full saturation,
                # value <- magnitude normalised to 0-255.
                hsv = zeros_like(prev_frame)
                hsv[..., 0] = ang * 180 / pi / 2
                hsv[..., 1] = 255
                hsv[..., 2] = normalize(mag, None, 0, 255, NORM_MINMAX)
                flow_images[frame_number] = cvtColor(hsv, COLOR_HSV2BGR)

            prev_gray = gray
            prev_frame = frame

    return images, flow_images


async def download_video(
    url: str, frame_numbers: list[int]
) -> tuple[str, dict[int, ndarray], dict[int, ndarray]]:
    """Download a video, then decode it and compute optical-flow images.

    The whole response body is read into memory, written to a temporary
    ``.mp4`` file (deleted on exit), and processed by
    ``background_temporal_differencing``.

    Args:
        url: Location of the video; its last ``/``-separated segment is
            returned as the name.
        frame_numbers: Frames for which optical-flow images are computed.

    Returns:
        ``(name, frames, flows)`` as produced by ``background_temporal_differencing``.

    Raises:
        RuntimeError: If the server responds with a status other than 200.
    """
    session = await get_async_client()
    async with session.get(url) as response:
        if response.status != 200:
            txt = await response.text()
            raise RuntimeError(f"Download failed {response.status}: {txt[:200]}")
        data = await response.read()

    with NamedTemporaryFile(prefix="sv_video_", suffix=".mp4") as f:
        f.write(data)
        # OpenCV re-opens the file by name, so any bytes still sitting in the
        # Python write buffer must reach the OS first; otherwise the decoder
        # can see a truncated MP4 (e.g. a missing trailing moov atom).
        f.flush()
        # Decoding and optical flow are CPU-bound; keep them off the event loop.
        frames, flows = await asyncio.to_thread(
            background_temporal_differencing,
            video_path=Path(f.name),
            frame_numbers=frame_numbers,
        )
    name = url.split("/")[-1]
    return name, frames, flows


class FrameStore:
    """Lazy frame/flow accessor backed by a cached MP4 on disk.

    Frames are decoded on demand through a single ``VideoCapture`` and kept in a
    bounded LRU cache; optical-flow visualisations are computed on demand from
    consecutive frames and cached the same way. Cache and capture access in the
    public methods is guarded by a re-entrant lock.
    """

    def __init__(
        self,
        video_path: Path,
        *,
        max_frames: int = 64,
        max_flows: int = 32,
    ) -> None:
        """
        Args:
            video_path: Path of the video file to read.
            max_frames: Maximum decoded frames kept in memory; ``<= 0`` means
                no eviction.
            max_flows: Maximum flow images kept in memory; ``<= 0`` means no
                eviction.
        """
        self.video_path = video_path
        self.video_name = video_path.name
        self._frame_cache: OrderedDict[int, ndarray] = OrderedDict()
        self._flow_cache: OrderedDict[int, ndarray] = OrderedDict()
        self._max_frames = max_frames
        self._max_flows = max_flows
        self._lock = RLock()
        self._capture: VideoCapture | None = None
        # Index of the frame most recently returned by ``_capture.read()``;
        # ``None`` when the decoder position is unknown.
        self._current_frame_index: int | None = None

    def _ensure_capture(self) -> None:
        """Open the underlying capture on first use.

        Raises:
            ValueError: If OpenCV cannot open ``video_path``.
        """
        if self._capture is not None:
            return
        cap = VideoCapture(str(self.video_path))
        if not cap.isOpened():
            # Release the failed handle instead of leaking it.
            cap.release()
            raise ValueError(f"Could not open video: {self.video_path}")
        self._capture = cap

    def _evict_if_needed(self, cache: OrderedDict[int, ndarray], limit: int) -> None:
        """Drop least-recently-used entries until ``len(cache) <= limit``.

        A non-positive ``limit`` disables eviction.
        """
        if limit <= 0:
            return
        while len(cache) > limit:
            cache.popitem(last=False)

    def get_frame(self, frame_number: int) -> ndarray:
        """Return the BGR frame at ``frame_number``, decoding it if not cached.

        The returned array is the cached object itself; callers should not
        mutate it.

        Raises:
            ValueError: If ``frame_number`` is negative or the video cannot be opened.
            IOError: If the frame cannot be read.
        """
        if frame_number < 0:
            raise ValueError("frame_number must be >= 0")
        with self._lock:
            cached = self._frame_cache.get(frame_number)
            if cached is not None:
                self._frame_cache.move_to_end(frame_number)
                return cached

            self._ensure_capture()
            if self._capture is None:
                raise RuntimeError("Video capture not initialised")

            # A plain read() yields the frame right after the last one read.
            # Every other request - including re-reading the current frame once
            # it has left the cache - needs an explicit seek.
            if (
                self._current_frame_index is None
                or frame_number != self._current_frame_index + 1
            ):
                self._capture.set(CAP_PROP_POS_FRAMES, frame_number)

            ok, frame = self._capture.read()
            if not ok or frame is None:
                # The decoder position is now unknown; force a seek next time.
                self._current_frame_index = None
                raise IOError(f"Failed to read frame {frame_number}")

            self._current_frame_index = frame_number
            result = frame.copy()
            self._frame_cache[frame_number] = result
            self._frame_cache.move_to_end(frame_number)
            self._evict_if_needed(self._frame_cache, self._max_frames)
            return result

    def get_flow(self, frame_number: int) -> ndarray:
        """Return a BGR rendering of dense optical flow from frame ``n - 1`` to ``n``.

        Raises:
            ValueError: If ``frame_number <= 0`` (frame 0 has no predecessor).
            IOError: Propagated from ``get_frame`` when a frame cannot be read.
        """
        if frame_number <= 0:
            raise ValueError("Optical flow requires frame_number > 0")
        with self._lock:
            cached = self._flow_cache.get(frame_number)
            if cached is not None:
                self._flow_cache.move_to_end(frame_number)
                return cached

            prev_frame = self.get_frame(frame_number - 1)
            current_frame = self.get_frame(frame_number)
            rgb = self._flow_to_bgr(prev_frame, current_frame)

            self._flow_cache[frame_number] = rgb
            self._flow_cache.move_to_end(frame_number)
            self._evict_if_needed(self._flow_cache, self._max_flows)
            return rgb

    @staticmethod
    def _flow_to_bgr(prev_frame: ndarray, current_frame: ndarray) -> ndarray:
        """Compute Farneback flow between two BGR frames and render it as BGR."""
        prev_gray = cvtColor(prev_frame, COLOR_BGR2GRAY)
        gray = cvtColor(current_frame, COLOR_BGR2GRAY)
        flow = calcOpticalFlowFarneback(
            prev_gray,
            gray,
            None,
            pyr_scale=0.5,
            levels=3,
            winsize=15,
            iterations=3,
            poly_n=5,
            poly_sigma=1.2,
            flags=0,
        )
        mag, ang = cartToPolar(flow[..., 0], flow[..., 1])
        # Hue <- angle (radians mapped to [0, 180)), full saturation,
        # value <- magnitude normalised to 0-255.
        hsv = zeros_like(prev_frame)
        hsv[..., 0] = ang * 180 / pi / 2
        hsv[..., 1] = 255
        hsv[..., 2] = normalize(mag, None, 0, 255, NORM_MINMAX)
        return cvtColor(hsv, COLOR_HSV2BGR)

    def close(self) -> None:
        """Release the video capture; caches are kept. Safe to call repeatedly."""
        with self._lock:
            if self._capture is None:
                return
            try:
                self._capture.release()
            except Exception:
                # Best effort: a failing release must not break cleanup paths,
                # but leave a trace for debugging.
                logger.debug(
                    "Failed to release capture for %s", self.video_path, exc_info=True
                )
            self._capture = None
            self._current_frame_index = None

    def clear(self) -> None:
        """Drop all cached frames and flow images."""
        with self._lock:
            self._frame_cache.clear()
            self._flow_cache.clear()

    def unlink(self) -> None:
        """Release the capture and delete the video file; deletion errors are logged."""
        self.close()
        try:
            self.video_path.unlink(missing_ok=True)
        except OSError:
            logger.warning(
                "Could not delete cached video %s", self.video_path, exc_info=True
            )

    def __del__(self) -> None:
        # __init__ may have raised before ``_capture`` was assigned; only
        # release when there is actually a capture to release.
        if getattr(self, "_capture", None) is not None:
            self.close()


async def download_video_cached(
    url: str,
    _frame_numbers: list[int],  # retained for backward compatibility
    cached_path: Path | None = None,
) -> tuple[str, FrameStore]:
    """Download a video once and wrap it in a lazily-decoding ``FrameStore``.

    When ``cached_path`` is given, nothing is downloaded and that file is used
    as-is, so retries can reuse an earlier download.

    Args:
        url: Location of the video; its last ``/``-separated segment is
            returned as the name.
        _frame_numbers: Unused; kept so existing call sites keep working.
        cached_path: Previously downloaded file to reuse instead of downloading.

    Returns:
        ``(name, store)``. The file behind ``store`` is not deleted
        automatically; the caller should call ``store.unlink()`` once it is no
        longer needed.

    Raises:
        RuntimeError: If the server responds with a status other than 200.
    """
    if cached_path is None:
        session = await get_async_client()
        temp_path: Path | None = None
        try:
            async with session.get(url) as response:
                if response.status != 200:
                    txt = await response.text()
                    raise RuntimeError(
                        f"Download failed {response.status}: {txt[:200]}"
                    )
                with NamedTemporaryFile(
                    prefix="sv_video_", suffix=".mp4", delete=False
                ) as tmp:
                    # Record the path before streaming so a failure mid-download
                    # still lets the handler below delete the partial file.
                    temp_path = Path(tmp.name)
                    async for chunk in response.content.iter_chunked(1024 * 1024):
                        tmp.write(chunk)
        except BaseException:
            # BaseException rather than Exception: task cancellation raises
            # asyncio.CancelledError, which must also remove the partial file.
            if temp_path is not None:
                temp_path.unlink(missing_ok=True)
            raise
        video_path = temp_path
    else:
        video_path = cached_path

    name = url.split("/")[-1]
    return name, FrameStore(video_path)