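Eval Service Redesign (Task-Agnostic + Clear Responsibilities)
🎯 Core Problem
Problems with the Current Design
- ❌ Too specific to circle packing: assumes the centers/radii data format
- ❌ Unclear responsibilities: ShinkaEvolve runs the evaluation, while the Eval Service is only a "bystander"
- ❌ Duplicated work: running auxiliary metrics requires reloading the program output
Target Design
- ✅ Task-agnostic: works for any optimization problem (TSP, circuit design, code optimization, etc.)
- ✅ Clear responsibilities: the Eval Service owns evaluation end to end (primary + auxiliary)
- ✅ Single pass: one run computes every metric, with no duplicated work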
🏗️ Redesigned Architecture
New Division of Responsibilities
┌─────────────────────────────────────────────────────────┐
│ ShinkaEvolve (Evolution Loop) │
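│ Responsibilities: │
│ • Generate new code (mutation/crossover) │
│ • Manage the evolution loop (parent selection, population, etc.) │
│ • Store results in the database │
│ • Decide the evolution strategy for the next generation │
│ │
│ Not responsible for: ❌ running or evaluating programs │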
└─────────────────────────────────────────────────────────┘
↓ Submit code
┌─────────────────────────────────────────────────────────┐
│ Eval Service (Evaluation Service) │
│ Responsibilities: │
│ • Receive the program code to evaluate │
│ • Run the PRIMARY evaluator (ground truth) │
│ • Run AUXILIARY evaluators (if available) │
│ • Produce the complete metrics.json │
│ • [Optional] Trigger the agent to update auxiliary evaluators │
│ │
│ Output: complete evaluation results │
└─────────────────────────────────────────────────────────┘
↓ Return results
┌─────────────────────────────────────────────────────────┐
│ ShinkaEvolve (continues evolving) │
│ • Read metrics.json │
│ • Extract combined_score → evolution decisions │
│ • [Optional] Read auxiliary metrics → used in the LLM prompt │
└─────────────────────────────────────────────────────────┘
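On the ShinkaEvolve side, the last box amounts to reading one file. A minimal sketch, assuming the metrics.json layout that run_full_evaluation writes in Step 2 (combined_score at the top level, auxiliary values copied into public_metrics under an aux_ prefix); read_evaluation is a hypothetical helper, not an existing ShinkaEvolve function.

```python
import json
from pathlib import Path
from typing import Any, Dict, Tuple


def read_evaluation(results_dir: str) -> Tuple[float, Dict[str, Any]]:
    """Hypothetical helper: return (combined_score, auxiliary metrics for the LLM prompt).

    Assumes auxiliary values live in public_metrics under keys prefixed with "aux_".
    """
    with open(Path(results_dir) / "metrics.json") as f:
        metrics = json.load(f)
    # combined_score is the only value the evolution decision depends on
    score = float(metrics["combined_score"])
    public = metrics.get("public_metrics", {})
    # Strip the "aux_" prefix so the prompt shows each metric's own name
    aux = {k[len("aux_"):]: v for k, v in public.items() if k.startswith("aux_")}
    return score, aux
```

Keeping combined_score as the single input to selection means auxiliary metrics can change, or be absent, without affecting the evolution loop.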
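📊 Generic Interface Design
Core Principle: the Evaluator Contract
Every task's evaluator must follow this standard interface:
# Standard Evaluator Interface (task-agnostic)
from __future__ import annotations  # lets annotations refer to classes defined further down

from dataclasses import dataclass, field
from typing import Any, Dict, Optional


def evaluate(
    program_path: str,
    results_dir: str,
    **kwargs: Any,
) -> EvaluationResult:
    """
    Standard evaluation interface.

    Args:
        program_path: Path to the program to evaluate
        results_dir: Directory where results are saved
        **kwargs: Task-specific extra arguments

    Returns:
        EvaluationResult: primary metrics, auxiliary metrics, and metadata
    """
    raise NotImplementedError  # each task supplies its own implementation


@dataclass
class EvaluationResult:
    """Standard evaluation result (task-agnostic)"""
    # PRIMARY (required; drives evolution)
    combined_score: float  # Ground truth score
    correct: bool  # Whether the program passed validation
    error: Optional[str] = None  # Error message if evaluation failed
    # PUBLIC METRICS (optional; visible to the LLM)
    public_metrics: Dict[str, Any] = field(default_factory=dict)
    # PRIVATE METRICS (optional; hidden from the LLM)
    private_metrics: Dict[str, Any] = field(default_factory=dict)
    # AUXILIARY METRICS (optional; generated dynamically by the eval agent)
    auxiliary_metrics: Dict[str, Any] = field(default_factory=dict)
    auxiliary_definitions: Dict[str, MetricDefinition] = field(default_factory=dict)
    # PROGRAM OUTPUT (optional; for later analysis/visualization)
    program_output: Any = None  # Task-specific output
    # METADATA
    execution_time: float = 0.0
    num_runs: int = 1
    timestamp: str = ""


@dataclass
class MetricDefinition:
    """Metric definition (task-agnostic)"""
    name: str
    description: str
    interpretation: str  # "higher_better", "lower_better", "neutral"
    unit: str
    formula: Optional[str]
    source: str  # "primary", "auxiliary_static", "auxiliary_dynamic"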
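The contract is easier to keep if the service checks each result before saving it. A sketch, assuming results are handled as plain dicts using the EvaluationResult field names and that definitions are dicts shaped like MetricDefinition; validate_result and its specific rules are hypothetical.

```python
import math
from typing import Any, Dict, List

# Allowed values for MetricDefinition.interpretation (from the contract above)
INTERPRETATIONS = {"higher_better", "lower_better", "neutral"}


def validate_result(result: Dict[str, Any]) -> List[str]:
    """Return a list of contract violations; an empty list means the result is valid."""
    problems = []
    score = result.get("combined_score")
    # bool is a subclass of int, so exclude it explicitly
    if isinstance(score, bool) or not isinstance(score, (int, float)) or not math.isfinite(score):
        problems.append("combined_score must be a finite number")
    if not isinstance(result.get("correct"), bool):
        problems.append("correct must be a bool")
    if result.get("correct") is False and not result.get("error"):
        problems.append("a failed result should carry an error message")
    definitions = result.get("auxiliary_definitions", {})
    # Every auxiliary metric needs a definition so the LLM can interpret it
    for name in result.get("auxiliary_metrics", {}):
        if name not in definitions:
            problems.append(f"auxiliary metric {name!r} has no definition")
    for name, definition in definitions.items():
        if definition.get("interpretation") not in INTERPRETATIONS:
            problems.append(f"{name!r}: unknown interpretation {definition.get('interpretation')!r}")
    return problems
```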
🔧 Eval Service Reimplementation
New API Design
1. Evaluation Request (sync/async)
# POST /api/v1/evaluate
{
"program_path": "gen_42/main.py", # relative to experiment_root
"results_dir": "gen_42/results", # relative to experiment_root
"generation": 42,
"experiment_root": "/path/to/results_...", # experiment root directory
"evaluation_config": {
"primary_evaluator": "examples/circle_packing/evaluate.py",
"num_runs": 1,
"timeout": 300, # seconds
"extra_args": {}
},
"auxiliary_config": {
"enabled": true,
"use_dynamic": true, # use agent-generated metrics
"use_static": false, # use predefined metrics
"timeout": 10 # seconds, applied to the auxiliary evaluation call as a whole
}
}
# Response (async mode)
{
"status": "accepted",
"job_id": "eval_job_42",
"estimated_time": 15 # seconds
}
# Or (sync mode)
{
"status": "completed",
"evaluation_result": {...} # EvaluationResult
}
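Because the endpoint may answer in either mode, a client has to branch on status. A small sketch that assumes only the two response shapes shown above; parse_submit_response is a hypothetical name.

```python
from typing import Any, Dict, Optional, Tuple


def parse_submit_response(data: Dict[str, Any]) -> Tuple[Optional[str], Optional[Dict[str, Any]]]:
    """Return (job_id, None) for an async acceptance or (None, result) for a sync completion."""
    status = data.get("status")
    if status == "accepted":
        # Async mode: fetch the result later via GET /api/v1/evaluate/{job_id}
        return data["job_id"], None
    if status == "completed":
        # Sync mode: the EvaluationResult is inlined in the response
        return None, data["evaluation_result"]
    raise ValueError(f"unexpected submission status: {status!r}")
```

The runner in Step 1 reads data["job_id"] directly, so as written it supports only the async mode.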
2. Query Evaluation Result
# GET /api/v1/evaluate/{job_id}
{
"status": "completed", # "pending", "running", "completed", "failed"
"evaluation_result": {
"combined_score": 2.34,
"correct": true,
"error": null,
"public_metrics": {
"num_circles": 26,
"aux_radius_std_dev": 0.031,
"aux_spatial_uniformity": 0.82
},
"private_metrics": {...},
"auxiliary_metrics": {...},
"auxiliary_definitions": {...},
"program_output": {...}, # task-specific
"execution_time": 12.5,
"timestamp": "2026-02-03T04:00:00Z"
}
}
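A client that blocks on a job can poll this endpoint until the status is terminal. The sketch takes the HTTP call as an injected fetch_status function (a hypothetical stand-in for the GET request) so it runs without a server; the backoff parameters are illustrative assumptions.

```python
import time
from typing import Any, Callable, Dict

# Statuses after which the job will not change any more
TERMINAL = {"completed", "failed"}


def wait_for_result(
    fetch_status: Callable[[str], Dict[str, Any]],
    job_id: str,
    max_wait: float = 600.0,
    initial_delay: float = 0.5,
    max_delay: float = 10.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict[str, Any]:
    """Poll until the job is completed/failed, doubling the delay up to max_delay."""
    waited, delay = 0.0, initial_delay
    while True:
        data = fetch_status(job_id)
        if data.get("status") in TERMINAL:
            return data
        if waited >= max_wait:
            raise TimeoutError(f"job {job_id} still {data.get('status')!r} after {waited:.1f}s")
        sleep(delay)
        waited += delay
        delay = min(delay * 2, max_delay)
```

The Step 1 runner instead checks each job once per loop iteration without blocking; a blocking helper like this suits scripts and tests.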
🔄 End-to-End Flow (New Architecture)
Step 1: ShinkaEvolve submits the evaluation request
# shinka/core/runner.py
def _submit_new_job(self):
    current_gen = self.next_generation_to_submit
    exec_fname = f"{self.results_dir}/gen_{current_gen}/main.py"
    results_dir = f"{self.results_dir}/gen_{current_gen}/results"
    # Create the results directory
    Path(results_dir).mkdir(parents=True, exist_ok=True)
    # ✅ New approach: submit to the Eval Service (if configured)
    if self.eval_service_url:
        job_id = self._submit_to_eval_service(
            exec_fname=exec_fname,
            results_dir=results_dir,
            generation=current_gen,
        )
    else:
        # Backward-compatible path: use the local scheduler
        job_id = self.scheduler.submit_async(exec_fname, results_dir)
    # Track the job
    running_job = RunningJob(
        job_id=job_id,
        exec_fname=exec_fname,
        results_dir=results_dir,
        generation=current_gen,
        ...
    )
    self.running_jobs.append(running_job)


def _submit_to_eval_service(
    self,
    exec_fname: str,
    results_dir: str,
    generation: int,
) -> str:
    """Submit an evaluation request to the Eval Service."""
    import os

    import requests

    # The API expects program_path/results_dir relative to experiment_root.
    # Sending the prefixed paths breaks when self.results_dir is relative,
    # because the service would join the root onto them a second time.
    experiment_root = os.path.abspath(str(self.results_dir))
    payload = {
        "program_path": os.path.relpath(exec_fname, experiment_root),
        "results_dir": os.path.relpath(results_dir, experiment_root),
        "generation": generation,
        "experiment_root": experiment_root,
        "evaluation_config": {
            "primary_evaluator": self.job_config.eval_program_path,
            "num_runs": 1,
            "timeout": 300,
            "extra_args": self.job_config.extra_cmd_args or {},
        },
        "auxiliary_config": {
            "enabled": True,
            "use_dynamic": True,
            "use_static": False,
            "timeout": 10,
        },
    }
    response = requests.post(
        f"{self.eval_service_url}/api/v1/evaluate",
        json=payload,
        timeout=5.0,  # submission returns quickly; evaluation runs in the background
    )
    if response.status_code == 200:
        data = response.json()
        return data["job_id"]  # the Eval Service's job_id
    raise RuntimeError(f"Eval service submission failed: {response.status_code} {response.text}")


def _check_completed_jobs(self) -> List[RunningJob]:
    """Check for completed jobs."""
    completed = []
    still_running = []
    for job in self.running_jobs:
        if self.eval_service_url:
            # ✅ New approach: query the Eval Service
            is_complete, result = self._query_eval_service(job.job_id)
            if is_complete:
                # On success, the results are already saved in job.results_dir/metrics.json
                completed.append(job)
            else:
                still_running.append(job)
        else:
            # Backward-compatible path: use the local scheduler
            is_running = self.scheduler.check_job_status(job)
            if not is_running:
                completed.append(job)
            else:
                still_running.append(job)
    self.running_jobs = still_running
    return completed


def _query_eval_service(self, job_id: str) -> Tuple[bool, Optional[Dict]]:
    """Query an evaluation result from the Eval Service."""
    import requests

    try:
        response = requests.get(
            f"{self.eval_service_url}/api/v1/evaluate/{job_id}",
            timeout=2.0,
        )
        if response.status_code == 200:
            data = response.json()
            if data["status"] == "completed":
                return True, data["evaluation_result"]
            elif data["status"] == "failed":
                return True, None  # Failed, but finished
            else:
                return False, None  # Still pending/running
        # Any other status (e.g. 404 after a service restart) keeps the job "running"
        return False, None
    except Exception as e:
        logger.warning(f"Failed to query eval service: {e}")
        return False, None
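The API treats program_path and results_dir as relative to experiment_root, and the service joins them back onto that root. Since these paths arrive over HTTP, a service-side guard that rejects absolute paths and ".." escapes seems worthwhile. A sketch, assuming the experiment root is trusted and the request paths are not; resolve_under_root is a hypothetical helper.

```python
from pathlib import Path


def resolve_under_root(experiment_root: str, rel_path: str) -> Path:
    """Join rel_path onto experiment_root, refusing anything that escapes the root."""
    root = Path(experiment_root).resolve()
    candidate = Path(rel_path)
    if candidate.is_absolute():
        raise ValueError(f"expected a path relative to experiment_root, got {rel_path!r}")
    resolved = (root / candidate).resolve()
    # relative_to raises ValueError when resolved lies outside root
    resolved.relative_to(root)
    return resolved
```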
Step 2: The Eval Service handles the evaluation request
# eval_agent/ev2_service_standalone.py
import asyncio
import importlib.util
import json
import logging
import pickle
import sys
import time
import uuid
from pathlib import Path
from typing import Any, Dict, Optional

import numpy as np
from fastapi import BackgroundTasks, FastAPI
from pydantic import BaseModel, Field

logger = logging.getLogger(__name__)
app = FastAPI()

# In-memory job store: job_id -> {"status", "request", "started_at", "result", "error"}
# (contents are lost if the service restarts)
eval_jobs: Dict[str, Dict[str, Any]] = {}


# Request models mirroring the JSON body of POST /api/v1/evaluate
# (sketch: defaults follow the example values in the API design above)
class EvaluationConfig(BaseModel):
    primary_evaluator: str
    num_runs: int = 1
    timeout: float = 300  # seconds
    extra_args: Dict[str, Any] = Field(default_factory=dict)


class AuxiliaryConfig(BaseModel):
    enabled: bool = True
    use_dynamic: bool = True
    use_static: bool = False
    timeout: float = 10  # seconds


class EvaluationRequest(BaseModel):
    program_path: str  # relative to experiment_root
    results_dir: str  # relative to experiment_root
    generation: int
    experiment_root: str
    evaluation_config: EvaluationConfig
    auxiliary_config: AuxiliaryConfig = Field(default_factory=AuxiliaryConfig)


@app.post("/api/v1/evaluate")
async def evaluate_program(
    request: EvaluationRequest,
    background_tasks: BackgroundTasks,
):
    """
    Main evaluation endpoint.
    Handles both primary and auxiliary evaluation in one place.
    """
    # Generate a job_id; the uuid suffix avoids collisions when the same
    # generation is resubmitted within the same second
    job_id = f"eval_{request.generation}_{int(time.time())}_{uuid.uuid4().hex[:6]}"
    # Register the job for status tracking
    eval_jobs[job_id] = {
        "status": "pending",
        "request": request,
        "started_at": time.time(),
        "result": None,
    }
    # Start the evaluation as a background task
    background_tasks.add_task(
        run_full_evaluation,
        job_id=job_id,
        request=request,
    )
    # Return immediately (async mode)
    return {
        "status": "accepted",
        "job_id": job_id,
        "estimated_time": 15,
    }


def _json_default(obj: Any) -> Any:
    """Make NumPy scalars/arrays returned as metric values JSON-serializable."""
    if isinstance(obj, np.generic):
        return obj.item()
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")


async def run_full_evaluation(job_id: str, request: EvaluationRequest):
    """
    Complete evaluation pipeline (primary + auxiliary).
    Runs in the background.
    """
    logger.info("=" * 60)
    logger.info(f"🔄 Starting Evaluation: {job_id}")
    logger.info("=" * 60)
    results_dir = Path(request.experiment_root) / request.results_dir
    try:
        eval_jobs[job_id]["status"] = "running"
        results_dir.mkdir(parents=True, exist_ok=True)
        # ===== Step 1: Run Primary Evaluator =====
        logger.info("📊 Step 1: Running primary evaluator...")
        primary_result = await run_primary_evaluator(request)
        if not primary_result["success"]:
            # Record the failure in correct.json too, so file-based readers see it
            with open(results_dir / "correct.json", "w") as f:
                json.dump({"correct": False, "error": primary_result["error"]}, f, indent=2)
            eval_jobs[job_id]["status"] = "failed"
            eval_jobs[job_id]["error"] = primary_result["error"]
            return
        logger.info(f"✅ Primary evaluation completed: score={primary_result['combined_score']:.4f}")
        # ===== Step 2: Run Auxiliary Evaluators (if enabled) =====
        auxiliary_results = {}
        auxiliary_definitions = {}
        if request.auxiliary_config.enabled:
            logger.info("🔧 Step 2: Running auxiliary evaluators...")
            # 2a. Dynamic metrics (agent-generated)
            if request.auxiliary_config.use_dynamic:
                dynamic_results = await run_dynamic_auxiliary(
                    experiment_root=request.experiment_root,
                    program_output=primary_result.get("program_output"),
                    timeout=request.auxiliary_config.timeout,
                )
                if dynamic_results:
                    auxiliary_results.update(dynamic_results["metrics"])
                    auxiliary_definitions.update(dynamic_results["definitions"])
                    logger.info(f"✅ Dynamic auxiliary: {len(dynamic_results['metrics'])} metrics")
            # 2b. Static metrics (predefined)
            if request.auxiliary_config.use_static:
                static_results = await run_static_auxiliary(
                    experiment_root=request.experiment_root,
                    program_output=primary_result.get("program_output"),
                )
                if static_results:
                    auxiliary_results.update(static_results["metrics"])
                    auxiliary_definitions.update(static_results["definitions"])
                    logger.info(f"✅ Static auxiliary: {len(static_results['metrics'])} metrics")
        # ===== Step 3: Merge and Save Results =====
        logger.info("💾 Step 3: Saving complete evaluation results...")
        complete_result = {
            "combined_score": primary_result["combined_score"],
            "correct": primary_result["correct"],
            "error": primary_result.get("error"),
            "public_metrics": {
                **primary_result.get("public_metrics", {}),
                **{f"aux_{k}": v for k, v in auxiliary_results.items()},
            },
            "private_metrics": primary_result.get("private_metrics", {}),
            "auxiliary_metric_definitions": auxiliary_definitions,
            "auxiliary_metadata": {
                "executed": len(auxiliary_results) > 0,
                "num_metrics_computed": len(auxiliary_results),
                "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
            },
            "execution_time_mean": primary_result.get("execution_time", 0),
            "num_valid_runs": primary_result.get("num_valid_runs", 0),
            "all_validation_errors": primary_result.get("all_validation_errors", []),
        }
        # Write metrics.json (replaces the file the primary evaluator wrote)
        metrics_file = results_dir / "metrics.json"
        with open(metrics_file, "w") as f:
            json.dump(complete_result, f, indent=2, default=_json_default)
        # Write correct.json
        correct_file = results_dir / "correct.json"
        with open(correct_file, "w") as f:
            json.dump(
                {"correct": primary_result["correct"], "error": primary_result.get("error")},
                f,
                indent=2,
            )
        logger.info(f"✅ Results saved to: {metrics_file}")
        # ===== Step 4: Update Job Status =====
        eval_jobs[job_id]["status"] = "completed"
        eval_jobs[job_id]["result"] = complete_result
        logger.info("=" * 60)
        logger.info(f"✅ Evaluation Completed: {job_id}")
        logger.info("=" * 60)
    except Exception as e:
        logger.error(f"❌ Evaluation failed: {e}", exc_info=True)
        eval_jobs[job_id]["status"] = "failed"
        eval_jobs[job_id]["error"] = str(e)


async def run_primary_evaluator(request: EvaluationRequest) -> Dict[str, Any]:
    """
    Run the primary evaluator (task-specific).
    Calls the existing evaluate.py or an equivalent script.
    """
    # Build the command
    program_path = Path(request.experiment_root) / request.program_path
    results_dir = Path(request.experiment_root) / request.results_dir
    evaluator_path = request.evaluation_config.primary_evaluator
    cmd = [
        sys.executable,  # same interpreter/environment as the service
        evaluator_path,
        "--program_path", str(program_path),
        "--results_dir", str(results_dir),
    ]
    # Append extra arguments (True booleans become bare flags, False ones are dropped)
    for key, value in request.evaluation_config.extra_args.items():
        if isinstance(value, bool):
            if value:
                cmd.append(f"--{key}")
        else:
            cmd.extend([f"--{key}", str(value)])
    # Run the evaluator
    process = None
    try:
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await asyncio.wait_for(
            process.communicate(),
            timeout=request.evaluation_config.timeout,
        )
        if process.returncode != 0:
            return {
                "success": False,
                "error": f"Evaluator failed: {stderr.decode(errors='replace')}",
            }
        # Read the results
        metrics_file = results_dir / "metrics.json"
        if not metrics_file.exists():
            return {"success": False, "error": "metrics.json not found"}
        with open(metrics_file) as f:
            metrics = json.load(f)
        # A zero exit code alone does not mean the program passed validation,
        # so prefer the evaluator's own correct.json verdict when it exists
        correct, error = True, None
        correct_file = results_dir / "correct.json"
        if correct_file.exists():
            with open(correct_file) as f:
                verdict = json.load(f)
            correct = bool(verdict.get("correct", True))
            error = verdict.get("error")
        # Try to load the program output (used for auxiliary evaluation)
        program_output = load_program_output(results_dir)
        return {
            "success": True,
            "combined_score": metrics["combined_score"],
            "correct": correct,
            "error": error,
            "public_metrics": metrics.get("public", {}),
            "private_metrics": metrics.get("private", {}),
            "execution_time": metrics.get("execution_time_mean", 0),
            "num_valid_runs": metrics.get("num_valid_runs", 0),
            "all_validation_errors": metrics.get("all_validation_errors", []),
            "program_output": program_output,
        }
    except asyncio.TimeoutError:
        # wait_for cancels communicate() but does not stop the child process
        if process is not None and process.returncode is None:
            process.kill()
            await process.wait()
        return {
            "success": False,
            "error": f"Evaluation timeout after {request.evaluation_config.timeout}s",
        }
    except Exception as e:
        return {
            "success": False,
            "error": f"Evaluation error: {e}",
        }


async def run_static_auxiliary(experiment_root: str, program_output: Any) -> Optional[Dict[str, Any]]:
    """Predefined (static) metrics. Placeholder: this design does not specify them yet."""
    return None


async def run_dynamic_auxiliary(
    experiment_root: str,
    program_output: Any,
    timeout: float,
) -> Optional[Dict[str, Any]]:
    """
    Run dynamically generated auxiliary metrics (task-agnostic).
    Key point: this works with ANY task because it only passes the generic program_output through.
    """
    metrics_file = Path(experiment_root) / "eval_agent_memory" / "auxiliary_metrics.py"
    if not metrics_file.exists():
        return None
    try:
        # Import dynamically; the file is re-executed on every call, so agent updates are picked up
        spec = importlib.util.spec_from_file_location("dynamic_aux", metrics_file)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        # Look up the standard interface
        if not hasattr(module, "evaluate_auxiliary_metrics"):
            logger.warning("No 'evaluate_auxiliary_metrics' function found")
            return None
        eval_fn = module.evaluate_auxiliary_metrics
        # ✅ Key: a task-agnostic call.
        # program_output can be in any format (dict, tuple, object, etc.);
        # auxiliary_metrics.py itself knows how to interpret it.
        # Note: on timeout the worker thread cannot be killed and keeps running.
        aux_results = await asyncio.wait_for(
            asyncio.to_thread(eval_fn, program_output),
            timeout=timeout,
        )
        if not isinstance(aux_results, dict):
            logger.warning(f"evaluate_auxiliary_metrics returned {type(aux_results).__name__}, expected dict")
            return None
        # Extract the metric definitions (optional)
        definitions = getattr(module, "METRIC_DEFINITIONS", {})
        return {
            "metrics": aux_results,
            "definitions": definitions,
        }
    except asyncio.TimeoutError:
        logger.warning(f"Auxiliary metrics timeout after {timeout}s")
        return None
    except Exception as e:
        logger.error(f"Auxiliary metrics failed: {e}", exc_info=True)
        return None


def load_program_output(results_dir: Path) -> Any:
    """
    Load the program output (task-agnostic).
    Tries several formats in order and returns the first one that loads.
    """
    # 1. Try extra.npz (NumPy arrays)
    npz_file = results_dir / "extra.npz"
    if npz_file.exists():
        try:
            with np.load(npz_file) as data:  # close the archive after copying the arrays out
                return {k: data[k] for k in data.files}
        except Exception:
            pass
    # 2. Try extra.pkl (pickle; only safe for files written by trusted evaluators)
    pkl_file = results_dir / "extra.pkl"
    if pkl_file.exists():
        try:
            with open(pkl_file, "rb") as f:
                return pickle.load(f)
        except Exception:
            pass
    # 3. Try extra.json
    json_file = results_dir / "extra.json"
    if json_file.exists():
        try:
            with open(json_file) as f:
                return json.load(f)
        except Exception:
            pass
    # 4. Fall back to metrics.json
    metrics_file = results_dir / "metrics.json"
    if metrics_file.exists():
        try:
            with open(metrics_file) as f:
                metrics = json.load(f)
            # Use the public + private metrics as the output
            return {
                "public": metrics.get("public", {}),
                "private": metrics.get("private", {}),
            }
        except Exception:
            pass
    return None
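The section shows the POST handler but not the GET /api/v1/evaluate/{job_id} endpoint that Step 1 polls. A framework-free sketch of the mapping from the eval_jobs store to the documented response, assuming the job fields set by evaluate_program and run_full_evaluation; build_status_response is hypothetical, and a FastAPI route could return its body directly (raising HTTPException(status_code=404) when it returns None).

```python
from typing import Any, Dict, Optional


def build_status_response(eval_jobs: Dict[str, Dict[str, Any]], job_id: str) -> Optional[Dict[str, Any]]:
    """Return the GET response body for job_id, or None if the job is unknown."""
    job = eval_jobs.get(job_id)
    if job is None:
        return None  # unknown id, or the in-memory store was lost on restart
    body: Dict[str, Any] = {"status": job["status"]}
    if job["status"] == "completed":
        body["evaluation_result"] = job["result"]
    elif job["status"] == "failed":
        body["error"] = job.get("error")
    return body
```

With the Step 1 client, a 404 here leaves the job in running_jobs indefinitely, so the runner may want its own per-job deadline.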
🎨 Generalizing the Agent Prompt
New Prompt Requirements
You are an evaluation expert for optimization problems.
<TASK_CONTEXT>
Task: {{ task_name }}
Objective: {{ objective }}
Primary Metric: {{ primary_metric_name }} ({{ primary_metric_interpretation }})
</TASK_CONTEXT>
<YOUR_ROLE>
Design AUXILIARY metrics that complement the primary metric.
These metrics should:
1. Measure aspects NOT captured by the primary metric
2. Work with the program output format
3. Be computationally efficient
</YOUR_ROLE>
<PROGRAM_OUTPUT_FORMAT>
The program output is available as a Python object with structure:
{{ program_output_structure }}
Example:
{{ program_output_example }}
</PROGRAM_OUTPUT_FORMAT>
<REQUIRED_INTERFACE>
You MUST create auxiliary_metrics.py with:
```python
def evaluate_auxiliary_metrics(program_output) -> dict:
    """
    Evaluate auxiliary metrics.

    Args:
        program_output: Program output (format depends on task)
            For circle packing: {"centers": ndarray, "radii": ndarray}
            For TSP: {"tour": list, "distance": float}
            For code: {"code": str, "ast": dict}

    Returns:
        dict: Auxiliary metrics
    """
    # Extract what you need from program_output
    # Task-specific logic here
    return {
        "metric_name_1": value1,
        "metric_name_2": value2,
    }


METRIC_DEFINITIONS = {
    "metric_name_1": {
        "name": "Human Readable Name",
        "description": "What this metric measures",
        "interpretation": "higher_better",  # or "lower_better" / "neutral"
        "unit": "...",
        "formula": "...",
    },
    # ... one entry per metric returned above
}
{% for example in auxiliary_examples %}
{{ example }}
{% endfor %}
```
</REQUIRED_INTERFACE>
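For concreteness, here is what an auxiliary_metrics.py following this interface might look like for circle packing. It is an illustrative sketch, not output from the actual agent: it assumes program_output is {"centers": ..., "radii": ...} with circles inside the unit square, and the two metrics (radius spread and area coverage) are examples chosen for simplicity. It uses only the standard library, and iterating over the radii also accepts NumPy arrays.

```python
import math
import statistics


def evaluate_auxiliary_metrics(program_output) -> dict:
    """Auxiliary metrics for circle packing in the unit square (illustrative)."""
    # Works for lists or NumPy arrays: iterate and convert to plain floats
    radii = [float(r) for r in program_output["radii"]]
    if not radii:
        return {"radius_std_dev": 0.0, "area_coverage": 0.0}
    return {
        # Population standard deviation of the radii (0 means all circles are equal)
        "radius_std_dev": statistics.pstdev(radii),
        # Fraction of the unit square's area covered, assuming circles do not overlap
        "area_coverage": sum(math.pi * r * r for r in radii),
    }


METRIC_DEFINITIONS = {
    "radius_std_dev": {
        "name": "Radius Standard Deviation",
        "description": "Spread of circle sizes in the packing",
        "interpretation": "neutral",
        "unit": "unit-square side lengths",
        "formula": "sqrt(mean((r_i - mean(r))^2))",
    },
    "area_coverage": {
        "name": "Area Coverage",
        "description": "Share of the unit square covered by circles",
        "interpretation": "higher_better",
        "unit": "fraction of area",
        "formula": "sum(pi * r_i^2)",
    },
}
```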
📋 Migration Path
Phase 1: Keep backward compatibility (1 week)
Goal: support both the old and the new mode
# Config option
class EvolutionConfig:
    eval_service_url: Optional[str] = None
    use_eval_service_for_evaluation: bool = False  # ← new option
- use_eval_service_for_evaluation = False: use the old path (scheduler → evaluate.py)
- use_eval_service_for_evaluation = True: use the new path (the Eval Service handles evaluation)
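The Step 1 runner branches on eval_service_url alone, whereas this option adds an explicit flag. A sketch of how the two settings might reduce to one dispatch decision, assuming a misconfiguration (flag on, no URL) should fail loudly rather than silently fall back; select_backend and its return strings are hypothetical.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class EvolutionConfig:
    eval_service_url: Optional[str] = None
    use_eval_service_for_evaluation: bool = False


def select_backend(config: EvolutionConfig) -> str:
    """Return "eval_service" or "local_scheduler" for the Phase 1 compatibility mode."""
    if config.use_eval_service_for_evaluation:
        if not config.eval_service_url:
            # Fail fast instead of silently evaluating with the old path
            raise ValueError("use_eval_service_for_evaluation=True requires eval_service_url")
        return "eval_service"
    # Old path by default, even when a URL is configured
    return "local_scheduler"
```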
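Phase 2: Implement the new Eval Service API (2 weeks)
- Implement the /api/v1/evaluate endpoint
- Implement the run_full_evaluation pipeline
- Implement a generic load_program_output
- Test multiple task output formats
Phase 3: Update the agent prompt (1 week)
- Generalize the prompt template
- Add task-specific examples
- Test the code the agent generates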
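One cheap check for agent-generated code, sketched under the assumption that the module follows the REQUIRED_INTERFACE in the prompt and that metric values are scalar numbers (as in the circle packing and TSP examples): load the file the same way run_dynamic_auxiliary does, call it on a sample output, and confirm every returned metric is finite and has a definition. check_aux_module is a hypothetical helper.

```python
import importlib.util
import math
from typing import Any, List


def check_aux_module(path: str, sample_output: Any) -> List[str]:
    """Return the problems found in an auxiliary_metrics.py file (empty list = OK)."""
    spec = importlib.util.spec_from_file_location("aux_under_test", path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)  # runs the file's top-level code
    fn = getattr(module, "evaluate_auxiliary_metrics", None)
    if fn is None:
        return ["missing evaluate_auxiliary_metrics"]
    result = fn(sample_output)
    if not isinstance(result, dict):
        return [f"expected dict, got {type(result).__name__}"]
    definitions = getattr(module, "METRIC_DEFINITIONS", {})
    problems = []
    for name, value in result.items():
        # Assumes scalar numeric metrics; bool is excluded even though it subclasses int
        if isinstance(value, bool) or not isinstance(value, (int, float)) or not math.isfinite(value):
            problems.append(f"{name}: value {value!r} is not a finite number")
        if name not in definitions:
            problems.append(f"{name}: missing METRIC_DEFINITIONS entry")
    return problems
```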
Phase 4: Full cutover (1 week)
- Enable the new mode by default
- Deprecate the old scheduler-based invocation
- Update the documentation
🎯 Success Criteria
Generality check
Test three different types of tasks:
Circle Packing (geometric optimization)
- program_output: {"centers": ndarray, "radii": ndarray}
- auxiliary metrics: spatial uniformity, radius distribution
TSP (combinatorial optimization)
- program_output: {"tour": list, "distance": float}
- auxiliary metrics: tour smoothness, cluster quality
Code Optimization (program synthesis)
- program_output: {"code": str, "ast": dict, "runtime": float}
- auxiliary metrics: code complexity, readability, efficiency
If all three tasks work under the same architecture, the check passes ✅
Responsibility-separation check
- ShinkaEvolve does not call evaluate.py directly
- The Eval Service owns evaluation end to end
- Primary and auxiliary evaluation finish in the same call
- The result format is uniform
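📄 Summary
Key Improvements
✅ Task-agnostic:
- Uses a generic program_output format
- The agent adapts to each task's output structure
✅ Clear responsibilities:
- ShinkaEvolve: evolution logic
- Eval Service: complete evaluation (primary + auxiliary)
✅ Single pass:
- One run produces every metric
- No repeated loading of program output
Next Steps
- Start now: implement the new /api/v1/evaluate API
- Stay compatible: support both modes via a config flag
- Migrate gradually: carry out Phases 1-4 in order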