| """ | |
| a module of logs saving and backuping | |
| """ | |
import datetime
import glob
import os

import datasets as ds
import pandas as pd
from apscheduler.schedulers.background import BackgroundScheduler
from huggingface_hub import HfApi
from tqdm import tqdm

from utils import beijing, md5, json_to_str

hf = HfApi(token=os.environ.get("hf_token"))
class LoggingHelper:
    def __init__(
        self,
        repo_id: str,
        local_dir: str = "data/logs",
        synchronize_interval: int = 60,
    ):
        """
        :param repo_id: the repo_id of the Hugging Face dataset
        :param local_dir: a directory under the cwd to store downloaded files
        :param synchronize_interval: seconds between local/Hub synchronizations
        """
        self.local_dir = local_dir
        self.repo_id = repo_id
        self.synchronize_interval = synchronize_interval
        self.repo_type = "dataset"
        self.scheduler = BackgroundScheduler()
        # In-memory datasets keyed by remote path, plus a dirty flag per file.
        self.buffer: dict[str, ds.Dataset] = {}
        self.need_push: dict[str, bool] = {}
        self.today = beijing().date()
        ds.disable_progress_bar()
        # Cache-related state.
        self.cached_df: pd.DataFrame | None = None
        self.loaded_files: set[str] = set()
        self.cache_needs_refresh = False
        self.pull()
        self.start_synchronize()
    def addlog(self, log: dict):
        """add one log"""
        remotedir = self.remotedir()
        # Shard by the first two hex chars of the log's md5 digest
        # (see the worked example below).
        filename = md5(json_to_str(log))[:2] + ".json"
        remotepath = "/".join([remotedir, filename])
        if remotepath not in self.buffer:
            self.buffer[remotepath] = ds.Dataset.from_dict({})
        self.buffer[remotepath] = self.buffer[remotepath].add_item(log)  # type: ignore
        self.need_push[remotepath] = True
        print("[addlog] Added a log to buffer")
    def remotedir(self):
        """Return today's remote directory, e.g. "2024/5/17"."""
        now = beijing()
        return f"{now.year}/{now.month}/{now.day}"
    def pull(self):
        try:
            self.download()
            remotedir = self.remotedir()
            print(f"[pull] today dir: {remotedir}")
            filenames = hf.list_repo_files(
                repo_id=self.repo_id,
                repo_type=self.repo_type,
            )
            files_to_load = [
                filename
                for filename in filenames
                if filename not in self.buffer
                and filename.startswith(remotedir)
                and filename.endswith(".json")
            ]
            print(f"[pull] total {len(files_to_load)} to load")
            for filename in tqdm(files_to_load):
                path = os.path.join(self.local_dir, filename)
                with open(path, "r") as f:
                    data = f.read()
                # Only track non-empty files; an empty file would fail to
                # parse, and tracking it would break push() later.
                if len(data) != 0:
                    self.buffer[filename] = ds.Dataset.from_json(path)  # type: ignore
                    self.need_push[filename] = False
            return True
        except Exception as e:
            print(f"[pull] {type(e)}: {e}")
            return False
    def push_yesterday(self) -> bool:
        try:
            # self.today still holds yesterday's date when this runs.
            remotedir = f"{self.today.year}/{self.today.month}/{self.today.day}"
            files_to_push = []
            # Iterate over a copy, since entries may be evicted along the way.
            for filename in list(self.buffer.keys()):
                if not filename.startswith(remotedir):
                    continue
                if not self.need_push[filename]:
                    # Already synced; yesterday's files will never change
                    # again, so evict them from the buffer.
                    del self.buffer[filename]
                    del self.need_push[filename]
                    continue
                files_to_push.append(filename)
            if len(files_to_push) == 0:
                return True
            print("[push_yesterday] Writing datasets to json files")
            for filename in files_to_push:
                localpath = os.path.join(self.local_dir, filename)
                self.buffer[filename].to_json(localpath)
                print(f"[push_yesterday] {filename} finished")
            print("[push_yesterday] Done")
            print("[push_yesterday] Pushing log files to remote")
            localdir = os.path.join(self.local_dir, remotedir)
            res = hf.upload_folder(
                repo_id=self.repo_id,
                folder_path=localdir,
                path_in_repo=remotedir,
                repo_type=self.repo_type,
                commit_message=f"Updated at {beijing()}",
            )
            for filename in files_to_push:
                self.need_push[filename] = False
            print(f"[push_yesterday] Log files pushed to {res}")
            print("[push_yesterday] Done")
            return True
        except Exception as e:
            print(f"[push_yesterday] {type(e)}: {e}")
            return False
    def push(self):
        try:
            now = beijing().date()
            if now != self.today:  # a new day has begun
                if not self.push_yesterday():
                    print("[push] Failed to upload yesterday's log files")
                self.today = now
            files_to_push = [
                filename for filename in self.need_push if self.need_push[filename]
            ]
            if len(files_to_push) == 0:
                return True
            print("[push] Writing datasets to json files")
            for filename in files_to_push:
                localpath = os.path.join(self.local_dir, filename)
                self.buffer[filename].to_json(localpath)
                print(f"[push] {filename} finished")
            print("[push] Done")
            print("[push] Pushing log files to remote")
            remotedir = self.remotedir()
            localdir = os.path.join(self.local_dir, remotedir)
            res = hf.upload_folder(
                repo_id=self.repo_id,
                folder_path=localdir,
                path_in_repo=remotedir,
                repo_type=self.repo_type,
                commit_message=f"Updated at {beijing()}",
            )
            for filename in files_to_push:
                self.need_push[filename] = False
            print(f"[push] Log files pushed to {res}")
            print("[push] Done")
            # Mark the cache as stale so the next refresh() reloads from disk.
            self.cache_needs_refresh = True
            return True
        except Exception as e:
            print(f"[push] {type(e)}: {e}")
            return False
    def download(self):
        print("[download] Starting downloading")
        try:
            res = hf.snapshot_download(
                repo_id=self.repo_id,
                repo_type=self.repo_type,
                local_dir=self.local_dir,
            )
            print(f"[download] Downloaded to {res}")
        except Exception as e:
            print(f"[download] {type(e)}: {e}")
        print("[download] Done")
    def start_synchronize(self):
        self.scheduler.add_job(
            self.push,
            "interval",
            seconds=self.synchronize_interval,
        )
        self.scheduler.start()
    def _load_all_logs(self, from_date=None, to_date=None) -> pd.DataFrame:
        """
        Load log files and return them merged into one DataFrame.
        Constructs the date directories directly, so files within a given
        date range can be found without scanning the whole tree.
        :param from_date: start date (YYYY-MM-DD string or datetime.date), defaults to None
        :param to_date: end date (YYYY-MM-DD string or datetime.date), defaults to None
        """
        print("[_load_all_logs] Starting to load logs")
        print(f"[_load_all_logs] Date range: {from_date} to {to_date}")
        filepathes = []
        # Work out the date range.
        if from_date is None and to_date is None:
            # No range given: scan every directory.
            files = glob.glob("**/*.json", root_dir=self.local_dir, recursive=True)
            filepathes = [os.path.join(self.local_dir, file) for file in files]
        else:
            # Convert the date arguments to datetime.date objects.
            start_date = from_date
            end_date = to_date
            if isinstance(start_date, str):
                start_date = datetime.datetime.strptime(start_date, "%Y-%m-%d").date()
            if isinstance(end_date, str):
                end_date = datetime.datetime.strptime(end_date, "%Y-%m-%d").date()
            # If only one bound was given, use it for both.
            if start_date is None:
                start_date = end_date
            if end_date is None:
                end_date = start_date
            # Type guard: neither bound is None past this point.
            if start_date is not None and end_date is not None:
                # Construct the directory path for each date in the range
                # directly, avoiding a recursive glob.
                current_date = start_date
                date_dirs = []
                while current_date <= end_date:
                    year = str(current_date.year)
                    month = str(current_date.month)
                    day = str(current_date.day)
                    date_dirs.append(os.path.join(self.local_dir, year, month, day))
                    current_date += datetime.timedelta(days=1)
                print(
                    f"[_load_all_logs] Constructed {len(date_dirs)} date directories"
                )
                # Collect the JSON files from each date directory.
                for date_dir in date_dirs:
                    if os.path.isdir(date_dir):
                        json_files = glob.glob("*.json", root_dir=date_dir)
                        for json_file in json_files:
                            filepathes.append(os.path.join(date_dir, json_file))
            print(f"[_load_all_logs] Found {len(filepathes)} files in date range")
        # Load every log file.
        datasets = []
        for path in tqdm(filepathes):
            try:
                datasets.append(ds.Dataset.from_json(path))
            except Exception as e:
                print(f"[_load_all_logs] Error loading {path}: {e}")
                continue
        # Merge the datasets and sort newest-first.
        df = pd.DataFrame()
        if datasets:
            dataset: ds.Dataset = ds.concatenate_datasets(datasets)
            df = dataset.to_pandas()
            assert isinstance(df, pd.DataFrame)
            df = df.sort_values(by="timestamp", ascending=False)
        print(f"[_load_all_logs] Loaded {len(df)} logs")
        self.loaded_files = set(os.path.relpath(p, self.local_dir) for p in filepathes)
        return df
    def refresh(self, from_date=None, to_date=None) -> list[dict]:
        """
        Push pending logs, then return the refreshed log list, optionally
        filtered by a date range.
        :param from_date: start date (YYYY-MM-DD string or datetime.date), defaults to None
        :param to_date: end date (YYYY-MM-DD string or datetime.date), defaults to None
        :return: list of log dicts
        """
        self.push()
        # Convert string dates to datetime.date objects.
        if isinstance(from_date, str):
            from_date = datetime.datetime.strptime(from_date, "%Y-%m-%d").date()
        if isinstance(to_date, str):
            to_date = datetime.datetime.strptime(to_date, "%Y-%m-%d").date()
        if from_date is None and to_date is None:
            # No date range given: serve from the cache.
            # Reload everything if the cache is stale or empty.
            if self.cache_needs_refresh or self.cached_df is None:
                print("[refresh] Cache miss, reloading all logs")
                self.cached_df = self._load_all_logs()
                self.cache_needs_refresh = False
            else:
                print("[refresh] Using cached data")
            if self.cached_df is None or self.cached_df.empty:
                return []
            return self.cached_df.to_dict(orient="records")
        else:
            # A date range was given: load directly, bypassing the cache.
            print("[refresh] Date range specified, loading without cache")
            df = self._load_all_logs(from_date=from_date, to_date=to_date)
            if df is None or df.empty:
                return []
            return df.to_dict(orient="records")
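
if __name__ == "__main__":
    # Minimal usage sketch. The repo id and log fields below are illustrative
    # assumptions, not part of this module; it also assumes the "hf_token"
    # env var grants write access to that dataset repo.
    import time

    helper = LoggingHelper(repo_id="your-name/your-log-dataset")
    helper.addlog({"timestamp": str(beijing()), "msg": "hello"})
    # refresh() forces a push and returns the merged records; passing a date
    # range bypasses the in-memory cache.
    print(len(helper.refresh(from_date="2024-01-01")))
    time.sleep(2 * helper.synchronize_interval)  # let the scheduler fire once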