Spaces:
Running
Running
File size: 8,313 Bytes
f5bce42 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 | """Batch processing module for multiple audio/script file pairs."""
import logging
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union
import tempfile
from config import DEFAULT_LANGUAGE
from validator import validate_inputs
from normalize import normalize_audio
from aligner import align
from srt_writer import write_srt
logger = logging.getLogger(__name__)
def batch_process(input_dir: Union[str, Path], output_dir: Union[str, Path],
language: str = DEFAULT_LANGUAGE) -> None:
"""Process all matched audio/script file pairs in input directory.
Scans input directory for audio files, matches them with corresponding
text files by filename stem, and processes each pair through the full
alignment pipeline. Generates processing log with detailed results.
"""
input_dir = Path(input_dir)
output_dir = Path(output_dir)
# Validate directories
if not input_dir.exists() or not input_dir.is_dir():
raise ValueError(f"Input directory does not exist: {input_dir}")
# Create output directory if needed
output_dir.mkdir(parents=True, exist_ok=True)
# Find all audio/script pairs
pairs = find_audio_script_pairs(input_dir)
if not pairs:
print(f"β No audio/script pairs found in {input_dir}")
print("Expected pairs like: video_01.mp3 β video_01.txt")
return
print(f"π¬ Found {len(pairs)} audio/script pairs to process")
print(f"π Input: {input_dir}")
print(f"π Output: {output_dir}")
print()
# Process each pair
results = []
for i, (audio_path, script_path) in enumerate(pairs, 1):
print(f"Processing {i}/{len(pairs)}: {audio_path.name}...")
try:
result = process_single_pair(
audio_path, script_path, output_dir, language, i, len(pairs)
)
results.append(result)
print(f"β
{audio_path.stem}: {result['caption_count']} captions, {result['duration_sec']:.1f}s")
except Exception as e:
error_result = {
"filename": audio_path.stem,
"status": "failed",
"error": str(e),
"caption_count": 0,
"duration_sec": 0.0
}
results.append(error_result)
print(f"β {audio_path.stem}: {e}")
print() # Empty line between files
# Generate processing log
generate_processing_log(output_dir, results)
# Print summary
print_batch_summary(results)
def find_audio_script_pairs(input_dir: Path) -> List[Tuple[Path, Path]]:
"""Find matching audio and script file pairs in input directory."""
# Supported audio extensions
audio_extensions = {'.mp3', '.wav', '.m4a', '.aac'}
# Find all audio files
audio_files = []
for ext in audio_extensions:
audio_files.extend(input_dir.glob(f"*{ext}"))
# Match with corresponding text files
pairs = []
for audio_path in audio_files:
script_path = input_dir / f"{audio_path.stem}.txt"
if script_path.exists():
pairs.append((audio_path, script_path))
else:
logger.warning(f"No matching script file for {audio_path.name}")
# Sort pairs by filename for consistent processing order
pairs.sort(key=lambda x: x[0].name)
return pairs
def process_single_pair(audio_path: Path, script_path: Path, output_dir: Path,
language: str, current: int, total: int) -> Dict:
"""Process a single audio/script pair through the full pipeline."""
# Determine output path
output_filename = f"{audio_path.stem}.srt"
output_path = output_dir / output_filename
# Step 1: Validate inputs
validation_result = validate_inputs(audio_path, script_path)
# Step 2: Normalize audio
with tempfile.NamedTemporaryFile(suffix=".wav", delete=False) as tmp_file:
temp_wav_path = tmp_file.name
try:
normalize_audio(audio_path, temp_wav_path)
# Step 3: Load script
with open(script_path, 'r', encoding='utf-8') as f:
script_content = f.read()
sentences = [line.strip() for line in script_content.splitlines() if line.strip()]
if not sentences:
raise ValueError(f"No non-empty lines found in script")
# Step 4: Perform alignment
segments = align(temp_wav_path, sentences, language)
# Step 5: Write SRT file
write_srt(segments, output_path)
# Return success result
return {
"filename": audio_path.stem,
"status": "success",
"caption_count": len(segments),
"duration_sec": validation_result["audio_duration_sec"],
"output_path": str(output_path),
"warnings": validation_result["warnings"]
}
finally:
# Clean up temporary WAV file
try:
Path(temp_wav_path).unlink()
except OSError:
pass
def generate_processing_log(output_dir: Path, results: List[Dict]) -> None:
"""Generate detailed processing log file."""
log_path = output_dir / "processing_log.txt"
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
with open(log_path, 'w', encoding='utf-8') as f:
f.write(f"SRT Caption Generator - Batch Processing Log\n")
f.write(f"Timestamp: {timestamp}\n")
f.write(f"Processed: {len(results)} files\n")
f.write("=" * 60 + "\n\n")
for result in results:
status_icon = "β
" if result["status"] == "success" else "β"
f.write(f"{status_icon} {result['filename']}\n")
f.write(f" Status: {result['status']}\n")
if result["status"] == "success":
f.write(f" Captions: {result['caption_count']}\n")
f.write(f" Duration: {result['duration_sec']:.1f}s\n")
f.write(f" Output: {Path(result['output_path']).name}\n")
if result.get("warnings"):
f.write(f" Warnings:\n")
for warning in result["warnings"]:
f.write(f" - {warning}\n")
else:
f.write(f" Error: {result['error']}\n")
f.write("\n")
# Summary statistics
successful = sum(1 for r in results if r["status"] == "success")
failed = len(results) - successful
total_captions = sum(r["caption_count"] for r in results if r["status"] == "success")
total_duration = sum(r["duration_sec"] for r in results if r["status"] == "success")
f.write("=" * 60 + "\n")
f.write("SUMMARY\n")
f.write("=" * 60 + "\n")
f.write(f"Total files: {len(results)}\n")
f.write(f"Successful: {successful}\n")
f.write(f"Failed: {failed}\n")
f.write(f"Total captions generated: {total_captions}\n")
f.write(f"Total audio duration: {total_duration:.1f}s\n")
print(f"π Processing log written to: {log_path}")
def print_batch_summary(results: List[Dict]) -> None:
"""Print batch processing summary to console."""
successful = sum(1 for r in results if r["status"] == "success")
failed = len(results) - successful
total_captions = sum(r["caption_count"] for r in results if r["status"] == "success")
total_duration = sum(r["duration_sec"] for r in results if r["status"] == "success")
print("=" * 60)
print("π¬ BATCH PROCESSING COMPLETE")
print("=" * 60)
print(f"π Files processed: {len(results)}")
print(f"β
Successful: {successful}")
if failed > 0:
print(f"β Failed: {failed}")
print(f"π Total captions: {total_captions}")
print(f"β±οΈ Total duration: {total_duration:.1f}s")
if failed > 0:
print(f"\nβ Failed files:")
for result in results:
if result["status"] == "failed":
print(f" {result['filename']}: {result['error']}")
print("=" * 60) |