| | |
| |
|
| | |
| | |
| | |
| |
|
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| |
|
| | import logging |
| | import zipfile |
| | import tempfile |
| | import sys |
| | import os |
| | import stat |
| | import time |
| | from typing import Any, Callable |
| |
|
| |
|
# Retry policy for the final folder rename: rename_folder_with_retry passes
# these to call_with_retry as retry_count and retry_delay (delay in seconds,
# handed to time.sleep). Worst case is roughly 100 * 0.1 s = 10 s of waiting.
RENAME_RETRY_COUNT = 100
RENAME_RETRY_DELAY = 0.1


# Configure the root logger to emit WARNING and above, printing only the
# message text (no level name or timestamp prefix).
logging.basicConfig(level=logging.WARNING, format="%(message)s")
# Module-wide logger shared by the helpers in this file.
logger = logging.getLogger("install_package")
| |
|
| |
|
def remove_directory_item(path):
    """Delete ``path``, whether it is a file, a symlink, or a directory tree.

    Files and links are removed directly; if that fails with a permission
    error the item is made owner-accessible and the removal is retried once.
    Directories are removed with ``rmdir`` when possible, otherwise their
    entries are deleted recursively before a final ``rmdir``.

    Raises:
        OSError: If an item still cannot be removed after the fallback steps.
    """
    if os.path.islink(path) or os.path.isfile(path):
        try:
            os.remove(path)
        except PermissionError:
            # Grant the owner full access and try exactly once more.
            os.chmod(path, stat.S_IRWXU)
            os.remove(path)
        return

    try:
        # Open up permissions first so that a later listing of the directory
        # cannot fail on access, then attempt the cheap removal: rmdir only
        # succeeds when the directory has no entries.
        # NOTE(review): trying rmdir before listing presumably also avoids
        # descending into directory links/junctions - confirm.
        os.chmod(path, stat.S_IRWXU)
        os.rmdir(path)
        return
    except OSError:
        # Fall through to the recursive clean-out below.
        pass

    for entry in os.listdir(path):
        remove_directory_item(os.path.join(path, entry))

    # The directory should be empty now; any failure here is propagated.
    os.rmdir(path)
| |
|
| |
|
class StagingDirectory:
    """Context manager providing a scratch folder inside a staging directory.

    Entering the context creates a uniquely named ``ver-*`` folder under
    ``staging_path``. ``promote_and_rename`` moves that folder to its final
    name; leaving the context deletes the scratch folder if it still exists.
    """

    def __init__(self, staging_path):
        """Remember ``staging_path`` and create it if it does not exist yet."""
        os.makedirs(staging_path, exist_ok=True)
        self.staging_path = staging_path
        # Set by __enter__; None until the context has been entered.
        self.temp_folder_path = None

    def __enter__(self):
        """Create the scratch folder and return this object."""
        scratch = tempfile.mkdtemp(dir=self.staging_path, prefix="ver-")
        self.temp_folder_path = scratch
        return self

    def get_temp_folder_path(self):
        """Return the scratch folder path (``None`` before entering)."""
        return self.temp_folder_path

    def promote_and_rename(self, folder_name):
        """Rename the scratch folder to ``folder_name`` inside the staging path.

        Raises:
            OSError: If ``os.rename`` fails.
        """
        final_path = os.path.join(self.staging_path, folder_name)
        os.rename(self.temp_folder_path, final_path)

    def __exit__(self, type, value, traceback):
        """Delete the scratch folder if it is still present."""
        leftover = self.temp_folder_path
        if not os.path.isdir(leftover):
            # Already promoted (or otherwise gone): nothing to clean up.
            return
        remove_directory_item(leftover)
| |
|
| |
|
def rename_folder(staging_dir: StagingDirectory, folder_name: str):
    """Promote the staged folder to ``folder_name``, tolerating an existing target.

    If the rename fails but the destination already exists, a warning is
    logged and the failure is swallowed; otherwise the original ``OSError``
    is re-raised.
    """
    try:
        staging_dir.promote_and_rename(folder_name)
    except OSError:
        final_path = os.path.join(staging_dir.staging_path, folder_name)
        if not os.path.exists(final_path):
            # Nothing at the destination, so this is a genuine failure.
            raise
        # NOTE(review): an existing destination is presumably the result of
        # another installer finishing the same package first - confirm.
        logger.warning(
            f"Directory {final_path} already present, package installation already completed"
        )
| |
|
| |
|
def call_with_retry(
    op_name: str, func: Callable, retry_count: int = 3, retry_delay: float = 20
) -> Any:
    """Call ``func`` and retry it when it fails with an ``OSError``.

    ``func`` is attempted once and then re-attempted up to ``retry_count``
    more times, sleeping ``retry_delay`` seconds before each retry.

    Args:
        op_name: Human-readable description of the operation, used in logs.
        func: Zero-argument callable to execute.
        retry_count: Maximum number of retries after the first attempt.
            Zero or a negative value means the operation is tried only once.
        retry_delay: Seconds to wait before each retry.

    Returns:
        Whatever ``func`` returns on its first successful call.

    Raises:
        OSError: The last failure, once all retries are used up. Exceptions
            of any other type propagate immediately without a retry.
    """
    retries_left = retry_count
    while True:
        try:
            return func()
        except OSError as exc:  # IOError is an alias of OSError in Python 3
            logger.warning(f"Failure while executing {op_name} [{exc}]")
            # Compare against zero explicitly: a plain truthiness test treats
            # a negative count as "retries remaining" and never terminates.
            if retries_left <= 0:
                logger.error("Maximum retries exceeded, giving up")
                raise
            retry_str = "retry" if retries_left == 1 else "retries"
            logger.warning(
                f"Retrying after {retry_delay} seconds"
                f" ({retries_left} {retry_str} left) ..."
            )
            time.sleep(retry_delay)
            retries_left -= 1
| |
|
| |
|
def rename_folder_with_retry(staging_dir: StagingDirectory, folder_name):
    """Run ``rename_folder`` under the module's rename retry policy.

    The rename is retried via ``call_with_retry`` using
    ``RENAME_RETRY_COUNT`` retries spaced ``RENAME_RETRY_DELAY`` seconds apart.
    """
    target = os.path.join(staging_dir.staging_path, folder_name)
    description = f"rename {staging_dir.get_temp_folder_path()} -> {target}"

    def attempt_rename():
        return rename_folder(staging_dir, folder_name)

    call_with_retry(
        description,
        attempt_rename,
        retry_count=RENAME_RETRY_COUNT,
        retry_delay=RENAME_RETRY_DELAY,
    )
| |
|
| |
|
def install_package(package_path, install_path):
    """Extract the zip at ``package_path`` so that it ends up at ``install_path``.

    The archive is unpacked into a scratch folder next to the destination and
    then renamed into place, so a partially extracted package never appears
    under the final name.

    Args:
        package_path: Path to the zip archive to install.
        install_path: Destination folder. Its parent is used as the staging
            directory and its last component as the final folder name.

    Raises:
        ValueError: If ``install_path`` is empty or has no final folder
            component (for example a filesystem root).
        OSError: If extraction or the final rename fails.
        zipfile.BadZipFile: If ``package_path`` is not a valid zip archive.
    """
    if not install_path:
        raise ValueError("Install path must not be empty")

    # Normalise before splitting. A trailing separator would otherwise give
    # an empty folder name, making the rename target the staging directory
    # itself; that failure is then mistaken for "already installed". A bare
    # relative name would give an empty staging path that cannot be created.
    staging_path, version = os.path.split(os.path.abspath(install_path))
    if not version:
        raise ValueError(
            f"Install path {install_path!r} has no final folder component"
        )

    with StagingDirectory(staging_path) as staging_dir:
        output_folder = staging_dir.get_temp_folder_path()
        with zipfile.ZipFile(package_path, allowZip64=True) as zip_file:
            zip_file.extractall(output_folder)

        # Move the fully extracted tree into place under its final name.
        rename_folder_with_retry(staging_dir, version)

    print(f"Package successfully installed to {install_path}")
| |
|
| |
|
if __name__ == "__main__":
    # Usage: <script> <package.zip> <install_path>
    package_arg = sys.argv[1]
    install_arg = sys.argv[2]
    install_package(package_arg, install_arg)
| |
|