# LogDisplayer / logging_helper.py
# (Hugging Face Space file by Beracles; commit 22871df "push as need", 7.68 kB)
"""
A module for saving and backing up logs.
"""
import os
import datasets as ds
from apscheduler.schedulers.background import BackgroundScheduler
from tqdm import tqdm
from utils import beijing, md5, json_to_str
from huggingface_hub import HfApi
import pandas as pd
import glob
hf = HfApi()
hf.token = os.environ.get("hf_token")
class LoggingHelper:
    """Buffer log records in memory and synchronize them with a Hugging Face
    dataset repository.

    Logs are grouped into small JSON shards under a ``year/month/day`` remote
    directory. A background scheduler writes dirty shards to ``local_dir`` and
    uploads the day's folder every ``synchronize_interval`` seconds; on day
    rollover, the previous day's shards are flushed once and dropped from
    memory.
    """

    def __init__(
        self,
        repo_id: str,
        local_dir: str = "data/logs",
        synchronize_interval: int = 60,
    ):
        """
        :param repo_id: the repo_id of the dataset in huggingface
        :param local_dir: a directory in cwd to store downloaded files
        :param synchronize_interval: the interval, in seconds, of synchronizing
            between local and huggingface
        """
        self.local_dir = local_dir
        self.repo_id = repo_id
        self.synchronize_interval = synchronize_interval
        self.repo_type = "dataset"
        self.scheduler = BackgroundScheduler()
        # remote file path -> in-memory dataset holding that shard's logs
        self.buffer: dict[str, ds.Dataset] = {}
        # remote file path -> True when the shard has logs not yet pushed
        self.need_push: dict[str, bool] = {}
        # date (Beijing time) used by push() to detect day rollover
        self.today = beijing().date()
        ds.disable_progress_bar()
        self.pull()
        self.start_synchronize()

    def addlog(self, log: dict):
        """Add one log record to the buffer and mark its shard as dirty."""
        # Shard name: first two hex chars of the log's md5 -> at most 256
        # shard files per day.
        filename = md5(json_to_str(log))[:2] + ".json"
        remotepath = "/".join([self.remotedir(), filename])
        if remotepath not in self.buffer:
            # Lazily create an empty dataset for a new shard.
            self.buffer[remotepath] = ds.Dataset.from_dict({})
        self.buffer[remotepath] = self.buffer[remotepath].add_item(log)  # type: ignore
        self.need_push[remotepath] = True
        print("[addlog] Added a log to buffer")

    def remotedir(self) -> str:
        """Return today's remote directory, e.g. ``"2024/5/7"`` (not zero-padded)."""
        now = beijing()
        return "/".join([str(now.year), str(now.month), str(now.day)])

    def pull(self) -> bool:
        """Download the repo snapshot and load today's JSON shards into the buffer.

        :return: True on success, False when any step failed.
        """
        try:
            self.download()
            remotedir = self.remotedir()
            print(f"[pull] today dir: {remotedir}")
            filenames = hf.list_repo_files(
                repo_id=self.repo_id,
                repo_type=self.repo_type,
            )
            # Only today's shards that are not already buffered.
            files_to_load = [
                filename
                for filename in filenames
                if filename not in self.buffer
                and filename.startswith(remotedir)
                and filename.endswith(".json")
            ]
            print(f"[pull] total {len(files_to_load)} to load")
            for filename in tqdm(files_to_load):
                path = os.sep.join([self.local_dir, filename])
                with open(path, "r") as f:
                    data = f.read()
                # Dataset.from_json cannot parse an empty file; skip those.
                if len(data) != 0:
                    self.buffer[filename] = ds.Dataset.from_json(path)  # type: ignore
                    # Freshly pulled content is already on the remote.
                    self.need_push[filename] = False
            return True
        except Exception as e:
            print(f"[pull] {type(e)}: {e}")
            return False

    def push_yesterday(self) -> bool:
        """Flush and upload the previous day's shards, then drop them from memory.

        Called from push() on day rollover, while ``self.today`` still holds
        yesterday's date.

        :return: True on success (or nothing to do), False on failure.
        """
        try:
            yesterday = "/".join(
                [str(self.today.year), str(self.today.month), str(self.today.day)]
            )
            # Snapshot the matching keys first: the dicts must not be mutated
            # while being iterated.
            files_to_push = [
                filename for filename in self.buffer if filename.startswith(yesterday)
            ]
            if len(files_to_push) == 0:
                return True
            print("[push_yesterday] Writing datasets to json files")
            for filename in files_to_push:
                localpath = os.sep.join([self.local_dir, filename])
                os.makedirs(os.path.dirname(localpath), exist_ok=True)
                self.buffer[filename].to_json(localpath)
                print(f"[push_yesterday] {filename} finished")
            print("[push_yesterday] Done")
            print("[push_yesterday] Pushing log files to remote")
            localdir = os.sep.join([self.local_dir, yesterday])
            res = hf.upload_folder(
                repo_id=self.repo_id,
                folder_path=localdir,
                path_in_repo=yesterday,
                repo_type=self.repo_type,
                commit_message=f"Updated at {beijing()}",
            )
            print(f"[push_yesterday] Log files pushed to {res}")
            # Upload succeeded: yesterday's shards are no longer needed in memory.
            for filename in files_to_push:
                self.buffer.pop(filename, None)
                self.need_push.pop(filename, None)
            print("[push_yesterday] Done")
            return True
        except Exception as e:
            print(f"[push_yesterday] {type(e)}: {e}")
            return False

    def push(self) -> bool:
        """Write dirty shards to disk and upload today's folder to the repo.

        Runs periodically from the background scheduler; also finalizes the
        previous day's shards when a new day has started.

        :return: True on success (or nothing to do), False on failure.
        """
        try:
            now = beijing().date()
            if now != self.today:  # new day comes
                if not self.push_yesterday():
                    print("[push] Failed to upload yesterday's log files")
                self.today = now
            files_to_push = [
                filename for filename in self.need_push if self.need_push[filename]
            ]
            if len(files_to_push) == 0:
                return True
            print("[push] Writing datasets to json files")
            for filename in files_to_push:
                localpath = os.path.join(self.local_dir, filename)
                os.makedirs(os.path.dirname(localpath), exist_ok=True)
                self.buffer[filename].to_json(localpath)
                print(f"[push] {filename} finished")
            print("[push] Done")
            print("[push] Pushing log files to remote")
            remotedir = self.remotedir()
            localdir = os.path.join(self.local_dir, remotedir)
            res = hf.upload_folder(
                repo_id=self.repo_id,
                folder_path=localdir,
                path_in_repo=remotedir,
                repo_type=self.repo_type,
                commit_message=f"Updated at {beijing()}",
            )
            # Only mark shards clean after the upload actually succeeded.
            for filename in files_to_push:
                self.need_push[filename] = False
            print(f"[push] Log files pushed to {res}")
            print("[push] Done")
            return True
        except Exception as e:
            print(f"[push] {type(e)}: {e}")
            return False

    def download(self):
        """Snapshot the whole dataset repo into ``self.local_dir`` (best effort)."""
        print("[download] Starting downloading")
        try:
            res = hf.snapshot_download(
                repo_id=self.repo_id,
                repo_type=self.repo_type,
                local_dir=self.local_dir,
            )
            print(f"[download] Downloaded to {res}")
        except Exception as e:
            print(f"[download] {type(e)}: {e}")
        print("[download] Done")

    def start_synchronize(self):
        """Start the background job that pushes dirty shards periodically."""
        self.scheduler.add_job(
            self.push,
            "interval",
            seconds=self.synchronize_interval,
        )
        self.scheduler.start()

    def refresh(self) -> list[dict]:
        """Push pending logs, then reload every local shard and return all logs.

        :return: all log records, newest first by ``timestamp`` when that
            column is present.
        """
        self.push()
        files = glob.glob("**/*.json", root_dir=self.local_dir, recursive=True)
        filepathes = [os.sep.join([self.local_dir, file]) for file in files]
        datasets = [ds.Dataset.from_json(str(path)) for path in tqdm(filepathes)]
        if not datasets:
            # No shards at all: sorting an empty, column-less DataFrame by
            # "timestamp" would raise KeyError, so return early.
            return []
        dataset: ds.Dataset = ds.concatenate_datasets(datasets)
        df = dataset.to_pandas()
        assert isinstance(df, pd.DataFrame)
        # Guard: a shard written from an empty dataset may lack the column.
        if "timestamp" in df.columns:
            df = df.sort_values(by="timestamp", ascending=False)
        return df.to_dict(orient="records")