# zirobtc's picture
# Upload folder using huggingface_hub
# fbb20ff verified
import datetime
import glob
import importlib
import itertools
import os
import os.path as osp
import subprocess
import time
import numpy as np
class AverageMeter(object):
    """Tracks the most recent value and a running average of a scalar metric.

    Optionally seeded with a pre-existing average via ``avg``/``count``.
    """

    def __init__(self, avg=None, count=1):
        self.reset()
        # Seed the meter with a previously computed average, e.g. when
        # resuming from a checkpoint.
        if avg is not None:
            self.val = avg
            self.avg = avg
            self.count = count
            self.sum = avg * count

    def __repr__(self) -> str:
        return f"{self.avg: .4f}"

    def reset(self):
        """Clear all accumulated statistics."""
        self.val = 0
        self.avg = 0
        self.sum = 0
        self.count = 0

    def update(self, val, n=1):
        """Record ``val`` observed ``n`` times; non-positive ``n`` is ignored."""
        if n <= 0:
            return
        self.val = val
        self.count += n
        self.sum += val * n
        self.avg = self.sum / self.count
def worker_init_fn(worker_id):
    """DataLoader worker init: expose the worker id and reseed numpy.

    Each worker gets a distinct seed derived from the parent process's
    current numpy RNG state, offset by ``worker_id``.
    """
    os.environ["worker_id"] = str(worker_id)
    seed_base = np.random.get_state()[1][0]
    np.random.seed(seed_base + worker_id * 7)
def find_last_version(folder, prefix="version_", cp="last"):
    """Return the highest version number among ``{folder}/{prefix}*`` folders.

    Args:
        folder: directory containing versioned sub-folders (e.g. lightning logs).
        prefix: version-folder name prefix; the rest is parsed as an int.
        cp: checkpoint filter — "last", "best", a step number string, an
            arbitrary checkpoint stem, or None to skip the checkpoint check.

    Returns:
        The largest version number whose folder contains a matching
        checkpoint, or None if no version qualifies.
    """
    version_folders = glob.glob(f"{folder}/{prefix}*")
    if cp is not None:
        if cp == "last":
            suffix = "last.ckpt"
        elif cp == "best":
            suffix = "*best*.ckpt"
        elif cp.isdigit():
            # step numbers are zero-padded to 7 digits in checkpoint names
            suffix = f"*{int(cp):07d}.ckpt"
        else:
            suffix = f"{cp}.ckpt"
        # BUGFIX: `**` only matches an arbitrary number of directory levels
        # when recursive=True; without it the pattern behaved like a single
        # `*`, missing checkpoints nested deeper (or directly in the folder).
        version_folders = [
            x
            for x in version_folders
            if glob.glob(f"{x}/**/{suffix}", recursive=True)
        ]
    version_numbers = sorted(
        int(osp.basename(x)[len(prefix):]) for x in version_folders
    )
    if not version_numbers:
        return None
    return version_numbers[-1]
def get_eta_str(cur_iter, total_iter, time_per_iter):
    """Estimate remaining wall-clock time and format it as H:MM:SS."""
    remaining_iters = total_iter - cur_iter - 1
    return convert_sec_to_time(remaining_iters * time_per_iter)
def convert_sec_to_time(secs):
    """Format a duration in seconds (rounded to the nearest int) as H:MM:SS."""
    duration = datetime.timedelta(seconds=round(secs))
    return str(duration)
def concat_lists(list_of_lists):
    """Flatten one level of nesting into a single list."""
    flattened = []
    for sub in list_of_lists:
        flattened.extend(sub)
    return flattened
def find_consecutive_runs(x, min_len=1):
    """Find runs of consecutive (step +1) items in a 1-D array.

    Returns a tuple ``(run_values, run_starts, run_lengths)`` where
    ``run_values`` is a list of array slices, one per run of length
    >= ``min_len``. Raises ValueError for non-1D input.
    """
    x = np.asanyarray(x)
    if x.ndim != 1:
        raise ValueError("only 1D array supported")
    n = x.shape[0]
    if n == 0:
        # nothing to scan
        return np.array([]), np.array([]), np.array([])
    # A run starts at index 0 and wherever x[i] != x[i-1] + 1.
    is_start = np.empty(n, dtype=bool)
    is_start[0] = True
    np.not_equal(x[:-1], x[1:] - 1, out=is_start[1:])
    run_starts = np.flatnonzero(is_start)
    # Length of each run = distance to the next start (or to the end).
    run_lengths = np.diff(np.append(run_starts, n))
    keep = run_lengths >= min_len
    run_starts = run_starts[keep]
    run_lengths = run_lengths[keep]
    run_values = [
        x[start : start + length]
        for start, length in zip(run_starts, run_lengths)
    ]
    return run_values, run_starts, run_lengths
def get_checkpoint_path(checkpoint_dir, cp, return_name=False):
    """Resolve a checkpoint file name inside ``checkpoint_dir``.

    Args:
        checkpoint_dir: directory containing ``.ckpt`` files.
        cp: "last" (fixed name, existence not checked), "best"
            (lexicographically last ``*best*.ckpt``), or a checkpoint stem.
        return_name: also return the bare file name.

    Returns:
        ``cp_path`` or ``(cp_path, cp_name)``.

    Raises:
        FileNotFoundError: if cp != "last" and no file matches the pattern.
    """
    if cp == "last":  # use last epoch; fixed name, no glob needed
        cp_name = "last.ckpt"
    else:
        pattern = "*best*.ckpt" if cp == "best" else f"{cp}.ckpt"
        matches = sorted(glob.glob(f"{checkpoint_dir}/{pattern}"))
        if not matches:
            # BUGFIX: was an opaque IndexError from `sorted(...)[-1]` on an
            # empty glob result.
            raise FileNotFoundError(
                f"no checkpoint matching '{pattern}' in {checkpoint_dir}"
            )
        cp_name = osp.basename(matches[-1])
    cp_path = f"{checkpoint_dir}/{cp_name}"
    if return_name:
        return cp_path, cp_name
    return cp_path
def subprocess_run(cmd, ignore_err=False, **kwargs):
    """Run ``cmd`` via subprocess.run and fail loudly on errors.

    Args:
        cmd: command (list, or string with shell=True) passed to subprocess.run.
        ignore_err: if True, non-zero exit codes and CalledProcessError do not
            raise; the result (or None when the call itself raised) is returned.
        **kwargs: forwarded to subprocess.run (e.g. shell=True, check=True).

    Returns:
        The CompletedProcess, or None when CalledProcessError was swallowed.

    Raises:
        Exception: on a non-zero return code (or CalledProcessError) unless
            ``ignore_err`` is set.
    """
    try:
        result = subprocess.run(cmd, **kwargs)
    except subprocess.CalledProcessError as err:
        # Only raised when the caller passed check=True.
        print("####### subprocess-run error message ######")
        # err.stderr is None unless output was captured — guard the decode.
        stderr = err.stderr.decode("utf8") if err.stderr else ""
        print(f"{err} {stderr}")
        # BUGFIX: the original fell through to `result.returncode` with
        # `result` unbound, raising UnboundLocalError instead of reporting.
        if not ignore_err:
            raise Exception("error in subprocess_run!") from err
        return None
    if result.returncode != 0 and not ignore_err:
        raise Exception("error in subprocess_run!")
    return result
def import_type_from_str(s):
    """Import and return an attribute given a dotted path like 'pkg.mod.Name'."""
    module_name, attr_name = s.rsplit(".", 1)
    return getattr(importlib.import_module(module_name), attr_name)
def build_object_from_dict(d, type_field="type", **add_kwargs):
    """Instantiate the type named in ``d[type_field]`` with the remaining
    entries of ``d`` (plus ``add_kwargs``) as constructor keyword arguments.

    ``d`` itself is not mutated.
    """
    kwargs = d.copy()
    cls = import_type_from_str(kwargs.pop(type_field))
    return cls(**kwargs, **add_kwargs)
def write_list_to_file(filename, string_list):
    """Write each string in ``string_list`` to ``filename``, one per line."""
    lines = [item + "\n" for item in string_list]
    with open(filename, "w") as f:
        f.writelines(lines)
def are_arrays_equal(array1, array2, sort=False):
    """Element-wise equality of two sequences.

    Returns False when either input is None or lengths differ. With
    ``sort=True``, both sequences are sorted before comparison (order-
    insensitive equality).
    """
    if array1 is None or array2 is None:
        return False
    if len(array1) != len(array2):
        return False
    if sort:
        array1, array2 = sorted(array1), sorted(array2)
    # Equal iff no element pair differs.
    return not any(a != b for a, b in zip(array1, array2))
def load_ema_weights_from_checkpoint(model, checkpoint):
    """Copy the EMA weights stored in the first optimizer state into ``model``.

    NOTE(review): assumes the checkpoint's first optimizer state carries an
    "ema" list aligned one-to-one with ``model.parameters()`` — verify
    against the training setup that produced the checkpoint.
    """
    ema_params = checkpoint["optimizer_states"][0]["ema"]
    for model_param, ema_param in zip(model.parameters(), ema_params):
        model_param.data.copy_(ema_param.data)
def rsync_file_from_remote(fname, remote_dir, local_dir, hostname):
    """Pull a single file from ``hostname`` via rsync, recreating its path
    relative to ``local_dir`` (the ``/./`` marker anchors --relative).
    """
    rel_marker = f"{remote_dir}/./"
    remote_path = fname.replace(local_dir, rel_marker)
    rsync_cmd = f"rsync -avzP -m --relative {hostname}:{remote_path} {local_dir}/"
    subprocess_run(rsync_cmd, shell=True)
# Global variable for timing indentation level (nested Timers print indented)
timer_indent_level = 0


# Context manager for timing a code section; prints elapsed time on exit.
class Timer:
    def __init__(self, name="", enabled=True, show_rank=False, rank_zero_only=True):
        """Args:
        name: label printed with the elapsed time.
        enabled: when False, the timer is a no-op.
        show_rank: prefix output with the local rank.
        rank_zero_only: only time/print on local rank 0.
        """
        self.name = name
        self.start_time = None
        self.enabled = enabled
        # LOCAL_RANK is set by distributed launchers; default to rank 0.
        if "LOCAL_RANK" in os.environ:
            self.rank = int(os.environ["LOCAL_RANK"])
        else:
            self.rank = 0
        self.show_rank = show_rank
        self.rank_zero_only = rank_zero_only

    def _active(self):
        # Timing is skipped when disabled or on non-zero ranks (if rank_zero_only).
        return self.enabled and not (self.rank_zero_only and self.rank != 0)

    def __enter__(self):
        if not self._active():
            return self
        global timer_indent_level
        self.start_time = time.perf_counter()
        self.current_indent = timer_indent_level  # capture current indent level
        timer_indent_level += 1  # nested timers print one level deeper
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if not self._active():
            return False  # nothing was started; never suppress exceptions
        global timer_indent_level
        # BUGFIX: always restore the indent level, even when an exception
        # propagates out of the timed block (the original returned early on
        # exceptions and permanently leaked one indent level).
        timer_indent_level -= 1
        if exc_type:
            return False  # propagate the exception
        elapsed_time = time.perf_counter() - self.start_time
        indent = "    " * self.current_indent  # 4 spaces per indent level
        rank_str = f"[rank{self.rank}] " if self.show_rank else ""
        print(f"{indent}{rank_str}[{self.name}] time: {elapsed_time:.4f} seconds")
        return False