| import subprocess | |
| import io | |
| def ffmpeg(input_stream: io.BufferedReader, ffmpeg_args: list) -> io.BytesIO: | |
| command = ["ffmpeg", "-i", "pipe:0"] + ffmpeg_args + ["pipe:1"] | |
| process = subprocess.Popen( | |
| command, | |
| stdin=subprocess.PIPE, | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.PIPE, | |
| bufsize=10**6 | |
| ) | |
| output_stream = io.BytesIO() | |
| try: | |
| while chunk := input_stream.read(1024 * 1024): | |
| process.stdin.write(chunk) | |
| process.stdin.close() | |
| while chunk := process.stdout.read(1024 * 1024): | |
| output_stream.write(chunk) | |
| process.wait() | |
| if process.returncode != 0: | |
| error_message = process.stderr.read().decode() | |
| raise RuntimeError(f"FFmpeg error: {error_message}") | |
| output_stream.seek(0) | |
| return output_stream | |
| except Exception as e: | |
| process.terminate() | |
| raise RuntimeError(f"FFmpeg processing failed: {str(e)}") | |
| finally: | |
| if process.stdout: | |
| process.stdout.close() | |
| if process.stderr: | |
| process.stderr.close() | |
| if process.stdin: | |
| process.stdin.close() | |
| def mp3_to_opus(input_bytes_io: io.BytesIO) -> io.BytesIO: | |
| try: | |
| input_stream = io.BufferedReader(input_bytes_io) | |
| return ffmpeg(input_stream, ["-c:a", "libopus", "-f", "opus"]) | |
| except Exception as e: | |
| raise RuntimeError(f"Failed to convert MP3 to Opus: {str(e)}") | |