| """This module contains all non-cipher related data extraction logic.""" |
| import logging |
| import urllib.parse |
| import re |
| from collections import OrderedDict |
| from datetime import datetime |
| from typing import Any, Dict, List, Optional, Tuple |
| from urllib.parse import parse_qs, quote, urlencode, urlparse |
|
|
| from pytubefix.cipher import Cipher |
| from pytubefix.exceptions import HTMLParseError, LiveStreamError, RegexMatchError |
| from pytubefix.helpers import regex_search |
| from pytubefix.metadata import YouTubeMetadata |
| from pytubefix.parser import parse_for_object, parse_for_all_objects |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
def publish_date(watch_html: str) -> Optional[datetime]:
    """Extract the publish date from the watch page.

    :param str watch_html:
        The html contents of the watch page.
    :rtype: datetime or None
    :returns:
        Publish date of the video as a datetime object with timezone, or
        ``None`` when the ``datePublished`` attribute is absent or holds a
        value that is not a valid date.
    """
    match = re.search(
        r"(?<=itemprop=\"datePublished\" content=\")\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}[+-]\d{2}:\d{2}",
        watch_html,
    )
    if match is None:
        return None
    try:
        return datetime.fromisoformat(match.group(0))
    except ValueError:
        # The regex only validates digit counts, so impossible values such
        # as month 13 can still match; treat them like a missing date.
        return None
|
|
|
|
def recording_available(watch_html):
    """Tell whether a recording of the live stream is available.

    :param str watch_html:
        The html contents of the watch page.
    :rtype: bool
    :returns:
        ``False`` when the page contains a known "recording not available"
        notice, ``True`` otherwise.
    """
    unavailable_notices = (
        'This live stream recording is not available.',
    )
    return not any(notice in watch_html for notice in unavailable_notices)
|
|
|
|
def is_private(watch_html):
    """Tell whether the watch page describes private content.

    :param str watch_html:
        The html contents of the watch page.
    :rtype: bool
    :returns:
        ``True`` when any known "private video" marker occurs in the page,
        ``False`` otherwise.
    """
    private_markers = (
        "This is a private video. Please sign in to verify that you may see it.",
        "\"simpleText\":\"Private video\"",
        "This video is private.",
    )
    return any(marker in watch_html for marker in private_markers)
|
|
|
|
def is_age_restricted(watch_html: str) -> bool:
    """Tell whether the watch page is flagged as age restricted.

    :param str watch_html:
        The html contents of the watch page.
    :rtype: bool
    :returns:
        ``True`` when the ``og:restrictions:age`` marker is found in the
        page, ``False`` when the search raises ``RegexMatchError``.
    """
    try:
        regex_search(r"og:restrictions:age", watch_html, group=0)
    except RegexMatchError:
        restricted = False
    else:
        restricted = True
    return restricted
|
|
|
|
def playability_status(player_response: dict) -> Tuple[Any, Any]:
    """Return the playability status and status explanation of a video.

    For example, a video may have a status of LOGIN_REQUIRED, and an explanation
    of "This is a private video. Please sign in to verify that you may see it."

    This explanation is what gets incorporated into the media player overlay.

    :param dict player_response:
        Content of the player's response.
    :rtype: tuple
    :returns:
        ``(status, reasons)``. For a live video this is
        ``('LIVE_STREAM', 'Video is a live stream.')``. Otherwise ``reasons``
        is a one-element list holding ``reason``, or the ``messages`` value
        taken from ``playabilityStatus``. ``(None, [None])`` is returned when
        no status, or no explanation for it, is present.
    """
    video_details = player_response.get('videoDetails') or {}
    # Look at the flag's value rather than its mere presence, so that an
    # explicit ``"isLive": false`` is not reported as a live stream.
    if video_details.get('isLive'):
        return 'LIVE_STREAM', 'Video is a live stream.'

    status_dict = player_response.get('playabilityStatus', {})
    if 'status' in status_dict:
        # ``reason`` takes precedence over ``messages`` when both exist.
        if 'reason' in status_dict:
            return status_dict['status'], [status_dict['reason']]
        if 'messages' in status_dict:
            return status_dict['status'], status_dict['messages']
    return None, [None]
|
|
|
|
def signature_timestamp(js: str) -> str:
    """Extract the ``signatureTimestamp`` value from JavaScript source.

    :param str js:
        JavaScript text to search.
    :rtype: str
    :returns:
        The digits captured after ``signatureTimestamp:``, as returned by
        ``regex_search``.
    """
    timestamp_pattern = r"signatureTimestamp:(\d*)"
    return regex_search(timestamp_pattern, js, group=1)
|
|
|
|
def visitor_data(response_context: str) -> str:
    """Extract the ``visitor_data`` value from a response context string.

    :param str response_context:
        Text to search for the ``visitor_data`` entry.
    :rtype: str
    :returns:
        The value captured next to ``visitor_data``, as returned by
        ``regex_search``.
    """
    visitor_pattern = (
        r"visitor_data[',\"\s]+value['\"]:\s?['\"]([a-zA-Z0-9_%-]+)['\"]"
    )
    return regex_search(visitor_pattern, response_context, group=1)
|
|
|
|
def video_id(url: str) -> str:
    """Pull the ``video_id`` out of a YouTube url.

    Supported url shapes:

    - :samp:`https://youtube.com/watch?v={video_id}`
    - :samp:`https://youtube.com/embed/{video_id}`
    - :samp:`https://youtu.be/{video_id}`

    :param str url:
        A YouTube url containing a video id.
    :rtype: str
    :returns:
        The 11-character id captured after ``v=`` or a ``/``.
    """
    id_pattern = r"(?:v=|\/)([0-9A-Za-z_-]{11}).*"
    return regex_search(id_pattern, url, group=1)
|
|
|
|
def playlist_id(url: str) -> str:
    """Pull the ``playlist_id`` out of a YouTube url.

    Supported url shapes:

    - :samp:`https://youtube.com/playlist?list={playlist_id}`
    - :samp:`https://youtube.com/watch?v={video_id}&list={playlist_id}`

    :param str url:
        A YouTube url containing a playlist id.
    :rtype: str
    :returns:
        The first value of the ``list`` query parameter.
    :raises KeyError:
        If the url has no ``list`` query parameter.
    """
    query_string = urllib.parse.urlparse(url).query
    list_values = parse_qs(query_string)['list']
    return list_values[0]
|
|
|
|
def channel_name(url: str) -> str:
    """Pull the ``channel_name`` or ``channel_id`` out of a YouTube url.

    Supported url shapes:

    - :samp:`https://youtube.com/c/{channel_name}/*`
    - :samp:`https://youtube.com/channel/{channel_id}/*`
    - :samp:`https://youtube.com/u/{channel_name}/*`
    - :samp:`https://youtube.com/user/{channel_id}/*`
    - :samp:`https://youtube.com/@{channel_id}/*`

    :param str url:
        A YouTube url containing a channel name.
    :rtype: str
    :returns:
        The channel path, ``/{style}/{identifier}``, or ``/@{identifier}``
        for handle-style urls.
    :raises RegexMatchError:
        If none of the supported shapes is found in ``url``.
    """
    url_patterns = (
        r"(?:\/(c)\/([%\d\w_\-]+)(\/.*)?)",
        r"(?:\/(channel)\/([%\w\d_\-]+)(\/.*)?)",
        r"(?:\/(u)\/([%\d\w_\-]+)(\/.*)?)",
        r"(?:\/(user)\/([%\w\d_\-]+)(\/.*)?)",
        r"(?:\/(\@)([%\d\w_\-\.]+)(\/.*)?)",
    )
    for candidate in url_patterns:
        found = re.search(candidate, url)
        if found is None:
            continue
        logger.debug("finished regex search, matched: %s", candidate)
        style, identifier = found.group(1), found.group(2)
        # Handle urls ("/@name") have no slash between marker and name.
        if style == '@':
            return f'/{style}{identifier}'
        return f'/{style}/{identifier}'

    raise RegexMatchError(caller="channel_name", pattern="patterns")
|
|
def video_info_url(video_id: str, watch_url: str) -> str:
    """Build the ``get_video_info`` url for a regular video.

    :param str video_id:
        A YouTube video identifier.
    :param str watch_url:
        A YouTube watch url; it is percent-quoted before being sent as
        ``eurl``.
    :rtype: str
    :returns:
        :samp:`https://youtube.com/get_video_info` with necessary GET
        parameters.
    """
    # Insertion order determines the order of the query string.
    query = OrderedDict()
    query["video_id"] = video_id
    query["ps"] = "default"
    query["eurl"] = quote(watch_url)
    query["hl"] = "en_US"
    query["html5"] = "1"
    query["c"] = "TVHTML5"
    query["cver"] = "7.20201028"
    return _video_info_url(query)
|
|
|
|
def video_info_url_age_restricted(video_id: str, embed_html: str) -> str:
    """Build the ``get_video_info`` url for an age restricted video.

    :param str video_id:
        A YouTube video identifier.
    :param str embed_html:
        The html contents of the embed page (for age restricted videos).
    :rtype: str
    :returns:
        :samp:`https://youtube.com/get_video_info` with necessary GET
        parameters.
    """
    try:
        sts = regex_search(r'"sts"\s*:\s*(\d+)', embed_html, group=1)
    except RegexMatchError:
        # No ``sts`` value on the embed page: send the parameter empty.
        sts = ""

    # Insertion order determines the order of the query string.
    query = OrderedDict()
    query["video_id"] = video_id
    query["eurl"] = f"https://youtube.googleapis.com/v/{video_id}"
    query["sts"] = sts
    query["html5"] = "1"
    query["c"] = "TVHTML5"
    query["cver"] = "7.20201028"
    return _video_info_url(query)
|
|
|
|
def _video_info_url(params: OrderedDict) -> str:
    """Append url-encoded ``params`` to the ``get_video_info`` endpoint.

    :param OrderedDict params:
        Query parameters, encoded in iteration order.
    :rtype: str
    :returns:
        The full ``get_video_info`` url.
    """
    query_string = urlencode(params)
    return "https://www.youtube.com/get_video_info?" + query_string
|
|
|
|
def js_url(html: str) -> str:
    """Build the url of the base JavaScript file.

    The path is read from the player config's ``assets.js`` entry; when the
    config cannot be found or lacks that entry, the path is searched for
    directly in the html instead.

    :param str html:
        The html contents of the watch page.
    :rtype: str
    :returns:
        The JavaScript path prefixed with ``https://youtube.com``.
    """
    try:
        player_config = get_ytplayer_config(html)
        js_path = player_config['assets']['js']
    except (KeyError, RegexMatchError):
        js_path = get_ytplayer_js(html)
    return f"https://youtube.com{js_path}"
|
|
|
|
def mime_type_codec(mime_type_codec: str) -> Tuple[str, List[str]]:
    """Split a combined mime type / codecs string into its parts.

    The ``type`` key of the manifest serializes the mime type and the codecs
    together; this separates them.

    **Example**:

    mime_type_codec('audio/webm; codecs="opus"') -> ('audio/webm', ['opus'])

    :param str mime_type_codec:
        String containing mime type and codecs.
    :rtype: tuple
    :returns:
        The mime type and a list of codecs (whitespace-stripped).
    :raises RegexMatchError:
        If the string does not have the expected shape.
    """
    pattern = r"(\w+\/\w+)\;\scodecs=\"([a-zA-Z-0-9.,\s]*)\""
    found = re.search(pattern, mime_type_codec)
    if found is None:
        raise RegexMatchError(caller="mime_type_codec", pattern=pattern)
    mime_type = found.group(1)
    raw_codecs = found.group(2)
    return mime_type, list(map(str.strip, raw_codecs.split(",")))
|
|
|
|
def get_ytplayer_js(html: str) -> Any:
    """Find the path of the YouTube player base JavaScript file.

    :param str html:
        The html contents of the watch page.
    :rtype: str
    :returns:
        Path to YouTube's base.js file.
    :raises RegexMatchError:
        If no known path pattern is found in ``html``.
    """
    path_patterns = (
        r"(/s/player/[\w\d]+/[\w\d_/.]+/base\.js)",
    )
    for candidate in path_patterns:
        found = re.search(candidate, html)
        if found is None:
            continue
        logger.debug("finished regex search, matched: %s", candidate)
        player_js_path = found.group(1)
        logger.debug("player JS: " + player_js_path)
        return player_js_path

    raise RegexMatchError(caller="get_ytplayer_js", pattern="js_url_patterns")
|
|
|
|
def get_ytplayer_config(html: str) -> Any:
    """Find the YouTube player configuration embedded in the watch html.

    The assignment-style patterns (``ytplayer.config = ...`` and
    ``ytInitialPlayerResponse = ...``) are tried first; if none of them
    parses, the ``yt.setConfig(... PLAYER_CONFIG ...)`` form is tried.

    :param str html:
        The html contents of the watch page.
    :returns:
        Whatever ``parse_for_object`` produces for the first pattern that
        parses successfully.
    :raises RegexMatchError:
        If none of the patterns yields an object.
    """
    logger.debug("finding initial function name")

    assignment_patterns = (
        r"ytplayer\.config\s*=\s*",
        r"ytInitialPlayerResponse\s*=\s*",
    )
    for pattern in assignment_patterns:
        try:
            config = parse_for_object(html, pattern)
        except HTMLParseError as e:
            # Record the failure and fall through to the next pattern.
            logger.debug(f'Pattern failed: {pattern}')
            logger.debug(e)
        else:
            return config

    setconfig_patterns = (
        r"yt\.setConfig\(.*['\"]PLAYER_CONFIG['\"]:\s*",
    )
    for pattern in setconfig_patterns:
        try:
            config = parse_for_object(html, pattern)
        except HTMLParseError:
            pass
        else:
            return config

    raise RegexMatchError(
        caller="get_ytplayer_config", pattern="config_patterns, setconfig_patterns"
    )
|
|
|
|
def get_ytcfg(html: str) -> dict:
    """Get the entirety of the ytcfg object.

    This is built over multiple pieces, so we have to find all matches and
    combine the dicts together. Later matches overwrite keys set by earlier
    ones.

    :param str html:
        The html contents of the watch page.
    :rtype: dict
    :returns:
        The merged contents of every ``ytcfg`` object found in the page.
    :raises RegexMatchError:
        If no pattern produced any data.
    """
    ytcfg = {}
    ytcfg_patterns = [
        r"ytcfg\s=\s",
        r"ytcfg\.set\("
    ]
    for pattern in ytcfg_patterns:
        # A pattern that fails to parse is skipped; whatever was merged
        # before the failure is kept.
        try:
            found_objects = parse_for_all_objects(html, pattern)
            for obj in found_objects:
                ytcfg.update(obj)
        except HTMLParseError:
            continue

    if ytcfg:
        return ytcfg

    raise RegexMatchError(
        caller="get_ytcfg", pattern="ytcfg_patterns"
    )
|
|
|
|
def apply_po_token(stream_manifest: Dict, vid_info: Dict, po_token: str) -> None:
    """Apply the proof of origin token to the stream manifest.

    Each stream's ``url`` is rebuilt in place with a ``pot`` query parameter
    set to ``po_token``. Only the first value of every existing query
    parameter is kept when the url is rebuilt.

    :param stream_manifest:
        Details of the media streams available; iterated as a sequence of
        stream dicts and updated in place.
    :param dict vid_info:
        Player response, consulted for
        ``playabilityStatus.liveStreamability`` when a stream has no url.
    :param str po_token:
        Proof of Origin Token.
    :raises LiveStreamError:
        If a stream has no ``url`` and the video reports live streamability.
    """
    logger.debug('Applying poToken')
    for i, stream in enumerate(stream_manifest):
        try:
            url: str = stream["url"]
        except KeyError:
            live_stream = (
                vid_info.get("playabilityStatus", {})
                .get("liveStreamability")
            )
            if live_stream:
                raise LiveStreamError("UNKNOWN")
            # Without a url there is nothing to patch. Skip the stream
            # instead of falling through with an unbound (first iteration)
            # or stale (previous stream's) ``url``.
            logger.debug("stream at index %s has no url, skipping", i)
            continue

        parsed_url = urlparse(url)

        # Flatten the query string to a plain dict (first value per key).
        query_params = {
            k: v[0] for k, v in parse_qs(parsed_url.query).items()
        }
        query_params['pot'] = po_token

        url = f'{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}?{urlencode(query_params)}'

        stream_manifest[i]["url"] = url
|
|
|
|
def apply_signature(stream_manifest: Dict, vid_info: Dict, js: str, url_js: str) -> None:
    """Apply the decrypted signature to the stream manifest.

    Each stream's ``url`` is rebuilt in place: a ``sig`` parameter is added
    unless the url already looks signed, and an ``n`` parameter, when
    present, is replaced by the value ``Cipher.get_throttling`` returns for
    it. Only the first value of every existing query parameter is kept when
    the url is rebuilt.

    :param stream_manifest:
        Details of the media streams available; iterated as a sequence of
        stream dicts and updated in place.
    :param dict vid_info:
        Player response, consulted for
        ``playabilityStatus.liveStreamability`` when a stream has no url.
    :param str js:
        The contents of the base.js asset file.
    :param str url_js:
        Full base.js url
    :raises LiveStreamError:
        If a stream has no ``url`` and the video reports live streamability.
    """
    cipher = Cipher(js=js, js_url=url_js)
    # Cache of already transformed ``n`` values, so each distinct value is
    # passed to the cipher only once.
    discovered_n = dict()
    for i, stream in enumerate(stream_manifest):
        try:
            url: str = stream["url"]
        except KeyError:
            live_stream = (
                vid_info.get("playabilityStatus", {})
                .get("liveStreamability")
            )
            if live_stream:
                raise LiveStreamError("UNKNOWN")
            # Without a url there is nothing to sign. Skip the stream
            # instead of falling through with an unbound (first iteration)
            # or stale (previous stream's) ``url``.
            logger.debug("stream at index %s has no url, skipping", i)
            continue

        parsed_url = urlparse(url)

        # Flatten the query string to a plain dict (first value per key).
        query_params = {
            k: v[0] for k, v in parse_qs(parsed_url.query).items()
        }

        if "signature" in url or (
            "s" not in stream and ("&sig=" in url or "&lsig=" in url)
        ):
            # The url already carries a signature; nothing to decipher.
            logger.debug("signature found, skip decipher")
        else:
            signature = cipher.get_signature(ciphered_signature=stream["s"])

            logger.debug(
                "finished descrambling signature for itag=%s", stream["itag"]
            )

            query_params['sig'] = signature

        if 'n' in query_params:
            initial_n = query_params['n']
            logger.debug(f'Parameter n is: {initial_n}')

            # Reuse the result if an earlier stream had the same value.
            if initial_n not in discovered_n:
                discovered_n[initial_n] = cipher.get_throttling(initial_n)
            else:
                logger.debug('Parameter n found skipping decryption')

            new_n = discovered_n[initial_n]
            query_params['n'] = new_n
            logger.debug(f'Parameter n deciphered: {new_n}')

        url = f'{parsed_url.scheme}://{parsed_url.netloc}{parsed_url.path}?{urlencode(query_params)}'

        stream_manifest[i]["url"] = url
|
|
|
|
def apply_descrambler(stream_data: Dict) -> Optional[List[Dict]]:
    """Normalize YouTube's media stream data into a flat list of formats.

    The ``formats`` and ``adaptiveFormats`` entries are concatenated, and
    each format dict is updated in place:

    - a format without ``url`` but with ``signatureCipher`` gets ``url`` and
      ``s`` from the parsed cipher string and ``is_sabr = False``;
    - a format with neither gets ``url`` from
      ``stream_data['serverAbrStreamingUrl']`` and ``is_sabr = True``;
    - every format gets ``is_otf``, true when its ``type`` equals
      ``FORMAT_STREAM_TYPE_OTF``.

    :param dict stream_data:
        Dictionary holding the ``formats`` / ``adaptiveFormats`` lists.
    :rtype: list or None
    :returns:
        The combined list of format dicts, or ``None`` when ``stream_data``
        itself already has a ``url`` key.
    """
    if 'url' in stream_data:
        return None

    # Merge the two format lists, regular formats first.
    streams: List[Dict] = []
    for key in ('formats', 'adaptiveFormats'):
        if key in stream_data:
            streams.extend(stream_data[key])

    for entry in streams:
        if 'url' not in entry:
            if 'signatureCipher' in entry:
                cipher_fields = parse_qs(entry['signatureCipher'])
                entry['url'] = cipher_fields['url'][0]
                entry['s'] = cipher_fields['s'][0]
                entry['is_sabr'] = False
            else:
                entry['url'] = stream_data['serverAbrStreamingUrl']
                entry['is_sabr'] = True
        entry['is_otf'] = entry.get('type') == 'FORMAT_STREAM_TYPE_OTF'

    logger.debug("applying descrambler")
    return streams
|
|
|
|
def initial_data(watch_html: str) -> dict:
    """Extract the ytInitialData json from the watch_html page.

    This mostly contains metadata necessary for rendering the page on-load,
    such as video information, copyright notices, etc.

    :param str watch_html:
        Html of the watch page.
    :returns:
        The object parsed after the first ``ytInitialData`` assignment that
        parses successfully.
    :raises RegexMatchError:
        If no pattern yields an object.
    """
    assignment_patterns = (
        r"window\[['\"]ytInitialData['\"]]\s*=\s*",
        r"ytInitialData\s*=\s*",
    )
    for candidate in assignment_patterns:
        try:
            parsed = parse_for_object(watch_html, candidate)
        except HTMLParseError:
            # Try the next spelling of the assignment.
            continue
        return parsed

    raise RegexMatchError(caller='initial_data', pattern='initial_data_pattern')
|
|
|
|
def initial_player_response(watch_html: str) -> str:
    """Extract the ytInitialPlayerResponse json from the watch_html page.

    This mostly contains metadata necessary for rendering the page on-load,
    such as video information, copyright notices, etc.

    :param str watch_html:
        Html of the watch page.
    :returns:
        The object parsed after the first ``ytInitialPlayerResponse``
        assignment that parses successfully.
    :raises RegexMatchError:
        If no pattern yields an object.
    """
    assignment_patterns = (
        r"window\[['\"]ytInitialPlayerResponse['\"]]\s*=\s*",
        r"ytInitialPlayerResponse\s*=\s*",
    )
    for candidate in assignment_patterns:
        try:
            parsed = parse_for_object(watch_html, candidate)
        except HTMLParseError:
            # Try the next spelling of the assignment.
            continue
        return parsed

    raise RegexMatchError(
        caller='initial_player_response',
        pattern='initial_player_response_pattern'
    )
|
|
|
|
def metadata(initial_data) -> Optional[YouTubeMetadata]:
    """Get the informational metadata for the video.

    e.g.:
    [
        {
            'Song': '강남스타일(Gangnam Style)',
            'Artist': 'PSY',
            'Album': 'PSY SIX RULES Pt.1',
            'Licensed to YouTube by': 'YG Entertainment Inc. [...]'
        }
    ]

    :param initial_data:
        Nested mapping to read the metadata rows from.
    :rtype: YouTubeMetadata
    :returns:
        Metadata built from every row that has a ``metadataRowRenderer``
        entry; built from an empty list when the expected path is missing.
    """
    try:
        watch_results = initial_data["contents"]["twoColumnWatchNextResults"][
            "results"]["results"]
        secondary_info = watch_results["contents"][1]["videoSecondaryInfoRenderer"]
        rows: List = secondary_info["metadataRowContainer"][
            "metadataRowContainerRenderer"]["rows"]
    except (KeyError, IndexError):
        # The expected structure is not there: report no metadata.
        return YouTubeMetadata([])

    # Keep only rows that carry a renderer, and unwrap it.
    renderers = [
        row["metadataRowRenderer"]
        for row in rows
        if "metadataRowRenderer" in row.keys()
    ]
    return YouTubeMetadata(renderers)
|
|