Spaces: Runtime error
| import asyncio | |
| import io | |
| import logging | |
| import traceback | |
| from typing import List | |
| import av | |
| import numpy as np | |
| import streamlit as st | |
| from streamlit_webrtc import WebRtcMode, webrtc_streamer | |
| import pydub | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| from sample_utils.turn import get_ice_servers | |
| logger = logging.getLogger(__name__) | |
class StreamingMP3ToFrames:
    """Incrementally decode a stream of MP3 byte chunks into av.AudioFrame objects.

    MP3 data arrives in arbitrarily sized chunks, so a chunk may end in the
    middle of an MP3 frame. When decoding fails, the partial bytes are kept
    and the next chunk is appended to them before decoding is retried.
    """

    def __init__(self):
        # When True, the next chunk is appended to the leftover bytes of a
        # previously failed decode instead of starting a fresh buffer.
        self.append = False
        # Raw MP3 bytes currently being decoded. Initialized here so the
        # attribute always exists (the original created it lazily, which
        # risked an AttributeError on unexpected code paths).
        self.bytes_io = None

    def process_chunk(self, chunk):
        """Decode one chunk of MP3 bytes.

        Returns a list of av.AudioFrame objects; the list is empty (or
        partial) when the buffered bytes do not yet form complete MP3 frames.
        """
        audio_frames = []
        try:
            if self.append:
                # Resume a previously incomplete decode: append and rewind.
                self.bytes_io.write(chunk)
                self.append = False
                self.bytes_io.seek(0)
            else:
                self.bytes_io = io.BytesIO(chunk)
            container = av.open(self.bytes_io, 'r', format='mp3')
            audio_stream = next(s for s in container.streams if s.type == 'audio')
            for frame in container.decode(audio_stream):
                array = frame.to_ndarray()
                audio_frame = av.AudioFrame.from_ndarray(array, format='fltp', layout='mono')
                # NOTE(review): assumes the MP3 source is 44.1 kHz mono —
                # confirm against the actual encoder settings.
                audio_frame.sample_rate = 44100
                audio_frames.append(audio_frame)
            return audio_frames
        except Exception as e:
            # Decoding failed — typically a truncated MP3 frame at the end of
            # the buffer. Use the module logger (not print), remember to
            # append the next chunk, and park the cursor at the end of the
            # buffer so the next write resumes where the data left off.
            logger.debug("MP3 decode incomplete, buffering for next chunk: %s", e)
            self.append = True
            self.bytes_io.seek(0, io.SEEK_END)
            return audio_frames
def video_frame_callback(
    frame: av.VideoFrame,
) -> av.VideoFrame:
    """Pass-through video callback: return the received frame unmodified."""
    return frame
# Pre-decode the recorded MP3 chunks once at startup so the audio callback
# can serve ready-made frames without doing any decoding work per call.
streaming_mp3_to_frames = StreamingMP3ToFrames()

import pickle  # hoisted out of the `with` block where it was oddly placed

# SECURITY NOTE: pickle.load can execute arbitrary code from the file; only
# load a trusted, locally generated chunks.pkl.
with open("chunks.pkl", "rb") as f:
    debug_chunks = pickle.load(f)

debug_frames = []
debug_frame_idx = 0
for chunk in debug_chunks:
    # extend() replaces the original manual append loop; same resulting list.
    debug_frames.extend(streaming_mp3_to_frames.process_chunk(chunk))
def dequeue_frame():
    """Return the next pre-decoded debug frame, wrapping back to the start.

    Advances the module-level cursor `debug_frame_idx` over `debug_frames`
    so successive calls cycle through the decoded frames forever.
    """
    global debug_frame_idx, debug_frames
    frame = debug_frames[debug_frame_idx]
    # Modulo advance is equivalent to the original increment-then-reset.
    debug_frame_idx = (debug_frame_idx + 1) % len(debug_frames)
    return frame
# Rolling buffer of int16 samples awaiting emission; starts empty.
sample_buffer = np.empty(0, dtype=np.int16)
def process_frame(old_frame):
    """Build a replacement av.AudioFrame the same size and timing as old_frame.

    Pulls pre-decoded mono frames from dequeue_frame(), resamples them to
    old_frame's sample rate via pydub, and accumulates int16 samples in the
    module-level `sample_buffer` so every output frame carries exactly
    old_frame.samples samples (zero-padded with silence if the supply runs
    dry). Raises whatever exception occurs, after printing the traceback.
    """
    try:
        output_channels = 2
        output_sample_rate = 44100  # rate the decoder produced (see StreamingMP3ToFrames)
        required_samples = old_frame.samples
        global sample_buffer
        # Accumulate enough resampled samples to fill one output frame.
        while sample_buffer.shape[0] < required_samples:
            dequeued_frame = dequeue_frame()
            if dequeued_frame is None:
                # Defensive: dequeue_frame currently never returns None.
                break
            # Decoded floats are expected in [-1.0, 1.0]; warn on clipping.
            # NOTE(review): the min_sample clause is redundant (min(abs) > 1
            # implies max(abs) > 1) but kept to preserve the warning text.
            float_samples = dequeued_frame.to_ndarray()
            max_sample = np.max(np.abs(float_samples))
            min_sample = np.min(np.abs(float_samples))
            if max_sample > 1.0 or min_sample > 1.0:
                print(f"WARNING: max_sample: {max_sample}, min_sample: {min_sample}")
            # Scale to 16-bit PCM, then resample to the output frame's rate.
            int_samples = np.int16(float_samples * 32767)
            sound = pydub.AudioSegment(
                data=int_samples.tobytes(),
                sample_width=2,
                frame_rate=output_sample_rate,
                channels=len(dequeued_frame.layout.channels),
            )
            sound = sound.set_frame_rate(old_frame.sample_rate)
            samples = np.array(sound.get_array_of_samples(), dtype=np.int16)
            sample_buffer = np.append(sample_buffer, samples)
        # Ran out of frames: pad with silence so the frame is full-sized.
        if sample_buffer.shape[0] < required_samples:
            empty_samples = np.zeros((required_samples - sample_buffer.shape[0]), dtype=np.int16)
            sample_buffer = np.append(sample_buffer, empty_samples)
        # Consume exactly required_samples from the front of the buffer.
        samples = sample_buffer[:required_samples]
        sample_buffer = sample_buffer[required_samples:]
        if output_channels == 2:
            # Duplicate the mono channel and interleave L/R for stereo
            # (column-major reshape produces LRLR... ordering).
            samples = np.vstack((samples, samples)).reshape((-1,), order='F')
        samples = samples.reshape(1, -1)
        layout = 'stereo' if output_channels == 2 else 'mono'
        new_frame = av.AudioFrame.from_ndarray(samples, format='s16', layout=layout)
        new_frame.sample_rate = old_frame.sample_rate
        new_frame.pts = old_frame.pts
        return new_frame
    except Exception as e:
        print(e)
        traceback.print_exc()
        # Bare raise re-raises the active exception in place of the
        # original `raise(e)` call-like form.
        raise
def audio_frame_callback(old_frame: av.AudioFrame) -> av.AudioFrame:
    """Replace each incoming audio frame with the next synthesized debug frame.

    Delegates to process_frame() and logs both frames' pts for debugging.
    """
    global debug_frame_idx, debug_frames
    new_frame = process_frame(old_frame)
    print(f"frame: {old_frame}, pts: {old_frame.pts}")
    print(f"new_frame: {new_frame}, pts: {new_frame.pts}")
    return new_frame
# Launch the bidirectional WebRTC stream: video is passed through unchanged,
# while audio frames are replaced by the pre-decoded debug MP3 frames.
webrtc_streamer(
    key="delay",
    mode=WebRtcMode.SENDRECV,
    # ICE servers fetched from the environment (see sample_utils.turn).
    rtc_configuration={"iceServers": get_ice_servers()},
    video_frame_callback=video_frame_callback,
    audio_frame_callback=audio_frame_callback,
)