""" 模拟相关API路由 Step2: Zep实体读取与过滤、OASIS模拟准备与运行(全程自动化) """ import os import traceback from flask import request, jsonify, send_file from . import simulation_bp from ..config import Config from ..utils.logger import get_logger from ..models.project import ProjectManager logger = get_logger('mirofish.api.simulation') class _LazyObjectProxy: def __init__(self, loader): self._loader = loader def _target(self): return self._loader() def __call__(self, *args, **kwargs): return self._target()(*args, **kwargs) def __getattr__(self, name): return getattr(self._target(), name) def _load_zep_entity_reader(): from ..services.zep_entity_reader import ZepEntityReader return ZepEntityReader def _load_oasis_profile_generator(): from ..services.oasis_profile_generator import OasisProfileGenerator return OasisProfileGenerator def _load_simulation_manager(): from ..services.simulation_manager import SimulationManager return SimulationManager def _load_simulation_status(): from ..services.simulation_manager import SimulationStatus return SimulationStatus def _load_simulation_runner(): from ..services.simulation_runner import SimulationRunner SimulationRunner.register_cleanup() return SimulationRunner def _load_runner_status(): from ..services.simulation_runner import RunnerStatus return RunnerStatus ZepEntityReader = _LazyObjectProxy(_load_zep_entity_reader) OasisProfileGenerator = _LazyObjectProxy(_load_oasis_profile_generator) SimulationManager = _LazyObjectProxy(_load_simulation_manager) SimulationStatus = _LazyObjectProxy(_load_simulation_status) SimulationRunner = _LazyObjectProxy(_load_simulation_runner) RunnerStatus = _LazyObjectProxy(_load_runner_status) # Interview prompt 优化前缀 # 添加此前缀可以避免Agent调用工具,直接用文本回复 INTERVIEW_PROMPT_PREFIX = "结合你的人设、所有的过往记忆与行动,不调用任何工具直接用文本回复我:" def optimize_interview_prompt(prompt: str) -> str: """ 优化Interview提问,添加前缀避免Agent调用工具 Args: prompt: 原始提问 Returns: 优化后的提问 """ if not prompt: return prompt # 避免重复添加前缀 if prompt.startswith(INTERVIEW_PROMPT_PREFIX): return prompt return f"{INTERVIEW_PROMPT_PREFIX}{prompt}" # ============== 实体读取接口 ============== @simulation_bp.route('/entities/', methods=['GET']) def get_graph_entities(graph_id: str): """ 获取图谱中的所有实体(已过滤) 只返回符合预定义实体类型的节点(Labels不只是Entity的节点) Query参数: entity_types: 逗号分隔的实体类型列表(可选,用于进一步过滤) enrich: 是否获取相关边信息(默认true) """ try: if not Config.ZEP_API_KEY: return jsonify({ "success": False, "error": "ZEP_API_KEY未配置" }), 500 entity_types_str = request.args.get('entity_types', '') entity_types = [t.strip() for t in entity_types_str.split(',') if t.strip()] if entity_types_str else None enrich = request.args.get('enrich', 'true').lower() == 'true' logger.info(f"获取图谱实体: graph_id={graph_id}, entity_types={entity_types}, enrich={enrich}") reader = ZepEntityReader() result = reader.filter_defined_entities( graph_id=graph_id, defined_entity_types=entity_types, enrich_with_edges=enrich ) return jsonify({ "success": True, "data": result.to_dict() }) except Exception as e: logger.error(f"获取图谱实体失败: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 @simulation_bp.route('/entities//', methods=['GET']) def get_entity_detail(graph_id: str, entity_uuid: str): """获取单个实体的详细信息""" try: if not Config.ZEP_API_KEY: return jsonify({ "success": False, "error": "ZEP_API_KEY未配置" }), 500 reader = ZepEntityReader() entity = reader.get_entity_with_context(graph_id, entity_uuid) if not entity: return jsonify({ "success": False, "error": f"实体不存在: {entity_uuid}" }), 404 return jsonify({ "success": True, "data": entity.to_dict() }) except Exception as e: logger.error(f"获取实体详情失败: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 @simulation_bp.route('/entities//by-type/', methods=['GET']) def get_entities_by_type(graph_id: str, entity_type: str): """获取指定类型的所有实体""" try: if not Config.ZEP_API_KEY: return jsonify({ "success": False, "error": "ZEP_API_KEY未配置" }), 500 enrich = request.args.get('enrich', 'true').lower() == 'true' reader = ZepEntityReader() entities = reader.get_entities_by_type( graph_id=graph_id, entity_type=entity_type, enrich_with_edges=enrich ) return jsonify({ "success": True, "data": { "entity_type": entity_type, "count": len(entities), "entities": [e.to_dict() for e in entities] } }) except Exception as e: logger.error(f"获取实体失败: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 # ============== 模拟管理接口 ============== @simulation_bp.route('/create', methods=['POST']) def create_simulation(): """ 创建新的模拟 注意:max_rounds等参数由LLM智能生成,无需手动设置 请求(JSON): { "project_id": "proj_xxxx", // 必填 "graph_id": "mirofish_xxxx", // 可选,如不提供则从project获取 "enable_twitter": true, // 可选,默认true "enable_reddit": true // 可选,默认true } 返回: { "success": true, "data": { "simulation_id": "sim_xxxx", "project_id": "proj_xxxx", "graph_id": "mirofish_xxxx", "status": "created", "enable_twitter": true, "enable_reddit": true, "created_at": "2025-12-01T10:00:00" } } """ try: data = request.get_json() or {} project_id = data.get('project_id') if not project_id: return jsonify({ "success": False, "error": "请提供 project_id" }), 400 project = ProjectManager.get_project(project_id) if not project: return jsonify({ "success": False, "error": f"项目不存在: {project_id}" }), 404 graph_id = data.get('graph_id') or project.graph_id if not graph_id: return jsonify({ "success": False, "error": "项目尚未构建图谱,请先调用 /api/graph/build" }), 400 manager = SimulationManager() state = manager.create_simulation( project_id=project_id, graph_id=graph_id, enable_twitter=data.get('enable_twitter', True), enable_reddit=data.get('enable_reddit', True), ) return jsonify({ "success": True, "data": state.to_dict() }) except Exception as e: logger.error(f"创建模拟失败: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 def _check_simulation_prepared(simulation_id: str) -> tuple: """ 检查模拟是否已经准备完成 检查条件: 1. state.json 存在且 status 为 "ready" 2. 必要文件存在:reddit_profiles.json, twitter_profiles.csv, simulation_config.json 注意:运行脚本(run_*.py)保留在 backend/scripts/ 目录,不再复制到模拟目录 Args: simulation_id: 模拟ID Returns: (is_prepared: bool, info: dict) """ import os from ..config import Config simulation_dir = os.path.join(Config.OASIS_SIMULATION_DATA_DIR, simulation_id) # 检查目录是否存在 if not os.path.exists(simulation_dir): return False, {"reason": "模拟目录不存在"} # 必要文件列表(不包括脚本,脚本位于 backend/scripts/) required_files = [ "state.json", "simulation_config.json", "reddit_profiles.json", "twitter_profiles.csv" ] # 检查文件是否存在 existing_files = [] missing_files = [] for f in required_files: file_path = os.path.join(simulation_dir, f) if os.path.exists(file_path): existing_files.append(f) else: missing_files.append(f) if missing_files: return False, { "reason": "缺少必要文件", "missing_files": missing_files, "existing_files": existing_files } # 检查state.json中的状态 state_file = os.path.join(simulation_dir, "state.json") try: import json with open(state_file, 'r', encoding='utf-8') as f: state_data = json.load(f) status = state_data.get("status", "") config_generated = state_data.get("config_generated", False) # 详细日志 logger.debug(f"检测模拟准备状态: {simulation_id}, status={status}, config_generated={config_generated}") # 如果 config_generated=True 且文件存在,认为准备完成 # 以下状态都说明准备工作已完成: # - ready: 准备完成,可以运行 # - preparing: 如果 config_generated=True 说明已完成 # - running: 正在运行,说明准备早就完成了 # - completed: 运行完成,说明准备早就完成了 # - stopped: 已停止,说明准备早就完成了 # - failed: 运行失败(但准备是完成的) prepared_statuses = ["ready", "preparing", "running", "completed", "stopped", "failed"] if status in prepared_statuses and config_generated: # 获取文件统计信息 profiles_file = os.path.join(simulation_dir, "reddit_profiles.json") config_file = os.path.join(simulation_dir, "simulation_config.json") profiles_count = 0 if os.path.exists(profiles_file): with open(profiles_file, 'r', encoding='utf-8') as f: profiles_data = json.load(f) profiles_count = len(profiles_data) if isinstance(profiles_data, list) else 0 # 如果状态是preparing但文件已完成,自动更新状态为ready if status == "preparing": try: state_data["status"] = "ready" from datetime import datetime state_data["updated_at"] = datetime.now().isoformat() with open(state_file, 'w', encoding='utf-8') as f: json.dump(state_data, f, ensure_ascii=False, indent=2) logger.info(f"自动更新模拟状态: {simulation_id} preparing -> ready") status = "ready" except Exception as e: logger.warning(f"自动更新状态失败: {e}") logger.info(f"模拟 {simulation_id} 检测结果: 已准备完成 (status={status}, config_generated={config_generated})") return True, { "status": status, "entities_count": state_data.get("entities_count", 0), "profiles_count": profiles_count, "entity_types": state_data.get("entity_types", []), "config_generated": config_generated, "created_at": state_data.get("created_at"), "updated_at": state_data.get("updated_at"), "existing_files": existing_files } else: logger.warning(f"模拟 {simulation_id} 检测结果: 未准备完成 (status={status}, config_generated={config_generated})") return False, { "reason": f"状态不在已准备列表中或config_generated为false: status={status}, config_generated={config_generated}", "status": status, "config_generated": config_generated } except Exception as e: return False, {"reason": f"读取状态文件失败: {str(e)}"} @simulation_bp.route('/prepare', methods=['POST']) def prepare_simulation(): """ 准备模拟环境(异步任务,LLM智能生成所有参数) 这是一个耗时操作,接口会立即返回task_id, 使用 GET /api/simulation/prepare/status 查询进度 特性: - 自动检测已完成的准备工作,避免重复生成 - 如果已准备完成,直接返回已有结果 - 支持强制重新生成(force_regenerate=true) 步骤: 1. 检查是否已有完成的准备工作 2. 从Zep图谱读取并过滤实体 3. 为每个实体生成OASIS Agent Profile(带重试机制) 4. LLM智能生成模拟配置(带重试机制) 5. 保存配置文件和预设脚本 请求(JSON): { "simulation_id": "sim_xxxx", // 必填,模拟ID "entity_types": ["Student", "PublicFigure"], // 可选,指定实体类型 "use_llm_for_profiles": true, // 可选,是否用LLM生成人设 "parallel_profile_count": 5, // 可选,并行生成人设数量,默认5 "force_regenerate": false // 可选,强制重新生成,默认false } 返回: { "success": true, "data": { "simulation_id": "sim_xxxx", "task_id": "task_xxxx", // 新任务时返回 "status": "preparing|ready", "message": "准备任务已启动|已有完成的准备工作", "already_prepared": true|false // 是否已准备完成 } } """ import threading import os from ..models.task import TaskManager, TaskStatus from ..config import Config try: data = request.get_json() or {} simulation_id = data.get('simulation_id') if not simulation_id: return jsonify({ "success": False, "error": "请提供 simulation_id" }), 400 manager = SimulationManager() state = manager.get_simulation(simulation_id) if not state: return jsonify({ "success": False, "error": f"模拟不存在: {simulation_id}" }), 404 # 检查是否强制重新生成 force_regenerate = data.get('force_regenerate', False) logger.info(f"开始处理 /prepare 请求: simulation_id={simulation_id}, force_regenerate={force_regenerate}") # 检查是否已经准备完成(避免重复生成) if not force_regenerate: logger.debug(f"检查模拟 {simulation_id} 是否已准备完成...") is_prepared, prepare_info = _check_simulation_prepared(simulation_id) logger.debug(f"检查结果: is_prepared={is_prepared}, prepare_info={prepare_info}") if is_prepared: logger.info(f"模拟 {simulation_id} 已准备完成,跳过重复生成") return jsonify({ "success": True, "data": { "simulation_id": simulation_id, "status": "ready", "message": "已有完成的准备工作,无需重复生成", "already_prepared": True, "prepare_info": prepare_info } }) else: logger.info(f"模拟 {simulation_id} 未准备完成,将启动准备任务") # 从项目获取必要信息 project = ProjectManager.get_project(state.project_id) if not project: return jsonify({ "success": False, "error": f"项目不存在: {state.project_id}" }), 404 # 获取模拟需求 simulation_requirement = project.simulation_requirement or "" if not simulation_requirement: return jsonify({ "success": False, "error": "项目缺少模拟需求描述 (simulation_requirement)" }), 400 # 获取文档文本 document_text = ProjectManager.get_extracted_text(state.project_id) or "" entity_types_list = data.get('entity_types') use_llm_for_profiles = data.get('use_llm_for_profiles', True) parallel_profile_count = data.get('parallel_profile_count', 5) # ========== 同步获取实体数量(在后台任务启动前) ========== # 这样前端在调用prepare后立即就能获取到预期Agent总数 try: logger.info(f"同步获取实体数量: graph_id={state.graph_id}") reader = ZepEntityReader() # 快速读取实体(不需要边信息,只统计数量) filtered_preview = reader.filter_defined_entities( graph_id=state.graph_id, defined_entity_types=entity_types_list, enrich_with_edges=False # 不获取边信息,加快速度 ) # 保存实体数量到状态(供前端立即获取) state.entities_count = filtered_preview.filtered_count state.entity_types = list(filtered_preview.entity_types) logger.info(f"预期实体数量: {filtered_preview.filtered_count}, 类型: {filtered_preview.entity_types}") except Exception as e: logger.warning(f"同步获取实体数量失败(将在后台任务中重试): {e}") # 失败不影响后续流程,后台任务会重新获取 # 创建异步任务 task_manager = TaskManager() task_id = task_manager.create_task( task_type="simulation_prepare", metadata={ "simulation_id": simulation_id, "project_id": state.project_id } ) # 更新模拟状态(包含预先获取的实体数量) state.status = SimulationStatus.PREPARING manager._save_simulation_state(state) # 定义后台任务 def run_prepare(): try: task_manager.update_task( task_id, status=TaskStatus.PROCESSING, progress=0, message="开始准备模拟环境..." ) # 准备模拟(带进度回调) # 存储阶段进度详情 stage_details = {} def progress_callback(stage, progress, message, **kwargs): # 计算总进度 stage_weights = { "reading": (0, 20), # 0-20% "generating_profiles": (20, 70), # 20-70% "generating_config": (70, 90), # 70-90% "copying_scripts": (90, 100) # 90-100% } start, end = stage_weights.get(stage, (0, 100)) current_progress = int(start + (end - start) * progress / 100) # 构建详细进度信息 stage_names = { "reading": "读取图谱实体", "generating_profiles": "生成Agent人设", "generating_config": "生成模拟配置", "copying_scripts": "准备模拟脚本" } stage_index = list(stage_weights.keys()).index(stage) + 1 if stage in stage_weights else 1 total_stages = len(stage_weights) # 更新阶段详情 stage_details[stage] = { "stage_name": stage_names.get(stage, stage), "stage_progress": progress, "current": kwargs.get("current", 0), "total": kwargs.get("total", 0), "item_name": kwargs.get("item_name", "") } # 构建详细进度信息 detail = stage_details[stage] progress_detail_data = { "current_stage": stage, "current_stage_name": stage_names.get(stage, stage), "stage_index": stage_index, "total_stages": total_stages, "stage_progress": progress, "current_item": detail["current"], "total_items": detail["total"], "item_description": message } # 构建简洁消息 if detail["total"] > 0: detailed_message = ( f"[{stage_index}/{total_stages}] {stage_names.get(stage, stage)}: " f"{detail['current']}/{detail['total']} - {message}" ) else: detailed_message = f"[{stage_index}/{total_stages}] {stage_names.get(stage, stage)}: {message}" task_manager.update_task( task_id, progress=current_progress, message=detailed_message, progress_detail=progress_detail_data ) result_state = manager.prepare_simulation( simulation_id=simulation_id, simulation_requirement=simulation_requirement, document_text=document_text, defined_entity_types=entity_types_list, use_llm_for_profiles=use_llm_for_profiles, progress_callback=progress_callback, parallel_profile_count=parallel_profile_count ) # 任务完成 task_manager.complete_task( task_id, result=result_state.to_simple_dict() ) except Exception as e: logger.error(f"准备模拟失败: {str(e)}") task_manager.fail_task(task_id, str(e)) # 更新模拟状态为失败 state = manager.get_simulation(simulation_id) if state: state.status = SimulationStatus.FAILED state.error = str(e) manager._save_simulation_state(state) # 启动后台线程 thread = threading.Thread(target=run_prepare, daemon=True) thread.start() return jsonify({ "success": True, "data": { "simulation_id": simulation_id, "task_id": task_id, "status": "preparing", "message": "准备任务已启动,请通过 /api/simulation/prepare/status 查询进度", "already_prepared": False, "expected_entities_count": state.entities_count, # 预期的Agent总数 "entity_types": state.entity_types # 实体类型列表 } }) except ValueError as e: return jsonify({ "success": False, "error": str(e) }), 404 except Exception as e: logger.error(f"启动准备任务失败: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 @simulation_bp.route('/prepare/status', methods=['POST']) def get_prepare_status(): """ 查询准备任务进度 支持两种查询方式: 1. 通过task_id查询正在进行的任务进度 2. 通过simulation_id检查是否已有完成的准备工作 请求(JSON): { "task_id": "task_xxxx", // 可选,prepare返回的task_id "simulation_id": "sim_xxxx" // 可选,模拟ID(用于检查已完成的准备) } 返回: { "success": true, "data": { "task_id": "task_xxxx", "status": "processing|completed|ready", "progress": 45, "message": "...", "already_prepared": true|false, // 是否已有完成的准备 "prepare_info": {...} // 已准备完成时的详细信息 } } """ from ..models.task import TaskManager try: data = request.get_json() or {} task_id = data.get('task_id') simulation_id = data.get('simulation_id') # 如果提供了simulation_id,先检查是否已准备完成 if simulation_id: is_prepared, prepare_info = _check_simulation_prepared(simulation_id) if is_prepared: return jsonify({ "success": True, "data": { "simulation_id": simulation_id, "status": "ready", "progress": 100, "message": "已有完成的准备工作", "already_prepared": True, "prepare_info": prepare_info } }) # 如果没有task_id,返回错误 if not task_id: if simulation_id: # 有simulation_id但未准备完成 return jsonify({ "success": True, "data": { "simulation_id": simulation_id, "status": "not_started", "progress": 0, "message": "尚未开始准备,请调用 /api/simulation/prepare 开始", "already_prepared": False } }) return jsonify({ "success": False, "error": "请提供 task_id 或 simulation_id" }), 400 task_manager = TaskManager() task = task_manager.get_task(task_id) if not task: # 任务不存在,但如果有simulation_id,检查是否已准备完成 if simulation_id: is_prepared, prepare_info = _check_simulation_prepared(simulation_id) if is_prepared: return jsonify({ "success": True, "data": { "simulation_id": simulation_id, "task_id": task_id, "status": "ready", "progress": 100, "message": "任务已完成(准备工作已存在)", "already_prepared": True, "prepare_info": prepare_info } }) return jsonify({ "success": False, "error": f"任务不存在: {task_id}" }), 404 task_dict = task.to_dict() task_dict["already_prepared"] = False return jsonify({ "success": True, "data": task_dict }) except Exception as e: logger.error(f"查询任务状态失败: {str(e)}") return jsonify({ "success": False, "error": str(e) }), 500 @simulation_bp.route('/', methods=['GET']) def get_simulation(simulation_id: str): """获取模拟状态""" try: manager = SimulationManager() state = manager.get_simulation(simulation_id) if not state: return jsonify({ "success": False, "error": f"模拟不存在: {simulation_id}" }), 404 result = state.to_dict() # 如果模拟已准备好,附加运行说明 if state.status == SimulationStatus.READY: result["run_instructions"] = manager.get_run_instructions(simulation_id) return jsonify({ "success": True, "data": result }) except Exception as e: logger.error(f"获取模拟状态失败: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 @simulation_bp.route('/list', methods=['GET']) def list_simulations(): """ 列出所有模拟 Query参数: project_id: 按项目ID过滤(可选) """ try: project_id = request.args.get('project_id') manager = SimulationManager() simulations = manager.list_simulations(project_id=project_id) return jsonify({ "success": True, "data": [s.to_dict() for s in simulations], "count": len(simulations) }) except Exception as e: logger.error(f"列出模拟失败: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 def _get_report_id_for_simulation(simulation_id: str) -> str: """ 获取 simulation 对应的最新 report_id 遍历 reports 目录,找出 simulation_id 匹配的 report, 如果有多个则返回最新的(按 created_at 排序) Args: simulation_id: 模拟ID Returns: report_id 或 None """ import json from datetime import datetime # reports 目录路径:backend/uploads/reports # __file__ 是 app/api/simulation.py,需要向上两级到 backend/ reports_dir = os.path.join(os.path.dirname(__file__), '../../uploads/reports') if not os.path.exists(reports_dir): return None matching_reports = [] try: for report_folder in os.listdir(reports_dir): report_path = os.path.join(reports_dir, report_folder) if not os.path.isdir(report_path): continue meta_file = os.path.join(report_path, "meta.json") if not os.path.exists(meta_file): continue try: with open(meta_file, 'r', encoding='utf-8') as f: meta = json.load(f) if meta.get("simulation_id") == simulation_id: matching_reports.append({ "report_id": meta.get("report_id"), "created_at": meta.get("created_at", ""), "status": meta.get("status", "") }) except Exception: continue if not matching_reports: return None # 按创建时间倒序排序,返回最新的 matching_reports.sort(key=lambda x: x.get("created_at", ""), reverse=True) return matching_reports[0].get("report_id") except Exception as e: logger.warning(f"查找 simulation {simulation_id} 的 report 失败: {e}") return None @simulation_bp.route('/history', methods=['GET']) def get_simulation_history(): """ 获取历史模拟列表(带项目详情) 用于首页历史项目展示,返回包含项目名称、描述等丰富信息的模拟列表 Query参数: limit: 返回数量限制(默认20) 返回: { "success": true, "data": [ { "simulation_id": "sim_xxxx", "project_id": "proj_xxxx", "project_name": "武大舆情分析", "simulation_requirement": "如果武汉大学发布...", "status": "completed", "entities_count": 68, "profiles_count": 68, "entity_types": ["Student", "Professor", ...], "created_at": "2024-12-10", "updated_at": "2024-12-10", "total_rounds": 120, "current_round": 120, "report_id": "report_xxxx", "version": "v1.0.2" }, ... ], "count": 7 } """ try: limit = request.args.get('limit', 20, type=int) manager = SimulationManager() simulations = manager.list_simulations()[:limit] # 增强模拟数据,只从 Simulation 文件读取 enriched_simulations = [] for sim in simulations: sim_dict = sim.to_dict() # 获取模拟配置信息(从 simulation_config.json 读取 simulation_requirement) config = manager.get_simulation_config(sim.simulation_id) if config: sim_dict["simulation_requirement"] = config.get("simulation_requirement", "") time_config = config.get("time_config", {}) sim_dict["total_simulation_hours"] = time_config.get("total_simulation_hours", 0) # 推荐轮数(后备值) recommended_rounds = int( time_config.get("total_simulation_hours", 0) * 60 / max(time_config.get("minutes_per_round", 60), 1) ) else: sim_dict["simulation_requirement"] = "" sim_dict["total_simulation_hours"] = 0 recommended_rounds = 0 # 获取运行状态(从 run_state.json 读取用户设置的实际轮数) run_state = SimulationRunner.get_run_state(sim.simulation_id) if run_state: sim_dict["current_round"] = run_state.current_round sim_dict["runner_status"] = run_state.runner_status.value # 使用用户设置的 total_rounds,若无则使用推荐轮数 sim_dict["total_rounds"] = run_state.total_rounds if run_state.total_rounds > 0 else recommended_rounds else: sim_dict["current_round"] = 0 sim_dict["runner_status"] = "idle" sim_dict["total_rounds"] = recommended_rounds # 获取关联项目的文件列表(最多3个) project = ProjectManager.get_project(sim.project_id) if project and hasattr(project, 'files') and project.files: sim_dict["files"] = [ {"filename": f.get("filename", "未知文件")} for f in project.files[:3] ] else: sim_dict["files"] = [] # 获取关联的 report_id(查找该 simulation 最新的 report) sim_dict["report_id"] = _get_report_id_for_simulation(sim.simulation_id) # 添加版本号 sim_dict["version"] = "v1.0.2" # 格式化日期 try: created_date = sim_dict.get("created_at", "")[:10] sim_dict["created_date"] = created_date except: sim_dict["created_date"] = "" enriched_simulations.append(sim_dict) return jsonify({ "success": True, "data": enriched_simulations, "count": len(enriched_simulations) }) except Exception as e: logger.error(f"获取历史模拟失败: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 @simulation_bp.route('//profiles', methods=['GET']) def get_simulation_profiles(simulation_id: str): """ 获取模拟的Agent Profile Query参数: platform: 平台类型(reddit/twitter,默认reddit) """ try: platform = request.args.get('platform', 'reddit') manager = SimulationManager() profiles = manager.get_profiles(simulation_id, platform=platform) return jsonify({ "success": True, "data": { "platform": platform, "count": len(profiles), "profiles": profiles } }) except ValueError as e: return jsonify({ "success": False, "error": str(e) }), 404 except Exception as e: logger.error(f"获取Profile失败: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 @simulation_bp.route('//profiles/realtime', methods=['GET']) def get_simulation_profiles_realtime(simulation_id: str): """ 实时获取模拟的Agent Profile(用于在生成过程中实时查看进度) 与 /profiles 接口的区别: - 直接读取文件,不经过 SimulationManager - 适用于生成过程中的实时查看 - 返回额外的元数据(如文件修改时间、是否正在生成等) Query参数: platform: 平台类型(reddit/twitter,默认reddit) 返回: { "success": true, "data": { "simulation_id": "sim_xxxx", "platform": "reddit", "count": 15, "total_expected": 93, // 预期总数(如果有) "is_generating": true, // 是否正在生成 "file_exists": true, "file_modified_at": "2025-12-04T18:20:00", "profiles": [...] } } """ import json import csv from datetime import datetime try: platform = request.args.get('platform', 'reddit') # 获取模拟目录 sim_dir = os.path.join(Config.OASIS_SIMULATION_DATA_DIR, simulation_id) if not os.path.exists(sim_dir): return jsonify({ "success": False, "error": f"模拟不存在: {simulation_id}" }), 404 # 确定文件路径 if platform == "reddit": profiles_file = os.path.join(sim_dir, "reddit_profiles.json") else: profiles_file = os.path.join(sim_dir, "twitter_profiles.csv") # 检查文件是否存在 file_exists = os.path.exists(profiles_file) profiles = [] file_modified_at = None if file_exists: # 获取文件修改时间 file_stat = os.stat(profiles_file) file_modified_at = datetime.fromtimestamp(file_stat.st_mtime).isoformat() try: if platform == "reddit": with open(profiles_file, 'r', encoding='utf-8') as f: profiles = json.load(f) else: with open(profiles_file, 'r', encoding='utf-8') as f: reader = csv.DictReader(f) profiles = list(reader) except (json.JSONDecodeError, Exception) as e: logger.warning(f"读取 profiles 文件失败(可能正在写入中): {e}") profiles = [] # 检查是否正在生成(通过 state.json 判断) is_generating = False total_expected = None state_file = os.path.join(sim_dir, "state.json") if os.path.exists(state_file): try: with open(state_file, 'r', encoding='utf-8') as f: state_data = json.load(f) status = state_data.get("status", "") is_generating = status == "preparing" total_expected = state_data.get("entities_count") except Exception: pass return jsonify({ "success": True, "data": { "simulation_id": simulation_id, "platform": platform, "count": len(profiles), "total_expected": total_expected, "is_generating": is_generating, "file_exists": file_exists, "file_modified_at": file_modified_at, "profiles": profiles } }) except Exception as e: logger.error(f"实时获取Profile失败: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 @simulation_bp.route('//config/realtime', methods=['GET']) def get_simulation_config_realtime(simulation_id: str): """ 实时获取模拟配置(用于在生成过程中实时查看进度) 与 /config 接口的区别: - 直接读取文件,不经过 SimulationManager - 适用于生成过程中的实时查看 - 返回额外的元数据(如文件修改时间、是否正在生成等) - 即使配置还没生成完也能返回部分信息 返回: { "success": true, "data": { "simulation_id": "sim_xxxx", "file_exists": true, "file_modified_at": "2025-12-04T18:20:00", "is_generating": true, // 是否正在生成 "generation_stage": "generating_config", // 当前生成阶段 "config": {...} // 配置内容(如果存在) } } """ import json from datetime import datetime try: # 获取模拟目录 sim_dir = os.path.join(Config.OASIS_SIMULATION_DATA_DIR, simulation_id) if not os.path.exists(sim_dir): return jsonify({ "success": False, "error": f"模拟不存在: {simulation_id}" }), 404 # 配置文件路径 config_file = os.path.join(sim_dir, "simulation_config.json") # 检查文件是否存在 file_exists = os.path.exists(config_file) config = None file_modified_at = None if file_exists: # 获取文件修改时间 file_stat = os.stat(config_file) file_modified_at = datetime.fromtimestamp(file_stat.st_mtime).isoformat() try: with open(config_file, 'r', encoding='utf-8') as f: config = json.load(f) except (json.JSONDecodeError, Exception) as e: logger.warning(f"读取 config 文件失败(可能正在写入中): {e}") config = None # 检查是否正在生成(通过 state.json 判断) is_generating = False generation_stage = None config_generated = False state_file = os.path.join(sim_dir, "state.json") if os.path.exists(state_file): try: with open(state_file, 'r', encoding='utf-8') as f: state_data = json.load(f) status = state_data.get("status", "") is_generating = status == "preparing" config_generated = state_data.get("config_generated", False) # 判断当前阶段 if is_generating: if state_data.get("profiles_generated", False): generation_stage = "generating_config" else: generation_stage = "generating_profiles" elif status == "ready": generation_stage = "completed" except Exception: pass # 构建返回数据 response_data = { "simulation_id": simulation_id, "file_exists": file_exists, "file_modified_at": file_modified_at, "is_generating": is_generating, "generation_stage": generation_stage, "config_generated": config_generated, "config": config } # 如果配置存在,提取一些关键统计信息 if config: response_data["summary"] = { "total_agents": len(config.get("agent_configs", [])), "simulation_hours": config.get("time_config", {}).get("total_simulation_hours"), "initial_posts_count": len(config.get("event_config", {}).get("initial_posts", [])), "hot_topics_count": len(config.get("event_config", {}).get("hot_topics", [])), "has_twitter_config": "twitter_config" in config, "has_reddit_config": "reddit_config" in config, "generated_at": config.get("generated_at"), "llm_model": config.get("llm_model") } return jsonify({ "success": True, "data": response_data }) except Exception as e: logger.error(f"实时获取Config失败: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 @simulation_bp.route('//config', methods=['GET']) def get_simulation_config(simulation_id: str): """ 获取模拟配置(LLM智能生成的完整配置) 返回包含: - time_config: 时间配置(模拟时长、轮次、高峰/低谷时段) - agent_configs: 每个Agent的活动配置(活跃度、发言频率、立场等) - event_config: 事件配置(初始帖子、热点话题) - platform_configs: 平台配置 - generation_reasoning: LLM的配置推理说明 """ try: manager = SimulationManager() config = manager.get_simulation_config(simulation_id) if not config: return jsonify({ "success": False, "error": f"模拟配置不存在,请先调用 /prepare 接口" }), 404 return jsonify({ "success": True, "data": config }) except Exception as e: logger.error(f"获取配置失败: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 @simulation_bp.route('//config/download', methods=['GET']) def download_simulation_config(simulation_id: str): """下载模拟配置文件""" try: manager = SimulationManager() sim_dir = manager._get_simulation_dir(simulation_id) config_path = os.path.join(sim_dir, "simulation_config.json") if not os.path.exists(config_path): return jsonify({ "success": False, "error": "配置文件不存在,请先调用 /prepare 接口" }), 404 return send_file( config_path, as_attachment=True, download_name="simulation_config.json" ) except Exception as e: logger.error(f"下载配置失败: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 @simulation_bp.route('/script//download', methods=['GET']) def download_simulation_script(script_name: str): """ 下载模拟运行脚本文件(通用脚本,位于 backend/scripts/) script_name可选值: - run_twitter_simulation.py - run_reddit_simulation.py - run_parallel_simulation.py - action_logger.py """ try: # 脚本位于 backend/scripts/ 目录 scripts_dir = os.path.abspath(os.path.join(os.path.dirname(__file__), '../../scripts')) # 验证脚本名称 allowed_scripts = [ "run_twitter_simulation.py", "run_reddit_simulation.py", "run_parallel_simulation.py", "action_logger.py" ] if script_name not in allowed_scripts: return jsonify({ "success": False, "error": f"未知脚本: {script_name},可选: {allowed_scripts}" }), 400 script_path = os.path.join(scripts_dir, script_name) if not os.path.exists(script_path): return jsonify({ "success": False, "error": f"脚本文件不存在: {script_name}" }), 404 return send_file( script_path, as_attachment=True, download_name=script_name ) except Exception as e: logger.error(f"下载脚本失败: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 # ============== Profile生成接口(独立使用) ============== @simulation_bp.route('/generate-profiles', methods=['POST']) def generate_profiles(): """ 直接从图谱生成OASIS Agent Profile(不创建模拟) 请求(JSON): { "graph_id": "mirofish_xxxx", // 必填 "entity_types": ["Student"], // 可选 "use_llm": true, // 可选 "platform": "reddit" // 可选 } """ try: data = request.get_json() or {} graph_id = data.get('graph_id') if not graph_id: return jsonify({ "success": False, "error": "请提供 graph_id" }), 400 entity_types = data.get('entity_types') use_llm = data.get('use_llm', True) platform = data.get('platform', 'reddit') reader = ZepEntityReader() filtered = reader.filter_defined_entities( graph_id=graph_id, defined_entity_types=entity_types, enrich_with_edges=True ) if filtered.filtered_count == 0: return jsonify({ "success": False, "error": "没有找到符合条件的实体" }), 400 generator = OasisProfileGenerator() profiles = generator.generate_profiles_from_entities( entities=filtered.entities, use_llm=use_llm ) if platform == "reddit": profiles_data = [p.to_reddit_format() for p in profiles] elif platform == "twitter": profiles_data = [p.to_twitter_format() for p in profiles] else: profiles_data = [p.to_dict() for p in profiles] return jsonify({ "success": True, "data": { "platform": platform, "entity_types": list(filtered.entity_types), "count": len(profiles_data), "profiles": profiles_data } }) except Exception as e: logger.error(f"生成Profile失败: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 # ============== 模拟运行控制接口 ============== @simulation_bp.route('/start', methods=['POST']) def start_simulation(): """ 开始运行模拟 请求(JSON): { "simulation_id": "sim_xxxx", // 必填,模拟ID "platform": "parallel", // 可选: twitter / reddit / parallel (默认) "max_rounds": 100, // 可选: 最大模拟轮数,用于截断过长的模拟 "enable_graph_memory_update": false, // 可选: 是否将Agent活动动态更新到Zep图谱记忆 "force": false // 可选: 强制重新开始(会停止运行中的模拟并清理日志) } 关于 force 参数: - 启用后,如果模拟正在运行或已完成,会先停止并清理运行日志 - 清理的内容包括:run_state.json, actions.jsonl, simulation.log 等 - 不会清理配置文件(simulation_config.json)和 profile 文件 - 适用于需要重新运行模拟的场景 关于 enable_graph_memory_update: - 启用后,模拟中所有Agent的活动(发帖、评论、点赞等)都会实时更新到Zep图谱 - 这可以让图谱"记住"模拟过程,用于后续分析或AI对话 - 需要模拟关联的项目有有效的 graph_id - 采用批量更新机制,减少API调用次数 返回: { "success": true, "data": { "simulation_id": "sim_xxxx", "runner_status": "running", "process_pid": 12345, "twitter_running": true, "reddit_running": true, "started_at": "2025-12-01T10:00:00", "graph_memory_update_enabled": true, // 是否启用了图谱记忆更新 "force_restarted": true // 是否是强制重新开始 } } """ try: data = request.get_json() or {} simulation_id = data.get('simulation_id') if not simulation_id: return jsonify({ "success": False, "error": "请提供 simulation_id" }), 400 platform = data.get('platform', 'parallel') max_rounds = data.get('max_rounds') # 可选:最大模拟轮数 enable_graph_memory_update = data.get('enable_graph_memory_update', False) # 可选:是否启用图谱记忆更新 force = data.get('force', False) # 可选:强制重新开始 # 验证 max_rounds 参数 if max_rounds is not None: try: max_rounds = int(max_rounds) if max_rounds <= 0: return jsonify({ "success": False, "error": "max_rounds 必须是正整数" }), 400 except (ValueError, TypeError): return jsonify({ "success": False, "error": "max_rounds 必须是有效的整数" }), 400 if platform not in ['twitter', 'reddit', 'parallel']: return jsonify({ "success": False, "error": f"无效的平台类型: {platform},可选: twitter/reddit/parallel" }), 400 # 检查模拟是否已准备好 manager = SimulationManager() state = manager.get_simulation(simulation_id) if not state: return jsonify({ "success": False, "error": f"模拟不存在: {simulation_id}" }), 404 force_restarted = False # 智能处理状态:如果准备工作已完成,允许重新启动 if state.status != SimulationStatus.READY: # 检查准备工作是否已完成 is_prepared, prepare_info = _check_simulation_prepared(simulation_id) if is_prepared: # 准备工作已完成,检查是否有正在运行的进程 if state.status == SimulationStatus.RUNNING: # 检查模拟进程是否真的在运行 run_state = SimulationRunner.get_run_state(simulation_id) if run_state and run_state.runner_status.value == "running": # 进程确实在运行 if force: # 强制模式:停止运行中的模拟 logger.info(f"强制模式:停止运行中的模拟 {simulation_id}") try: SimulationRunner.stop_simulation(simulation_id) except Exception as e: logger.warning(f"停止模拟时出现警告: {str(e)}") else: return jsonify({ "success": False, "error": f"模拟正在运行中,请先调用 /stop 接口停止,或使用 force=true 强制重新开始" }), 400 # 如果是强制模式,清理运行日志 if force: logger.info(f"强制模式:清理模拟日志 {simulation_id}") cleanup_result = SimulationRunner.cleanup_simulation_logs(simulation_id) if not cleanup_result.get("success"): logger.warning(f"清理日志时出现警告: {cleanup_result.get('errors')}") force_restarted = True # 进程不存在或已结束,重置状态为 ready logger.info(f"模拟 {simulation_id} 准备工作已完成,重置状态为 ready(原状态: {state.status.value})") state.status = SimulationStatus.READY manager._save_simulation_state(state) else: # 准备工作未完成 return jsonify({ "success": False, "error": f"模拟未准备好,当前状态: {state.status.value},请先调用 /prepare 接口" }), 400 # 获取图谱ID(用于图谱记忆更新) graph_id = None if enable_graph_memory_update: # 从模拟状态或项目中获取 graph_id graph_id = state.graph_id if not graph_id: # 尝试从项目中获取 project = ProjectManager.get_project(state.project_id) if project: graph_id = project.graph_id if not graph_id: return jsonify({ "success": False, "error": "启用图谱记忆更新需要有效的 graph_id,请确保项目已构建图谱" }), 400 logger.info(f"启用图谱记忆更新: simulation_id={simulation_id}, graph_id={graph_id}") # 启动模拟 run_state = SimulationRunner.start_simulation( simulation_id=simulation_id, platform=platform, max_rounds=max_rounds, enable_graph_memory_update=enable_graph_memory_update, graph_id=graph_id ) # 更新模拟状态 state.status = SimulationStatus.RUNNING manager._save_simulation_state(state) response_data = run_state.to_dict() if max_rounds: response_data['max_rounds_applied'] = max_rounds response_data['graph_memory_update_enabled'] = enable_graph_memory_update response_data['force_restarted'] = force_restarted if enable_graph_memory_update: response_data['graph_id'] = graph_id return jsonify({ "success": True, "data": response_data }) except ValueError as e: return jsonify({ "success": False, "error": str(e) }), 400 except Exception as e: logger.error(f"启动模拟失败: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 @simulation_bp.route('/stop', methods=['POST']) def stop_simulation(): """ 停止模拟 请求(JSON): { "simulation_id": "sim_xxxx" // 必填,模拟ID } 返回: { "success": true, "data": { "simulation_id": "sim_xxxx", "runner_status": "stopped", "completed_at": "2025-12-01T12:00:00" } } """ try: data = request.get_json() or {} simulation_id = data.get('simulation_id') if not simulation_id: return jsonify({ "success": False, "error": "请提供 simulation_id" }), 400 run_state = SimulationRunner.stop_simulation(simulation_id) # 更新模拟状态 manager = SimulationManager() state = manager.get_simulation(simulation_id) if state: state.status = SimulationStatus.PAUSED manager._save_simulation_state(state) return jsonify({ "success": True, "data": run_state.to_dict() }) except ValueError as e: return jsonify({ "success": False, "error": str(e) }), 400 except Exception as e: logger.error(f"停止模拟失败: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 # ============== 实时状态监控接口 ============== @simulation_bp.route('//run-status', methods=['GET']) def get_run_status(simulation_id: str): """ 获取模拟运行实时状态(用于前端轮询) 返回: { "success": true, "data": { "simulation_id": "sim_xxxx", "runner_status": "running", "current_round": 5, "total_rounds": 144, "progress_percent": 3.5, "simulated_hours": 2, "total_simulation_hours": 72, "twitter_running": true, "reddit_running": true, "twitter_actions_count": 150, "reddit_actions_count": 200, "total_actions_count": 350, "started_at": "2025-12-01T10:00:00", "updated_at": "2025-12-01T10:30:00" } } """ try: run_state = SimulationRunner.get_run_state(simulation_id) if not run_state: return jsonify({ "success": True, "data": { "simulation_id": simulation_id, "runner_status": "idle", "current_round": 0, "total_rounds": 0, "progress_percent": 0, "twitter_actions_count": 0, "reddit_actions_count": 0, "total_actions_count": 0, } }) return jsonify({ "success": True, "data": run_state.to_dict() }) except Exception as e: logger.error(f"获取运行状态失败: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 @simulation_bp.route('//run-status/detail', methods=['GET']) def get_run_status_detail(simulation_id: str): """ 获取模拟运行详细状态(包含所有动作) 用于前端展示实时动态 Query参数: platform: 过滤平台(twitter/reddit,可选) 返回: { "success": true, "data": { "simulation_id": "sim_xxxx", "runner_status": "running", "current_round": 5, ... "all_actions": [ { "round_num": 5, "timestamp": "2025-12-01T10:30:00", "platform": "twitter", "agent_id": 3, "agent_name": "Agent Name", "action_type": "CREATE_POST", "action_args": {"content": "..."}, "result": null, "success": true }, ... ], "twitter_actions": [...], # Twitter 平台的所有动作 "reddit_actions": [...] # Reddit 平台的所有动作 } } """ try: run_state = SimulationRunner.get_run_state(simulation_id) platform_filter = request.args.get('platform') if not run_state: return jsonify({ "success": True, "data": { "simulation_id": simulation_id, "runner_status": "idle", "all_actions": [], "twitter_actions": [], "reddit_actions": [] } }) # 获取完整的动作列表 all_actions = SimulationRunner.get_all_actions( simulation_id=simulation_id, platform=platform_filter ) # 分平台获取动作 twitter_actions = SimulationRunner.get_all_actions( simulation_id=simulation_id, platform="twitter" ) if not platform_filter or platform_filter == "twitter" else [] reddit_actions = SimulationRunner.get_all_actions( simulation_id=simulation_id, platform="reddit" ) if not platform_filter or platform_filter == "reddit" else [] # 获取当前轮次的动作(recent_actions 只展示最新一轮) current_round = run_state.current_round recent_actions = SimulationRunner.get_all_actions( simulation_id=simulation_id, platform=platform_filter, round_num=current_round ) if current_round > 0 else [] # 获取基础状态信息 result = run_state.to_dict() result["all_actions"] = [a.to_dict() for a in all_actions] result["twitter_actions"] = [a.to_dict() for a in twitter_actions] result["reddit_actions"] = [a.to_dict() for a in reddit_actions] result["rounds_count"] = len(run_state.rounds) # recent_actions 只展示当前最新一轮两个平台的内容 result["recent_actions"] = [a.to_dict() for a in recent_actions] return jsonify({ "success": True, "data": result }) except Exception as e: logger.error(f"获取详细状态失败: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 @simulation_bp.route('//actions', methods=['GET']) def get_simulation_actions(simulation_id: str): """ 获取模拟中的Agent动作历史 Query参数: limit: 返回数量(默认100) offset: 偏移量(默认0) platform: 过滤平台(twitter/reddit) agent_id: 过滤Agent ID round_num: 过滤轮次 返回: { "success": true, "data": { "count": 100, "actions": [...] } } """ try: limit = request.args.get('limit', 100, type=int) offset = request.args.get('offset', 0, type=int) platform = request.args.get('platform') agent_id = request.args.get('agent_id', type=int) round_num = request.args.get('round_num', type=int) actions = SimulationRunner.get_actions( simulation_id=simulation_id, limit=limit, offset=offset, platform=platform, agent_id=agent_id, round_num=round_num ) return jsonify({ "success": True, "data": { "count": len(actions), "actions": [a.to_dict() for a in actions] } }) except Exception as e: logger.error(f"获取动作历史失败: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 @simulation_bp.route('//timeline', methods=['GET']) def get_simulation_timeline(simulation_id: str): """ 获取模拟时间线(按轮次汇总) 用于前端展示进度条和时间线视图 Query参数: start_round: 起始轮次(默认0) end_round: 结束轮次(默认全部) 返回每轮的汇总信息 """ try: start_round = request.args.get('start_round', 0, type=int) end_round = request.args.get('end_round', type=int) timeline = SimulationRunner.get_timeline( simulation_id=simulation_id, start_round=start_round, end_round=end_round ) return jsonify({ "success": True, "data": { "rounds_count": len(timeline), "timeline": timeline } }) except Exception as e: logger.error(f"获取时间线失败: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 @simulation_bp.route('//agent-stats', methods=['GET']) def get_agent_stats(simulation_id: str): """ 获取每个Agent的统计信息 用于前端展示Agent活跃度排行、动作分布等 """ try: stats = SimulationRunner.get_agent_stats(simulation_id) return jsonify({ "success": True, "data": { "agents_count": len(stats), "stats": stats } }) except Exception as e: logger.error(f"获取Agent统计失败: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 # ============== 数据库查询接口 ============== @simulation_bp.route('//posts', methods=['GET']) def get_simulation_posts(simulation_id: str): """ 获取模拟中的帖子 Query参数: platform: 平台类型(twitter/reddit) limit: 返回数量(默认50) offset: 偏移量 返回帖子列表(从SQLite数据库读取) """ try: platform = request.args.get('platform', 'reddit') limit = request.args.get('limit', 50, type=int) offset = request.args.get('offset', 0, type=int) sim_dir = os.path.join( os.path.dirname(__file__), f'../../uploads/simulations/{simulation_id}' ) db_file = f"{platform}_simulation.db" db_path = os.path.join(sim_dir, db_file) if not os.path.exists(db_path): return jsonify({ "success": True, "data": { "platform": platform, "count": 0, "posts": [], "message": "数据库不存在,模拟可能尚未运行" } }) import sqlite3 conn = sqlite3.connect(db_path) conn.row_factory = sqlite3.Row cursor = conn.cursor() try: cursor.execute(""" SELECT * FROM post ORDER BY created_at DESC LIMIT ? OFFSET ? """, (limit, offset)) posts = [dict(row) for row in cursor.fetchall()] cursor.execute("SELECT COUNT(*) FROM post") total = cursor.fetchone()[0] except sqlite3.OperationalError: posts = [] total = 0 conn.close() return jsonify({ "success": True, "data": { "platform": platform, "total": total, "count": len(posts), "posts": posts } }) except Exception as e: logger.error(f"获取帖子失败: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 @simulation_bp.route('//comments', methods=['GET']) def get_simulation_comments(simulation_id: str): """ 获取模拟中的评论(仅Reddit) Query参数: post_id: 过滤帖子ID(可选) limit: 返回数量 offset: 偏移量 """ try: post_id = request.args.get('post_id') limit = request.args.get('limit', 50, type=int) offset = request.args.get('offset', 0, type=int) sim_dir = os.path.join( os.path.dirname(__file__), f'../../uploads/simulations/{simulation_id}' ) db_path = os.path.join(sim_dir, "reddit_simulation.db") if not os.path.exists(db_path): return jsonify({ "success": True, "data": { "count": 0, "comments": [] } }) import sqlite3 conn = sqlite3.connect(db_path) conn.row_factory = sqlite3.Row cursor = conn.cursor() try: if post_id: cursor.execute(""" SELECT * FROM comment WHERE post_id = ? ORDER BY created_at DESC LIMIT ? OFFSET ? """, (post_id, limit, offset)) else: cursor.execute(""" SELECT * FROM comment ORDER BY created_at DESC LIMIT ? OFFSET ? """, (limit, offset)) comments = [dict(row) for row in cursor.fetchall()] except sqlite3.OperationalError: comments = [] conn.close() return jsonify({ "success": True, "data": { "count": len(comments), "comments": comments } }) except Exception as e: logger.error(f"获取评论失败: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 # ============== Interview 采访接口 ============== @simulation_bp.route('/interview', methods=['POST']) def interview_agent(): """ 采访单个Agent 注意:此功能需要模拟环境处于运行状态(完成模拟循环后进入等待命令模式) 请求(JSON): { "simulation_id": "sim_xxxx", // 必填,模拟ID "agent_id": 0, // 必填,Agent ID "prompt": "你对这件事有什么看法?", // 必填,采访问题 "platform": "twitter", // 可选,指定平台(twitter/reddit) // 不指定时:双平台模拟同时采访两个平台 "timeout": 60 // 可选,超时时间(秒),默认60 } 返回(不指定platform,双平台模式): { "success": true, "data": { "agent_id": 0, "prompt": "你对这件事有什么看法?", "result": { "agent_id": 0, "prompt": "...", "platforms": { "twitter": {"agent_id": 0, "response": "...", "platform": "twitter"}, "reddit": {"agent_id": 0, "response": "...", "platform": "reddit"} } }, "timestamp": "2025-12-08T10:00:01" } } 返回(指定platform): { "success": true, "data": { "agent_id": 0, "prompt": "你对这件事有什么看法?", "result": { "agent_id": 0, "response": "我认为...", "platform": "twitter", "timestamp": "2025-12-08T10:00:00" }, "timestamp": "2025-12-08T10:00:01" } } """ try: data = request.get_json() or {} simulation_id = data.get('simulation_id') agent_id = data.get('agent_id') prompt = data.get('prompt') platform = data.get('platform') # 可选:twitter/reddit/None timeout = data.get('timeout', 60) if not simulation_id: return jsonify({ "success": False, "error": "请提供 simulation_id" }), 400 if agent_id is None: return jsonify({ "success": False, "error": "请提供 agent_id" }), 400 if not prompt: return jsonify({ "success": False, "error": "请提供 prompt(采访问题)" }), 400 # 验证platform参数 if platform and platform not in ("twitter", "reddit"): return jsonify({ "success": False, "error": "platform 参数只能是 'twitter' 或 'reddit'" }), 400 # 检查环境状态 if not SimulationRunner.check_env_alive(simulation_id): return jsonify({ "success": False, "error": "模拟环境未运行或已关闭。请确保模拟已完成并进入等待命令模式。" }), 400 # 优化prompt,添加前缀避免Agent调用工具 optimized_prompt = optimize_interview_prompt(prompt) result = SimulationRunner.interview_agent( simulation_id=simulation_id, agent_id=agent_id, prompt=optimized_prompt, platform=platform, timeout=timeout ) return jsonify({ "success": result.get("success", False), "data": result }) except ValueError as e: return jsonify({ "success": False, "error": str(e) }), 400 except TimeoutError as e: return jsonify({ "success": False, "error": f"等待Interview响应超时: {str(e)}" }), 504 except Exception as e: logger.error(f"Interview失败: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 @simulation_bp.route('/interview/batch', methods=['POST']) def interview_agents_batch(): """ 批量采访多个Agent 注意:此功能需要模拟环境处于运行状态 请求(JSON): { "simulation_id": "sim_xxxx", // 必填,模拟ID "interviews": [ // 必填,采访列表 { "agent_id": 0, "prompt": "你对A有什么看法?", "platform": "twitter" // 可选,指定该Agent的采访平台 }, { "agent_id": 1, "prompt": "你对B有什么看法?" // 不指定platform则使用默认值 } ], "platform": "reddit", // 可选,默认平台(被每项的platform覆盖) // 不指定时:双平台模拟每个Agent同时采访两个平台 "timeout": 120 // 可选,超时时间(秒),默认120 } 返回: { "success": true, "data": { "interviews_count": 2, "result": { "interviews_count": 4, "results": { "twitter_0": {"agent_id": 0, "response": "...", "platform": "twitter"}, "reddit_0": {"agent_id": 0, "response": "...", "platform": "reddit"}, "twitter_1": {"agent_id": 1, "response": "...", "platform": "twitter"}, "reddit_1": {"agent_id": 1, "response": "...", "platform": "reddit"} } }, "timestamp": "2025-12-08T10:00:01" } } """ try: data = request.get_json() or {} simulation_id = data.get('simulation_id') interviews = data.get('interviews') platform = data.get('platform') # 可选:twitter/reddit/None timeout = data.get('timeout', 120) if not simulation_id: return jsonify({ "success": False, "error": "请提供 simulation_id" }), 400 if not interviews or not isinstance(interviews, list): return jsonify({ "success": False, "error": "请提供 interviews(采访列表)" }), 400 # 验证platform参数 if platform and platform not in ("twitter", "reddit"): return jsonify({ "success": False, "error": "platform 参数只能是 'twitter' 或 'reddit'" }), 400 # 验证每个采访项 for i, interview in enumerate(interviews): if 'agent_id' not in interview: return jsonify({ "success": False, "error": f"采访列表第{i+1}项缺少 agent_id" }), 400 if 'prompt' not in interview: return jsonify({ "success": False, "error": f"采访列表第{i+1}项缺少 prompt" }), 400 # 验证每项的platform(如果有) item_platform = interview.get('platform') if item_platform and item_platform not in ("twitter", "reddit"): return jsonify({ "success": False, "error": f"采访列表第{i+1}项的platform只能是 'twitter' 或 'reddit'" }), 400 # 检查环境状态 if not SimulationRunner.check_env_alive(simulation_id): return jsonify({ "success": False, "error": "模拟环境未运行或已关闭。请确保模拟已完成并进入等待命令模式。" }), 400 # 优化每个采访项的prompt,添加前缀避免Agent调用工具 optimized_interviews = [] for interview in interviews: optimized_interview = interview.copy() optimized_interview['prompt'] = optimize_interview_prompt(interview.get('prompt', '')) optimized_interviews.append(optimized_interview) result = SimulationRunner.interview_agents_batch( simulation_id=simulation_id, interviews=optimized_interviews, platform=platform, timeout=timeout ) return jsonify({ "success": result.get("success", False), "data": result }) except ValueError as e: return jsonify({ "success": False, "error": str(e) }), 400 except TimeoutError as e: return jsonify({ "success": False, "error": f"等待批量Interview响应超时: {str(e)}" }), 504 except Exception as e: logger.error(f"批量Interview失败: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 @simulation_bp.route('/interview/all', methods=['POST']) def interview_all_agents(): """ 全局采访 - 使用相同问题采访所有Agent 注意:此功能需要模拟环境处于运行状态 请求(JSON): { "simulation_id": "sim_xxxx", // 必填,模拟ID "prompt": "你对这件事整体有什么看法?", // 必填,采访问题(所有Agent使用相同问题) "platform": "reddit", // 可选,指定平台(twitter/reddit) // 不指定时:双平台模拟每个Agent同时采访两个平台 "timeout": 180 // 可选,超时时间(秒),默认180 } 返回: { "success": true, "data": { "interviews_count": 50, "result": { "interviews_count": 100, "results": { "twitter_0": {"agent_id": 0, "response": "...", "platform": "twitter"}, "reddit_0": {"agent_id": 0, "response": "...", "platform": "reddit"}, ... } }, "timestamp": "2025-12-08T10:00:01" } } """ try: data = request.get_json() or {} simulation_id = data.get('simulation_id') prompt = data.get('prompt') platform = data.get('platform') # 可选:twitter/reddit/None timeout = data.get('timeout', 180) if not simulation_id: return jsonify({ "success": False, "error": "请提供 simulation_id" }), 400 if not prompt: return jsonify({ "success": False, "error": "请提供 prompt(采访问题)" }), 400 # 验证platform参数 if platform and platform not in ("twitter", "reddit"): return jsonify({ "success": False, "error": "platform 参数只能是 'twitter' 或 'reddit'" }), 400 # 检查环境状态 if not SimulationRunner.check_env_alive(simulation_id): return jsonify({ "success": False, "error": "模拟环境未运行或已关闭。请确保模拟已完成并进入等待命令模式。" }), 400 # 优化prompt,添加前缀避免Agent调用工具 optimized_prompt = optimize_interview_prompt(prompt) result = SimulationRunner.interview_all_agents( simulation_id=simulation_id, prompt=optimized_prompt, platform=platform, timeout=timeout ) return jsonify({ "success": result.get("success", False), "data": result }) except ValueError as e: return jsonify({ "success": False, "error": str(e) }), 400 except TimeoutError as e: return jsonify({ "success": False, "error": f"等待全局Interview响应超时: {str(e)}" }), 504 except Exception as e: logger.error(f"全局Interview失败: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 @simulation_bp.route('/interview/history', methods=['POST']) def get_interview_history(): """ 获取Interview历史记录 从模拟数据库中读取所有Interview记录 请求(JSON): { "simulation_id": "sim_xxxx", // 必填,模拟ID "platform": "reddit", // 可选,平台类型(reddit/twitter) // 不指定则返回两个平台的所有历史 "agent_id": 0, // 可选,只获取该Agent的采访历史 "limit": 100 // 可选,返回数量,默认100 } 返回: { "success": true, "data": { "count": 10, "history": [ { "agent_id": 0, "response": "我认为...", "prompt": "你对这件事有什么看法?", "timestamp": "2025-12-08T10:00:00", "platform": "reddit" }, ... ] } } """ try: data = request.get_json() or {} simulation_id = data.get('simulation_id') platform = data.get('platform') # 不指定则返回两个平台的历史 agent_id = data.get('agent_id') limit = data.get('limit', 100) if not simulation_id: return jsonify({ "success": False, "error": "请提供 simulation_id" }), 400 history = SimulationRunner.get_interview_history( simulation_id=simulation_id, platform=platform, agent_id=agent_id, limit=limit ) return jsonify({ "success": True, "data": { "count": len(history), "history": history } }) except Exception as e: logger.error(f"获取Interview历史失败: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 @simulation_bp.route('/env-status', methods=['POST']) def get_env_status(): """ 获取模拟环境状态 检查模拟环境是否存活(可以接收Interview命令) 请求(JSON): { "simulation_id": "sim_xxxx" // 必填,模拟ID } 返回: { "success": true, "data": { "simulation_id": "sim_xxxx", "env_alive": true, "twitter_available": true, "reddit_available": true, "message": "环境正在运行,可以接收Interview命令" } } """ try: data = request.get_json() or {} simulation_id = data.get('simulation_id') if not simulation_id: return jsonify({ "success": False, "error": "请提供 simulation_id" }), 400 env_alive = SimulationRunner.check_env_alive(simulation_id) # 获取更详细的状态信息 env_status = SimulationRunner.get_env_status_detail(simulation_id) if env_alive: message = "环境正在运行,可以接收Interview命令" else: message = "环境未运行或已关闭" return jsonify({ "success": True, "data": { "simulation_id": simulation_id, "env_alive": env_alive, "twitter_available": env_status.get("twitter_available", False), "reddit_available": env_status.get("reddit_available", False), "message": message } }) except Exception as e: logger.error(f"获取环境状态失败: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500 @simulation_bp.route('/close-env', methods=['POST']) def close_simulation_env(): """ 关闭模拟环境 向模拟发送关闭环境命令,使其优雅退出等待命令模式。 注意:这不同于 /stop 接口,/stop 会强制终止进程, 而此接口会让模拟优雅地关闭环境并退出。 请求(JSON): { "simulation_id": "sim_xxxx", // 必填,模拟ID "timeout": 30 // 可选,超时时间(秒),默认30 } 返回: { "success": true, "data": { "message": "环境关闭命令已发送", "result": {...}, "timestamp": "2025-12-08T10:00:01" } } """ try: data = request.get_json() or {} simulation_id = data.get('simulation_id') timeout = data.get('timeout', 30) if not simulation_id: return jsonify({ "success": False, "error": "请提供 simulation_id" }), 400 result = SimulationRunner.close_simulation_env( simulation_id=simulation_id, timeout=timeout ) # 更新模拟状态 manager = SimulationManager() state = manager.get_simulation(simulation_id) if state: state.status = SimulationStatus.COMPLETED manager._save_simulation_state(state) return jsonify({ "success": result.get("success", False), "data": result }) except ValueError as e: return jsonify({ "success": False, "error": str(e) }), 400 except Exception as e: logger.error(f"关闭环境失败: {str(e)}") return jsonify({ "success": False, "error": str(e), "traceback": traceback.format_exc() }), 500