| import os |
| import pathlib |
| import shutil |
| import sys |
| import tempfile |
| import time |
| import urllib.request |
| from urllib.parse import urlparse, parse_qs, unquote |
|
|
| from huggingface_hub import hf_hub_download, HfFileSystem |
|
|
| CHUNK_SIZE = 4 * 4 * 100 * 1024 |
| USER_AGENT = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3' |
|
|
|
|
| def download_model_from_huggingface(output_dir: pathlib.Path | str, repo: str, filename: str): |
| download_file_from_huggingface(output_dir, repo, filename) |
|
|
|
|
| def download_file_from_huggingface(output_dir: pathlib.Path | str, repo: str, filename: str): |
| if isinstance(output_dir, str): |
| output_dir = pathlib.Path(output_dir) |
|
|
| if check_file_exists_on_huggingface(output_dir, repo, filename): |
| return |
|
|
| hf_hub_download( |
| repo_id=repo, |
| filename=filename, |
| local_dir=f'{output_dir}', |
| cache_dir=f'{output_dir}' |
| ) |
|
|
|
|
| def check_file_exists_on_huggingface(output_dir: pathlib.Path | str, repo: str, file: str): |
| fs = HfFileSystem() |
| remote_files = fs.ls(f'{repo}/{file}') |
| if not remote_files: |
| return False |
|
|
| if isinstance(output_dir, str): |
| output_dir = pathlib.Path(output_dir) |
|
|
| local_file = output_dir / file |
| if not local_file.exists(): |
| return False |
|
|
| remote_file = remote_files[0] |
| remote_file_size = remote_file.get('size') |
| local_file_size = local_file.stat().st_size |
| if remote_file_size == local_file_size: |
| return True |
| return False |
|
|
|
|
| def download_lora_from_huggingface(base_dir: pathlib.Path | str, repo: str, filename: str): |
| download_file_from_huggingface(base_dir, repo, filename) |
|
|
|
|
| def download_civitai_file(url: str, output_path: str, token: str = ''): |
| headers = { |
| 'Authorization': f'Bearer {token}', |
| 'User-Agent': USER_AGENT, |
| } |
|
|
| |
| class NoRedirection(urllib.request.HTTPErrorProcessor): |
| def http_response(self, request, response): |
| return response |
|
|
| https_response = http_response |
|
|
| request = urllib.request.Request(url, headers=headers) |
| opener = urllib.request.build_opener(NoRedirection) |
| response = opener.open(request) |
|
|
| if response.status in [301, 302, 303, 307, 308]: |
| redirect_url = response.getheader('Location') |
|
|
| |
| parsed_url = urlparse(redirect_url) |
| query_params = parse_qs(parsed_url.query) |
| content_disposition = query_params.get('response-content-disposition', [None])[0] |
|
|
| if content_disposition: |
| filename = unquote(content_disposition.split('filename=')[1].strip('"')) |
| else: |
| raise Exception('Unable to determine filename') |
|
|
| response = urllib.request.urlopen(redirect_url) |
| elif response.status == 404: |
| raise Exception('File not found') |
| else: |
| raise Exception('No redirect found, something went wrong') |
|
|
| total_size = response.getheader('Content-Length') |
|
|
| if total_size is not None: |
| total_size = int(total_size) |
|
|
| |
|
|
| temporary_file = tempfile.NamedTemporaryFile(mode='wb', delete=False) |
| with temporary_file as f: |
| downloaded = 0 |
| start_time = time.time() |
|
|
| while True: |
| chunk_start_time = time.time() |
| buffer = response.read(CHUNK_SIZE) |
| chunk_end_time = time.time() |
|
|
| if not buffer: |
| break |
|
|
| downloaded += len(buffer) |
| f.write(buffer) |
| chunk_time = chunk_end_time - chunk_start_time |
|
|
| if chunk_time > 0: |
| speed = len(buffer) / chunk_time / (1024 ** 2) |
|
|
| if total_size is not None: |
| progress = downloaded / total_size |
| sys.stdout.write(f'\rDownloading: {filename} [{progress * 100:.2f}%] - {speed:.2f} MB/s') |
| sys.stdout.flush() |
|
|
| shutil.move(temporary_file.name, output_path) |
|
|
| end_time = time.time() |
| time_taken = end_time - start_time |
| hours, remainder = divmod(time_taken, 3600) |
| minutes, seconds = divmod(remainder, 60) |
|
|
| if hours > 0: |
| time_str = f'{int(hours)}h {int(minutes)}m {int(seconds)}s' |
| elif minutes > 0: |
| time_str = f'{int(minutes)}m {int(seconds)}s' |
| else: |
| time_str = f'{int(seconds)}s' |
|
|
| sys.stdout.write('\n') |
| print(f'Download completed. File saved as: {filename}') |
| print(f'Downloaded in {time_str}') |
|
|
|
|
| def download_lora_from_civitai(base_dir: pathlib.Path, filename: str, uri: str): |
| if not base_dir.exists(): |
| base_dir.mkdir(parents=True, exist_ok=True) |
| civitai_token = os.environ.get('CIVITAI_TOKEN', '0412348365e9a632d16687abf37e23a2') |
| output_file = base_dir / filename |
| download_civitai_file(uri, f'{output_file}', civitai_token) |
|
|