jw-search / scripts /subtitle-detection.py
jw-tools's picture
deploy: latest main (lazy-ML cold start, durable launcher, web-image search, scene search) + full-app data refresh
7ea1851 verified
#!/usr/bin/env python3
"""
Subtitle Detection Script
This script checks which videos on JW.org have subtitles available,
using the same logic as the Search UI app. Results are output to a CSV file.
Detects TWO types of subtitles:
1. VTT files (files[].subtitles.url) - Downloadable subtitle files
2. Burned-in (files[].subtitled=true) - Video versions with embedded subtitles
Usage:
python3 subtitle-detection.py [--language LANG]
Example:
python3 subtitle-detection.py --language E
"""
import argparse
import csv
import json
from datetime import datetime
from typing import Dict, Any, Optional
from urllib.parse import urlencode

import requests
def fetch_json(url: str) -> Dict[str, Any]:
    """
    Download a URL and return its decoded JSON body.

    The URL is echoed to stdout before the request is sent. The request uses
    a 60-second timeout, and ``raise_for_status()`` is called on the response
    before the body is decoded.

    Args:
        url: Address to fetch

    Returns:
        The value produced by ``response.json()``
    """
    print(f"Fetching: {url}")
    reply = requests.get(url, timeout=60)
    reply.raise_for_status()
    payload = reply.json()
    return payload
def find_subtitles(media_item: Dict[str, Any]) -> Optional[str]:
    """
    Find the first VTT subtitle URL among a media item's files.

    Files are scanned in order and the first non-empty ``subtitles.url`` wins.
    A missing or null ``files`` value, and file entries whose ``subtitles``
    value is missing, null, or not a dictionary, count as "no subtitles"
    instead of raising.

    Args:
        media_item: Media item dictionary with 'files' array

    Returns:
        Subtitle URL string, or None if not found
    """
    # `.get('files', [])` would still hand back None for an explicit JSON null.
    files = media_item.get('files') or []
    for file in files:
        subtitles = file.get('subtitles')
        if not isinstance(subtitles, dict):
            continue
        subtitle_url = subtitles.get('url')
        if subtitle_url:
            return subtitle_url
    return None
def has_burned_in_subtitles(media_item: Dict[str, Any]) -> bool:
    """
    Check if video has a burned-in subtitle version available.

    Burned-in subtitles are embedded in the video stream itself (not a separate file).
    A file counts only when its ``subtitled`` flag is the boolean ``True``
    (what a JSON ``true`` decodes to); truthy look-alikes such as the string
    ``"false"`` do not count. A missing or null ``files`` value yields False.

    Args:
        media_item: Media item dictionary with 'files' array

    Returns:
        True if any file has subtitled=True, False otherwise
    """
    # `.get('files', [])` would still hand back None for an explicit JSON null.
    files = media_item.get('files') or []
    return any(file.get('subtitled') is True for file in files)
def get_file_labels_with_subtitles(media_item: Dict[str, Any]) -> Dict[str, Any]:
    """
    Get detailed information about which file versions have subtitles.

    A file with no label, or with a null label, is reported as 'unknown' so
    the returned lists contain no None entries. A missing or null ``files``
    value, or a null ``subtitles`` entry on a file, is treated as "nothing
    found" instead of raising.

    Args:
        media_item: Media item dictionary with 'files' array

    Returns:
        Dict with:
        - burned_in_labels: list of resolution labels with burned-in subtitles
        - vtt_labels: list of resolution labels with VTT subtitle files
    """
    # `.get('files', [])` would still hand back None for an explicit JSON null.
    files = media_item.get('files') or []
    burned_in_labels = []
    vtt_labels = []
    for file in files:
        label = file.get('label')
        if label is None:
            label = 'unknown'
        # Only the boolean True flag marks a burned-in rendition.
        if file.get('subtitled') is True:
            burned_in_labels.append(label)
        subtitles = file.get('subtitles')
        if isinstance(subtitles, dict) and subtitles.get('url'):
            vtt_labels.append(label)
    return {
        'burned_in_labels': burned_in_labels,
        'vtt_labels': vtt_labels,
    }
def get_jw_org_url(natural_key: str, language: str = "E") -> str:
    """
    Generate the JW.org video URL for a given natural key.

    The query string is percent-encoded, so a key or language code containing
    reserved characters (``&``, ``=``, ``#``, spaces, ...) cannot corrupt the
    link. Values made only of letters, digits, ``-``, ``_`` and ``.`` come
    out unchanged.

    Args:
        natural_key: The languageAgnosticNaturalKey (e.g., 'pub-osg_108_VIDEO')
        language: Language code (e.g., 'E' for English)

    Returns:
        Full JW.org URL for the video
    """
    # Dict order is preserved, so parameters appear as srcid, wtlocale, lank.
    query = urlencode({
        'srcid': 'share',
        'wtlocale': language,
        'lank': natural_key,
    })
    return f"https://www.jw.org/finder?{query}"
def download_vod_categories(language: str) -> Dict[str, Any]:
    """
    Fetch the top-level VideoOnDemand listing for one language.

    Builds the mediator ``categories/{language}/VideoOnDemand`` URL (with
    ``detailed=1``, ``mediaLimit=0`` and ``clientType=www``) and hands it to
    ``fetch_json``.

    Args:
        language: Language code (e.g., 'E' for English)

    Returns:
        Whatever ``fetch_json`` returns for that endpoint
    """
    endpoint = (
        "https://b.jw-cdn.org/apis/mediator/v1/categories/"
        f"{language}/VideoOnDemand"
        "?detailed=1&mediaLimit=0&clientType=www"
    )
    return fetch_json(endpoint)
def download_category(category_key: str, language: str) -> Dict[str, Any]:
    """
    Fetch the detailed listing for a single category.

    Builds the mediator ``categories/{language}/{category_key}`` URL (with
    ``detailed=1`` and ``clientType=www``) and hands it to ``fetch_json``.

    Args:
        category_key: Category key (e.g., 'VODStudio')
        language: Language code (e.g., 'E' for English)

    Returns:
        Whatever ``fetch_json`` returns for that endpoint
    """
    endpoint = (
        "https://b.jw-cdn.org/apis/mediator/v1/categories/"
        f"{language}/{category_key}"
        "?detailed=1&clientType=www"
    )
    return fetch_json(endpoint)
def collect_all_media(language: str) -> Dict[str, Dict[str, Any]]:
    """
    Gather every media item reachable from the VOD category listing.

    The top-level listing is downloaded first; each of its subcategories is
    then downloaded in turn and the media of that category's own
    subcategories are indexed by ``languageAgnosticNaturalKey``. Items
    without such a key are skipped, and a key seen again replaces the earlier
    entry. Any exception raised while handling one category is printed and
    that category is abandoned; items indexed before the failure are kept.

    Args:
        language: Language code

    Returns:
        Dictionary mapping naturalKey -> copy of the media item with added
        '_category', '_category_key' and '_subcategory' entries
    """
    print(f"\n=== Collecting all media items for language: {language} ===\n")

    # Top-level listing: its subcategories are the categories walked below.
    listing = download_vod_categories(language)
    top_level = listing.get('category', {}).get('subcategories', [])
    print(f"Found {len(top_level)} top-level categories\n")

    collected = {}
    for entry in top_level:
        key = entry.get('key', '')
        name = entry.get('name', '')
        print(f"Processing category: {name} ({key})")
        try:
            detail = download_category(key, language)
            children = detail.get('category', {}).get('subcategories', [])
            for child in children:
                child_name = child.get('name', '')
                for item in child.get('media', []):
                    lank = item.get('languageAgnosticNaturalKey', '')
                    if lank:
                        # Shallow copy plus the category context fields.
                        collected[lank] = {
                            **item,
                            '_category': name,
                            '_category_key': key,
                            '_subcategory': child_name,
                        }
            print(f" -> Found {len(children)} subcategories")
        except Exception as exc:
            # Best effort: report the failure and move on to the next category.
            print(f" -> Error: {exc}")

    print(f"\n=== Total unique media items collected: {len(collected)} ===\n")
    return collected
def analyze_subtitles(all_media: Dict[str, Dict[str, Any]], language: str) -> list:
    """
    Build one report row per media item describing its subtitle availability.

    Two kinds of subtitles are looked for:
    1. VTT files (files[].subtitles.url) - Downloadable subtitle files
    2. Burned-in (files[].subtitled=true) - Video versions with embedded subtitles

    Args:
        all_media: Dictionary of all media items
        language: Language code

    Returns:
        List of dictionaries with analysis results, in the iteration order of
        ``all_media``
    """
    def yes_no(flag: bool) -> str:
        return 'Yes' if flag else 'No'

    def joined(labels) -> str:
        return ', '.join(labels) if labels else ''

    rows = []
    for key, item in all_media.items():
        vtt_url = find_subtitles(item)
        burned_in = has_burned_in_subtitles(item)
        labels = get_file_labels_with_subtitles(item)
        vtt_found = vtt_url is not None

        row = {
            'natural_key': key,
            'title': item.get('title', ''),
            'category': item.get('_category', ''),
            'subcategory': item.get('_subcategory', ''),
            # Either kind of subtitle counts as "any".
            'has_any_subtitles': yes_no(vtt_found or burned_in),
            'has_vtt_subtitles': yes_no(vtt_found),
            'has_burned_in_subtitles': yes_no(burned_in),
            'vtt_url': vtt_url or '',
            'vtt_resolutions': joined(labels['vtt_labels']),
            'burned_in_resolutions': joined(labels['burned_in_labels']),
            'duration_seconds': item.get('duration', 0),
            'duration_formatted': item.get('durationFormattedHHMM', ''),
            'first_published': item.get('firstPublished', ''),
            'jw_org_url': get_jw_org_url(key, language),
        }
        rows.append(row)
    return rows
def write_csv(results: list, output_file: str) -> None:
    """
    Save the result rows as a UTF-8 CSV file with a header line.

    With no rows, a notice is printed and no file is created. Columns are
    written in a fixed order regardless of the key order inside each row.

    Args:
        results: List of result dictionaries
        output_file: Path to output CSV file
    """
    if results:
        # Fixed column order for the output file.
        columns = [
            'natural_key', 'title', 'category', 'subcategory',
            'has_any_subtitles', 'has_vtt_subtitles', 'has_burned_in_subtitles',
            'vtt_url', 'vtt_resolutions', 'burned_in_resolutions',
            'duration_seconds', 'duration_formatted', 'first_published',
            'jw_org_url',
        ]
        with open(output_file, 'w', newline='', encoding='utf-8') as handle:
            sheet = csv.DictWriter(handle, fieldnames=columns)
            sheet.writeheader()
            for row in results:
                sheet.writerow(row)
        print(f"Results written to: {output_file}")
    else:
        print("No results to write!")
def print_summary(results: list) -> str:
    """
    Print and return a summary of the results.

    The summary has three parts: overall availability, a breakdown by
    subtitle type, and per-category counts listed in sorted category order.
    Overall percentages are relative to the number of rows and show as "0%"
    when there are no rows.

    Args:
        results: List of result dictionaries; each needs the keys 'category',
            'has_any_subtitles', 'has_vtt_subtitles' and
            'has_burned_in_subtitles' (the latter three holding 'Yes' or 'No')

    Returns:
        Summary string (the same text that is printed)
    """
    total = len(results)

    def pct(count: int) -> str:
        # Share of all analyzed videos; guarded so an empty run cannot divide by zero.
        return f"{100*count/total:.1f}%" if total > 0 else "0%"

    # Count different subtitle types
    with_any = sum(1 for r in results if r['has_any_subtitles'] == 'Yes')
    with_vtt = sum(1 for r in results if r['has_vtt_subtitles'] == 'Yes')
    with_burned_in = sum(1 for r in results if r['has_burned_in_subtitles'] == 'Yes')
    with_both = sum(
        1 for r in results
        if r['has_vtt_subtitles'] == 'Yes' and r['has_burned_in_subtitles'] == 'Yes'
    )
    vtt_only = sum(
        1 for r in results
        if r['has_vtt_subtitles'] == 'Yes' and r['has_burned_in_subtitles'] == 'No'
    )
    burned_in_only = sum(
        1 for r in results
        if r['has_vtt_subtitles'] == 'No' and r['has_burned_in_subtitles'] == 'Yes'
    )
    without_any = total - with_any

    # Count by category
    categories = {}
    for r in results:
        counts = categories.setdefault(
            r['category'],
            {'total': 0, 'with_any': 0, 'with_vtt': 0, 'with_burned_in': 0},
        )
        counts['total'] += 1
        if r['has_any_subtitles'] == 'Yes':
            counts['with_any'] += 1
        if r['has_vtt_subtitles'] == 'Yes':
            counts['with_vtt'] += 1
        if r['has_burned_in_subtitles'] == 'Yes':
            counts['with_burned_in'] += 1

    lines = [
        "",
        "=" * 70,
        "SUBTITLE AVAILABILITY SUMMARY",
        "=" * 70,
        f"Total videos analyzed: {total}",
        "",
        "OVERALL SUBTITLE AVAILABILITY:",
        "-" * 40,
        f" Videos WITH any subtitles: {with_any:5d} ({pct(with_any)})",
        f" Videos WITHOUT any subtitles: {without_any:5d} ({pct(without_any)})",
        "",
        "SUBTITLE TYPE BREAKDOWN:",
        "-" * 40,
        f" VTT file subtitles: {with_vtt:5d} ({pct(with_vtt)})",
        f" Burned-in subtitles: {with_burned_in:5d} ({pct(with_burned_in)})",
        f" Both types available: {with_both:5d} ({pct(with_both)})",
        f" VTT only (no burned-in): {vtt_only:5d} ({pct(vtt_only)})",
        f" Burned-in only (no VTT): {burned_in_only:5d} ({pct(burned_in_only)})",
        "",
        "BY CATEGORY (any subtitles / VTT / burned-in):",
        "-" * 70,
    ]
    for cat, counts in sorted(categories.items()):
        # A bucket only exists once a row was counted into it, so total >= 1.
        pct_any = 100 * counts['with_any'] / counts['total']
        lines.append(f" {cat}:")
        lines.append(
            f" Any: {counts['with_any']}/{counts['total']} ({pct_any:.0f}%)"
            f" | VTT: {counts['with_vtt']} | Burned-in: {counts['with_burned_in']}"
        )
    lines.append("=" * 70)

    summary = "\n".join(lines)
    print(summary)
    return summary
def main():
    """
    Command-line entry point.

    Parses ``--language`` / ``--output``, collects all media items, analyzes
    them for subtitles, writes the CSV, prints the summary, and saves that
    summary next to the CSV as ``<name>.result.txt``.

    The summary path is derived by swapping a trailing ``.csv`` (any case)
    for ``.result.txt``; when the output path has no such suffix,
    ``.result.txt`` is appended instead, so the summary can never be written
    over the CSV itself.
    """
    parser = argparse.ArgumentParser(
        description='Check which JW.org videos have subtitles available for download'
    )
    parser.add_argument(
        '--language', '-l',
        default='E',
        help='Language code (default: E for English)'
    )
    parser.add_argument(
        '--output', '-o',
        default=None,
        help='Output CSV file path (default: subtitle-detection-{language}-{timestamp}.csv)'
    )
    args = parser.parse_args()
    language = args.language
    timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')

    # Default output file in current working directory
    if args.output:
        output_file = args.output
    else:
        output_file = f'subtitle-detection-{language}-{timestamp}.csv'

    banner = '=' * 60
    print(f"\n{banner}")
    print("JW.ORG SUBTITLE DETECTION SCRIPT")
    print(banner)
    print(f"Language: {language}")
    print(f"Output file: {output_file}")
    print(f"{banner}\n")

    # Collect all media items
    all_media = collect_all_media(language)

    # Analyze for subtitles
    print("Analyzing subtitle availability...")
    results = analyze_subtitles(all_media, language)

    # Sort results by category, then title
    results.sort(key=lambda x: (x['category'], x['title']))

    # Write CSV
    write_csv(results, output_file)

    # Print summary
    summary = print_summary(results)

    # Write summary to result file. Only a trailing ".csv" is stripped; a
    # plain str.replace would touch every ".csv" in the path and, when there
    # is none, return the CSV path itself (overwriting the CSV).
    if output_file.lower().endswith('.csv'):
        stem = output_file[:-len('.csv')]
    else:
        stem = output_file
    result_file = f"{stem}.result.txt"

    with open(result_file, 'w', encoding='utf-8') as f:
        f.write("Subtitle Detection Results\n")
        f.write(f"Generated: {datetime.now().isoformat()}\n")
        f.write(f"Language: {language}\n")
        f.write(f"CSV Output: {output_file}\n")
        f.write(summary)
    print(f"\nSummary written to: {result_file}")
    print("\nDone!")


if __name__ == '__main__':
    main()