import os
import sys
import time
from pathlib import Path

# Make the `app` package importable when this script is run directly: the
# directory two levels above this file's folder is appended to sys.path.
# This must happen before the `app.*` imports below.
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))

from app.services.transcribe import extract_audio, transcribe_audio
from app.services.srt_generator import save_srt, translate_srt
from app.services.precision_patch import apply_precision_patch
from app.main import get_translator


class Logger:
    """Tee-style text stream that mirrors every write to a log file.

    The stream that is `sys.stdout` at construction time is kept as
    `terminal`; `log` is the file opened in append mode with UTF-8 encoding.
    An instance is meant to be assigned to `sys.stdout` / `sys.stderr`, so it
    offers the small part of the text-stream protocol that callers commonly
    probe (`write`, `flush`, `isatty`).

    `fileno()` is intentionally not provided: handing out the terminal's
    descriptor would let callers write to it directly and bypass the log file.
    """

    def __init__(self, filename):
        self.terminal = sys.stdout
        self.log = open(filename, "a", encoding="utf-8")

    def write(self, message):
        """Write `message` to the terminal and the log file.

        Returns whatever the terminal's `write` returns (the character count
        for standard text streams).
        """
        written = self.terminal.write(message)
        if not self.log.closed:
            self.log.write(message)
            # Flush on every write so the log is complete even if the run crashes.
            self.log.flush()
        return written

    def flush(self):
        """Flush both underlying streams."""
        self.terminal.flush()
        if not self.log.closed:
            self.log.flush()

    def isatty(self):
        """Report whether the wrapped terminal stream is interactive."""
        probe = getattr(self.terminal, "isatty", None)
        return bool(probe and probe())

    def close(self):
        """Close the log file; later writes still reach the terminal."""
        if not self.log.closed:
            self.log.close()


# Configuration
TEST_VIDEOS_DIR = Path(os.path.dirname(os.path.abspath(__file__))) / "resources" / "test-videos"
TARGET_LANGS = ["ml", "hi"]  # We will test both Malayalam and Hindi
# Using Gemini 1.5 Flash to bypass rate limits.
# (Add GEMINI_API_KEY=your_key_here to your .env file.)
ENGINE = "gemini"
# strftime/strptime layout of batch folder names, e.g. 08-48-AM--11-05-2026
BATCH_VERSION_FORMAT = "%I-%M-%p--%d-%m-%Y"


def generate_subtitles_test(video_path: str, target_lang: str, engine: str, version: str, reuse_version: str = None) -> str:
    """Run the subtitle pipeline for one video and one target language.

    The English SRT is produced at most once per video and batch: it is copied
    from `reuse_version` when available, otherwise extracted and transcribed,
    and an existing file is used as-is. Translation runs on every call.

    Args:
        video_path: Path of the source video.
        target_lang: Language code passed to `translate_srt`.
        engine: Engine name passed to `get_translator`.
        version: Batch folder name used under `app/uploads` and `app/subtitles`.
        reuse_version: Optional earlier batch folder to copy the English SRT from.

    Returns:
        Path of the translated SRT file.
    """
    # Setup paths
    base_name = os.path.splitext(os.path.basename(video_path))[0]
    safe_name = "".join(c for c in base_name if c.isalnum() or c in " ._-").strip()
    file_id = safe_name or "video"

    upload_dir = f"app/uploads/{version}"
    subtitles_dir = f"app/subtitles/{version}"
    os.makedirs(upload_dir, exist_ok=True)
    os.makedirs(subtitles_dir, exist_ok=True)

    audio_path = f"{upload_dir}/{file_id}_test.wav"
    en_srt_path = f"{subtitles_dir}/{file_id}_test_en.srt"
    target_srt_path = f"{subtitles_dir}/{file_id}_test_{target_lang}.srt"

    # Try to reuse from previous version if requested
    if reuse_version and not os.path.exists(en_srt_path):
        old_en_srt = f"app/subtitles/{reuse_version}/{file_id}_test_en.srt"
        if os.path.exists(old_en_srt):
            import shutil

            shutil.copy(old_en_srt, en_srt_path)
            print(f" --> Reused English SRT from {reuse_version}")

    try:
        # Only extract and transcribe if English SRT doesn't already exist
        # (avoids running Whisper twice)
        if not os.path.exists(en_srt_path):
            # Extract audio
            extract_audio(video_path, audio_path)
            # Transcribe audio to get segments
            segments, _info = transcribe_audio(audio_path)
            # Correct English transcription errors (brands/names)
            apply_precision_patch(segments)
            # Generate English SRT
            save_srt(segments, en_srt_path)
        elif not reuse_version:
            print(" --> Skipping transcription, using cached English SRT")

        # Select translator and translate (validation always runs)
        translator = get_translator(engine)
        translate_srt(en_srt_path, target_srt_path, target_lang, translator, validate=True)
    finally:
        # Clean up audio even when a pipeline step failed. Removal is
        # best-effort so a cleanup problem never hides the real error.
        try:
            os.remove(audio_path)
        except FileNotFoundError:
            pass
        except OSError as cleanup_error:
            print(f" --> Could not remove temporary audio {audio_path}: {cleanup_error}")

    return target_srt_path


def latest_batch_folder(folder_names):
    """Return the chronologically newest batch folder name, or None if empty.

    Names are parsed with BATCH_VERSION_FORMAT. Plain name order is not
    chronological for that layout because the hour comes first and uses a
    12-hour clock. Names that do not parse rank below every parsable name and
    are ordered by name among themselves.
    """

    def sort_key(name):
        try:
            stamp = time.strptime(name, BATCH_VERSION_FORMAT)
        except ValueError:
            return (0, (), name)
        # (year, month, day, hour, minute) compares chronologically.
        return (1, tuple(stamp[:5]), name)

    return max(folder_names, key=sort_key, default=None)


def run_batch_tests():
    """Run the pipeline for every test video and every target language.

    Output is mirrored to logs/batch_test_<batch_version>.txt. The redirect of
    stdout/stderr is left in place for the rest of the process, so tracebacks
    of uncaught errors are written to the log file as well.
    """
    batch_version = time.strftime(BATCH_VERSION_FORMAT)
    os.makedirs("logs", exist_ok=True)
    log_file = f"logs/batch_test_{batch_version}.txt"
    sys.stdout = Logger(log_file)
    sys.stderr = sys.stdout

    # Check for latest transcription to reuse
    reuse_version = None
    subtitles_root = Path("app/subtitles")
    if subtitles_root.exists():
        # Folders are timestamped like 08-48-AM--11-05-2026
        folders = [f.name for f in subtitles_root.iterdir() if f.is_dir() and "--" in f.name]
        latest_folder = latest_batch_folder(folders)
        if latest_folder is not None:
            print(f"\n[?] Found existing transcriptions in: {latest_folder}")
            # Use raw input for simple prompt
            try:
                choice = input("Use the latest transcription to save time? (y/n): ").strip().lower()
            except EOFError:
                # Handle cases where input is not available: do not reuse
                choice = ""
            if choice == "y":
                reuse_version = latest_folder
                print(f"āœ… Reusing transcriptions from: {reuse_version}\n")

    print("šŸš€ Starting automated pipeline tests...")
    print(f"šŸ“‚ Directory: {TEST_VIDEOS_DIR}")
    print(f"āš™ļø Engine: {ENGINE}")
    print(f"šŸŒ Target Languages: {TARGET_LANGS}")
    print(f"šŸ•’ Batch Version: {batch_version}\n")

    videos = sorted(TEST_VIDEOS_DIR.glob("*.mp4"), key=lambda v: v.stat().st_size)
    if not videos:
        print("āŒ No videos found in test directory.")
        return

    bytes_per_mb = 1024 * 1024
    divider = "=" * 60

    print("šŸ“‹ Processing order (smallest first):")
    for i, v in enumerate(videos, 1):
        print(f" {i}. {v.name} ({v.stat().st_size / bytes_per_mb:.1f} MB)")

    for video in videos:
        print(f"\n{divider}")
        print(f"šŸŽ„ Processing Video: {video.name} (Size: {video.stat().st_size / bytes_per_mb:.1f} MB)")
        print(f"{divider}")

        for lang in TARGET_LANGS:
            start_time = time.time()
            print(f"\n---> Running pipeline for [ {lang.upper()} ]")
            # One failing language/video must not stop the rest of the batch.
            try:
                output_srt = generate_subtitles_test(
                    video_path=str(video),
                    target_lang=lang,
                    engine=ENGINE,
                    version=batch_version,
                    reuse_version=reuse_version,
                )
                duration = time.time() - start_time
                print(f"āœ“ Success! Generated SRT: {output_srt}")
                print(f"ā±ļø Time taken: {duration:.2f} seconds")
            except Exception as e:
                print(f"āŒ Pipeline failed for {lang.upper()}: {e}")

    print("\nāœ… Batch testing complete!")
    print("šŸ“Š Review logs/translation_failures.jsonl to see self-generated architectural insights.")


if __name__ == "__main__":
    run_batch_tests()