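# Eval Service Redesign (Generic + Clear Responsibilities)

## 🎯 Core Problems

### Problems with the current design

1. **❌ Too specific to circle packing**: assumes the circle-packing-specific centers/radii data format
2. **❌ Unclear responsibilities**: ShinkaEvolve performs the evaluation, while the Eval Service is only a "bystander"
3. **❌ Duplicated work**: running auxiliary metrics requires loading the program output a second time

### Ideal design

1. **✅ Task-agnostic**: works for any optimization problem (TSP, circuit design, code optimization, etc.)
2. **✅ Clear responsibilities**: the Eval Service is **fully responsible** for evaluation (primary + auxiliary)
3. **✅ Done in one pass**: a single run completes all evaluation, with no duplicated work

---

## 🏗️ Redesigned Architecture

### New division of responsibilities

```
┌─────────────────────────────────────────────────────────────┐
│ ShinkaEvolve (Evolution Loop)                               │
│ Responsibilities:                                           │
│  • Generate new code (mutation/crossover)                   │
│  • Manage evolution (parent selection, population, etc.)    │
│  • Store results in the database                            │
│  • Decide the next generation's evolution strategy          │
│                                                             │
│ NOT responsible for: running and evaluating programs        │
└─────────────────────────────────────────────────────────────┘
                             ↓ submit code
┌─────────────────────────────────────────────────────────────┐
│ Eval Service (Evaluation Service)                           │
│ Responsibilities:                                           │
│  • Receive the program code to evaluate                     │
│  • Run the PRIMARY evaluator (ground truth)                 │
│  • Run the AUXILIARY evaluators (if available)              │
│  • Produce the complete metrics.json                        │
│  • [Optional] Have the agent update auxiliary evaluators    │
│                                                             │
│ Output: complete evaluation results                         │
└─────────────────────────────────────────────────────────────┘
                             ↓ return results
┌─────────────────────────────────────────────────────────────┐
│ ShinkaEvolve (continues evolving)                           │
│  • Read metrics.json                                        │
│  • Extract combined_score → evolution decisions             │
│  • [Optional] Read auxiliary metrics → use in LLM prompts   │
└─────────────────────────────────────────────────────────────┘
```

---

## 📊 Generic Interface Design

### Core principle: the Evaluator Contract

**Every task's evaluator must follow the standard interface**:

```python
# Standard Evaluator Interface (task-agnostic)
from __future__ import annotations  # lets annotations reference classes defined below

from dataclasses import dataclass, field
from typing import Any, Dict, Optional


def evaluate(
    program_path: str,
    results_dir: str,
    **kwargs: Any,
) -> EvaluationResult:
    """
    Standard evaluation interface.

    Args:
        program_path: Path of the program to evaluate
        results_dir: Directory where results are saved
        **kwargs: Extra task-specific arguments

    Returns:
        EvaluationResult: primary metrics, auxiliary metrics, and metadata
    """
    ...


@dataclass
class EvaluationResult:
    """Standard evaluation result (task-agnostic)"""

    # PRIMARY (required; drives evolution)
    combined_score: float  # Ground-truth score
    correct: bool  # Whether the program passed validation
    error: Optional[str] = None  # Error message if evaluation failed

    # PUBLIC METRICS (optional; visible to the LLM)
    public_metrics: Dict[str, Any] = field(default_factory=dict)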
    # PRIVATE METRICS (optional; hidden from the LLM)
    private_metrics: Dict[str, Any] = field(default_factory=dict)

    # AUXILIARY METRICS (optional; generated dynamically by the eval agent)
    auxiliary_metrics: Dict[str, Any] = field(default_factory=dict)
    auxiliary_definitions: Dict[str, MetricDefinition] = field(default_factory=dict)

    # PROGRAM OUTPUT (optional; for later analysis/visualization)
    program_output: Any = None  # Task-specific output

    # METADATA
    execution_time: float = 0.0
    num_runs: int = 1
    timestamp: str = ""


@dataclass
class MetricDefinition:
    """Metric definition (task-agnostic)"""

    name: str
    description: str
    interpretation: str  # "higher_better", "lower_better", "neutral"
    unit: str
    formula: Optional[str]
    source: str  # "primary", "auxiliary_static", "auxiliary_dynamic"
```

---

## 🔧 Eval Service Reimplementation

### New API design

#### 1. Evaluation Request (sync/async)

```jsonc
// POST /api/v1/evaluate
{
  "program_path": "gen_42/main.py",           // relative to the experiment root
  "results_dir": "gen_42/results",            // relative to the experiment root
  "generation": 42,
  "experiment_root": "/path/to/results_...",  // experiment root directory
  "evaluation_config": {
    "primary_evaluator": "examples/circle_packing/evaluate.py",
    "num_runs": 1,
    "timeout": 300,  // seconds
    "extra_args": {}
  },
  "auxiliary_config": {
    "enabled": true,
    "use_dynamic": true,  // use agent-generated metrics
    "use_static": false,  // use predefined metrics
    "timeout": 10         // seconds, per metric
  }
}

// Response (asynchronous)
{
  "status": "accepted",
  "job_id": "eval_job_42",
  "estimated_time": 15  // seconds
}

// or (synchronous)
{
  "status": "completed",
  "evaluation_result": {...}  // EvaluationResult
}
```
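
The request above mixes required fields with optional blocks, and the runner's payload shown later omits `use_static` entirely. Below is a minimal stdlib-only sketch of how the service might fill defaults for `auxiliary_config` and resolve the two relative paths against `experiment_root` without letting them escape it. `AuxiliaryConfig`, `parse_auxiliary_config`, and `resolve_under_root` are hypothetical names, and the defaults are simply copied from the example request. In the FastAPI service this schema would more likely live in a pydantic model.

```python
from dataclasses import dataclass, fields
from pathlib import Path
from typing import Any, Dict


@dataclass
class AuxiliaryConfig:
    """Hypothetical schema for the "auxiliary_config" block of the request."""

    enabled: bool = True
    use_dynamic: bool = True
    use_static: bool = False  # the runner's payload omits this key
    timeout: float = 10.0  # seconds per metric


def parse_auxiliary_config(raw: Dict[str, Any]) -> AuxiliaryConfig:
    """Keep known keys, drop unknown ones, and use defaults for missing ones."""
    known = {f.name for f in fields(AuxiliaryConfig)}
    return AuxiliaryConfig(**{k: v for k, v in raw.items() if k in known})


def resolve_under_root(experiment_root: str, relative_path: str) -> Path:
    """Join a request path onto experiment_root, refusing paths that escape it."""
    root = Path(experiment_root).resolve()
    # An absolute relative_path replaces root in the join, so it is caught below too
    candidate = (root / relative_path).resolve()
    if candidate != root and root not in candidate.parents:
        raise ValueError(f"{relative_path!r} is outside {str(root)!r}")
    return candidate
```

With this, the runner's `{"enabled": True, "use_dynamic": True, "timeout": 10}` parses with `use_static=False`, and a `program_path` such as `../other_exp/main.py` raises `ValueError` instead of reading outside the experiment.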

#### 2. Query Evaluation Result

```jsonc
// GET /api/v1/evaluate/{job_id}
{
  "status": "completed",  // "pending" | "running" | "completed" | "failed"
  "evaluation_result": {
    "combined_score": 2.34,
    "correct": true,
    "error": null,
    "public_metrics": {
      "num_circles": 26,
      "aux_radius_std_dev": 0.031,
      "aux_spatial_uniformity": 0.82
    },
    "private_metrics": {...},
    "auxiliary_metrics": {...},
    "auxiliary_definitions": {...},
    "program_output": {...},  // task-specific
    "execution_time": 12.5,
    "timestamp": "2026-02-03T04:00:00Z"
  }
}
```

---

## 🔄 End-to-End Flow (New Architecture)

### Step 1: ShinkaEvolve submits the evaluation request

```python
# shinka/core/runner.py (excerpt: runner methods; RunningJob, self.scheduler
# and self.job_config come from the existing runner)
import logging
from pathlib import Path
from typing import Dict, List, Optional, Tuple

logger = logging.getLogger(__name__)


def _submit_new_job(self):
    current_gen = self.next_generation_to_submit
    exec_fname = f"{self.results_dir}/gen_{current_gen}/main.py"
    results_dir = f"{self.results_dir}/gen_{current_gen}/results"

    # Create the results directory
    Path(results_dir).mkdir(parents=True, exist_ok=True)

    # ✅ New approach: submit to the Eval Service (if configured)
    if self.eval_service_url:
        job_id = self._submit_to_eval_service(
            exec_fname=exec_fname,
            results_dir=results_dir,
            generation=current_gen,
        )
    else:
        # Backward compatible: use the local scheduler
        job_id = self.scheduler.submit_async(exec_fname, results_dir)

    # Track the job
    running_job = RunningJob(
        job_id=job_id,
        exec_fname=exec_fname,
        results_dir=results_dir,
        generation=current_gen,
        # ...
    )
    self.running_jobs.append(running_job)


def _submit_to_eval_service(
    self, exec_fname: str, results_dir: str, generation: int
) -> str:
    """Submit an evaluation request to the Eval Service."""
    import os

    import requests

    payload = {
        # The API takes paths relative to experiment_root; exec_fname and
        # results_dir are built under self.results_dir, so strip that prefix
        "program_path": os.path.relpath(exec_fname, self.results_dir),
        "results_dir": os.path.relpath(results_dir, self.results_dir),
        "generation": generation,
        "experiment_root": str(self.results_dir),
        "evaluation_config": {
            "primary_evaluator": self.job_config.eval_program_path,
            "num_runs": 1,
            "timeout": 300,
            "extra_args": self.job_config.extra_cmd_args or {},
        },
        "auxiliary_config": {
            "enabled": True,
            "use_dynamic": True,
            "timeout": 10,
        },
    }

    response = requests.post(
        f"{self.eval_service_url}/api/v1/evaluate",
        json=payload,
        timeout=5.0,  # Submission should return quickly
    )

    if response.status_code == 200:
        data = response.json()
        return data["job_id"]  # Job ID assigned by the Eval Service
    raise RuntimeError(f"Eval service submission failed: {response.status_code}")


def _check_completed_jobs(self) -> List[RunningJob]:
    """Check for completed jobs."""
    completed = []
    still_running = []

    for job in self.running_jobs:
        if self.eval_service_url:
            # ✅ New approach: poll the Eval Service
            is_complete, _result = self._query_eval_service(job.job_id)
            if is_complete:
                # The Eval Service writes its results into job.results_dir (e.g. metrics.json)
                completed.append(job)
            else:
                still_running.append(job)
        else:
            # Backward compatible: use the local scheduler
            is_running = self.scheduler.check_job_status(job)
            if not is_running:
                completed.append(job)
            else:
                still_running.append(job)

    self.running_jobs = still_running
    return completed


def _query_eval_service(self, job_id: str) -> Tuple[bool, Optional[Dict]]:
    """Query an evaluation result from the Eval Service."""
    import requests

    try:
        response = requests.get(
            f"{self.eval_service_url}/api/v1/evaluate/{job_id}",
            timeout=2.0,
        )
        if response.status_code == 200:
            data = response.json()
            if data["status"] == "completed":
                return True, data["evaluation_result"]
            elif data["status"] == "failed":
                return True, None  # Failed, but finished
            else:
                return False, None  # Still pending or running
        # Non-200 (e.g. 404 if the service restarted and lost its in-memory jobs)
        return False, None
    except Exception as e:
        logger.warning(f"Failed to query eval service: {e}")
        return False, None
```
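
`_query_eval_service` folds each status response into an `(is_complete, result)` pair, and `_check_completed_jobs` calls it for every running job on each pass. The sketch below pulls that mapping out into a pure function, plus a capped exponential backoff a caller could sleep between polls. `classify_status` and `next_poll_delay` are hypothetical helpers, and the backoff constants are illustrative assumptions rather than values from this design.

```python
from typing import Any, Dict, Optional, Tuple

# Non-terminal status values documented for GET /api/v1/evaluate/{job_id}
IN_PROGRESS = {"pending", "running"}


def classify_status(payload: Dict[str, Any]) -> Tuple[bool, Optional[Dict[str, Any]]]:
    """Map one status response to (is_complete, evaluation_result).

    'completed' -> (True, result); 'failed' -> (True, None);
    'pending' / 'running' -> (False, None).
    """
    status = payload.get("status")
    if status == "completed":
        return True, payload.get("evaluation_result")
    if status == "failed":
        return True, None
    if status in IN_PROGRESS:
        return False, None
    # Assumption: an unrecognized status is a protocol error, not "still running"
    raise ValueError(f"unexpected job status: {status!r}")


def next_poll_delay(attempt: int, base: float = 0.5, cap: float = 10.0) -> float:
    """Capped exponential backoff: base * 2**attempt seconds, never above cap."""
    return min(cap, base * (2 ** attempt))
```

Unlike the catch-all `else: return False, None` branch in `_query_eval_service`, this sketch raises on an unknown status, so a client/server mismatch shows up as an error instead of a job that is polled forever; which behavior is preferable is a judgment call.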
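
### Step 2: The Eval Service handles the evaluation request

```python
# eval_agent/ev2_service_standalone.py
import asyncio
import importlib.util
import json
import logging
import sys
import time
import uuid
from pathlib import Path
from typing import Any, Dict, Optional

import numpy as np
from fastapi import BackgroundTasks, FastAPI

logger = logging.getLogger(__name__)
app = FastAPI()

# In-memory job table (lost on restart). EvaluationRequest is the request model
# mirroring the POST body; its definition is not shown here.
eval_jobs: Dict[str, Dict[str, Any]] = {}


@app.post("/api/v1/evaluate")
async def evaluate_program(
    request: EvaluationRequest,
    background_tasks: BackgroundTasks,
):
    """
    Main evaluation endpoint.

    Handles both primary and auxiliary evaluation in one place.
    """
    # Generate a job_id; the random suffix avoids collisions when two requests
    # for the same generation arrive within the same second
    job_id = f"eval_{request.generation}_{int(time.time())}_{uuid.uuid4().hex[:8]}"

    # Create the job-tracking entry
    eval_jobs[job_id] = {
        "status": "pending",
        "request": request,
        "started_at": time.time(),
        "result": None,
    }

    # Schedule the evaluation as a background task
    background_tasks.add_task(run_full_evaluation, job_id=job_id, request=request)

    # Return immediately (asynchronous)
    return {"status": "accepted", "job_id": job_id, "estimated_time": 15}


async def run_full_evaluation(job_id: str, request: EvaluationRequest):
    """
    Complete evaluation pipeline (primary + auxiliary).

    Runs in the background.
    """
    logger.info("=" * 60)
    logger.info(f"🔄 Starting Evaluation: {job_id}")
    logger.info("=" * 60)

    try:
        eval_jobs[job_id]["status"] = "running"

        # ===== Step 1: Run the primary evaluator =====
        logger.info("📊 Step 1: Running primary evaluator...")
        primary_result = await run_primary_evaluator(request)

        if not primary_result["success"]:
            # Record the failure in correct.json so the runner can read it after polling
            failed_file = Path(request.experiment_root) / request.results_dir / "correct.json"
            with open(failed_file, "w") as f:
                json.dump({"correct": False, "error": primary_result["error"]}, f, indent=2)
            eval_jobs[job_id]["status"] = "failed"
            eval_jobs[job_id]["error"] = primary_result["error"]
            return

        logger.info(f"✅ Primary evaluation completed: score={primary_result['combined_score']:.4f}")

        # ===== Step 2: Run auxiliary evaluators (if enabled) =====
        auxiliary_results = {}
        auxiliary_definitions = {}

        if request.auxiliary_config.enabled:
            logger.info("🔧 Step 2: Running auxiliary evaluators...")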

            # 2a. Dynamic metrics (agent-generated)
            if request.auxiliary_config.use_dynamic:
                dynamic_results = await run_dynamic_auxiliary(
                    experiment_root=request.experiment_root,
                    program_output=primary_result.get("program_output"),
                    timeout=request.auxiliary_config.timeout,
                )
                if dynamic_results:
                    auxiliary_results.update(dynamic_results["metrics"])
                    auxiliary_definitions.update(dynamic_results["definitions"])
                    logger.info(f"✅ Dynamic auxiliary: {len(dynamic_results['metrics'])} metrics")

            # 2b. Static metrics (predefined; run_static_auxiliary is not shown here)
            if request.auxiliary_config.use_static:
                static_results = await run_static_auxiliary(
                    experiment_root=request.experiment_root,
                    program_output=primary_result.get("program_output"),
                )
                if static_results:
                    auxiliary_results.update(static_results["metrics"])
                    auxiliary_definitions.update(static_results["definitions"])
                    logger.info(f"✅ Static auxiliary: {len(static_results['metrics'])} metrics")

        # ===== Step 3: Merge and save results =====
        logger.info("💾 Step 3: Saving complete evaluation results...")

        complete_result = {
            "combined_score": primary_result["combined_score"],
            "correct": primary_result["correct"],
            "error": primary_result.get("error"),
            "public_metrics": {
                **primary_result.get("public_metrics", {}),
                # Auxiliary metrics are exposed to the LLM with an "aux_" prefix
                **{f"aux_{k}": v for k, v in auxiliary_results.items()},
            },
            "private_metrics": primary_result.get("private_metrics", {}),
            "auxiliary_metric_definitions": auxiliary_definitions,
            "auxiliary_metadata": {
                "executed": len(auxiliary_results) > 0,
                "num_metrics_computed": len(auxiliary_results),
                "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
            },
            "execution_time_mean": primary_result.get("execution_time", 0),
            "num_valid_runs": primary_result.get("num_valid_runs", 0),
            "all_validation_errors": primary_result.get("all_validation_errors", []),
        }

        # Write metrics.json (this overwrites the primary evaluator's file,
        # whose contents were already read in Step 1)
        metrics_file = Path(request.experiment_root) / request.results_dir / "metrics.json"
        with open(metrics_file, "w") as f:
            json.dump(complete_result, f, indent=2)

        # Write correct.json
        correct_file = Path(request.experiment_root) / request.results_dir / "correct.json"
        with open(correct_file, "w") as f:
            json.dump(
                {"correct": primary_result["correct"], "error": primary_result.get("error")},
                f,
                indent=2,
            )

        logger.info(f"✅ Results saved to: {metrics_file}")

        # ===== Step 4: Update job status =====
        eval_jobs[job_id]["status"] = "completed"
        eval_jobs[job_id]["result"] = complete_result

        logger.info("=" * 60)
        logger.info(f"✅ Evaluation Completed: {job_id}")
        logger.info("=" * 60)

    except Exception as e:
        logger.error(f"❌ Evaluation failed: {e}", exc_info=True)
        eval_jobs[job_id]["status"] = "failed"
        eval_jobs[job_id]["error"] = str(e)


async def run_primary_evaluator(request: EvaluationRequest) -> Dict[str, Any]:
    """
    Run the primary (task-specific) evaluator.

    This calls the existing evaluate.py or an equivalent script.
    """
    # Build the command
    program_path = Path(request.experiment_root) / request.program_path
    results_dir = Path(request.experiment_root) / request.results_dir
    evaluator_path = request.evaluation_config.primary_evaluator

    cmd = [
        sys.executable,  # same interpreter as the service, not whatever "python" is on PATH
        evaluator_path,
        "--program_path", str(program_path),
        "--results_dir", str(results_dir),
    ]

    # Append extra arguments (True booleans become bare flags, False ones are dropped)
    for key, value in request.evaluation_config.extra_args.items():
        if isinstance(value, bool):
            if value:
                cmd.append(f"--{key}")
        else:
            cmd.extend([f"--{key}", str(value)])

    # Run the evaluator
    try:
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await asyncio.wait_for(
            process.communicate(),
            timeout=request.evaluation_config.timeout,
        )

        if process.returncode != 0:
            return {
                "success": False,
                "error": f"Evaluator failed: {stderr.decode(errors='replace')}",
            }

        # Read the evaluator's results
        metrics_file = results_dir / "metrics.json"
        if not metrics_file.exists():
            return {"success": False, "error": "metrics.json not found"}

        with open(metrics_file) as f:
            metrics = json.load(f)

        # Try to load the program output (used by auxiliary evaluation)
        program_output = load_program_output(results_dir)

        return {
            "success": True,
            "combined_score": metrics["combined_score"],
            "correct": True,  # Only reached if the evaluator exited with code 0
            "public_metrics": metrics.get("public", {}),
            "private_metrics": metrics.get("private", {}),
            "execution_time": metrics.get("execution_time_mean", 0),
            "num_valid_runs": metrics.get("num_valid_runs", 0),
            "all_validation_errors": metrics.get("all_validation_errors", []),
            "program_output": program_output,
        }

    except asyncio.TimeoutError:
        # wait_for cancels communicate() but does not stop the child process
        if process.returncode is None:
            process.kill()
            await process.wait()
        return {
            "success": False,
            "error": f"Evaluation timeout after {request.evaluation_config.timeout}s",
        }
    except Exception as e:
        return {"success": False, "error": f"Evaluation error: {e}"}


async def run_dynamic_auxiliary(
    experiment_root: str,
    program_output: Any,
    timeout: float,
) -> Optional[Dict[str, Any]]:
    """
    Run dynamically generated auxiliary metrics (task-agnostic).

    Key point: this works for ANY task because it only passes along the
    generic program_output.
    """
    metrics_file = Path(experiment_root) / "eval_agent_memory" / "auxiliary_metrics.py"
    if not metrics_file.exists():
        return None

    try:
        # Import the agent-generated module dynamically (runs its code in-process)
        spec = importlib.util.spec_from_file_location("dynamic_aux", metrics_file)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)

        # Look for the standard entry point
        if not hasattr(module, "evaluate_auxiliary_metrics"):
            logger.warning("No 'evaluate_auxiliary_metrics' function found")
            return None

        eval_fn = module.evaluate_auxiliary_metrics

        # ✅ Key: a task-agnostic call. program_output can be any format
        # (dict, tuple, object, ...); auxiliary_metrics.py knows how to handle it.
        # On timeout, wait_for stops waiting but the worker thread keeps running.
        aux_results = await asyncio.wait_for(
            asyncio.to_thread(eval_fn, program_output),
            timeout=timeout,
        )

        # Extract metric definitions, if the module provides them
        definitions = getattr(module, "METRIC_DEFINITIONS", {})

        return {"metrics": aux_results, "definitions": definitions}

    except asyncio.TimeoutError:
        logger.warning(f"Auxiliary metrics timeout after {timeout}s")
        return None
    except Exception as e:
        logger.error(f"Auxiliary metrics failed: {e}", exc_info=True)
        return None


def load_program_output(results_dir: Path) -> Any:
    """
    Load the program output (task-agnostic).

    Tries several formats and returns the most general representation.
    """
    # 1. Try extra.npz (NumPy arrays)
    npz_file = results_dir / "extra.npz"
    if npz_file.exists():
        try:
            # Copy the arrays out so the NpzFile handle is closed afterwards
            with np.load(npz_file, allow_pickle=False) as data:
                return {k: data[k] for k in data.files}
        except Exception:
            pass

    # 2. Try extra.pkl (pickle can run arbitrary code on load; trusted files only)
    pkl_file = results_dir / "extra.pkl"
    if pkl_file.exists():
        try:
            import pickle

            with open(pkl_file, "rb") as f:
                return pickle.load(f)
        except Exception:
            pass

    # 3. Try extra.json (JSON)
    json_file = results_dir / "extra.json"
    if json_file.exists():
        try:
            with open(json_file) as f:
                return json.load(f)
        except Exception:
            pass

    # 4. Fall back to metrics.json
    metrics_file = results_dir / "metrics.json"
    if metrics_file.exists():
        try:
            with open(metrics_file) as f:
                metrics = json.load(f)
            # Return the public + private metrics as the output
            return {
                "public": metrics.get("public", {}),
                "private": metrics.get("private", {}),
            }
        except Exception:
            pass

    return None
```

---

## 🎨 Generalizing the Agent Prompt

### New prompt requirements

````jinja2
You are an evaluation expert for optimization problems.

Task: {{ task_name }}
Objective: {{ objective }}
Primary Metric: {{ primary_metric_name }} ({{ primary_metric_interpretation }})

Design AUXILIARY metrics that complement the primary metric. These metrics should:
1. Measure aspects NOT captured by the primary metric
2. Work with the program output format
3. Be computationally efficient

The program output is available as a Python object with structure:
{{ program_output_structure }}

Example:
{{ program_output_example }}

You MUST create auxiliary_metrics.py with:

```python
def evaluate_auxiliary_metrics(program_output) -> dict:
    """
    Evaluate auxiliary metrics.

    Args:
        program_output: Program output (format depends on the task)
            For circle packing: {"centers": ndarray, "radii": ndarray}
            For TSP: {"tour": list, "distance": float}
            For code: {"code": str, "ast": dict}

    Returns:
        dict: Auxiliary metrics
    """
    # Extract what you need from program_output
    # Task-specific logic here
    return {
        "metric_name_1": value1,
        "metric_name_2": value2,
    }


METRIC_DEFINITIONS = {
    "metric_name_1": {
        "name": "Human Readable Name",
        "description": "What this metric measures",
        "interpretation": "higher_better" | "lower_better" | "neutral",
        "unit": "...",
        "formula": "...",
    },
    ...
}
```

{% for example in auxiliary_examples %}
{{ example }}
{% endfor %}
````

---

## 📋 Migration Path

### Phase 1: Keep backward compatibility (1 week)

**Goal**: support both the old and the new mode

```python
from typing import Optional


# Config option
class EvolutionConfig:
    eval_service_url: Optional[str] = None
    use_eval_service_for_evaluation: bool = False  # ← new option
```

- `use_eval_service_for_evaluation = False`: use the old path (scheduler → evaluate.py)
- `use_eval_service_for_evaluation = True`: use the new path (the Eval Service handles evaluation)

### Phase 2: Implement the new Eval Service API (2 weeks)

- [ ] Implement the `/api/v1/evaluate` endpoint
- [ ] Implement the `run_full_evaluation` pipeline
- [ ] Implement a generic `load_program_output`
- [ ] Test multiple task output formats

### Phase 3: Update the agent prompt (1 week)

- [ ] Generalize the prompt template
- [ ] Add task-specific examples
- [ ] Test the code generated by the agent

### Phase 4: Full switchover (1 week)

- [ ] Enable the new mode by default
- [ ] Deprecate the old scheduler-based invocation
- [ ] Update the documentation

---

## 🎯 Success Criteria

### Generality validation

Test 3 different types of tasks:

1. **Circle Packing** (geometric optimization)
   - program_output: `{"centers": ndarray, "radii": ndarray}`
   - auxiliary metrics: spatial uniformity, radius distribution
2. **TSP** (combinatorial optimization)
   - program_output: `{"tour": list, "distance": float}`
   - auxiliary metrics: tour smoothness, cluster quality
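3. **Code Optimization** (program synthesis)
   - program_output: `{"code": str, "ast": dict, "runtime": float}`
   - auxiliary metrics: code complexity, readability, efficiency

If all 3 tasks work with the same architecture, the validation passes ✅

### Responsibility-clarity validation

- [ ] ShinkaEvolve does not call evaluate.py directly
- [ ] The Eval Service is fully responsible for evaluation
- [ ] Primary and auxiliary evaluation complete in the same call
- [ ] The result format is unified

---

## 📄 Summary

### Key improvements

1. **✅ Task-agnostic**:
   - Uses a generic `program_output` format
   - The agent adapts to each task's output structure
2. **✅ Clear responsibilities**:
   - ShinkaEvolve: evolution logic
   - Eval Service: complete evaluation (primary + auxiliary)
3. **✅ Done in one pass**:
   - One run yields all metrics
   - No second load of the program output

### Next steps

1. **Start now**: implement the new `/api/v1/evaluate` API
2. **Keep compatibility**: support both the old and the new mode via a config flag
3. **Migrate incrementally**: carry out Phases 1 to 4 in order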
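
The task-agnostic claim rests on only `auxiliary_metrics.py` knowing what `program_output` looks like. As a concrete check for the circle-packing case in the success criteria, here is a hypothetical file of the kind the agent prompt asks for. It assumes `program_output = {"centers": N x 2, "radii": length N}` as listed above. The metric choices (spread of radii, and the coefficient of variation of nearest-neighbor center distances as a stand-in for spatial uniformity) are illustrative, not the agent's actual output. It uses only the standard library, so it accepts plain lists as well as NumPy arrays.

```python
import math
import statistics
from typing import Any, Dict, List, Sequence

# Hypothetical auxiliary_metrics.py for circle packing.


def _nearest_neighbor_distances(centers: Sequence[Sequence[float]]) -> List[float]:
    """Distance from each center to its closest other center (O(N^2), fine for small N)."""
    pts = [(float(x), float(y)) for x, y in centers]
    if len(pts) < 2:
        return []
    dists = []
    for i, (xi, yi) in enumerate(pts):
        best = math.inf
        for j, (xj, yj) in enumerate(pts):
            if i != j:
                best = min(best, math.hypot(xi - xj, yi - yj))
        dists.append(best)
    return dists


def evaluate_auxiliary_metrics(program_output: Dict[str, Any]) -> Dict[str, float]:
    """Return auxiliary metrics for one circle-packing solution."""
    radii = [float(r) for r in program_output["radii"]]
    nn = _nearest_neighbor_distances(program_output["centers"])

    r_std = statistics.pstdev(radii)
    r_mean = statistics.fmean(radii)
    metrics = {
        "radius_std_dev": r_std,
        # Coefficient of variation: spread of radii relative to their mean
        "radius_cv": r_std / r_mean if r_mean > 0 else 0.0,
        "nn_distance_cv": 0.0,
    }
    if nn:
        nn_mean = statistics.fmean(nn)
        if nn_mean > 0:
            # Lower CV of nearest-neighbor distances = more evenly spaced centers
            metrics["nn_distance_cv"] = statistics.pstdev(nn) / nn_mean
    return metrics


METRIC_DEFINITIONS = {
    "radius_std_dev": {
        "name": "Radius Standard Deviation",
        "description": "Population standard deviation of the circle radii",
        "interpretation": "neutral",
        "unit": "length",
        "formula": "pstdev(radii)",
    },
    "radius_cv": {
        "name": "Radius Coefficient of Variation",
        "description": "Spread of radii relative to the mean radius",
        "interpretation": "neutral",
        "unit": "ratio",
        "formula": "pstdev(radii) / mean(radii)",
    },
    "nn_distance_cv": {
        "name": "Nearest-Neighbor Distance CV",
        "description": "Unevenness of center spacing; 0 means perfectly even",
        "interpretation": "lower_better",
        "unit": "ratio",
        "formula": "pstdev(nn_dist) / mean(nn_dist)",
    },
}
```

Because Step 3 of `run_full_evaluation` prefixes every auxiliary key with `aux_`, these would surface in `public_metrics` as `aux_radius_std_dev`, `aux_radius_cv`, and `aux_nn_distance_cv`.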