""" a module of logs saving and backuping """ import os import datasets as ds from apscheduler.schedulers.background import BackgroundScheduler from utils import beijing, md5, json_to_str from huggingface_hub import HfApi import pandas as pd import datetime from zoneinfo import ZoneInfo import glob hf = HfApi() hf.token = os.environ.get("hf_token") class LoggingHelper: def __init__( self, repo_id: str, local_dir: str = "data/logs", synchronize_interval: int = 60, cache_days: int = 30, ): """ :param repo_id: the repo_id of the dataset in huggingface :param local_dir: a directory in cwd to store downloaded files :param synchronize_interval: the interval of synchronizing between local and huggingface """ self.cache_days = cache_days self.local_dir = local_dir self.repo_id = repo_id self.synchronize_interval = synchronize_interval self.repo_type = "dataset" self.scheduler = BackgroundScheduler() self.buffer = dict[str, ds.Dataset]() self.need_push = dict[str, bool]() self.today = beijing().date() ds.disable_progress_bar() self.dataframe: pd.DataFrame self.dataframe_refresh_needed = True # 首先下载所有数据 self.pull() # 加载最近30天的日志数据到内存 self.load_logs() self.start_synchronize() def addlog(self, log: dict): """add one log""" remotedir = self.remotedir() filename = md5(json_to_str(log))[:2] + ".json" remotepath = "/".join([remotedir, filename]) if remotepath in self.buffer: self.buffer[remotepath] = self.buffer[remotepath].add_item(log) # type: ignore else: self.buffer[remotepath] = ds.Dataset.from_dict({}) self.buffer[remotepath] = self.buffer[remotepath].add_item(log) # type: ignore self.need_push[remotepath] = True self.dataframe_refresh_needed = True print("[addlog] Added a log to buffer") def remotedir(self): now = beijing() year = now.year.__str__() month = now.month.__str__() day = now.day.__str__() return "/".join([year, month, day]) def push_yesterday(self) -> bool: try: year = self.today.year.__str__() month = self.today.month.__str__() day = self.today.day.__str__() remotedir = "/".join([year, month, day]) files_to_push = [] for filename in self.buffer.keys(): if not filename.startswith(remotedir): continue files_to_push.append(filename) if len(files_to_push) == 0: return True print("[push_yesterday] Writing datasets to json files") for filename in files_to_push: localpath = os.sep.join([self.local_dir, filename]) self.buffer[filename].to_json(localpath) files_to_push.append(filename) print(f"[push_yesterday] {filename} finished") print("[push_yesterday] Done") print("[push_yesterday] Pushing log files to remote") if len(files_to_push): localdir = os.sep.join([self.local_dir, remotedir]) res = hf.upload_folder( repo_id=self.repo_id, folder_path=localdir, path_in_repo=remotedir, repo_type=self.repo_type, commit_message=f"Updated at {beijing()}", ) print(f"[push_yesterday] Log files pushed to {res}") print("[push_yesterday] Done") return True except Exception as e: print(f"[push_yesterday] {type(e)}: {e}") return False def push(self): try: now = beijing().date() if now != self.today: # new day comes if not self.push_yesterday(): print("[push] Failed to upload yesterday's log files") self.today = now files_to_push = [ filename for filename in self.need_push if self.need_push[filename] ] if len(files_to_push) == 0: return True print("[push] Writing datasets to json files") for filename in files_to_push: localpath = os.path.join(self.local_dir, filename) self.buffer[filename].to_json(localpath) print(f"[push] {filename} finished") print("[push] Done") print("[push] Pushing log files to remote") 
            remotedir = self.remotedir()
            localdir = os.path.join(self.local_dir, remotedir)
            res = hf.upload_folder(
                repo_id=self.repo_id,
                folder_path=localdir,
                path_in_repo=remotedir,
                repo_type=self.repo_type,
                commit_message=f"Updated at {beijing()}",
            )
            for filename in files_to_push:
                self.need_push[filename] = False
            print(f"[push] Log files pushed to {res}")
            print("[push] Done")
            return True
        except Exception as e:
            print(f"[push] {type(e)}: {e}")
            return False

    def pull(self):
        print("[pull] Starting downloading")
        try:
            res = hf.snapshot_download(
                repo_id=self.repo_id,
                repo_type="dataset",
                local_dir=self.local_dir,
            )
            print(f"[pull] Downloaded to {res}")
        except Exception as e:
            print(f"[pull] {type(e)}: {e}")
        print("[pull] Done")

    def get_pathes_between(
        self, from_date: datetime.date, to_date: datetime.date
    ) -> list[str]:
        """
        Build the list of remote paths covering a date range.

        :param from_date: start date (datetime.date), inclusive
        :param to_date: end date (datetime.date), inclusive
        :return: list of paths in the form ["YYYY/M/D", ...] (not zero-padded)
        """
        pathes = []
        current_date = from_date
        while current_date <= to_date:
            pathes.append(
                f"{current_date.year}/{current_date.month}/{current_date.day}"
            )
            current_date += datetime.timedelta(days=1)
        return pathes

    def load_logs(self):
        """Load the most recent ``cache_days`` days of logs into the in-memory buffer at startup."""
        print(f"[load_logs] Starting to load the last {self.cache_days} days of logs")
        try:
            today = beijing().date()
            start_date = today - datetime.timedelta(days=self.cache_days)
            print(f"[load_logs] Loading logs from {start_date} to {today}")
            # Build the date range covering the cache window
            pathes = self.get_pathes_between(start_date, today)
            total_files_loaded = 0
            # Walk through each day's directory
            for path in pathes:
                date_path = os.path.join(self.local_dir, path)
                print(f"[load_logs] Processing directory: {date_path}")
                # Skip dates without a local directory
                if not os.path.exists(date_path):
                    print(f"[load_logs] Directory not found: {date_path}")
                    continue
                # Load every JSON file in that day's directory
                json_files = glob.glob(os.path.join(date_path, "*.json"))
                for json_file in json_files:
                    # Use the path relative to local_dir (with "/" separators) as the buffer key
                    relative_path = os.path.relpath(json_file, self.local_dir).replace(
                        os.sep, "/"
                    )
                    try:
                        # Skip empty files
                        if os.path.getsize(json_file) == 0:
                            print(f"[load_logs] Skipping empty file: {relative_path}")
                            continue
                        # Load the JSON lines into a Dataset
                        dataset = ds.Dataset.from_json(json_file)
                        if isinstance(dataset, ds.Dataset):
                            self.buffer[relative_path] = dataset
                            self.need_push[relative_path] = False
                            total_files_loaded += 1
                    except Exception as e:
                        print(f"[load_logs] Error loading {relative_path}: {e}")
                        continue
            print(f"[load_logs] Successfully loaded {total_files_loaded} log files")
            print(f"[load_logs] Total datasets in buffer: {len(self.buffer)}")
        except Exception as e:
            print(f"[load_logs] Error: {type(e)}: {e}")

    def cleanup_old_logs(self):
        """Remove buffered logs that have fallen out of the ``cache_days`` window."""
        try:
            print("[cleanup_old_logs] Starting cleanup of old logs")
            expired = self.today - datetime.timedelta(days=(self.cache_days + 1))
            # Buffer keys are not zero-padded (e.g. "2025/9/8/ab.json"), so build the
            # prefix the same way as remotedir() rather than with strftime("%Y/%m/%d")
            cache_dir_to_remove = f"{expired.year}/{expired.month}/{expired.day}"
            print(
                f"[cleanup_old_logs] Removing logs in {cache_dir_to_remove} from buffer"
            )
            removed_count = 0
            for filepath in list(self.buffer.keys()):
                if filepath.startswith(cache_dir_to_remove + "/"):
                    del self.buffer[filepath]
                    del self.need_push[filepath]
                    removed_count += 1
            print(f"[cleanup_old_logs] Cleaned up {removed_count} old log files")
            print(
                f"[cleanup_old_logs] Remaining datasets in buffer: {len(self.buffer)}"
            )
            print("[cleanup_old_logs] Done")
        except Exception as e:
            print(f"[cleanup_old_logs] Error: {type(e)}: {e}")

    def start_synchronize(self):
        self.scheduler.add_job(
            self.push,
            "interval",
            seconds=self.synchronize_interval,
        )
        # Daily cleanup job, runs at 02:00 every day
        self.scheduler.add_job(
            self.cleanup_old_logs,
            "cron",
            hour=2,
            minute=0,
        )
        self.scheduler.start()

    def refresh_dataframe(self) -> pd.DataFrame:
        """Merge every Dataset currently in the buffer into a single DataFrame."""
        datasets = list(self.buffer.values())
        merged_dataset = ds.concatenate_datasets(datasets)
        self.dataframe = merged_dataset.to_pandas()  # type: ignore
        print(f"[refresh_dataframe] Loaded {len(self.dataframe)} logs")
        self.dataframe_refresh_needed = False
        return self.dataframe

    def refresh(self, from_date=None, to_date=None) -> list[dict]:
        """
        Return the merged logs from the in-memory buffer, optionally filtered by a date range.

        Filtering is done on the ``timestamp`` field, which is stored as an ISO 8601
        string (e.g. "2025-09-08T16:01:07.526954+08:00").

        :param from_date: start date ("YYYY-MM-DD" string or datetime.date); all logs of that day are included
        :param to_date: end date ("YYYY-MM-DD" string or datetime.date); all logs of that day are included
        :return: list of log dicts sorted by timestamp in descending order
        """
        if self.dataframe_refresh_needed:
            self.refresh_dataframe()
        df = self.dataframe
        # Convert "YYYY-MM-DD" strings into ISO 8601 boundaries in Beijing time
        tz = ZoneInfo("Asia/Shanghai")
        if isinstance(from_date, str):
            from_date = (
                datetime.datetime.strptime(from_date, "%Y-%m-%d")
                .replace(tzinfo=tz)  # interpret the date as midnight, Beijing time
                .isoformat(timespec="microseconds")
            )
        if isinstance(to_date, str):
            to_date = datetime.datetime.strptime(to_date, "%Y-%m-%d").replace(
                tzinfo=tz
            )
            to_date += datetime.timedelta(days=1)  # include the whole end date
            to_date = to_date.isoformat(timespec="microseconds")
        print(f"[refresh] Filtering logs from {from_date} to {to_date}")
        # Filter by timestamp range (both boundary days are fully included)
        if from_date is not None or to_date is not None:
            filter_condition = pd.Series([True] * len(df), index=df.index)
            if from_date is not None:
                filter_condition = filter_condition & (df["timestamp"] >= from_date)
            if to_date is not None:
                filter_condition = filter_condition & (df["timestamp"] < to_date)
            df = df[filter_condition]
        # Sort by timestamp in descending order (newest first)
        df = df.sort_values(by="timestamp", ascending=False)
        print(f"[refresh] Returning {len(df)} logs")
        return df.to_dict(orient="records")
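

# A minimal usage sketch, not part of the library itself: the repo id below is a
# placeholder, and it assumes utils.beijing() returns a timezone-aware datetime and
# that every log dict carries an ISO 8601 "timestamp" field, which refresh() relies
# on for filtering and sorting. Running it would actually pull from and push to the
# Hugging Face dataset repo via the background scheduler.
if __name__ == "__main__":
    helper = LoggingHelper(repo_id="your-username/your-log-dataset")  # hypothetical repo id
    helper.addlog(
        {
            "timestamp": beijing().isoformat(timespec="microseconds"),
            "level": "INFO",
            "message": "service started",
        }
    )
    # Merge the buffer and fetch the logs of a single day (both boundaries inclusive)
    logs = helper.refresh(from_date="2025-09-08", to_date="2025-09-08")
    print(f"{len(logs)} logs on 2025-09-08")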