import asyncio
import logging
import logging.handlers
import queue
import threading
import urllib.request
from pathlib import Path
from typing import List, NamedTuple

try:
    from typing import Literal
except ImportError:
    from typing_extensions import Literal

import av
import cv2
import matplotlib.pyplot as plt
import numpy as np
import pydub
import streamlit as st
from aiortc.contrib.media import MediaPlayer

from streamlit_webrtc import (
    AudioProcessorBase,
    ClientSettings,
    VideoProcessorBase,
    WebRtcMode,
    webrtc_streamer,
)

HERE = Path(__file__).parent

logger = logging.getLogger(__name__)


def download_file(url, download_to: Path, expected_size=None):
    # Skip the download if a file of the expected size is already on disk;
    # without an expected size, ask the user before downloading again.
    if download_to.exists():
        if expected_size:
            if download_to.stat().st_size == expected_size:
                return
        else:
            st.info(f"{url} is already downloaded.")
            if not st.button("Download again?"):
                return

    download_to.parent.mkdir(parents=True, exist_ok=True)

    # These are handles to two visual elements to animate.
    weights_warning, progress_bar = None, None
    try:
        weights_warning = st.warning("Downloading %s..." % url)
        progress_bar = st.progress(0)
        with open(download_to, "wb") as output_file:
            with urllib.request.urlopen(url) as response:
                length = int(response.info()["Content-Length"])
                counter = 0.0
                MEGABYTES = 2.0 ** 20.0
                while True:
                    data = response.read(8192)
                    if not data:
                        break
                    counter += len(data)
                    output_file.write(data)

                    # We perform animation by overwriting the elements.
                    weights_warning.warning(
                        "Downloading %s... (%6.2f/%6.2f MB)"
                        % (url, counter / MEGABYTES, length / MEGABYTES)
                    )
                    progress_bar.progress(min(counter / length, 1.0))
    finally:
        # Clear the progress widgets whether or not the download succeeded.
        if weights_warning is not None:
            weights_warning.empty()
        if progress_bar is not None:
            progress_bar.empty()
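
# Example use of `download_file` (hypothetical URL and size, for illustration
# only): passing `expected_size` lets the helper skip the download when a
# file of exactly that size is already on disk.
#
# download_file(
#     "https://example.com/model.bin",
#     HERE / "models/model.bin",
#     expected_size=1234,
# )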


WEBRTC_CLIENT_SETTINGS = ClientSettings(
    rtc_configuration={"iceServers": [{"urls": ["stun:stun.l.google.com:19302"]}]},
    media_stream_constraints={"video": True, "audio": True},
)
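
# NOTE: A public STUN server is enough for many networks, but peers behind
# symmetric NATs or strict firewalls typically also need a TURN server in
# `iceServers`. The shape of such an entry is sketched below with placeholder
# host and credentials, not a working server:
# {"urls": ["turn:turn.example.com:3478"], "username": "user", "credential": "pass"}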


def main():
    st.header("WebRTC demo")

    object_detection_page = "Real time object detection (sendrecv)"
    video_filters_page = (
        "Real time video transform with simple OpenCV filters (sendrecv)"
    )
    audio_filter_page = "Real time audio filter (sendrecv)"
    delayed_echo_page = "Delayed echo (sendrecv)"
    streaming_page = (
        "Consuming media files on server-side and streaming it to browser (recvonly)"
    )
    video_sendonly_page = (
        "WebRTC is sendonly and images are shown via st.image() (sendonly)"
    )
    audio_sendonly_page = (
        "WebRTC is sendonly and audio frames are visualized with matplotlib (sendonly)"
    )
    loopback_page = "Simple video and audio loopback (sendrecv)"
    app_mode = st.sidebar.selectbox(
        "Choose the app mode",
        [
            object_detection_page,
            video_filters_page,
            audio_filter_page,
            delayed_echo_page,
            streaming_page,
            video_sendonly_page,
            audio_sendonly_page,
            loopback_page,
        ],
    )
    st.subheader(app_mode)

    if app_mode == video_filters_page:
        app_video_filters()
    elif app_mode == object_detection_page:
        app_object_detection()
    elif app_mode == audio_filter_page:
        app_audio_filter()
    elif app_mode == delayed_echo_page:
        app_delayed_echo()
    elif app_mode == streaming_page:
        app_streaming()
    elif app_mode == video_sendonly_page:
        app_sendonly_video()
    elif app_mode == audio_sendonly_page:
        app_sendonly_audio()
    elif app_mode == loopback_page:
        app_loopback()

    logger.debug("=== Alive threads ===")
    for thread in threading.enumerate():
        if thread.is_alive():
            logger.debug(f"  {thread.name} ({thread.ident})")


def app_loopback():
    """Simple video and audio loopback."""
    webrtc_streamer(
        key="loopback",
        mode=WebRtcMode.SENDRECV,
        client_settings=WEBRTC_CLIENT_SETTINGS,
        video_processor_factory=None,  # No processing: frames are echoed back.
    )


def app_video_filters():
    """Video transforms with OpenCV."""

    class OpenCVVideoProcessor(VideoProcessorBase):
        type: Literal["noop", "cartoon", "edges", "rotate"]

        def __init__(self) -> None:
            self.type = "noop"

        def recv(self, frame: av.VideoFrame) -> av.VideoFrame:
            img = frame.to_ndarray(format="bgr24")

            if self.type == "noop":
                pass
            elif self.type == "cartoon":
                # Prepare color: downsample, smooth repeatedly, then upsample.
                img_color = cv2.pyrDown(cv2.pyrDown(img))
                for _ in range(6):
                    img_color = cv2.bilateralFilter(img_color, 9, 9, 7)
                img_color = cv2.pyrUp(cv2.pyrUp(img_color))

                # Prepare edges via adaptive threshold of the blurred gray image.
                img_edges = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
                img_edges = cv2.adaptiveThreshold(
                    cv2.medianBlur(img_edges, 7),
                    255,
                    cv2.ADAPTIVE_THRESH_MEAN_C,
                    cv2.THRESH_BINARY,
                    9,
                    2,
                )
                img_edges = cv2.cvtColor(img_edges, cv2.COLOR_GRAY2BGR)

                # Combine color and edges.
                img = cv2.bitwise_and(img_color, img_edges)
            elif self.type == "edges":
                # Perform edge detection.
                img = cv2.cvtColor(cv2.Canny(img, 100, 200), cv2.COLOR_GRAY2BGR)
            elif self.type == "rotate":
                # Rotate the image around its center, 45 degrees per second.
                rows, cols, _ = img.shape
                M = cv2.getRotationMatrix2D((cols / 2, rows / 2), frame.time * 45, 1)
                img = cv2.warpAffine(img, M, (cols, rows))

            return av.VideoFrame.from_ndarray(img, format="bgr24")

    webrtc_ctx = webrtc_streamer(
        key="opencv-filter",
        mode=WebRtcMode.SENDRECV,
        client_settings=WEBRTC_CLIENT_SETTINGS,
        video_processor_factory=OpenCVVideoProcessor,
        async_processing=True,
    )

    if webrtc_ctx.video_processor:
        webrtc_ctx.video_processor.type = st.radio(
            "Select transform type", ("noop", "cartoon", "edges", "rotate")
        )
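
    # NOTE: The processor instance outlives Streamlit reruns and its `recv`
    # runs in a worker thread, while the radio button above runs in the
    # script thread. Assigning a plain attribute like `type` across threads
    # is safe here because it is a single atomic reference update that the
    # worker only reads.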

    st.markdown(
        "This demo is based on "
        "https://github.com/aiortc/aiortc/blob/2362e6d1f0c730a0f8c387bbea76546775ad2fe8/examples/server/server.py#L34. "
        "Many thanks to the project."
    )


def app_audio_filter():
    DEFAULT_GAIN = 1.0

    class AudioProcessor(AudioProcessorBase):
        gain = DEFAULT_GAIN

        def recv(self, frame: av.AudioFrame) -> av.AudioFrame:
            raw_samples = frame.to_ndarray()
            sound = pydub.AudioSegment(
                data=raw_samples.tobytes(),
                sample_width=frame.format.bytes,
                frame_rate=frame.sample_rate,
                channels=len(frame.layout.channels),
            )

            sound = sound.apply_gain(self.gain)

            # Convert the processed sound back into an ndarray with the same
            # shape as the input frame.
            channel_sounds = sound.split_to_mono()
            channel_samples = [s.get_array_of_samples() for s in channel_sounds]
            new_samples: np.ndarray = np.array(channel_samples).T
            new_samples = new_samples.reshape(raw_samples.shape)

            new_frame = av.AudioFrame.from_ndarray(
                new_samples, layout=frame.layout.name
            )
            new_frame.sample_rate = frame.sample_rate
            return new_frame

    webrtc_ctx = webrtc_streamer(
        key="audio-filter",
        mode=WebRtcMode.SENDRECV,
        client_settings=WEBRTC_CLIENT_SETTINGS,
        audio_processor_factory=AudioProcessor,
        async_processing=True,
    )

    if webrtc_ctx.audio_processor:
        webrtc_ctx.audio_processor.gain = st.slider(
            "Gain", -10.0, +20.0, DEFAULT_GAIN, 0.05
        )
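
    # NOTE: pydub's `apply_gain` works in decibels, so the slider above sets
    # a dB adjustment (0.0 leaves the signal unchanged), not a linear factor.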


def app_delayed_echo():
    DEFAULT_DELAY = 1.0

    class VideoProcessor(VideoProcessorBase):
        delay = DEFAULT_DELAY

        async def recv_queued(
            self, frames: List[av.VideoFrame]
        ) -> List[av.VideoFrame]:
            logger.debug("Delay: %f", self.delay)
            await asyncio.sleep(self.delay)
            return frames

    class AudioProcessor(AudioProcessorBase):
        delay = DEFAULT_DELAY

        async def recv_queued(
            self, frames: List[av.AudioFrame]
        ) -> List[av.AudioFrame]:
            await asyncio.sleep(self.delay)
            return frames
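
    # `recv_queued` receives every frame that arrived while the previous call
    # was sleeping, so the echo stays real-time instead of falling further
    # behind. If dropping the backlog were preferable to echoing all of it, a
    # variant could return only the newest frame (a sketch, not used here):
    #
    # async def recv_queued(self, frames):
    #     await asyncio.sleep(self.delay)
    #     return frames[-1:]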

    webrtc_ctx = webrtc_streamer(
        key="delay",
        mode=WebRtcMode.SENDRECV,
        client_settings=WEBRTC_CLIENT_SETTINGS,
        video_processor_factory=VideoProcessor,
        audio_processor_factory=AudioProcessor,
        async_processing=True,
    )

    if webrtc_ctx.video_processor and webrtc_ctx.audio_processor:
        delay = st.slider("Delay", 0.0, 5.0, DEFAULT_DELAY, 0.05)
        webrtc_ctx.video_processor.delay = delay
        webrtc_ctx.audio_processor.delay = delay


def app_object_detection():
    """Object detection demo with MobileNet SSD.
    This model and code are based on
    https://github.com/robmarkcole/object-detection-app
    """
    MODEL_URL = "https://github.com/robmarkcole/object-detection-app/raw/master/model/MobileNetSSD_deploy.caffemodel"
    MODEL_LOCAL_PATH = HERE / "models/MobileNetSSD_deploy.caffemodel"
    PROTOTXT_URL = "https://github.com/robmarkcole/object-detection-app/raw/master/model/MobileNetSSD_deploy.prototxt.txt"
    PROTOTXT_LOCAL_PATH = HERE / "models/MobileNetSSD_deploy.prototxt.txt"

    CLASSES = [
        "background",
        "aeroplane",
        "bicycle",
        "bird",
        "boat",
        "bottle",
        "bus",
        "car",
        "cat",
        "chair",
        "cow",
        "diningtable",
        "dog",
        "horse",
        "motorbike",
        "person",
        "pottedplant",
        "sheep",
        "sofa",
        "train",
        "tvmonitor",
    ]
    COLORS = np.random.uniform(0, 255, size=(len(CLASSES), 3))
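
    # NOTE: CLASSES above are the 20 PASCAL VOC object categories (plus
    # "background" at index 0) that this MobileNet-SSD model predicts;
    # COLORS assigns each class a random annotation color.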

    download_file(MODEL_URL, MODEL_LOCAL_PATH, expected_size=23147564)
    download_file(PROTOTXT_URL, PROTOTXT_LOCAL_PATH, expected_size=29353)

    DEFAULT_CONFIDENCE_THRESHOLD = 0.5

    class Detection(NamedTuple):
        name: str
        prob: float

    class MobileNetSSDVideoProcessor(VideoProcessorBase):
        confidence_threshold: float
        result_queue: "queue.Queue[List[Detection]]"

        def __init__(self) -> None:
            self._net = cv2.dnn.readNetFromCaffe(
                str(PROTOTXT_LOCAL_PATH), str(MODEL_LOCAL_PATH)
            )
            self.confidence_threshold = DEFAULT_CONFIDENCE_THRESHOLD
            self.result_queue = queue.Queue()

        def _annotate_image(self, image, detections):
            # Loop over the detections.
            (h, w) = image.shape[:2]
            result: List[Detection] = []
            for i in np.arange(0, detections.shape[2]):
                confidence = detections[0, 0, i, 2]

                if confidence > self.confidence_threshold:
                    # Extract the index of the class label from `detections`,
                    # then compute the (x, y)-coordinates of the bounding box
                    # for the object.
                    idx = int(detections[0, 0, i, 1])
                    box = detections[0, 0, i, 3:7] * np.array([w, h, w, h])
                    (startX, startY, endX, endY) = box.astype("int")

                    name = CLASSES[idx]
                    result.append(Detection(name=name, prob=float(confidence)))

                    # Display the prediction.
                    label = f"{name}: {round(confidence * 100, 2)}%"
                    cv2.rectangle(image, (startX, startY), (endX, endY), COLORS[idx], 2)
                    y = startY - 15 if startY - 15 > 15 else startY + 15
                    cv2.putText(
                        image,
                        label,
                        (startX, y),
                        cv2.FONT_HERSHEY_SIMPLEX,
                        0.5,
                        COLORS[idx],
                        2,
                    )
            return image, result

        def recv(self, frame: av.VideoFrame) -> av.VideoFrame:
            image = frame.to_ndarray(format="bgr24")
            blob = cv2.dnn.blobFromImage(
                cv2.resize(image, (300, 300)), 0.007843, (300, 300), 127.5
            )
            self._net.setInput(blob)
            detections = self._net.forward()
            annotated_image, result = self._annotate_image(image, detections)

            # NOTE: This `recv` method is called in another thread, so it must
            # be thread-safe. The result is handed to the main thread through
            # a queue instead of touching Streamlit APIs here.
            self.result_queue.put(result)

            return av.VideoFrame.from_ndarray(annotated_image, format="bgr24")

    webrtc_ctx = webrtc_streamer(
        key="object-detection",
        mode=WebRtcMode.SENDRECV,
        client_settings=WEBRTC_CLIENT_SETTINGS,
        video_processor_factory=MobileNetSSDVideoProcessor,
        async_processing=True,
    )

    confidence_threshold = st.slider(
        "Confidence threshold", 0.0, 1.0, DEFAULT_CONFIDENCE_THRESHOLD, 0.05
    )
    if webrtc_ctx.video_processor:
        webrtc_ctx.video_processor.confidence_threshold = confidence_threshold

    if st.checkbox("Show the detected labels", value=True):
        if webrtc_ctx.state.playing:
            labels_placeholder = st.empty()
            # NOTE: The video transformation with object detection and this
            # loop displaying the result labels run in different threads
            # asynchronously, so the rendered video frames and the labels
            # shown here are not strictly synchronized.
            while True:
                if webrtc_ctx.video_processor:
                    try:
                        result = webrtc_ctx.video_processor.result_queue.get(
                            timeout=1.0
                        )
                    except queue.Empty:
                        result = None
                    labels_placeholder.table(result)
                else:
                    break

    st.markdown(
        "This demo uses a model and code from "
        "https://github.com/robmarkcole/object-detection-app. "
        "Many thanks to the project."
    )


def app_streaming():
    """Stream media files from the server to the browser."""
    MEDIAFILES = {
        "big_buck_bunny_720p_2mb.mp4": {
            "url": "https://sample-videos.com/video123/mp4/720/big_buck_bunny_720p_2mb.mp4",
            "local_file_path": HERE / "data/big_buck_bunny_720p_2mb.mp4",
            "type": "video",
        },
        "big_buck_bunny_720p_10mb.mp4": {
            "url": "https://sample-videos.com/video123/mp4/720/big_buck_bunny_720p_10mb.mp4",
            "local_file_path": HERE / "data/big_buck_bunny_720p_10mb.mp4",
            "type": "video",
        },
        "file_example_MP3_700KB.mp3": {
            "url": "https://file-examples-com.github.io/uploads/2017/11/file_example_MP3_700KB.mp3",
            "local_file_path": HERE / "data/file_example_MP3_700KB.mp3",
            "type": "audio",
        },
        "file_example_MP3_5MG.mp3": {
            "url": "https://file-examples-com.github.io/uploads/2017/11/file_example_MP3_5MG.mp3",
            "local_file_path": HERE / "data/file_example_MP3_5MG.mp3",
            "type": "audio",
        },
    }
    media_file_label = st.radio(
        "Select a media file to stream", tuple(MEDIAFILES.keys())
    )
    media_file_info = MEDIAFILES[media_file_label]
    download_file(media_file_info["url"], media_file_info["local_file_path"])

    def create_player():
        return MediaPlayer(str(media_file_info["local_file_path"]))
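
    # NOTE: To stream from a camera attached to the server instead of a file,
    # a player factory along these lines could be used (a sketch; the device
    # string and `format` are platform-specific, e.g. "avfoundation" on macOS
    # or "v4l2" on Linux):
    #
    # def create_player():
    #     return MediaPlayer(
    #         "default:none",
    #         format="avfoundation",
    #         options={"framerate": "30", "video_size": "1280x720"},
    #     )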

    # Request from the browser only the kind of track the selected file
    # provides (video or audio).
    WEBRTC_CLIENT_SETTINGS.update(
        {
            "media_stream_constraints": {
                "video": media_file_info["type"] == "video",
                "audio": media_file_info["type"] == "audio",
            }
        }
    )

    webrtc_streamer(
        key=f"media-streaming-{media_file_label}",
        mode=WebRtcMode.RECVONLY,
        client_settings=WEBRTC_CLIENT_SETTINGS,
        player_factory=create_player,
    )


def app_sendonly_video():
    """A sample to use WebRTC in sendonly mode to transfer frames
    from the browser to the server and to render frames via `st.image`."""
    webrtc_ctx = webrtc_streamer(
        key="video-sendonly",
        mode=WebRtcMode.SENDONLY,
        client_settings=WEBRTC_CLIENT_SETTINGS,
    )

    image_place = st.empty()

    if webrtc_ctx.video_receiver:
        while True:
            try:
                video_frame = webrtc_ctx.video_receiver.get_frame(timeout=1)
            except queue.Empty:
                logger.warning("Queue is empty. Abort.")
                break

            # `st.image` expects RGB, so convert from the decoded frame.
            img_rgb = video_frame.to_ndarray(format="rgb24")
            image_place.image(img_rgb)


def app_sendonly_audio():
    """A sample to use WebRTC in sendonly mode to transfer audio frames
    from the browser to the server and visualize them with matplotlib
    and `st.pyplot`."""
    webrtc_ctx = webrtc_streamer(
        key="audio-sendonly",
        mode=WebRtcMode.SENDONLY,
        audio_receiver_size=256,
        client_settings=WEBRTC_CLIENT_SETTINGS,
    )

    fig_place = st.empty()

    fig, [ax_time, ax_freq] = plt.subplots(
        2, 1, gridspec_kw={"top": 1.5, "bottom": 0.2}
    )

    sound_window_len = 5000  # 5s
    sound_window_buffer = None
    while True:
        if webrtc_ctx.audio_receiver:
            try:
                audio_frames = webrtc_ctx.audio_receiver.get_frames(timeout=1)
            except queue.Empty:
                logger.warning("Queue is empty. Abort.")
                break

            sound_chunk = pydub.AudioSegment.empty()
            for audio_frame in audio_frames:
                sound = pydub.AudioSegment(
                    data=audio_frame.to_ndarray().tobytes(),
                    sample_width=audio_frame.format.bytes,
                    frame_rate=audio_frame.sample_rate,
                    channels=len(audio_frame.layout.channels),
                )
                sound_chunk += sound

            if len(sound_chunk) > 0:
                if sound_window_buffer is None:
                    # Pad the window with silence until it fills up.
                    sound_window_buffer = pydub.AudioSegment.silent(
                        duration=sound_window_len
                    )

                sound_window_buffer += sound_chunk
                if len(sound_window_buffer) > sound_window_len:
                    sound_window_buffer = sound_window_buffer[-sound_window_len:]

            if sound_window_buffer:
                # Downmix to mono before plotting.
                sound_window_buffer = sound_window_buffer.set_channels(1)
                sample = np.array(sound_window_buffer.get_array_of_samples())

                ax_time.cla()
                times = (np.arange(-len(sample), 0)) / sound_window_buffer.frame_rate
                ax_time.plot(times, sample)
                ax_time.set_xlabel("Time")
                ax_time.set_ylabel("Magnitude")

                spec = np.fft.fft(sample)
                freq = np.fft.fftfreq(sample.shape[0], 1.0 / sound_chunk.frame_rate)
                # Keep only the positive-frequency half of the spectrum.
                freq = freq[: int(freq.shape[0] / 2)]
                spec = spec[: int(spec.shape[0] / 2)]
                spec[0] = spec[0] / 2  # The DC component appears only once.

                ax_freq.cla()
                ax_freq.plot(freq, np.abs(spec))
                ax_freq.set_xlabel("Frequency")
                ax_freq.set_yscale("log")
                ax_freq.set_ylabel("Magnitude")

                fig_place.pyplot(fig)
        else:
            logger.warning("AudioReceiver is not set. Abort.")
            break


if __name__ == "__main__":
    import os

    DEBUG = os.environ.get("DEBUG", "false").lower() not in ["false", "no", "0"]

    logging.basicConfig(
        format="[%(asctime)s] %(levelname)7s from %(name)s in %(pathname)s:%(lineno)d: "
        "%(message)s",
        force=True,
    )

    logger.setLevel(level=logging.DEBUG if DEBUG else logging.INFO)

    st_webrtc_logger = logging.getLogger("streamlit_webrtc")
    st_webrtc_logger.setLevel(logging.DEBUG)

    fsevents_logger = logging.getLogger("fsevents")
    fsevents_logger.setLevel(logging.WARNING)

    main()