"""
a module of logs saving and backuping
"""
import os
import datasets as ds
from apscheduler.schedulers.background import BackgroundScheduler
from utils import beijing, md5, json_to_str
from huggingface_hub import HfApi
import pandas as pd
from tqdm import tqdm
from datetime import datetime, date, timedelta
from zoneinfo import ZoneInfo
hf = HfApi(token=os.environ.get("hf_token"))
TIMEZONE = ZoneInfo("Asia/Shanghai")
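# All timestamps in this module are ISO 8601 strings produced with
# isoformat(timespec="microseconds") in the Asia/Shanghai timezone. Because
# the fields are fixed-width and every string carries the same +08:00 offset,
# lexicographic comparison of these strings matches chronological order,
# which is what the plain `<` / `>=` string comparisons below rely on.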
class LoggingHelper:
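    """
    Buffers recent logs in memory as `datasets.Dataset` objects keyed by
    remote path ("YYYY/M/D/<md5 prefix>.json"), periodically writes dirty
    buffers to local JSON files and pushes them to the Hugging Face dataset
    repo, and evicts buffers older than `cache_days` once a day.
    """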
def __init__(
self,
repo_id: str,
local_dir: str = "data/logs",
synchronize_interval: int = 60,
cache_days: int = 30,
):
"""
:param repo_id: the repo_id of the dataset in huggingface
:param local_dir: a directory in cwd to store downloaded files
:param synchronize_interval: the interval of synchronizing between local and huggingface
"""
self.cache_days = cache_days
self.local_dir = local_dir
self.repo_id = repo_id
self.synchronize_interval = synchronize_interval
self.repo_type = "dataset"
self.scheduler = BackgroundScheduler()
        self.buffer: dict[str, ds.Dataset] = {}
        self.need_push: dict[str, bool] = {}
        self.timestamps: dict[str, str] = {}
self.today = beijing().date()
ds.disable_progress_bar()
self.dataframe: pd.DataFrame
self.dataframe_refresh_needed = True
        # First download everything from the remote repo
        self.pull()
        # Load the most recent `cache_days` days of logs into memory
        self.load_logs()
self.start_synchronize()
    def addlog(self, log: dict):
        """Append one log record to the in-memory buffer."""
        remotedir = self.remotedir()
        # Shard each day's logs into up to 256 files, keyed by the first two
        # hex digits of the record's md5
        filename = md5(json_to_str(log))[:2] + ".json"
        remotepath = "/".join([remotedir, filename])
        if remotepath not in self.buffer:
            self.buffer[remotepath] = ds.Dataset.from_dict({})
            self.timestamps[remotepath] = beijing().isoformat(timespec="microseconds")
        self.buffer[remotepath] = self.buffer[remotepath].add_item(log)  # type: ignore
        self.need_push[remotepath] = True
        self.dataframe_refresh_needed = True
        print("[addlog] Added a log to buffer")
    def remotedir(self):
        """Remote directory for today's logs, e.g. "2025/9/8" (no zero padding)."""
        now = beijing()
        return f"{now.year}/{now.month}/{now.day}"
    def push_yesterday(self) -> bool:
        """Push the folder for `self.today`, which still holds yesterday's date
        when this is called from `push` right after a day rollover.
        """
        try:
            print("[push_yesterday] Pushing yesterday's log files to remote")
            remotedir = f"{self.today.year}/{self.today.month}/{self.today.day}"
            localdir = os.path.join(self.local_dir, remotedir)
res = hf.upload_folder(
repo_id=self.repo_id,
folder_path=localdir,
path_in_repo=remotedir,
repo_type=self.repo_type,
commit_message=f"Updated at {beijing()}",
)
print(f"[push_yesterday] Log files pushed to {res}")
print("[push_yesterday] Done")
return True
except Exception as e:
print(f"[push_yesterday] {type(e)}: {e}")
return False
def push(self):
try:
files_to_push = [
filename for filename in self.need_push if self.need_push[filename]
]
print("[push] Writing datasets to json files")
for filename in files_to_push:
localpath = os.path.join(self.local_dir, filename)
self.buffer[filename].to_json(localpath)
print(f"[push] {filename} finished")
self.need_push[filename] = False
            now = beijing().date()
            if len(files_to_push) == 0:
                print("[push] Done")
                return True
            # A new day has started since the last push that had data: upload
            # yesterday's folder one last time, then switch to today's
            if now != self.today:
                if not self.push_yesterday():
                    print("[push] Failed to upload yesterday's log files")
                self.today = now
            print("[push] Pushing log files to remote")
            remotedir = self.remotedir()
            localdir = os.path.join(self.local_dir, remotedir)
res = hf.upload_folder(
repo_id=self.repo_id,
folder_path=localdir,
path_in_repo=remotedir,
repo_type=self.repo_type,
commit_message=f"Updated at {beijing()}",
)
print(f"[push] Log files pushed to {res}")
print("[push] Done")
return True
except Exception as e:
print(f"[push] {type(e)}: {e}")
return False
def pull(self):
print("[pull] Starting downloading")
try:
res = hf.snapshot_download(
repo_id=self.repo_id,
repo_type=self.repo_type,
local_dir=self.local_dir,
)
print(f"[pull] Downloaded to {res}")
            remote_paths = hf.list_repo_files(
                repo_id=self.repo_id, repo_type=self.repo_type
            )
            jsonfiles = [f for f in remote_paths if f.endswith(".json")]
print(f"[pull] {len(jsonfiles)} files found in remote repo")
print("[pull] Parsing timestamps")
            for remotepath in jsonfiles:
                try:
                    # Paths look like "YYYY/M/D/<md5 prefix>.json"
                    year, month, day = remotepath.split("/")[:3]
                    date_obj = date(int(year), int(month), int(day))
                    # Midnight Beijing time. Attach the timezone directly;
                    # calling astimezone() on a naive datetime would interpret
                    # it in the server's local timezone and shift the clock
                    # fields away from midnight.
                    timestamp = datetime.combine(
                        date_obj, datetime.min.time(), tzinfo=TIMEZONE
                    ).isoformat(timespec="microseconds")
                    self.timestamps[remotepath] = timestamp
except Exception as e:
print(f"[pull] Error parsing timestamp of {remotepath}: {e}")
continue
print("[pull] Done")
except Exception as e:
print(f"[pull] {type(e)}: {e}")
print("[pull] Done")
    def get_pathes_between(self, from_date: date, to_date: date) -> dict[str, str]:
        """
        Build the remote directory paths for a date range.
        :param from_date: start date, inclusive
        :param to_date: end date, inclusive
        :return: dict mapping "YYYY/M/D" paths to the ISO 8601 timestamp of
            midnight (Beijing time) on that date
        """
        pathes = {}
        current_date = from_date
        while current_date <= to_date:
            key = f"{current_date.year}/{current_date.month}/{current_date.day}"
            value = datetime.combine(
                current_date, datetime.min.time(), tzinfo=TIMEZONE
            ).isoformat(timespec="microseconds")
            pathes[key] = value
            current_date += timedelta(days=1)
        return pathes
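    # Illustrative output: get_pathes_between(date(2025, 9, 7), date(2025, 9, 8))
    # -> {"2025/9/7": "2025-09-07T00:00:00.000000+08:00",
    #     "2025/9/8": "2025-09-08T00:00:00.000000+08:00"}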
def load_logs(
self, from_timestamp: str | None = None, to_timestamp: str | None = None
):
"""
在启动时加载最近30天的日志数据到内存buffer
"""
try:
start_timestamp = self.cutoff_timestamp()
end_timestamp = (
beijing()
.replace(hour=23, minute=59, second=59, microsecond=999999)
.isoformat(timespec="microseconds")
)
from_timestamp = from_timestamp or start_timestamp
to_timestamp = to_timestamp or end_timestamp
total_files_loaded = 0
for remotepath, timestamp in tqdm(self.timestamps.items()):
if timestamp < from_timestamp or timestamp > to_timestamp:
continue
localpath = "/".join([self.local_dir, remotepath])
# print(f"[load_logs] Loading file {localpath}")
# 检查该文件是否存在
if not os.path.exists(localpath):
# print(f"[load_logs] File not found: {localpath}")
continue
try:
# 检查文件是否为空
if os.path.getsize(localpath) == 0:
# print(f"[load_logs] Skipping empty file: {remotepath}")
continue
if remotepath in self.buffer:
# print(f"[load_logs] File already loaded: {remotepath}")
continue
# 加载JSON数据到Dataset
dataset = ds.Dataset.from_json(localpath)
                    if isinstance(dataset, ds.Dataset):
                        self.buffer[remotepath] = dataset
                        self.need_push[remotepath] = False
                        total_files_loaded += 1
except Exception as e:
print(f"[load_logs] Error loading {remotepath}: {e}")
continue
if total_files_loaded > 0:
self.dataframe_refresh_needed = True
print(f"[load_logs] Successfully loaded {total_files_loaded} log files")
print(f"[load_logs] Total datasets in buffer: {len(self.buffer)}")
except Exception as e:
print(f"[load_logs] Error: {type(e)}: {e}")
    def cutoff_timestamp(self) -> str:
        """
        Earliest timestamp to keep in memory (`cache_days` days before today),
        as an ISO 8601 string.
        """
        cutoff_date = self.today - timedelta(days=self.cache_days)
        return datetime.combine(
            cutoff_date, datetime.min.time(), tzinfo=TIMEZONE
        ).isoformat(timespec="microseconds")
def cleanup_old_logs(self):
"""
清理buffer中超过30天的日志数据
保留逻辑:保留最近cache_days天的日志
删除逻辑:删除早于 (today - cache_days) 的所有日志
"""
try:
print("[cleanup_old_logs] Starting cleanup of old logs")
            # Earliest timestamp that should be kept (inclusive)
            start_timestamp = self.cutoff_timestamp()
removed_count = 0
for filepath in list(self.buffer.keys()):
                # filepath looks like "2025/9/23/xx.json"; its timestamp was
                # recorded when the file was pulled or first written
                try:
                    timestamp = self.timestamps[filepath]
                    # Keep files at or after the cutoff, drop the rest
                    if timestamp >= start_timestamp:
                        continue
del self.buffer[filepath]
del self.need_push[filepath]
removed_count += 1
print(f"[cleanup_old_logs] Removed {filepath}")
                except KeyError as e:
                    print(f"[cleanup_old_logs] No timestamp for {filepath}: {e}")
                    continue
print(f"[cleanup_old_logs] Cleaned up {removed_count} old log files")
print(
f"[cleanup_old_logs] Remaining datasets in buffer: {len(self.buffer)}"
)
print("[cleanup_old_logs] Done")
except Exception as e:
print(f"[cleanup_old_logs] Error: {type(e)}: {e}")
def start_synchronize(self):
self.scheduler.add_job(
self.push,
"interval",
seconds=self.synchronize_interval,
)
        # Daily cleanup job at 02:00 Beijing time (the explicit timezone keeps
        # the schedule independent of the server's local timezone)
        self.scheduler.add_job(
            self.cleanup_old_logs,
            "cron",
            hour=2,
            minute=0,
            timezone="Asia/Shanghai",
        )
self.scheduler.start()
    def refresh_dataframe(self) -> pd.DataFrame:
        """Merge every dataset in the buffer into a single DataFrame."""
        datasets = list(self.buffer.values())
        if not datasets:
            # concatenate_datasets raises on an empty list
            self.dataframe = pd.DataFrame()
        else:
            merged_dataset = ds.concatenate_datasets(datasets)
            self.dataframe = merged_dataset.to_pandas()  # type: ignore
        print(f"[refresh_dataframe] Loaded {len(self.dataframe)} logs")
        self.dataframe_refresh_needed = False
        return self.dataframe
    def refresh(self, from_date: str | None, to_date: str | None) -> list[dict]:
        """
        Return the logs in a date range, loading files from disk on demand so
        that ranges older than the in-memory cache window also work.
        Filtering is done on the `timestamp` field, an ISO 8601 string such as
        "2025-09-08T16:01:07.526954+08:00".
        :param from_date: start date as a "YYYY-MM-DD" string, inclusive
        :param to_date: end date as a "YYYY-MM-DD" string, inclusive
        :return: list of log dicts sorted by timestamp, newest first
        """
        from_timestamp = None
        if from_date is not None:
            # Midnight Beijing time on from_date. replace(tzinfo=...) attaches
            # the zone without shifting the clock fields, unlike astimezone()
            from_datetime = datetime.strptime(from_date, "%Y-%m-%d").replace(
                tzinfo=TIMEZONE
            )
            from_timestamp = from_datetime.isoformat(timespec="microseconds")
        to_timestamp = None
        if to_date is not None:
            # End of day on to_date, Beijing time
            to_datetime = datetime.strptime(to_date, "%Y-%m-%d").replace(
                hour=23, minute=59, second=59, microsecond=999999, tzinfo=TIMEZONE
            )
            to_timestamp = to_datetime.isoformat(timespec="microseconds")
        print(
            f"[refresh] Starting to load logs from {from_timestamp} to {to_timestamp}"
        )
        # Load any files in the requested range that are not yet in memory
        self.load_logs(from_timestamp=from_timestamp, to_timestamp=to_timestamp)
if self.dataframe_refresh_needed:
self.refresh_dataframe()
df = self.dataframe
print(f"[refresh] Filtering logs from {from_date} to {to_date}")
        # Build the date-range filter mask
filter_condition = pd.Series([True] * len(df), index=df.index)
if from_timestamp is not None:
filter_condition = filter_condition & (df["timestamp"] >= from_timestamp)
if to_timestamp is not None:
filter_condition = filter_condition & (df["timestamp"] <= to_timestamp)
df = df[filter_condition]
        # Sort by timestamp descending (newest first)
df = df.sort_values(by="timestamp", ascending=False)
print(f"[refresh] Returning {len(df)} logs")
return df.to_dict(orient="records")
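

# A minimal usage sketch (illustrative, not part of the module). It assumes
# the `hf_token` environment variable is set and that the hypothetical
# dataset repo "user/app-logs" exists; LoggingHelper pulls existing logs and
# starts its background push/cleanup jobs in __init__.
if __name__ == "__main__":
    helper = LoggingHelper(repo_id="user/app-logs")
    helper.addlog(
        {
            "timestamp": beijing().isoformat(timespec="microseconds"),
            "message": "hello",
        }
    )
    # Query a date range; results are dicts sorted newest-first
    recent = helper.refresh(from_date="2025-09-01", to_date="2025-09-08")
    print(f"{len(recent)} logs in range")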