| | """Implements a simple wrapper around urlopen.""" |
| | import http.client |
| | import json |
| | import logging |
| | import re |
| | import socket |
| | from functools import lru_cache |
| | from urllib import parse |
| | from urllib.error import URLError |
| | from urllib.request import Request, urlopen |
| |
|
| | from pytubefix.exceptions import RegexMatchError, MaxRetriesExceeded |
| | from pytubefix.helpers import regex_search |
| |
|
| | logger = logging.getLogger(__name__) |
| | default_range_size = 9437184 |
| |
|
| |
|
| | def _execute_request( |
| | url, |
| | method=None, |
| | headers=None, |
| | data=None, |
| | timeout=socket._GLOBAL_DEFAULT_TIMEOUT |
| | ): |
| | base_headers = {"User-Agent": "Mozilla/5.0", "accept-language": "en-US,en"} |
| | if headers: |
| | base_headers.update(headers) |
| | if data and not isinstance(data, bytes): |
| | data = bytes(json.dumps(data), encoding="utf-8") |
| | if url.lower().startswith("http"): |
| | request = Request(url, headers=base_headers, method=method, data=data) |
| | else: |
| | raise ValueError("Invalid URL") |
| | return urlopen(request, timeout=timeout) |
| |
|
| |
|
def get(url, extra_headers=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
    """Send an http GET request.

    :param str url:
        The URL to perform the GET request for.
    :param dict extra_headers:
        Extra headers to add to the request
    :rtype: str
    :returns:
        UTF-8 encoded string of response
    """
    headers = {} if extra_headers is None else extra_headers
    response = _execute_request(url, headers=headers, timeout=timeout)
    body = response.read()
    return body.decode("utf-8")
| |
|
| |
|
def post(url, extra_headers=None, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
    """Send an http POST request.

    :param str url:
        The URL to perform the POST request for.
    :param dict extra_headers:
        Extra headers to add to the request
    :param dict data:
        The data to send on the POST request
    :rtype: str
    :returns:
        UTF-8 encoded string of response
    """
    if data is None:
        data = {}
    # Fix: copy the caller's headers before adding Content-Type.  The
    # previous code called extra_headers.update(...) in place, silently
    # mutating the dictionary the caller passed in.
    headers = dict(extra_headers) if extra_headers else {}
    headers["Content-Type"] = "application/json"
    response = _execute_request(
        url,
        headers=headers,
        data=data,
        timeout=timeout
    )
    return response.read().decode("utf-8")
| |
|
| |
|
def seq_stream(
    url,
    timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
    max_retries=0):
    """Read the response in sequence.
    :param str url: The URL to perform the GET request for.
    :rtype: Iterable[bytes]
    """
    # YouTube expects the request sequence number ('sq') as a query parameter.
    split_url = parse.urlsplit(url)
    base_url = f'{split_url.scheme}://{split_url.netloc}/{split_url.path}?'

    querys = dict(parse.parse_qsl(split_url.query))

    # The 0th sequential request returns the file headers, which describe
    # how the file is segmented.
    querys['sq'] = 0
    url = base_url + parse.urlencode(querys)

    segment_data = b''
    for chunk in stream(url, timeout=timeout, max_retries=max_retries):
        yield chunk
        segment_data += chunk

    # Parse the header for the total number of segments.
    # Fix: initialize segment_count so a response without a
    # "Segment-Count:" line no longer raises NameError below; the
    # generator simply finishes after yielding the header segment.
    segment_count = 0
    stream_info = segment_data.split(b'\r\n')
    segment_count_pattern = re.compile(b'Segment-Count: (\\d+)')
    for line in stream_info:
        match = segment_count_pattern.search(line)
        if match:
            segment_count = int(match.group(1).decode('utf-8'))

    # Request segments 1..segment_count sequentially to build the file.
    seq_num = 1
    while seq_num <= segment_count:
        querys['sq'] = seq_num
        url = base_url + parse.urlencode(querys)

        yield from stream(url, timeout=timeout, max_retries=max_retries)
        seq_num += 1
    return
| |
|
| |
|
| | |
def stream(url,
        timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
        max_retries=0):
    """Read the response in chunks.
    :param str url: The URL to perform the GET request for.
    :rtype: Iterable[bytes]
    """
    # Provisional total size; the real size is discovered from the first
    # response's Content-Length (see the probe request below).
    file_size: int = default_range_size
    downloaded = 0
    while downloaded < file_size:
        stop_pos = min(downloaded + default_range_size, file_size) - 1
        # NOTE(review): range_header is computed but never used; the range
        # is actually sent via the query string below — confirm intent.
        range_header = f"bytes={downloaded}-{stop_pos}"
        tries = 0

        # Attempt the request up to 1 + max_retries times before giving up.
        while True:
            if tries >= 1 + max_retries:
                raise MaxRetriesExceeded()
            try:
                # The byte range is passed as a query parameter rather than
                # as a Range header.
                response = _execute_request(
                    f"{url}&range={downloaded}-{stop_pos}",
                    method="GET",
                    timeout=timeout
                )
            except URLError as e:
                # Retry only on transient transport errors (timeouts and
                # OS-level socket failures); anything else propagates.
                if not isinstance(e.reason, (socket.timeout, OSError)):
                    raise
            except http.client.IncompleteRead:
                # Retry on a truncated response.
                pass
            else:
                # Request succeeded; leave the retry loop.
                break
            tries += 1

        # While file_size still holds the provisional default, probe the
        # true total size with an oversized range request and read
        # Content-Length from its headers.
        if file_size == default_range_size:
            try:
                resp = _execute_request(
                    f"{url}&range=0-99999999999",
                    method="GET",
                    timeout=timeout
                )
                content_range = resp.info()["Content-Length"]
                file_size = int(content_range)
            except (KeyError, IndexError, ValueError) as e:
                # Best effort: if the size cannot be parsed, keep the
                # default and log the failure.
                logger.error(e)
        while True:
            try:
                chunk = response.read()
            except StopIteration:
                return
            except http.client.IncompleteRead as e:
                # Keep whatever partial data arrived before the read failed.
                chunk = e.partial
            if not chunk:
                break

            if chunk: downloaded += len(chunk)
            yield chunk
    return
| |
|
| |
|
@lru_cache()
def filesize(url):
    """Fetch size in bytes of file at given URL

    :param str url: The URL to get the size of
    :returns: int: size in bytes of remote file
    """
    headers = head(url)
    return int(headers["content-length"])
| |
|
| |
|
@lru_cache()
def seq_filesize(url):
    """Fetch size in bytes of file at given URL from sequential requests

    :param str url: The URL to get the size of
    :returns: int: size in bytes of remote file
    """
    total_filesize = 0
    # YouTube expects the request sequence number ('sq') as a query parameter.
    parts = parse.urlsplit(url)
    base_url = f'{parts.scheme}://{parts.netloc}/{parts.path}?'
    params = dict(parse.parse_qsl(parts.query))

    # The 0th sequential request returns the file headers, which describe
    # how the file is segmented.
    params['sq'] = 0
    response = _execute_request(
        base_url + parse.urlencode(params), method="GET"
    )

    response_value = response.read()
    # The header segment itself counts toward the total file size.
    total_filesize += len(response_value)

    # Scan the header lines for the segment count; the last match wins.
    segment_count = 0
    segment_regex = b'Segment-Count: (\\d+)'
    for line in response_value.split(b'\r\n'):
        try:
            segment_count = int(regex_search(segment_regex, line, 1))
        except RegexMatchError:
            pass

    if segment_count == 0:
        raise RegexMatchError('seq_filesize', segment_regex)

    # HEAD each remaining segment and accumulate its content-length.
    for seq_num in range(1, segment_count + 1):
        params['sq'] = seq_num
        seq_url = base_url + parse.urlencode(params)
        total_filesize += int(head(seq_url)['content-length'])
    return total_filesize
| |
|
| |
|
def head(url):
    """Fetch headers returned http GET request.

    :param str url:
        The URL to perform the GET request for.
    :rtype: dict
    :returns:
        dictionary of lowercase headers
    """
    info = _execute_request(url, method="HEAD").info()
    lowered = {}
    for key, value in info.items():
        lowered[key.lower()] = value
    return lowered
| |
|