"""
A module for saving and backing up logs.
"""

import os
import datasets as ds
from apscheduler.schedulers.background import BackgroundScheduler
from utils import beijing, md5, json_to_str
from huggingface_hub import HfApi
import pandas as pd
from tqdm import tqdm
from datetime import datetime, date, timedelta
from zoneinfo import ZoneInfo

hf = HfApi()
hf.token = os.environ.get("hf_token")

TIMEZONE = ZoneInfo("Asia/Shanghai")


class LoggingHelper:

    def __init__(
        self,
        repo_id: str,
        local_dir: str = "data/logs",
        synchronize_interval: int = 60,
        cache_days: int = 30,
    ):
        """
        :param repo_id: the repo_id of the dataset in huggingface
        :param local_dir: a directory in cwd to store downloaded files
        :param synchronize_interval: the interval of synchronizing between local and huggingface

        """
        self.cache_days = cache_days
        self.local_dir = local_dir
        self.repo_id = repo_id
        self.synchronize_interval = synchronize_interval
        self.repo_type = "dataset"
        self.scheduler = BackgroundScheduler()
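        # In-memory state: buffer maps remote path -> Dataset, need_push flags files
        # with unpushed changes, timestamps maps remote path -> that day's ISO timestamp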
        self.buffer = dict[str, ds.Dataset]()
        self.need_push = dict[str, bool]()
        self.timestamps = dict[str, str]()
        self.today = beijing().date()
        ds.disable_progress_bar()
        self.dataframe: pd.DataFrame
        self.dataframe_refresh_needed = True
        # First, download all data from the remote repo
        self.pull()
        # Load the most recent cache_days days of logs into memory
        self.load_logs()
        self.start_synchronize()

    def addlog(self, log: dict):
        """add one log"""
        remotedir = self.remotedir()
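        # Shard each day's logs into up to 256 files, bucketed by the first two hex
        # characters of the MD5 of the serialized log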
        filename = md5(json_to_str(log))[:2] + ".json"
        remotepath = "/".join([remotedir, filename])
        if remotepath in self.buffer:
            self.buffer[remotepath] = self.buffer[remotepath].add_item(log)  # type: ignore
        else:
            self.buffer[remotepath] = ds.Dataset.from_dict({})
            self.timestamps[remotepath] = beijing().isoformat(timespec="microseconds")
            self.buffer[remotepath] = self.buffer[remotepath].add_item(log)  # type: ignore
        self.need_push[remotepath] = True
        self.dataframe_refresh_needed = True
        print("[addlog] Added a log to buffer")

    def remotedir(self):
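        """Return today's remote directory as "year/month/day" (Beijing time, no zero padding)."""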
        now = beijing()
        return f"{now.year}/{now.month}/{now.day}"

    def push_yesterday(self) -> bool:
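        """Upload the folder for self.today to the remote repo. Called from push()
        when the date rolls over, at which point self.today still names the previous day."""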
        try:
            print("[push_yesterday] Pushing yesterday's log files to remote")
            remotedir = f"{self.today.year}/{self.today.month}/{self.today.day}"
            localdir = os.path.join(self.local_dir, remotedir)
            res = hf.upload_folder(
                repo_id=self.repo_id,
                folder_path=localdir,
                path_in_repo=remotedir,
                repo_type=self.repo_type,
                commit_message=f"Updated at {beijing()}",
            )
            print(f"[push_yesterday] Log files pushed to {res}")
            print("[push_yesterday] Done")
            return True
        except Exception as e:
            print(f"[push_yesterday] {type(e)}: {e}")
            return False

    def push(self):
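        """Write changed datasets to local JSON files and upload today's folder to the remote repo."""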
        try:
            files_to_push = [
                filename for filename in self.need_push if self.need_push[filename]
            ]
            print("[push] Writing datasets to json files")
            for filename in files_to_push:
                localpath = os.path.join(self.local_dir, filename)
                # Make sure the day's directory exists before writing the JSON file
                os.makedirs(os.path.dirname(localpath), exist_ok=True)
                self.buffer[filename].to_json(localpath)
                print(f"[push] {filename} finished")
                self.need_push[filename] = False
            now = beijing().date()
            if len(files_to_push) == 0:
                print("[push] Done")
                return True
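            # A new day has started: flush yesterday's folder before updating self.today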
            if now != self.today:  # new day comes
                if not self.push_yesterday():
                    print("[push] Failed to upload yesterday's log files")
                self.today = now
            print("[push] Pushing log files to remote")
            remotedir = self.remotedir()
            localdir = "/".join([self.local_dir, remotedir])
            res = hf.upload_folder(
                repo_id=self.repo_id,
                folder_path=localdir,
                path_in_repo=remotedir,
                repo_type=self.repo_type,
                commit_message=f"Updated at {beijing()}",
            )
            print(f"[push] Log files pushed to {res}")
            print("[push] Done")
            return True
        except Exception as e:
            print(f"[push] {type(e)}: {e}")
            return False

    def pull(self):
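        """Download a full snapshot of the dataset repo and record a per-file
        timestamp derived from each file's year/month/day path."""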
        print("[pull] Starting downloading")
        try:
            res = hf.snapshot_download(
                repo_id=self.repo_id,
                repo_type=self.repo_type,
                local_dir=self.local_dir,
            )
            print(f"[pull] Downloaded to {res}")
            remotepathes = hf.list_repo_files(
                repo_id=self.repo_id, repo_type=self.repo_type
            )
            jsonfiles = [f for f in remotepathes if f.endswith(".json")]
            print(f"[pull] {len(jsonfiles)} files found in remote repo")
            print("[pull] Parsing timestamps")
            for remotepath in jsonfiles:
                try:
                    parts = remotepath.split("/")
                    year, month, day = parts[0], parts[1], parts[2]
                    date_obj = date(int(year), int(month), int(day))
                    # Attach the Beijing timezone directly; astimezone() on a naive
                    # datetime would interpret it in the server's local timezone
                    timestamp = datetime.combine(
                        date_obj, datetime.min.time(), tzinfo=TIMEZONE
                    ).isoformat(timespec="microseconds")
                    self.timestamps[remotepath] = timestamp
                except Exception as e:
                    print(f"[pull] Error parsing timestamp of {remotepath}: {e}")
                    continue
            print("[pull] Done")
        except Exception as e:
            print(f"[pull] {type(e)}: {e}")
        print("[pull] Done")

    def get_pathes_between(self, from_date: date, to_date: date) -> dict[str, str]:
        """
        获取指定日期范围内的路径列表

        :param from_date: 开始日期(格式:YYYY-MM-DD 或 datetime.date),含该日期
        :param to_date: 结束日期(格式:YYYY-MM-DD 或 datetime.date),含该日期
        :return: 日期范围内的路径列表,格式为 ["YYYY/MM/DD", ...]
        """
        pathes = {}
        current_date = from_date
        while current_date <= to_date:
            key = f"{current_date.year}/{current_date.month}/{current_date.day}"
            value = datetime.combine(
                current_date, datetime.min.time(), tzinfo=TIMEZONE
            ).isoformat(timespec="microseconds")
            pathes[key] = value
            current_date += timedelta(days=1)
        return pathes

    def load_logs(
        self, from_timestamp: str | None = None, to_timestamp: str | None = None
    ):
        """
        在启动时加载最近30天的日志数据到内存buffer
        """
        try:

            start_timestamp = self.cutoff_timestamp()
            end_timestamp = (
                beijing()
                .replace(hour=23, minute=59, second=59, microsecond=999999)
                .isoformat(timespec="microseconds")
            )
            from_timestamp = from_timestamp or start_timestamp
            to_timestamp = to_timestamp or end_timestamp
            total_files_loaded = 0
            for remotepath, timestamp in tqdm(self.timestamps.items()):
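                # All timestamps are ISO-8601 strings in the same timezone, so plain
                # string comparison orders them correctly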
                if timestamp < from_timestamp or timestamp > to_timestamp:
                    continue
                localpath = "/".join([self.local_dir, remotepath])
                # print(f"[load_logs] Loading file {localpath}")
                # Skip files that do not exist locally
                if not os.path.exists(localpath):
                    # print(f"[load_logs] File not found: {localpath}")
                    continue
                try:
                    # Skip empty files
                    if os.path.getsize(localpath) == 0:
                        # print(f"[load_logs] Skipping empty file: {remotepath}")
                        continue
                    if remotepath in self.buffer:
                        # print(f"[load_logs] File already loaded: {remotepath}")
                        continue
                    # Load the JSON file into a Dataset
                    dataset = ds.Dataset.from_json(localpath)
                    if isinstance(dataset, ds.Dataset):
                        self.buffer[remotepath] = dataset
                        self.need_push[remotepath] = False
                        self.timestamps[remotepath] = timestamp
                        total_files_loaded += 1
                except Exception as e:
                    print(f"[load_logs] Error loading {remotepath}: {e}")
                    continue
            if total_files_loaded > 0:
                self.dataframe_refresh_needed = True
            print(f"[load_logs] Successfully loaded {total_files_loaded} log files")
            print(f"[load_logs] Total datasets in buffer: {len(self.buffer)}")
        except Exception as e:
            print(f"[load_logs] Error: {type(e)}: {e}")

    def cutoff_timestamp(self) -> str:
        """
        计算用于清理日志的截止时间戳

        :return: 截止时间戳,格式为 ISO 8601 字符串
        """
        cutoff_date = self.today - timedelta(days=self.cache_days)
        cutoff_timestamp = (
            datetime.combine(cutoff_date, datetime.min.time())
            .astimezone(TIMEZONE)
            .isoformat(timespec="microseconds")
        )
        return cutoff_timestamp

    def cleanup_old_logs(self):
        """
        清理buffer中超过30天的日志数据

        保留逻辑:保留最近cache_days天的日志
        删除逻辑:删除早于 (today - cache_days) 的所有日志
        """
        try:
            print("[cleanup_old_logs] Starting cleanup of old logs")
            # Compute the earliest timestamp to keep (inclusive)
            start_timestamp = self.cutoff_timestamp()
            removed_count = 0
            for filepath in list(self.buffer.keys()):
                # filepath looks like "2025/9/23/xx.json"; its date is recorded in self.timestamps
                try:
                    timestamp = self.timestamps[filepath]
                    # Keep files dated on or after the cutoff; remove everything older
                    if timestamp >= start_timestamp:
                        continue
                    del self.buffer[filepath]
                    del self.need_push[filepath]
                    removed_count += 1
                    print(f"[cleanup_old_logs] Removed {filepath}")
                except (KeyError, ValueError, IndexError) as e:
                    print(f"[cleanup_old_logs] Error handling {filepath}: {e}")
                    continue

            print(f"[cleanup_old_logs] Cleaned up {removed_count} old log files")
            print(
                f"[cleanup_old_logs] Remaining datasets in buffer: {len(self.buffer)}"
            )
            print("[cleanup_old_logs] Done")

        except Exception as e:
            print(f"[cleanup_old_logs] Error: {type(e)}: {e}")

    def start_synchronize(self):
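        # Periodically flush the in-memory buffer to local JSON files and push them to the remote repo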
        self.scheduler.add_job(
            self.push,
            "interval",
            seconds=self.synchronize_interval,
        )
        # Schedule the daily cleanup job to run at 2:00 AM
        self.scheduler.add_job(
            self.cleanup_old_logs,
            "cron",
            hour=2,
            minute=0,
        )
        self.scheduler.start()

    def refresh_dataframe(self) -> pd.DataFrame:
        """内存中所有日志数据合并为一个DataFrame"""
        datasets = list(self.buffer.values())
        merged_dataset = ds.concatenate_datasets(datasets)
        self.dataframe = merged_dataset.to_pandas()  # type: ignore
        print(f"[refresh_dataframe] Loaded {len(self.dataframe)} logs")  # type: ignore
        self.dataframe_refresh_needed = False
        return self.dataframe  # type: ignore

    def refresh(self, from_date: str | None, to_date: str | None) -> list[dict]:
        """
        获取刷新后的日志列表,支持查询任意时间范围的日志(包括超过30天前的日志)

        当查询超过30天前的日志时,会动态从磁盘加载相应数据。
        基于timestamp字段进行日期过滤。时间戳格式为 ISO 8601 格式(如 "2025-09-08T16:01:07.526954+08:00")

        :param from_date: 开始日期(格式:YYYY-MM-DD 或 datetime.date),含该日期的所有日志
        :param to_date: 结束日期(格式:YYYY-MM-DD 或 datetime.date),含该日期的所有日志
        :return: 按时间戳降序排列的日志字典列表
        """
        from_timestamp = None
        if from_date is not None:
            # Interpret the date in the Beijing timezone; astimezone() on a naive
            # datetime would first assume the server's local timezone
            from_datetime = datetime.strptime(from_date, "%Y-%m-%d").replace(
                hour=0, minute=0, second=0, microsecond=0, tzinfo=TIMEZONE
            )
            from_timestamp = from_datetime.isoformat(timespec="microseconds")
        to_timestamp = None
        if to_date is not None:
            to_datetime = datetime.strptime(to_date, "%Y-%m-%d").replace(
                hour=23, minute=59, second=59, microsecond=999999, tzinfo=TIMEZONE
            )
            to_timestamp = to_datetime.isoformat(timespec="microseconds")

        print(
            f"[refresh] Starting to load logs from {from_timestamp} to {to_timestamp}"
        )
        # If the query range falls outside the cached window, load the matching log files from disk
        self.load_logs(from_timestamp=from_timestamp, to_timestamp=to_timestamp)
        if self.dataframe_refresh_needed:
            self.refresh_dataframe()
        df = self.dataframe
        print(f"[refresh] Filtering logs from {from_date} to {to_date}")
        # Build the date-range filter condition
        filter_condition = pd.Series([True] * len(df), index=df.index)
        if from_timestamp is not None:
            filter_condition = filter_condition & (df["timestamp"] >= from_timestamp)
        if to_timestamp is not None:
            filter_condition = filter_condition & (df["timestamp"] <= to_timestamp)
        df = df[filter_condition]
        # Sort by timestamp in descending order (newest first)
        df = df.sort_values(by="timestamp", ascending=False)
        print(f"[refresh] Returning {len(df)} logs")
        return df.to_dict(orient="records")
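

# A minimal usage sketch (not part of the module itself). Assumptions: a hypothetical
# dataset repo id, the "hf_token" environment variable being set, and that every log
# dict carries a "timestamp" field, which refresh() filters and sorts on.
if __name__ == "__main__":
    helper = LoggingHelper(repo_id="your-username/your-log-dataset")  # hypothetical repo id
    helper.addlog(
        {
            "timestamp": beijing().isoformat(timespec="microseconds"),
            "event": "example",  # extra fields become dataset columns
        }
    )
    logs = helper.refresh(from_date=None, to_date=None)
    print(f"[example] {len(logs)} logs currently available")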