Whisper-Transcriber / utils /audio_processor.py
Whisper Transcriber Bot
Initial commit: Complete Whisper Transcriber implementation
4051511
import os
import tempfile
from pathlib import Path
from typing import Tuple, Optional
import ffmpeg
from pydub import AudioSegment
import logging
# Configure the root logger at import time so INFO-level messages from this
# module are emitted (basicConfig does nothing if the root logger already has
# handlers configured).
logging.basicConfig(level=logging.INFO)
# Module-level logger, named after this module, used by AudioProcessor below.
logger = logging.getLogger(__name__)
class AudioProcessor:
    """Handles audio extraction, conversion, and chunking.

    All methods are static; the class is a namespace around ffmpeg (for
    extraction and probing) and pydub (for slicing audio into chunks).
    """

    # File extensions (lower-case, with leading dot) accepted as input.
    SUPPORTED_FORMATS = {
        'audio': ['.mp3', '.wav', '.m4a', '.flac', '.aac', '.ogg', '.wma'],
        'video': ['.mp4', '.avi', '.mkv', '.mov', '.wmv', '.flv', '.webm']
    }
    CHUNK_DURATION_MS = 30 * 60 * 1000  # 30 minutes in milliseconds
    OVERLAP_MS = 2000  # 2 second overlap between chunks

    @staticmethod
    def is_supported_file(file_path: str) -> bool:
        """Check if file format is supported.

        Args:
            file_path: Path (or bare file name) whose extension is inspected.

        Returns:
            True when the lower-cased extension appears in any
            SUPPORTED_FORMATS category, False otherwise (including when the
            path has no extension).
        """
        ext = Path(file_path).suffix.lower()
        return any(
            ext in extensions
            for extensions in AudioProcessor.SUPPORTED_FORMATS.values()
        )

    @staticmethod
    def _new_temp_path(suffix, prefix=None):
        """Create an empty temporary file and return its path.

        The OS-level handle is closed immediately so nothing is leaked and
        external tools (ffmpeg, pydub) can freely overwrite the file. The
        caller owns the file and is responsible for deleting it.
        """
        fd, path = tempfile.mkstemp(suffix=suffix, prefix=prefix)
        os.close(fd)
        return path

    @staticmethod
    def extract_audio(input_file: str, output_format: str = 'wav', progress_callback=None) -> str:
        """
        Extract audio from video or convert audio to desired format.

        The output is mono, 16 kHz. 'wav' output uses the pcm_s16le codec;
        any other output_format is encoded with libmp3lame.

        Args:
            input_file: Path to input file
            output_format: Desired output format (wav, mp3)
            progress_callback: Optional callback for progress updates;
                called with a single status string

        Returns:
            Path to extracted/converted audio file (a temporary file the
            caller is responsible for deleting)

        Raises:
            RuntimeError: If ffmpeg fails. The partially written temporary
                file is removed before raising.
        """
        if progress_callback:
            progress_callback("Extracting audio from file...")

        output_file = AudioProcessor._new_temp_path(suffix=f'.{output_format}')

        try:
            # Use ffmpeg to extract audio
            stream = ffmpeg.input(input_file)
            stream = ffmpeg.output(
                stream,
                output_file,
                acodec='pcm_s16le' if output_format == 'wav' else 'libmp3lame',
                ar='16000',  # 16kHz sample rate (Whisper's preference)
                ac=1  # Mono channel
            )
            ffmpeg.run(stream, overwrite_output=True, capture_stdout=True, capture_stderr=True)
        except ffmpeg.Error as e:
            # Do not leave the unusable temp file behind.
            AudioProcessor.cleanup_temp_files(output_file)
            # stderr may be missing or contain non-UTF-8 bytes (e.g. file
            # names in a local encoding); never let reporting itself fail.
            details = e.stderr.decode(errors='replace') if e.stderr else str(e)
            logger.error("FFmpeg error: %s", details)
            raise RuntimeError(f"Failed to extract audio: {details}") from e
        except Exception:
            # Any other failure (e.g. ffmpeg binary missing): clean up and
            # let the original exception propagate unchanged.
            AudioProcessor.cleanup_temp_files(output_file)
            raise

        if progress_callback:
            progress_callback("Audio extraction complete")
        logger.info("Audio extracted to: %s", output_file)
        return output_file

    @staticmethod
    def _probe_duration_seconds(probe) -> float:
        """Pick the best duration (seconds) out of an ffprobe result dict.

        Preference order: first audio stream with a usable duration, then the
        container-level ('format') duration, then any stream's duration.
        Values that are missing or not parseable as a float are skipped.

        Raises:
            ValueError: If no usable duration is present.
        """
        streams = probe.get('streams') or []
        candidates = [
            s.get('duration') for s in streams if s.get('codec_type') == 'audio'
        ]
        candidates.append((probe.get('format') or {}).get('duration'))
        candidates.extend(s.get('duration') for s in streams)

        for raw in candidates:
            try:
                return float(raw)
            except (TypeError, ValueError):
                continue
        raise ValueError("ffprobe reported no usable duration")

    @staticmethod
    def get_audio_duration(file_path: str) -> float:
        """Get audio duration in seconds.

        Uses ffprobe metadata first (cheap). If probing fails or reports no
        usable duration, falls back to decoding the file with pydub, which
        loads the whole file into memory.

        Args:
            file_path: Path to the audio/video file.

        Returns:
            Duration in seconds.
        """
        try:
            probe = ffmpeg.probe(file_path)
            return AudioProcessor._probe_duration_seconds(probe)
        except Exception as e:
            # Deliberately broad: any probing problem should trigger the
            # pydub fallback rather than abort.
            logger.error("Failed to get duration: %s", e)
            # Fallback to pydub
            audio = AudioSegment.from_file(file_path)
            return len(audio) / 1000.0

    @staticmethod
    def _plan_chunks(duration_ms, chunk_ms, overlap_ms) -> list:
        """Compute the (start_ms, end_ms) windows covering an audio track.

        Consecutive windows start (chunk_ms - overlap_ms) apart, so each one
        overlaps the previous by overlap_ms. Planning stops as soon as a
        window reaches the end of the audio, so no trailing window is emitted
        that lies entirely inside the previous one.

        Raises:
            ValueError: If chunk_ms is not positive or overlap_ms is not in
                [0, chunk_ms) - such values would never advance.
        """
        if chunk_ms <= 0:
            raise ValueError("chunk duration must be positive")
        if not 0 <= overlap_ms < chunk_ms:
            raise ValueError("overlap must be >= 0 and smaller than the chunk duration")

        step_ms = chunk_ms - overlap_ms
        windows = []
        start_ms = 0
        while start_ms < duration_ms:
            end_ms = min(start_ms + chunk_ms, duration_ms)
            windows.append((start_ms, end_ms))
            if end_ms >= duration_ms:
                # This window already reaches the end of the audio.
                break
            start_ms += step_ms
        return windows

    @staticmethod
    def chunk_audio(file_path: str, progress_callback=None) -> list:
        """
        Split audio into chunks for processing large files.

        Audio no longer than CHUNK_DURATION_MS is not split: the result is a
        single entry pointing at the ORIGINAL file_path (not a temporary
        copy), so callers must not blindly delete every returned path.
        Otherwise each chunk is exported as a temporary WAV file; consecutive
        chunks overlap by OVERLAP_MS.

        Args:
            file_path: Path to audio file
            progress_callback: Optional callback for progress updates;
                called with a single status string

        Returns:
            List of tuples: [(chunk_file_path, start_time_offset), ...]
            where start_time_offset is in seconds.

        Raises:
            ValueError: If the class chunk/overlap constants are inconsistent.
            Exception: Any export error is re-raised after the chunk files
                created so far have been removed.
        """
        if progress_callback:
            progress_callback("Loading audio file for chunking...")

        audio = AudioSegment.from_file(file_path)
        duration_ms = len(audio)

        # If audio is shorter than chunk duration, return as single chunk
        if duration_ms <= AudioProcessor.CHUNK_DURATION_MS:
            if progress_callback:
                progress_callback("File is small enough, no chunking needed")
            return [(file_path, 0.0)]

        windows = AudioProcessor._plan_chunks(
            duration_ms,
            AudioProcessor.CHUNK_DURATION_MS,
            AudioProcessor.OVERLAP_MS,
        )
        total_chunks = len(windows)
        chunks = []

        try:
            for chunk_index, (start_ms, end_ms) in enumerate(windows):
                if progress_callback:
                    progress_callback(f"Creating chunk {chunk_index + 1}/{total_chunks}...")

                # Save chunk to temporary file
                chunk_file = AudioProcessor._new_temp_path(
                    suffix='.wav',
                    prefix=f'chunk_{chunk_index}_'
                )
                # Register the path (with its time offset in seconds) before
                # exporting so a failed export is still cleaned up below.
                chunks.append((chunk_file, start_ms / 1000.0))
                audio[start_ms:end_ms].export(chunk_file, format='wav')

                logger.info(
                    "Created chunk %d: %.2fs - %.2fs",
                    chunk_index, start_ms / 1000, end_ms / 1000,
                )
        except Exception:
            # Do not leak the chunk files written so far.
            AudioProcessor.cleanup_temp_files(*(path for path, _ in chunks))
            raise

        if progress_callback:
            progress_callback(f"Created {len(chunks)} chunks for processing")
        return chunks

    @staticmethod
    def cleanup_temp_files(*file_paths):
        """Clean up temporary files (best effort).

        Falsy entries (None, '') and paths that no longer exist are skipped
        silently. Any other failure is logged as a warning and never raised,
        so one bad path does not prevent the remaining files from being
        removed.

        Args:
            *file_paths: Paths of files to delete.
        """
        for file_path in file_paths:
            if not file_path:
                continue
            try:
                # Remove directly instead of checking existence first, which
                # would race with other processes deleting the file.
                os.remove(file_path)
            except FileNotFoundError:
                continue
            except Exception as e:
                logger.warning("Failed to clean up %s: %s", file_path, e)
            else:
                logger.info("Cleaned up: %s", file_path)

    @staticmethod
    def get_file_size_mb(file_path: str) -> float:
        """Get file size in MB (1 MB = 1024 * 1024 bytes).

        Args:
            file_path: Path to an existing file.

        Returns:
            File size in megabytes.

        Raises:
            OSError: If the file does not exist or is inaccessible.
        """
        size_bytes = os.path.getsize(file_path)
        return size_bytes / (1024 * 1024)