Spaces:
Sleeping
Sleeping
s-egg-mentation / deployments /deployment /Instance segmentation task /python /demo_package /executors /asynchronous.py
| # Copyright (C) 2024 Intel Corporation | |
| # SPDX-License-Identifier: Apache-2.0 | |
| # | |
| """Async executor based on ModelAPI.""" | |
| from __future__ import annotations | |
| import time | |
| from typing import TYPE_CHECKING, Any | |
| from model_api.pipelines import AsyncPipeline | |
| if TYPE_CHECKING: | |
| import numpy as np | |
| from demo_package.model_wrapper import ModelWrapper | |
| from demo_package.streamer import get_streamer | |
| from demo_package.visualizers import BaseVisualizer, dump_frames | |
class AsyncExecutor:
    """Async inferencer.

    Frames read from the input stream are submitted to a ModelAPI
    ``AsyncPipeline``; results are fetched by frame id, rendered with the
    visualizer and shown in submission order.

    Args:
        model: model for inference
        visualizer: visualizer of inference results
    """

    def __init__(self, model: ModelWrapper, visualizer: BaseVisualizer) -> None:
        self.model = model
        self.visualizer = visualizer
        self.async_pipeline = AsyncPipeline(self.model.core_model)

    def run(self, input_stream: int | str, loop: bool = False) -> None:
        """Async inference for input stream (image, video stream, camera).

        Args:
            input_stream: source passed to ``get_streamer`` (and later to
                ``dump_frames``).
            loop: forwarded to ``get_streamer``.

        Raises:
            RuntimeError: if, after ``await_all()``, the pipeline yields no
                result for a frame that was submitted.
        """
        streamer = get_streamer(input_stream, loop)
        next_frame_id = 0
        next_frame_id_to_show = 0
        stop_visualization = False
        saved_frames = []

        for frame in streamer:
            # Show every result that is already available, in frame-id order,
            # before submitting the next frame.
            results = self.async_pipeline.get_result(next_frame_id_to_show)
            while results:
                start_time = time.perf_counter()
                output = self.render_result(results)
                next_frame_id_to_show += 1
                self.visualizer.show(output)
                if self.visualizer.output:
                    saved_frames.append(output)
                # Latch the quit request: several results may be drained in
                # one pass, and a later "not quit" answer must not erase an
                # earlier quit.
                if self.visualizer.is_quit():
                    stop_visualization = True
                # visualize video not faster than the original FPS
                self.visualizer.video_delay(time.perf_counter() - start_time, streamer)
                results = self.async_pipeline.get_result(next_frame_id_to_show)
            if stop_visualization:
                break
            # The frame travels with the request as metadata so that
            # render_result can draw on it once the prediction arrives.
            self.async_pipeline.submit_data(frame, next_frame_id, {"frame": frame})
            next_frame_id += 1

        # Wait for the in-flight requests, then show whatever was submitted
        # but not displayed yet.
        self.async_pipeline.await_all()
        for next_id in range(next_frame_id_to_show, next_frame_id):
            start_time = time.perf_counter()
            results = self.async_pipeline.get_result(next_id)
            if not results:
                msg = "Async pipeline returned None results"
                raise RuntimeError(msg)
            output = self.render_result(results)
            self.visualizer.show(output)
            if self.visualizer.output:
                saved_frames.append(output)
            # visualize video not faster than the original FPS
            self.visualizer.video_delay(time.perf_counter() - start_time, streamer)

        dump_frames(saved_frames, self.visualizer.output, input_stream, streamer)

    def render_result(self, results: tuple[Any, dict]) -> np.ndarray:
        """Render for results of inference.

        Args:
            results: pair ``(predictions, frame_meta)``; ``frame_meta`` must
                hold the source frame under the ``"frame"`` key, as stored
                by ``run`` when the frame is submitted.

        Returns:
            Whatever ``visualizer.draw`` returns for the frame and its
            predictions.
        """
        predictions, frame_meta = results
        current_frame = frame_meta["frame"]
        return self.visualizer.draw(current_frame, predictions)