Spaces:
Paused
Paused
| import os | |
| import importlib | |
| import importlib.util | |
| import shutil | |
| import subprocess | |
| import sys | |
| import re | |
| import logging | |
| import pygit2 | |
# Set libgit2's owner-validation option to 0 (off) before any repository is opened.
pygit2.option(pygit2.GIT_OPT_SET_OWNER_VALIDATION, 0)

# Quieten noisy third-party loggers.
_torch_distributed_logger = logging.getLogger("torch.distributed.nn")
_torch_distributed_logger.setLevel(logging.ERROR)


def _keep_xformers_record(record):
    """Logging filter: drop only the 'matching Triton is not available' message."""
    return 'A matching Triton is not available' not in record.getMessage()


logging.getLogger("xformers").addFilter(_keep_xformers_record)

# Interpreter used to spawn child processes (e.g. pip).
python = sys.executable

# Settings read from the environment once, at import time.
default_command_live = os.getenv('LAUNCH_LIVE_OUTPUT') == "1"
index_url = os.getenv('INDEX_URL', "")

# Directory containing this file, and its parent directory.
_this_file = os.path.realpath(__file__)
modules_path = os.path.dirname(_this_file)
script_path = os.path.dirname(modules_path)

# Name of the sub-directory (joined onto script_path by repo_dir) holding cloned repositories.
dir_repos = "repositories"
def git_clone(url, dir, name, hash=None):
    """
    Make sure a git repository is available in `dir` and optionally force-check out a commit.

    If `dir` already holds a repository that pygit2 can open it is reused; otherwise the
    directory is wiped and `url` is cloned into it. The 'origin' remote is then fetched and,
    when `hash` is given, the working tree is force-checked out at that commit.

    This is best-effort: every failure is reported with a printed message instead of being
    raised to the caller.

    :param url: URL of the repository to clone when `dir` is not a usable repository.
    :param dir: local directory of the checkout (removed and recreated before a fresh clone).
    :param name: human-readable name used in the printed progress messages.
    :param hash: id of the commit to check out; when None only the clone/fetch is performed.
    """
    try:
        try:
            repo = pygit2.Repository(dir)
            print(f'{name} exists.')
        except Exception:
            # Only ordinary errors trigger a re-clone. A bare `except:` would also catch
            # KeyboardInterrupt/SystemExit and delete the directory on Ctrl-C.
            if os.path.exists(dir):
                shutil.rmtree(dir, ignore_errors=True)
            os.makedirs(dir, exist_ok=True)
            repo = pygit2.clone_repository(url, dir)
            print(f'{name} cloned.')

        remote = repo.remotes['origin']
        remote.fetch()

        if hash is None:
            # No commit requested: looking up None would only raise and be reported as a
            # failed clone, even though the clone and fetch succeeded.
            print(f'{name} checkout skipped (no commit hash given).')
            return

        commit = repo.get(hash)
        if commit is None:
            # repo.get() returns None for unknown ids; fail with a message that says so.
            raise ValueError(f'commit {hash} not found in repository')

        repo.checkout_tree(commit, strategy=pygit2.GIT_CHECKOUT_FORCE)
        print(f'{name} checkout finished.')
    except Exception as e:
        print(f'Git clone failed for {name}: {str(e)}')
def repo_dir(name):
    """Return the path `<script_path>/<dir_repos>/<name>` for the repository called `name`."""
    repos_root = os.path.join(script_path, dir_repos)
    return os.path.join(repos_root, name)
def is_installed(package):
    """
    Tell whether the module `package` can be located by the import system.

    Returns False when no spec is found, or when the lookup raises ModuleNotFoundError
    (for example because the parent of a dotted name is missing).
    """
    try:
        found = importlib.util.find_spec(package) is not None
    except ModuleNotFoundError:
        found = False
    return found
def run(command, desc=None, errdesc=None, custom_env=None, live: bool = default_command_live) -> str:
    """
    Execute `command` through the shell and return its captured standard output.

    :param command: command line passed to subprocess.run with shell=True.
    :param desc: optional message printed before the command starts.
    :param errdesc: optional first line of the error raised on failure.
    :param custom_env: environment for the child process; os.environ is used when None.
    :param live: when true the child's output is not captured, so the return value is "".
    :return: the child's stdout, or "" when nothing was captured.
    :raises RuntimeError: if the command exits with a non-zero return code.
    """
    if desc is not None:
        print(desc)

    child_env = custom_env if custom_env is not None else os.environ
    options = dict(
        args=command,
        shell=True,
        env=child_env,
        encoding='utf8',
        errors='ignore',
    )
    if not live:
        # Capture both streams so they can be returned or included in the error report.
        options.update(stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    completed = subprocess.run(**options)

    if completed.returncode == 0:
        return completed.stdout or ""

    report = [
        f"{errdesc or 'Error running command'}.",
        f"Command: {command}",
        f"Error code: {completed.returncode}",
    ]
    if completed.stdout:
        report.append(f"stdout: {completed.stdout}")
    if completed.stderr:
        report.append(f"stderr: {completed.stderr}")
    raise RuntimeError("\n".join(report))
def run_pip(command, desc=None, live=default_command_live):
    """
    Run `pip <command>` with the current interpreter, always adding --prefer-binary.

    When the module-level `index_url` is non-empty it is appended as `--index-url`.
    `desc` is interpolated into the progress and error messages handed to `run`.
    Returns whatever `run` returns.
    """
    index_url_line = ''
    if index_url != '':
        index_url_line = f' --index-url {index_url}'

    pip_command = f'"{python}" -m pip {command} --prefer-binary{index_url_line}'
    return run(
        pip_command,
        desc=f"Installing {desc}",
        errdesc=f"Couldn't install {desc}",
        live=live,
    )
# Matches the leading `name` or `name==version` part of a requirements line.
re_requirement = re.compile(r"\s*([-_a-zA-Z0-9]+)\s*(?:==\s*([-+_.a-zA-Z0-9]+))?\s*")


def requirements_met(requirements_file):
    """
    Does a simple parse of a requirements.txt file to determine if all requirements in it
    are already installed. Returns True if so, False if not installed or parsing fails.

    Blank lines and `#` comment lines are skipped. Only the leading `name` or
    `name==version` part of a line is interpreted: a bare name only has to be installed,
    a pinned name must be installed at exactly that version. Anything else on the line
    (other specifiers, markers) is ignored.
    """
    import importlib.metadata
    import packaging.version

    with open(requirements_file, "r", encoding="utf8") as file:
        for line in file:
            stripped = line.strip()
            # Comment lines carry no requirement; they must not count as a parse failure.
            if stripped == "" or stripped.startswith("#"):
                continue

            m = re_requirement.match(line)
            if m is None:
                return False

            package = m.group(1).strip()
            version_required = (m.group(2) or "").strip()

            # Look the package up before the version check so that an unpinned
            # requirement that is not installed at all is reported as unmet.
            try:
                version_installed = importlib.metadata.version(package)
            except Exception:
                return False

            if version_required == "":
                continue

            try:
                versions_match = (
                    packaging.version.parse(version_required)
                    == packaging.version.parse(version_installed)
                )
            except packaging.version.InvalidVersion:
                # An unparsable version is a parsing failure, not a crash.
                return False

            if not versions_match:
                return False

    return True