# Copyright 2019 NVIDIA CORPORATION

# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at

#    http://www.apache.org/licenses/LICENSE-2.0

# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import zipfile
import tempfile
import sys
import os
import stat
import time
from typing import Any, Callable, List, Optional

RENAME_RETRY_COUNT = 100
RENAME_RETRY_DELAY = 0.1

logging.basicConfig(level=logging.WARNING, format="%(message)s")
logger = logging.getLogger("install_package")


def remove_directory_item(path):
    """Delete ``path``, whether it is a file, a link or a directory tree.

    Files and links are removed with ``os.remove``; on ``PermissionError`` the
    item is made accessible to its owner (``stat.S_IRWXU``) and the removal is
    attempted once more.

    Directories are first removed with ``os.rmdir`` so that a folder junction
    is deleted as a junction instead of being followed (which would destroy
    the junction's target). Only when that fails is the directory emptied
    recursively and removed again.

    Raises:
        OSError: if the item (or something below it) cannot be removed.
    """
    if os.path.islink(path) or os.path.isfile(path):
        try:
            os.remove(path)
        except PermissionError:
            # make sure we have access and try again:
            os.chmod(path, stat.S_IRWXU)
            os.remove(path)
        return

    try:
        # make sure we have access preemptively - recursing into a directory
        # without permissions will only lead to heart ache
        os.chmod(path, stat.S_IRWXU)
        # try first to delete the dir because this works for folder junctions
        os.rmdir(path)
    except OSError:
        # fall through: the directory has to be emptied before it can go
        pass
    else:
        return

    for name in os.listdir(path):
        remove_directory_item(os.path.join(path, name))
    # now try again to get rid of the folder - and do not catch if it raises:
    os.rmdir(path)


class StagingDirectory:
    """Context manager providing a temporary ``ver-*`` folder inside ``staging_path``.

    The temporary folder is created on ``__enter__``. It can be turned into
    its final name with ``promote_and_rename``; if it still exists on
    ``__exit__`` (something went wrong) it is removed.
    """

    def __init__(self, staging_path):
        # An empty path (install path given as a bare relative folder name)
        # means the current directory; os.makedirs("") would raise.
        self.staging_path = staging_path or os.curdir
        self.temp_folder_path = None
        os.makedirs(self.staging_path, exist_ok=True)

    def __enter__(self):
        self.temp_folder_path = tempfile.mkdtemp(prefix="ver-", dir=self.staging_path)
        return self

    def get_temp_folder_path(self):
        """Return the temporary folder path (``None`` before ``__enter__``)."""
        return self.temp_folder_path

    # this function renames the temp staging folder to folder_name, it is required that the parent path exists!
    def promote_and_rename(self, folder_name):
        """Rename the temporary folder to ``folder_name`` inside the staging path."""
        abs_dst_folder_name = os.path.join(self.staging_path, folder_name)
        os.rename(self.temp_folder_path, abs_dst_folder_name)

    def __exit__(self, type, value, traceback):
        # Remove temp staging folder if it's still there (something went wrong):
        path = self.temp_folder_path
        if path is not None and os.path.isdir(path):
            remove_directory_item(path)


def rename_folder(staging_dir: StagingDirectory, folder_name: str):
    """Promote the staging folder to ``folder_name``, tolerating a lost race.

    Raises:
        OSError: if the rename failed and the destination does not exist.
    """
    try:
        staging_dir.promote_and_rename(folder_name)
    except OSError:
        # if we failed to rename because the folder now exists we can assume that another packman process
        # has managed to update the package before us - in all other cases we re-raise the exception
        abs_dst_folder_name = os.path.join(staging_dir.staging_path, folder_name)
        if not os.path.exists(abs_dst_folder_name):
            raise
        logger.warning(
            "Directory %s already present, package installation already completed",
            abs_dst_folder_name,
        )


def call_with_retry(
    op_name: str, func: Callable[[], Any], retry_count: int = 3, retry_delay: float = 20
) -> Any:
    """Call ``func`` and retry it when it raises ``OSError``.

    Args:
        op_name: description of the operation, used in log messages only.
        func: zero-argument callable to execute.
        retry_count: number of retries after the first failed attempt.
        retry_delay: seconds to sleep before each retry.

    Returns:
        Whatever ``func`` returns on its first successful call.

    Raises:
        OSError: the last failure, once all retries are used up.
    """
    retries_left = retry_count
    while True:
        try:
            return func()
        except OSError as exc:  # IOError is an alias of OSError in Python 3
            logger.warning("Failure while executing %s [%s]", op_name, exc)
            if not retries_left:
                logger.error("Maximum retries exceeded, giving up")
                raise
            retry_str = "retry" if retries_left == 1 else "retries"
            logger.warning(
                "Retrying after %s seconds (%s %s left) ...",
                retry_delay,
                retries_left,
                retry_str,
            )
            time.sleep(retry_delay)
            retries_left -= 1


def rename_folder_with_retry(staging_dir: StagingDirectory, folder_name):
    """Run ``rename_folder`` using the module's rename retry count and delay."""
    dst_path = os.path.join(staging_dir.staging_path, folder_name)
    call_with_retry(
        f"rename {staging_dir.get_temp_folder_path()} -> {dst_path}",
        lambda: rename_folder(staging_dir, folder_name),
        RENAME_RETRY_COUNT,
        RENAME_RETRY_DELAY,
    )


def install_package(package_path, install_path):
    """Extract the zip archive ``package_path`` into the folder ``install_path``.

    The archive is extracted into a temporary folder next to the destination
    and then renamed to the final folder name, so a partially extracted
    package never appears under ``install_path``.

    Raises:
        ValueError: if ``install_path`` has no usable final folder name.
        OSError: if staging, extraction or the final rename fails.
    """
    # Normalize first: a trailing separator would otherwise split into an
    # empty folder name and the rename could never produce the destination.
    staging_path, version = os.path.split(os.path.normpath(install_path))
    if version in ("", os.curdir, os.pardir):
        raise ValueError(f"install path '{install_path}' does not name a package folder")

    with StagingDirectory(staging_path) as staging_dir:
        output_folder = staging_dir.get_temp_folder_path()
        with zipfile.ZipFile(package_path, allowZip64=True) as zip_file:
            zip_file.extractall(output_folder)

        # attempt the rename operation
        rename_folder_with_retry(staging_dir, version)

    print(f"Package successfully installed to {install_path}")


def _ensure_not_executable_path(target_path):
    """Raise ``RuntimeError`` if ``target_path`` equals an entry of ``PATH``."""
    executable_paths = os.getenv("PATH")
    paths_list = executable_paths.split(os.path.pathsep) if executable_paths else []
    target_path_nc = os.path.normcase(target_path)
    for exec_path in paths_list:
        if os.path.normcase(os.path.normpath(exec_path)) == target_path_nc:
            raise RuntimeError(f"packman will not install to executable path '{exec_path}'")


def main(argv: Optional[List[str]] = None) -> None:
    """Script entry point: ``<script> <package_path> <install_path>``.

    Args:
        argv: full argument vector including the script name; defaults to
            ``sys.argv``. Arguments beyond the first two are ignored.
    """
    args = sys.argv if argv is None else argv
    if len(args) < 3:
        script = args[0] if args else "install_package"
        raise SystemExit(f"usage: {script} <package_path> <install_path>")

    target_path_np = os.path.normpath(args[2])
    _ensure_not_executable_path(target_path_np)
    install_package(args[1], target_path_np)


if __name__ == "__main__":
    main()