| | """ |
| | This module contains all the logic needed to find the signature functions. |
| | |
| | YouTube's strategy to restrict downloading videos is to send a ciphered version |
| | of the signature to the client, along with the decryption algorithm obfuscated |
| | in JavaScript. For the clients to play the videos, JavaScript must take the |
| | ciphered version, cycle it through a series of "transform functions," and then |
| | sign the media URL with the output. |
| | |
| | This module is responsible for (1) finding these "transform |
| | functions" and (2) sending them to be interpreted by jsinterp.py. |
| | """ |
| | import logging |
| | import re |
| |
|
| | from pytubefix.exceptions import RegexMatchError, InterpretationError |
| | from pytubefix.jsinterp import JSInterpreter, extract_player_js_global_var |
| |
|
| | logger = logging.getLogger(__name__) |
| |
|
| |
|
class Cipher:
    """Locate and run the player's signature and throttling functions.

    On construction the names of both functions are extracted from the
    player JavaScript and a ``JSInterpreter`` is created over the same
    source. ``get_signature`` and ``get_throttling`` then call those
    functions through the interpreter.
    """

    def __init__(self, js: str, js_url: str):
        """Extract the function names and prepare the interpreter.

        :param str js:
            The contents of the base.js asset file.
        :param str js_url:
            Full base.js url; forwarded to the name extractors and attached
            to any ``InterpretationError`` raised later.
        """
        self.js_url = js_url

        self.signature_function_name = get_initial_function_name(js, js_url)
        self.throttling_function_name = get_throttling_function_name(js, js_url)

        # Initialised to None and not touched again inside this class.
        # NOTE(review): presumably a cache slot filled in by callers - confirm.
        self.calculated_n = None

        self.js_interpreter = JSInterpreter(js)

    def _call(self, function_name: str, argument: str) -> str:
        """Run ``function_name(argument)`` in the interpreter.

        Only ``Exception`` subclasses are translated, so ``KeyboardInterrupt``
        and ``SystemExit`` propagate untouched, and the original failure is
        kept as ``__cause__`` for debugging.

        :raises InterpretationError:
            If the interpreter fails while evaluating the function.
        """
        try:
            return self.js_interpreter.call_function(function_name, argument)
        except Exception as err:
            raise InterpretationError(js_url=self.js_url) from err

    def get_throttling(self, n: str) -> str:
        """Interpret the function that throttles download speed.

        :param str n:
            Contains the parameter that must be transformed.
        :rtype: str
        :returns:
            Returns the transformed value "n".
        :raises InterpretationError:
            If the throttling function cannot be interpreted.
        """
        return self._call(self.throttling_function_name, n)

    def get_signature(self, ciphered_signature: str) -> str:
        """Interpret the function that signs the streams.

        The lack of this signature generates the 403 forbidden error.

        :param str ciphered_signature:
            Contains the signature that must be transformed.
        :rtype: str
        :returns:
            Returns the correct stream signature.
        :raises InterpretationError:
            If the signature function cannot be interpreted.
        """
        return self._call(self.signature_function_name, ciphered_signature)
| |
|
| |
|
def get_initial_function_name(js: str, js_url: str) -> str:
    """Extract the name of the function responsible for computing the signature.

    The patterns below are tried in order against ``js``; the ``sig`` group
    of the first one that matches is returned.

    :param str js:
        The contents of the base.js asset file.
    :param str js_url:
        Full base.js url; only used in the error raised when nothing matches.
    :rtype: str
    :returns:
        Function name from regex match
    :raises RegexMatchError:
        If none of the patterns matches ``js``.
    """
    # Order matters: the first matching pattern wins.
    function_patterns = [
        r'(?P<sig>[a-zA-Z0-9_$]+)\s*=\s*function\(\s*(?P<arg>[a-zA-Z0-9_$]+)\s*\)\s*{\s*(?P=arg)\s*=\s*(?P=arg)\.split\(\s*[a-zA-Z0-9_\$\"\[\]]+\s*\)\s*;\s*[^}]+;\s*return\s+(?P=arg)\.join\(\s*[a-zA-Z0-9_\$\"\[\]]+\s*\)',
        r'(?:\b|[^a-zA-Z0-9_$])(?P<sig>[a-zA-Z0-9_$]{2,})\s*=\s*function\(\s*a\s*\)\s*{\s*a\s*=\s*a\.split\(\s*""\s*\)(?:;[a-zA-Z0-9_$]{2}\.[a-zA-Z0-9_$]{2}\(a,\d+\))?',
        r'\b(?P<var>[a-zA-Z0-9_$]+)&&\((?P=var)=(?P<sig>[a-zA-Z0-9_$]{2,})\(decodeURIComponent\((?P=var)\)\)',

        r'\b[cs]\s*&&\s*[adf]\.set\([^,]+\s*,\s*encodeURIComponent\s*\(\s*(?P<sig>[a-zA-Z0-9$]+)\(',
        r'\b[a-zA-Z0-9]+\s*&&\s*[a-zA-Z0-9]+\.set\([^,]+\s*,\s*encodeURIComponent\s*\(\s*(?P<sig>[a-zA-Z0-9$]+)\(',
        r'\bm=(?P<sig>[a-zA-Z0-9$]{2,})\(decodeURIComponent\(h\.s\)\)',

        r'("|\')signature\1\s*,\s*(?P<sig>[a-zA-Z0-9$]+)\(',
        r'\.sig\|\|(?P<sig>[a-zA-Z0-9$]+)\(',
        r'yt\.akamaized\.net/\)\s*\|\|\s*.*?\s*[cs]\s*&&\s*[adf]\.set\([^,]+\s*,\s*(?:encodeURIComponent\s*\()?\s*(?P<sig>[a-zA-Z0-9$]+)\(',
        r'\b[cs]\s*&&\s*[adf]\.set\([^,]+\s*,\s*(?P<sig>[a-zA-Z0-9$]+)\(',
        r'\bc\s*&&\s*[a-zA-Z0-9]+\.set\([^,]+\s*,\s*\([^)]*\)\s*\(\s*(?P<sig>[a-zA-Z0-9$]+)\('
    ]
    logger.debug("looking for signature cipher name")
    for pattern in function_patterns:
        function_match = re.search(pattern, js)
        if function_match is None:
            continue
        sig = function_match.group('sig')
        # Lazy %-style arguments: the message is only built when DEBUG is on.
        logger.debug("finished regex search, matched: %s", pattern)
        logger.debug('Signature cipher function name: %s', sig)
        return sig

    raise RegexMatchError(
        caller="get_initial_function_name", pattern=f"multiple in {js_url}"
    )
| |
|
| |
|
def get_throttling_function_name(js: str, js_url: str) -> str:
    """Extract the name of the function that computes the throttling parameter.

    Two strategies are tried in order:

    1. Evaluate the global variable reported by
       ``extract_player_js_global_var``, find an entry ending in ``_w8_`` and
       search for the function whose ``catch`` handler returns
       ``<varname>[<index>] + <argument>``. This step is best-effort: any
       failure is logged at debug level and the second strategy is used.
    2. Search for the call site of the function. When the call goes through
       an array (``name[idx](x)``) the array declaration is looked up and the
       entry at ``idx`` is returned; otherwise the called name is returned.

    :param str js:
        The contents of the base.js asset file.
    :param str js_url:
        Full base.js url; only used in the error raised when nothing matches.
    :rtype: str
    :returns:
        The name of the function used to compute the throttling parameter.
    :raises RegexMatchError:
        If the call site is not found, or the array it refers to cannot be
        resolved.
    """
    logger.debug("looking for nsig name")
    try:
        global_obj, varname, code = extract_player_js_global_var(js)
        if global_obj and varname and code:
            logger.debug("Global Obj name is: %s", varname)
            global_obj = JSInterpreter(js).interpret_expression(code, {}, 100)
            logger.debug("Successfully interpreted global object")
            for k, v in enumerate(global_obj):
                # Skip non-string entries instead of letting an
                # AttributeError abort the whole scan.
                if not (isinstance(v, str) and v.endswith('_w8_')):
                    continue
                logger.debug("_w8_ found in index %s", k)
                pattern = r'''(?xs)
                    [;\n](?:
                        (?P<f>function\s+)|
                        (?:var\s+)?
                    )(?P<funcname>[a-zA-Z0-9_$]+)\s*(?(f)|=\s*function\s*)
                    \((?P<argname>[a-zA-Z0-9_$]+)\)\s*\{
                    (?:(?!\};(?![\]\)])).)+
                    \}\s*catch\(\s*[a-zA-Z0-9_$]+\s*\)\s*
                    \{\s*return\s+%s\[%d\]\s*\+\s*(?P=argname)\s*\}\s*return\s+[^}]+\}[;\n]
                ''' % (re.escape(varname), k)
                func_name = re.search(pattern, js)
                if func_name:
                    n_func = func_name.group("funcname")
                    logger.debug("Nfunc name is: %s", n_func)
                    return n_func
    except Exception:
        # Deliberately best-effort, but leave a trace and do not swallow
        # KeyboardInterrupt / SystemExit.
        logger.debug(
            "global-object lookup failed, falling back to call-site regex",
            exc_info=True,
        )

    pattern = r'''(?x)
        (?:
            \.get\("n"\)\)&&\(b=|
            (?:
                b=String\.fromCharCode\(110\)|
                (?P<str_idx>[a-zA-Z0-9_$.]+)&&\(b="nn"\[\+(?P=str_idx)\]
            )
            (?:
                ,[a-zA-Z0-9_$]+\(a\))?,c=a\.
                (?:
                    get\(b\)|
                    [a-zA-Z0-9_$]+\[b\]\|\|null
                )\)&&\(c=|
            \b(?P<var>[a-zA-Z0-9_$]+)=
        )(?P<nfunc>[a-zA-Z0-9_$]+)(?:\[(?P<idx>\d+)\])?\([a-zA-Z]\)
        (?(var),[a-zA-Z0-9_$]+\.set\((?:"n+"|[a-zA-Z0-9_$]+)\,(?P=var)\))'''

    logger.debug('Finding throttling function name')

    function_match = re.search(pattern, js)
    if function_match is None:
        raise RegexMatchError(
            caller="get_throttling_function_name", pattern=f"{pattern} in {js_url}"
        )

    logger.debug("finished regex search, matched: %s", pattern)

    func = function_match.group('nfunc')
    idx = function_match.group('idx')

    logger.debug('func is: %s', func)
    logger.debug('idx is: %s', idx)

    logger.debug('Checking throttling function name')
    if idx is None:
        # The pattern makes the "[idx]" part optional: without it the call
        # site names the function directly, so there is no array to resolve.
        logger.debug('Throttling function name is: %s', func)
        return func

    # The call site is "func[idx](x)": func is an array, so look up its
    # declaration and pick the entry the call site actually indexes.
    n_func_check_pattern = fr'var {re.escape(func)}\s*=\s*\[(.+?)];'
    n_func_found = re.search(n_func_check_pattern, js)

    if n_func_found:
        candidates = [name.strip() for name in n_func_found.group(1).split(',')]
        position = int(idx)
        if position < len(candidates):
            throttling_function = candidates[position]
            logger.debug('Throttling function name is: %s', throttling_function)
            return throttling_function

    raise RegexMatchError(
        caller="get_throttling_function_name", pattern=f"{n_func_check_pattern} in {js_url}"
    )
| |
|