Buckets:
| # Copyright 2020 The Emscripten Authors. All rights reserved. | |
| # Emscripten is available under two separate licenses, the MIT license and the | |
| # University of Illinois/NCSA Open Source License. Both these licenses can be | |
| # found in the LICENSE file. | |
| """General purpose utility functions. | |
| The code in this file should mostly be not emscripten-specific, but general | |
| purpose enough to be useful in any command line utility. | |
| """ | |
| import functools | |
| import logging | |
| import os | |
| import shlex | |
| import shutil | |
| import stat | |
| import subprocess | |
| import sys | |
| from pathlib import Path | |
| from . import diagnostics | |
| __rootpath__ = os.path.abspath(os.path.dirname(os.path.dirname(__file__))) | |
| WINDOWS = sys.platform.startswith('win') | |
| MACOS = sys.platform == 'darwin' | |
| LINUX = sys.platform.startswith('linux') | |
| logger = logging.getLogger('utils') | |
| def run_process(cmd, check=True, input=None, *args, **kw): | |
| """Run a subprocess returning the exit code. | |
| By default this function will raise an exception on failure. Therefore this should only be | |
| used if you want to handle such failures. For most subprocesses, failures are not recoverable | |
| and should be fatal. In those cases the `check_call` wrapper should be preferred. | |
| """ | |
| # Flush standard streams otherwise the output of the subprocess may appear in the | |
| # output before messages that we have already written. | |
| sys.stdout.flush() | |
| sys.stderr.flush() | |
| kw.setdefault('text', True) | |
| kw.setdefault('encoding', 'utf-8') | |
| ret = subprocess.run(cmd, check=check, input=input, *args, **kw) | |
| debug_text = '%sexecuted %s' % ('successfully ' if check else '', shlex.join(cmd)) | |
| logger.debug(debug_text) | |
| return ret | |
| def exec(cmd): | |
| if WINDOWS: | |
| rtn = run_process(cmd, stdin=sys.stdin, check=False).returncode | |
| sys.exit(rtn) | |
| else: | |
| sys.stdout.flush() | |
| sys.stderr.flush() | |
| os.execvp(cmd[0], cmd) | |
| def exit_with_error(msg, *args): | |
| diagnostics.error(msg, *args) | |
| def path_from_root(*pathelems): | |
| return str(Path(__rootpath__, *pathelems)) | |
| def exe_path_from_root(*pathelems): | |
| return find_exe(path_from_root(*pathelems)) | |
| def suffix(name): | |
| """Return the file extension.""" | |
| return os.path.splitext(name)[1] | |
| def find_exe(*pathelems): | |
| path = os.path.join(*pathelems) | |
| if WINDOWS: | |
| # Should we use PATHEXT environment variable here? | |
| # For now, specify only enough extensions to find llvm / binaryen / emscripten executables. | |
| for ext in ['.exe', '.bat']: | |
| if os.path.isfile(path + ext): | |
| return path + ext | |
| return path | |
| def replace_suffix(filename, new_suffix): | |
| assert new_suffix[0] == '.' | |
| return os.path.splitext(filename)[0] + new_suffix | |
| def unsuffixed(name): | |
| """Return the filename without the extension. | |
| If there are multiple extensions this strips only the final one. | |
| """ | |
| return os.path.splitext(name)[0] | |
| def unsuffixed_basename(name): | |
| return os.path.basename(unsuffixed(name)) | |
| def get_file_suffix(filename): | |
| """Return the essential suffix of a filename, discarding Unix-style version numbers. | |
| For example for 'libz.so.1.2.8' returns '.so' | |
| """ | |
| while filename: | |
| filename, suffix = os.path.splitext(filename) | |
| if not suffix[1:].isdigit(): | |
| return suffix | |
| return '' | |
| def normalize_path(path): | |
| """Normalize path separators to UNIX-style forward slashes. | |
| This can be useful when converting paths to URLs or JS strings, | |
| or when trying to generate consistent output file contents | |
| across all platforms. In most cases UNIX-style separators work | |
| fine on windows. | |
| """ | |
| return path.replace('\\', '/').replace('//', '/') | |
| def safe_ensure_dirs(dirname): | |
| os.makedirs(dirname, exist_ok=True) | |
| def make_writable(filename): | |
| assert os.path.exists(filename) | |
| old_mode = stat.S_IMODE(os.stat(filename).st_mode) | |
| os.chmod(filename, old_mode | stat.S_IWUSR) | |
| def safe_copy(src, dst): | |
| logger.debug('copy: %s -> %s', src, dst) | |
| src = os.path.abspath(src) | |
| dst = os.path.abspath(dst) | |
| if os.path.isdir(dst): | |
| dst = os.path.join(dst, os.path.basename(src)) | |
| if src == dst: | |
| return | |
| if dst == os.devnull: | |
| return | |
| # Copies data and permission bits, but not other metadata such as timestamp | |
| shutil.copy(src, dst) | |
| # We always want the target file to be writable even when copying from | |
| # read-only source. (e.g. a read-only install of emscripten). | |
| make_writable(dst) | |
| def read_file(file_path): | |
| """Read from a file opened in text mode.""" | |
| with open(file_path, encoding='utf-8') as fh: | |
| return fh.read() | |
| def read_binary(file_path): | |
| """Read from a file opened in binary mode.""" | |
| with open(file_path, 'rb') as fh: | |
| return fh.read() | |
| def write_file(file_path, text, line_endings=None): | |
| """Write to a file opened in text mode.""" | |
| if line_endings and line_endings != os.linesep: | |
| text = text.replace('\n', line_endings) | |
| write_binary(file_path, text.encode('utf-8')) | |
| else: | |
| with open(file_path, 'w', encoding='utf-8') as fh: | |
| fh.write(text) | |
| def write_binary(file_path, contents): | |
| """Write to a file opened in binary mode.""" | |
| with open(file_path, 'wb') as fh: | |
| fh.write(contents) | |
| def delete_file(filename): | |
| """Delete a file (if it exists).""" | |
| if os.path.lexists(filename): | |
| os.remove(filename) | |
| def delete_dir(dirname): | |
| """Delete a directory (if it exists).""" | |
| if not os.path.exists(dirname): | |
| return | |
| shutil.rmtree(dirname) | |
| def delete_contents(dirname, exclude=None): | |
| """Delete the contents of a directory without removing the directory itself.""" | |
| if not os.path.exists(dirname): | |
| return | |
| for entry in os.listdir(dirname): | |
| if exclude and entry in exclude: | |
| continue | |
| entry = os.path.join(dirname, entry) | |
| if os.path.isdir(entry): | |
| delete_dir(entry) | |
| else: | |
| delete_file(entry) | |
| def get_num_cores(): | |
| # Prefer `os.process_cpu_count` when available (3.13 and above) since | |
| # it takes into account thread affinity. | |
| # Fall back to `os.sched_getaffinity` where available and finally | |
| # `os.cpu_count`, which should work everywhere. | |
| if hasattr(os, 'process_cpu_count'): | |
| cpu_count = os.process_cpu_count() | |
| elif hasattr(os, 'sched_getaffinity'): | |
| cpu_count = len(os.sched_getaffinity(0)) | |
| else: | |
| cpu_count = os.cpu_count() | |
| return int(os.environ.get('EMCC_CORES', cpu_count)) | |
| memoize = functools.cache | |
| # TODO: Move this back to shared.py once importing that file becoming side effect free (i.e. it no longer requires a config). | |
| def set_version_globals(): | |
| global EMSCRIPTEN_VERSION, EMSCRIPTEN_VERSION_MAJOR, EMSCRIPTEN_VERSION_MINOR, EMSCRIPTEN_VERSION_TINY | |
| filename = path_from_root('emscripten-version.txt') | |
| EMSCRIPTEN_VERSION = read_file(filename).strip().strip('"') | |
| parts = [int(x) for x in EMSCRIPTEN_VERSION.split('-')[0].split('.')] | |
| EMSCRIPTEN_VERSION_MAJOR, EMSCRIPTEN_VERSION_MINOR, EMSCRIPTEN_VERSION_TINY = parts | |
Xet Storage Details
- Size:
- 6.8 kB
- Xet hash:
- 02de3558e996e44a2ca0d3983afeb4d5b2a2935180ee8f61f7b4bf46de4ac0ac
·
Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.