File size: 4,318 Bytes
35bb6f4 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 | from __future__ import annotations
import io
from typing import Literal
import av
import numpy as np
# The closed set of output format names accepted by this module.
AudioFormat = Literal["mp3", "opus", "aac", "flac", "wav", "pcm"]

# Mapping from our format names to PyAV codec/container names
# Each entry holds:
#   "codec"        - encoder name passed to the container's add_stream()
#   "container"    - muxer name passed to av.open(..., format=...)
#   "content_type" - MIME type reported by get_content_type()
# "pcm" has neither codec nor container: it is emitted as raw int16 samples
# without going through PyAV.
FORMAT_CONFIG: dict[str, dict] = {
    "mp3": {"codec": "mp3", "container": "mp3", "content_type": "audio/mpeg"},
    "opus": {"codec": "libopus", "container": "ogg", "content_type": "audio/ogg"},
    "aac": {"codec": "aac", "container": "adts", "content_type": "audio/aac"},
    "flac": {"codec": "flac", "container": "flac", "content_type": "audio/flac"},
    "wav": {"codec": "pcm_s16le", "container": "wav", "content_type": "audio/wav"},
    "pcm": {"codec": None, "container": None, "content_type": "audio/pcm"},
}
def get_content_type(fmt: AudioFormat) -> str:
    """Return the MIME content type registered for *fmt* in ``FORMAT_CONFIG``.

    Raises:
        KeyError: if *fmt* is not a key of ``FORMAT_CONFIG``.
    """
    format_entry = FORMAT_CONFIG[fmt]
    mime_type = format_entry["content_type"]
    return mime_type
class StreamingAudioWriter:
    """Encode raw PCM audio (float32, mono) chunk by chunk using PyAV.

    Typical use: construct, call :meth:`write_chunk` for every chunk of
    samples, call :meth:`finalize` once to flush the encoder, then
    :meth:`close` to release resources.

    For ``"pcm"`` no PyAV container is created; chunks are simply converted
    to 16-bit integer samples and returned as raw bytes.
    """

    def __init__(self, fmt: AudioFormat, sample_rate: int = 24000) -> None:
        """Set up the in-memory output buffer and, unless *fmt* is ``"pcm"``,
        the PyAV container and audio stream.

        Args:
            fmt: Target format; a key of ``FORMAT_CONFIG``.
            sample_rate: Sample rate, in Hz, of the PCM data that will be
                passed to :meth:`write_chunk`.
        """
        self.format = fmt
        self.sample_rate = sample_rate
        self._buffer = io.BytesIO()

        if fmt == "pcm":
            # No encoding needed for raw PCM
            self._container = None
            self._stream = None
            return

        config = FORMAT_CONFIG[fmt]
        self._container = av.open(self._buffer, mode="w", format=config["container"])
        self._stream = self._container.add_stream(config["codec"], rate=sample_rate)
        self._stream.layout = "mono"
        if fmt == "opus":
            # The Opus stream is always configured for 48 kHz; input at any
            # other rate is resampled on the way into the encoder.
            self._stream.rate = 48000

    @staticmethod
    def _to_int16(pcm_data: np.ndarray) -> np.ndarray:
        """Convert float samples in [-1.0, 1.0] to int16, clipping outliers."""
        # Clip before scaling: values outside [-1, 1] would otherwise
        # overflow int16 and wrap around, producing loud clicks.
        return (np.clip(pcm_data, -1.0, 1.0) * 32767).astype(np.int16)

    def write_chunk(self, pcm_data: np.ndarray) -> bytes:
        """Encode a chunk of float32 PCM audio and return the encoded bytes.

        Args:
            pcm_data: Mono samples, nominally in ``[-1.0, 1.0]``; values
                outside that range are clipped.

        Returns:
            For ``"pcm"``, the samples as raw int16 bytes. Otherwise whatever
            bytes the muxer wrote to the output buffer while processing this
            chunk; this may be empty if the encoder is still buffering.
        """
        samples = self._to_int16(pcm_data)
        if self.format == "pcm":
            return samples.tobytes()
        if samples.size == 0:
            # Nothing to encode; avoid handing PyAV a zero-length frame.
            return b""

        frame = av.AudioFrame.from_ndarray(
            samples.reshape(1, -1),
            format="s16",
            layout="mono",
        )
        # Label the frame with the TRUE rate of the samples. For Opus the
        # stream runs at 48 kHz; PyAV's audio encoder resamples frames whose
        # rate differs from the stream's. Stamping the frame as 48 kHz without
        # resampling the data would make the audio play back too fast.
        frame.sample_rate = self.sample_rate

        start_pos = self._buffer.tell()
        for packet in self._stream.encode(frame):
            self._container.mux(packet)

        # Read newly written bytes (everything after the pre-encode position).
        self._buffer.seek(start_pos)
        return self._buffer.read()

    def finalize(self) -> bytes:
        """Flush remaining encoded data and close the container.

        Returns:
            The bytes written to the output buffer during the flush and
            container close; always ``b""`` for ``"pcm"``.
        """
        if self.format == "pcm" or self._container is None:
            return b""

        start_pos = self._buffer.tell()
        # Passing None drains whatever the encoder is still holding.
        for packet in self._stream.encode(None):
            self._container.mux(packet)
        self._container.close()

        self._buffer.seek(start_pos)
        return self._buffer.read()

    def close(self) -> None:
        """Release the container (if any) and the output buffer."""
        if self._container is not None:
            try:
                self._container.close()
            except Exception:
                # Deliberate best-effort cleanup: the container may already
                # have been closed by finalize(), and a failure here must not
                # prevent the buffer below from being released.
                pass
        self._buffer.close()
def encode_audio_complete(
    pcm_data: np.ndarray,
    fmt: AudioFormat,
    sample_rate: int = 24000,
) -> bytes:
    """Encode a complete PCM float32 array to the specified audio format.

    Args:
        pcm_data: Mono samples, nominally in ``[-1.0, 1.0]``; values outside
            that range are clipped.
        fmt: Target format; a key of ``FORMAT_CONFIG``.
        sample_rate: Sample rate of *pcm_data* in Hz.

    Returns:
        The full encoded file as bytes, or raw int16 sample bytes for
        ``"pcm"``.
    """
    # Clip before scaling for every format: values outside [-1, 1] would
    # otherwise overflow int16 and wrap around.
    pcm_int16 = (np.clip(pcm_data, -1.0, 1.0) * 32767).astype(np.int16)
    if fmt == "pcm":
        return pcm_int16.tobytes()

    config = FORMAT_CONFIG[fmt]
    buf = io.BytesIO()
    container = av.open(buf, mode="w", format=config["container"])
    try:
        stream = container.add_stream(config["codec"], rate=sample_rate)
        stream.layout = "mono"
        if fmt == "opus":
            # The Opus stream is always configured for 48 kHz.
            stream.rate = 48000

        frame = av.AudioFrame.from_ndarray(
            pcm_int16.reshape(1, -1),
            format="s16",
            layout="mono",
        )
        # Label the frame with the TRUE rate of the samples so PyAV's audio
        # encoder resamples it to the stream rate. Stamping un-resampled data
        # as 48 kHz would make Opus output play back too fast.
        frame.sample_rate = sample_rate

        for packet in stream.encode(frame):
            container.mux(packet)
        # Passing None drains whatever the encoder is still holding.
        for packet in stream.encode(None):
            container.mux(packet)
    finally:
        # Close on the error path as well, so the container is not leaked.
        container.close()

    return buf.getvalue()
|