Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import base64 | |
| import os | |
| import shutil | |
| import subprocess | |
| from dataclasses import dataclass | |
| from typing import List | |
| class AudioChunk: | |
| path: str | |
| start_sec: float | |
| end_sec: float | |
| def check_ffmpeg_available() -> bool: | |
| return shutil.which("ffmpeg") is not None and shutil.which("ffprobe") is not None | |
| def ensure_dir(path: str) -> None: | |
| os.makedirs(path, exist_ok=True) | |
| def get_media_duration(path: str) -> float: | |
| cmd = [ | |
| "ffprobe", | |
| "-v", "error", | |
| "-show_entries", "format=duration", | |
| "-of", "default=noprint_wrappers=1:nokey=1", | |
| path, | |
| ] | |
| proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) | |
| # if proc.returncode != 0: | |
| # print(f"获取时长失败: {proc.stderr}") | |
| return float(proc.stdout.strip()) | |
| def extract_audio_from_video( | |
| video_path: str, | |
| output_audio_path: str, | |
| bitrate: str = "64k", | |
| ) -> str: | |
| ensure_dir(os.path.dirname(output_audio_path)) | |
| cmd = [ | |
| "ffmpeg", | |
| "-y", | |
| "-i", video_path, | |
| "-vn", | |
| "-ac", "1", | |
| "-ar", "16000", | |
| "-c:a", "mp3", | |
| "-b:a", bitrate, | |
| output_audio_path, | |
| ] | |
| proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) | |
| return output_audio_path | |
| def split_audio_to_chunks( | |
| audio_path: str, | |
| output_dir: str, | |
| chunk_seconds: int = 290, | |
| ) -> List[AudioChunk]: | |
| ensure_dir(output_dir) | |
| duration = get_media_duration(audio_path) | |
| chunks: List[AudioChunk] = [] | |
| start = 0.0 | |
| idx = 0 | |
| while start < duration: | |
| end = min(duration, start + chunk_seconds) | |
| chunk_path = os.path.join(output_dir, f"audio_chunk_{idx:03d}.mp3") | |
| cmd = [ | |
| "ffmpeg", | |
| "-y", | |
| "-i", audio_path, | |
| "-ss", str(start), | |
| "-t", str(end - start), | |
| "-acodec", "copy", | |
| chunk_path, | |
| ] | |
| proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) | |
| if proc.returncode != 0: | |
| raise RuntimeError(f"切分音频失败:\n{proc.stderr}") | |
| chunks.append(AudioChunk(path=chunk_path, start_sec=start, end_sec=end)) | |
| start = end | |
| idx += 1 | |
| return chunks | |
| def audio_file_to_data_uri(audio_path: str, mime_type: str = "audio/mpeg") -> str: | |
| with open(audio_path, "rb") as f: | |
| b64 = base64.b64encode(f.read()).decode("utf-8") | |
| return f"data:{mime_type};base64,{b64}" | |