pyward / utils /ffmpeg.py
pluviouse's picture
Update utils/ffmpeg.py
2e2101b verified
import subprocess
import io
def ffmpeg(input_stream: io.BufferedReader, ffmpeg_args: list) -> io.BytesIO:
command = ["ffmpeg", "-i", "pipe:0"] + ffmpeg_args + ["pipe:1"]
process = subprocess.Popen(
command,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
bufsize=10**6
)
output_stream = io.BytesIO()
try:
while chunk := input_stream.read(1024 * 1024):
process.stdin.write(chunk)
process.stdin.close()
while chunk := process.stdout.read(1024 * 1024):
output_stream.write(chunk)
process.wait()
if process.returncode != 0:
error_message = process.stderr.read().decode()
raise RuntimeError(f"FFmpeg error: {error_message}")
output_stream.seek(0)
return output_stream
except Exception as e:
process.terminate()
raise RuntimeError(f"FFmpeg processing failed: {str(e)}")
finally:
if process.stdout:
process.stdout.close()
if process.stderr:
process.stderr.close()
if process.stdin:
process.stdin.close()
def mp3_to_opus(input_bytes_io: io.BytesIO) -> io.BytesIO:
try:
input_stream = io.BufferedReader(input_bytes_io)
return ffmpeg(input_stream, ["-c:a", "libopus", "-f", "opus"])
except Exception as e:
raise RuntimeError(f"Failed to convert MP3 to Opus: {str(e)}")