Spaces:
Configuration error
Configuration error
| import time | |
| from dataclasses import dataclass | |
| from datetime import datetime | |
| from enum import Enum | |
| from queue import Empty, Queue | |
| from threading import Event, Lock, Thread | |
| from typing import Any, Callable, List, Optional, Protocol, Union | |
| import cv2 | |
| import supervision as sv | |
| from inference.core import logger | |
| from inference.core.env import ( | |
| DEFAULT_ADAPTIVE_MODE_READER_PACE_TOLERANCE, | |
| DEFAULT_ADAPTIVE_MODE_STREAM_PACE_TOLERANCE, | |
| DEFAULT_BUFFER_SIZE, | |
| DEFAULT_MAXIMUM_ADAPTIVE_FRAMES_DROPPED_IN_ROW, | |
| DEFAULT_MINIMUM_ADAPTIVE_MODE_SAMPLES, | |
| ) | |
| from inference.core.interfaces.camera.entities import ( | |
| StatusUpdate, | |
| UpdateSeverity, | |
| VideoFrame, | |
| ) | |
| from inference.core.interfaces.camera.exceptions import ( | |
| EndOfStreamError, | |
| SourceConnectionError, | |
| StreamOperationNotAllowedError, | |
| ) | |
| VIDEO_SOURCE_CONTEXT = "video_source" | |
| VIDEO_CONSUMER_CONTEXT = "video_consumer" | |
| SOURCE_STATE_UPDATE_EVENT = "SOURCE_STATE_UPDATE" | |
| SOURCE_ERROR_EVENT = "SOURCE_ERROR" | |
| FRAME_CAPTURED_EVENT = "FRAME_CAPTURED" | |
| FRAME_DROPPED_EVENT = "FRAME_DROPPED" | |
| FRAME_CONSUMED_EVENT = "FRAME_CONSUMED" | |
| VIDEO_CONSUMPTION_STARTED_EVENT = "VIDEO_CONSUMPTION_STARTED" | |
| VIDEO_CONSUMPTION_FINISHED_EVENT = "VIDEO_CONSUMPTION_FINISHED" | |
| class StreamState(Enum): | |
| NOT_STARTED = "NOT_STARTED" | |
| INITIALISING = "INITIALISING" | |
| RESTARTING = "RESTARTING" | |
| RUNNING = "RUNNING" | |
| PAUSED = "PAUSED" | |
| MUTED = "MUTED" | |
| TERMINATING = "TERMINATING" | |
| ENDED = "ENDED" | |
| ERROR = "ERROR" | |
| START_ELIGIBLE_STATES = { | |
| StreamState.NOT_STARTED, | |
| StreamState.RESTARTING, | |
| StreamState.ENDED, | |
| } | |
| PAUSE_ELIGIBLE_STATES = {StreamState.RUNNING} | |
| MUTE_ELIGIBLE_STATES = {StreamState.RUNNING} | |
| RESUME_ELIGIBLE_STATES = {StreamState.PAUSED, StreamState.MUTED} | |
| TERMINATE_ELIGIBLE_STATES = { | |
| StreamState.MUTED, | |
| StreamState.RUNNING, | |
| StreamState.PAUSED, | |
| StreamState.RESTARTING, | |
| StreamState.ENDED, | |
| StreamState.ERROR, | |
| } | |
| RESTART_ELIGIBLE_STATES = { | |
| StreamState.MUTED, | |
| StreamState.RUNNING, | |
| StreamState.PAUSED, | |
| StreamState.ENDED, | |
| StreamState.ERROR, | |
| } | |
| class BufferFillingStrategy(Enum): | |
| WAIT = "WAIT" | |
| DROP_OLDEST = "DROP_OLDEST" | |
| ADAPTIVE_DROP_OLDEST = "ADAPTIVE_DROP_OLDEST" | |
| DROP_LATEST = "DROP_LATEST" | |
| ADAPTIVE_DROP_LATEST = "ADAPTIVE_DROP_LATEST" | |
| ADAPTIVE_STRATEGIES = { | |
| BufferFillingStrategy.ADAPTIVE_DROP_LATEST, | |
| BufferFillingStrategy.ADAPTIVE_DROP_OLDEST, | |
| } | |
| DROP_OLDEST_STRATEGIES = { | |
| BufferFillingStrategy.DROP_OLDEST, | |
| BufferFillingStrategy.ADAPTIVE_DROP_OLDEST, | |
| } | |
| class BufferConsumptionStrategy(Enum): | |
| LAZY = "LAZY" | |
| EAGER = "EAGER" | |
| class SourceProperties: | |
| width: int | |
| height: int | |
| total_frames: int | |
| is_file: bool | |
| fps: float | |
| class SourceMetadata: | |
| source_properties: Optional[SourceProperties] | |
| source_reference: str | |
| buffer_size: int | |
| state: StreamState | |
| buffer_filling_strategy: Optional[BufferFillingStrategy] | |
| buffer_consumption_strategy: Optional[BufferConsumptionStrategy] | |
| class VideoSourceMethod(Protocol): | |
| def __call__(self, video_source: "VideoSource", *args, **kwargs) -> None: ... | |
| def lock_state_transition( | |
| method: VideoSourceMethod, | |
| ) -> Callable[["VideoSource"], None]: | |
| def locked_executor(video_source: "VideoSource", *args, **kwargs) -> None: | |
| with video_source._state_change_lock: | |
| return method(video_source, *args, **kwargs) | |
| return locked_executor | |
| class VideoSource: | |
| def init( | |
| cls, | |
| video_reference: Union[str, int], | |
| buffer_size: int = DEFAULT_BUFFER_SIZE, | |
| status_update_handlers: Optional[List[Callable[[StatusUpdate], None]]] = None, | |
| buffer_filling_strategy: Optional[BufferFillingStrategy] = None, | |
| buffer_consumption_strategy: Optional[BufferConsumptionStrategy] = None, | |
| adaptive_mode_stream_pace_tolerance: float = DEFAULT_ADAPTIVE_MODE_STREAM_PACE_TOLERANCE, | |
| adaptive_mode_reader_pace_tolerance: float = DEFAULT_ADAPTIVE_MODE_READER_PACE_TOLERANCE, | |
| minimum_adaptive_mode_samples: int = DEFAULT_MINIMUM_ADAPTIVE_MODE_SAMPLES, | |
| maximum_adaptive_frames_dropped_in_row: int = DEFAULT_MAXIMUM_ADAPTIVE_FRAMES_DROPPED_IN_ROW, | |
| ): | |
| """ | |
| This class is meant to represent abstraction over video sources - both video files and | |
| on-line streams that are possible to be consumed and used by other components of `inference` | |
| library. | |
| Before digging into details of the class behaviour, it is advised to familiarise with the following | |
| concepts and implementation assumptions: | |
| 1. Video file can be accessed from local (or remote) storage by the consumer in a pace dictated by | |
| its processing capabilities. If processing is faster than the frame rate of video, operations | |
| may be executed in a time shorter than the time of video playback. In the opposite case - consumer | |
| may freely decode and process frames in its own pace, without risk for failures due to temporal | |
| dependencies of processing - this is classical offline processing example. | |
| 2. Video streams, on the other hand, usually need to be consumed in a pace near to their frame-rate - | |
| in other words - this is on-line processing example. Consumer being faster than incoming stream | |
| frames cannot utilise its resources to the full extent as not-yet-delivered data would be needed. | |
| Slow consumer, however, may not be able to process everything on time and to keep up with the pace | |
| of stream - some frames would need to be dropped. Otherwise - over time, consumer could go out of | |
| sync with the stream causing decoding failures or unpredictable behavior. | |
| To fit those two types of video sources, `VideoSource` introduces the concept of buffered decoding of | |
| video stream (like at the YouTube - player buffers some frames that are soon to be displayed). | |
| The way on how buffer is filled and consumed dictates the behavior of `VideoSource`. | |
| Starting from `BufferFillingStrategy` - we have 3 basic options: | |
| * WAIT: in case of slow video consumption, when buffer is full - `VideoSource` will wait for | |
| the empty spot in buffer before next frame will be processed - this is suitable in cases when | |
| we want to ensure EACH FRAME of the video to be processed | |
| * DROP_OLDEST: when buffer is full, the frame that sits there for the longest time will be dropped - | |
| this is suitable for cases when we want to process the most recent frames possible | |
| * DROP_LATEST: when buffer is full, the newly decoded frame is dropped - useful in cases when | |
| it is expected to have processing performance drops, but we would like to consume portions of | |
| video that are locally smooth - but this is probably the least common use-case. | |
| On top of that - there are two ADAPTIVE strategies: ADAPTIVE_DROP_OLDEST and ADAPTIVE_DROP_LATEST, | |
| which are equivalent to DROP_OLDEST and DROP_LATEST with adaptive decoding feature enabled. The notion | |
| of that mode will be described later. | |
| Naturally, decoded frames must also be consumed. `VideoSource` provides a handy interface for reading | |
| a video source frames by a SINGLE consumer. Consumption strategy can also be dictated via | |
| `BufferConsumptionStrategy`: | |
| * LAZY - consume all the frames from decoding buffer one-by-one | |
| * EAGER - at each readout - take all frames already buffered, drop all of them apart from the most recent | |
| In consequence - there are various combinations of `BufferFillingStrategy` and `BufferConsumptionStrategy`. | |
| The most popular would be: | |
| * `BufferFillingStrategy.WAIT` and `BufferConsumptionStrategy.LAZY` - to always decode and process each and | |
| every frame of the source (useful while processing video files - and default behaviour enforced by | |
| `inference` if there is no explicit configuration) | |
| * `BufferFillingStrategy.DROP_OLDEST` and `BufferConsumptionStrategy.EAGER` - to always process the most | |
| recent frames of source (useful while processing video streams when low latency [real-time experience] | |
| is required - ADAPTIVE version of this is default for streams) | |
| ADAPTIVE strategies were introduced to handle corner-cases, when consumer hardware is not capable to consume | |
| video stream and process frames at the same time (for instance - Nvidia Jetson devices running processing | |
| against hi-res streams with high FPS ratio). It acts with buffer in nearly the same way as `DROP_OLDEST` | |
| and `DROP_LATEST` strategies, but there are two more conditions that may influence frame drop: | |
| * announced rate of source - which in fact dictate the pace of frames grabbing from incoming stream that | |
| MUST be met by consumer to avoid strange decoding issues causing decoder to fail - if the pace of frame grabbing | |
| deviates too much - decoding will be postponed, and frames dropped to grab next ones sooner | |
| * consumption rate - in resource constraints environment, not only decoding is problematic from the performance | |
| perspective - but also heavy processing. If consumer is not quick enough - allocating more useful resources | |
| for decoding frames that may never be processed is a waste. That's why - if decoding happens more frequently | |
| than consumption of frame - ADAPTIVE mode causes decoding to be done in a slower pace and more frames are just | |
| grabbed and dropped on the floor. | |
| ADAPTIVE mode increases latency slightly, but may be the only way to operate in some cases. | |
| Behaviour of adaptive mode, including the maximum acceptable deviations of frames grabbing pace from source, | |
| reader pace and maximum number of consecutive frames dropped in ADAPTIVE mode are configurable by clients, | |
| with reasonable defaults being set. | |
| `VideoSource` emits events regarding its activity - which can be intercepted by custom handlers. Take | |
| into account that they are always executed in context of thread invoking them (and should be fast to complete, | |
| otherwise may block the flow of stream consumption). All errors raised will be emitted as logger warnings only. | |
| `VideoSource` implementation is naturally multithreading, with different thread decoding video and different | |
| one consuming it and manipulating source state. Implementation of user interface is thread-safe, although | |
| stream it is meant to be consumed by a single thread only. | |
| ENV variables involved: | |
| * VIDEO_SOURCE_BUFFER_SIZE - default: 64 | |
| * VIDEO_SOURCE_ADAPTIVE_MODE_STREAM_PACE_TOLERANCE - default: 0.1 | |
| * VIDEO_SOURCE_ADAPTIVE_MODE_READER_PACE_TOLERANCE - default: 5.0 | |
| * VIDEO_SOURCE_MINIMUM_ADAPTIVE_MODE_SAMPLES - default: 10 | |
| * VIDEO_SOURCE_MAXIMUM_ADAPTIVE_FRAMES_DROPPED_IN_ROW - default: 16 | |
| As an `inference` user, please use .init() method instead of constructor to instantiate objects. | |
| Args: | |
| video_reference (Union[str, int]): Either str with file or stream reference, or int representing device ID | |
| buffer_size (int): size of decoding buffer | |
| status_update_handlers (Optional[List[Callable[[StatusUpdate], None]]]): List of handlers for status updates | |
| buffer_filling_strategy (Optional[BufferFillingStrategy]): Settings for buffer filling strategy - if not | |
| given - automatic choice regarding source type will be applied | |
| buffer_consumption_strategy (Optional[BufferConsumptionStrategy]): Settings for buffer consumption strategy, | |
| if not given - automatic choice regarding source type will be applied | |
| adaptive_mode_stream_pace_tolerance (float): Maximum deviation between frames grabbing pace and stream pace | |
| that will not trigger adaptive mode frame drop | |
| adaptive_mode_reader_pace_tolerance (float): Maximum deviation between decoding pace and stream consumption | |
| pace that will not trigger adaptive mode frame drop | |
| minimum_adaptive_mode_samples (int): Minimal number of frames to be used to establish actual pace of | |
| processing, before adaptive mode can drop any frame | |
| maximum_adaptive_frames_dropped_in_row (int): Maximum number of frames dropped in row due to application of | |
| adaptive strategy | |
| Returns: Instance of `VideoSource` class | |
| """ | |
| frames_buffer = Queue(maxsize=buffer_size) | |
| if status_update_handlers is None: | |
| status_update_handlers = [] | |
| video_consumer = VideoConsumer.init( | |
| buffer_filling_strategy=buffer_filling_strategy, | |
| adaptive_mode_stream_pace_tolerance=adaptive_mode_stream_pace_tolerance, | |
| adaptive_mode_reader_pace_tolerance=adaptive_mode_reader_pace_tolerance, | |
| minimum_adaptive_mode_samples=minimum_adaptive_mode_samples, | |
| maximum_adaptive_frames_dropped_in_row=maximum_adaptive_frames_dropped_in_row, | |
| status_update_handlers=status_update_handlers, | |
| ) | |
| return cls( | |
| stream_reference=video_reference, | |
| frames_buffer=frames_buffer, | |
| status_update_handlers=status_update_handlers, | |
| buffer_consumption_strategy=buffer_consumption_strategy, | |
| video_consumer=video_consumer, | |
| ) | |
| def __init__( | |
| self, | |
| stream_reference: Union[str, int], | |
| frames_buffer: Queue, | |
| status_update_handlers: List[Callable[[StatusUpdate], None]], | |
| buffer_consumption_strategy: Optional[BufferConsumptionStrategy], | |
| video_consumer: "VideoConsumer", | |
| ): | |
| self._stream_reference = stream_reference | |
| self._video: Optional[cv2.VideoCapture] = None | |
| self._source_properties: Optional[SourceProperties] = None | |
| self._frames_buffer = frames_buffer | |
| self._status_update_handlers = status_update_handlers | |
| self._buffer_consumption_strategy = buffer_consumption_strategy | |
| self._video_consumer = video_consumer | |
| self._state = StreamState.NOT_STARTED | |
| self._playback_allowed = Event() | |
| self._frames_buffering_allowed = True | |
| self._stream_consumption_thread: Optional[Thread] = None | |
| self._state_change_lock = Lock() | |
| def restart(self, wait_on_frames_consumption: bool = True) -> None: | |
| """ | |
| Method to restart source consumption. Eligible to be used in states: | |
| [MUTED, RUNNING, PAUSED, ENDED, ERROR]. | |
| End state: | |
| * INITIALISING - that should change into RUNNING once first frame is ready to be grabbed | |
| * ERROR - if it was not possible to connect with source | |
| Thread safe - only one transition of states possible at the time. | |
| Args: | |
| wait_on_frames_consumption (bool): Flag telling if all frames from buffer must be consumed before | |
| completion of this operation. | |
| Returns: None | |
| Throws: | |
| * StreamOperationNotAllowedError: if executed in context of incorrect state of the source | |
| * SourceConnectionError: if source cannot be connected | |
| """ | |
| if self._state not in RESTART_ELIGIBLE_STATES: | |
| raise StreamOperationNotAllowedError( | |
| f"Could not RESTART stream in state: {self._state}" | |
| ) | |
| self._restart(wait_on_frames_consumption=wait_on_frames_consumption) | |
| def start(self) -> None: | |
| """ | |
| Method to be used to start source consumption. Eligible to be used in states: | |
| [NOT_STARTED, ENDED, (RESTARTING - which is internal state only)] | |
| End state: | |
| * INITIALISING - that should change into RUNNING once first frame is ready to be grabbed | |
| * ERROR - if it was not possible to connect with source | |
| Thread safe - only one transition of states possible at the time. | |
| Returns: None | |
| Throws: | |
| * StreamOperationNotAllowedError: if executed in context of incorrect state of the source | |
| * SourceConnectionError: if source cannot be connected | |
| """ | |
| if self._state not in START_ELIGIBLE_STATES: | |
| raise StreamOperationNotAllowedError( | |
| f"Could not START stream in state: {self._state}" | |
| ) | |
| self._start() | |
| def terminate(self, wait_on_frames_consumption: bool = True) -> None: | |
| """ | |
| Method to be used to terminate source consumption. Eligible to be used in states: | |
| [MUTED, RUNNING, PAUSED, ENDED, ERROR, (RESTARTING - which is internal state only)] | |
| End state: | |
| * ENDED - indicating success of the process | |
| * ERROR - if error with processing occurred | |
| Must be used to properly dispose resources at the end. | |
| Thread safe - only one transition of states possible at the time. | |
| Args: | |
| wait_on_frames_consumption (bool): Flag telling if all frames from buffer must be consumed before | |
| completion of this operation. | |
| Returns: None | |
| Throws: | |
| * StreamOperationNotAllowedError: if executed in context of incorrect state of the source | |
| """ | |
| if self._state not in TERMINATE_ELIGIBLE_STATES: | |
| raise StreamOperationNotAllowedError( | |
| f"Could not TERMINATE stream in state: {self._state}" | |
| ) | |
| self._terminate(wait_on_frames_consumption=wait_on_frames_consumption) | |
| def pause(self) -> None: | |
| """ | |
| Method to be used to pause source consumption. During pause - no new frames are consumed. | |
| Used on on-line streams for too long may cause stream disconnection. | |
| Eligible to be used in states: | |
| [RUNNING] | |
| End state: | |
| * PAUSED | |
| Thread safe - only one transition of states possible at the time. | |
| Returns: None | |
| Throws: | |
| * StreamOperationNotAllowedError: if executed in context of incorrect state of the source | |
| """ | |
| if self._state not in PAUSE_ELIGIBLE_STATES: | |
| raise StreamOperationNotAllowedError( | |
| f"Could not PAUSE stream in state: {self._state}" | |
| ) | |
| self._pause() | |
| def mute(self) -> None: | |
| """ | |
| Method to be used to mute source consumption. Muting is an equivalent of pause for stream - where | |
| frames grabbing is not put on hold, just new frames decoding and buffering is not allowed - causing | |
| intermediate frames to be dropped. May be also used against files, although arguably less useful. | |
| Eligible to be used in states: | |
| [RUNNING] | |
| End state: | |
| * MUTED | |
| Thread safe - only one transition of states possible at the time. | |
| Returns: None | |
| Throws: | |
| * StreamOperationNotAllowedError: if executed in context of incorrect state of the source | |
| """ | |
| if self._state not in MUTE_ELIGIBLE_STATES: | |
| raise StreamOperationNotAllowedError( | |
| f"Could not MUTE stream in state: {self._state}" | |
| ) | |
| self._mute() | |
| def resume(self) -> None: | |
| """ | |
| Method to recover from pause or mute into running state. | |
| [PAUSED, MUTED] | |
| End state: | |
| * RUNNING | |
| Thread safe - only one transition of states possible at the time. | |
| Returns: None | |
| Throws: | |
| * StreamOperationNotAllowedError: if executed in context of incorrect state of the source | |
| """ | |
| if self._state not in RESUME_ELIGIBLE_STATES: | |
| raise StreamOperationNotAllowedError( | |
| f"Could not RESUME stream in state: {self._state}" | |
| ) | |
| self._resume() | |
| def get_state(self) -> StreamState: | |
| """ | |
| Method to get current state of the `VideoSource` | |
| Returns: StreamState | |
| """ | |
| return self._state | |
| def frame_ready(self) -> bool: | |
| """ | |
| Method to check if decoded frame is ready for consumer | |
| Returns: boolean flag indicating frame readiness | |
| """ | |
| return not self._frames_buffer.empty() | |
| def read_frame(self) -> VideoFrame: | |
| """ | |
| Method to be used by the consumer to get decoded source frame. | |
| Returns: VideoFrame object with decoded frame and its metadata. | |
| Throws: | |
| * EndOfStreamError: when trying to get the frame from closed source. | |
| """ | |
| if self._buffer_consumption_strategy is BufferConsumptionStrategy.EAGER: | |
| video_frame: Optional[VideoFrame] = purge_queue( | |
| queue=self._frames_buffer, | |
| on_successful_read=self._video_consumer.notify_frame_consumed, | |
| ) | |
| else: | |
| video_frame: Optional[VideoFrame] = self._frames_buffer.get() | |
| self._frames_buffer.task_done() | |
| self._video_consumer.notify_frame_consumed() | |
| if video_frame is None: | |
| raise EndOfStreamError( | |
| "Attempted to retrieve frame from stream that already ended." | |
| ) | |
| send_video_source_status_update( | |
| severity=UpdateSeverity.DEBUG, | |
| event_type=FRAME_CONSUMED_EVENT, | |
| payload={ | |
| "frame_timestamp": video_frame.frame_timestamp, | |
| "frame_id": video_frame.frame_id, | |
| }, | |
| status_update_handlers=self._status_update_handlers, | |
| ) | |
| return video_frame | |
| def describe_source(self) -> SourceMetadata: | |
| return SourceMetadata( | |
| source_properties=self._source_properties, | |
| source_reference=self._stream_reference, | |
| buffer_size=self._frames_buffer.maxsize, | |
| state=self._state, | |
| buffer_filling_strategy=self._video_consumer.buffer_filling_strategy, | |
| buffer_consumption_strategy=self._buffer_consumption_strategy, | |
| ) | |
| def _restart(self, wait_on_frames_consumption: bool = True) -> None: | |
| self._terminate(wait_on_frames_consumption=wait_on_frames_consumption) | |
| self._change_state(target_state=StreamState.RESTARTING) | |
| self._playback_allowed = Event() | |
| self._frames_buffering_allowed = True | |
| self._video: Optional[cv2.VideoCapture] = None | |
| self._source_properties: Optional[SourceProperties] = None | |
| self._start() | |
| def _start(self) -> None: | |
| self._change_state(target_state=StreamState.INITIALISING) | |
| self._video = cv2.VideoCapture(self._stream_reference) | |
| if not self._video.isOpened(): | |
| self._change_state(target_state=StreamState.ERROR) | |
| raise SourceConnectionError( | |
| f"Cannot connect to video source under reference: {self._stream_reference}" | |
| ) | |
| self._source_properties = discover_source_properties(stream=self._video) | |
| self._video_consumer.reset(source_properties=self._source_properties) | |
| if self._source_properties.is_file: | |
| self._set_file_mode_consumption_strategies() | |
| else: | |
| self._set_stream_mode_consumption_strategies() | |
| self._playback_allowed.set() | |
| self._stream_consumption_thread = Thread(target=self._consume_video) | |
| self._stream_consumption_thread.start() | |
| def _terminate(self, wait_on_frames_consumption: bool) -> None: | |
| if self._state in RESUME_ELIGIBLE_STATES: | |
| self._resume() | |
| previous_state = self._state | |
| self._change_state(target_state=StreamState.TERMINATING) | |
| if self._stream_consumption_thread is not None: | |
| self._stream_consumption_thread.join() | |
| if wait_on_frames_consumption: | |
| self._frames_buffer.join() | |
| if previous_state is not StreamState.ERROR: | |
| self._change_state(target_state=StreamState.ENDED) | |
| def _pause(self) -> None: | |
| self._playback_allowed.clear() | |
| self._change_state(target_state=StreamState.PAUSED) | |
| def _mute(self) -> None: | |
| self._frames_buffering_allowed = False | |
| self._change_state(target_state=StreamState.MUTED) | |
| def _resume(self) -> None: | |
| previous_state = self._state | |
| self._change_state(target_state=StreamState.RUNNING) | |
| if previous_state is StreamState.PAUSED: | |
| self._video_consumer.reset_stream_consumption_pace() | |
| self._playback_allowed.set() | |
| if previous_state is StreamState.MUTED: | |
| self._frames_buffering_allowed = True | |
| def _set_file_mode_consumption_strategies(self) -> None: | |
| if self._buffer_consumption_strategy is None: | |
| self._buffer_consumption_strategy = BufferConsumptionStrategy.LAZY | |
| def _set_stream_mode_consumption_strategies(self) -> None: | |
| if self._buffer_consumption_strategy is None: | |
| self._buffer_consumption_strategy = BufferConsumptionStrategy.EAGER | |
| def _consume_video(self) -> None: | |
| send_video_source_status_update( | |
| severity=UpdateSeverity.INFO, | |
| event_type=VIDEO_CONSUMPTION_STARTED_EVENT, | |
| status_update_handlers=self._status_update_handlers, | |
| ) | |
| logger.info(f"Video consumption started") | |
| try: | |
| self._change_state(target_state=StreamState.RUNNING) | |
| declared_source_fps = None | |
| if self._source_properties is not None: | |
| declared_source_fps = self._source_properties.fps | |
| while self._video.isOpened(): | |
| if self._state is StreamState.TERMINATING: | |
| break | |
| self._playback_allowed.wait() | |
| success = self._video_consumer.consume_frame( | |
| video=self._video, | |
| declared_source_fps=declared_source_fps, | |
| buffer=self._frames_buffer, | |
| frames_buffering_allowed=self._frames_buffering_allowed, | |
| ) | |
| if not success: | |
| break | |
| self._frames_buffer.put(None) | |
| self._video.release() | |
| self._change_state(target_state=StreamState.ENDED) | |
| send_video_source_status_update( | |
| severity=UpdateSeverity.INFO, | |
| event_type=VIDEO_CONSUMPTION_FINISHED_EVENT, | |
| status_update_handlers=self._status_update_handlers, | |
| ) | |
| logger.info(f"Video consumption finished") | |
| except Exception as error: | |
| self._change_state(target_state=StreamState.ERROR) | |
| payload = { | |
| "error_type": error.__class__.__name__, | |
| "error_message": str(error), | |
| "error_context": "stream_consumer_thread", | |
| } | |
| send_video_source_status_update( | |
| severity=UpdateSeverity.ERROR, | |
| event_type=SOURCE_ERROR_EVENT, | |
| payload=payload, | |
| status_update_handlers=self._status_update_handlers, | |
| ) | |
| logger.exception("Encountered error in video consumption thread") | |
| def _change_state(self, target_state: StreamState) -> None: | |
| payload = { | |
| "previous_state": self._state, | |
| "new_state": target_state, | |
| } | |
| self._state = target_state | |
| send_video_source_status_update( | |
| severity=UpdateSeverity.INFO, | |
| event_type=SOURCE_STATE_UPDATE_EVENT, | |
| payload=payload, | |
| status_update_handlers=self._status_update_handlers, | |
| ) | |
| def __iter__(self) -> "VideoSource": | |
| return self | |
| def __next__(self) -> VideoFrame: | |
| """ | |
| Method allowing to use `VideoSource` convenient to read frames | |
| Returns: VideoFrame | |
| Example: | |
| ```python | |
| source = VideoSource.init(video_reference="./some.mp4") | |
| source.start() | |
| for frame in source: | |
| pass | |
| ``` | |
| """ | |
| try: | |
| return self.read_frame() | |
| except EndOfStreamError: | |
| raise StopIteration() | |
| class VideoConsumer: | |
| """ | |
| This class should be consumed as part of internal implementation. | |
| It provides abstraction around stream consumption strategies. | |
| """ | |
| def init( | |
| cls, | |
| buffer_filling_strategy: Optional[BufferFillingStrategy], | |
| adaptive_mode_stream_pace_tolerance: float, | |
| adaptive_mode_reader_pace_tolerance: float, | |
| minimum_adaptive_mode_samples: int, | |
| maximum_adaptive_frames_dropped_in_row: int, | |
| status_update_handlers: List[Callable[[StatusUpdate], None]], | |
| ) -> "VideoConsumer": | |
| minimum_adaptive_mode_samples = max(minimum_adaptive_mode_samples, 2) | |
| reader_pace_monitor = sv.FPSMonitor( | |
| sample_size=10 * minimum_adaptive_mode_samples | |
| ) | |
| stream_consumption_pace_monitor = sv.FPSMonitor( | |
| sample_size=10 * minimum_adaptive_mode_samples | |
| ) | |
| decoding_pace_monitor = sv.FPSMonitor( | |
| sample_size=10 * minimum_adaptive_mode_samples | |
| ) | |
| return cls( | |
| buffer_filling_strategy=buffer_filling_strategy, | |
| adaptive_mode_stream_pace_tolerance=adaptive_mode_stream_pace_tolerance, | |
| adaptive_mode_reader_pace_tolerance=adaptive_mode_reader_pace_tolerance, | |
| minimum_adaptive_mode_samples=minimum_adaptive_mode_samples, | |
| maximum_adaptive_frames_dropped_in_row=maximum_adaptive_frames_dropped_in_row, | |
| status_update_handlers=status_update_handlers, | |
| reader_pace_monitor=reader_pace_monitor, | |
| stream_consumption_pace_monitor=stream_consumption_pace_monitor, | |
| decoding_pace_monitor=decoding_pace_monitor, | |
| ) | |
| def __init__( | |
| self, | |
| buffer_filling_strategy: Optional[BufferFillingStrategy], | |
| adaptive_mode_stream_pace_tolerance: float, | |
| adaptive_mode_reader_pace_tolerance: float, | |
| minimum_adaptive_mode_samples: int, | |
| maximum_adaptive_frames_dropped_in_row: int, | |
| status_update_handlers: List[Callable[[StatusUpdate], None]], | |
| reader_pace_monitor: sv.FPSMonitor, | |
| stream_consumption_pace_monitor: sv.FPSMonitor, | |
| decoding_pace_monitor: sv.FPSMonitor, | |
| ): | |
| self._buffer_filling_strategy = buffer_filling_strategy | |
| self._frame_counter = 0 | |
| self._adaptive_mode_stream_pace_tolerance = adaptive_mode_stream_pace_tolerance | |
| self._adaptive_mode_reader_pace_tolerance = adaptive_mode_reader_pace_tolerance | |
| self._minimum_adaptive_mode_samples = minimum_adaptive_mode_samples | |
| self._maximum_adaptive_frames_dropped_in_row = ( | |
| maximum_adaptive_frames_dropped_in_row | |
| ) | |
| self._adaptive_frames_dropped_in_row = 0 | |
| self._reader_pace_monitor = reader_pace_monitor | |
| self._stream_consumption_pace_monitor = stream_consumption_pace_monitor | |
| self._decoding_pace_monitor = decoding_pace_monitor | |
| self._status_update_handlers = status_update_handlers | |
| def buffer_filling_strategy(self) -> Optional[BufferFillingStrategy]: | |
| return self._buffer_filling_strategy | |
| def reset(self, source_properties: SourceProperties) -> None: | |
| if source_properties.is_file: | |
| self._set_file_mode_buffering_strategies() | |
| else: | |
| self._set_stream_mode_buffering_strategies() | |
| self._reader_pace_monitor.reset() | |
| self.reset_stream_consumption_pace() | |
| self._decoding_pace_monitor.reset() | |
| self._adaptive_frames_dropped_in_row = 0 | |
| def reset_stream_consumption_pace(self) -> None: | |
| self._stream_consumption_pace_monitor.reset() | |
| def notify_frame_consumed(self) -> None: | |
| self._reader_pace_monitor.tick() | |
| def consume_frame( | |
| self, | |
| video: cv2.VideoCapture, | |
| declared_source_fps: Optional[float], | |
| buffer: Queue, | |
| frames_buffering_allowed: bool, | |
| ) -> bool: | |
| frame_timestamp = datetime.now() | |
| success = video.grab() | |
| self._stream_consumption_pace_monitor.tick() | |
| if not success: | |
| return False | |
| self._frame_counter += 1 | |
| send_video_source_status_update( | |
| severity=UpdateSeverity.DEBUG, | |
| event_type=FRAME_CAPTURED_EVENT, | |
| payload={ | |
| "frame_timestamp": frame_timestamp, | |
| "frame_id": self._frame_counter, | |
| }, | |
| status_update_handlers=self._status_update_handlers, | |
| ) | |
| return self._consume_stream_frame( | |
| video=video, | |
| declared_source_fps=declared_source_fps, | |
| frame_timestamp=frame_timestamp, | |
| buffer=buffer, | |
| frames_buffering_allowed=frames_buffering_allowed, | |
| ) | |
| def _set_file_mode_buffering_strategies(self) -> None: | |
| if self._buffer_filling_strategy is None: | |
| self._buffer_filling_strategy = BufferFillingStrategy.WAIT | |
| def _set_stream_mode_buffering_strategies(self) -> None: | |
| if self._buffer_filling_strategy is None: | |
| self._buffer_filling_strategy = BufferFillingStrategy.ADAPTIVE_DROP_OLDEST | |
| def _consume_stream_frame( | |
| self, | |
| video: cv2.VideoCapture, | |
| declared_source_fps: Optional[float], | |
| frame_timestamp: datetime, | |
| buffer: Queue, | |
| frames_buffering_allowed: bool, | |
| ) -> bool: | |
| """ | |
| Returns: boolean flag with success status | |
| """ | |
| if not frames_buffering_allowed: | |
| send_frame_drop_update( | |
| frame_timestamp=frame_timestamp, | |
| frame_id=self._frame_counter, | |
| cause="Buffering not allowed at the moment", | |
| status_update_handlers=self._status_update_handlers, | |
| ) | |
| return True | |
| if self._frame_should_be_adaptively_dropped( | |
| declared_source_fps=declared_source_fps | |
| ): | |
| self._adaptive_frames_dropped_in_row += 1 | |
| send_frame_drop_update( | |
| frame_timestamp=frame_timestamp, | |
| frame_id=self._frame_counter, | |
| cause="ADAPTIVE strategy", | |
| status_update_handlers=self._status_update_handlers, | |
| ) | |
| return True | |
| self._adaptive_frames_dropped_in_row = 0 | |
| if ( | |
| not buffer.full() | |
| or self._buffer_filling_strategy is BufferFillingStrategy.WAIT | |
| ): | |
| return decode_video_frame_to_buffer( | |
| frame_timestamp=frame_timestamp, | |
| frame_id=self._frame_counter, | |
| video=video, | |
| buffer=buffer, | |
| decoding_pace_monitor=self._decoding_pace_monitor, | |
| ) | |
| if self._buffer_filling_strategy in DROP_OLDEST_STRATEGIES: | |
| return self._process_stream_frame_dropping_oldest( | |
| frame_timestamp=frame_timestamp, | |
| video=video, | |
| buffer=buffer, | |
| ) | |
| send_frame_drop_update( | |
| frame_timestamp=frame_timestamp, | |
| frame_id=self._frame_counter, | |
| cause="DROP_LATEST strategy", | |
| status_update_handlers=self._status_update_handlers, | |
| ) | |
| return True | |
| def _frame_should_be_adaptively_dropped( | |
| self, declared_source_fps: Optional[float] | |
| ) -> bool: | |
| if self._buffer_filling_strategy not in ADAPTIVE_STRATEGIES: | |
| return False | |
| if ( | |
| self._adaptive_frames_dropped_in_row | |
| >= self._maximum_adaptive_frames_dropped_in_row | |
| ): | |
| return False | |
| if ( | |
| len(self._stream_consumption_pace_monitor.all_timestamps) | |
| <= self._minimum_adaptive_mode_samples | |
| ): | |
| # not enough observations | |
| return False | |
| stream_consumption_pace = self._stream_consumption_pace_monitor() | |
| announced_stream_fps = stream_consumption_pace | |
| if declared_source_fps is not None and declared_source_fps > 0: | |
| announced_stream_fps = declared_source_fps | |
| if ( | |
| announced_stream_fps - stream_consumption_pace | |
| > self._adaptive_mode_stream_pace_tolerance | |
| ): | |
| # cannot keep up with stream emission | |
| return True | |
| if ( | |
| len(self._reader_pace_monitor.all_timestamps) | |
| <= self._minimum_adaptive_mode_samples | |
| ) or ( | |
| len(self._decoding_pace_monitor.all_timestamps) | |
| <= self._minimum_adaptive_mode_samples | |
| ): | |
| # not enough observations | |
| return False | |
| actual_reader_pace = get_fps_if_tick_happens_now( | |
| fps_monitor=self._reader_pace_monitor | |
| ) | |
| decoding_pace = self._decoding_pace_monitor() | |
| if ( | |
| decoding_pace - actual_reader_pace | |
| > self._adaptive_mode_reader_pace_tolerance | |
| ): | |
| # we are too fast for the reader - time to save compute on decoding | |
| return True | |
| return False | |
| def _process_stream_frame_dropping_oldest( | |
| self, | |
| frame_timestamp: datetime, | |
| video: cv2.VideoCapture, | |
| buffer: Queue, | |
| ) -> bool: | |
| drop_single_frame_from_buffer( | |
| buffer=buffer, | |
| cause="DROP_OLDEST strategy", | |
| status_update_handlers=self._status_update_handlers, | |
| ) | |
| return decode_video_frame_to_buffer( | |
| frame_timestamp=frame_timestamp, | |
| frame_id=self._frame_counter, | |
| video=video, | |
| buffer=buffer, | |
| decoding_pace_monitor=self._decoding_pace_monitor, | |
| ) | |
| def discover_source_properties(stream: cv2.VideoCapture) -> SourceProperties: | |
| width = int(stream.get(cv2.CAP_PROP_FRAME_WIDTH)) | |
| height = int(stream.get(cv2.CAP_PROP_FRAME_HEIGHT)) | |
| fps = stream.get(cv2.CAP_PROP_FPS) | |
| total_frames = int(stream.get(cv2.CAP_PROP_FRAME_COUNT)) | |
| return SourceProperties( | |
| width=width, | |
| height=height, | |
| total_frames=total_frames, | |
| is_file=total_frames > 0, | |
| fps=fps, | |
| ) | |
| def purge_queue( | |
| queue: Queue, | |
| wait_on_empty: bool = True, | |
| on_successful_read: Callable[[], None] = lambda: None, | |
| ) -> Optional[Any]: | |
| result = None | |
| if queue.empty() and wait_on_empty: | |
| result = queue.get() | |
| queue.task_done() | |
| on_successful_read() | |
| while not queue.empty(): | |
| result = queue.get() | |
| queue.task_done() | |
| on_successful_read() | |
| return result | |
| def drop_single_frame_from_buffer( | |
| buffer: Queue, | |
| cause: str, | |
| status_update_handlers: List[Callable[[StatusUpdate], None]], | |
| ) -> None: | |
| try: | |
| video_frame = buffer.get_nowait() | |
| buffer.task_done() | |
| send_frame_drop_update( | |
| frame_timestamp=video_frame.frame_timestamp, | |
| frame_id=video_frame.frame_id, | |
| cause=cause, | |
| status_update_handlers=status_update_handlers, | |
| ) | |
| except Empty: | |
| # buffer may be emptied in the meantime, hence we ignore Empty | |
| pass | |
| def send_frame_drop_update( | |
| frame_timestamp: datetime, | |
| frame_id: int, | |
| cause: str, | |
| status_update_handlers: List[Callable[[StatusUpdate], None]], | |
| ) -> None: | |
| send_video_source_status_update( | |
| severity=UpdateSeverity.DEBUG, | |
| event_type=FRAME_DROPPED_EVENT, | |
| payload={ | |
| "frame_timestamp": frame_timestamp, | |
| "frame_id": frame_id, | |
| "cause": cause, | |
| }, | |
| status_update_handlers=status_update_handlers, | |
| sub_context=VIDEO_CONSUMER_CONTEXT, | |
| ) | |
| def send_video_source_status_update( | |
| severity: UpdateSeverity, | |
| event_type: str, | |
| status_update_handlers: List[Callable[[StatusUpdate], None]], | |
| sub_context: Optional[str] = None, | |
| payload: Optional[dict] = None, | |
| ) -> None: | |
| if payload is None: | |
| payload = {} | |
| context = VIDEO_SOURCE_CONTEXT | |
| if sub_context is not None: | |
| context = f"{context}.{sub_context}" | |
| status_update = StatusUpdate( | |
| timestamp=datetime.now(), | |
| severity=severity, | |
| event_type=event_type, | |
| payload=payload, | |
| context=context, | |
| ) | |
| for handler in status_update_handlers: | |
| try: | |
| handler(status_update) | |
| except Exception as error: | |
| logger.warning(f"Could not execute handler update. Cause: {error}") | |
| def decode_video_frame_to_buffer( | |
| frame_timestamp: datetime, | |
| frame_id: int, | |
| video: cv2.VideoCapture, | |
| buffer: Queue, | |
| decoding_pace_monitor: sv.FPSMonitor, | |
| ) -> bool: | |
| success, image = video.retrieve() | |
| if not success: | |
| return False | |
| decoding_pace_monitor.tick() | |
| video_frame = VideoFrame( | |
| image=image, frame_id=frame_id, frame_timestamp=frame_timestamp | |
| ) | |
| buffer.put(video_frame) | |
| return True | |
| def get_fps_if_tick_happens_now(fps_monitor: sv.FPSMonitor) -> float: | |
| if len(fps_monitor.all_timestamps) == 0: | |
| return 0.0 | |
| min_reader_timestamp = fps_monitor.all_timestamps[0] | |
| now = time.monotonic() | |
| reader_taken_time = now - min_reader_timestamp | |
| return (len(fps_monitor.all_timestamps) + 1) / reader_taken_time | |