Beracles committed on
Commit
346738b
·
1 Parent(s): e7d4173

优化日志加载和清理功能,支持时间戳管理和日期范围过滤

Browse files
Files changed (1) hide show
  1. logging_helper.py +130 -81
logging_helper.py CHANGED
@@ -8,13 +8,15 @@ from apscheduler.schedulers.background import BackgroundScheduler
8
  from utils import beijing, md5, json_to_str
9
  from huggingface_hub import HfApi
10
  import pandas as pd
11
- import datetime
12
  from zoneinfo import ZoneInfo
13
  import glob
14
 
15
  hf = HfApi()
16
  hf.token = os.environ.get("hf_token")
17
 
 
 
18
 
19
  class LoggingHelper:
20
 
@@ -39,6 +41,7 @@ class LoggingHelper:
39
  self.scheduler = BackgroundScheduler()
40
  self.buffer = dict[str, ds.Dataset]()
41
  self.need_push = dict[str, bool]()
 
42
  self.today = beijing().date()
43
  ds.disable_progress_bar()
44
  self.dataframe: pd.DataFrame
@@ -58,6 +61,7 @@ class LoggingHelper:
58
  self.buffer[remotepath] = self.buffer[remotepath].add_item(log) # type: ignore
59
  else:
60
  self.buffer[remotepath] = ds.Dataset.from_dict({})
 
61
  self.buffer[remotepath] = self.buffer[remotepath].add_item(log) # type: ignore
62
  self.need_push[remotepath] = True
63
  self.dataframe_refresh_needed = True
@@ -149,17 +153,34 @@ class LoggingHelper:
149
  try:
150
  res = hf.snapshot_download(
151
  repo_id=self.repo_id,
152
- repo_type="dataset",
153
  local_dir=self.local_dir,
154
  )
155
  print(f"[pull] Downloaded to {res}")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
156
  except Exception as e:
157
  print(f"[pull] {type(e)}: {e}")
158
  print("[pull] Done")
159
 
160
- def get_pathes_between(
161
- self, from_date: datetime.date, to_date: datetime.date
162
- ) -> list[str]:
163
  """
164
  获取指定日期范围内的路径列表
165
 
@@ -167,79 +188,109 @@ class LoggingHelper:
167
  :param to_date: 结束日期(格式:YYYY-MM-DD 或 datetime.date),含该日期
168
  :return: 日期范围内的路径列表,格式为 ["YYYY/MM/DD", ...]
169
  """
170
- pathes = []
171
  current_date = from_date
172
  while current_date <= to_date:
173
- pathes.append(f"{current_date.year}/{current_date.month}/{current_date.day}")
174
- current_date += datetime.timedelta(days=1)
 
 
 
 
175
  return pathes
176
 
177
- def load_logs(self):
 
 
178
  """
179
  在启动时加载最近30天的日志数据到内存buffer
180
  """
181
- print("[load_logs] Starting to load recent 30 days logs")
182
-
183
  try:
184
- today = beijing().date()
185
- start_date = today - datetime.timedelta(days=self.cache_days)
186
- print(f"Loading logs from {start_date} to {today}")
187
- # 生成最近30天的日期范围
188
- pathes = self.get_pathes_between(start_date, today)
189
- total_files_loaded = 0
190
 
191
- # 遍历每一天的日志
192
- for path in pathes:
193
- date_path = "/".join([self.local_dir, path])
194
- print(f"[load_logs] Processing directory: {date_path}")
195
- # 检查该日期的目录是否存在
196
- if not os.path.exists(date_path):
197
- print(f"[load_logs] Directory not found: {date_path}")
 
 
 
 
198
  continue
199
- # 加载该目录下的所有JSON文件
200
- json_files = glob.glob(os.path.join(date_path, "*.json"))
201
-
202
- for json_file in json_files:
203
- # 构造相对路径作为buffer的key
204
- relative_path = os.path.relpath(json_file, self.local_dir).replace(
205
- os.sep, "/"
206
- )
207
- try:
208
- # 检查文件是否为空
209
- if os.path.getsize(json_file) == 0:
210
- print(f"[load_logs] Skipping empty file: {relative_path}")
211
- continue
212
-
213
- # 加载JSON数据到Dataset
214
- dataset = ds.Dataset.from_json(json_file)
215
- if isinstance(dataset, ds.Dataset):
216
- self.buffer[relative_path] = dataset
217
- self.need_push[relative_path] = False
218
- total_files_loaded += 1
219
- except Exception as e:
220
- print(f"[load_logs] Error loading {relative_path}: {e}")
221
  continue
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
222
  print(f"[load_logs] Successfully loaded {total_files_loaded} log files")
223
  print(f"[load_logs] Total datasets in buffer: {len(self.buffer)}")
224
  except Exception as e:
225
  print(f"[load_logs] Error: {type(e)}: {e}")
226
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
227
  def cleanup_old_logs(self):
228
- """清理buffer中超过30天的日志数据"""
 
 
 
 
 
229
  try:
230
  print("[cleanup_old_logs] Starting cleanup of old logs")
231
- cache_dir_to_remove = (
232
- self.today - datetime.timedelta(days=(self.cache_days + 1))
233
- ).strftime("%Y/%m/%d")
234
- print(
235
- f"[cleanup_old_logs] Removing logs in {cache_dir_to_remove} from buffer"
236
- )
237
  removed_count = 0
238
  for filepath in list(self.buffer.keys()):
239
- if filepath.startswith(cache_dir_to_remove):
 
 
 
 
 
 
240
  del self.buffer[filepath]
241
  del self.need_push[filepath]
242
  removed_count += 1
 
 
 
 
243
 
244
  print(f"[cleanup_old_logs] Cleaned up {removed_count} old log files")
245
  print(
@@ -266,9 +317,7 @@ class LoggingHelper:
266
  self.scheduler.start()
267
 
268
  def refresh_dataframe(self) -> pd.DataFrame:
269
- """
270
- 加载最近30天的日志文件并返回合并后的DataFrame
271
- """
272
  datasets = list(self.buffer.values())
273
  merged_dataset = ds.concatenate_datasets(datasets)
274
  self.dataframe = merged_dataset.to_pandas() # type: ignore
@@ -276,42 +325,42 @@ class LoggingHelper:
276
  self.dataframe_refresh_needed = False
277
  return self.dataframe # type: ignore
278
 
279
- def refresh(self, from_date=None, to_date=None) -> list[dict]:
280
  """
281
- 获取刷新后的日志列表,从内存buffer中合并Dataset,支持日期范围过滤
282
 
 
283
  基于timestamp字段进行日期过滤。时间戳格式为 ISO 8601 格式(如 "2025-09-08T16:01:07.526954+08:00")
284
 
285
  :param from_date: 开始日期(格式:YYYY-MM-DD 或 datetime.date),含该日期的所有日志
286
  :param to_date: 结束日期(格式:YYYY-MM-DD 或 datetime.date),含该日期的所有日志
287
  :return: 按时间戳降序排列的日志字典列表
288
  """
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
289
  if self.dataframe_refresh_needed:
290
  self.refresh_dataframe()
291
-
292
  df = self.dataframe
293
- # 将字符串日期转换为 datetime.date 对象
294
- tz = ZoneInfo("Asia/Shanghai")
295
- if isinstance(from_date, str):
296
- from_date = (
297
- datetime.datetime.strptime(from_date, "%Y-%m-%d")
298
- .astimezone(tz)
299
- .isoformat(timespec="microseconds")
300
- )
301
- if isinstance(to_date, str):
302
- to_date = datetime.datetime.strptime(to_date, "%Y-%m-%d").astimezone(tz)
303
- to_date += datetime.timedelta(days=1) # 包含结束日期全天
304
- to_date = to_date.isoformat(timespec="microseconds")
305
  print(f"[refresh] Filtering logs from {from_date} to {to_date}")
306
- # 按timestamp范围过滤(包含边界日期的全天数据)
307
- if from_date is not None or to_date is not None:
308
- # 创建日期范围过滤条件
309
- filter_condition = pd.Series([True] * len(df), index=df.index)
310
- if from_date is not None:
311
- filter_condition = filter_condition & (df["timestamp"] >= from_date)
312
- if to_date is not None:
313
- filter_condition = filter_condition & (df["timestamp"] < to_date)
314
- df = df[filter_condition]
315
  # 按timestamp降序排序(最新日志在前)
316
  df = df.sort_values(by="timestamp", ascending=False)
317
  print(f"[refresh] Returning {len(df)} logs")
 
8
  from utils import beijing, md5, json_to_str
9
  from huggingface_hub import HfApi
10
  import pandas as pd
11
+ from datetime import datetime, date, timedelta
12
  from zoneinfo import ZoneInfo
13
  import glob
14
 
15
  hf = HfApi()
16
  hf.token = os.environ.get("hf_token")
17
 
18
+ TIMEZONE = ZoneInfo("Asia/Shanghai")
19
+
20
 
21
  class LoggingHelper:
22
 
 
41
  self.scheduler = BackgroundScheduler()
42
  self.buffer = dict[str, ds.Dataset]()
43
  self.need_push = dict[str, bool]()
44
+ self.timestamps = dict[str, str]()
45
  self.today = beijing().date()
46
  ds.disable_progress_bar()
47
  self.dataframe: pd.DataFrame
 
61
  self.buffer[remotepath] = self.buffer[remotepath].add_item(log) # type: ignore
62
  else:
63
  self.buffer[remotepath] = ds.Dataset.from_dict({})
64
+ self.timestamps[remotepath] = beijing().isoformat(timespec="microseconds")
65
  self.buffer[remotepath] = self.buffer[remotepath].add_item(log) # type: ignore
66
  self.need_push[remotepath] = True
67
  self.dataframe_refresh_needed = True
 
153
  try:
154
  res = hf.snapshot_download(
155
  repo_id=self.repo_id,
156
+ repo_type=self.repo_type,
157
  local_dir=self.local_dir,
158
  )
159
  print(f"[pull] Downloaded to {res}")
160
+ remotepathes = hf.list_repo_files(
161
+ repo_id=self.repo_id, repo_type=self.repo_type
162
+ )
163
+ jsonfiles = [f for f in remotepathes if f.endswith(".json")]
164
+ print(f"[pull] {len(jsonfiles)} files found in remote repo")
165
+ print("[pull] Parsing timestamps")
166
+ for remotepath in jsonfiles:
167
+ try:
168
+ parts = remotepath.split("/")
169
+ year, month, day = parts[0], parts[1], parts[2]
170
+ date_obj = date(int(year), int(month), int(day))
171
+ timestamp = datetime.combine(
172
+ date_obj, datetime.min.time()
173
+ ).isoformat(timespec="microseconds")
174
+ self.timestamps[remotepath] = timestamp
175
+ except Exception as e:
176
+ print(f"[pull] Error parsing timestamp of {remotepath}: {e}")
177
+ continue
178
+ print("[pull] Done")
179
  except Exception as e:
180
  print(f"[pull] {type(e)}: {e}")
181
  print("[pull] Done")
182
 
183
def get_pathes_between(self, from_date: date, to_date: date) -> dict[str, str]:
    """Build the repo sub-paths for every day in a date range.

    :param from_date: first date (inclusive)
    :param to_date: last date (inclusive)
    :return: mapping of "YYYY/M/D" path (no zero padding, matching the
        remote repo layout) to the ISO-8601 timestamp of that day's midnight
    """
    # NOTE(review): the timestamps produced here are naive (no tzinfo),
    # while cutoff_timestamp() elsewhere in this class emits tz-aware
    # strings — lexicographic comparison across the two styles is suspect;
    # confirm before mixing them.
    pathes: dict[str, str] = {}
    current_date = from_date
    while current_date <= to_date:
        key = f"{current_date.year}/{current_date.month}/{current_date.day}"
        pathes[key] = datetime.combine(
            current_date, datetime.min.time()
        ).isoformat(timespec="microseconds")
        current_date += timedelta(days=1)
    return pathes
 
202
def load_logs(
    self, from_timestamp: str | None = None, to_timestamp: str | None = None
):
    """Load on-disk JSON log files within a timestamp window into the buffer.

    Iterates ``self.timestamps`` (remote path -> ISO-8601 timestamp), skips
    entries outside the window, already-buffered files, missing files and
    empty files, and loads the rest as ``ds.Dataset`` objects. Newly loaded
    files are marked as not needing a push, and the dataframe is flagged for
    refresh when anything was loaded.

    :param from_timestamp: inclusive ISO-8601 lower bound; defaults to the
        retention cutoff returned by ``cutoff_timestamp()``
    :param to_timestamp: exclusive ISO-8601 upper bound; defaults to the end
        of today in Beijing time
    """
    try:
        from_timestamp = from_timestamp or self.cutoff_timestamp()
        to_timestamp = to_timestamp or (
            beijing()
            .replace(hour=23, minute=59, second=59, microsecond=999999)
            .isoformat(timespec="microseconds")
        )
        total_files_loaded = 0
        for remotepath, timestamp in self.timestamps.items():
            # NOTE(review): string comparison assumes every stored timestamp
            # uses one consistent ISO format/offset — confirm, since pull()
            # stores naive midnights while appended logs may carry tz-aware
            # stamps.
            if timestamp < from_timestamp or timestamp >= to_timestamp:
                continue
            # Cheapest check first: skip before touching the filesystem.
            if remotepath in self.buffer:
                print(f"[load_logs] File already loaded: {remotepath}")
                continue
            localpath = "/".join([self.local_dir, remotepath])
            print(f"[load_logs] Loading file {localpath}")
            if not os.path.exists(localpath):
                print(f"[load_logs] File not found: {localpath}")
                continue
            try:
                # Empty files would make Dataset.from_json fail.
                if os.path.getsize(localpath) == 0:
                    print(f"[load_logs] Skipping empty file: {remotepath}")
                    continue
                # Load the JSON data into a Dataset.
                dataset = ds.Dataset.from_json(localpath)
                if isinstance(dataset, ds.Dataset):
                    self.buffer[remotepath] = dataset
                    # Freshly pulled from disk, so nothing to push back.
                    self.need_push[remotepath] = False
                    total_files_loaded += 1
            except Exception as e:
                print(f"[load_logs] Error loading {remotepath}: {e}")
                continue
        if total_files_loaded > 0:
            self.dataframe_refresh_needed = True
        print(f"[load_logs] Successfully loaded {total_files_loaded} log files")
        print(f"[load_logs] Total datasets in buffer: {len(self.buffer)}")
    except Exception as e:
        print(f"[load_logs] Error: {type(e)}: {e}")
252
 
253
def cutoff_timestamp(self) -> str:
    """Return the oldest timestamp (ISO-8601, Asia/Shanghai) still retained.

    Logs with timestamps strictly before this value fall outside the
    ``cache_days`` retention window.

    :return: tz-aware ISO-8601 string for Beijing midnight of
        ``today - cache_days``
    """
    cutoff_date = self.today - timedelta(days=self.cache_days)
    # Attach the Beijing zone directly. The previous code built a naive
    # datetime and called .astimezone(TIMEZONE), which first interprets the
    # naive value in the *server's* local zone — on a non-Beijing host the
    # cutoff would be shifted by the zone-offset difference.
    return datetime.combine(
        cutoff_date, datetime.min.time(), tzinfo=TIMEZONE
    ).isoformat(timespec="microseconds")
266
+
267
  def cleanup_old_logs(self):
268
+ """
269
+ 清理buffer中超过30天的日志数据
270
+
271
+ 保留逻辑:保留最近cache_days天的日志
272
+ 删除逻辑:删除早于 (today - cache_days) 的所有日志
273
+ """
274
  try:
275
  print("[cleanup_old_logs] Starting cleanup of old logs")
276
+ # 计算应该保留的最早日期(含这一天)
277
+ start_timestamp = self.cutoff_timestamp()
 
 
 
 
278
  removed_count = 0
279
  for filepath in list(self.buffer.keys()):
280
+ # filepath 格式类似 "2025/9/23/xx.json"
281
+ # 提取日期部分 "2025/9/23"
282
+ try:
283
+ timestamp = self.timestamps[filepath]
284
+ # 如果文件日期早于截断日期,则删除
285
+ if timestamp >= start_timestamp:
286
+ continue
287
  del self.buffer[filepath]
288
  del self.need_push[filepath]
289
  removed_count += 1
290
+ print(f"[cleanup_old_logs] Removed {filepath}")
291
+ except (ValueError, IndexError) as e:
292
+ print(f"[cleanup_old_logs] Error parsing filepath {filepath}: {e}")
293
+ continue
294
 
295
  print(f"[cleanup_old_logs] Cleaned up {removed_count} old log files")
296
  print(
 
317
  self.scheduler.start()
318
 
319
  def refresh_dataframe(self) -> pd.DataFrame:
320
+ """内存中所有日志数据合并为一个DataFrame"""
 
 
321
  datasets = list(self.buffer.values())
322
  merged_dataset = ds.concatenate_datasets(datasets)
323
  self.dataframe = merged_dataset.to_pandas() # type: ignore
 
325
  self.dataframe_refresh_needed = False
326
  return self.dataframe # type: ignore
327
 
328
+ def refresh(self, from_date: str | None, to_date: str | None) -> list[dict]:
329
  """
330
+ 获取刷新后的日志列表,支持查询任意时间范围的日志(包括超过30天前的日志)
331
 
332
+ 当查询超过30天前的日志时,会动态从磁盘加载相应数据。
333
  基于timestamp字段进行日期过滤。时间戳格式为 ISO 8601 格式(如 "2025-09-08T16:01:07.526954+08:00")
334
 
335
  :param from_date: 开始日期(格式:YYYY-MM-DD 或 datetime.date),含该日期的所有日志
336
  :param to_date: 结束日期(格式:YYYY-MM-DD 或 datetime.date),含该日期的所有日志
337
  :return: 按时间戳降序排列的日志字典列表
338
  """
339
+ from_timestamp = None
340
+ if from_date is not None:
341
+ from_datetime = datetime.strptime(from_date, "%Y-%m-%d").astimezone(
342
+ TIMEZONE
343
+ )
344
+ from_timestamp = from_datetime.isoformat(timespec="microseconds")
345
+ to_timestamp = None
346
+ if to_date is not None:
347
+ to_datetime = datetime.strptime(to_date, "%Y-%m-%d").astimezone(
348
+ TIMEZONE
349
+ ) + timedelta(days=1)
350
+ to_timestamp = to_datetime.isoformat(timespec="microseconds")
351
+
352
+ print("[load_logs] Starting to load recent 30 days logs")
353
+ # 如果查询范围超出缓存范围,则加载相应的日志文件
354
+ self.load_logs(from_timestamp=from_timestamp, to_timestamp=to_timestamp)
355
  if self.dataframe_refresh_needed:
356
  self.refresh_dataframe()
 
357
  df = self.dataframe
 
 
 
 
 
 
 
 
 
 
 
 
358
  print(f"[refresh] Filtering logs from {from_date} to {to_date}")
359
+ # 创建日期范围过滤条件
360
+ filter_condition = pd.Series([True] * len(df), index=df.index)
361
+ filter_condition = filter_condition & (df["timestamp"] >= from_date)
362
+ filter_condition = filter_condition & (df["timestamp"] < to_date)
363
+ df = df[filter_condition]
 
 
 
 
364
  # 按timestamp降序排序(最新日志在前)
365
  df = df.sort_values(by="timestamp", ascending=False)
366
  print(f"[refresh] Returning {len(df)} logs")