# Viewer metadata (not part of the program): file size 6,146 bytes, ref 57bbccb.
import os
import sys
import time
from pathlib import Path
# Ensure the app module can be imported from root directory
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
from app.services.transcribe import extract_audio, transcribe_audio
from app.services.srt_generator import save_srt, translate_srt
from app.services.precision_patch import apply_precision_patch
from app.main import get_translator
class Logger(object):
    """Tee-style text stream: every write goes to the terminal and to a log file.

    The terminal is whatever ``sys.stdout`` is when the instance is created.
    The log file is opened in append mode as UTF-8.

    Args:
        filename: Path of the log file to append to.
    """

    def __init__(self, filename):
        self.terminal = sys.stdout
        self.log = open(filename, "a", encoding="utf-8")

    def __getattr__(self, name):
        """Delegate unknown attributes (``isatty``, ``encoding``, ...) to the terminal.

        Only called when normal attribute lookup fails. The guard on the two
        instance attributes prevents infinite recursion if they are looked up
        before ``__init__`` has set them.
        """
        if name in ("terminal", "log"):
            raise AttributeError(name)
        return getattr(self.terminal, name)

    def write(self, message):
        """Write *message* to both sinks and return the number of characters."""
        self.terminal.write(message)
        # After close() the terminal keeps working; only the file copy is skipped.
        if not self.log.closed:
            self.log.write(message)
            # Flush per write so the log is current even if the run is killed.
            self.log.flush()
        return len(message)

    def flush(self):
        """Flush the terminal and, while it is still open, the log file."""
        self.terminal.flush()
        if not self.log.closed:
            self.log.flush()

    def close(self):
        """Flush and close the log file. The terminal is left open. Idempotent."""
        if not self.log.closed:
            self.log.flush()
            self.log.close()
# Batch-test settings
# Test clips are looked up in resources/test-videos next to this script.
TEST_VIDEOS_DIR = Path(os.path.dirname(os.path.abspath(__file__)), "resources", "test-videos")
# Every video is run once per language listed here (Malayalam and Hindi).
TARGET_LANGS = ["ml", "hi"]
# Engine name handed to get_translator. Author's note: Gemini 1.5 Flash is used
# to bypass rate limits; add GEMINI_API_KEY=your_key_here to your .env file.
ENGINE = "gemini"
def generate_subtitles_test(video_path: str, target_lang: str, engine: str, version: str, reuse_version: str = None) -> str:
    """Run the subtitle pipeline for one video and one target language.

    Working files go under ``app/uploads/<version>`` and
    ``app/subtitles/<version>``, relative to the current working directory.
    The English SRT is produced at most once per video and version: it is
    copied from ``reuse_version`` when available, reused if already present,
    and otherwise generated by extracting and transcribing the audio.

    Args:
        video_path: Path to the input video file.
        target_lang: Language code passed to ``translate_srt``.
        engine: Engine name passed to ``get_translator``.
        version: Name of the per-batch output sub-folder.
        reuse_version: Optional earlier batch folder to copy the English SRT
            from. Ignored when falsy or when that batch has no matching file.

    Returns:
        Path of the translated SRT file.

    Raises:
        Whatever the pipeline helpers raise. The temporary WAV file is removed
        even when a step fails.
    """
    # Setup paths: keep only filesystem-friendly characters in the file id.
    base_name = os.path.splitext(os.path.basename(video_path))[0]
    safe_name = "".join(c for c in base_name if c.isalnum() or c in " ._-").strip()
    file_id = safe_name or "video"

    upload_dir = f"app/uploads/{version}"
    subtitles_dir = f"app/subtitles/{version}"
    os.makedirs(upload_dir, exist_ok=True)
    os.makedirs(subtitles_dir, exist_ok=True)

    audio_path = f"{upload_dir}/{file_id}_test.wav"
    en_srt_path = f"{subtitles_dir}/{file_id}_test_en.srt"
    target_srt_path = f"{subtitles_dir}/{file_id}_test_{target_lang}.srt"

    # Try to reuse the English SRT from a previous version if requested.
    reused = False
    if reuse_version and not os.path.exists(en_srt_path):
        old_en_srt = f"app/subtitles/{reuse_version}/{file_id}_test_en.srt"
        if os.path.exists(old_en_srt):
            import shutil
            shutil.copy(old_en_srt, en_srt_path)
            reused = True
            print(f" --> Reused English SRT from {reuse_version}")

    try:
        # Only extract and transcribe if the English SRT doesn't already exist
        # (avoids running the transcription twice for the same video).
        if not os.path.exists(en_srt_path):
            extract_audio(video_path, audio_path)
            segments, _info = transcribe_audio(audio_path)
            # Correct English transcription errors (brands/names). The return
            # value is not used, so this presumably patches segments in place.
            apply_precision_patch(segments)
            save_srt(segments, en_srt_path)
        elif not reused:
            # The file was already present in this batch (not just copied in).
            print(" --> Skipping transcription, using cached English SRT")

        # Select translator and translate (validation always runs).
        translator = get_translator(engine)
        translate_srt(en_srt_path, target_srt_path, target_lang, translator, validate=True)
    finally:
        # Clean up the temporary audio on success and on failure alike.
        if os.path.exists(audio_path):
            os.remove(audio_path)

    return target_srt_path
# Timestamp layout shared by batch folder names and the log file name,
# e.g. 08-48-AM--11-05-2026 (hour-minute-AM/PM--day-month-year).
_BATCH_VERSION_FORMAT = "%I-%M-%p--%d-%m-%Y"


def _version_sort_key(name):
    """Return a key that orders batch folder names chronologically.

    Names that do not match the timestamp layout rank below every valid
    timestamp and are ordered by name among themselves.
    """
    try:
        stamp = time.strptime(name, _BATCH_VERSION_FORMAT)
    except ValueError:
        return (0, (), name)
    # (year, month, day, hour, minute) compares in chronological order.
    return (1, tuple(stamp)[:5], name)


def _latest_subtitle_version(subtitles_root):
    """Return the newest timestamped folder name under *subtitles_root*, or None."""
    if not subtitles_root.exists():
        return None
    folders = [f.name for f in subtitles_root.iterdir() if f.is_dir() and "--" in f.name]
    if not folders:
        return None
    # Plain name sorting is not chronological for this layout (the hour comes
    # first and the year last), so the timestamp is parsed instead.
    return max(folders, key=_version_sort_key)


def run_batch_tests():
    """Run the subtitle pipeline for every test video and target language.

    Output is mirrored to ``logs/batch_test_<timestamp>.txt`` through
    ``Logger`` for the duration of the run. The original ``sys.stdout`` and
    ``sys.stderr`` are restored and the log file is closed afterwards. If an
    earlier batch folder exists, the user is asked whether to reuse its
    English transcriptions. Videos are the ``*.mp4`` files in
    ``TEST_VIDEOS_DIR``, processed smallest first. A failure for one language
    is reported and the batch continues.
    """
    batch_version = time.strftime(_BATCH_VERSION_FORMAT)
    os.makedirs("logs", exist_ok=True)
    log_file = f"logs/batch_test_{batch_version}.txt"

    saved_stdout, saved_stderr = sys.stdout, sys.stderr
    logger = Logger(log_file)
    sys.stdout = logger
    sys.stderr = logger
    # NOTE(review): the status glyphs in the messages below were restored from
    # mis-decoded text; confirm them against the upstream file.
    try:
        # Check for the latest transcription to reuse.
        reuse_version = None
        latest_folder = _latest_subtitle_version(Path("app/subtitles"))
        if latest_folder:
            print(f"\n[?] Found existing transcriptions in: {latest_folder}")
            try:
                choice = input("Use the latest transcription to save time? (y/n): ").strip().lower()
            except EOFError:
                # No interactive input available: fall back to not reusing.
                choice = ""
            if choice == 'y':
                reuse_version = latest_folder
                print(f"✅ Reusing transcriptions from: {reuse_version}\n")

        print("🚀 Starting automated pipeline tests...")
        print(f"📁 Directory: {TEST_VIDEOS_DIR}")
        print(f"⚙️ Engine: {ENGINE}")
        print(f"🌐 Target Languages: {TARGET_LANGS}")
        print(f"🔖 Batch Version: {batch_version}\n")

        videos = sorted(TEST_VIDEOS_DIR.glob("*.mp4"), key=lambda v: v.stat().st_size)
        if not videos:
            print("❌ No videos found in test directory.")
            return

        print("📋 Processing order (smallest first):")
        for i, v in enumerate(videos, 1):
            print(f" {i}. {v.name} ({v.stat().st_size / (1024*1024):.1f} MB)")

        for video in videos:
            print(f"\n{'='*60}")
            print(f"🎥 Processing Video: {video.name} (Size: {video.stat().st_size / (1024*1024):.1f} MB)")
            print(f"{'='*60}")
            for lang in TARGET_LANGS:
                start_time = time.time()
                print(f"\n---> Running pipeline for [ {lang.upper()} ]")
                try:
                    output_srt = generate_subtitles_test(
                        video_path=str(video),
                        target_lang=lang,
                        engine=ENGINE,
                        version=batch_version,
                        reuse_version=reuse_version,
                    )
                    duration = time.time() - start_time
                    print(f"✔ Success! Generated SRT: {output_srt}")
                    print(f"⏱️ Time taken: {duration:.2f} seconds")
                except Exception as e:
                    # Batch boundary: report and carry on with the next language.
                    print(f"❌ Pipeline failed for {lang.upper()}: {e}")

        print("\n✅ Batch testing complete!")
        print("📝 Review logs/translation_failures.jsonl to see self-generated architectural insights.")
    finally:
        # Undo the global stream redirection and release the log file handle.
        sys.stdout, sys.stderr = saved_stdout, saved_stderr
        logger.log.close()
# Script entry point: run the batch only when executed directly, not on import.
if __name__ == "__main__":
    run_batch_tests()
# End of script.