|
|
""" |
|
|
File system utils. |
|
|
""" |
|
|
import collections |
|
|
import os |
|
|
import pickle |
|
|
import sys |
|
|
import errno |
|
|
import shutil |
|
|
import glob |
|
|
|
|
|
|
|
|
import codecs |
|
|
import hashlib |
|
|
import tarfile |
|
|
import fnmatch |
|
|
import tempfile |
|
|
from datetime import datetime |
|
|
from socket import gethostname |
|
|
import logging |
|
|
|
|
|
|
|
|
# Short aliases for common os.path helpers used throughout this module.
f_ext = os.path.splitext  # -> (root, ext) tuple of a path
f_size = os.path.getsize  # file size in bytes
is_file = os.path.isfile
is_dir = os.path.isdir
get_dir = os.path.dirname
|
|
|
|
|
|
|
|
def host_name():
    """Host name of the current machine; alias for ``socket.gethostname()``."""
    return gethostname()
|
|
|
|
|
|
|
|
def host_id():
    """
    Returns: the first component of the host name (everything before '.')
    """
    full_name = host_name()
    return full_name.split(".", 1)[0]
|
|
|
|
|
|
|
|
def utf_open(fname, mode):
    """
    Open ``fname`` as UTF-8 text; thin wrapper around ``codecs.open``.
    """
    return codecs.open(fname, mode=mode, encoding="utf-8")
|
|
|
|
|
|
|
|
def is_sequence(obj):
    """
    Returns:
        True if obj is a collections.abc.Sequence but NOT a plain string.
    """
    if isinstance(obj, str):
        return False
    return isinstance(obj, collections.abc.Sequence)
|
|
|
|
|
|
|
|
def pack_varargs(args):
    """
    Normalize ``*args`` so that both f(a, b, c) and f([a, b, c]) yield
    the same sequence:

        def f(*args):
            arg_list = pack_varargs(args)
            # arg_list is now a flat sequence either way
    """
    assert isinstance(args, tuple), "please input the tuple `args` as in *args"
    # a single sequence argument is unwrapped; anything else passes through
    if len(args) == 1 and is_sequence(args[0]):
        return args[0]
    return args
|
|
|
|
|
|
|
|
def f_not_empty(*fpaths):
    """
    Returns:
        For a file: True iff it exists and its size > 0.
        For a dir: True iff it exists and contains at least one entry.
        False when the path does not exist at all.
    """
    path = f_join(*fpaths)
    if not os.path.exists(path):
        return False
    if os.path.isdir(path):
        return bool(os.listdir(path))
    return os.path.getsize(path) > 0
|
|
|
|
|
|
|
|
def f_expand(fpath):
    """Expand ``~`` (user home) first, then ``$VAR`` environment variables."""
    home_expanded = os.path.expanduser(fpath)
    return os.path.expandvars(home_expanded)
|
|
|
|
|
|
|
|
def f_exists(*fpaths):
    """True if the joined + expanded path exists (file or directory)."""
    return os.path.exists(f_join(*fpaths))
|
|
|
|
|
|
|
|
def f_join(*fpaths):
    """
    Join path components (accepts either varargs or a single list) and
    expand special symbols like `~` for home dir.
    """
    components = pack_varargs(fpaths)
    joined = f_expand(os.path.join(*components))
    # bytes paths are passed through untouched; str paths are trimmed
    return joined.strip() if isinstance(joined, str) else joined
|
|
|
|
|
|
|
|
def f_listdir(
    *fpaths,
    filter_ext=None,
    filter=None,
    sort=True,
    full_path=False,
    nonexist_ok=True,
    recursive=False,
):
    """
    List the contents of a directory.

    Args:
        filter_ext: keep only names ending with this extension
        filter: callable(name) -> bool; True to keep the entry
            (mutually exclusive with filter_ext)
        sort: sort the resulting names alphabetically
        full_path: return paths joined with the dir instead of bare names
        nonexist_ok: return [] for a missing dir instead of raising
        recursive: walk subdirectories with os.walk; `filter` then receives
            paths relative to the root dir (e.g. "a/data1.txt") rather than
            bare file names. If False, a plain os.listdir() is used.
    """
    assert not (filter_ext and filter), "filter_ext and filter are mutually exclusive"
    dir_path = f_join(*fpaths)
    if nonexist_ok and not os.path.exists(dir_path):
        return []

    if recursive:
        entries = []
        for root, _, fnames in os.walk(dir_path):
            rel_root = os.path.relpath(root, dir_path)
            for fname in fnames:
                entries.append(os.path.join(rel_root, fname))
    else:
        entries = os.listdir(dir_path)

    if filter is not None:
        entries = [name for name in entries if filter(name)]
    elif filter_ext is not None:
        entries = [name for name in entries if name.endswith(filter_ext)]

    if sort:
        entries.sort()
    if full_path:
        return [os.path.join(dir_path, name) for name in entries]
    return entries
|
|
|
|
|
|
|
|
def f_mkdir(*fpaths):
    """
    Recursively create the directory and all missing parents.
    A no-op when it already exists.

    Returns: the joined path that was created
    """
    path = f_join(*fpaths)
    os.makedirs(path, exist_ok=True)
    return path
|
|
|
|
|
|
|
|
def f_mkdir_in_path(*fpaths):
    """
    Given a FILE path, recursively create all the parent dirs that lead
    to the file. If they exist, do nothing.

    Fix: a bare file name (no directory component) used to crash because
    os.makedirs("") raises FileNotFoundError; it is now a no-op.
    """
    parent = get_dir(f_join(*fpaths))
    if parent:  # dirname is "" for a bare file name
        os.makedirs(parent, exist_ok=True)
|
|
|
|
|
|
|
|
def last_part_in_path(fpath):
    """
    Final component of a path (handles trailing slashes via normpath).
    https://stackoverflow.com/questions/3925096/how-to-get-only-the-last-part-of-a-path-in-python
    """
    normalized = os.path.normpath(f_expand(fpath))
    return os.path.basename(normalized)
|
|
|
|
|
|
|
|
def is_abs_path(*fpath):
    """True if the joined + expanded path is absolute."""
    return os.path.isabs(f_join(*fpath))
|
|
|
|
|
|
|
|
def is_relative_path(*fpath):
    """True if the joined + expanded path is NOT absolute."""
    # note: f_join is applied again inside is_abs_path; harmless double-expand
    return not is_abs_path(f_join(*fpath))
|
|
|
|
|
|
|
|
def f_time(*fpath):
    """
    File modification time, as a string.

    Fix: the docstring promised modification time but the code called
    os.path.getctime, which is metadata-change time on Unix; getmtime
    matches the documented contract.
    """
    return str(os.path.getmtime(f_join(*fpath)))
|
|
|
|
|
|
|
|
def f_append_before_ext(fpath, suffix):
    """
    Insert ``suffix`` between the file stem and its extension,
    keeping the extension intact.
    """
    stem, ext = f_ext(fpath)
    return f"{stem}{suffix}{ext}"
|
|
|
|
|
|
|
|
def f_add_ext(fpath, ext):
    """
    Ensure ``fpath`` carries the extension ``ext``.

    Args:
        ext: a leading `.` is added automatically when missing

    Returns: fpath unchanged if it already ends with the extension,
        otherwise fpath with the extension appended.
    """
    suffix = ext if ext.startswith(".") else "." + ext
    return fpath if fpath.endswith(suffix) else fpath + suffix
|
|
|
|
|
|
|
|
def f_has_ext(fpath, ext):
    """True if ``fpath``'s extension equals ``ext`` (leading '.' optional)."""
    _, actual = f_ext(fpath)
    return actual == "." + ext.lstrip(".")
|
|
|
|
|
|
|
|
def f_glob(*fpath):
    """Recursive glob over the joined + expanded pattern."""
    pattern = f_join(*fpath)
    return glob.glob(pattern, recursive=True)
|
|
|
|
|
|
|
|
def f_remove(*fpath, verbose=False, dry_run=False):
    """
    If exist, remove. Supports both dir and file. Supports glob wildcard.

    Args:
        verbose: print a message after deletion
        dry_run: only print what would be deleted, don't delete

    Fixes over the original: the bare `except: pass` (which swallowed even
    KeyboardInterrupt) is gone, and non-ENOTDIR rmtree failures — e.g. a
    symlink pointing at a directory — are no longer silently skipped.
    Deletion stays best-effort, like `rm -rf`.
    """
    assert isinstance(verbose, bool)
    fpath = f_join(fpath)
    if dry_run:
        print("Dry run, delete:", fpath)
        return
    for f in glob.glob(fpath):
        if os.path.isdir(f) and not os.path.islink(f):
            # real directory: recursive best-effort delete
            shutil.rmtree(f, ignore_errors=True)
        else:
            # file or symlink (incl. symlink-to-dir): remove the entry itself
            try:
                os.remove(f)
            except OSError:
                pass  # best-effort: vanished or not removable
    if verbose:
        print(f'Deleted "{fpath}"')
|
|
|
|
|
|
|
|
def f_copy(fsrc, fdst, ignore=None, include=None, exists_ok=True, verbose=False):
    """
    Copy a dir or a file to ``fdst``. ``fsrc`` may contain glob wildcards.
    """
    fsrc, fdst = f_expand(fsrc), f_expand(fdst)
    for match in glob.glob(fsrc):
        try:
            # assume a directory first; ENOTDIR means it was a plain file
            f_copytree(match, fdst, ignore=ignore, include=include, exist_ok=exists_ok)
        except OSError as e:
            if e.errno != errno.ENOTDIR:
                raise
            shutil.copy(match, fdst)
    if verbose:
        print(f'Copied "{fsrc}" to "{fdst}"')
|
|
|
|
|
|
|
|
def _f_copytree(
    src,
    dst,
    symlinks=False,
    ignore=None,
    exist_ok=True,
    copy_function=shutil.copy2,
    ignore_dangling_symlinks=False,
):
    """Copied from python standard lib shutil.copytree
    except that we allow exist_ok
    Use f_copytree as entry

    Args:
        src: source directory (must be listable; os.listdir raises otherwise)
        dst: destination directory, created with os.makedirs(exist_ok=...)
        symlinks: True to recreate symlinks as symlinks instead of copying
            their targets
        ignore: callable(src, names) -> collection of names to skip,
            same contract as shutil.copytree's ignore=
        exist_ok: forwarded to os.makedirs for dst (the delta vs. stdlib)
        copy_function: per-file copy callable, default shutil.copy2
        ignore_dangling_symlinks: when symlinks=False, silently skip links
            whose target does not exist

    Raises:
        shutil.Error: one aggregated error listing (src, dst, reason)
            tuples for every file that failed
    """
    names = os.listdir(src)
    if ignore is not None:
        ignored_names = ignore(src, names)
    else:
        ignored_names = set()

    os.makedirs(dst, exist_ok=exist_ok)
    # per-file failures are collected and raised in bulk at the end,
    # so one bad file does not abort the whole tree copy
    errors = []
    for name in names:
        if name in ignored_names:
            continue
        srcname = os.path.join(src, name)
        dstname = os.path.join(dst, name)
        try:
            if os.path.islink(srcname):
                linkto = os.readlink(srcname)
                if symlinks:
                    # recreate the symlink itself rather than copying its target
                    os.symlink(linkto, dstname)
                    shutil.copystat(srcname, dstname, follow_symlinks=not symlinks)
                else:
                    # follow the link: skip dangling targets if requested
                    if not os.path.exists(linkto) and ignore_dangling_symlinks:
                        continue
                    # otherwise copy what the link points to
                    if os.path.isdir(srcname):
                        _f_copytree(
                            srcname, dstname, symlinks, ignore, exist_ok, copy_function
                        )
                    else:
                        copy_function(srcname, dstname)
            elif os.path.isdir(srcname):
                _f_copytree(srcname, dstname, symlinks, ignore, exist_ok, copy_function)
            else:
                # regular file (copy_function may raise for special files)
                copy_function(srcname, dstname)
        # catch the aggregated Error from a recursive call so we can
        # keep going with the remaining entries
        except shutil.Error as err:
            errors.extend(err.args[0])
        except OSError as why:
            errors.append((srcname, dstname, str(why)))
    try:
        shutil.copystat(src, dst)
    except OSError as why:
        # copying directory metadata can fail on Windows; ignore only there
        if getattr(why, "winerror", None) is None:
            errors.append((src, dst, str(why)))
    if errors:
        raise shutil.Error(errors)
    return dst
|
|
|
|
|
|
|
|
def _include_patterns(*patterns): |
|
|
"""Factory function that can be used with copytree() ignore parameter. |
|
|
|
|
|
Arguments define a sequence of glob-style patterns |
|
|
that are used to specify what files to NOT ignore. |
|
|
Creates and returns a function that determines this for each directory |
|
|
in the file hierarchy rooted at the source directory when used with |
|
|
shutil.copytree(). |
|
|
""" |
|
|
|
|
|
def _ignore_patterns(path, names): |
|
|
keep = set( |
|
|
name for pattern in patterns for name in fnmatch.filter(names, pattern) |
|
|
) |
|
|
ignore = set( |
|
|
name |
|
|
for name in names |
|
|
if name not in keep and not os.path.isdir(os.path.join(path, name)) |
|
|
) |
|
|
return ignore |
|
|
|
|
|
return _ignore_patterns |
|
|
|
|
|
|
|
|
def f_copytree(fsrc, fdst, symlinks=False, ignore=None, include=None, exist_ok=True):
    """
    Copy a directory tree, with either an ignore-list or an include-list
    of glob patterns (not both).
    """
    fsrc, fdst = f_expand(fsrc), f_expand(fdst)
    assert (ignore is None) or (
        include is None
    ), "ignore= and include= are mutually exclusive"
    if ignore:
        ignore = shutil.ignore_patterns(*ignore)
    elif include:
        # include-list is implemented as "ignore everything that doesn't match"
        ignore = _include_patterns(*include)
    _f_copytree(fsrc, fdst, ignore=ignore, symlinks=symlinks, exist_ok=exist_ok)
|
|
|
|
|
|
|
|
def f_move(fsrc, fdst):
    """Move every file/dir matching the glob pattern ``fsrc`` into ``fdst``."""
    fsrc, fdst = f_expand(fsrc), f_expand(fdst)
    for match in glob.glob(fsrc):
        shutil.move(match, fdst)
|
|
|
|
|
|
|
|
def f_split_path(fpath, normpath=True):
    """
    Split a path into a list of its component folders.

    Args:
        normpath: call os.path.normpath first to drop redundant '/' and
            resolve up-level references like ".."
    """
    if normpath:
        fpath = os.path.normpath(fpath)
    parts = []
    while True:
        head, tail = os.path.split(fpath)
        if head == fpath:
            # reached an absolute root such as "/"
            parts.append(head)
            break
        if tail == fpath:
            # no separator left: a bare name
            parts.append(tail)
            break
        fpath = head
        parts.append(tail)
    parts.reverse()
    return parts
|
|
|
|
|
|
|
|
def get_script_dir():
    """
    Returns: the directory containing the currently running script
        (symlinks resolved via realpath)
    """
    script_path = os.path.realpath(sys.argv[0])
    return os.path.dirname(script_path)
|
|
|
|
|
|
|
|
def get_script_file_name():
    """
    Returns: the base file name of the currently running script
    """
    return os.path.basename(sys.argv[0])
|
|
|
|
|
|
|
|
def get_script_self_path():
    """
    Returns: the resolved (realpath) full path of the currently running script
    """
    return os.path.realpath(sys.argv[0])
|
|
|
|
|
|
|
|
def get_parent_dir(location, abspath=False):
    """
    Args:
        location: current directory or file
        abspath: True for an absolute result, False for a relative one

    Returns:
        parent directory path (absolute or relative per ``abspath``)
    """
    resolver = os.path.abspath if abspath else os.path.relpath
    return resolver(f_join(location, os.pardir))
|
|
|
|
|
|
|
|
def md5_checksum(*fpath):
    """
    Hex md5 digest of the file's contents, read in 64 KiB chunks.
    """
    digest = hashlib.md5()
    with open(f_join(*fpath), "rb") as fh:
        while chunk := fh.read(65536):
            digest.update(chunk)
    return digest.hexdigest()
|
|
|
|
|
|
|
|
def create_tar(fsrc, output_tarball, include=None, ignore=None, compress_mode="gz"):
    """
    Create a tarball from a file or folder.

    Args:
        fsrc: source file or folder
        output_tarball: output tar file name
        compress_mode: "gz", "bz2", "xz" or "" (empty for uncompressed write)
        include: include pattern, will trigger copy to temp directory
        ignore: ignore pattern, will trigger copy to temp directory

    Fix: the staging temp dir is now cleaned up in a ``finally`` block, so
    it no longer leaks when tarball creation raises.
    """
    fsrc, output_tarball = f_expand(fsrc), f_expand(output_tarball)
    assert compress_mode in ["gz", "bz2", "xz", ""]
    src_base = os.path.basename(fsrc)

    tempdir = None
    if include or ignore:
        # filtering requires staging a filtered copy in a temp dir first
        tempdir = tempfile.mkdtemp()
        tempdest = f_join(tempdir, src_base)
        f_copy(fsrc, tempdest, include=include, ignore=ignore)
        fsrc = tempdest

    try:
        with tarfile.open(output_tarball, "w:" + compress_mode) as tar:
            tar.add(fsrc, arcname=src_base)
    finally:
        if tempdir:
            f_remove(tempdir)
|
|
|
|
|
|
|
|
def extract_tar(source_tarball, output_dir=".", members=None):
    """
    Extract a tar archive (any supported compression) into ``output_dir``.

    Args:
        source_tarball: archive to extract members from
        output_dir: destination, defaults to the current working dir
        members: must be a subset of the list returned by getmembers()

    NOTE(security): ``extractall`` trusts member paths as-is; do not call
    this on untrusted archives without validating members first
    (path-traversal risk).
    """
    source_tarball, output_dir = f_expand(source_tarball), f_expand(output_dir)
    with tarfile.open(source_tarball, "r:*") as tar:
        tar.extractall(output_dir, members=members)
|
|
|
|
|
|
|
|
def move_with_backup(*fpath, suffix=".bak"):
    """
    Free up a path by renaming whatever occupies it to ``path + suffix``,
    recursively backing up any existing backups first
    (file -> file.bak -> file.bak.bak -> ...).

    Args:
        fpath: file path to clear
        suffix: appended to backed-up files (default: '.bak')
    """
    target = str(f_join(*fpath))
    if os.path.exists(target):
        backup = target + suffix
        # push the existing backup chain one level deeper first
        move_with_backup(backup)
        shutil.move(target, backup)
|
|
|
|
|
|
|
|
def insert_before_ext(name, insert):
    """
    Insert a string just before the extension:
    log.txt + ".ep50" -> log.ep50.txt
    """
    stem, ext = os.path.splitext(name)
    return f"{stem}{insert}{ext}"
|
|
|
|
|
|
|
|
def timestamp_file_name(fname):
    """Insert a '_HH-MM-SS_mm-dd-yy' timestamp just before the extension."""
    stamp = datetime.now().strftime("_%H-%M-%S_%m-%d-%y")
    return insert_before_ext(fname, stamp)
|
|
|
|
|
|
|
|
def get_file_lock(*fpath, timeout: int = 15, logging_level: str = "critical"):
    """
    NFS-safe filesystem-backed lock. `pip install flufl.lock`
    https://flufllock.readthedocs.io/en/stable/apiref.html

    Args:
        fpath: should be a path on NFS so that every process can see it
        timeout: seconds, used as the lock's `lifetime`
        logging_level: level name for the "flufl.lock" logger (upper-cased)

    Returns:
        an unacquired flufl.lock.Lock; use it as a context manager
    """
    # imported lazily so the module imports fine without flufl.lock installed
    from flufl.lock import Lock

    logging.getLogger("flufl.lock").setLevel(logging_level.upper())
    return Lock(f_join(*fpath), lifetime=timeout)
|
|
|
|
|
|
|
|
def load_pickle(*fpaths):
    """
    Unpickle and return the object stored at the joined path.
    NOTE(security): only use on trusted files — unpickling can execute code.
    """
    path = f_join(*fpaths)
    with open(path, "rb") as fh:
        return pickle.load(fh)
|
|
|
|
|
|
|
|
def dump_pickle(data, *fpaths):
    """Pickle ``data`` to the joined path."""
    path = f_join(*fpaths)
    with open(path, "wb") as fh:
        pickle.dump(data, fh)
|
|
|
|
|
|
|
|
def load_text(*fpaths, by_lines=False):
    """
    Read a text file; return the whole string, or a list of lines
    (newlines kept) when ``by_lines`` is True.
    """
    with open(f_join(*fpaths), "r") as fh:
        return fh.readlines() if by_lines else fh.read()
|
|
|
|
|
|
|
|
def load_text_lines(*fpaths):
    """Read a text file as a list of lines (newlines kept)."""
    return load_text(*fpaths, by_lines=True)
|
|
|
|
|
|
|
|
def dump_text(s, *fpaths):
    """Write string ``s`` to the joined path, overwriting any existing file."""
    path = f_join(*fpaths)
    with open(path, "w") as fh:
        fh.write(s)
|
|
|
|
|
|
|
|
def dump_text_lines(lines: list[str], *fpaths, add_newline=True):
    """Write each entry of ``lines`` to a file, one per line.

    Args:
        add_newline: append '\\n' after every entry (default True)
    """
    terminator = "\n" if add_newline else ""
    with open(f_join(*fpaths), "w") as fh:
        for entry in lines:
            fh.write(f"{entry}{terminator}")
|
|
|
|
|
|
|
|
|
|
|
# Backward-compatible aliases for the pickle/text load & dump helpers.
pickle_load = load_pickle
pickle_dump = dump_pickle
text_load = load_text
read_text = load_text
read_text_lines = load_text_lines
write_text = dump_text
write_text_lines = dump_text_lines
text_dump = dump_text
|
|
|