"""
a module of logs saving and backuping
"""
import os
import datasets as ds
from apscheduler.schedulers.background import BackgroundScheduler
from tqdm import tqdm
from utils import beijing, md5, json_to_str
from huggingface_hub import HfApi
import pandas as pd
import glob
hf = HfApi()
hf.token = os.environ.get("hf_token")
class LoggingHelper:
    def __init__(
        self,
        repo_id: str,
        local_dir: str = "data/logs",
        synchronize_interval: int = 60,
    ):
        """
        :param repo_id: the repo_id of the dataset on Hugging Face
        :param local_dir: a directory under the cwd to store downloaded files
        :param synchronize_interval: seconds between local/remote synchronizations
        """
        self.local_dir = local_dir
        self.repo_id = repo_id
        self.synchronize_interval = synchronize_interval
        self.repo_type = "dataset"
        self.scheduler = BackgroundScheduler()
        # remote path -> in-memory dataset of the logs stored in that file
        self.buffer: dict[str, ds.Dataset] = {}
        # remote path -> whether that file has local changes left to upload
        self.need_push: dict[str, bool] = {}
        self.today = beijing().date()
        ds.disable_progress_bar()
        self.dataframe: pd.DataFrame  # populated by refresh()
        self.pull()
        self.start_synchronize()
    def addlog(self, log: dict):
        """Append one log record to the in-memory buffer."""
        remotedir = self.remotedir()
        # Shard the day's logs into up to 256 files, keyed by the first two
        # hex characters of the record's md5 digest.
        filename = md5(json_to_str(log))[:2] + ".json"
        remotepath = "/".join([remotedir, filename])
        if remotepath not in self.buffer:
            self.buffer[remotepath] = ds.Dataset.from_dict({})
        self.buffer[remotepath] = self.buffer[remotepath].add_item(log)  # type: ignore
        self.need_push[remotepath] = True
        print("[addlog] Added a log to buffer")
    def remotedir(self):
        """Remote directory for today's logs, e.g. "2024/5/17" (Beijing time)."""
        now = beijing()
        return "/".join([str(now.year), str(now.month), str(now.day)])
    def pull(self):
        try:
            self.download()
            remotedir = self.remotedir()
            print(f"[pull] today dir: {remotedir}")
            filenames = hf.list_repo_files(
                repo_id=self.repo_id,
                repo_type=self.repo_type,
            )
            files_to_load = [
                filename
                for filename in filenames
                if filename not in self.buffer
                and filename.startswith(remotedir)
                and filename.endswith(".json")
            ]
            print(f"[pull] total {len(files_to_load)} to load")
            for filename in tqdm(files_to_load):
                path = os.path.join(self.local_dir, filename)
                with open(path, "r") as f:
                    data = f.read()
                # Skip empty files; Dataset.from_json would fail on them.
                if len(data) != 0:
                    self.buffer[filename] = ds.Dataset.from_json(path)  # type: ignore
                    self.need_push[filename] = False
            return True
        except Exception as e:
            print(f"[pull] {type(e)}: {e}")
            return False
    def push_yesterday(self) -> bool:
        """Upload any still-unpushed files of the day stored in self.today
        (called just before self.today is advanced), then evict that day's
        files from the buffer."""
        try:
            remotedir = "/".join(
                [str(self.today.year), str(self.today.month), str(self.today.day)]
            )
            files_to_push = []
            files_to_evict = []
            for filename in self.buffer:
                if not filename.startswith(remotedir):
                    continue
                if self.need_push[filename]:
                    files_to_push.append(filename)
                else:
                    files_to_evict.append(filename)
            # Evict after iterating: deleting entries while iterating the
            # dict would raise a RuntimeError.
            for filename in files_to_evict:
                del self.buffer[filename]
                del self.need_push[filename]
            if len(files_to_push) == 0:
                return True
            print("[push_yesterday] Writing datasets to json files")
            for filename in files_to_push:
                localpath = os.path.join(self.local_dir, filename)
                os.makedirs(os.path.dirname(localpath), exist_ok=True)
                self.buffer[filename].to_json(localpath)
                print(f"[push_yesterday] {filename} finished")
            print("[push_yesterday] Done")
            print("[push_yesterday] Pushing log files to remote")
            localdir = os.path.join(self.local_dir, remotedir)
            res = hf.upload_folder(
                repo_id=self.repo_id,
                folder_path=localdir,
                path_in_repo=remotedir,
                repo_type=self.repo_type,
                commit_message=f"Updated at {beijing()}",
            )
            print(f"[push_yesterday] Log files pushed to {res}")
            # Yesterday's files will not change again; drop them as well.
            for filename in files_to_push:
                del self.buffer[filename]
                del self.need_push[filename]
            print("[push_yesterday] Done")
            return True
        except Exception as e:
            print(f"[push_yesterday] {type(e)}: {e}")
            return False
    def push(self):
        try:
            now = beijing().date()
            if now != self.today:  # a new day has started
                if not self.push_yesterday():
                    print("[push] Failed to upload yesterday's log files")
                self.today = now
            files_to_push = [
                filename for filename in self.need_push if self.need_push[filename]
            ]
            if len(files_to_push) == 0:
                return True
            print("[push] Writing datasets to json files")
            for filename in files_to_push:
                localpath = os.path.join(self.local_dir, filename)
                # Ensure the local date directory exists; it may not on the
                # first push of a new day.
                os.makedirs(os.path.dirname(localpath), exist_ok=True)
                self.buffer[filename].to_json(localpath)
                print(f"[push] {filename} finished")
            print("[push] Done")
            print("[push] Pushing log files to remote")
            remotedir = self.remotedir()
            localdir = os.path.join(self.local_dir, remotedir)
            res = hf.upload_folder(
                repo_id=self.repo_id,
                folder_path=localdir,
                path_in_repo=remotedir,
                repo_type=self.repo_type,
                commit_message=f"Updated at {beijing()}",
            )
            for filename in files_to_push:
                self.need_push[filename] = False
            print(f"[push] Log files pushed to {res}")
            print("[push] Done")
            return True
        except Exception as e:
            print(f"[push] {type(e)}: {e}")
            return False
    def download(self):
        print("[download] Starting downloading")
        try:
            res = hf.snapshot_download(
                repo_id=self.repo_id,
                repo_type=self.repo_type,
                local_dir=self.local_dir,
            )
            print(f"[download] Downloaded to {res}")
        except Exception as e:
            print(f"[download] {type(e)}: {e}")
        print("[download] Done")
    def start_synchronize(self):
        """Periodically push buffered logs from a background thread."""
        self.scheduler.add_job(
            self.push,
            "interval",
            seconds=self.synchronize_interval,
        )
        self.scheduler.start()
    def refresh(self) -> list[dict]:
        """Push pending logs, then reload every local json file and return
        all logs as records, newest first."""
        self.push()
        files = glob.glob("**/*.json", root_dir=self.local_dir, recursive=True)
        filepaths = [os.path.join(self.local_dir, file) for file in files]
        datasets = [ds.Dataset.from_json(path) for path in tqdm(filepaths)]
        df = pd.DataFrame()
        if datasets:
            dataset: ds.Dataset = ds.concatenate_datasets(datasets)
            df = dataset.to_pandas()
            assert isinstance(df, pd.DataFrame)
            # Sort only when there is data; an empty frame has no
            # "timestamp" column and sort_values would raise a KeyError.
            df = df.sort_values(by="timestamp", ascending=False)
        self.dataframe = df
        return df.to_dict(orient="records")
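

# A minimal usage sketch, not part of the module itself. It assumes the
# `hf_token` environment variable is set and that a dataset repo such as
# "your-username/logs" exists; both names are placeholders.
if __name__ == "__main__":
    import time

    logger = LoggingHelper(repo_id="your-username/logs")
    logger.addlog({"timestamp": beijing().isoformat(), "event": "started"})
    # The background scheduler pushes the buffer every
    # `synchronize_interval` seconds; wait long enough for one cycle.
    time.sleep(logger.synchronize_interval + 5)
    print(len(logger.refresh()), "logs in total")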