Spaces:
Running
Running
File size: 5,706 Bytes
4051511 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 |
import os
import tempfile
from pathlib import Path
from typing import Tuple, Optional
import ffmpeg
from pydub import AudioSegment
import logging
# Configure the root logger at import time so INFO-level (and higher) records are emitted.
logging.basicConfig(level=logging.INFO)
# Module-level logger, named after this module, used for status and error messages below.
logger = logging.getLogger(__name__)
class AudioProcessor:
    """Handles audio extraction, conversion, and chunking.

    All methods are static; the class only groups related helpers and the
    constants that configure them.
    """

    # File extensions (lower-case, with leading dot) accepted by is_supported_file.
    SUPPORTED_FORMATS = {
        'audio': ['.mp3', '.wav', '.m4a', '.flac', '.aac', '.ogg', '.wma'],
        'video': ['.mp4', '.avi', '.mkv', '.mov', '.wmv', '.flv', '.webm']
    }
    CHUNK_DURATION_MS = 30 * 60 * 1000  # 30 minutes in milliseconds
    OVERLAP_MS = 2000  # 2 second overlap between chunks

    @staticmethod
    def is_supported_file(file_path: str) -> bool:
        """Return True if the file's extension (case-insensitive) is supported."""
        ext = Path(file_path).suffix.lower()
        return any(ext in formats for formats in AudioProcessor.SUPPORTED_FORMATS.values())

    @staticmethod
    def extract_audio(input_file: str, output_format: str = 'wav', progress_callback=None) -> str:
        """
        Extract audio from video or convert audio to the desired format.

        The result is written to a new temporary file as 16 kHz mono audio.
        The caller owns the returned file and is responsible for deleting it
        (see cleanup_temp_files).

        Args:
            input_file: Path to input file
            output_format: Desired output format (wav, mp3). 'wav' is encoded
                as pcm_s16le; any other value is encoded with libmp3lame.
            progress_callback: Optional callable taking a status message string

        Returns:
            Path to extracted/converted audio file

        Raises:
            Exception: If ffmpeg reports an error; the original ffmpeg.Error
                is attached as the cause.
        """
        if progress_callback:
            progress_callback("Extracting audio from file...")

        # Only the reserved path is needed; close the handle right away so
        # ffmpeg can overwrite the file (an open handle blocks this on Windows).
        with tempfile.NamedTemporaryFile(delete=False, suffix=f'.{output_format}') as handle:
            output_file = handle.name

        try:
            # Use ffmpeg to extract audio
            stream = ffmpeg.input(input_file)
            stream = ffmpeg.output(
                stream,
                output_file,
                acodec='pcm_s16le' if output_format == 'wav' else 'libmp3lame',
                ar='16000',  # 16kHz sample rate (Whisper's preference)
                ac=1  # Mono channel
            )
            ffmpeg.run(stream, overwrite_output=True, capture_stdout=True, capture_stderr=True)
        except ffmpeg.Error as e:
            # Do not leave the empty/partial output file behind.
            AudioProcessor.cleanup_temp_files(output_file)
            # stderr is raw bytes from ffmpeg; never let decoding mask the real error.
            detail = (e.stderr or b'').decode(errors='replace')
            logger.error("FFmpeg error: %s", detail)
            raise Exception(f"Failed to extract audio: {detail}") from e
        except Exception:
            # Any other failure (e.g. ffmpeg binary missing): clean up and propagate unchanged.
            AudioProcessor.cleanup_temp_files(output_file)
            raise

        if progress_callback:
            progress_callback("Audio extraction complete")
        logger.info("Audio extracted to: %s", output_file)
        return output_file

    @staticmethod
    def get_audio_duration(file_path: str) -> float:
        """
        Get audio duration in seconds.

        Uses ffprobe metadata first: the first audio stream that reports a
        duration, otherwise the container-level duration. If probing fails for
        any reason, falls back to decoding the file with pydub.
        """
        try:
            probe = ffmpeg.probe(file_path)
            # Stream 0 may be a video stream, and some containers omit the
            # per-stream duration, so look for an audio stream explicitly.
            for stream in probe.get('streams', []):
                if stream.get('codec_type') == 'audio' and 'duration' in stream:
                    return float(stream['duration'])
            return float(probe['format']['duration'])
        except Exception as e:
            # Deliberately broad: this is a best-effort path with a fallback.
            logger.error("Failed to get duration: %s", e)
            # Fallback to pydub
            audio = AudioSegment.from_file(file_path)
            return len(audio) / 1000.0

    @staticmethod
    def _chunk_bounds(duration_ms: int, chunk_ms: Optional[int] = None,
                      overlap_ms: Optional[int] = None) -> list:
        """
        Compute (start_ms, end_ms) boundaries for overlapping chunks.

        Consecutive chunks start (chunk_ms - overlap_ms) apart. Generation
        stops as soon as a chunk reaches the end of the audio, so no trailing
        chunk consisting only of already-covered overlap is produced.

        Args:
            duration_ms: Total audio length in milliseconds
            chunk_ms: Maximum chunk length; defaults to CHUNK_DURATION_MS
            overlap_ms: Overlap between chunks; defaults to OVERLAP_MS

        Returns:
            List of (start_ms, end_ms) tuples; empty if duration_ms <= 0

        Raises:
            ValueError: If overlap_ms is not smaller than chunk_ms
        """
        if chunk_ms is None:
            chunk_ms = AudioProcessor.CHUNK_DURATION_MS
        if overlap_ms is None:
            overlap_ms = AudioProcessor.OVERLAP_MS

        stride_ms = chunk_ms - overlap_ms
        if stride_ms <= 0:
            # A non-positive stride would never advance and loop forever.
            raise ValueError("overlap must be smaller than chunk duration")

        bounds = []
        start_ms = 0
        while start_ms < duration_ms:
            end_ms = min(start_ms + chunk_ms, duration_ms)
            bounds.append((start_ms, end_ms))
            if end_ms >= duration_ms:
                break
            start_ms += stride_ms
        return bounds

    @staticmethod
    def chunk_audio(file_path: str, progress_callback=None) -> list:
        """
        Split audio into chunks for processing large files.

        Audio no longer than CHUNK_DURATION_MS is not split: the original
        path is returned as the single chunk. Otherwise each chunk is exported
        as a WAV temporary file that the caller is responsible for deleting.

        Args:
            file_path: Path to audio file
            progress_callback: Optional callable taking a status message string

        Returns:
            List of tuples: [(chunk_file_path, start_time_offset), ...]
            where start_time_offset is in seconds
        """
        if progress_callback:
            progress_callback("Loading audio file for chunking...")

        audio = AudioSegment.from_file(file_path)
        duration_ms = len(audio)

        # If audio is shorter than chunk duration, return as single chunk
        if duration_ms <= AudioProcessor.CHUNK_DURATION_MS:
            if progress_callback:
                progress_callback("File is small enough, no chunking needed")
            return [(file_path, 0.0)]

        bounds = AudioProcessor._chunk_bounds(duration_ms)
        total_chunks = len(bounds)
        chunks = []

        try:
            for chunk_index, (start_ms, end_ms) in enumerate(bounds):
                if progress_callback:
                    progress_callback(f"Creating chunk {chunk_index + 1}/{total_chunks}...")

                # Reserve a temp path and close the handle before exporting to it.
                with tempfile.NamedTemporaryFile(
                    delete=False,
                    suffix='.wav',
                    prefix=f'chunk_{chunk_index}_'
                ) as handle:
                    chunk_file = handle.name

                # Record the chunk (offset in seconds) before exporting so a
                # failed export is still covered by the cleanup below.
                chunks.append((chunk_file, start_ms / 1000.0))
                audio[start_ms:end_ms].export(chunk_file, format='wav')
                logger.info(
                    "Created chunk %d: %.2fs - %.2fs",
                    chunk_index, start_ms / 1000, end_ms / 1000
                )
        except Exception:
            # Do not leave partially created chunk files behind.
            AudioProcessor.cleanup_temp_files(*(path for path, _ in chunks))
            raise

        if progress_callback:
            progress_callback(f"Created {len(chunks)} chunks for processing")
        return chunks

    @staticmethod
    def cleanup_temp_files(*file_paths):
        """
        Clean up temporary files.

        Best-effort: falsy or missing paths are skipped, and a failure to
        remove one file is logged as a warning without stopping the others.
        """
        for file_path in file_paths:
            try:
                if file_path and os.path.exists(file_path):
                    os.remove(file_path)
                    logger.info("Cleaned up: %s", file_path)
            except Exception as e:
                logger.warning("Failed to clean up %s: %s", file_path, e)

    @staticmethod
    def get_file_size_mb(file_path: str) -> float:
        """Get file size in MB (1 MB = 1024 * 1024 bytes)."""
        return os.path.getsize(file_path) / (1024 * 1024)
|