File size: 1,474 Bytes
7e78def
 
 
0d8fb18
7e78def
 
 
 
 
 
 
 
 
0d8fb18
7e78def
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
dcfbd3d
7e78def
 
0d8fb18
 
 
 
 
 
7e78def
 
0d8fb18
936b1ef
0d8fb18
2e2101b
936b1ef
0d8fb18
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import subprocess
import io


def ffmpeg(input_stream: io.BufferedReader, ffmpeg_args: list) -> io.BytesIO:
    """Pipe *input_stream* through the ``ffmpeg`` executable and return its output.

    The command run is ``ffmpeg -i pipe:0 <ffmpeg_args> pipe:1``: the input
    bytes are fed to the child's stdin and whatever it writes to stdout is
    collected in memory.

    Args:
        input_stream: Binary stream to read the source data from; only its
            ``read()`` method is used. It is not closed by this function.
        ffmpeg_args: Extra command-line arguments inserted between the input
            and output specifiers (e.g. codec and container options).

    Returns:
        A ``BytesIO`` holding the child's stdout, positioned at offset 0.

    Raises:
        RuntimeError: If reading the input fails, or ffmpeg exits with a
            non-zero status (the message then contains ffmpeg's stderr).
        OSError: Propagated unchanged from ``subprocess.Popen`` when the
            process cannot be started (e.g. the executable is missing).
    """
    command = ["ffmpeg", "-i", "pipe:0", *ffmpeg_args, "pipe:1"]
    process = subprocess.Popen(
        command,
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        bufsize=10**6,
    )

    try:
        # communicate() feeds stdin while draining stdout and stderr
        # concurrently. Writing all input first and reading the output
        # afterwards deadlocks as soon as the child fills an OS pipe buffer
        # with output or diagnostics before it has consumed its input.
        stdout_data, stderr_data = process.communicate(input=input_stream.read())

        if process.returncode != 0:
            # Diagnostics are not guaranteed to be valid UTF-8; never let a
            # decode error hide the real failure.
            error_message = stderr_data.decode(errors="replace")
            raise RuntimeError(f"FFmpeg error: {error_message}")

        # A fresh BytesIO built from bytes starts at position 0.
        return io.BytesIO(stdout_data)

    except Exception as e:
        # Make sure the child is gone and reaped so no zombie is left behind.
        # Both calls are harmless if the process has already exited.
        process.kill()
        process.wait()
        raise RuntimeError(f"FFmpeg processing failed: {str(e)}") from e

    finally:
        # communicate() closes the pipes on the normal path; this covers
        # failures that happen before it runs (e.g. reading the input raised).
        for pipe in (process.stdin, process.stdout, process.stderr):
            if pipe:
                try:
                    pipe.close()
                except OSError:
                    # Best-effort cleanup: a broken pipe on close must not
                    # replace the exception that is already propagating.
                    pass


def mp3_to_opus(input_bytes_io: io.BytesIO) -> io.BytesIO:
    """Convert in-memory audio to Opus by running it through ``ffmpeg``.

    The data is passed to ``ffmpeg`` with the arguments
    ``-c:a libopus -f opus``.

    Args:
        input_bytes_io: In-memory stream holding the source audio. It is
            read from, but left open for the caller.

    Returns:
        The stream returned by ``ffmpeg`` containing the converted data.

    Raises:
        RuntimeError: If wrapping the input or the conversion fails; the
            original exception is attached as ``__cause__``.
    """
    try:
        input_stream = io.BufferedReader(input_bytes_io)
        try:
            return ffmpeg(input_stream, ["-c:a", "libopus", "-f", "opus"])
        finally:
            # A BufferedReader closes its underlying stream when it is
            # garbage-collected; detach so the caller's BytesIO stays usable.
            input_stream.detach()
    except Exception as e:
        raise RuntimeError(f"Failed to convert MP3 to Opus: {str(e)}") from e