Spaces:
Running
Running
| """ | |
| Save logs locally and back them up to a Hugging Face dataset repository. | |
| """ | |
| import os | |
| import datasets as ds | |
| from apscheduler.schedulers.background import BackgroundScheduler | |
| from utils import beijing, md5, json_to_str | |
| from huggingface_hub import HfApi | |
| import pandas as pd | |
| from tqdm import tqdm | |
| from datetime import datetime, date, timedelta | |
| from zoneinfo import ZoneInfo | |
# Shared Hub client used by every LoggingHelper; it authenticates with the
# value of the `hf_token` environment variable.
hf = HfApi(token=os.environ.get("hf_token"))
# Log directories and timestamps are all expressed in Beijing time.
TIMEZONE = ZoneInfo("Asia/Shanghai")
| class LoggingHelper: | |
| def __init__( | |
| self, | |
| repo_id: str, | |
| local_dir: str = "data/logs", | |
| synchronize_interval: int = 60, | |
| cache_days: int = 30, | |
| ): | |
| """ | |
| :param repo_id: the repo_id of the dataset in huggingface | |
| :param local_dir: a directory in cwd to store downloaded files | |
| :param synchronize_interval: the interval of synchronizing between local and huggingface | |
| """ | |
| self.cache_days = cache_days | |
| self.local_dir = local_dir | |
| self.repo_id = repo_id | |
| self.synchronize_interval = synchronize_interval | |
| self.repo_type = "dataset" | |
| self.scheduler = BackgroundScheduler() | |
| self.buffer = dict[str, ds.Dataset]() | |
| self.need_push = dict[str, bool]() | |
| self.timestamps = dict[str, str]() | |
| self.today = beijing().date() | |
| ds.disable_progress_bar() | |
| self.dataframe: pd.DataFrame | |
| self.dataframe_refresh_needed = True | |
| # 首先下载所有数据 | |
| self.pull() | |
| # 加载最近30天的日志数据到内存 | |
| self.load_logs() | |
| self.start_synchronize() | |
| def addlog(self, log: dict): | |
| """add one log""" | |
| remotedir = self.remotedir() | |
| filename = md5(json_to_str(log))[:2] + ".json" | |
| remotepath = "/".join([remotedir, filename]) | |
| if remotepath in self.buffer: | |
| self.buffer[remotepath] = self.buffer[remotepath].add_item(log) # type: ignore | |
| else: | |
| self.buffer[remotepath] = ds.Dataset.from_dict({}) | |
| self.timestamps[remotepath] = beijing().isoformat(timespec="microseconds") | |
| self.buffer[remotepath] = self.buffer[remotepath].add_item(log) # type: ignore | |
| self.need_push[remotepath] = True | |
| self.dataframe_refresh_needed = True | |
| print("[addlog] Added a log to buffer") | |
| def remotedir(self): | |
| now = beijing() | |
| year = now.year.__str__() | |
| month = now.month.__str__() | |
| day = now.day.__str__() | |
| return "/".join([year, month, day]) | |
| def push_yesterday(self) -> bool: | |
| try: | |
| print("[push_yesterday] Pushing yesterday's log files to remote") | |
| year = self.today.year.__str__() | |
| month = self.today.month.__str__() | |
| day = self.today.day.__str__() | |
| remotedir = "/".join([year, month, day]) | |
| localdir = os.sep.join([self.local_dir, remotedir]) | |
| res = hf.upload_folder( | |
| repo_id=self.repo_id, | |
| folder_path=localdir, | |
| path_in_repo=remotedir, | |
| repo_type=self.repo_type, | |
| commit_message=f"Updated at {beijing()}", | |
| ) | |
| print(f"[push_yesterday] Log files pushed to {res}") | |
| print("[push_yesterday] Done") | |
| return True | |
| except Exception as e: | |
| print(f"[push_yesterday] {type(e)}: {e}") | |
| return False | |
| def push(self): | |
| try: | |
| files_to_push = [ | |
| filename for filename in self.need_push if self.need_push[filename] | |
| ] | |
| print("[push] Writing datasets to json files") | |
| for filename in files_to_push: | |
| localpath = os.path.join(self.local_dir, filename) | |
| self.buffer[filename].to_json(localpath) | |
| print(f"[push] {filename} finished") | |
| self.need_push[filename] = False | |
| now = beijing().date() | |
| if len(files_to_push) == 0: | |
| print("[push] Done") | |
| return True | |
| if now != self.today: # new day comes | |
| if not self.push_yesterday(): | |
| print("[push] Failed to upload yesterday's log files") | |
| self.today = now | |
| print("[push] Pushing log files to remote") | |
| remotedir = self.remotedir() | |
| localdir = "/".join([self.local_dir, remotedir]) | |
| res = hf.upload_folder( | |
| repo_id=self.repo_id, | |
| folder_path=localdir, | |
| path_in_repo=remotedir, | |
| repo_type=self.repo_type, | |
| commit_message=f"Updated at {beijing()}", | |
| ) | |
| print(f"[push] Log files pushed to {res}") | |
| print("[push] Done") | |
| return True | |
| except Exception as e: | |
| print(f"[push] {type(e)}: {e}") | |
| return False | |
| def pull(self): | |
| print("[pull] Starting downloading") | |
| try: | |
| res = hf.snapshot_download( | |
| repo_id=self.repo_id, | |
| repo_type=self.repo_type, | |
| local_dir=self.local_dir, | |
| ) | |
| print(f"[pull] Downloaded to {res}") | |
| remotepathes = hf.list_repo_files( | |
| repo_id=self.repo_id, repo_type=self.repo_type | |
| ) | |
| jsonfiles = [f for f in remotepathes if f.endswith(".json")] | |
| print(f"[pull] {len(jsonfiles)} files found in remote repo") | |
| print("[pull] Parsing timestamps") | |
| for remotepath in jsonfiles: | |
| try: | |
| parts = remotepath.split("/") | |
| year, month, day = parts[0], parts[1], parts[2] | |
| date_obj = date(int(year), int(month), int(day)) | |
| timestamp = ( | |
| datetime.combine(date_obj, datetime.min.time()) | |
| .astimezone(TIMEZONE) | |
| .isoformat(timespec="microseconds") | |
| ) | |
| self.timestamps[remotepath] = timestamp | |
| except Exception as e: | |
| print(f"[pull] Error parsing timestamp of {remotepath}: {e}") | |
| continue | |
| print("[pull] Done") | |
| except Exception as e: | |
| print(f"[pull] {type(e)}: {e}") | |
| print("[pull] Done") | |
| def get_pathes_between(self, from_date: date, to_date: date) -> dict[str, str]: | |
| """ | |
| 获取指定日期范围内的路径列表 | |
| :param from_date: 开始日期(格式:YYYY-MM-DD 或 datetime.date),含该日期 | |
| :param to_date: 结束日期(格式:YYYY-MM-DD 或 datetime.date),含该日期 | |
| :return: 日期范围内的路径列表,格式为 ["YYYY/MM/DD", ...] | |
| """ | |
| pathes = {} | |
| current_date = from_date | |
| while current_date <= to_date: | |
| key = f"{current_date.year}/{current_date.month}/{current_date.day}" | |
| value = datetime.combine(current_date, datetime.min.time()).isoformat( | |
| timespec="microseconds" | |
| ) | |
| pathes[key] = value | |
| current_date += timedelta(days=1) | |
| return pathes | |
| def load_logs( | |
| self, from_timestamp: str | None = None, to_timestamp: str | None = None | |
| ): | |
| """ | |
| 在启动时加载最近30天的日志数据到内存buffer | |
| """ | |
| try: | |
| start_timestamp = self.cutoff_timestamp() | |
| end_timestamp = ( | |
| beijing() | |
| .replace(hour=23, minute=59, second=59, microsecond=999999) | |
| .isoformat(timespec="microseconds") | |
| ) | |
| from_timestamp = from_timestamp or start_timestamp | |
| to_timestamp = to_timestamp or end_timestamp | |
| total_files_loaded = 0 | |
| for remotepath, timestamp in tqdm(self.timestamps.items()): | |
| if timestamp < from_timestamp or timestamp > to_timestamp: | |
| continue | |
| localpath = "/".join([self.local_dir, remotepath]) | |
| # print(f"[load_logs] Loading file {localpath}") | |
| # 检查该文件是否存在 | |
| if not os.path.exists(localpath): | |
| # print(f"[load_logs] File not found: {localpath}") | |
| continue | |
| try: | |
| # 检查文件是否为空 | |
| if os.path.getsize(localpath) == 0: | |
| # print(f"[load_logs] Skipping empty file: {remotepath}") | |
| continue | |
| if remotepath in self.buffer: | |
| # print(f"[load_logs] File already loaded: {remotepath}") | |
| continue | |
| # 加载JSON数据到Dataset | |
| dataset = ds.Dataset.from_json(localpath) | |
| if isinstance(dataset, ds.Dataset): | |
| self.buffer[remotepath] = dataset | |
| self.need_push[remotepath] = False | |
| self.timestamps[remotepath] = timestamp | |
| total_files_loaded += 1 | |
| except Exception as e: | |
| print(f"[load_logs] Error loading {remotepath}: {e}") | |
| continue | |
| if total_files_loaded > 0: | |
| self.dataframe_refresh_needed = True | |
| print(f"[load_logs] Successfully loaded {total_files_loaded} log files") | |
| print(f"[load_logs] Total datasets in buffer: {len(self.buffer)}") | |
| except Exception as e: | |
| print(f"[load_logs] Error: {type(e)}: {e}") | |
| def cutoff_timestamp(self) -> str: | |
| """ | |
| 计算用于清理日志的截止时间戳 | |
| :return: 截止时间戳,格式为 ISO 8601 字符串 | |
| """ | |
| cutoff_date = self.today - timedelta(days=self.cache_days) | |
| cutoff_timestamp = ( | |
| datetime.combine(cutoff_date, datetime.min.time()) | |
| .astimezone(TIMEZONE) | |
| .isoformat(timespec="microseconds") | |
| ) | |
| return cutoff_timestamp | |
| def cleanup_old_logs(self): | |
| """ | |
| 清理buffer中超过30天的日志数据 | |
| 保留逻辑:保留最近cache_days天的日志 | |
| 删除逻辑:删除早于 (today - cache_days) 的所有日志 | |
| """ | |
| try: | |
| print("[cleanup_old_logs] Starting cleanup of old logs") | |
| # 计算应该保留的最早日期(含这一天) | |
| start_timestamp = self.cutoff_timestamp() | |
| removed_count = 0 | |
| for filepath in list(self.buffer.keys()): | |
| # filepath 格式类似 "2025/9/23/xx.json" | |
| # 提取日期部分 "2025/9/23" | |
| try: | |
| timestamp = self.timestamps[filepath] | |
| # 如果文件日期早于截断日期,则删除 | |
| if timestamp >= start_timestamp: | |
| continue | |
| del self.buffer[filepath] | |
| del self.need_push[filepath] | |
| removed_count += 1 | |
| print(f"[cleanup_old_logs] Removed {filepath}") | |
| except (ValueError, IndexError) as e: | |
| print(f"[cleanup_old_logs] Error parsing filepath {filepath}: {e}") | |
| continue | |
| print(f"[cleanup_old_logs] Cleaned up {removed_count} old log files") | |
| print( | |
| f"[cleanup_old_logs] Remaining datasets in buffer: {len(self.buffer)}" | |
| ) | |
| print("[cleanup_old_logs] Done") | |
| except Exception as e: | |
| print(f"[cleanup_old_logs] Error: {type(e)}: {e}") | |
| def start_synchronize(self): | |
| self.scheduler.add_job( | |
| self.push, | |
| "interval", | |
| seconds=self.synchronize_interval, | |
| ) | |
| # 添加每日清理任务,在每天凌晨2点执行 | |
| self.scheduler.add_job( | |
| self.cleanup_old_logs, | |
| "cron", | |
| hour=2, | |
| minute=0, | |
| ) | |
| self.scheduler.start() | |
| def refresh_dataframe(self) -> pd.DataFrame: | |
| """内存中所有日志数据合并为一个DataFrame""" | |
| datasets = list(self.buffer.values()) | |
| merged_dataset = ds.concatenate_datasets(datasets) | |
| self.dataframe = merged_dataset.to_pandas() # type: ignore | |
| print(f"[refresh_dataframe] Loaded {len(self.dataframe)} logs") # type: ignore | |
| self.dataframe_refresh_needed = False | |
| return self.dataframe # type: ignore | |
| def refresh(self, from_date: str | None, to_date: str | None) -> list[dict]: | |
| """ | |
| 获取刷新后的日志列表,支持查询任意时间范围的日志(包括超过30天前的日志) | |
| 当查询超过30天前的日志时,会动态从磁盘加载相应数据。 | |
| 基于timestamp字段进行日期过滤。时间戳格式为 ISO 8601 格式(如 "2025-09-08T16:01:07.526954+08:00") | |
| :param from_date: 开始日期(格式:YYYY-MM-DD 或 datetime.date),含该日期的所有日志 | |
| :param to_date: 结束日期(格式:YYYY-MM-DD 或 datetime.date),含该日期的所有日志 | |
| :return: 按时间戳降序排列的日志字典列表 | |
| """ | |
| from_timestamp = None | |
| if from_date is not None: | |
| from_datetime = ( | |
| datetime.strptime(from_date, "%Y-%m-%d") | |
| .astimezone(TIMEZONE) | |
| .replace(hour=0, minute=0, second=0, microsecond=0) | |
| ) | |
| from_timestamp = from_datetime.isoformat(timespec="microseconds") | |
| to_timestamp = None | |
| if to_date is not None: | |
| to_datetime = ( | |
| datetime.strptime(to_date, "%Y-%m-%d") | |
| .astimezone(TIMEZONE) | |
| .replace(hour=23, minute=59, second=59, microsecond=999999) | |
| ) | |
| to_timestamp = to_datetime.isoformat(timespec="microseconds") | |
| print( | |
| f"[refresh] Starting to load logs from {from_timestamp} to {to_timestamp}" | |
| ) | |
| # 如果查询范围超出缓存范围,则加载相应的日志文件 | |
| self.load_logs(from_timestamp=from_timestamp, to_timestamp=to_timestamp) | |
| if self.dataframe_refresh_needed: | |
| self.refresh_dataframe() | |
| df = self.dataframe | |
| print(f"[refresh] Filtering logs from {from_date} to {to_date}") | |
| # 创建日期范围过滤条件 | |
| filter_condition = pd.Series([True] * len(df), index=df.index) | |
| if from_timestamp is not None: | |
| filter_condition = filter_condition & (df["timestamp"] >= from_timestamp) | |
| if to_timestamp is not None: | |
| filter_condition = filter_condition & (df["timestamp"] <= to_timestamp) | |
| df = df[filter_condition] | |
| # 按timestamp降序排序(最新日志在前) | |
| df = df.sort_values(by="timestamp", ascending=False) | |
| print(f"[refresh] Returning {len(df)} logs") | |
| return df.to_dict(orient="records") | |