| |
| """ |
| Subtitle Detection Script |
| |
| This script checks which videos on JW.org have subtitles available, |
| using the same logic as the Search UI app. Results are output to a CSV file. |
| |
| Detects TWO types of subtitles: |
| 1. VTT files (files[].subtitles.url) - Downloadable subtitle files |
| 2. Burned-in (files[].subtitled=true) - Video versions with embedded subtitles |
| |
| Usage: |
| python3 subtitle-detection.py [--language LANG] |
| |
| Example: |
| python3 subtitle-detection.py --language E |
| """ |
|
|
| import requests |
| import json |
| import csv |
| import argparse |
| from typing import Dict, Any, Optional |
| from datetime import datetime |
|
|
|
|
def fetch_json(url: str) -> Dict[str, Any]:
    """
    Download a URL with HTTP GET and decode the response body as JSON.

    Args:
        url: Address to request

    Returns:
        The decoded JSON payload

    Raises:
        requests.HTTPError: If the server answers with an error status
    """
    print(f"Fetching: {url}")
    reply = requests.get(url, timeout=60)
    # Surface 4xx/5xx responses as exceptions instead of parsing an error body.
    reply.raise_for_status()
    payload = reply.json()
    return payload
|
|
|
|
def find_subtitles(media_item: Dict[str, Any]) -> Optional[str]:
    """
    Find the first VTT subtitle URL among a media item's files.

    Per the original author's note this mirrors the lookup used by the
    Search UI app (subtitles_download.py); in addition, a 'files' or
    'subtitles' value of None is tolerated instead of raising.

    Args:
        media_item: Media item dictionary with 'files' array

    Returns:
        Subtitle URL string of the first file that has a non-empty
        'subtitles.url', or None if no file has one
    """
    # `or []` covers both a missing key and an explicit None value.
    for file in media_item.get('files') or []:
        if 'subtitles' not in file:
            continue
        # The key can be present but hold None; treat that like "no URL".
        subtitle_url = (file['subtitles'] or {}).get('url')
        if subtitle_url:
            return subtitle_url
    return None
|
|
|
|
def has_burned_in_subtitles(media_item: Dict[str, Any]) -> bool:
    """
    Check if video has a burned-in subtitle version available.

    Burned-in subtitles are embedded in the video stream itself (not a
    separate file); they are signalled by a file entry whose 'subtitled'
    field equals True.

    Args:
        media_item: Media item dictionary with 'files' array; a missing or
            None 'files' value is treated as "no files"

    Returns:
        True if any file has subtitled=True, False otherwise
    """
    # `or []` covers both a missing key and an explicit None value.
    files = media_item.get('files') or []
    # Equality with True (not plain truthiness) is intentional: a non-empty
    # string such as "false" must not count as subtitled.
    return any(file.get('subtitled') == True for file in files)  # noqa: E712
|
|
|
|
def get_file_labels_with_subtitles(media_item: Dict[str, Any]) -> Dict[str, Any]:
    """
    Get detailed information about which file versions have subtitles.

    Each file's 'label' is used as its identifier; files without a 'label'
    key are reported as 'unknown'. A 'files' or 'subtitles' value of None is
    tolerated and treated as absent.

    Args:
        media_item: Media item dictionary with 'files' array

    Returns:
        Dict with:
        - burned_in_labels: list of labels of files with subtitled == True
        - vtt_labels: list of labels of files with a non-empty subtitles.url
    """
    burned_in_labels = []
    vtt_labels = []

    # `or []` covers both a missing key and an explicit None value.
    for file in media_item.get('files') or []:
        label = file.get('label', 'unknown')

        # Equality with True (not truthiness) so stray strings do not count.
        if file.get('subtitled') == True:  # noqa: E712
            burned_in_labels.append(label)

        # 'subtitles' may be missing or None; both mean "no VTT file".
        if (file.get('subtitles') or {}).get('url'):
            vtt_labels.append(label)

    return {
        'burned_in_labels': burned_in_labels,
        'vtt_labels': vtt_labels,
    }
|
|
|
|
def get_jw_org_url(natural_key: str, language: str = "E") -> str:
    """
    Build the JW.org "finder" share link for a video.

    Both values are inserted into the query string as-is (no URL encoding).

    Args:
        natural_key: The languageAgnosticNaturalKey (e.g., 'pub-osg_108_VIDEO'),
            placed in the 'lank' query parameter
        language: Language code (e.g., 'E' for English), placed in the
            'wtlocale' query parameter

    Returns:
        Full JW.org URL for the video
    """
    share_link = f"https://www.jw.org/finder?srcid=share&wtlocale={language}&lank={natural_key}"
    return share_link
|
|
|
|
def download_vod_categories(language: str) -> Dict[str, Any]:
    """
    Fetch the VideoOnDemand root category listing for one language.

    The request asks for detailed output with mediaLimit=0, i.e. the
    category structure is requested via the query string without a media
    payload limit above zero.

    Args:
        language: Language code (e.g., 'E' for English)

    Returns:
        Decoded JSON dictionary returned by fetch_json for the VOD endpoint
    """
    vod_endpoint = f"https://b.jw-cdn.org/apis/mediator/v1/categories/{language}/VideoOnDemand?detailed=1&mediaLimit=0&clientType=www"
    vod_listing = fetch_json(vod_endpoint)
    return vod_listing
|
|
|
|
def download_category(category_key: str, language: str) -> Dict[str, Any]:
    """
    Fetch the detailed listing of a single category for one language.

    Args:
        category_key: Category key (e.g., 'VODStudio'), inserted into the
            URL path as-is
        language: Language code (e.g., 'E' for English)

    Returns:
        Decoded JSON dictionary returned by fetch_json for that category
    """
    category_endpoint = f"https://b.jw-cdn.org/apis/mediator/v1/categories/{language}/{category_key}?detailed=1&clientType=www"
    category_listing = fetch_json(category_endpoint)
    return category_listing
|
|
|
|
def collect_all_media(language: str) -> Dict[str, Dict[str, Any]]:
    """
    Gather every media item reachable from the top-level VOD categories.

    The VOD root is downloaded once; each of its subcategories is then
    downloaded individually and the media lists of *its* subcategories are
    walked. (The original author notes this is the same approach as the
    Search UI app, jw_api.py.)

    Items are keyed by 'languageAgnosticNaturalKey'; items without one are
    skipped, and an item seen again later replaces the earlier entry. Any
    exception while handling a category is printed and that category is
    abandoned, but items stored before the error are kept.

    Args:
        language: Language code

    Returns:
        Dictionary mapping natural key -> shallow copy of the media item with
        '_category', '_category_key' and '_subcategory' added
    """
    print(f"\n=== Collecting all media items for language: {language} ===\n")

    vod_root = download_vod_categories(language).get('category', {})
    top_level = vod_root.get('subcategories', [])
    print(f"Found {len(top_level)} top-level categories\n")

    collected: Dict[str, Dict[str, Any]] = {}

    for top in top_level:
        top_key = top.get('key', '')
        top_name = top.get('name', '')
        print(f"Processing category: {top_name} ({top_key})")

        try:
            detail = download_category(top_key, language)
            children = detail.get('category', {}).get('subcategories', [])

            for child in children:
                # Context shared by every media item of this subcategory.
                context = {
                    '_category': top_name,
                    '_category_key': top_key,
                    '_subcategory': child.get('name', ''),
                }
                for item in child.get('media', []):
                    lank = item.get('languageAgnosticNaturalKey', '')
                    if lank:
                        # Merge into a new dict so the API payload is untouched.
                        collected[lank] = {**item, **context}

            print(f" -> Found {len(children)} subcategories")
        except Exception as e:
            # Best effort: report the failure and move on to the next category.
            print(f" -> Error: {e}")

    print(f"\n=== Total unique media items collected: {len(collected)} ===\n")
    return collected
|
|
|
|
def analyze_subtitles(all_media: Dict[str, Dict[str, Any]], language: str) -> list:
    """
    Build one result row per media item describing its subtitle availability.

    Two kinds of subtitles are looked for:
    1. VTT files (files[].subtitles.url) - via find_subtitles
    2. Burned-in (files[].subtitled=true) - via has_burned_in_subtitles

    Args:
        all_media: Dictionary mapping natural key -> media item
        language: Language code, used only to build the JW.org link

    Returns:
        List of dictionaries (one per media item, in the iteration order of
        all_media) with 'Yes'/'No' flags, the first VTT URL, the labels of
        the files carrying each subtitle type, duration, publication date
        and the JW.org URL
    """
    def yes_no(flag: bool) -> str:
        return 'Yes' if flag else 'No'

    rows = []
    for key, item in all_media.items():
        vtt_url = find_subtitles(item)
        vtt_found = vtt_url is not None
        burned_found = has_burned_in_subtitles(item)
        labels = get_file_labels_with_subtitles(item)

        row = {
            'natural_key': key,
            'title': item.get('title', ''),
            'category': item.get('_category', ''),
            'subcategory': item.get('_subcategory', ''),
            'has_any_subtitles': yes_no(vtt_found or burned_found),
            'has_vtt_subtitles': yes_no(vtt_found),
            'has_burned_in_subtitles': yes_no(burned_found),
            'vtt_url': vtt_url or '',
            # Joining an empty list already yields '', so no extra guard.
            'vtt_resolutions': ', '.join(labels['vtt_labels']),
            'burned_in_resolutions': ', '.join(labels['burned_in_labels']),
            'duration_seconds': item.get('duration', 0),
            'duration_formatted': item.get('durationFormattedHHMM', ''),
            'first_published': item.get('firstPublished', ''),
            'jw_org_url': get_jw_org_url(key, language),
        }
        rows.append(row)

    return rows
|
|
|
|
def write_csv(results: list, output_file: str) -> None:
    """
    Save the result rows as a UTF-8 CSV file with a header line.

    When there are no rows, a notice is printed and no file is created.

    Args:
        results: List of result dictionaries; their keys must be among the
            column names listed below
        output_file: Path to output CSV file (overwritten if it exists)
    """
    if not results:
        print("No results to write!")
        return

    # Column order of the CSV.
    columns = [
        'natural_key',
        'title',
        'category',
        'subcategory',
        'has_any_subtitles',
        'has_vtt_subtitles',
        'has_burned_in_subtitles',
        'vtt_url',
        'vtt_resolutions',
        'burned_in_resolutions',
        'duration_seconds',
        'duration_formatted',
        'first_published',
        'jw_org_url',
    ]

    # newline='' lets the csv module control line endings itself.
    with open(output_file, 'w', newline='', encoding='utf-8') as handle:
        out = csv.DictWriter(handle, fieldnames=columns)
        out.writeheader()
        for row in results:
            out.writerow(row)

    print(f"Results written to: {output_file}")
|
|
|
|
def print_summary(results: list) -> str:
    """
    Build a human-readable availability report, print it and return it.

    The report contains overall counts with percentages of the total, a
    breakdown by subtitle type, and one entry per category (sorted by
    category name).

    Args:
        results: List of result dictionaries carrying 'category' and the
            'Yes'/'No' fields 'has_any_subtitles', 'has_vtt_subtitles' and
            'has_burned_in_subtitles'

    Returns:
        Summary string (the same text that was printed)
    """
    total = len(results)

    def tally(**wanted):
        # Number of rows whose fields all equal the requested values.
        return sum(
            1 for row in results
            if all(row[field] == value for field, value in wanted.items())
        )

    def pct(n):
        # Share of the grand total; guarded so an empty run prints "0%".
        return f"{100*n/total:.1f}%" if total > 0 else "0%"

    with_any = tally(has_any_subtitles='Yes')
    with_vtt = tally(has_vtt_subtitles='Yes')
    with_burned_in = tally(has_burned_in_subtitles='Yes')
    with_both = tally(has_vtt_subtitles='Yes', has_burned_in_subtitles='Yes')
    vtt_only = tally(has_vtt_subtitles='Yes', has_burned_in_subtitles='No')
    burned_in_only = tally(has_vtt_subtitles='No', has_burned_in_subtitles='Yes')
    without_any = total - with_any

    # Per-category counters, created on first sight of each category.
    flag_fields = (
        ('with_any', 'has_any_subtitles'),
        ('with_vtt', 'has_vtt_subtitles'),
        ('with_burned_in', 'has_burned_in_subtitles'),
    )
    per_category = {}
    for row in results:
        bucket = per_category.setdefault(
            row['category'],
            {'total': 0, 'with_any': 0, 'with_vtt': 0, 'with_burned_in': 0},
        )
        bucket['total'] += 1
        for counter, field in flag_fields:
            if row[field] == 'Yes':
                bucket[counter] += 1

    rule = "=" * 70
    report = [
        "",
        rule,
        "SUBTITLE AVAILABILITY SUMMARY",
        rule,
        f"Total videos analyzed: {total}",
        "",
        "OVERALL SUBTITLE AVAILABILITY:",
        "-" * 40,
        f" Videos WITH any subtitles: {with_any:5d} ({pct(with_any)})",
        f" Videos WITHOUT any subtitles: {without_any:5d} ({pct(without_any)})",
        "",
        "SUBTITLE TYPE BREAKDOWN:",
        "-" * 40,
        f" VTT file subtitles: {with_vtt:5d} ({pct(with_vtt)})",
        f" Burned-in subtitles: {with_burned_in:5d} ({pct(with_burned_in)})",
        f" Both types available: {with_both:5d} ({pct(with_both)})",
        f" VTT only (no burned-in): {vtt_only:5d} ({pct(vtt_only)})",
        f" Burned-in only (no VTT): {burned_in_only:5d} ({pct(burned_in_only)})",
        "",
        "BY CATEGORY (any subtitles / VTT / burned-in):",
        "-" * 70,
    ]

    for cat, counts in sorted(per_category.items()):
        if counts['total'] > 0:
            pct_any = 100 * counts['with_any'] / counts['total']
        else:
            pct_any = 0
        report.append(f" {cat}:")
        report.append(f" Any: {counts['with_any']}/{counts['total']} ({pct_any:.0f}%) | VTT: {counts['with_vtt']} | Burned-in: {counts['with_burned_in']}")

    report.append(rule)

    summary = "\n".join(report)
    print(summary)
    return summary
|
|
|
|
def main():
    """
    Command-line entry point.

    Parses --language/--output, collects all media items for the language,
    analyzes subtitle availability, writes the rows (sorted by category and
    title) to a CSV file, prints a summary and stores that summary in a
    companion '<name>.result.txt' file.

    The companion file name is derived from the CSV path: a trailing '.csv'
    extension (any letter case) is replaced by '.result.txt'; for any other
    name '.result.txt' is appended, so the summary can never overwrite the
    CSV output itself.
    """
    # Function-level import: only needed to derive the summary file name.
    import os

    parser = argparse.ArgumentParser(
        description='Check which JW.org videos have subtitles available for download'
    )
    parser.add_argument(
        '--language', '-l',
        default='E',
        help='Language code (default: E for English)'
    )
    parser.add_argument(
        '--output', '-o',
        default=None,
        help='Output CSV file path (default: subtitle-detection-{language}-{timestamp}.csv)'
    )

    args = parser.parse_args()

    language = args.language
    timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')

    # An explicit --output wins; otherwise build a timestamped default name.
    output_file = args.output or f'subtitle-detection-{language}-{timestamp}.csv'

    print(f"\n{'='*60}")
    print("JW.ORG SUBTITLE DETECTION SCRIPT")
    print(f"{'='*60}")
    print(f"Language: {language}")
    print(f"Output file: {output_file}")
    print(f"{'='*60}\n")

    all_media = collect_all_media(language)

    print("Analyzing subtitle availability...")
    results = analyze_subtitles(all_media, language)

    # Stable, readable ordering for the CSV.
    results.sort(key=lambda x: (x['category'], x['title']))

    write_csv(results, output_file)

    summary = print_summary(results)

    # Only swap a real trailing '.csv' extension. A plain str.replace would
    # leave names without '.csv' unchanged (overwriting the CSV with the
    # summary) and would also rewrite '.csv' inside directory names.
    stem, extension = os.path.splitext(output_file)
    if extension.lower() == '.csv':
        result_file = stem + '.result.txt'
    else:
        result_file = output_file + '.result.txt'

    with open(result_file, 'w', encoding='utf-8') as f:
        f.write("Subtitle Detection Results\n")
        f.write(f"Generated: {datetime.now().isoformat()}\n")
        f.write(f"Language: {language}\n")
        f.write(f"CSV Output: {output_file}\n")
        f.write(summary)

    print(f"\nSummary written to: {result_file}")
    print("\nDone!")


# Run the CLI only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
|
|