File size: 1,474 Bytes
7e78def 0d8fb18 7e78def 0d8fb18 7e78def dcfbd3d 7e78def 0d8fb18 7e78def 0d8fb18 936b1ef 0d8fb18 2e2101b 936b1ef 0d8fb18 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
import subprocess
import io
def ffmpeg(input_stream: io.BufferedReader, ffmpeg_args: list) -> io.BytesIO:
command = ["ffmpeg", "-i", "pipe:0"] + ffmpeg_args + ["pipe:1"]
process = subprocess.Popen(
command,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
bufsize=10**6
)
output_stream = io.BytesIO()
try:
while chunk := input_stream.read(1024 * 1024):
process.stdin.write(chunk)
process.stdin.close()
while chunk := process.stdout.read(1024 * 1024):
output_stream.write(chunk)
process.wait()
if process.returncode != 0:
error_message = process.stderr.read().decode()
raise RuntimeError(f"FFmpeg error: {error_message}")
output_stream.seek(0)
return output_stream
except Exception as e:
process.terminate()
raise RuntimeError(f"FFmpeg processing failed: {str(e)}")
finally:
if process.stdout:
process.stdout.close()
if process.stderr:
process.stderr.close()
if process.stdin:
process.stdin.close()
def mp3_to_opus(input_bytes_io: io.BytesIO) -> io.BytesIO:
try:
input_stream = io.BufferedReader(input_bytes_io)
return ffmpeg(input_stream, ["-c:a", "libopus", "-f", "opus"])
except Exception as e:
raise RuntimeError(f"Failed to convert MP3 to Opus: {str(e)}")
|