
Eval Service Unified API Documentation

📋 Overview

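ev2_service_standalone.py now exposes a single unified interface and selects the working mode automatically from the request fields:

  1. Evaluation mode (new): provide code_path + evaluator_module → the Service runs the evaluation
  2. Notification mode (legacy): provide only primary_score → backward compatible with existing code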
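
The mode selection described above can be sketched as a small classifier over the request payload. This is only an illustration: `detect_mode` is a hypothetical helper, not a function taken from the service, and the precedence it applies when both sets of fields are present (evaluation wins) is an assumption.

```python
from typing import Any, Dict


def detect_mode(payload: Dict[str, Any]) -> str:
    """Classify a generation_complete payload (hypothetical helper)."""
    # Evaluation mode needs both the code to run and the evaluator to run it with.
    if payload.get("code_path") and payload.get("evaluator_module"):
        return "evaluation"
    # Notification mode only reports a score that was computed elsewhere.
    if "primary_score" in payload:
        return "notification"
    raise ValueError(
        "payload needs code_path + evaluator_module, or primary_score"
    )
```

A payload that carries neither set of fields is rejected here; how the real service responds in that case is not stated in this document.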

🚀 Starting the Service

python eval_agent/ev2_service_standalone.py \
    --results-dir /path/to/experiment \
    --primary-evaluator examples/circle_packing/evaluate.py \
    --trigger-mode periodic \
    --trigger-interval 10 \
    --port 8765

Or use a configuration file:

python eval_agent/ev2_service_standalone.py --config config.yaml

📡 API Endpoints

1. Generation Complete (unified entry point)

POST /api/v1/notify/generation_complete

Mode 1: Evaluation mode (NEW)

Request

{
    "generation": 10,
    "code_path": "/path/to/gen_10/main.py",
    "results_dir": "/path/to/gen_10/results",
    "evaluator_module": "examples.circle_packing.evaluate_ori",
    "evaluator_function": "main",
    "evaluator_kwargs": {}
}

Response (returned immediately, < 100 ms):

{
    "status": "accepted",
    "generation": 10,
    "job_id": "eval_10_1738512345",
    "estimated_time": 15.0,
    "agent_triggered": false,
    "trigger_reason": "Will be determined after evaluation",
    "processing_time_ms": 50.2
}
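
The "accept now, evaluate later" behaviour behind this response can be illustrated with a thread pool and an in-memory job table. This is a minimal sketch under stated assumptions: `JobRegistry` is a hypothetical class, the `eval_{generation}_{unix_timestamp}` job id format is inferred from the example value `eval_10_1738512345`, and the `pending` state is not modelled.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, Dict


class JobRegistry:
    """Hypothetical in-memory job store; not the service's real class."""

    def __init__(self, max_workers: int = 4) -> None:
        self._pool = ThreadPoolExecutor(max_workers=max_workers)
        self._futures: Dict[str, Future] = {}

    def submit(self, generation: int, fn: Callable[[], Any]) -> Dict[str, Any]:
        # Assumed job_id format, inferred from "eval_10_1738512345".
        job_id = f"eval_{generation}_{int(time.time())}"
        # The work starts in the background; the caller gets an ack right away.
        self._futures[job_id] = self._pool.submit(fn)
        return {"status": "accepted", "generation": generation, "job_id": job_id}

    def status(self, job_id: str) -> str:
        future = self._futures[job_id]
        if not future.done():
            return "running"
        return "failed" if future.exception() is not None else "completed"

    def wait(self, job_id: str, timeout: float = 10.0) -> Any:
        # Block until the job finishes; re-raises the job's exception, if any.
        return self._futures[job_id].result(timeout=timeout)
```

Because `submit` only enqueues the work, its latency is independent of how long the evaluator runs, which is consistent with the sub-100 ms response time quoted above.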

Background execution

  1. Run the primary evaluator
  2. Run the auxiliary evaluators (if any exist)
  3. Save metrics.json
  4. Decide whether to trigger the Agent
  5. If triggered: run the EV2 Agent analysis
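
Step 1 requires turning the `evaluator_module` and `evaluator_function` strings from the request into a callable. One plausible way to do that is with `importlib`, sketched below. `load_evaluator` is a hypothetical name, and the default of `"main"` simply mirrors the example request; the service's actual loading code is not shown in this document.

```python
import importlib
from typing import Any, Callable


def load_evaluator(module_name: str, function_name: str = "main") -> Callable[..., Any]:
    """Resolve 'package.module' + 'function' into a callable (hypothetical helper)."""
    # import_module raises ModuleNotFoundError if the dotted path is wrong.
    module = importlib.import_module(module_name)
    func = getattr(module, function_name, None)
    if not callable(func):
        raise AttributeError(f"{module_name}.{function_name} is not a callable")
    return func
```

For the request shown earlier, the call would be `load_evaluator("examples.circle_packing.evaluate_ori", "main")`, which only works if the project root is on `sys.path`.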

Mode 2: Notification mode (LEGACY - backward compatible)

Request

{
    "generation": 10,
    "results_dir": "/path/to/gen_10/results",
    "primary_score": 0.85
}

Response (synchronous):

{
    "status": "completed",
    "generation": 10,
    "job_id": null,
    "agent_triggered": false,
    "trigger_reason": "Not yet (last trigger at gen 0)",
    "processing_time_ms": 5.1
}
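
Because one endpoint returns two response shapes, a client has to decide whether polling is needed. The sketch below branches on the `status` and `job_id` fields exactly as they appear in the two example responses; `job_to_poll` is a hypothetical helper, and treating any other `status` value as an error is an assumption.

```python
from typing import Any, Dict, Optional


def job_to_poll(ack: Dict[str, Any]) -> Optional[str]:
    """Return the job_id to poll, or None if the request finished synchronously."""
    status = ack.get("status")
    if status == "accepted":
        # Evaluation mode: the work continues in the background.
        job_id = ack.get("job_id")
        if not job_id:
            raise ValueError("accepted response without job_id")
        return job_id
    if status == "completed":
        # Notification mode: job_id is null and nothing is left to wait for.
        return None
    raise ValueError(f"unexpected status: {status!r}")
```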

2. Query Generation Status

GET /api/v1/generation/{generation}/status

Response

{
    "generation": 10,
    "job_id": "eval_10_1738512345",
    "status": "running",  // "pending" | "running" | "completed" | "failed"
    "created_at": 1738512345.0,
    "elapsed_time": 5.2
}

If the job has completed:

{
    "generation": 10,
    "job_id": "eval_10_1738512345",
    "status": "completed",
    "created_at": 1738512345.0,
    "completed_at": 1738512360.0,
    "elapsed_time": 15.0,
    "result": {
        "combined_score": 0.85,
        "primary": {...},
        "auxiliary": {...},
        "timestamp": 1738512360.0
    }
}
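
A client typically only needs the score from these payloads. The helper below is a small sketch that maps the four documented states onto a return value or an exception; `extract_combined_score` is a hypothetical name, and the `error` field it reads on failure is the one used by the Python client in this document rather than a field shown in the example responses.

```python
from typing import Any, Dict, Optional


def extract_combined_score(status_payload: Dict[str, Any]) -> Optional[float]:
    """Return the score if the job is done, None if it is still in progress."""
    state = status_payload["status"]
    if state in ("pending", "running"):
        return None
    if state == "failed":
        raise RuntimeError(f"Evaluation failed: {status_payload.get('error')}")
    if state == "completed":
        return float(status_payload["result"]["combined_score"])
    raise ValueError(f"unknown status: {state!r}")
```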

3. Query Job Status

GET /api/v1/evaluate/{job_id}

Response: same as for the generation status query above.

4. Service Status

GET /api/v1/status

Response

{
    "status": "running",
    "uptime_seconds": 12345.6,
    "version": "2.0.0-standalone",
    "experiment": {
        "name": "circle-packing",
        "results_dir": "/path/to/experiment",
        "primary_evaluator": "examples/circle_packing/evaluate.py"
    },
    "statistics": {
        "total_notifications": 20,
        "total_agent_runs": 2,
        "generations_tracked": 20,
        "last_agent_trigger_gen": 10
    },
    "config": {
        "trigger_mode": "periodic",
        "trigger_interval": 10,
        "agent_enabled": true,
        "agent_initialized": true
    }
}
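
A status payload like this one can be reduced to the few fields a caller usually checks before submitting work. The sketch below is one possible reading: `summarize_status` is a hypothetical helper, and the definition of "ready" (service running, Agent enabled and initialized) is an assumption rather than something the service defines.

```python
from typing import Any, Dict


def summarize_status(payload: Dict[str, Any]) -> Dict[str, Any]:
    """Condense a /api/v1/status payload (hypothetical helper)."""
    stats = payload.get("statistics", {})
    config = payload.get("config", {})
    # Assumed readiness rule: service is up and the Agent can actually run.
    ready = (
        payload.get("status") == "running"
        and bool(config.get("agent_enabled"))
        and bool(config.get("agent_initialized"))
    )
    return {
        "ready": ready,
        "generations_tracked": stats.get("generations_tracked", 0),
        "agent_runs": stats.get("total_agent_runs", 0),
    }
```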

📊 Usage Examples

Python client

import requests
import time

SERVICE_URL = "http://localhost:8765"

# Evaluation mode: submit the job, then poll until it finishes
def submit_evaluation(generation, code_path):
    response = requests.post(
        f"{SERVICE_URL}/api/v1/notify/generation_complete",
        json={
            "generation": generation,
            "code_path": code_path,
            "results_dir": f"/path/to/gen_{generation}/results",
            "evaluator_module": "examples.circle_packing.evaluate_ori",
            "evaluator_function": "main"
        },
        timeout=5.0
    )
    response.raise_for_status()  # surface HTTP errors before parsing the body

    data = response.json()
    job_id = data['job_id']

    print(f"Submitted: job_id={job_id}")

    # Poll the generation status until the job completes or fails
    while True:
        status_response = requests.get(
            f"{SERVICE_URL}/api/v1/generation/{generation}/status",
            timeout=5.0
        )
        status_response.raise_for_status()

        status_data = status_response.json()

        if status_data['status'] == 'completed':
            result = status_data['result']
            return result['combined_score']
        elif status_data['status'] == 'failed':
            raise RuntimeError(f"Evaluation failed: {status_data.get('error')}")

        time.sleep(2)
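
The polling loop in the client runs until the job completes or fails, so a job that never leaves `running` would block the caller indefinitely. A bounded variant is sketched below. `poll_until_done` and its parameters are hypothetical; the status fetch is passed in as a callable so the loop can be exercised without a running service.

```python
import time
from typing import Any, Callable, Dict


def poll_until_done(
    fetch_status: Callable[[], Dict[str, Any]],
    timeout_s: float = 120.0,
    interval_s: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> Dict[str, Any]:
    """Poll until the job completes, fails, or the deadline passes."""
    deadline = clock() + timeout_s
    while True:
        data = fetch_status()
        if data["status"] == "completed":
            return data["result"]
        if data["status"] == "failed":
            raise RuntimeError(f"Evaluation failed: {data.get('error')}")
        if clock() >= deadline:
            raise TimeoutError(f"no result after {timeout_s} s")
        sleep(interval_s)
```

With `requests`, the fetch callable could be `lambda: requests.get(f"{SERVICE_URL}/api/v1/generation/{generation}/status", timeout=5.0).json()`.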

ShinkaEvolve Integration

Modify shinka/core/runner.py:

from typing import Optional

import requests

class EvolutionConfig:
    eval_service_url: Optional[str] = None
    use_eval_service: bool = False
    evaluator_module: Optional[str] = None

class EvolutionRunner:
    def _submit_new_job(self):
        # ... generate code ...

        if self.eval_service_url and self.evo_config.use_eval_service:
            # Delegate evaluation to the Eval Service
            job_id = self._submit_to_eval_service(
                generation=current_gen,
                code_path=str(exec_fname),
                results_dir=str(results_dir)
            )
        else:
            # Legacy path: evaluate through the local scheduler
            job_id = self.scheduler.submit_async(exec_fname, results_dir)

        running_job = RunningJob(
            job_id=job_id,
            use_eval_service=self.evo_config.use_eval_service,
            ...
        )

    def _submit_to_eval_service(self, generation, code_path, results_dir):
        response = requests.post(
            f"{self.eval_service_url}/api/v1/notify/generation_complete",
            json={
                "generation": generation,
                "code_path": code_path,
                "results_dir": results_dir,
                "evaluator_module": self.evo_config.evaluator_module
            },
            timeout=5.0
        )
        response.raise_for_status()  # fail loudly if the service rejects the job

        return response.json()['job_id']

    def _check_completed_jobs(self):
        completed = []  # jobs whose evaluation has finished
        for job in self.running_jobs:
            if job.use_eval_service:
                # Ask the Eval Service for this generation's status
                response = requests.get(
                    f"{self.eval_service_url}/api/v1/generation/{job.generation}/status",
                    timeout=5.0
                )

                if response.json()['status'] == 'completed':
                    completed.append(job)
        # ... process `completed` as in the existing runner ...

🧪 Testing

Run the test script:

# 1. Start the service
python eval_agent/ev2_service_standalone.py \
    --results-dir /tmp/test \
    --primary-evaluator examples/circle_packing/evaluate.py

# 2. Run the tests
python test_eval_service_unified.py

Test coverage:

  • ✅ Service health check
  • ✅ Notification mode (backward compatible)
  • ✅ Evaluation mode (asynchronous)
  • ✅ Status queries (by generation and by job_id)

🔧 Workflow

Full evaluation-mode flow

1. ShinkaEvolve generates code
   ↓
2. POST /api/v1/notify/generation_complete
   {
     generation: 10,
     code_path: "gen_10/main.py",
     evaluator_module: "examples.circle_packing.evaluate"
   }
   ↓
3. Returns immediately (< 100 ms)
   {
     status: "accepted",
     job_id: "eval_10_..."
   }
   ↓
4. The Eval Service runs in the background:
   - Run the primary evaluator → combined_score
   - Run the auxiliary evaluators → {diversity, ...}
   - Save metrics.json
   - Decide whether to trigger the Agent
   - If triggered: run the EV2 Agent analysis
   ↓
5. ShinkaEvolve polls:
   GET /api/v1/generation/10/status
   → status: "running"
   → status: "running"
   → status: "completed", result: {...}
   ↓
6. ShinkaEvolve reads combined_score and moves on to the next generation
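
The evaluation part of step 4 (primary evaluator, auxiliary evaluators, metrics.json) can be sketched as a single function. Several details here are assumptions: `run_evaluation` is a hypothetical name, the layout of metrics.json mirrors the `result` object of the status response, and isolating a failing auxiliary evaluator so that it does not abort the job is a design choice of this sketch, not documented behaviour. The Agent trigger decision is left out.

```python
import json
import time
from pathlib import Path
from typing import Any, Callable, Dict, Iterable


def run_evaluation(
    code_path: str,
    results_dir: str,
    primary: Callable[[str], Dict[str, Any]],
    auxiliaries: Iterable[Callable[[str, Dict[str, Any]], Dict[str, Any]]] = (),
) -> Dict[str, Any]:
    """Run primary + auxiliary evaluators and persist metrics.json."""
    primary_result = primary(code_path)

    auxiliary: Dict[str, Any] = {}
    for aux in auxiliaries:
        try:
            auxiliary.update(aux(code_path, primary_result))
        except Exception as exc:  # assumed policy: record the error, keep going
            auxiliary[f"{aux.__name__}_error"] = str(exc)

    metrics = {
        "combined_score": primary_result["combined_score"],
        "primary": primary_result,
        "auxiliary": auxiliary,
        "timestamp": time.time(),
    }

    out_dir = Path(results_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    (out_dir / "metrics.json").write_text(json.dumps(metrics, indent=2))
    return metrics
```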

⚙️ Configuration

Evaluator Contract

The evaluator for any task must satisfy the following contract:

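from typing import Any, Dict

def evaluate(code_path: str, **kwargs) -> Dict[str, Any]:
    """
    Evaluator function contract

    Args:
        code_path: path to the generated code
        **kwargs: additional arguments

    Returns:
        {
            "combined_score": float,  # required
            "metrics": Dict[str, Any],  # optional
            "metadata": Dict[str, Any]  # optional
        }
    """
    # Run the code (run_code is a task-specific placeholder)
    result = run_code(code_path)

    # Compute the score (compute_score is a task-specific placeholder)
    score = compute_score(result)

    return {
        "combined_score": score,
        "metrics": {"coverage": 0.8},
        "metadata": {"num_items": 100}
    }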
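
A contract like this is easy to check mechanically before a result is stored. The validator below is a sketch of such a check; `validate_result` is a hypothetical helper, and rejecting booleans as scores is a choice made here (in Python, `bool` is a subclass of `int`), not a rule stated by the service.

```python
from numbers import Real
from typing import Any, Dict


def validate_result(result: Any) -> Dict[str, Any]:
    """Check an evaluator's return value against the contract above."""
    if not isinstance(result, dict):
        raise TypeError("evaluator must return a dict")
    score = result.get("combined_score")
    # combined_score is the only required key and must be numeric.
    if isinstance(score, bool) or not isinstance(score, Real):
        raise ValueError("combined_score is required and must be a number")
    # metrics and metadata are optional, but must be dicts when present.
    for key in ("metrics", "metadata"):
        if key in result and not isinstance(result[key], dict):
            raise ValueError(f"{key} must be a dict when present")
    return result
```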

Auxiliary Metrics

Auxiliary metrics generated by the Agent are stored at:

experiment_root/
  └── eval_agent_memory/
      └── auxiliary_metrics.py  # generated by the Agent

Convention: every function whose name starts with evaluate_ is called automatically:

def evaluate_diversity(code_path: str, primary_result: Dict) -> Dict[str, Any]:
    """Diversity metric"""
    return {"diversity_score": 0.7}

def evaluate_robustness(code_path: str, primary_result: Dict) -> Dict[str, Any]:
    """Robustness metric"""
    return {"robustness_score": 0.8}
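
The naming convention implies some form of discovery over the module's functions. One way this could work is with `inspect.getmembers`, sketched below. `run_auxiliary_metrics` is a hypothetical name, and merging every returned dict into one flat dict is an assumption about how the results are combined.

```python
import inspect
from types import ModuleType
from typing import Any, Dict


def run_auxiliary_metrics(
    module: ModuleType, code_path: str, primary_result: Dict[str, Any]
) -> Dict[str, Any]:
    """Call every evaluate_* function in the module and merge the results."""
    merged: Dict[str, Any] = {}
    # getmembers returns (name, value) pairs sorted by name.
    for name, func in inspect.getmembers(module, inspect.isfunction):
        if name.startswith("evaluate_"):
            merged.update(func(code_path, primary_result))
    return merged
```

Since auxiliary_metrics.py lives under the experiment directory rather than in a package, the module object would most likely be loaded from its file path, for example with `importlib.util.spec_from_file_location`.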

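📝 Migration Guide

Migrating from the old mode to the new mode

Old code (ShinkaEvolve evaluates the code itself):

# 1. Generate code
# 2. Run the evaluation
combined_score = evaluate(code_path)
# 3. Notify the service
requests.post(url, json={
    "generation": gen,
    "primary_score": combined_score
})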

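New code (the Eval Service performs the evaluation):

import time

import requests

base_url = "http://localhost:8765/api/v1"  # API root of the service

# 1. Generate code
# 2. Submit to the service (do not wait)
response = requests.post(f"{base_url}/notify/generation_complete", json={
    "generation": gen,
    "code_path": code_path,
    "evaluator_module": "examples.task.evaluate"
})
job_id = response.json()['job_id']

# 3. Poll the status
while True:
    status = requests.get(f"{base_url}/generation/{gen}/status").json()
    if status['status'] == 'completed':
        combined_score = status['result']['combined_score']
        break
    if status['status'] == 'failed':
        raise RuntimeError(f"Evaluation failed: {status.get('error')}")
    time.sleep(2)  # avoid busy-waiting between polls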

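🎯 Advantages

  1. Unified interface: one endpoint handles both modes
  2. Automatic detection: the mode is chosen from the request fields
  3. Backward compatible: existing code needs no changes
  4. Clear responsibilities: evaluation is managed centrally by the Service
  5. Asynchronous and efficient: requests return immediately without blocking
  6. Concurrency: multiple evaluations can run at the same time

📊 Performance

  • Submitting a request: < 100 ms
  • Running an evaluation: 10-30 s (depending on the evaluator)
  • Status query: < 10 ms
  • Concurrency: evaluations for multiple generations can run at the same time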