
Eval Service Redesign (Task-Agnostic + Clear Responsibilities)

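🎯 Core Problem

Problems with the Current Design

  1. ❌ Too specific to circle packing: assumes a circle-packing data format (centers/radii)
  2. ❌ Unclear responsibilities: ShinkaEvolve runs the evaluation, while the Eval Service is only a "bystander"
  3. ❌ Duplicated work: running auxiliary metrics requires reloading the program output

Target Design

  1. ✅ Task-agnostic: works for any optimization problem (TSP, circuit design, code optimization, etc.)
  2. ✅ Clear responsibilities: the Eval Service owns all evaluation (primary + auxiliary)
  3. ✅ Single pass: one run completes every evaluation, with no repeated work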

🏗️ Redesigned Architecture

New Division of Responsibilities

┌─────────────────────────────────────────────────────────┐
│              ShinkaEvolve (Evolution Loop)              │
│  Responsibilities:                                      │
│  • Generate new code (mutation/crossover)               │
│  • Manage evolution (parent selection, population)      │
│  • Store results in the database                        │
│  • Choose the strategy for the next generation          │
│                                                         │
│  Not responsible for: ❌ running/evaluating programs    │
└─────────────────────────────────────────────────────────┘
                    ↓ submit code
┌─────────────────────────────────────────────────────────┐
│            Eval Service (Evaluation Service)            │
│  Responsibilities:                                      │
│  • Receive the program code to evaluate                 │
│  • Run the PRIMARY evaluator (ground truth)             │
│  • Run AUXILIARY evaluators (if available)              │
│  • Write the complete metrics.json                      │
│  • [Optional] Trigger agent to update aux evaluators    │
│                                                         │
│  Output: complete evaluation results                    │
└─────────────────────────────────────────────────────────┘
                    ↓ return results
┌─────────────────────────────────────────────────────────┐
│            ShinkaEvolve (Continue Evolution)            │
│  • Read metrics.json                                    │
│  • Extract combined_score → evolution decisions         │
│  • [Optional] Read auxiliary metrics → LLM prompt       │
└─────────────────────────────────────────────────────────┘

📊 Generic Interface Design

Core principle: the Evaluator Contract

Every task's evaluator must implement this standard interface:

# Standard Evaluator Interface (task-agnostic)
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Dict, Optional


def evaluate(
    program_path: str,
    results_dir: str,
    **kwargs
) -> EvaluationResult:
    """
    Standard evaluation interface

    Args:
        program_path: path of the program to evaluate
        results_dir: directory where results are saved
        **kwargs: extra task-specific arguments

    Returns:
        EvaluationResult: primary metrics, auxiliary metrics, and metadata
    """
    raise NotImplementedError  # each task supplies its own implementation


@dataclass
class EvaluationResult:
    """Standard evaluation result (task-agnostic)"""

    # PRIMARY (required; drives evolution)
    combined_score: float          # Ground truth score
    correct: bool                  # whether the program passed validation
    error: Optional[str]           # error message if evaluation failed

    # PUBLIC METRICS (optional; visible to the LLM)
    public_metrics: Dict[str, Any]

    # PRIVATE METRICS (optional; hidden from the LLM)
    private_metrics: Dict[str, Any]

    # AUXILIARY METRICS (optional; generated dynamically by the eval agent)
    auxiliary_metrics: Dict[str, Any]
    auxiliary_definitions: Dict[str, MetricDefinition]

    # PROGRAM OUTPUT (optional; for later analysis/visualization)
    program_output: Any            # task-specific output

    # METADATA
    execution_time: float
    num_runs: int
    timestamp: str


@dataclass
class MetricDefinition:
    """Metric definition (task-agnostic)"""
    name: str
    description: str
    interpretation: str  # "higher_better", "lower_better", "neutral"
    unit: str
    formula: Optional[str]
    source: str  # "primary", "auxiliary_static", "auxiliary_dynamic"
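For illustration, a minimal sketch of one task's `evaluate()` under this contract. The task is a hypothetical toy: the candidate program defines `run()` returning a list of numbers, and the score is their sum. The entry point `run()`, the scoring rule, and `ToyEvaluationResult` (a trimmed stand-in for `EvaluationResult`, so the block runs on its own) are all assumptions, not part of ShinkaEvolve.

```python
import importlib.util
import json
import time
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Any, Dict, Optional


@dataclass
class ToyEvaluationResult:
    """Trimmed stand-in for EvaluationResult (hypothetical, for this sketch)."""
    combined_score: float
    correct: bool
    error: Optional[str]
    public_metrics: Dict[str, Any] = field(default_factory=dict)
    private_metrics: Dict[str, Any] = field(default_factory=dict)
    program_output: Any = None
    execution_time: float = 0.0
    num_runs: int = 1
    timestamp: str = ""


def evaluate(program_path: str, results_dir: str, **kwargs) -> ToyEvaluationResult:
    """Toy evaluator: the program must define run() -> list of numbers."""
    start = time.time()
    stamp = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
    try:
        # Load the candidate program as a module from its file path
        spec = importlib.util.spec_from_file_location("candidate", program_path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        values = module.run()
        if not all(isinstance(v, (int, float)) for v in values):
            raise ValueError("run() must return numbers only")
        result = ToyEvaluationResult(
            combined_score=float(sum(values)),
            correct=True,
            error=None,
            public_metrics={"num_values": len(values)},
            program_output={"values": list(values)},
        )
    except Exception as e:  # any failure becomes an incorrect result, not a crash
        result = ToyEvaluationResult(combined_score=0.0, correct=False, error=str(e))
    result.execution_time = time.time() - start
    result.timestamp = stamp
    # Persist metrics.json so file-based consumers see the same data
    out_dir = Path(results_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    (out_dir / "metrics.json").write_text(json.dumps(asdict(result), indent=2))
    return result
```

Returning a failed result instead of raising keeps the caller's control flow uniform: every candidate, good or broken, produces one `EvaluationResult`.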

🔧 Eval Service Reimplementation

New API Design

1. Evaluation Request (sync/async)

# POST /api/v1/evaluate
{
  "program_path": "gen_42/main.py",          # relative to experiment_root
  "results_dir": "gen_42/results",           # relative to experiment_root
  "generation": 42,
  "experiment_root": "/path/to/results_...", # absolute path of the experiment root
  "evaluation_config": {
    "primary_evaluator": "examples/circle_packing/evaluate.py",
    "num_runs": 1,
    "timeout": 300,  # seconds
    "extra_args": {}
  },
  "auxiliary_config": {
    "enabled": true,
    "use_dynamic": true,  # use agent-generated metrics
    "use_static": false,  # use predefined metrics
    "timeout": 10         # seconds, for the whole auxiliary-metrics call
  }
}

# Response (if async)
{
  "status": "accepted",
  "job_id": "eval_job_42",
  "estimated_time": 15  # seconds
}

# Or (if sync)
{
  "status": "completed",
  "evaluation_result": {...}  # EvaluationResult
}

2. Query Evaluation Result

# GET /api/v1/evaluate/{job_id}
{
  "status": "completed",  # "pending", "running", "completed", "failed"
  "evaluation_result": {
    "combined_score": 2.34,
    "correct": true,
    "error": null,
    "public_metrics": {
      "num_circles": 26,
      "aux_radius_std_dev": 0.031,
      "aux_spatial_uniformity": 0.82
    },
    "private_metrics": {...},
    "auxiliary_metrics": {...},
    "auxiliary_definitions": {...},
    "program_output": {...},  # task-specific
    "execution_time": 12.5,
    "timestamp": "2026-02-03T04:00:00Z"
  }
}
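The Step 2 service code below only shows the POST handler, so here is a minimal, framework-agnostic sketch of the GET lookup. It assumes the in-memory `eval_jobs` table that Step 2 populates; the helper name `get_evaluation_status` and returning 404 for unknown ids (for example after a restart wipes the table) are assumptions. In FastAPI this would sit behind `@app.get("/api/v1/evaluate/{job_id}")`, with the 404 raised as `HTTPException(status_code=404)`.

```python
from typing import Any, Dict, Tuple


def get_evaluation_status(
    eval_jobs: Dict[str, Dict[str, Any]], job_id: str
) -> Tuple[int, Dict[str, Any]]:
    """Return (http_status, body) for GET /api/v1/evaluate/{job_id}."""
    job = eval_jobs.get(job_id)
    if job is None:
        # Unknown id: never submitted, or the in-memory table was lost
        return 404, {"detail": f"unknown job_id: {job_id}"}

    body: Dict[str, Any] = {"status": job["status"]}
    if job["status"] == "completed":
        body["evaluation_result"] = job["result"]
    elif job["status"] == "failed":
        # Failed jobs carry an error string instead of a result
        body["error"] = job.get("error")
    return 200, body
```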

🔄 End-to-End Flow (New Architecture)

Step 1: ShinkaEvolve submits an evaluation request

# shinka/core/runner.py
import logging
import os
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import requests

logger = logging.getLogger(__name__)


def _submit_new_job(self):
    current_gen = self.next_generation_to_submit

    exec_fname = f"{self.results_dir}/gen_{current_gen}/main.py"
    results_dir = f"{self.results_dir}/gen_{current_gen}/results"

    # Create the results directory
    Path(results_dir).mkdir(parents=True, exist_ok=True)

    # ✅ New path: submit to the Eval Service (if configured)
    if self.eval_service_url:
        job_id = self._submit_to_eval_service(
            exec_fname=exec_fname,
            results_dir=results_dir,
            generation=current_gen
        )
    else:
        # Backward-compatible path: use the local scheduler
        job_id = self.scheduler.submit_async(exec_fname, results_dir)

    # Track the job
    running_job = RunningJob(
        job_id=job_id,
        exec_fname=exec_fname,
        results_dir=results_dir,
        generation=current_gen,
        ...
    )
    self.running_jobs.append(running_job)


def _submit_to_eval_service(
    self,
    exec_fname: str,
    results_dir: str,
    generation: int
) -> str:
    """Submit an evaluation request to the Eval Service"""

    # The API expects paths relative to experiment_root; sending exec_fname
    # unchanged would make the service prepend the root a second time.
    experiment_root = Path(self.results_dir).resolve()
    payload = {
        "program_path": os.path.relpath(Path(exec_fname).resolve(), experiment_root),
        "results_dir": os.path.relpath(Path(results_dir).resolve(), experiment_root),
        "generation": generation,
        "experiment_root": str(experiment_root),
        "evaluation_config": {
            "primary_evaluator": self.job_config.eval_program_path,
            "num_runs": 1,
            "timeout": 300,
            "extra_args": self.job_config.extra_cmd_args or {}
        },
        "auxiliary_config": {
            "enabled": True,
            "use_dynamic": True,
            "use_static": False,
            "timeout": 10
        }
    }

    response = requests.post(
        f"{self.eval_service_url}/api/v1/evaluate",
        json=payload,
        timeout=5.0  # submission should return quickly
    )

    if response.status_code == 200:
        data = response.json()
        return data["job_id"]  # job_id assigned by the Eval Service
    raise RuntimeError(
        f"Eval service submission failed: {response.status_code} {response.text}"
    )


def _check_completed_jobs(self) -> List[RunningJob]:
    """Check for completed jobs"""
    completed = []
    still_running = []

    for job in self.running_jobs:
        if self.eval_service_url:
            # ✅ New path: poll the Eval Service
            is_complete, _result = self._query_eval_service(job.job_id)
            if is_complete:
                # The service has already written job.results_dir/metrics.json
                completed.append(job)
            else:
                still_running.append(job)
        else:
            # Backward-compatible path: use the local scheduler
            is_running = self.scheduler.check_job_status(job)
            if not is_running:
                completed.append(job)
            else:
                still_running.append(job)

    self.running_jobs = still_running
    return completed


def _query_eval_service(self, job_id: str) -> Tuple[bool, Optional[Dict]]:
    """Query an evaluation result from the Eval Service"""

    try:
        response = requests.get(
            f"{self.eval_service_url}/api/v1/evaluate/{job_id}",
            timeout=2.0
        )
    except requests.RequestException as e:
        logger.warning(f"Failed to query eval service: {e}")
        return False, None  # transient error: poll again later

    if response.status_code == 404:
        # The service no longer knows this job (e.g. it restarted and lost its
        # in-memory job table): treat it as done-and-failed instead of polling forever
        return True, None
    if response.status_code != 200:
        return False, None

    data = response.json()
    if data["status"] == "completed":
        return True, data["evaluation_result"]
    if data["status"] == "failed":
        return True, None  # Failed but done
    return False, None  # "pending" or "running"
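The runner polls `_query_eval_service` from its main loop. A standalone client (a test script, say) can wrap the same `(is_complete, result)` contract in a blocking wait with a deadline. This is a sketch: `wait_for_evaluation`, `poll_interval`, and `max_wait` are hypothetical names, and `query` stands in for any function with the return shape of `_query_eval_service`.

```python
import time
from typing import Callable, Dict, Optional, Tuple

QueryFn = Callable[[str], Tuple[bool, Optional[Dict]]]


def wait_for_evaluation(
    query: QueryFn,
    job_id: str,
    poll_interval: float = 1.0,
    max_wait: float = 600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[Dict]:
    """Poll until the job finishes; return its result (None if it failed).

    Raises TimeoutError if the job is still unfinished after max_wait seconds.
    """
    deadline = time.monotonic() + max_wait
    while True:
        is_complete, result = query(job_id)
        if is_complete:
            return result  # a dict on success, None on failure
        if time.monotonic() >= deadline:
            raise TimeoutError(f"{job_id} not finished after {max_wait}s")
        sleep(poll_interval)
```

Injecting `sleep` keeps the loop testable without real waiting; `time.monotonic()` is used so wall-clock adjustments cannot shorten or extend the deadline.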

Step 2: The Eval Service handles the evaluation request

# eval_agent/ev2_service_standalone.py
import asyncio
import importlib.util
import json
import logging
import time
import uuid
from pathlib import Path
from typing import Any, Dict, Optional

import numpy as np
from fastapi import BackgroundTasks, FastAPI
from pydantic import BaseModel, Field

logger = logging.getLogger(__name__)
app = FastAPI()

# In-memory job table (job_id -> status/request/result); it is lost on restart
eval_jobs: Dict[str, Dict[str, Any]] = {}


# Request body models mirroring the POST /api/v1/evaluate JSON shown above
class EvaluationConfig(BaseModel):
    primary_evaluator: str
    num_runs: int = 1
    timeout: int = 300  # seconds
    extra_args: Dict[str, Any] = Field(default_factory=dict)


class AuxiliaryConfig(BaseModel):
    enabled: bool = True
    use_dynamic: bool = True
    use_static: bool = False
    timeout: int = 10  # seconds


class EvaluationRequest(BaseModel):
    program_path: str      # relative to experiment_root
    results_dir: str       # relative to experiment_root
    generation: int
    experiment_root: str
    evaluation_config: EvaluationConfig
    auxiliary_config: AuxiliaryConfig = Field(default_factory=AuxiliaryConfig)


@app.post("/api/v1/evaluate")
async def evaluate_program(
    request: EvaluationRequest,
    background_tasks: BackgroundTasks
):
    """
    Main evaluation endpoint

    Handles both primary and auxiliary evaluation in one place.
    """
    # Generate a unique job_id (a timestamp alone collides when two requests
    # for the same generation arrive within the same second)
    job_id = f"eval_{request.generation}_{uuid.uuid4().hex[:8]}"

    # Register the job for tracking
    eval_jobs[job_id] = {
        "status": "pending",
        "request": request,
        "started_at": time.time(),
        "result": None
    }

    # Start the evaluation as a background task
    background_tasks.add_task(
        run_full_evaluation,
        job_id=job_id,
        request=request
    )

    # Return immediately (async)
    return {
        "status": "accepted",
        "job_id": job_id,
        "estimated_time": 15
    }


def _json_default(obj: Any) -> Any:
    """json.dump fallback: auxiliary metrics often come back as NumPy scalars/arrays."""
    if isinstance(obj, np.generic):
        return obj.item()
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    return str(obj)


async def run_full_evaluation(job_id: str, request: EvaluationRequest):
    """
    Complete evaluation pipeline (primary + auxiliary)

    This runs in the background.
    """
    logger.info("=" * 60)
    logger.info(f"🔄 Starting Evaluation: {job_id}")
    logger.info("=" * 60)

    results_path = Path(request.experiment_root) / request.results_dir

    try:
        eval_jobs[job_id]["status"] = "running"

        # ===== Step 1: Run Primary Evaluator =====
        logger.info("📊 Step 1: Running primary evaluator...")

        primary_result = await run_primary_evaluator(request)

        if not primary_result["success"]:
            # Record the failure on disk too, so this generation has a correct.json
            with open(results_path / "correct.json", "w") as f:
                json.dump({"correct": False, "error": primary_result["error"]}, f, indent=2)
            eval_jobs[job_id]["status"] = "failed"
            eval_jobs[job_id]["error"] = primary_result["error"]
            return

        logger.info(f"✅ Primary evaluation completed: score={primary_result['combined_score']:.4f}")

        # ===== Step 2: Run Auxiliary Evaluators (if enabled) =====
        auxiliary_results = {}
        auxiliary_definitions = {}

        if request.auxiliary_config.enabled:
            logger.info("🔧 Step 2: Running auxiliary evaluators...")

            # 2a. Dynamic metrics (agent-generated)
            if request.auxiliary_config.use_dynamic:
                dynamic_results = await run_dynamic_auxiliary(
                    experiment_root=request.experiment_root,
                    program_output=primary_result.get("program_output"),
                    timeout=request.auxiliary_config.timeout
                )
                if dynamic_results:
                    auxiliary_results.update(dynamic_results["metrics"])
                    auxiliary_definitions.update(dynamic_results["definitions"])
                    logger.info(f"✅ Dynamic auxiliary: {len(dynamic_results['metrics'])} metrics")

            # 2b. Static metrics (predefined; run_static_auxiliary is not specified in this design yet)
            if request.auxiliary_config.use_static:
                static_results = await run_static_auxiliary(
                    experiment_root=request.experiment_root,
                    program_output=primary_result.get("program_output")
                )
                if static_results:
                    auxiliary_results.update(static_results["metrics"])
                    auxiliary_definitions.update(static_results["definitions"])
                    logger.info(f"✅ Static auxiliary: {len(static_results['metrics'])} metrics")

        # ===== Step 3: Merge and Save Results =====
        logger.info("💾 Step 3: Saving complete evaluation results...")

        complete_result = {
            "combined_score": primary_result["combined_score"],
            "correct": primary_result["correct"],
            "error": primary_result.get("error"),
            "public_metrics": {
                **primary_result.get("public_metrics", {}),
                **{f"aux_{k}": v for k, v in auxiliary_results.items()}
            },
            "private_metrics": primary_result.get("private_metrics", {}),
            "auxiliary_metric_definitions": auxiliary_definitions,
            "auxiliary_metadata": {
                "executed": len(auxiliary_results) > 0,
                "num_metrics_computed": len(auxiliary_results),
                "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
            },
            "execution_time_mean": primary_result.get("execution_time", 0),
            "num_valid_runs": primary_result.get("num_valid_runs", 0),
            "all_validation_errors": primary_result.get("all_validation_errors", [])
        }

        # Save to metrics.json (replaces the evaluator's file with the merged result)
        metrics_file = results_path / "metrics.json"
        with open(metrics_file, 'w') as f:
            json.dump(complete_result, f, indent=2, default=_json_default)

        # Save correct.json
        correct_file = results_path / "correct.json"
        with open(correct_file, 'w') as f:
            json.dump({
                "correct": primary_result["correct"],
                "error": primary_result.get("error")
            }, f, indent=2)

        logger.info(f"✅ Results saved to: {metrics_file}")

        # ===== Step 4: Update Job Status =====
        eval_jobs[job_id]["status"] = "completed"
        eval_jobs[job_id]["result"] = complete_result

        logger.info("=" * 60)
        logger.info(f"✅ Evaluation Completed: {job_id}")
        logger.info("=" * 60)

    except Exception as e:
        logger.error(f"❌ Evaluation failed: {e}", exc_info=True)
        eval_jobs[job_id]["status"] = "failed"
        eval_jobs[job_id]["error"] = str(e)
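Step 3 of `run_full_evaluation` merges auxiliary metrics into `public_metrics` under an `aux_` prefix. If the primary evaluator already emits a key such as `aux_radius_std_dev`, the dict unpacking silently lets the auxiliary value win. A sketch of the same merge that surfaces such collisions instead; the name `merge_public_metrics` and the choice to keep the primary value are assumptions.

```python
import logging
from typing import Any, Dict, List, Tuple

logger = logging.getLogger(__name__)


def merge_public_metrics(
    primary_public: Dict[str, Any],
    auxiliary: Dict[str, Any],
    prefix: str = "aux_",
) -> Tuple[Dict[str, Any], List[str]]:
    """Merge auxiliary metrics into the public metrics under `prefix`.

    Primary values always win; colliding auxiliary keys are returned so the
    caller can log them or report them back to the eval agent.
    """
    merged = dict(primary_public)
    collisions: List[str] = []
    for name, value in auxiliary.items():
        key = f"{prefix}{name}"
        if key in merged:
            collisions.append(key)
            continue  # an agent-written metric never overwrites primary output
        merged[key] = value
    if collisions:
        logger.warning("auxiliary metrics collided with primary keys: %s", collisions)
    return merged, collisions
```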


async def run_primary_evaluator(request: EvaluationRequest) -> Dict[str, Any]:
    """
    Run the primary evaluator (task-specific)

    This calls the existing evaluate.py or equivalent.
    """
    # Build the command
    program_path = Path(request.experiment_root) / request.program_path
    results_dir = Path(request.experiment_root) / request.results_dir
    evaluator_path = request.evaluation_config.primary_evaluator

    cmd = [
        "python",
        evaluator_path,
        "--program_path", str(program_path),
        "--results_dir", str(results_dir)
    ]

    # Append extra arguments: True booleans become bare flags, False ones are dropped
    for key, value in request.evaluation_config.extra_args.items():
        if isinstance(value, bool):
            if value:
                cmd.append(f"--{key}")
        else:
            cmd.extend([f"--{key}", str(value)])

    # Run the evaluator
    process = None
    try:
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE
        )

        stdout, stderr = await asyncio.wait_for(
            process.communicate(),
            timeout=request.evaluation_config.timeout
        )

        if process.returncode != 0:
            return {
                "success": False,
                "error": f"Evaluator failed: {stderr.decode(errors='replace')}"
            }

        # Read the evaluator's results
        metrics_file = results_dir / "metrics.json"
        if not metrics_file.exists():
            return {
                "success": False,
                "error": "metrics.json not found"
            }

        with open(metrics_file) as f:
            metrics = json.load(f)

        # A zero exit code does not mean validation passed: take correctness
        # from correct.json when the evaluator wrote one
        correct, error = True, None
        correct_file = results_dir / "correct.json"
        if correct_file.exists():
            with open(correct_file) as f:
                correct_info = json.load(f)
            correct = bool(correct_info.get("correct", False))
            error = correct_info.get("error")

        # Try to load the program output (used by auxiliary evaluation)
        program_output = load_program_output(results_dir)

        return {
            "success": True,
            "combined_score": metrics["combined_score"],
            "correct": correct,
            "error": error,
            "public_metrics": metrics.get("public", {}),
            "private_metrics": metrics.get("private", {}),
            "execution_time": metrics.get("execution_time_mean", 0),
            "num_valid_runs": metrics.get("num_valid_runs", 0),
            "all_validation_errors": metrics.get("all_validation_errors", []),
            "program_output": program_output
        }

    except asyncio.TimeoutError:
        # wait_for cancels communicate() but leaves the child running: kill it
        if process is not None and process.returncode is None:
            process.kill()
            await process.wait()
        return {
            "success": False,
            "error": f"Evaluation timeout after {request.evaluation_config.timeout}s"
        }
    except Exception as e:
        return {
            "success": False,
            "error": f"Evaluation error: {str(e)}"
        }
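`asyncio.wait_for` cancels `communicate()` on timeout but does not terminate the child process, so a timed-out evaluator keeps consuming CPU unless it is killed explicitly. The pattern in isolation, as a self-contained sketch; `run_with_timeout` and its return dict are illustrative, not part of the service.

```python
import asyncio
from typing import Any, Dict, List


async def run_with_timeout(cmd: List[str], timeout: float) -> Dict[str, Any]:
    """Run cmd; on timeout, kill the child so it cannot outlive the request."""
    process = await asyncio.create_subprocess_exec(
        *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
    )
    try:
        stdout, stderr = await asyncio.wait_for(process.communicate(), timeout)
    except asyncio.TimeoutError:
        process.kill()        # SIGKILL on POSIX, TerminateProcess on Windows
        await process.wait()  # reap the child so it does not linger
        return {"success": False, "error": f"timeout after {timeout}s"}
    if process.returncode != 0:
        return {"success": False, "error": stderr.decode(errors="replace")}
    return {"success": True, "stdout": stdout.decode()}
```

For example, `asyncio.run(run_with_timeout([sys.executable, "-c", "import time; time.sleep(30)"], 0.5))` returns a timeout error after about half a second, and the sleeping child is gone.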


async def run_dynamic_auxiliary(
    experiment_root: str,
    program_output: Any,
    timeout: int
) -> Optional[Dict[str, Any]]:
    """
    Run dynamically generated auxiliary metrics (task-agnostic)

    Key: this works with ANY task because it only passes the generic program_output through
    """
    metrics_file = Path(experiment_root) / "eval_agent_memory" / "auxiliary_metrics.py"

    if not metrics_file.exists():
        return None

    try:
        # Import the agent-generated module dynamically
        spec = importlib.util.spec_from_file_location("dynamic_aux", metrics_file)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)

        # Look for the standard interface
        if not hasattr(module, 'evaluate_auxiliary_metrics'):
            logger.warning("No 'evaluate_auxiliary_metrics' function found")
            return None

        eval_fn = module.evaluate_auxiliary_metrics

        # ✅ Key point: the call is task-agnostic.
        # program_output can be any format (dict, tuple, object, etc.);
        # auxiliary_metrics.py itself knows how to interpret it.
        # Note: on timeout, wait_for stops waiting but cannot kill the worker
        # thread, so a runaway metric keeps running in the background.
        aux_results = await asyncio.wait_for(
            asyncio.to_thread(eval_fn, program_output),
            timeout=timeout
        )

        if not isinstance(aux_results, dict):
            logger.warning(
                f"evaluate_auxiliary_metrics returned {type(aux_results).__name__}, expected dict"
            )
            return None

        # Extract the metric definitions (optional in the module)
        definitions = getattr(module, 'METRIC_DEFINITIONS', {})

        return {
            "metrics": aux_results,
            "definitions": definitions
        }

    except asyncio.TimeoutError:
        logger.warning(f"Auxiliary metrics timeout after {timeout}s")
        return None
    except Exception as e:
        logger.error(f"Auxiliary metrics failed: {e}", exc_info=True)
        return None
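Because `auxiliary_metrics.py` is agent-written, its return value deserves the same scrutiny as any untrusted input before it reaches `metrics.json` and the LLM prompt. A sketch of a sanitizer, assuming metrics should be finite real scalars and that dropping bad entries is preferable to failing the whole evaluation; `sanitize_aux_metrics` is a hypothetical helper.

```python
import math
from numbers import Real
from typing import Any, Dict, List, Tuple


def sanitize_aux_metrics(raw: Any) -> Tuple[Dict[str, float], List[str]]:
    """Keep only str-keyed, finite, real-valued metrics; report what was dropped."""
    if not isinstance(raw, dict):
        return {}, [f"<expected dict, got {type(raw).__name__}>"]
    clean: Dict[str, float] = {}
    dropped: List[str] = []
    for name, value in raw.items():
        # numbers.Real also matches bool and NumPy floating/integer scalars
        if not isinstance(name, str) or not isinstance(value, Real):
            dropped.append(str(name))
            continue
        as_float = float(value)
        if math.isfinite(as_float):
            clean[name] = as_float
        else:
            dropped.append(name)  # NaN/inf would be written as non-standard JSON
    return clean, dropped
```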


def load_program_output(results_dir: Path) -> Any:
    """
    Load program output (task-agnostic)

    Tries several formats in a fixed order and returns the first one that loads.
    """
    # 1. Try extra.npz (numpy arrays)
    npz_file = results_dir / "extra.npz"
    if npz_file.exists():
        try:
            # Read every array inside the context manager so the file gets closed
            with np.load(npz_file) as data:
                return {k: data[k] for k in data.files}
        except Exception:
            pass

    # 2. Try extra.pkl (pickle; only safe for files written by a trusted evaluator)
    pkl_file = results_dir / "extra.pkl"
    if pkl_file.exists():
        try:
            import pickle
            with open(pkl_file, 'rb') as f:
                return pickle.load(f)
        except Exception:
            pass

    # 3. Try extra.json
    json_file = results_dir / "extra.json"
    if json_file.exists():
        try:
            with open(json_file) as f:
                return json.load(f)
        except Exception:
            pass

    # 4. Fall back to metrics.json
    metrics_file = results_dir / "metrics.json"
    if metrics_file.exists():
        try:
            with open(metrics_file) as f:
                metrics = json.load(f)
            # Use the public + private metrics as the output
            return {
                "public": metrics.get("public", {}),
                "private": metrics.get("private", {})
            }
        except Exception:
            pass

    return None
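`load_program_output` only finds output that the primary evaluator left behind in `results_dir`. On the producer side, an evaluator could save a JSON-friendly copy of its output as `extra.json`. This sketch assumes the output is a dict of lists and scalars; NumPy-based tasks would call `np.savez(results_dir / "extra.npz", **arrays)` instead. `save_program_output` is a hypothetical helper name.

```python
import json
from pathlib import Path
from typing import Any, Dict


def save_program_output(results_dir: Path, output: Dict[str, Any]) -> Path:
    """Write output to results_dir/extra.json, where load_program_output looks for it.

    Tuples become JSON lists, so readers get lists back.
    """
    results_dir.mkdir(parents=True, exist_ok=True)
    path = results_dir / "extra.json"
    # allow_nan=False fails fast instead of writing non-standard NaN/Infinity
    path.write_text(json.dumps(output, allow_nan=False, indent=2))
    return path
```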

🎨 Generalizing the Agent Prompt

New Prompt Requirements

You are an evaluation expert for optimization problems.

<TASK_CONTEXT>
Task: {{ task_name }}
Objective: {{ objective }}
Primary Metric: {{ primary_metric_name }} ({{ primary_metric_interpretation }})
</TASK_CONTEXT>

<YOUR_ROLE>
Design AUXILIARY metrics that complement the primary metric.
These metrics should:
1. Measure aspects NOT captured by the primary metric
2. Work with the program output format
3. Be computationally efficient
</YOUR_ROLE>

<PROGRAM_OUTPUT_FORMAT>
The program output is available as a Python object with structure:
{{ program_output_structure }}

Example:
{{ program_output_example }}
</PROGRAM_OUTPUT_FORMAT>

<REQUIRED_INTERFACE>
You MUST create auxiliary_metrics.py with:

```python
def evaluate_auxiliary_metrics(program_output) -> dict:
    """
    Evaluate auxiliary metrics.
    
    Args:
        program_output: Program output (format depends on task)
            For circle packing: {"centers": ndarray, "radii": ndarray}
            For TSP: {"tour": list, "distance": float}
            For code: {"code": str, "ast": dict}
    
    Returns:
        dict: Auxiliary metrics
    """
    # Extract what you need from program_output
    # Task-specific logic here
    
    return {
        "metric_name_1": value1,
        "metric_name_2": value2,
    }


METRIC_DEFINITIONS = {
    "metric_name_1": {
        "name": "Human Readable Name",
        "description": "What this metric measures",
        "interpretation": "higher_better",  # or "lower_better" / "neutral"
        "unit": "...",
        "formula": "..."
    },
    # ... one entry per metric returned by evaluate_auxiliary_metrics
}
```
</REQUIRED_INTERFACE>

{% for example in auxiliary_examples %}
{{ example }}
{% endfor %}
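As a concrete instance of the interface the prompt asks for, a minimal `auxiliary_metrics.py` for circle packing, assuming `program_output` is `{"centers": ..., "radii": ...}` as listed in the docstring. Plain Python sequences keep the sketch dependency-free (NumPy arrays iterate row by row, so they work too). The two metrics, `radius_std_dev` and `min_gap`, are hypothetical examples, not metrics the project defines.

```python
import math
from itertools import combinations
from typing import Any, Dict


def evaluate_auxiliary_metrics(program_output: Any) -> Dict[str, float]:
    """Auxiliary metrics for circle packing (illustrative)."""
    centers = [tuple(map(float, c)) for c in program_output["centers"]]
    radii = [float(r) for r in program_output["radii"]]
    n = len(radii)
    if n == 0:
        return {}  # nothing to measure
    mean_r = sum(radii) / n
    # Population standard deviation of the radii: how uneven the circle sizes are
    std_r = math.sqrt(sum((r - mean_r) ** 2 for r in radii) / n)
    # Smallest edge-to-edge distance between any two circles (< 0 means overlap)
    min_gap = min(
        (math.dist(centers[i], centers[j]) - radii[i] - radii[j]
         for i, j in combinations(range(n), 2)),
        default=0.0,
    )
    return {"radius_std_dev": std_r, "min_gap": min_gap}


METRIC_DEFINITIONS = {
    "radius_std_dev": {
        "name": "Radius Standard Deviation",
        "description": "Spread of circle radii (population std dev)",
        "interpretation": "neutral",
        "unit": "same as radii",
        "formula": "sqrt(mean((r_i - mean(r))^2))",
    },
    "min_gap": {
        "name": "Minimum Gap",
        "description": "Smallest edge-to-edge distance between two circles",
        "interpretation": "neutral",
        "unit": "same as centers",
        "formula": "min_{i<j} (||c_i - c_j|| - r_i - r_j)",
    },
}
```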

📋 Migration Path

Phase 1: Keep backward compatibility (1 week)

Goal: support both the old and the new mode

# Config option
from dataclasses import dataclass
from typing import Optional


@dataclass
class EvolutionConfig:
    eval_service_url: Optional[str] = None
    use_eval_service_for_evaluation: bool = False  # ← new option

  • use_eval_service_for_evaluation = False: use the old path (scheduler → evaluate.py)
  • use_eval_service_for_evaluation = True: use the new path (the Eval Service owns evaluation)
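`_submit_new_job` in Step 1 switches on `self.eval_service_url` alone, while Phase 1 introduces a separate flag. One way to reconcile them, sketched under the assumption that the flag is authoritative: resolve the mode once at startup and fail fast on a contradictory config. `resolve_eval_mode` and the mode strings are hypothetical.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class EvolutionConfig:
    eval_service_url: Optional[str] = None
    use_eval_service_for_evaluation: bool = False


def resolve_eval_mode(config: EvolutionConfig) -> str:
    """Return "eval_service" or "local_scheduler"; reject contradictory configs."""
    if config.use_eval_service_for_evaluation:
        if not config.eval_service_url:
            raise ValueError(
                "use_eval_service_for_evaluation=True requires eval_service_url"
            )
        return "eval_service"
    # A URL without the flag keeps the old evaluate.py path (Phase 1 default)
    return "local_scheduler"
```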

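Phase 2: Implement the new Eval Service API (2 weeks)

  • Implement the /api/v1/evaluate endpoints (POST to submit, GET /api/v1/evaluate/{job_id} to poll)
  • Implement the run_full_evaluation pipeline
  • Implement a generic load_program_output
  • Test with several task output formats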

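Phase 3: Update the agent prompt (1 week)

  • Generalize the prompt template
  • Add task-specific examples
  • Test the code the agent generates

Phase 4: Full cutover (1 week)

  • Enable the new mode by default
  • Deprecate the old scheduler-based evaluation path
  • Update the documentation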

🎯 Success Criteria

Generality check

Test three different kinds of tasks:

  1. Circle Packing (geometric optimization)

    • program_output: {"centers": ndarray, "radii": ndarray}
    • auxiliary metrics: spatial uniformity, radius distribution
  2. TSP (combinatorial optimization)

    • program_output: {"tour": list, "distance": float}
    • auxiliary metrics: tour smoothness, cluster quality
  3. Code Optimization (program synthesis)

    • program_output: {"code": str, "ast": dict, "runtime": float}
    • auxiliary metrics: code complexity, readability, efficiency

The check passes ✅ if all three tasks run on the same architecture, with no task-specific changes to the Eval Service itself.

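Responsibility check

  • ShinkaEvolve never calls evaluate.py directly
  • The Eval Service owns evaluation end to end
  • Primary and auxiliary evaluation finish in the same call
  • Results share a single, unified format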

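📄 Summary

Key Improvements

  1. ✅ Task-agnostic:

    • Uses a generic program_output format
    • The agent adapts to each task's output structure
  2. ✅ Clear responsibilities:

    • ShinkaEvolve: evolution logic
    • Eval Service: complete evaluation (primary + auxiliary)
  3. ✅ Single pass:

    • One run produces all metrics
    • Program output is loaded once instead of repeatedly

Next Steps

  1. Start now: implement the new /api/v1/evaluate API
  2. Stay compatible: support both modes through a config flag
  3. Migrate gradually: carry out Phases 1-4 in order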