| import os
|
| import shutil
|
| import ssl
|
| import subprocess
|
| import urllib.request
|
| from functools import lru_cache
|
| from typing import List, Tuple
|
| from urllib.parse import urlparse
|
|
|
| from tqdm import tqdm
|
|
|
| from ffff import logger, process_manager, state_manager, wording
|
| from ffff.common_helper import is_macos
|
| from ffff.filesystem import get_file_size, is_file, remove_file
|
| from ffff.hash_helper import validate_hash
|
| from ffff.typing import DownloadSet
|
|
|
# SECURITY NOTE(review): on macOS this replaces the default HTTPS context factory
# with the unverified one, so HTTPS requests that rely on the default context
# (such as urllib.request.urlopen in this module) skip certificate verification
# for the whole process. Presumably a workaround for missing root certificates
# on macOS Python installs - confirm it is still required before keeping it.
if is_macos():
    ssl._create_default_https_context = ssl._create_unverified_context
|
|
|
def conditional_download(download_directory_path : str, urls : List[str]) -> None:
    """
    Download every url into download_directory_path with curl, resuming partial files.

    The target file name is the last path segment of the url. A url is skipped when
    the local file is already at least as large as the size reported by
    get_download_size(), so nothing is fetched when that size is reported as 0.

    Args:
        download_directory_path: directory that receives the downloaded files.
        urls: urls to download, processed one after another.
    """
    for url in urls:
        download_file_name = os.path.basename(urlparse(url).path)
        download_file_path = os.path.join(download_directory_path, download_file_name)
        initial_size = get_file_size(download_file_path)
        download_size = get_download_size(url)

        if initial_size < download_size:
            with tqdm(total = download_size, initial = initial_size, desc = wording.get('downloading'), unit = 'B', unit_scale = True, unit_divisor = 1024, ascii = ' =', disable = state_manager.get_item('log_level') in [ 'warn', 'error' ]) as progress:
                # NOTE(review): '--insecure' turns off TLS certificate verification for the transfer
                process = subprocess.Popen([ shutil.which('curl'), '--create-dirs', '--silent', '--insecure', '--location', '--continue-at', '-', '--output', download_file_path, url ])
                current_size = initial_size

                progress.set_postfix(file = download_file_name)
                while current_size < download_size:
                    # sample the exit state first so the size read below still sees everything curl wrote
                    is_running = process.poll() is None

                    if is_file(download_file_path):
                        current_size = get_file_size(download_file_path)
                        progress.update(current_size - progress.n)
                    if not is_running:
                        # curl exited before the expected size was reached, stop instead of spinning forever
                        break
                    try:
                        # doubles as a short sleep so the loop does not busy-wait on the filesystem
                        process.wait(timeout = 0.1)
                    except subprocess.TimeoutExpired:
                        # curl is still transferring, take the next size sample
                        pass
                # reap the curl process so it is not left behind as a zombie
                process.wait()
|
|
|
@lru_cache(maxsize = None)
def get_download_size(url : str) -> int:
    """
    Return the size in bytes announced by the Content-Length header of url.

    Falls back to 0 when the request raises OSError, when the url is rejected with
    ValueError, or when the header is missing or not an integer. Results are
    memoised per url by lru_cache, and that includes the 0 fallback.

    Args:
        url: url whose Content-Length header is read.

    Returns:
        The announced size in bytes, or 0 when it cannot be determined.
    """
    try:
        # only the headers are needed, so release the connection as soon as they are read
        with urllib.request.urlopen(url, timeout = 10) as response:
            content_length = response.headers.get('Content-Length')
        return int(content_length)
    except (OSError, TypeError, ValueError):
        return 0
|
|
|
def is_download_done(url : str, file_path : str) -> bool:
    """
    Tell whether file_path exists and has exactly the size reported for url.

    A missing file is never considered done. Otherwise the size returned by
    get_download_size() is compared with the size returned by get_file_size().
    """
    if not is_file(file_path):
        return False

    expected_size = get_download_size(url)
    actual_size = get_file_size(file_path)
    return expected_size == actual_size
|
|
|
def conditional_download_hashes(download_directory_path : str, hashes : DownloadSet) -> bool:
    """
    Download the hash file of every entry in hashes whose 'path' is not an existing file.

    The work is bracketed by process_manager.check() and process_manager.end().
    Nothing is downloaded while the 'skip_download' state item is truthy. The outcome
    of the downloads is not inspected, the function always returns True.
    """
    process_manager.check()
    skip_download = state_manager.get_item('skip_download')

    if not skip_download:
        for index in hashes:
            hash_entry = hashes.get(index)

            # entries whose file is already present need no download
            if is_file(hash_entry.get('path')):
                continue
            conditional_download(download_directory_path, [ hash_entry.get('url') ])

    process_manager.end()
    return True
|
|
|
def conditional_download_sources(download_directory_path : str, sources : DownloadSet) -> bool:
    """
    Download the source file of every entry in sources whose 'path' is not an existing file.

    The work is bracketed by process_manager.check() and process_manager.end().
    Nothing is downloaded while the 'skip_download' state item is truthy. The outcome
    of the downloads is not inspected, the function always returns True.
    """
    process_manager.check()
    download_allowed = not state_manager.get_item('skip_download')

    if download_allowed:
        for index in sources:
            source_entry = sources.get(index)
            source_missing = not is_file(source_entry.get('path'))

            if source_missing:
                conditional_download(download_directory_path, [ source_entry.get('url') ])

    process_manager.end()
    return True
|
|
|
def validate_hash_paths(hash_paths : List[str]) -> Tuple[List[str], List[str]]:
    """
    Split hash_paths into a (valid, invalid) pair of path lists.

    No check is performed: the given list object itself is handed back as the
    valid paths and the invalid paths are always an empty list.
    """
    valid_hash_paths = hash_paths
    invalid_hash_paths : List[str] = []
    return valid_hash_paths, invalid_hash_paths
|
|
|
def validate_source_paths(source_paths : List[str]) -> Tuple[List[str], List[str]]:
    """
    Split source_paths into a (valid, invalid) pair of path lists.

    No check is performed: the given list object itself is handed back as the
    valid paths and the invalid paths are always an empty list.
    """
    invalid_source_paths : List[str] = []
    validation = (source_paths, invalid_source_paths)
    return validation