Spaces:
Sleeping
Sleeping
| """ | |
| 动作日志记录器 | |
| 用于记录OASIS模拟中每个Agent的动作,供后端监控使用 | |
| 日志结构: | |
| sim_xxx/ | |
| ├── twitter/ | |
| │ └── actions.jsonl # Twitter 平台动作日志 | |
| ├── reddit/ | |
| │ └── actions.jsonl # Reddit 平台动作日志 | |
| ├── simulation.log # 主模拟进程日志 | |
| └── run_state.json # 运行状态(API 查询用) | |
| """ | |
| import json | |
| import os | |
| import logging | |
| from datetime import datetime | |
| from typing import Dict, Any, Optional | |
| class PlatformActionLogger: | |
| """单平台动作日志记录器""" | |
| def __init__(self, platform: str, base_dir: str): | |
| """ | |
| 初始化日志记录器 | |
| Args: | |
| platform: 平台名称 (twitter/reddit) | |
| base_dir: 模拟目录的基础路径 | |
| """ | |
| self.platform = platform | |
| self.base_dir = base_dir | |
| self.log_dir = os.path.join(base_dir, platform) | |
| self.log_path = os.path.join(self.log_dir, "actions.jsonl") | |
| self._ensure_dir() | |
| def _ensure_dir(self): | |
| """确保目录存在""" | |
| os.makedirs(self.log_dir, exist_ok=True) | |
| def log_action( | |
| self, | |
| round_num: int, | |
| agent_id: int, | |
| agent_name: str, | |
| action_type: str, | |
| action_args: Optional[Dict[str, Any]] = None, | |
| result: Optional[str] = None, | |
| success: bool = True | |
| ): | |
| """记录一个动作""" | |
| entry = { | |
| "round": round_num, | |
| "timestamp": datetime.now().isoformat(), | |
| "agent_id": agent_id, | |
| "agent_name": agent_name, | |
| "action_type": action_type, | |
| "action_args": action_args or {}, | |
| "result": result, | |
| "success": success, | |
| } | |
| with open(self.log_path, 'a', encoding='utf-8') as f: | |
| f.write(json.dumps(entry, ensure_ascii=False) + '\n') | |
| def log_round_start(self, round_num: int, simulated_hour: int): | |
| """记录轮次开始""" | |
| entry = { | |
| "round": round_num, | |
| "timestamp": datetime.now().isoformat(), | |
| "event_type": "round_start", | |
| "simulated_hour": simulated_hour, | |
| } | |
| with open(self.log_path, 'a', encoding='utf-8') as f: | |
| f.write(json.dumps(entry, ensure_ascii=False) + '\n') | |
| def log_round_end(self, round_num: int, actions_count: int): | |
| """记录轮次结束""" | |
| entry = { | |
| "round": round_num, | |
| "timestamp": datetime.now().isoformat(), | |
| "event_type": "round_end", | |
| "actions_count": actions_count, | |
| } | |
| with open(self.log_path, 'a', encoding='utf-8') as f: | |
| f.write(json.dumps(entry, ensure_ascii=False) + '\n') | |
| def log_simulation_start(self, config: Dict[str, Any]): | |
| """记录模拟开始""" | |
| entry = { | |
| "timestamp": datetime.now().isoformat(), | |
| "event_type": "simulation_start", | |
| "platform": self.platform, | |
| "total_rounds": config.get("time_config", {}).get("total_simulation_hours", 72) * 2, | |
| "agents_count": len(config.get("agent_configs", [])), | |
| } | |
| with open(self.log_path, 'a', encoding='utf-8') as f: | |
| f.write(json.dumps(entry, ensure_ascii=False) + '\n') | |
| def log_simulation_end(self, total_rounds: int, total_actions: int): | |
| """记录模拟结束""" | |
| entry = { | |
| "timestamp": datetime.now().isoformat(), | |
| "event_type": "simulation_end", | |
| "platform": self.platform, | |
| "total_rounds": total_rounds, | |
| "total_actions": total_actions, | |
| } | |
| with open(self.log_path, 'a', encoding='utf-8') as f: | |
| f.write(json.dumps(entry, ensure_ascii=False) + '\n') | |
| class SimulationLogManager: | |
| """ | |
| 模拟日志管理器 | |
| 统一管理所有日志文件,按平台分离 | |
| """ | |
| def __init__(self, simulation_dir: str): | |
| """ | |
| 初始化日志管理器 | |
| Args: | |
| simulation_dir: 模拟目录路径 | |
| """ | |
| self.simulation_dir = simulation_dir | |
| self.twitter_logger: Optional[PlatformActionLogger] = None | |
| self.reddit_logger: Optional[PlatformActionLogger] = None | |
| self._main_logger: Optional[logging.Logger] = None | |
| # 设置主日志 | |
| self._setup_main_logger() | |
| def _setup_main_logger(self): | |
| """设置主模拟日志""" | |
| log_path = os.path.join(self.simulation_dir, "simulation.log") | |
| # 创建 logger | |
| self._main_logger = logging.getLogger(f"simulation.{os.path.basename(self.simulation_dir)}") | |
| self._main_logger.setLevel(logging.INFO) | |
| self._main_logger.handlers.clear() | |
| # 文件处理器 | |
| file_handler = logging.FileHandler(log_path, encoding='utf-8', mode='w') | |
| file_handler.setLevel(logging.INFO) | |
| file_handler.setFormatter(logging.Formatter( | |
| '%(asctime)s - %(levelname)s - %(message)s', | |
| datefmt='%Y-%m-%d %H:%M:%S' | |
| )) | |
| self._main_logger.addHandler(file_handler) | |
| # 控制台处理器 | |
| console_handler = logging.StreamHandler() | |
| console_handler.setLevel(logging.INFO) | |
| console_handler.setFormatter(logging.Formatter( | |
| '[%(asctime)s] %(message)s', | |
| datefmt='%H:%M:%S' | |
| )) | |
| self._main_logger.addHandler(console_handler) | |
| self._main_logger.propagate = False | |
| def get_twitter_logger(self) -> PlatformActionLogger: | |
| """获取 Twitter 平台日志记录器""" | |
| if self.twitter_logger is None: | |
| self.twitter_logger = PlatformActionLogger("twitter", self.simulation_dir) | |
| return self.twitter_logger | |
| def get_reddit_logger(self) -> PlatformActionLogger: | |
| """获取 Reddit 平台日志记录器""" | |
| if self.reddit_logger is None: | |
| self.reddit_logger = PlatformActionLogger("reddit", self.simulation_dir) | |
| return self.reddit_logger | |
| def log(self, message: str, level: str = "info"): | |
| """记录主日志""" | |
| if self._main_logger: | |
| getattr(self._main_logger, level.lower(), self._main_logger.info)(message) | |
| def info(self, message: str): | |
| self.log(message, "info") | |
| def warning(self, message: str): | |
| self.log(message, "warning") | |
| def error(self, message: str): | |
| self.log(message, "error") | |
| def debug(self, message: str): | |
| self.log(message, "debug") | |
| # ============ 兼容旧接口 ============ | |
| class ActionLogger: | |
| """ | |
| 动作日志记录器(兼容旧接口) | |
| 建议使用 SimulationLogManager 代替 | |
| """ | |
| def __init__(self, log_path: str): | |
| self.log_path = log_path | |
| self._ensure_dir() | |
| def _ensure_dir(self): | |
| log_dir = os.path.dirname(self.log_path) | |
| if log_dir: | |
| os.makedirs(log_dir, exist_ok=True) | |
| def log_action( | |
| self, | |
| round_num: int, | |
| platform: str, | |
| agent_id: int, | |
| agent_name: str, | |
| action_type: str, | |
| action_args: Optional[Dict[str, Any]] = None, | |
| result: Optional[str] = None, | |
| success: bool = True | |
| ): | |
| entry = { | |
| "round": round_num, | |
| "timestamp": datetime.now().isoformat(), | |
| "platform": platform, | |
| "agent_id": agent_id, | |
| "agent_name": agent_name, | |
| "action_type": action_type, | |
| "action_args": action_args or {}, | |
| "result": result, | |
| "success": success, | |
| } | |
| with open(self.log_path, 'a', encoding='utf-8') as f: | |
| f.write(json.dumps(entry, ensure_ascii=False) + '\n') | |
| def log_round_start(self, round_num: int, simulated_hour: int, platform: str): | |
| entry = { | |
| "round": round_num, | |
| "timestamp": datetime.now().isoformat(), | |
| "platform": platform, | |
| "event_type": "round_start", | |
| "simulated_hour": simulated_hour, | |
| } | |
| with open(self.log_path, 'a', encoding='utf-8') as f: | |
| f.write(json.dumps(entry, ensure_ascii=False) + '\n') | |
| def log_round_end(self, round_num: int, actions_count: int, platform: str): | |
| entry = { | |
| "round": round_num, | |
| "timestamp": datetime.now().isoformat(), | |
| "platform": platform, | |
| "event_type": "round_end", | |
| "actions_count": actions_count, | |
| } | |
| with open(self.log_path, 'a', encoding='utf-8') as f: | |
| f.write(json.dumps(entry, ensure_ascii=False) + '\n') | |
| def log_simulation_start(self, platform: str, config: Dict[str, Any]): | |
| entry = { | |
| "timestamp": datetime.now().isoformat(), | |
| "platform": platform, | |
| "event_type": "simulation_start", | |
| "total_rounds": config.get("time_config", {}).get("total_simulation_hours", 72) * 2, | |
| "agents_count": len(config.get("agent_configs", [])), | |
| } | |
| with open(self.log_path, 'a', encoding='utf-8') as f: | |
| f.write(json.dumps(entry, ensure_ascii=False) + '\n') | |
| def log_simulation_end(self, platform: str, total_rounds: int, total_actions: int): | |
| entry = { | |
| "timestamp": datetime.now().isoformat(), | |
| "platform": platform, | |
| "event_type": "simulation_end", | |
| "total_rounds": total_rounds, | |
| "total_actions": total_actions, | |
| } | |
| with open(self.log_path, 'a', encoding='utf-8') as f: | |
| f.write(json.dumps(entry, ensure_ascii=False) + '\n') | |
| # 全局日志实例(兼容旧接口) | |
| _global_logger: Optional[ActionLogger] = None | |
| def get_logger(log_path: Optional[str] = None) -> ActionLogger: | |
| """获取全局日志实例(兼容旧接口)""" | |
| global _global_logger | |
| if log_path: | |
| _global_logger = ActionLogger(log_path) | |
| if _global_logger is None: | |
| _global_logger = ActionLogger("actions.jsonl") | |
| return _global_logger | |