# LogDisplayer / logging_helper.py
"""
a module of logs saving and backuping
"""
import os
import datasets as ds
from apscheduler.schedulers.background import BackgroundScheduler
from tqdm import tqdm
from utils import beijing, md5, json_to_str
from huggingface_hub import HfApi
import pandas as pd
import glob
hf = HfApi()
hf.token = os.environ.get("hf_token")
class LoggingHelper:
def __init__(
self,
repo_id: str,
local_dir: str = "data/logs",
synchronize_interval: int = 60,
):
"""
:param repo_id: the repo_id of the dataset in huggingface
:param local_dir: a directory in cwd to store downloaded files
:param synchronize_interval: the interval of synchronizing between local and huggingface
"""
self.local_dir = local_dir
self.repo_id = repo_id
self.synchronize_interval = synchronize_interval
self.repo_type = "dataset"
self.scheduler = BackgroundScheduler()
        self.buffer: dict[str, ds.Dataset] = {}
        self.need_push: dict[str, bool] = {}
        self.today = beijing().date()
        ds.disable_progress_bar()
        self.dataframe: pd.DataFrame
        # cache-related state
        self.cached_df: pd.DataFrame | None = None
        self.loaded_files: set[str] = set()
        self.cache_needs_refresh = False
self.pull()
self.start_synchronize()
    def addlog(self, log: dict):
        """Add a single log entry to the in-memory buffer."""
        remotedir = self.remotedir()
        # Shard logs by the first two hex chars of their md5, giving at most
        # 256 JSON files per day directory.
        filename = md5(json_to_str(log))[:2] + ".json"
        remotepath = "/".join([remotedir, filename])
        if remotepath not in self.buffer:
            self.buffer[remotepath] = ds.Dataset.from_dict({})
        self.buffer[remotepath] = self.buffer[remotepath].add_item(log)  # type: ignore
        self.need_push[remotepath] = True
        print("[addlog] Added a log to buffer")
    def remotedir(self):
        """Return today's repo directory, e.g. "2024/5/7" (no zero padding)."""
        now = beijing()
        return "/".join([str(now.year), str(now.month), str(now.day)])
    def pull(self):
        """Download the repo snapshot and load today's files into the buffer."""
        try:
            self.download()
            remotedir = self.remotedir()
            print(f"[pull] today dir: {remotedir}")
filenames = hf.list_repo_files(
repo_id=self.repo_id,
repo_type=self.repo_type,
)
files_to_load = [
filename
for filename in filenames
if filename not in self.buffer
and filename.startswith(remotedir)
and filename.endswith(".json")
]
print(f"[pull] total {len(files_to_load)} to load")
            for filename in tqdm(files_to_load):
                path = os.path.join(self.local_dir, filename)
                with open(path, "r") as f:
                    data = f.read()
                # Skip empty files; Dataset.from_json would fail on them.
                if len(data) != 0:
                    self.buffer[filename] = ds.Dataset.from_json(path)  # type: ignore
                    self.need_push[filename] = False
return True
except Exception as e:
print(f"[pull] {type(e)}: {e}")
return False
    def push_yesterday(self) -> bool:
        """Flush and upload the previous day's files, then drop them from the buffer."""
        try:
            year = str(self.today.year)
            month = str(self.today.month)
            day = str(self.today.day)
            remotedir = "/".join([year, month, day])
            files_to_push = []
            # Iterate over a copy of the keys: entries are deleted in the loop.
            for filename in list(self.buffer.keys()):
                if not filename.startswith(remotedir):
                    continue
                if self.need_push[filename]:
                    files_to_push.append(filename)
                else:
                    # Already synchronized; no need to keep it in memory.
                    del self.buffer[filename]
                    del self.need_push[filename]
            if len(files_to_push) == 0:
                return True
            print("[push_yesterday] Writing datasets to json files")
            for filename in files_to_push:
                localpath = os.path.join(self.local_dir, filename)
                self.buffer[filename].to_json(localpath)
                print(f"[push_yesterday] {filename} finished")
            print("[push_yesterday] Done")
            print("[push_yesterday] Pushing log files to remote")
            localdir = os.path.join(self.local_dir, remotedir)
            res = hf.upload_folder(
                repo_id=self.repo_id,
                folder_path=localdir,
                path_in_repo=remotedir,
                repo_type=self.repo_type,
                commit_message=f"Updated at {beijing()}",
            )
            print(f"[push_yesterday] Log files pushed to {res}")
            # Yesterday's files are final now; drop them from the buffer.
            for filename in files_to_push:
                del self.buffer[filename]
                del self.need_push[filename]
            print("[push_yesterday] Done")
            return True
        except Exception as e:
            print(f"[push_yesterday] {type(e)}: {e}")
            return False
    def push(self):
        """Write dirty buffers to local JSON files and upload today's directory."""
        try:
            now = beijing().date()
            if now != self.today:  # a new day has started
if not self.push_yesterday():
print("[push] Failed to upload yesterday's log files")
self.today = now
files_to_push = [
filename for filename in self.need_push if self.need_push[filename]
]
if len(files_to_push) == 0:
return True
print("[push] Writing datasets to json files")
for filename in files_to_push:
localpath = os.path.join(self.local_dir, filename)
self.buffer[filename].to_json(localpath)
print(f"[push] {filename} finished")
print("[push] Done")
print("[push] Pushing log files to remote")
remotedir = self.remotedir()
            localdir = os.path.join(self.local_dir, remotedir)
res = hf.upload_folder(
repo_id=self.repo_id,
folder_path=localdir,
path_in_repo=remotedir,
repo_type=self.repo_type,
commit_message=f"Updated at {beijing()}",
)
for filename in files_to_push:
self.need_push[filename] = False
print(f"[push] Log files pushed to {res}")
print("[push] Done")
            # Mark the cached DataFrame as stale so the next refresh reloads it.
            self.cache_needs_refresh = True
return True
except Exception as e:
print(f"[push] {type(e)}: {e}")
return False
    def download(self):
        """Snapshot the whole dataset repo into the local directory."""
        print("[download] Starting downloading")
        try:
            res = hf.snapshot_download(
                repo_id=self.repo_id,
                repo_type=self.repo_type,
                local_dir=self.local_dir,
            )
            print(f"[download] Downloaded to {res}")
        except Exception as e:
            print(f"[download] {type(e)}: {e}")
        print("[download] Done")
    def start_synchronize(self):
        """Push buffered logs to the Hub every `synchronize_interval` seconds."""
        self.scheduler.add_job(
            self.push,
            "interval",
            seconds=self.synchronize_interval,
        )
        self.scheduler.start()
    def _load_all_logs(self, from_date=None, to_date=None) -> pd.DataFrame:
        """
        Load log files and return them merged into a single DataFrame.
        When a date range is given, the per-day directory paths are
        constructed directly, avoiding a full recursive scan.
        :param from_date: start date (YYYY-MM-DD string or datetime.date), defaults to None
        :param to_date: end date (YYYY-MM-DD string or datetime.date), defaults to None
        """
        print("[_load_all_logs] Starting to load logs")
        print(f"[_load_all_logs] Date range: {from_date} to {to_date}")
        filepaths = []
        if from_date is None and to_date is None:
            # No range given: scan every directory.
            files = glob.glob("**/*.json", root_dir=self.local_dir, recursive=True)
            filepaths = [os.path.join(self.local_dir, file) for file in files]
        else:
            # Normalize both bounds to datetime.date objects.
            start_date = from_date
            end_date = to_date
            if isinstance(start_date, str):
                start_date = datetime.datetime.strptime(start_date, "%Y-%m-%d").date()
            if isinstance(end_date, str):
                end_date = datetime.datetime.strptime(end_date, "%Y-%m-%d").date()
            # If only one bound is given, collapse the range to a single day.
            if start_date is None:
                start_date = end_date
            if end_date is None:
                end_date = start_date
            # Type guard: both bounds are non-None from here on.
            if start_date is not None and end_date is not None:
                # Construct each day's directory path directly instead of globbing.
                current_date = start_date
                date_dirs = []
                while current_date <= end_date:
                    year = str(current_date.year)
                    month = str(current_date.month)
                    day = str(current_date.day)
                    date_dirs.append(os.path.join(self.local_dir, year, month, day))
                    current_date += datetime.timedelta(days=1)
                print(
                    f"[_load_all_logs] Constructed {len(date_dirs)} date directories"
                )
                # Collect the JSON files inside each existing day directory.
                for date_dir in date_dirs:
                    if os.path.isdir(date_dir):
                        json_files = glob.glob("*.json", root_dir=date_dir)
                        for json_file in json_files:
                            filepaths.append(os.path.join(date_dir, json_file))
            print(f"[_load_all_logs] Found {len(filepaths)} files in date range")
        # Load every collected log file; skip any that fail to parse.
        datasets = []
        for path in tqdm(filepaths):
            path = str(path)
            try:
                datasets.append(ds.Dataset.from_json(path))
            except Exception as e:
                print(f"[_load_all_logs] Error loading {path}: {e}")
                continue
        # Merge the datasets and sort newest-first by timestamp.
        df = pd.DataFrame()
        if datasets:
            dataset: ds.Dataset = ds.concatenate_datasets(datasets)
            df = dataset.to_pandas()
            assert isinstance(df, pd.DataFrame)
            df = df.sort_values(by="timestamp", ascending=False)
        print(f"[_load_all_logs] Loaded {len(df)} logs")
        self.loaded_files = {os.path.relpath(p, self.local_dir) for p in filepaths}
        return df
    def refresh(self, from_date=None, to_date=None) -> list[dict]:
        """
        Push pending logs, then return the (optionally date-filtered) log list.
        :param from_date: start date (YYYY-MM-DD string or datetime.date), defaults to None
        :param to_date: end date (YYYY-MM-DD string or datetime.date), defaults to None
        :return: a list of log dicts
        """
        self.push()
        # Normalize string dates to datetime.date objects.
        if isinstance(from_date, str):
            from_date = datetime.datetime.strptime(from_date, "%Y-%m-%d").date()
        if isinstance(to_date, str):
            to_date = datetime.datetime.strptime(to_date, "%Y-%m-%d").date()
        if from_date is None and to_date is None:
            # No date range: serve from the cache, reloading only when stale.
            if self.cache_needs_refresh or self.cached_df is None:
                print("[refresh] Cache miss, reloading all logs")
                self.cached_df = self._load_all_logs()
                self.cache_needs_refresh = False
            else:
                print("[refresh] Using cached data")
            if self.cached_df is None or self.cached_df.empty:
                return []
            return self.cached_df.to_dict(orient="records")
        else:
            # A date range bypasses the cache and loads directly.
            print("[refresh] Date range specified, loading without cache")
            df = self._load_all_logs(from_date=from_date, to_date=to_date)
            if df is None or df.empty:
                return []
            return df.to_dict(orient="records")
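

if __name__ == "__main__":
    # Minimal usage sketch, not part of the module's public surface.
    # Assumptions: the repo id below is a placeholder for a dataset repo you
    # own, the `hf_token` env var holds a write token, and utils.beijing()
    # returns a datetime (it is used that way throughout this module).
    helper = LoggingHelper(repo_id="your-username/your-log-dataset")
    helper.addlog({"timestamp": beijing().isoformat(), "event": "page_view"})
    # push() also runs every `synchronize_interval` seconds in the background;
    # refresh() pushes once more, then reads the logs back, optionally
    # filtered to a date range.
    records = helper.refresh(from_date="2024-01-01", to_date="2024-01-31")
    print(f"Loaded {len(records)} log records")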