Beracles committed on
Commit 783b01b · 1 Parent(s): 7416023

logginghelper

Files changed (1)
  1. logging_helper.py +205 -0
logging_helper.py ADDED
@@ -0,0 +1,205 @@
"""
A module for saving and backing up logs.
"""

import glob
import os

import datasets as ds
import pandas as pd
from apscheduler.schedulers.background import BackgroundScheduler
from huggingface_hub import HfApi
from tqdm import tqdm

from utils import beijing, md5, json_to_str

# Authenticate all Hub calls with the token from the environment.
hf = HfApi(token=os.environ.get("hf_token"))

class LoggingHelper:

    def __init__(
        self,
        repo_id: str,
        local_dir: str = "data/logs",
        synchronize_interval: int = 60,
    ):
        """
        :param repo_id: the repo_id of the Hugging Face dataset used as log storage
        :param local_dir: a directory under the cwd to store downloaded files
        :param synchronize_interval: the interval, in seconds, between synchronizations of the local buffer and the Hub
        """
        self.local_dir = local_dir
        self.repo_id = repo_id
        self.synchronize_interval = synchronize_interval
        self.repo_type = "dataset"
        self.scheduler = BackgroundScheduler()
        # Maps each remote file path to the in-memory dataset holding its logs.
        self.buffer: dict[str, ds.Dataset] = {}
        # Marks which buffered files have changes not yet pushed to the Hub.
        self.need_push: dict[str, bool] = {}
        self.today = beijing().date()
        ds.disable_progress_bar()
        self.dataframe: pd.DataFrame
        self.pull()
        self.start_synchronize()

    def addlog(self, log: dict):
        """Add one log entry to the buffer."""
        remotedir = self.remotedir()
        # Shard logs into up to 256 files keyed by the first two hex chars of the md5.
        filename = md5(json_to_str(log))[:2] + ".json"
        remotepath = "/".join([remotedir, filename])
        if remotepath not in self.buffer:
            self.buffer[remotepath] = ds.Dataset.from_dict({})
        self.buffer[remotepath] = self.buffer[remotepath].add_item(log)  # type: ignore
        self.need_push[remotepath] = True
        print("[addlog] Added a log to buffer")

    def remotedir(self):
        """Remote directory for today's logs: "<year>/<month>/<day>" in Beijing time."""
        now = beijing()
        return "/".join([str(now.year), str(now.month), str(now.day)])

    def pull(self):
        """Load today's log files from the Hub into the local buffer."""
        try:
            self.download()
            remotedir = self.remotedir()
            print(f"[pull] today dir: {remotedir}")
            filenames = hf.list_repo_files(
                repo_id=self.repo_id,
                repo_type=self.repo_type,
            )
            files_to_load = [
                filename
                for filename in filenames
                if filename not in self.buffer
                and filename.startswith(remotedir)
                and filename.endswith(".json")
            ]
            print(f"[pull] total {len(files_to_load)} to load")
            for filename in tqdm(files_to_load):
                path = os.sep.join([self.local_dir, filename])
                with open(path, "r") as f:
                    data = f.read()
                # Skip empty files; Dataset.from_json cannot parse them.
                if len(data) != 0:
                    self.buffer[filename] = ds.Dataset.from_json(path)  # type: ignore
                    self.need_push[filename] = False
            return True
        except Exception as e:
            print(f"[pull] {type(e)}: {e}")
            return False

    def push_yesterday(self) -> bool:
        """Upload the previous day's unpushed logs and drop that day from the buffer."""
        try:
            remotedir = "/".join(
                [str(self.today.year), str(self.today.month), str(self.today.day)]
            )
            files_to_push = []
            # Iterate over a copy so entries can be deleted while looping.
            for filename in list(self.buffer):
                if not filename.startswith(remotedir):
                    continue
                if self.need_push[filename]:
                    files_to_push.append(filename)
                else:
                    # Already pushed: just evict it from the buffer.
                    del self.buffer[filename]
                    del self.need_push[filename]
            if len(files_to_push) == 0:
                return True
            print("[push_yesterday] Writing datasets to json files")
            for filename in files_to_push:
                localpath = os.sep.join([self.local_dir, filename])
                os.makedirs(os.path.dirname(localpath), exist_ok=True)
                self.buffer[filename].to_json(localpath)
                print(f"[push_yesterday] {filename} finished")
            print("[push_yesterday] Done")
            print("[push_yesterday] Pushing log files to remote")
            localdir = os.sep.join([self.local_dir, remotedir])
            res = hf.upload_folder(
                repo_id=self.repo_id,
                folder_path=localdir,
                path_in_repo=remotedir,
                repo_type=self.repo_type,
                commit_message=f"Updated at {beijing()}",
            )
            # The finished day is fully uploaded; evict it from the buffer.
            for filename in files_to_push:
                del self.buffer[filename]
                del self.need_push[filename]
            print(f"[push_yesterday] Log files pushed to {res}")
            print("[push_yesterday] Done")
            return True
        except Exception as e:
            print(f"[push_yesterday] {type(e)}: {e}")
            return False

    def push(self):
        """Flush unpushed buffers to json files and upload today's folder to the Hub."""
        try:
            now = beijing().date()
            if now != self.today:  # a new day has started
                if not self.push_yesterday():
                    print("[push] Failed to upload yesterday's log files")
                self.today = now
            files_to_push = [
                filename for filename in self.need_push if self.need_push[filename]
            ]
            if len(files_to_push) == 0:
                return True
            print("[push] Writing datasets to json files")
            for filename in files_to_push:
                localpath = os.path.join(self.local_dir, filename)
                os.makedirs(os.path.dirname(localpath), exist_ok=True)
                self.buffer[filename].to_json(localpath)
                print(f"[push] {filename} finished")
            print("[push] Done")
            print("[push] Pushing log files to remote")
            remotedir = self.remotedir()
            localdir = "/".join([self.local_dir, remotedir])
            res = hf.upload_folder(
                repo_id=self.repo_id,
                folder_path=localdir,
                path_in_repo=remotedir,
                repo_type=self.repo_type,
                commit_message=f"Updated at {beijing()}",
            )
            # Mark everything just uploaded as clean so it is not re-pushed.
            for filename in files_to_push:
                self.need_push[filename] = False
            print(f"[push] Log files pushed to {res}")
            print("[push] Done")
            return True
        except Exception as e:
            print(f"[push] {type(e)}: {e}")
            return False

    def download(self):
        """Download a snapshot of the remote dataset repo into local_dir."""
        print("[download] Starting download")
        try:
            res = hf.snapshot_download(
                repo_id=self.repo_id,
                repo_type=self.repo_type,
                local_dir=self.local_dir,
            )
            print(f"[download] Downloaded to {res}")
        except Exception as e:
            print(f"[download] {type(e)}: {e}")
        print("[download] Done")

    def start_synchronize(self):
        """Schedule push() to run every synchronize_interval seconds in the background."""
        self.scheduler.add_job(
            self.push,
            "interval",
            seconds=self.synchronize_interval,
        )
        self.scheduler.start()

    def refresh(self) -> list[dict]:
        """Re-download everything and return all logs sorted by timestamp."""
        self.download()
        files = glob.glob("**/*.json", root_dir=self.local_dir, recursive=True)
        filepaths = [os.sep.join([self.local_dir, file]) for file in files]
        datasets = []
        for path in tqdm(filepaths):
            datasets.append(ds.Dataset.from_json(path))
        df = pd.DataFrame()
        if datasets:
            dataset: ds.Dataset = ds.concatenate_datasets(datasets)
            df = dataset.to_pandas()
            assert isinstance(df, pd.DataFrame)
            # Every log is expected to carry a "timestamp" field.
            df = df.sort_values(by="timestamp")
        return df.to_dict(orient="records")
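
Note: `beijing`, `md5`, and `json_to_str` come from a project-local `utils` module that is not part of this commit. A minimal sketch of what those helpers would need to provide, inferred from how they are called above (these implementations are assumptions, not code from this repo):

# utils.py -- hypothetical stand-ins for the helpers imported above.
import hashlib
import json
from datetime import datetime, timedelta, timezone


def beijing() -> datetime:
    # Current time in UTC+8, matching how remotedir() builds date paths.
    return datetime.now(timezone(timedelta(hours=8)))


def md5(text: str) -> str:
    # Hex digest; addlog() keeps only the first two characters as a shard key.
    return hashlib.md5(text.encode("utf-8")).hexdigest()


def json_to_str(obj: dict) -> str:
    # Stable serialization so identical logs hash to the same shard.
    return json.dumps(obj, sort_keys=True, ensure_ascii=False)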
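
With helpers like those in place, and `hf_token` set in the environment, usage could look like the following sketch (the repo id is a placeholder):

# example.py -- hypothetical usage sketch for LoggingHelper.
from logging_helper import LoggingHelper

# "user/my-logs" is a placeholder dataset repo on the Hub.
helper = LoggingHelper(repo_id="user/my-logs", synchronize_interval=60)

# Buffer one entry; the background job pushes it within the next interval.
helper.addlog({"timestamp": "2024-01-01T00:00:00+08:00", "event": "login"})

# Pull everything back as a list of dicts sorted by timestamp.
records = helper.refresh()
print(f"{len(records)} logs loaded")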