Spaces:
Sleeping
Sleeping
File size: 5,776 Bytes
66c9c8a | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 | # Copyright 2019 NVIDIA CORPORATION
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import zipfile
import tempfile
import sys
import os
import stat
import time
from typing import Any, Callable
RENAME_RETRY_COUNT = 100
RENAME_RETRY_DELAY = 0.1
logging.basicConfig(level=logging.WARNING, format="%(message)s")
logger = logging.getLogger("install_package")
def remove_directory_item(path):
if os.path.islink(path) or os.path.isfile(path):
try:
os.remove(path)
except PermissionError:
# make sure we have access and try again:
os.chmod(path, stat.S_IRWXU)
os.remove(path)
else:
# try first to delete the dir because this will work for folder junctions, otherwise we would follow the junctions and cause destruction!
clean_out_folder = False
try:
# make sure we have access preemptively - this is necessary because recursing into a directory without permissions
# will only lead to heart ache
os.chmod(path, stat.S_IRWXU)
os.rmdir(path)
except OSError:
clean_out_folder = True
if clean_out_folder:
# we should make sure the directory is empty
names = os.listdir(path)
for name in names:
fullname = os.path.join(path, name)
remove_directory_item(fullname)
# now try to again get rid of the folder - and not catch if it raises:
os.rmdir(path)
class StagingDirectory:
def __init__(self, staging_path):
self.staging_path = staging_path
self.temp_folder_path = None
os.makedirs(staging_path, exist_ok=True)
def __enter__(self):
self.temp_folder_path = tempfile.mkdtemp(prefix="ver-", dir=self.staging_path)
return self
def get_temp_folder_path(self):
return self.temp_folder_path
# this function renames the temp staging folder to folder_name, it is required that the parent path exists!
def promote_and_rename(self, folder_name):
abs_dst_folder_name = os.path.join(self.staging_path, folder_name)
os.rename(self.temp_folder_path, abs_dst_folder_name)
def __exit__(self, type, value, traceback):
# Remove temp staging folder if it's still there (something went wrong):
path = self.temp_folder_path
if os.path.isdir(path):
remove_directory_item(path)
def rename_folder(staging_dir: StagingDirectory, folder_name: str):
try:
staging_dir.promote_and_rename(folder_name)
except OSError as exc:
# if we failed to rename because the folder now exists we can assume that another packman process
# has managed to update the package before us - in all other cases we re-raise the exception
abs_dst_folder_name = os.path.join(staging_dir.staging_path, folder_name)
if os.path.exists(abs_dst_folder_name):
logger.warning(
f"Directory {abs_dst_folder_name} already present, package installation already completed"
)
else:
raise
def call_with_retry(
op_name: str, func: Callable, retry_count: int = 3, retry_delay: float = 20
) -> Any:
retries_left = retry_count
while True:
try:
return func()
except (OSError, IOError) as exc:
logger.warning(f"Failure while executing {op_name} [{str(exc)}]")
if retries_left:
retry_str = "retry" if retries_left == 1 else "retries"
logger.warning(
f"Retrying after {retry_delay} seconds"
f" ({retries_left} {retry_str} left) ..."
)
time.sleep(retry_delay)
else:
logger.error("Maximum retries exceeded, giving up")
raise
retries_left -= 1
def rename_folder_with_retry(staging_dir: StagingDirectory, folder_name):
dst_path = os.path.join(staging_dir.staging_path, folder_name)
call_with_retry(
f"rename {staging_dir.get_temp_folder_path()} -> {dst_path}",
lambda: rename_folder(staging_dir, folder_name),
RENAME_RETRY_COUNT,
RENAME_RETRY_DELAY,
)
def install_package(package_path, install_path):
staging_path, version = os.path.split(install_path)
with StagingDirectory(staging_path) as staging_dir:
output_folder = staging_dir.get_temp_folder_path()
with zipfile.ZipFile(package_path, allowZip64=True) as zip_file:
zip_file.extractall(output_folder)
# attempt the rename operation
rename_folder_with_retry(staging_dir, version)
print(f"Package successfully installed to {install_path}")
if __name__ == "__main__":
executable_paths = os.getenv("PATH")
paths_list = executable_paths.split(os.path.pathsep) if executable_paths else []
target_path_np = os.path.normpath(sys.argv[2])
target_path_np_nc = os.path.normcase(target_path_np)
for exec_path in paths_list:
if os.path.normcase(os.path.normpath(exec_path)) == target_path_np_nc:
raise RuntimeError(f"packman will not install to executable path '{exec_path}'")
install_package(sys.argv[1], target_path_np)
|