""" a module of logs saving and backuping """ import os import datasets as ds from apscheduler.schedulers.background import BackgroundScheduler from tqdm import tqdm from utils import beijing, md5, json_to_str from huggingface_hub import HfApi import pandas as pd import glob hf = HfApi() hf.token = os.environ.get("hf_token") class LoggingHelper: def __init__( self, repo_id: str, local_dir: str = "data/logs", synchronize_interval: int = 60, ): """ :param repo_id: the repo_id of the dataset in huggingface :param local_dir: a directory in cwd to store downloaded files :param synchronize_interval: the interval of synchronizing between local and huggingface """ self.local_dir = local_dir self.repo_id = repo_id self.synchronize_interval = synchronize_interval self.repo_type = "dataset" self.scheduler = BackgroundScheduler() self.buffer = dict[str, ds.Dataset]() self.need_push = dict[str, bool]() self.today = beijing().date() ds.disable_progress_bar() self.dataframe: pd.DataFrame self.pull() self.start_synchronize() def addlog(self, log: dict): """add one log""" remotedir = self.remotedir() filename = md5(json_to_str(log))[:2] + ".json" remotepath = "/".join([remotedir, filename]) if remotepath in self.buffer: self.buffer[remotepath] = self.buffer[remotepath].add_item(log) # type: ignore else: self.buffer[remotepath] = ds.Dataset.from_dict({}) self.buffer[remotepath] = self.buffer[remotepath].add_item(log) # type: ignore self.need_push[remotepath] = True print("[addlog] Added a log to buffer") def remotedir(self): now = beijing() year = now.year.__str__() month = now.month.__str__() day = now.day.__str__() return "/".join([year, month, day]) def pull(self): try: self.download() remotedir = self.remotedir() print(f"[pull] today dir: {remotedir}") filenames = hf.list_repo_files( repo_id=self.repo_id, repo_type=self.repo_type, ) files_to_load = [ filename for filename in filenames if filename not in self.buffer and filename.startswith(remotedir) and filename.endswith(".json") ] print(f"[pull] total {len(files_to_load)} to load") for filename in tqdm(files_to_load): print() path = os.sep.join([self.local_dir, filename]) with open(path, "r") as f: data = f.read() if len(data) != 0: self.buffer[filename] = ds.Dataset.from_json(path) # type: ignore self.need_push[filename] = False return True except Exception as e: print(f"[pull] {type(e)}: {e}") return False def push_yesterday(self) -> bool: try: year = self.today.year.__str__() month = self.today.month.__str__() day = self.today.day.__str__() remotedir = "/".join([year, month, day]) files_to_push = [] for filename in self.buffer: if not filename.startswith(remotedir): continue if not self.need_push[filename]: del self.buffer[filename] del self.need_push[filename] files_to_push.append(filename) if len(files_to_push) == 0: return True print("[push_yesterday] Writing datasets to json files") for filename in files_to_push: localpath = os.sep.join([self.local_dir, filename]) self.buffer[filename].to_json(localpath) files_to_push.append(filename) print(f"[push_yesterday] {filename} finished") print("[push_yesterday] Done") print("[push_yesterday] Pushing log files to remote") if len(files_to_push): localdir = os.sep.join([self.local_dir, remotedir]) res = hf.upload_folder( repo_id=self.repo_id, folder_path=localdir, path_in_repo=remotedir, repo_type=self.repo_type, commit_message=f"Updated at {beijing()}", ) print(f"[push_yesterday] Log files pushed to {res}") print("[push_yesterday] Done") return True except Exception as e: print(f"[push_yesterday] 
{type(e)}: {e}") return False def push(self): try: now = beijing().date() if now != self.today: # new day comes if not self.push_yesterday(): print("[push] Failed to upload yesterday's log files") self.today = now files_to_push = [ filename for filename in self.need_push if self.need_push[filename] ] if len(files_to_push) == 0: return True print("[push] Writing datasets to json files") for filename in files_to_push: localpath = os.path.join(self.local_dir, filename) self.buffer[filename].to_json(localpath) print(f"[push] {filename} finished") print("[push] Done") print("[push] Pushing log files to remote") remotedir = self.remotedir() localdir = "/".join([self.local_dir, remotedir]) res = hf.upload_folder( repo_id=self.repo_id, folder_path=localdir, path_in_repo=remotedir, repo_type=self.repo_type, commit_message=f"Updated at {beijing()}", ) for filename in files_to_push: self.need_push[filename] = False print(f"[push] Log files pushed to {res}") print("[push] Done") return True except Exception as e: print(f"[push] {type(e)}: {e}") return False def download(self): print("[download] Starting downloading") try: res = hf.snapshot_download( repo_id=self.repo_id, repo_type="dataset", local_dir=self.local_dir, ) print(f"[download] Downloaded to {res}") except Exception as e: print(f"[download] {type(e)}: {e}") print("[download] Done") def start_synchronize(self): self.scheduler.add_job( self.push, "interval", seconds=self.synchronize_interval, ) self.scheduler.start() def refresh(self) -> list[dict]: self.push() files = glob.glob("**/*.json", root_dir=self.local_dir, recursive=True) filepathes = [os.sep.join([self.local_dir, file]) for file in files] datasets = [] for path in tqdm(filepathes): path = str(path) datasets.append(ds.Dataset.from_json(path)) df = pd.DataFrame() if datasets: dataset: ds.Dataset = ds.concatenate_datasets(datasets) df = dataset.to_pandas() assert isinstance(df, pd.DataFrame) df = df.sort_values(by="timestamp", ascending=False) return df.to_dict(orient="records")