File size: 4,318 Bytes
35bb6f4
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
from __future__ import annotations

import io
from typing import Literal

import av
import numpy as np

# Names of the output formats understood by this module.
AudioFormat = Literal["mp3", "opus", "aac", "flac", "wav", "pcm"]

# Per-format settings, keyed by format name:
#   codec        -- codec name handed to PyAV when adding the audio stream
#   container    -- container (muxer) name handed to PyAV when opening output
#   content_type -- content type string returned by get_content_type()
# "pcm" has neither codec nor container: it is emitted as raw samples.
FORMAT_CONFIG: dict[str, dict] = {
    "mp3": {
        "codec": "mp3",
        "container": "mp3",
        "content_type": "audio/mpeg",
    },
    "opus": {
        "codec": "libopus",
        "container": "ogg",
        "content_type": "audio/ogg",
    },
    "aac": {
        "codec": "aac",
        "container": "adts",
        "content_type": "audio/aac",
    },
    "flac": {
        "codec": "flac",
        "container": "flac",
        "content_type": "audio/flac",
    },
    "wav": {
        "codec": "pcm_s16le",
        "container": "wav",
        "content_type": "audio/wav",
    },
    "pcm": {
        "codec": None,
        "container": None,
        "content_type": "audio/pcm",
    },
}


def get_content_type(fmt: AudioFormat) -> str:
    """Return the content type string registered for *fmt* in FORMAT_CONFIG.

    Raises:
        KeyError: if *fmt* is not a key of FORMAT_CONFIG.
    """
    settings = FORMAT_CONFIG[fmt]
    return settings["content_type"]


class StreamingAudioWriter:
    """Incrementally encode mono float32 PCM audio into one of the AudioFormat types.

    For "pcm" the samples are only converted to 16-bit integers and returned
    as raw bytes. Every other format is encoded with PyAV into an in-memory
    buffer; each call returns the bytes appended to that buffer since the
    previous call.

    Typical use::

        writer = StreamingAudioWriter("mp3")
        try:
            data = writer.write_chunk(samples)
            tail = writer.finalize()
        finally:
            writer.close()

    The writer can also be used as a context manager, which calls close()
    on exit.

    NOTE(review): only bytes appended after the previous read are returned,
    so anything a muxer rewrites earlier in the buffer when the container is
    closed will not reach the caller -- confirm this is acceptable for the
    container formats in use.
    """

    def __init__(self, fmt: AudioFormat, sample_rate: int = 24000) -> None:
        """Open the encoder for *fmt*.

        Args:
            fmt: target format name (a key of FORMAT_CONFIG).
            sample_rate: sample rate, in Hz, of the PCM data that will be
                passed to write_chunk().

        Raises:
            KeyError: if *fmt* is not a key of FORMAT_CONFIG.
        """
        self.format = fmt
        self.sample_rate = sample_rate
        self._buffer = io.BytesIO()

        if fmt == "pcm":
            # Raw PCM needs neither a codec nor a container.
            self._container = None
            self._stream = None
        else:
            config = FORMAT_CONFIG[fmt]
            self._container = av.open(self._buffer, mode="w", format=config["container"])
            self._stream = self._container.add_stream(config["codec"], rate=sample_rate)
            self._stream.layout = "mono"
            if fmt == "opus":
                # The Opus encoder is run at 48 kHz. Input frames keep their
                # true rate (see write_chunk) so that PyAV resamples them.
                self._stream.rate = 48000

    def __enter__(self) -> StreamingAudioWriter:
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        self.close()

    @staticmethod
    def _to_int16(pcm_data: np.ndarray) -> np.ndarray:
        """Clip float samples to [-1.0, 1.0] and scale them to int16."""
        # Clipping first keeps out-of-range samples from wrapping around
        # when they are cast to int16.
        return (np.clip(pcm_data, -1.0, 1.0) * 32767).astype(np.int16)

    def write_chunk(self, pcm_data: np.ndarray) -> bytes:
        """Encode one chunk of float32 PCM and return the newly produced bytes.

        Args:
            pcm_data: float samples, nominally in [-1.0, 1.0]; values outside
                that range are clipped. The array is flattened to a single
                mono channel.

        Returns:
            For "pcm", the int16 samples as bytes. Otherwise the bytes added
            to the output buffer by this call, which may be empty.

        Raises:
            RuntimeError: if an encoded format is written after finalize()
                or close().
        """
        samples = self._to_int16(pcm_data)
        if self.format == "pcm":
            return samples.tobytes()

        if self._container is None or self._stream is None:
            raise RuntimeError("write_chunk() called after the writer was finalized or closed")

        if samples.size == 0:
            # Nothing to encode; avoid handing the encoder a zero-sample frame.
            return b""

        frame = av.AudioFrame.from_ndarray(
            samples.reshape(1, -1),
            format="s16",
            layout="mono",
        )
        # Always label the frame with the rate the samples really have. When
        # the encoder runs at a different rate (Opus at 48 kHz) PyAV converts
        # the audio on encode; stamping the frame with the encoder rate
        # instead would skip that conversion and change the playback speed.
        frame.sample_rate = self.sample_rate

        start_pos = self._buffer.tell()
        for packet in self._stream.encode(frame):
            self._container.mux(packet)

        # Read back whatever the muxer appended; the read leaves the buffer
        # position at the end, ready for the next write.
        self._buffer.seek(start_pos)
        return self._buffer.read()

    def finalize(self) -> bytes:
        """Flush the encoder, close the container and return the trailing bytes.

        Returns b"" for "pcm" and when the writer has already been finalized
        or closed.
        """
        if self._container is None or self._stream is None:
            return b""

        start_pos = self._buffer.tell()

        # Passing None drains any packets the encoder is still holding.
        for packet in self._stream.encode(None):
            self._container.mux(packet)

        self._container.close()
        # Drop the references so a later finalize()/close() does not touch
        # the already-closed container.
        self._container = None
        self._stream = None

        self._buffer.seek(start_pos)
        return self._buffer.read()

    def close(self) -> None:
        """Release the container and the output buffer. Safe to call repeatedly."""
        container = self._container
        self._container = None
        self._stream = None
        if container is not None:
            try:
                container.close()
            except Exception:
                # Deliberately best-effort: close() is the cleanup path and
                # must not raise, and the buffer below still has to be freed.
                pass
        self._buffer.close()


def encode_audio_complete(
    pcm_data: np.ndarray,
    fmt: AudioFormat,
    sample_rate: int = 24000,
) -> bytes:
    """Encode a complete float32 PCM array to *fmt* in one call.

    Args:
        pcm_data: float samples, nominally in [-1.0, 1.0]; values outside
            that range are clipped. The array is flattened to a single mono
            channel.
        fmt: target format name (a key of FORMAT_CONFIG).
        sample_rate: sample rate, in Hz, of *pcm_data*.

    Returns:
        For "pcm", the int16 samples as raw bytes. Otherwise the full
        contents of the in-memory output container.

    Raises:
        KeyError: if *fmt* is not a key of FORMAT_CONFIG.
    """
    # Clip before scaling so out-of-range samples saturate instead of
    # wrapping around in the int16 cast (applies to the raw PCM path too).
    samples = (np.clip(pcm_data, -1.0, 1.0) * 32767).astype(np.int16)
    if fmt == "pcm":
        return samples.tobytes()

    config = FORMAT_CONFIG[fmt]
    buf = io.BytesIO()

    # The context manager closes the container even when encoding raises.
    with av.open(buf, mode="w", format=config["container"]) as container:
        stream = container.add_stream(config["codec"], rate=sample_rate)
        stream.layout = "mono"
        if fmt == "opus":
            # The Opus encoder is run at 48 kHz.
            stream.rate = 48000

        if samples.size:
            frame = av.AudioFrame.from_ndarray(
                samples.reshape(1, -1),
                format="s16",
                layout="mono",
            )
            # Label the frame with the rate the samples really have. When the
            # encoder runs at a different rate PyAV converts the audio on
            # encode; stamping the frame with the encoder rate instead would
            # skip that conversion and change the playback speed.
            frame.sample_rate = sample_rate
            for packet in stream.encode(frame):
                container.mux(packet)

        # Passing None drains any packets the encoder is still holding.
        for packet in stream.encode(None):
            container.mux(packet)

    return buf.getvalue()