"""
a module of logs saving and backuping
"""

import os
import datasets as ds
from apscheduler.schedulers.background import BackgroundScheduler
from tqdm import tqdm
from utils import beijing, md5, json_to_str
from huggingface_hub import HfApi
import pandas as pd
import glob

hf = HfApi(token=os.environ.get("hf_token"))


class LoggingHelper:

    def __init__(
        self,
        repo_id: str,
        local_dir: str = "data/logs",
        synchronize_interval: int = 60,
    ):
        """
        :param repo_id: the repo_id of the dataset in huggingface
        :param local_dir: a directory in cwd to store downloaded files
        :param synchronize_interval: the interval of synchronizing between local and huggingface

        """
        self.local_dir = local_dir
        self.repo_id = repo_id
        self.synchronize_interval = synchronize_interval
        self.repo_type = "dataset"
        self.scheduler = BackgroundScheduler()
        # remote filename -> in-memory dataset holding that file's logs
        self.buffer: dict[str, ds.Dataset] = {}
        # remote filename -> whether it has changes not yet pushed
        self.need_push: dict[str, bool] = {}
        self.today = beijing().date()
        ds.disable_progress_bar()
        self.pull()
        self.start_synchronize()

    def addlog(self, log: dict):
        """Add one log to the buffer. Logs are sharded into up to 256 files
        per day, keyed by the first two hex digits of each log's md5."""
        remotedir = self.remotedir()
        filename = md5(json_to_str(log))[:2] + ".json"
        remotepath = "/".join([remotedir, filename])
        if remotepath not in self.buffer:
            self.buffer[remotepath] = ds.Dataset.from_dict({})
        self.buffer[remotepath] = self.buffer[remotepath].add_item(log)  # type: ignore
        self.need_push[remotepath] = True
        print("[addlog] Added a log to buffer")

    def remotedir(self) -> str:
        """Return today's directory in the repo, e.g. "2024/1/5"."""
        now = beijing()
        return "/".join([str(now.year), str(now.month), str(now.day)])

    def pull(self):
        """Download a snapshot of the repo and load today's json files into
        the buffer. Returns True on success."""
        try:
            self.download()
            remotedir = self.remotedir()
            print(f"[pull] today dir: {remotedir}")
            filenames = hf.list_repo_files(
                repo_id=self.repo_id,
                repo_type=self.repo_type,
            )
            files_to_load = [
                filename
                for filename in filenames
                if filename not in self.buffer
                and filename.startswith(remotedir)
                and filename.endswith(".json")
            ]
            print(f"[pull] total {len(files_to_load)} to load")
            for filename in tqdm(files_to_load):
                path = os.sep.join([self.local_dir, filename])
                # skip empty files; Dataset.from_json cannot parse them
                if os.path.getsize(path) > 0:
                    self.buffer[filename] = ds.Dataset.from_json(path)  # type: ignore
                    self.need_push[filename] = False
            return True
        except Exception as e:
            print(f"[pull] {type(e)}: {e}")
            return False

    def push_yesterday(self) -> bool:
        """Flush and upload the previous day's files, then evict them from
        the buffer so it does not grow without bound."""
        try:
            remotedir = "/".join(
                [str(self.today.year), str(self.today.month), str(self.today.day)]
            )
            files_to_push = []
            # iterate over a copy: entries are deleted while walking the buffer
            for filename in list(self.buffer):
                if not filename.startswith(remotedir):
                    continue
                if self.need_push[filename]:
                    files_to_push.append(filename)
                else:
                    # already synced; just evict it
                    del self.buffer[filename]
                    del self.need_push[filename]
            if len(files_to_push) == 0:
                return True
            print("[push_yesterday] Writing datasets to json files")
            for filename in files_to_push:
                localpath = os.sep.join([self.local_dir, filename])
                # to_json does not create parent directories itself
                os.makedirs(os.path.dirname(localpath), exist_ok=True)
                self.buffer[filename].to_json(localpath)
                print(f"[push_yesterday] {filename} finished")
            print("[push_yesterday] Done")
            print("[push_yesterday] Pushing log files to remote")
            localdir = os.sep.join([self.local_dir, remotedir])
            res = hf.upload_folder(
                repo_id=self.repo_id,
                folder_path=localdir,
                path_in_repo=remotedir,
                repo_type=self.repo_type,
                commit_message=f"Updated at {beijing()}",
            )
            print(f"[push_yesterday] Log files pushed to {res}")
            # the whole day has been uploaded; evict its files from the buffer
            for filename in files_to_push:
                del self.buffer[filename]
                del self.need_push[filename]
            print("[push_yesterday] Done")
            return True
        except Exception as e:
            print(f"[push_yesterday] {type(e)}: {e}")
            return False

    def push(self):
        """Write dirty buffers to disk and upload today's folder. On a day
        rollover, flush yesterday's files first."""
        try:
            now = beijing().date()
            if now != self.today:  # a new day has begun
                if not self.push_yesterday():
                    print("[push] Failed to upload yesterday's log files")
                self.today = now
            files_to_push = [
                filename for filename in self.need_push if self.need_push[filename]
            ]
            if len(files_to_push) == 0:
                return True
            print("[push] Writing datasets to json files")
            for filename in files_to_push:
                localpath = os.path.join(self.local_dir, filename)
                self.buffer[filename].to_json(localpath)
                print(f"[push] {filename} finished")
            print("[push] Done")
            print("[push] Pushing log files to remote")
            remotedir = self.remotedir()
            localdir = "/".join([self.local_dir, remotedir])
            res = hf.upload_folder(
                repo_id=self.repo_id,
                folder_path=localdir,
                path_in_repo=remotedir,
                repo_type=self.repo_type,
                commit_message=f"Updated at {beijing()}",
            )
            for filename in files_to_push:
                self.need_push[filename] = False
            print(f"[push] Log files pushed to {res}")
            print("[push] Done")
            return True
        except Exception as e:
            print(f"[push] {type(e)}: {e}")
            return False

    def download(self):
        """Snapshot the remote dataset repo into the local directory."""
        print("[download] Starting downloading")
        try:
            res = hf.snapshot_download(
                repo_id=self.repo_id,
                repo_type=self.repo_type,
                local_dir=self.local_dir,
            )
            print(f"[download] Downloaded to {res}")
        except Exception as e:
            print(f"[download] {type(e)}: {e}")
        print("[download] Done")

    def start_synchronize(self):
        """Schedule push() to run every synchronize_interval seconds."""
        self.scheduler.add_job(
            self.push,
            "interval",
            seconds=self.synchronize_interval,
        )
        self.scheduler.start()

    def refresh(self) -> list[dict]:
        """Push pending logs, then load every local json file and return all
        logs as records sorted by timestamp, newest first."""
        self.push()
        files = glob.glob("**/*.json", root_dir=self.local_dir, recursive=True)
        filepaths = [os.sep.join([self.local_dir, file]) for file in files]
        datasets = [ds.Dataset.from_json(path) for path in tqdm(filepaths)]
        df = pd.DataFrame()
        if datasets:
            dataset: ds.Dataset = ds.concatenate_datasets(datasets)
            df = dataset.to_pandas()
            assert isinstance(df, pd.DataFrame)
            df = df.sort_values(by="timestamp", ascending=False)
        return df.to_dict(orient="records")
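

# Minimal usage sketch (the repo id below is hypothetical, and the "hf_token"
# environment variable is assumed to hold a token with write access):
if __name__ == "__main__":
    import time

    helper = LoggingHelper(
        repo_id="your-username/your-logs",  # hypothetical dataset repo
        local_dir="data/logs",
        synchronize_interval=60,
    )
    helper.addlog({"timestamp": str(beijing()), "message": "hello"})
    time.sleep(61)  # give the background scheduler one chance to push
    print(helper.refresh()[:3])  # newest logs first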