| import time |
| from enum import Enum |
| from typing import Generator, Iterable, Optional, Tuple, Union |
|
|
| import numpy as np |
|
|
| from inference.core.interfaces.camera.entities import ( |
| FrameID, |
| FrameTimestamp, |
| VideoFrame, |
| ) |
| from inference.core.interfaces.camera.video_source import SourceProperties, VideoSource |
|
|
| MINIMAL_FPS = 0.01 |
|
|
|
|
| class FPSLimiterStrategy(Enum): |
| DROP = "drop" |
| WAIT = "wait" |
|
|
|
|
| def get_video_frames_generator( |
| video: Union[VideoSource, str, int], |
| max_fps: Optional[Union[float, int]] = None, |
| limiter_strategy: Optional[FPSLimiterStrategy] = None, |
| ) -> Generator[VideoFrame, None, None]: |
| """ |
| Util function to create a frames generator from `VideoSource` with possibility to |
| limit FPS of consumed frames and dictate what to do if frames are produced to fast. |
| |
| Args: |
| video (Union[VideoSource, str, int]): Either instance of VideoSource or video reference accepted |
| by VideoSource.init(...) |
| max_fps (Optional[Union[float, int]]): value of maximum FPS rate of generated frames - can be used to limit |
| generation frequency |
| limiter_strategy (Optional[FPSLimiterStrategy]): strategy used to deal with frames decoding exceeding |
| limit of `max_fps`. By default - for files, in the interest of processing all frames - |
| generation will be awaited, for streams - frames will be dropped on the floor. |
| Returns: generator of `VideoFrame` |
| |
| Example: |
| ```python |
| for frame in get_video_frames_generator( |
| video="./some.mp4", |
| max_fps=50, |
| ): |
| pass |
| ``` |
| """ |
| if issubclass(type(video), str) or issubclass(type(video), int): |
| video = VideoSource.init( |
| video_reference=video, |
| ) |
| video.start() |
| if max_fps is None: |
| yield from video |
| return None |
| limiter_strategy = resolve_limiter_strategy( |
| explicitly_defined_strategy=limiter_strategy, |
| source_properties=video.describe_source().source_properties, |
| ) |
| yield from limit_frame_rate( |
| frames_generator=video, max_fps=max_fps, strategy=limiter_strategy |
| ) |
|
|
|
|
| def resolve_limiter_strategy( |
| explicitly_defined_strategy: Optional[FPSLimiterStrategy], |
| source_properties: Optional[SourceProperties], |
| ) -> FPSLimiterStrategy: |
| if explicitly_defined_strategy is not None: |
| return explicitly_defined_strategy |
| limiter_strategy = FPSLimiterStrategy.DROP |
| if source_properties is not None and source_properties.is_file: |
| limiter_strategy = FPSLimiterStrategy.WAIT |
| return limiter_strategy |
|
|
|
|
| def limit_frame_rate( |
| frames_generator: Iterable[Tuple[FrameTimestamp, FrameID, np.ndarray]], |
| max_fps: Union[float, int], |
| strategy: FPSLimiterStrategy, |
| ) -> Generator[Tuple[FrameTimestamp, FrameID, np.ndarray], None, None]: |
| rate_limiter = RateLimiter(desired_fps=max_fps) |
| for frame_data in frames_generator: |
| delay = rate_limiter.estimate_next_action_delay() |
| if delay <= 0.0: |
| rate_limiter.tick() |
| yield frame_data |
| continue |
| if strategy is FPSLimiterStrategy.WAIT: |
| time.sleep(delay) |
| rate_limiter.tick() |
| yield frame_data |
|
|
|
|
| class RateLimiter: |
| """ |
| Implements rate upper-bound rate limiting by ensuring estimate_next_tick_delay() |
| to be at min 1 / desired_fps, not letting the client obeying outcomes to exceed |
| assumed rate. |
| """ |
|
|
| def __init__(self, desired_fps: Union[float, int]): |
| self._desired_fps = max(desired_fps, MINIMAL_FPS) |
| self._last_tick: Optional[float] = None |
|
|
| def tick(self) -> None: |
| self._last_tick = time.monotonic() |
|
|
| def estimate_next_action_delay(self) -> float: |
| if self._last_tick is None: |
| return 0.0 |
| desired_delay = 1 / self._desired_fps |
| time_since_last_tick = time.monotonic() - self._last_tick |
| return max(desired_delay - time_since_last_tick, 0.0) |
|
|