| from __future__ import annotations |
|
|
| import io |
| from typing import Literal |
|
|
| import av |
| import numpy as np |
|
|
# Names of the output formats this module can produce.
AudioFormat = Literal["mp3", "opus", "aac", "flac", "wav", "pcm"]


# Per-format encoder settings, keyed by AudioFormat name:
#   "codec"        - codec name handed to the PyAV container's add_stream()
#   "container"    - container/muxer name handed to av.open(format=...)
#   "content_type" - MIME type reported by get_content_type()
# "pcm" has no codec or container: it is emitted as raw int16 samples
# without going through PyAV.
FORMAT_CONFIG: dict[str, dict] = {
    "mp3": {
        "codec": "mp3",
        "container": "mp3",
        "content_type": "audio/mpeg",
    },
    "opus": {
        "codec": "libopus",
        "container": "ogg",
        "content_type": "audio/ogg",
    },
    "aac": {
        "codec": "aac",
        "container": "adts",
        "content_type": "audio/aac",
    },
    "flac": {
        "codec": "flac",
        "container": "flac",
        "content_type": "audio/flac",
    },
    "wav": {
        "codec": "pcm_s16le",
        "container": "wav",
        "content_type": "audio/wav",
    },
    "pcm": {
        "codec": None,
        "container": None,
        "content_type": "audio/pcm",
    },
}
|
|
|
|
def get_content_type(fmt: AudioFormat) -> str:
    """Return the MIME content type registered for *fmt* in FORMAT_CONFIG.

    Raises:
        KeyError: if *fmt* has no entry in FORMAT_CONFIG.
    """
    format_entry = FORMAT_CONFIG[fmt]
    return format_entry["content_type"]
|
|
|
|
class StreamingAudioWriter:
    """Incrementally encode raw PCM audio (float32, mono) using PyAV.

    Each chunk is clipped to [-1.0, 1.0], scaled to signed 16-bit samples
    and either returned directly (``"pcm"``) or pushed through a PyAV
    encoder/muxer that writes into an in-memory buffer.

    Typical use: call ``write_chunk()`` for every chunk, ``finalize()`` once
    to collect the trailing bytes, then ``close()`` to release resources.
    """

    # Rate the Opus stream is configured with, whatever the input rate is.
    _OPUS_RATE = 48000

    def __init__(self, fmt: AudioFormat, sample_rate: int = 24000) -> None:
        """Prepare a writer for *fmt*.

        Args:
            fmt: Target format; must be a key of FORMAT_CONFIG.
            sample_rate: Sample rate (Hz) of the PCM data that will be
                passed to ``write_chunk()``.
        """
        self.format = fmt
        self.sample_rate = sample_rate
        self._buffer = io.BytesIO()
        # Stay None for "pcm", and are reset to None once the container
        # has been closed so later calls can detect that state.
        self._container = None
        self._stream = None

        if fmt == "pcm":
            # Raw PCM needs neither an encoder nor a container.
            return

        config = FORMAT_CONFIG[fmt]
        self._container = av.open(
            self._buffer, mode="w", format=config["container"]
        )
        encoder_rate = self._OPUS_RATE if fmt == "opus" else sample_rate
        self._stream = self._container.add_stream(
            config["codec"], rate=encoder_rate
        )
        self._stream.layout = "mono"

    def write_chunk(self, pcm_data: np.ndarray) -> bytes:
        """Encode one chunk of float PCM audio and return the new bytes.

        Args:
            pcm_data: Samples nominally in [-1.0, 1.0]; values outside that
                range are clipped. The array is flattened to one channel.

        Returns:
            For ``"pcm"``: the chunk as int16 sample bytes. Otherwise: the
            bytes appended to the internal buffer while muxing this chunk
            (may be empty if no packets were produced yet).

        Raises:
            RuntimeError: if called on an encoded format after the
                container has been finalized or closed.
        """
        # Clip for every format: an out-of-range float would otherwise
        # overflow in the int16 cast.
        pcm_int16 = (np.clip(pcm_data, -1.0, 1.0) * 32767).astype(np.int16)

        if self.format == "pcm":
            return pcm_int16.tobytes()

        if self._container is None or self._stream is None:
            raise RuntimeError("write_chunk() called after finalize() or close()")

        if pcm_int16.size == 0:
            # Nothing to encode; avoid building a zero-sample frame.
            return b""

        frame = av.AudioFrame.from_ndarray(
            pcm_int16.reshape(1, -1),
            format="s16",
            layout="mono",
        )
        # Tag the frame with the rate of the samples it actually holds.
        # PyAV's audio encoder converts incoming frames to the stream's
        # configured rate, so relabelling the data with the encoder rate
        # (e.g. 48 kHz for Opus) would skip that conversion and change the
        # playback speed/pitch.
        frame.sample_rate = self.sample_rate

        start_pos = self._buffer.tell()
        for packet in self._stream.encode(frame):
            self._container.mux(packet)

        # Return only what was written from the position recorded above.
        self._buffer.seek(start_pos)
        return self._buffer.read()

    def finalize(self) -> bytes:
        """Flush the encoder, close the container and return trailing bytes.

        Returns ``b""`` for ``"pcm"`` and when the container has already
        been finalized or closed, so calling it more than once is safe.
        """
        if self.format == "pcm" or self._container is None:
            return b""

        start_pos = self._buffer.tell()

        # encode(None) drains whatever the encoder is still holding.
        for packet in self._stream.encode(None):
            self._container.mux(packet)

        self._container.close()
        # Mark as closed so finalize()/close() do not touch it again.
        self._container = None
        self._stream = None

        self._buffer.seek(start_pos)
        return self._buffer.read()

    def close(self) -> None:
        """Release the container (if still open) and the internal buffer."""
        container, self._container = self._container, None
        self._stream = None
        if container is not None:
            try:
                container.close()
            except Exception:
                # Best-effort cleanup: a failure while closing must not
                # prevent the buffer below from being released.
                pass
        self._buffer.close()
|
|
|
|
def encode_audio_complete(
    pcm_data: np.ndarray,
    fmt: AudioFormat,
    sample_rate: int = 24000,
) -> bytes:
    """Encode a complete PCM float array to the specified audio format.

    Args:
        pcm_data: Mono samples nominally in [-1.0, 1.0]; values outside
            that range are clipped before conversion to int16.
        fmt: Target format; must be ``"pcm"`` or a key of FORMAT_CONFIG.
        sample_rate: Sample rate (Hz) of *pcm_data*.

    Returns:
        For ``"pcm"``: the int16 sample bytes. Otherwise: the full contents
        of the in-memory container written by PyAV.
    """
    # Clip for every format: an out-of-range float would otherwise overflow
    # in the int16 cast.
    pcm_int16 = (np.clip(pcm_data, -1.0, 1.0) * 32767).astype(np.int16)

    if fmt == "pcm":
        return pcm_int16.tobytes()

    config = FORMAT_CONFIG[fmt]
    # The Opus stream is always configured at 48 kHz; other codecs use the
    # input rate directly.
    encoder_rate = 48000 if fmt == "opus" else sample_rate

    buf = io.BytesIO()
    container = av.open(buf, mode="w", format=config["container"])
    try:
        stream = container.add_stream(config["codec"], rate=encoder_rate)
        stream.layout = "mono"

        if pcm_int16.size:
            frame = av.AudioFrame.from_ndarray(
                pcm_int16.reshape(1, -1),
                format="s16",
                layout="mono",
            )
            # Tag the frame with the rate of the samples it actually holds.
            # PyAV's audio encoder converts incoming frames to the stream's
            # configured rate; labelling the data with the encoder rate
            # instead would change the playback speed/pitch.
            frame.sample_rate = sample_rate
            for packet in stream.encode(frame):
                container.mux(packet)

        # encode(None) drains whatever the encoder is still holding.
        for packet in stream.encode(None):
            container.mux(packet)
    finally:
        # Close even if encoding failed so the container is not leaked.
        container.close()

    return buf.getvalue()
|
|