| import os |
| import sys |
| import time |
| from pathlib import Path |
|
|
| |
# Put the directory three levels above this file on the import path so the
# ``app.*`` imports in this script resolve when it is executed directly.
sys.path.append(
    os.path.abspath(
        os.path.join(os.path.abspath(__file__), os.pardir, os.pardir, os.pardir)
    )
)
|
|
| from app.services.transcribe import extract_audio, transcribe_audio |
| from app.services.srt_generator import save_srt, translate_srt |
| from app.services.precision_patch import apply_precision_patch |
| from app.main import get_translator |
|
|
class Logger(object):
    """Tee-style stream: everything written goes to the terminal and a log file.

    The instance captures whatever ``sys.stdout`` is at construction time and
    opens *filename* in append mode (UTF-8).  It is meant to be assigned to
    ``sys.stdout`` / ``sys.stderr`` so that ``print`` output is mirrored into
    the log.

    The log file handle can be released with :meth:`close` or by using the
    instance as a context manager; after that, writes still reach the
    terminal but are no longer mirrored.
    """

    def __init__(self, filename):
        """Remember the current ``sys.stdout`` and open *filename* for appending."""
        self.terminal = sys.stdout
        self.log = open(filename, "a", encoding="utf-8")

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        # Only release the log file; exceptions are never suppressed.
        self.close()

    def write(self, message):
        """Write *message* to the terminal and, while the log is open, to the log."""
        self.terminal.write(message)
        if not self.log.closed:
            self.log.write(message)
            # Flush on every write so the log is complete even if the run dies.
            self.log.flush()

    def flush(self):
        """Flush the terminal stream and, while it is open, the log file."""
        self.terminal.flush()
        if not self.log.closed:
            self.log.flush()

    def isatty(self):
        """Report whether the wrapped terminal stream is a TTY.

        Code that probes ``sys.stdout.isatty()`` keeps working when this
        object stands in for the real stream.
        """
        probe = getattr(self.terminal, "isatty", None)
        return bool(probe()) if callable(probe) else False

    def close(self):
        """Close the log file (idempotent); the terminal stream is left open."""
        if not self.log.closed:
            self.log.close()
|
|
| |
# Folder with the sample clips, located relative to this script's own directory.
TEST_VIDEOS_DIR = Path(os.path.abspath(__file__)).parent / "resources" / "test-videos"
# Target-language codes; every video is run through the pipeline once per entry.
TARGET_LANGS = ["ml", "hi"]
# Engine name handed to get_translator() for every run.
ENGINE = "gemini"
|
|
def generate_subtitles_test(video_path: str, target_lang: str, engine: str, version: str, reuse_version: str = None) -> str:
    """Run the subtitle pipeline for one video and one target language.

    An English SRT is obtained first, in this order of preference: the file
    already present for this ``version``, a copy of the same file from
    ``reuse_version`` (when given and available), or a fresh
    extract-audio / transcribe / precision-patch / save pass.  The English SRT
    is then translated into ``target_lang`` with the translator returned by
    ``get_translator(engine)``.

    Args:
        video_path: Path of the source video.
        target_lang: Language code passed to ``translate_srt``.
        engine: Translator engine name passed to ``get_translator``.
        version: Batch identifier; outputs go to ``app/uploads/<version>`` and
            ``app/subtitles/<version>`` (relative to the working directory).
        reuse_version: Optional earlier batch identifier whose English SRT may
            be copied instead of transcribing again.

    Returns:
        Path of the translated SRT file.

    Raises:
        Whatever the pipeline helpers raise; the temporary audio file is
        removed in every case.
    """
    import shutil  # only needed for the reuse path; kept local as before

    base_name = os.path.splitext(os.path.basename(video_path))[0]
    # Keep only characters that are safe in a file name.
    safe_name = "".join(c for c in base_name if c.isalnum() or c in " ._-").strip()
    file_id = safe_name or "video"

    upload_dir = f"app/uploads/{version}"
    subtitles_dir = f"app/subtitles/{version}"
    os.makedirs(upload_dir, exist_ok=True)
    os.makedirs(subtitles_dir, exist_ok=True)

    audio_path = f"{upload_dir}/{file_id}_test.wav"
    en_srt_path = f"{subtitles_dir}/{file_id}_test_en.srt"
    target_srt_path = f"{subtitles_dir}/{file_id}_test_{target_lang}.srt"

    # Try to seed this batch with the English SRT of an earlier batch.
    copied_from_previous = False
    if reuse_version and not os.path.exists(en_srt_path):
        old_en_srt = f"app/subtitles/{reuse_version}/{file_id}_test_en.srt"
        if os.path.exists(old_en_srt):
            shutil.copy(old_en_srt, en_srt_path)
            copied_from_previous = True
            print(f" --> Reused English SRT from {reuse_version}")

    try:
        if not os.path.exists(en_srt_path):
            extract_audio(video_path, audio_path)
            segments, _info = transcribe_audio(audio_path)
            # Return value is not used, so this presumably patches the
            # segments in place -- TODO confirm against precision_patch.
            apply_precision_patch(segments)
            save_srt(segments, en_srt_path)
        elif not copied_from_previous:
            # The SRT was already there (e.g. produced for a previous target
            # language of this batch); the copy case printed its own message.
            print(" --> Skipping transcription, using cached English SRT")

        translator = get_translator(engine)
        translate_srt(en_srt_path, target_srt_path, target_lang, translator, validate=True)
    finally:
        # Remove the temporary audio even when a pipeline step fails, so a
        # failed run does not leave large WAV files behind.
        if os.path.exists(audio_path):
            os.remove(audio_path)

    return target_srt_path
|
|
# strftime/strptime pattern used for batch identifiers (and therefore for the
# per-batch folder names under app/subtitles and app/uploads).
_BATCH_VERSION_FORMAT = "%I-%M-%p--%d-%m-%Y"


def _version_sort_key(folder_name):
    """Return a key that orders batch folder names chronologically.

    The names start with a 12-hour clock and put the year last, so plain
    string sorting does not give chronological order.  Names that do not match
    the batch format sort before all valid ones, ordered by name.
    """
    try:
        stamp = time.strptime(folder_name, _BATCH_VERSION_FORMAT)
    except ValueError:
        return (0, (), folder_name)
    # First six struct_time fields: year, month, day, hour (24h), minute, second.
    return (1, tuple(stamp[:6]), folder_name)


def _choose_reuse_version(subtitles_root):
    """Offer to reuse the newest earlier batch; return its folder name or None."""
    if not subtitles_root.is_dir():
        return None

    folders = [f.name for f in subtitles_root.iterdir() if f.is_dir() and "--" in f.name]
    if not folders:
        return None

    latest_folder = max(folders, key=_version_sort_key)
    print(f"\n[?] Found existing transcriptions in: {latest_folder}")

    try:
        choice = input("Use the latest transcription to save time? (y/n): ").strip().lower()
    except EOFError:
        # No stdin available (non-interactive run): transcribe from scratch.
        return None

    if choice != 'y':
        return None
    print(f"✅ Reusing transcriptions from: {latest_folder}\n")
    return latest_folder


def _run_batch(batch_version):
    """Run every test video through the pipeline for every target language."""
    # NOTE(review): the status glyphs in the messages below were garbled
    # (mis-decoded UTF-8) in the reviewed copy of this file and have been
    # reconstructed -- confirm the exact emoji against the original source.
    reuse_version = _choose_reuse_version(Path("app/subtitles"))

    print("🚀 Starting automated pipeline tests...")
    print(f"📁 Directory: {TEST_VIDEOS_DIR}")
    print(f"⚙️ Engine: {ENGINE}")
    print(f"🌍 Target Languages: {TARGET_LANGS}")
    print(f"📌 Batch Version: {batch_version}\n")

    # Smallest files first, so quick clips give feedback before long ones.
    videos = sorted(TEST_VIDEOS_DIR.glob("*.mp4"), key=lambda v: v.stat().st_size)

    if not videos:
        print("❌ No videos found in test directory.")
        return

    print("📋 Processing order (smallest first):")
    for i, v in enumerate(videos, 1):
        print(f" {i}. {v.name} ({v.stat().st_size / (1024*1024):.1f} MB)")

    for video in videos:
        print(f"\n{'='*60}")
        print(f"🎥 Processing Video: {video.name} (Size: {video.stat().st_size / (1024*1024):.1f} MB)")
        print(f"{'='*60}")

        for lang in TARGET_LANGS:
            start_time = time.time()
            print(f"\n---> Running pipeline for [ {lang.upper()} ]")
            try:
                output_srt = generate_subtitles_test(
                    video_path=str(video),
                    target_lang=lang,
                    engine=ENGINE,
                    version=batch_version,
                    reuse_version=reuse_version,
                )
            except Exception as e:
                # One failing video/language must not abort the whole batch.
                print(f"❌ Pipeline failed for {lang.upper()}: {e}")
            else:
                duration = time.time() - start_time
                print(f"✔ Success! Generated SRT: {output_srt}")
                print(f"⏱️ Time taken: {duration:.2f} seconds")

    print("\n✅ Batch testing complete!")
    print("📊 Review logs/translation_failures.jsonl to see self-generated architectural insights.")


def run_batch_tests():
    """Run the whole batch while mirroring stdout/stderr into a log file.

    A batch identifier is derived from the current local time.  All output is
    teed into ``logs/batch_test_<batch>.txt`` through ``Logger``.  The
    original ``sys.stdout`` / ``sys.stderr`` are restored and the log file is
    closed when the batch ends, whether it succeeds or fails.
    """
    import traceback

    batch_version = time.strftime(_BATCH_VERSION_FORMAT)

    os.makedirs("logs", exist_ok=True)
    log_file = f"logs/batch_test_{batch_version}.txt"

    saved_stdout, saved_stderr = sys.stdout, sys.stderr
    tee = Logger(log_file)
    sys.stdout = tee
    sys.stderr = tee
    try:
        _run_batch(batch_version)
    except (Exception, KeyboardInterrupt):
        # The interpreter prints the traceback only after the streams have
        # been restored, so record it in the log file here as well.
        tee.log.write(traceback.format_exc())
        raise
    finally:
        sys.stdout, sys.stderr = saved_stdout, saved_stderr
        tee.log.close()
|
|
# Script entry point: start the batch only when this file is executed
# directly, not when it is imported.
if __name__ == "__main__":
    run_batch_tests()
|
|