neuapi / api /src /services /streaming_audio_writer.py
grimshaw's picture
Upload folder using huggingface_hub
35bb6f4 verified
Raw
History Blame Contribute Delete
4.32 kB
from __future__ import annotations
import io
from typing import Literal
import av
import numpy as np
AudioFormat = Literal["mp3", "opus", "aac", "flac", "wav", "pcm"]
# Mapping from our format names to PyAV codec/container names
FORMAT_CONFIG: dict[str, dict] = {
"mp3": {"codec": "mp3", "container": "mp3", "content_type": "audio/mpeg"},
"opus": {"codec": "libopus", "container": "ogg", "content_type": "audio/ogg"},
"aac": {"codec": "aac", "container": "adts", "content_type": "audio/aac"},
"flac": {"codec": "flac", "container": "flac", "content_type": "audio/flac"},
"wav": {"codec": "pcm_s16le", "container": "wav", "content_type": "audio/wav"},
"pcm": {"codec": None, "container": None, "content_type": "audio/pcm"},
}
def get_content_type(fmt: AudioFormat) -> str:
return FORMAT_CONFIG[fmt]["content_type"]
class StreamingAudioWriter:
"""Encodes raw PCM audio (float32, mono) into various formats using PyAV."""
def __init__(self, fmt: AudioFormat, sample_rate: int = 24000) -> None:
self.format = fmt
self.sample_rate = sample_rate
self._buffer = io.BytesIO()
if fmt == "pcm":
# No encoding needed for raw PCM
self._container = None
self._stream = None
else:
config = FORMAT_CONFIG[fmt]
self._container = av.open(self._buffer, mode="w", format=config["container"])
self._stream = self._container.add_stream(config["codec"], rate=sample_rate)
self._stream.layout = "mono"
if fmt == "opus":
self._stream.rate = 48000 # Opus requires 48kHz
def write_chunk(self, pcm_data: np.ndarray) -> bytes:
"""Encode a chunk of float32 PCM audio and return the encoded bytes."""
if self.format == "pcm":
# Convert float32 to int16 PCM
pcm_int16 = (pcm_data * 32767).astype(np.int16)
return pcm_int16.tobytes()
# Convert float32 [-1.0, 1.0] to int16
pcm_int16 = (np.clip(pcm_data, -1.0, 1.0) * 32767).astype(np.int16)
frame = av.AudioFrame.from_ndarray(
pcm_int16.reshape(1, -1),
format="s16",
layout="mono",
)
frame.sample_rate = self.sample_rate
if self.format == "opus":
frame.sample_rate = 48000
start_pos = self._buffer.tell()
for packet in self._stream.encode(frame):
self._container.mux(packet)
# Read newly written bytes
self._buffer.seek(start_pos)
data = self._buffer.read()
return data
def finalize(self) -> bytes:
"""Flush remaining encoded data and close the container."""
if self.format == "pcm" or self._container is None:
return b""
start_pos = self._buffer.tell()
# Flush encoder
for packet in self._stream.encode(None):
self._container.mux(packet)
self._container.close()
self._buffer.seek(start_pos)
data = self._buffer.read()
return data
def close(self) -> None:
if self._container is not None:
try:
self._container.close()
except Exception:
pass
self._buffer.close()
def encode_audio_complete(
pcm_data: np.ndarray,
fmt: AudioFormat,
sample_rate: int = 24000,
) -> bytes:
"""Encode a complete PCM float32 array to the specified audio format."""
if fmt == "pcm":
return (pcm_data * 32767).astype(np.int16).tobytes()
buf = io.BytesIO()
config = FORMAT_CONFIG[fmt]
container = av.open(buf, mode="w", format=config["container"])
stream = container.add_stream(config["codec"], rate=sample_rate)
stream.layout = "mono"
actual_rate = 48000 if fmt == "opus" else sample_rate
if fmt == "opus":
stream.rate = 48000
pcm_int16 = (np.clip(pcm_data, -1.0, 1.0) * 32767).astype(np.int16)
frame = av.AudioFrame.from_ndarray(
pcm_int16.reshape(1, -1),
format="s16",
layout="mono",
)
frame.sample_rate = actual_rate
for packet in stream.encode(frame):
container.mux(packet)
for packet in stream.encode(None):
container.mux(packet)
container.close()
buf.seek(0)
return buf.read()