Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import math | |
| import re | |
| import time | |
| import traceback | |
| from typing import Optional | |
| import xml.etree.ElementTree as ElementTree | |
| from html import unescape | |
| from yt_dlp import YoutubeDL, DownloadError | |
| from yt_dlp.networking import Request | |
| from yt_dlp.utils import sanitize_filename, random_user_agent | |
# Substrings of a yt-dlp error message for which extractInfo gives up immediately.
NO_RETRY_STR = ["Sorry about that", "unavailable", "not available"]

# Substrings of a yt-dlp error message for which extractInfo tries once more.
# "No video formats found" used to be listed here and is currently disabled.
RETRY_STR = ["URLError", "429", "IncompleteRead", "Remote end closed connection"]

# Raw value of the DEBUG environment variable; any non-empty value enables
# the verbose prints guarded by `if debug:`.
debug = os.environ.get("DEBUG")

# yt-dlp subtitle types: json3, srv1, srv2, srv3, ttml, vtt, and xml
# (xml is the YouTube subtitle URL without the "&fmt=" argument).
# Live-chat replays are reported as a pseudo subtitle track:
# "subtitles": {
#     "live_chat": [
#         {
#             "url": "https://www.youtube.com/watch?v=ANtM2bHRz04&bpctr=9999999999&has_verified=1",
#             "ext": "json",
#             "video_id": "ANtM2bHRz04",
#             "protocol": "youtube_live_chat_replay"
#         }
#     ]
# }
def getSubtitleOptions(
    lang: Optional[str] = None,
    proxy: Optional[str] = None,
):
    """Build the yt-dlp option dict used for subtitle-only extraction.

    :param lang: subtitle language to request; when given, "live_chat" is
        excluded so it never ends up in ``requested_subtitles``.
    :param proxy: proxy URL; when given, the socket timeout is raised to 20s.
    :returns: a fresh options dict (it is also printed to stdout).
    """
    youtube_args = {
        # Skipping "webpage" as well makes l2P5PgL1LfI miss some languages.
        "player_skip": ["configs", "initial"],
        "player_client": ["ios"],
        # Do not add "translated_subs" to this list.
        "skip": ["hls", "dash"],
    }
    options = {
        "noplaylist": True,
        "writesubtitles": True,
        "writeautomaticsub": True,
        # "subtitlesformat" is left unset: YouTube offers no srt/xml format by default.
        "skip_download": True,
        "socket_timeout": 10,
        "extractor_retries": 0,
        "extractor_args": {"youtube": youtube_args},
    }

    if lang:
        # "-live_chat" filters live chat out of requested_subtitles.
        options["subtitleslangs"] = [lang, "-live_chat"]

    if proxy:
        options["proxy"] = proxy
        options["socket_timeout"] = 20

    print(options)
    return options
def getUrlFromSubtitleItem(item, lang="en", subType="vtt"):
    """Pick a download URL for ``lang`` out of a yt-dlp subtitle mapping.

    :param item: mapping of language code -> list of track dicts, where each
        track dict is read through its "url" and "ext" keys.
    :param lang: language code to look up in ``item``.
    :param subType: wanted track extension. The pseudo type "xml" means
        "any track's URL with its ``&fmt=<ext>`` argument removed"
        (not applied to the "live_chat" pseudo language).
    :returns: the URL, or None when the language is absent or no usable
        track exists. Tracks without a URL are skipped instead of crashing.
    """
    if not item:
        return None

    for subtitle in item.get(lang) or []:
        track_url = subtitle.get("url")
        if not track_url:
            # Nothing to download from this entry; try the next one.
            continue
        track_ext = subtitle.get("ext")

        if lang != "live_chat" and subType == "xml":
            if debug:
                print("subtitle source lang:{} url: {}".format(lang, track_url))
            # Only strip the format argument when the extension is known.
            if track_ext:
                return track_url.replace("&fmt=" + track_ext, "")
            return track_url

        if track_ext == subType:
            if debug:
                print("subtitle lang:{} url: {}".format(lang, track_url))
            return track_url

    return None
def getRequestedSubtitlesUrl(info_dict, lang, subType, isLangKey=False):
    """Find a subtitle URL among ``info_dict["requested_subtitles"]``.

    An exact match on ``lang`` is tried first. Unless ``isLangKey`` is true,
    every other requested language whose code starts with ``lang`` is tried
    next (e.g. "en" also accepts "en-US").

    :param info_dict: yt-dlp info dict.
    :param lang: language code or prefix; None yields None.
    :param subType: track type passed on to getUrlFromSubtitleItem.
    :param isLangKey: when true, only the exact language key is accepted.
    :returns: the URL, or None when nothing matches.
    """
    requested = info_dict.get("requested_subtitles")
    if not requested or lang is None:
        return None

    def track_map(code):
        # A requested entry that is a single track dict is wrapped into the
        # {lang: [track]} shape getUrlFromSubtitleItem expects. The original
        # mapping is never rebound, so later lookups cannot hit a KeyError.
        entry = requested[code]
        return {code: [entry]} if isinstance(entry, dict) else requested

    if lang in requested:
        url = getUrlFromSubtitleItem(track_map(lang), lang, subType)
        if url:
            if debug:
                print("getRequestedSubtitlesUrl lang:{}".format(lang))
            return url

    if not isLangKey:
        for code in requested:
            # The exact key was already tried above.
            if code == lang or not code.startswith(lang):
                continue
            url = getUrlFromSubtitleItem(track_map(code), code, subType)
            if url:
                if debug:
                    print("getRequestedSubtitlesUrl lang:{} url:{}".format(code, url))
                return url

    return None
def getSubtitleLangUrl(
    info_dict,
    lang="en",
    subType="vtt",
    subTitleKeys=("subtitles", "automatic_captions"),
    isLangKey=False,
):
    """Find a subtitle URL for ``lang`` in the sections named by ``subTitleKeys``.

    All sections are first searched for an exact ``lang`` key. Unless
    ``isLangKey`` is true, they are then searched again for language codes
    that start with ``lang``.

    :param info_dict: yt-dlp info dict; a missing or None section is treated
        as empty.
    :param lang: language code or prefix; None yields None.
    :param subType: track type passed on to getUrlFromSubtitleItem.
    :param subTitleKeys: info_dict keys to search, in priority order.
    :param isLangKey: when true, only the exact language key is accepted.
    :returns: the URL, or None when nothing matches.
    """
    if lang is None:
        return None

    sections = [info_dict.get(key) or {} for key in subTitleKeys]

    for tracks in sections:
        if lang in tracks:
            url = getUrlFromSubtitleItem(tracks, lang, subType)
            if url:
                if debug:
                    print("getSubtitleLangUrl lang:{}".format(lang))
                return url

    if not isLangKey:
        for tracks in sections:
            for code in tracks:
                # The exact key was already tried in the first pass.
                if code == lang or not code.startswith(lang):
                    continue
                url = getUrlFromSubtitleItem(tracks, code, subType)
                if url:
                    if debug:
                        print("getSubtitleLangUrl lang:{} url:{}".format(code, url))
                    return url

    return None
def getSubtitleOtherUrl(
    info_dict,
    lang="en",
    subType="vtt",
    subTitleKeys=("subtitles", "automatic_captions"),
):
    """Last-resort lookup: return a subtitle URL in whatever language exists.

    For each section in ``subTitleKeys`` the language is chosen as ``lang``
    if present, else "en" if present, else the first language listed.

    :param info_dict: yt-dlp info dict; a missing or None section is treated
        as empty.
    :param lang: preferred language code.
    :param subType: track type passed on to getUrlFromSubtitleItem.
    :param subTitleKeys: info_dict keys to search, in priority order.
    :returns: the URL, or None when no section yields one.
    """
    for key in subTitleKeys:
        tracks = info_dict.get(key) or {}
        if not tracks:
            continue

        if lang in tracks:
            chosen = lang
        elif "en" in tracks:
            chosen = "en"
        else:
            chosen = next(iter(tracks))

        url = getUrlFromSubtitleItem(tracks, chosen, subType)
        if url:
            if debug:
                print("getSubtitleOtherUrl lang:{} url:{}".format(chosen, url))
            return url

    return None
async def fetchSubtitle(
    url: str,
    lang: Optional[str] = "en",
    subType: Optional[str] = "vtt",
    proxy: Optional[str] = None,
) -> dict:
    """Fetch a subtitle for ``url``; delegates unchanged to fetchAnySubtitle."""
    result = await fetchAnySubtitle(url, lang, subType, proxy)
    return result
async def fetchAnySubtitle(
    url: str,
    lang: Optional[str] = "en",
    subType: Optional[str] = "vtt",
    proxy: Optional[str] = None,
) -> dict:
    """Extract video info and download the best matching subtitle.

    The lookups are tried in order: requested subtitles, per-language
    sections, then any available language. The first URL whose download
    succeeds wins.

    NOTE: although declared ``async``, nothing in here awaits; extraction and
    download run synchronously.

    :param url: video page URL.
    :param lang: preferred language code or prefix.
    :param subType: output type. For the "youtube" extractor, "srt" and "txt"
        are requested as "xml" and converted after download.
    :param proxy: proxy URL forwarded to extractInfo.
    :returns: on success a dict with id, url, title, thumbnail, duration,
        subtitle and chapters; ``{"error": ...}`` when an exception occurred;
        ``{"title", "duration", "error": "No subtitles"}`` when nothing could
        be downloaded.
    """
    title = "unknow"
    duration = ""
    try:
        ydl, info_dict = extractInfo(url, lang, proxy, False)
        title = sanitize_filename(info_dict.get("title") or "unknow")
        seconds = info_dict.get("duration")
        duration = str(seconds) if seconds else ""

        thumbnail = info_dict.get("thumbnail")
        # A missing thumbnail must not abort the subtitle lookup.
        if thumbnail and ".webp" in thumbnail:
            thumbnail = "https://i.ytimg.com/vi/{}/hqdefault.jpg".format(
                info_dict.get("id")
            )

        reqType = subType
        if info_dict.get("extractor") == "youtube" and subType in ["srt", "txt"]:
            reqType = "xml"

        if debug:
            print(
                "subtitles.keys(): {} automatic_captions: {} requested_subtitles: {}".format(
                    (info_dict.get("subtitles") or {}).keys(),
                    (info_dict.get("automatic_captions") or {}).keys(),
                    (
                        info_dict.get("requested_subtitles").keys()
                        if info_dict.get("requested_subtitles")
                        else {}
                    ),
                )
            )

        lookups = (
            getRequestedSubtitlesUrl,
            getSubtitleLangUrl,
            getSubtitleOtherUrl,
        )
        for index, find_url in enumerate(lookups):
            subtitle_url = find_url(info_dict, lang, reqType)
            if not subtitle_url:
                continue
            subtitle = fetchSubtitleBydlUrl(subType, subtitle_url, ydl=ydl)
            print(
                "function index:{}, url:{}, title:{}, duration:{} len(subtitle): {}".format(
                    index, url, title, duration, len(subtitle or "")
                )
            )
            if subtitle is not None:
                return {
                    "id": info_dict.get("id"),
                    "url": url,
                    "title": title,
                    "thumbnail": thumbnail,
                    "duration": duration,
                    "subtitle": subtitle,
                    "chapters": info_dict.get("chapters", None),
                }
    except Exception as e:
        print("{}, {}".format(e, url))
        traceback.print_exc()
        return {"error": str(e)}
    return {"title": title, "duration": duration, "error": "No subtitles"}
def float_to_srt_time_format(d: float) -> str:
    """Convert a duration in seconds into an SRT timestamp.

    The value is rounded to whole milliseconds before being split, so a
    fraction such as 0.9996 carries into the seconds instead of producing a
    malformed millisecond field. Hours are not wrapped at 24. Negative
    inputs are clamped to zero.

    :param d: duration in seconds.
    :rtype: str
    :returns:
        SubRip Subtitle (str) formatted time duration.
    float_to_srt_time_format(3.89) -> '00:00:03,890'
    """
    total_ms = max(0, round(d * 1000))
    total_seconds, ms = divmod(total_ms, 1000)
    total_minutes, seconds = divmod(total_seconds, 60)
    hours, minutes = divmod(total_minutes, 60)
    return f"{hours:02d}:{minutes:02d}:{seconds:02d},{ms:03d}"
def is_spaces_only(variable):
    """Return True when every character of ``variable`` is whitespace.

    An empty input also yields True, because there is no non-space character.
    """
    return all(ch.isspace() for ch in variable)
def xml_caption_to_srt(xml_captions: str, skip_empty: bool = True) -> Optional[str]:
    """Convert xml caption tracks to "SubRip Subtitle (srt)".

    Each child of the XML root is one cue: its text is the caption, and its
    "start" and optional "dur" attributes (seconds) give the timing.

    :param str xml_captions:
        XML formatted caption tracks.
    :param bool skip_empty:
        When true, cues whose text is empty or whitespace-only are dropped
        and None is returned if no cue remains. When false, every cue is
        kept and a string is always returned.
    :returns:
        The srt document, with cues numbered consecutively from 1, or None
        (see ``skip_empty``).
    :raises KeyError: if a kept cue has no "start" attribute.
    """
    segments = []
    root = ElementTree.fromstring(xml_captions)
    for child in root:
        text = child.text or ""
        # Flatten line breaks and collapse the double spaces they leave behind.
        caption = unescape(text.replace("\n", " ").replace("  ", " "))
        # strip() leaves "" for both empty and whitespace-only captions.
        if skip_empty and not caption.strip():
            continue
        duration = float(child.attrib.get("dur", 0.0))
        start = float(child.attrib["start"])
        end = start + duration
        line = "{seq}\n{start} --> {end}\n{text}\n".format(
            # Number by emitted cues so skipped ones leave no gaps.
            seq=len(segments) + 1,
            start=float_to_srt_time_format(start),
            end=float_to_srt_time_format(end),
            text=caption,
        )
        segments.append(line)
    if skip_empty and not segments:
        # return None if no text in xml
        return None
    return "\n".join(segments).strip()
def xml_caption_to_txt(xml_captions: str, skip_empty: bool = True) -> Optional[str]:
    """Convert xml caption tracks to plain text, one caption per paragraph.

    Each child of the XML root contributes its text; timing attributes are
    ignored. Captions are separated by a blank line.

    :param str xml_captions:
        XML formatted caption tracks.
    :param bool skip_empty:
        When true, captions that are empty or whitespace-only are dropped and
        None is returned if no caption remains. When false, every caption is
        kept and a string is always returned.
    :returns:
        The joined caption text, or None (see ``skip_empty``).
    """
    segments = []
    root = ElementTree.fromstring(xml_captions)
    for child in root:
        text = child.text or ""
        # Flatten line breaks and collapse the double spaces they leave behind.
        caption = unescape(text.replace("\n", " ").replace("  ", " "))
        # strip() leaves "" for both empty and whitespace-only captions.
        if skip_empty and not caption.strip():
            continue
        segments.append("{text}\n".format(text=caption))
    if skip_empty and not segments:
        # return None if no text in xml
        return None
    return "\n".join(segments).strip()
async def fetchSubtitleUrls(url: str, proxy: Optional[str] = None) -> dict:
    """Extract video metadata together with all available subtitle tracks.

    Extraction is done through extractInfo with the proxy forced on.

    NOTE: although declared ``async``, nothing in here awaits.

    :param url: video page URL.
    :param proxy: proxy URL forwarded to extractInfo.
    :returns: a dict with id, url, title, thumbnail, duration, subtitles and
        automatic_captions, or ``{"error": ...}`` when extraction failed.
    """
    try:
        _, info_dict = extractInfo(url, None, proxy, True)
        title = sanitize_filename(info_dict.get("title") or "unknow")
        seconds = info_dict.get("duration")
        duration = str(seconds) if seconds else ""

        thumbnail = info_dict.get("thumbnail")
        # A missing thumbnail must not turn a successful extraction into an error.
        if thumbnail and ".webp" in thumbnail:
            thumbnail = "https://i.ytimg.com/vi/{}/hqdefault.jpg".format(
                info_dict.get("id")
            )

        return {
            "id": info_dict.get("id"),
            "url": url,
            "title": title,
            "thumbnail": thumbnail,
            "duration": duration,
            "subtitles": info_dict.get("subtitles"),
            "automatic_captions": info_dict.get("automatic_captions"),
        }
    except Exception as e:
        print("{}, {}".format(e, url))
        traceback.print_exc()
        return {"error": str(e)}
def createHeaders():
    """Return HTTP request headers for subtitle downloads.

    The Accept* values are fixed; the User-Agent comes from yt-dlp's
    random_user_agent() on every call.
    """
    headers = {
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
        "Accept-Charset": "ISO-8859-1,utf-8;q=0.7,*;q=0.7",
        "Accept-Encoding": "gzip, deflate",
        "Accept-Language": "en-us,en;q=0.5",
    }
    headers["User-Agent"] = random_user_agent()
    return headers
def fetchSubtitleBydlUrl(subType, dlUrl, skipEmpty=True, ydl=None):
    """Download a subtitle track and convert it to ``subType`` if needed.

    :param subType: "srt" and "txt" are produced by downloading the track
        without its ``&fmt=`` argument and converting the XML; any other
        value returns the response body as is.
    :param dlUrl: track URL; None or empty yields None without a request.
    :param skipEmpty: forwarded to the XML converters.
    :param ydl: YoutubeDL instance to reuse; a new one is created when omitted.
    :returns: the subtitle text, or None when there is no URL, the download
        or conversion failed, or the converter found no text.
    """
    if not dlUrl:
        return None

    if subType in ["srt", "txt"]:
        dlUrl = re.sub(r"&fmt=[\w]+", "", dlUrl)

    # If the download failed we may have headers and cookies in info to use here.
    try:
        ydl = ydl if ydl else YoutubeDL(getSubtitleOptions())
        with ydl.urlopen(Request(dlUrl, headers=createHeaders())) as resp:
            body = resp.read().decode()
        # Conversion stays inside the try so malformed XML also yields None.
        if subType == "srt":
            return xml_caption_to_srt(body, skipEmpty)
        if subType == "txt":
            return xml_caption_to_txt(body, skipEmpty)
        return body
    except Exception as e:
        print(e)
        return None
def getSubtitleUrlByLang(info_dict, lang, subType, isLangKey):
    """Look up a subtitle URL for ``lang``.

    Requested subtitles are consulted first, then the per-language sections.
    Every attempt is printed. Returns the first non-empty URL, or None.
    """
    for lookup in (getRequestedSubtitlesUrl, getSubtitleLangUrl):
        found = lookup(info_dict, lang, subType, isLangKey=isLangKey)
        print("getSubtitleUrlByLang subtitle_url: {}".format(found))
        if found:
            return found
    return None
def extractInfo(url, lang, proxy, forceProxy=False):
    """Run yt-dlp info extraction with at most two attempts.

    The proxy is only used from the start when ``forceProxy`` is true; it is
    switched on for the next attempt after an error mentioning "429".

    :returns: ``(ydl, info_dict)`` of the successful attempt.
    :raises Exception: carrying the last DownloadError message when the error
        matches NO_RETRY_STR, matches nothing in RETRY_STR, or both attempts
        failed; any other exception is printed and re-raised as is.
    """
    attempts = 2
    last_error = None
    active_proxy = proxy if forceProxy else None

    for _attempt in range(attempts):
        try:
            downloader = YoutubeDL(getSubtitleOptions(lang, active_proxy))
            info = downloader.extract_info(url, download=False)
            return downloader, info
        except DownloadError as err:
            last_error = str(err)
            if "429" in last_error:
                # Rate limited: go through the proxy on the next attempt.
                active_proxy = proxy
            hopeless = any(marker in last_error for marker in NO_RETRY_STR)
            transient = any(marker in last_error for marker in RETRY_STR)
            if hopeless or not transient:
                break
        except Exception as err:
            print(err)
            traceback.print_exc()
            raise err

    raise Exception(last_error)
async def fetchSubtitleByInfo(
    url: str, subType: str, dlInfo, proxy: Optional[str] = None
):
    """Fetch a subtitle described by a previously obtained ``dlInfo`` record.

    A direct download from ``dlInfo["dlUrl"]`` is tried first. If that yields
    nothing, the video info is extracted again and the URL is looked up by
    ``dlInfo["langKey"]`` (exact key), then by ``dlInfo["lang"]`` (prefix
    match allowed).

    NOTE: although declared ``async``, nothing in here awaits.

    :param url: video page URL.
    :param subType: output type; "srt" and "txt" are looked up as "xml".
    :param dlInfo: mapping read through its "dlUrl", "langKey" and "lang" keys.
    :param proxy: proxy URL forwarded to extractInfo.
    :returns: the subtitle text, None when no subtitle could be found or
        downloaded, or ``{"error": ...}`` when an exception occurred.
    """
    try:
        reqType = "xml" if subType in ["srt", "txt"] else subType

        if "dlUrl" in dlInfo:
            subtitle = fetchSubtitleBydlUrl(subType, dlInfo.get("dlUrl"), False)
            if subtitle is not None:
                return subtitle

        lang = dlInfo.get("lang", None)
        ydl, info_dict = extractInfo(url, lang, proxy, False)
        if debug:
            print(
                "subtitles.keys(): {} automatic_captions: {} requested_subtitles: {}".format(
                    (info_dict.get("subtitles") or {}).keys(),
                    (info_dict.get("automatic_captions") or {}).keys(),
                    (
                        info_dict.get("requested_subtitles").keys()
                        if info_dict.get("requested_subtitles")
                        else {}
                    ),
                )
            )

        subtitleUrl = None
        langKey = dlInfo.get("langKey")
        if langKey:
            subtitleUrl = getSubtitleUrlByLang(info_dict, langKey, reqType, True)
        # Without a language there is nothing to prefix-match against.
        if subtitleUrl is None and lang:
            subtitleUrl = getSubtitleUrlByLang(info_dict, lang, reqType, False)
        print("subtitleUrl: {}".format(subtitleUrl))

        if not subtitleUrl:
            # No track found: report "no subtitle" the same way for every subType.
            return None
        return fetchSubtitleBydlUrl(subType, subtitleUrl, False, ydl)
    except Exception as e:
        print("{}, {}".format(e, url))
        traceback.print_exc()
        return {"error": str(e)}