| | import gzip |
| | import io |
| | import json |
| | import os |
| | import pytest |
| | from unittest import mock |
| |
|
| | from pytube import helpers |
| | from pytube.exceptions import RegexMatchError |
| | from pytube.helpers import cache, create_mock_html_json, deprecated, setup_logger |
| | from pytube.helpers import target_directory, uniqueify |
| |
|
| |
|
def test_regex_search_no_match():
    """regex_search raises RegexMatchError when the pattern finds nothing."""
    pattern = "^a$"
    haystack = ""
    with pytest.raises(RegexMatchError):
        helpers.regex_search(pattern, haystack, group=0)
| |
|
| |
|
def test_regex_search():
    """regex_search returns the requested group when the pattern matches."""
    matched = helpers.regex_search("^a$", "a", group=0)
    assert matched == "a"
| |
|
| |
|
def test_safe_filename():
    """safe_filename strips unsafe characters from the generated filename."""
    cases = (
        ("abc1245$$", "abc1245"),
        ("abc##", "abc"),
    )
    for raw_name, cleaned_name in cases:
        assert helpers.safe_filename(raw_name) == cleaned_name
| |
|
| |
|
@mock.patch("warnings.warn")
def test_deprecated(warn):
    """Calling a @deprecated function reports a DeprecationWarning via warnings.warn."""
    # The asserted message embeds the wrapped function's name, so the nested
    # function must keep the name ``deprecated_function``.
    expected_message = "Call to deprecated function deprecated_function (oh no)."

    @deprecated("oh no")
    def deprecated_function():
        return None

    deprecated_function()

    warn.assert_called_with(
        expected_message, category=DeprecationWarning, stacklevel=2
    )
| |
|
| |
|
def test_cache():
    """A @cache-decorated function runs its body once per distinct argument."""
    # Every real execution of the wrapped body is recorded here; cached hits
    # never reach the body and so leave no entry.
    executed_with = []

    @cache
    def cached_func(stuff):
        executed_with.append(stuff)
        return stuff

    for argument in ("hi", "hi", "bye", "bye"):
        cached_func(argument)

    assert len(executed_with) == 2
| |
|
| |
|
@mock.patch("os.path.isabs", return_value=False)
@mock.patch("os.getcwd", return_value="/cwd")
@mock.patch("os.makedirs")
def test_target_directory_with_relative_path(makedirs, _getcwd, _isabs):
    """A relative path is resolved against the current working directory.

    Stacked ``mock.patch`` decorators inject their mocks bottom-up, so the
    first parameter is the ``os.makedirs`` mock, then ``os.getcwd``, then
    ``os.path.isabs``. The parameters are ordered accordingly so that
    ``makedirs.assert_called()`` really checks ``os.makedirs`` (previously
    it was bound to the ``os.path.isabs`` mock).
    """
    assert target_directory("test") == os.path.join("/cwd", "test")
    makedirs.assert_called()
| |
|
| |
|
@mock.patch("os.path.isabs", return_value=True)
@mock.patch("os.makedirs")
def test_target_directory_with_absolute_path(makedirs, _isabs):
    """An absolute path is returned unchanged.

    Stacked ``mock.patch`` decorators inject their mocks bottom-up, so the
    first parameter is the ``os.makedirs`` mock and the second is the
    ``os.path.isabs`` mock. The parameters are ordered accordingly so that
    ``makedirs.assert_called()`` really checks ``os.makedirs`` (previously
    it was bound to the ``os.path.isabs`` mock).
    """
    assert target_directory("/test") == "/test"
    makedirs.assert_called()
| |
|
| |
|
@mock.patch("os.getcwd", return_value="/cwd")
@mock.patch("os.makedirs")
def test_target_directory_with_no_path(makedirs, _getcwd):
    """With no argument, the current working directory is returned.

    Stacked ``mock.patch`` decorators inject their mocks bottom-up, so the
    first parameter is the ``os.makedirs`` mock and the second is the
    ``os.getcwd`` mock. The parameters are ordered accordingly so that
    ``makedirs.assert_called()`` really checks ``os.makedirs`` (previously
    it was bound to the ``os.getcwd`` mock).
    """
    assert target_directory() == "/cwd"
    makedirs.assert_called()
| |
|
| |
|
@mock.patch("pytube.helpers.logging")
def test_setup_logger(logging):
    """setup_logger fetches the "pytube" logger, adds a handler and sets the level."""
    level = 20

    setup_logger(level)

    # ``logging`` is the patched module object, so this is the same mock
    # logger that setup_logger received from ``logging.getLogger``.
    pytube_logger = logging.getLogger.return_value

    logging.getLogger.assert_called_with("pytube")
    pytube_logger.addHandler.assert_called()
    pytube_logger.setLevel.assert_called_with(level)
| |
|
| |
|
@mock.patch('builtins.open', new_callable=mock.mock_open)
@mock.patch('pytube.request.urlopen')
def test_create_mock_html_json(mock_url_open, mock_open):
    """create_mock_html_json writes a gzipped JSON dump of its result.

    ``open`` and ``pytube.request.urlopen`` are both patched, so this test
    touches neither the filesystem nor the network.
    """
    video_id = '2lAe1cqCOXo'
    gzip_html_filename = 'yt-video-%s-html.json.gz' % video_id

    # Expected output path: <parent of this file's directory>/tests/mocks/<name>.
    this_dir = os.path.dirname(__file__)
    pytube_dir_path = os.path.abspath(os.path.join(this_dir, os.path.pardir))
    gzip_html_filepath = os.path.join(
        pytube_dir_path, 'tests', 'mocks', gzip_html_filename
    )

    # Canned bodies returned by successive ``read()`` calls on the fake
    # response; presumably consumed in this order by create_mock_html_json.
    canned_bodies = [
        (b'yt.setConfig({"PLAYER_CONFIG":{"args":[]}});ytInitialData = {};ytInitialPlayerResponse = {};'
         b'"jsUrl":"/s/player/13371337/player_ias.vflset/en_US/base.js"'),
        b'embed_html',
        b'watch_html',
        b'{\"responseContext\":{}}',
    ]
    fake_response = mock.Mock()
    fake_response.read.side_effect = canned_bodies
    mock_url_open.return_value = fake_response

    result_data = create_mock_html_json(video_id)

    # The output file must have been opened exactly once, for binary writing.
    mock_open.assert_called_once_with(gzip_html_filepath, 'wb')

    # Build the expected gzip payload in memory from the returned data.
    expected_buffer = io.BytesIO()
    with gzip.GzipFile(
        filename=gzip_html_filename,
        fileobj=expected_buffer,
        mode='wb',
    ) as gz:
        gz.write(json.dumps(result_data).encode('utf-8'))
    gzip_data = expected_buffer.getvalue()

    # The data may arrive through several write() calls, so stitch every
    # positional chunk back together before comparing.
    file_handle = mock_open.return_value.__enter__.return_value
    full_content = b''.join(
        b''.join(positional)
        for positional, _ in file_handle.write.call_args_list
    )

    # The fixed 10-byte gzip header carries a modification timestamp, which
    # can differ between the two streams; compare only what follows it.
    assert gzip_data[10:] == full_content[10:]
| |
|
| |
|
def test_uniqueify():
    """uniqueify drops repeated items while keeping the first-seen order."""
    with_duplicates = [1, 2, 3, 3, 4, 5]
    assert uniqueify(with_duplicates) == [1, 2, 3, 4, 5]
| |
|