
Eval Service Dynamic Metrics Integration Plan

🎯 Goals

Have the Eval Service run dynamically generated auxiliary metrics and record the results in each generation's metrics.json, while:

  • ✅ Leaving the primary metric (combined_score) untouched
  • ✅ Including each metric's definition (name, description, interpretation)
  • ✅ Executing asynchronously, without blocking the evolution loop
  • ✅ Supporting metric evolution (the agent can update the metrics)

📊 Current Architecture Analysis

Existing Flow

1. ShinkaEvolve runs gen_X/main.py
   └─> Output: centers, radii
   
2. evaluate.py evaluates
   ├─> Validates constraints
   ├─> Computes primary_score = sum(radii)
   └─> Saves gen_X/results/metrics.json
       {
         "combined_score": 2.34,
         "public": {"num_circles": 26},
         "private": {"reported_sum_of_radii": 2.34}
       }

3. ShinkaEvolve reads metrics.json
   └─> Extracts combined_score → evolution decision

4. ShinkaEvolve notifies the Eval Service (async)
   └─> generation, score, results_dir

5. Eval Service (may trigger the agent)
   └─> Generates/updates auxiliary_metrics.py
   └─> [currently] stops here; does nothing else

Key File Locations

<experiment_root>/
├── gen_X/
│   ├── main.py              # the evolving program
│   └── results/
│       ├── metrics.json     # ← to be enhanced
│       ├── correct.json
│       └── extra.npz        # contains centers, radii
├── eval_agent_memory/       # ← location after the fix
│   ├── auxiliary_metrics.py # ← agent-generated code
│   ├── EVAL_AGENTS.md       # ← the agent's analysis
│   └── service_state.json
└── evolution_db_*.sqlite

🏗️ Target Architecture

Enhanced Flow

1-3. [unchanged]
   ShinkaEvolve runs → evaluate.py → saves the initial metrics.json

4. ShinkaEvolve notifies the Eval Service
   POST /api/v1/notify/generation_complete
   {
     "generation": X,
     "primary_score": 2.34,
     "results_dir": "<experiment_root>"
   }

5. Eval Service receives the notification (fast synchronous response)
   ├─> Returns HTTP 200 (does not block evolution)
   └─> Starts a background task:
       ├─> Load eval_agent_memory/auxiliary_metrics.py
       ├─> Read the program's output data from gen_X/results/
       ├─> Run the auxiliary metrics
       ├─> Augment gen_X/results/metrics.json (append the auxiliary sections)
       └─> [optional] Trigger the agent to update the metrics

6. ShinkaEvolve continues evolving
   └─> Does not wait for auxiliary metrics to complete

Enhanced metrics.json Format

{
  "combined_score": 2.34,           // ← PRIMARY (unchanged)
  "public": {
    "num_circles": 26,
    "centers_str": "...",
    
    // ↓ New: auxiliary metric data
    "aux_metrics_version": "gen_9_v1",
    "aux_radius_std_dev": 0.030866,
    "aux_avg_nn_distance": 0.145581,
    "aux_std_nn_distance": 0.054509
  },
  "private": {
    "reported_sum_of_radii": 2.34
  },
  
  // ↓ New: metric definitions (recorded on first appearance)
  "auxiliary_metric_definitions": {
    "aux_radius_std_dev": {
      "name": "Radius Standard Deviation",
      "description": "Measures the standard deviation of circle radii",
      "interpretation": "lower_better",
      "unit": "unitless",
      "source": "eval_agent_gen_9",
      "added_at_generation": 9
    },
    "aux_avg_nn_distance": {
      "name": "Average Nearest Neighbor Distance",
      "description": "Average distance from each circle center to its closest neighbor",
      "interpretation": "neutral",
      "unit": "distance",
      "source": "eval_agent_gen_9",
      "added_at_generation": 9
    }
  },
  
  // ↓ New: auxiliary execution metadata
  "auxiliary_metadata": {
    "executed": true,
    "execution_time": 0.152,        // seconds
    "metrics_file": "eval_agent_memory/auxiliary_metrics.py",
    "metrics_version": "gen_9_v1",
    "num_metrics_computed": 3,
    "timestamp": "2026-02-03T02:15:30Z"
  }
}

🔧 Implementation Plan

Phase 1: Basic Integration (MVP)

1.1 Modify the Eval Service's Notification Handler

File: eval_agent/ev2_service_standalone.py

Current code (~line 685):

@app.post("/api/v1/notify/generation_complete")
async def notify_generation_complete(request, background_tasks):
    # Record the generation
    service_state.add_generation(...)
    
    # Decide whether to trigger the agent
    should_trigger, reason = service_state.should_trigger_agent(...)
    
    if should_trigger:
        # Start the agent analysis
        background_tasks.add_task(run_agent_analysis, ...)
    
    return {"status": "success"}

Change it to:

@app.post("/api/v1/notify/generation_complete")
async def notify_generation_complete(request, background_tasks):
    # Record the generation
    service_state.add_generation(...)
    
    # ✅ New: always try to run auxiliary metrics (async)
    background_tasks.add_task(
        run_auxiliary_metrics,
        results_dir=request.results_dir,
        generation=request.generation
    )
    
    # Decide whether to trigger the agent (an independent decision)
    should_trigger, reason = service_state.should_trigger_agent(...)
    
    if should_trigger:
        background_tasks.add_task(run_agent_analysis, ...)
    
    return {"status": "success"}  # return immediately; do not block

1.2 Implement the Auxiliary Metrics Runner

New file: eval_agent/auxiliary_runner.py

"""
Auxiliary Metrics Runner
========================
Loads and executes dynamically generated auxiliary metrics,
then updates the metrics.json file.
"""

import json
import time
import numpy as np
from pathlib import Path
from typing import Dict, Any, Optional, Callable
import importlib.util
import logging

logger = logging.getLogger(__name__)


class AuxiliaryMetricsRunner:
    """
    Loads and runs auxiliary metrics generated by the eval agent.
    """
    
    def __init__(self, experiment_root: Path):
        self.experiment_root = Path(experiment_root)
        self.metrics_file_path = self.experiment_root / "eval_agent_memory" / "auxiliary_metrics.py"
        self.definitions_cache: Dict[str, Dict] = {}
    
    def has_metrics(self) -> bool:
        """Check if auxiliary metrics file exists."""
        return self.metrics_file_path.exists()
    
    def load_metrics_function(self) -> Optional[Callable]:
        """
        Dynamically load the auxiliary metrics function.
        
        Expects the module to have:
        - evaluate_auxiliary_metrics(centers: np.ndarray, radii: np.ndarray) -> dict
        """
        if not self.has_metrics():
            return None
        
        try:
            # Dynamic import
            spec = importlib.util.spec_from_file_location(
                "dynamic_auxiliary_metrics",
                self.metrics_file_path
            )
            module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(module)
            
            # Look for the standard interface
            if hasattr(module, 'evaluate_auxiliary_metrics'):
                logger.info(f"✅ Loaded auxiliary metrics from: {self.metrics_file_path}")
                return module.evaluate_auxiliary_metrics
            else:
                logger.warning(
                    f"⚠️  Module loaded but no 'evaluate_auxiliary_metrics' function found"
                )
                return None
        
        except Exception as e:
            logger.error(f"❌ Failed to load auxiliary metrics: {e}", exc_info=True)
            return None
    
    def extract_metric_definitions(self, metrics_module) -> Dict[str, Dict]:
        """
        Extract metric definitions from the module's docstrings and metadata.
        """
        definitions = {}
        
        # TODO: optionally parse definitions out of the module docstring or
        # the evaluate_auxiliary_metrics docstring; for now only explicit
        # metadata is used.
        
        # Use predefined metadata if the module provides it
        if hasattr(metrics_module, 'METRIC_DEFINITIONS'):
            definitions.update(metrics_module.METRIC_DEFINITIONS)
        
        return definitions
    
    def load_program_output(self, generation_dir: Path) -> Optional[Dict[str, np.ndarray]]:
        """
        Load the program output (centers, radii) from results directory.
        
        Tries multiple sources:
        1. extra.npz (if available)
        2. Parse from metrics.json public.centers_str
        """
        # Method 1: read from extra.npz
        extra_npz = generation_dir / "results" / "extra.npz"
        if extra_npz.exists():
            try:
                data = np.load(extra_npz)
                if 'centers' in data and 'radii' in data:
                    logger.info(f"✅ Loaded program output from extra.npz")
                    return {
                        'centers': data['centers'],
                        'radii': data['radii']
                    }
            except Exception as e:
                logger.warning(f"Failed to load extra.npz: {e}")
        
        # Method 2: parse from metrics.json (if centers_str is present)
        metrics_file = generation_dir / "results" / "metrics.json"
        if metrics_file.exists():
            try:
                with open(metrics_file) as f:
                    metrics = json.load(f)
                
                if 'public' in metrics and 'centers_str' in metrics['public']:
                    centers = self._parse_centers_str(metrics['public']['centers_str'])
                    # radii are not stored in metrics.json; recovering them
                    # would require re-running the program (a better approach
                    # is still needed here)
                    logger.warning(
                        f"Parsed {len(centers)} centers from metrics.json, "
                        "but radii are unavailable without re-running the program"
                    )
            except Exception as e:
                logger.warning(f"Failed to parse metrics.json: {e}")
        
        logger.error("❌ Could not load program output for generation")
        return None
    
    def _parse_centers_str(self, centers_str: str) -> np.ndarray:
        """Parse centers from the formatted string.
        
        Expected format: "centers[0] = (0.1141, 0.1139)\n..."
        """
        import re  # local import to keep the helper self-contained
        pairs = re.findall(r'\(\s*([-+\d.eE]+)\s*,\s*([-+\d.eE]+)\s*\)', centers_str)
        return np.array([[float(x), float(y)] for x, y in pairs])
    
    def run_and_update(self, generation: int) -> bool:
        """
        Main method: Load metrics, run them, update metrics.json
        
        Returns:
            True if successful, False otherwise
        """
        generation_dir = self.experiment_root / f"gen_{generation}"
        metrics_file = generation_dir / "results" / "metrics.json"
        
        if not metrics_file.exists():
            logger.warning(f"Metrics file not found: {metrics_file}")
            return False
        
        # 1. Check whether auxiliary metrics have already run
        with open(metrics_file) as f:
            metrics = json.load(f)
        
        if 'auxiliary_metadata' in metrics and metrics['auxiliary_metadata'].get('executed'):
            logger.info(f"✓ Auxiliary metrics already executed for gen {generation}")
            return True
        
        # 2. Load the auxiliary metrics function
        metrics_fn = self.load_metrics_function()
        if metrics_fn is None:
            logger.info(f"No auxiliary metrics available yet for gen {generation}")
            return False
        
        # 3. Load the program's output data
        program_output = self.load_program_output(generation_dir)
        if program_output is None:
            logger.error(f"Failed to load program output for gen {generation}")
            return False
        
        centers = program_output['centers']
        radii = program_output['radii']
        
        # 4. Run the auxiliary metrics
        logger.info(f"🔄 Running auxiliary metrics for gen {generation}...")
        start_time = time.time()
        
        try:
            aux_results = metrics_fn(centers, radii)
            execution_time = time.time() - start_time
            
            logger.info(
                f"✅ Computed {len(aux_results)} auxiliary metrics "
                f"in {execution_time:.3f}s"
            )
        
        except Exception as e:
            logger.error(f"❌ Failed to run auxiliary metrics: {e}", exc_info=True)
            return False
        
        # 5. Update metrics.json
        self._update_metrics_file(
            metrics_file,
            aux_results,
            execution_time,
            generation
        )
        
        logger.info(f"✅ Updated metrics.json for gen {generation}")
        return True
    
    def _update_metrics_file(
        self,
        metrics_file: Path,
        aux_results: Dict[str, Any],
        execution_time: float,
        generation: int
    ):
        """
        Update the metrics.json file with auxiliary results.
        
        Preserves combined_score and all existing data.
        """
        # 1. Read the existing file
        with open(metrics_file) as f:
            metrics = json.load(f)
        
        # 2. Capture combined_score so it can be verified later
        original_combined_score = metrics.get('combined_score')
        
        # 3. Add the auxiliary metrics to public
        if 'public' not in metrics:
            metrics['public'] = {}
        
        # Add the aux_ prefix where missing
        for key, value in aux_results.items():
            # If the key already has an aux_ or auxiliary_ prefix, keep it
            if key.startswith('aux_') or key.startswith('auxiliary_'):
                metrics['public'][key] = value
            else:
                metrics['public'][f'aux_{key}'] = value
        
        # 4. Record metric definitions (on first appearance)
        if 'auxiliary_metric_definitions' not in metrics:
            metrics['auxiliary_metric_definitions'] = {}
        
        # Derive definitions (simplified here; in practice they could come from module metadata)
        for key in aux_results.keys():
            metric_key = key if key.startswith('aux_') else f'aux_{key}'
            if metric_key not in metrics['auxiliary_metric_definitions']:
                metrics['auxiliary_metric_definitions'][metric_key] = {
                    "name": key.replace('_', ' ').title(),
                    "description": f"Auxiliary metric: {key}",
                    "interpretation": "neutral",
                    "added_at_generation": generation,
                    "source": "eval_agent"
                }
        
        # 5. Add execution metadata
        metrics['auxiliary_metadata'] = {
            "executed": True,
            "execution_time": execution_time,
            "metrics_file": str(self.metrics_file_path.relative_to(self.experiment_root)),
            "num_metrics_computed": len(aux_results),
            "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
            "generation": generation
        }
        
        # 6. Verify that combined_score was not modified
        assert metrics['combined_score'] == original_combined_score, \
            "CRITICAL: combined_score was modified!"
        
        # 7. Save (atomic write)
        temp_file = metrics_file.with_suffix('.json.tmp')
        with open(temp_file, 'w') as f:
            json.dump(metrics, f, indent=2)
        
        # Atomic replace
        temp_file.replace(metrics_file)
        
        logger.info(f"📝 Updated {metrics_file}")


async def run_auxiliary_metrics(results_dir: str, generation: int):
    """
    Background task: Run auxiliary metrics and update results.
    
    This is called asynchronously by the eval service.
    """
    logger.info(f"=" * 60)
    logger.info(f"🔄 Running Auxiliary Metrics for Generation {generation}")
    logger.info(f"=" * 60)
    
    try:
        runner = AuxiliaryMetricsRunner(experiment_root=Path(results_dir))
        success = runner.run_and_update(generation)
        
        if success:
            logger.info(f"✅ Auxiliary metrics completed for gen {generation}")
        else:
            logger.info(f"⏭️  Auxiliary metrics skipped for gen {generation}")
    
    except Exception as e:
        logger.error(
            f"❌ Auxiliary metrics failed for gen {generation}: {e}",
            exc_info=True
        )
    
    logger.info(f"=" * 60)

1.3 Update the Eval Service Imports

File: eval_agent/ev2_service_standalone.py

# Add at the top of the file
from eval_agent.auxiliary_runner import run_auxiliary_metrics

# Use it inside notify_generation_complete (see 1.1)

Phase 2: Enhancements

2.1 Automatic Extraction of Metric Definitions

Improve the agent prompt (eval_agent/ev2_prompt.j2):

When creating auxiliary_metrics.py, you MUST include:

1. A standard interface function:

    def evaluate_auxiliary_metrics(centers: np.ndarray, radii: np.ndarray) -> dict:
        """Main evaluation function."""
        pass

2. A METRIC_DEFINITIONS dictionary:

    METRIC_DEFINITIONS = {
        "aux_radius_std_dev": {
            "name": "Radius Standard Deviation",
            "description": "Measures the standard deviation of circle radii",
            "interpretation": "lower_better",  # or "higher_better", "neutral"
            "unit": "unitless",
            "formula": "np.std(radii)"
        },
        # ... more metrics
    }

3. Clear docstrings for each metric function.

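For reference, a minimal auxiliary_metrics.py that satisfies this contract might look like the sketch below. The two metrics mirror the examples used elsewhere in this plan; the exact formulas are illustrative, not prescribed.

```python
"""Example auxiliary metrics module (illustrative sketch)."""
import numpy as np

METRIC_DEFINITIONS = {
    "aux_radius_std_dev": {
        "name": "Radius Standard Deviation",
        "description": "Standard deviation of circle radii",
        "interpretation": "lower_better",
        "unit": "unitless",
        "formula": "np.std(radii)",
    },
    "aux_avg_nn_distance": {
        "name": "Average Nearest Neighbor Distance",
        "description": "Average distance from each center to its closest neighbor",
        "interpretation": "neutral",
        "unit": "distance",
    },
}


def evaluate_auxiliary_metrics(centers: np.ndarray, radii: np.ndarray) -> dict:
    """Compute all auxiliary metrics for one solution."""
    # Pairwise distances between centers; mask out self-distances
    diffs = centers[:, None, :] - centers[None, :, :]
    dists = np.linalg.norm(diffs, axis=-1)
    np.fill_diagonal(dists, np.inf)
    nn_dists = dists.min(axis=1)
    return {
        "aux_radius_std_dev": float(np.std(radii)),
        "aux_avg_nn_distance": float(nn_dists.mean()),
    }
```

Every key returned by `evaluate_auxiliary_metrics` should also appear in `METRIC_DEFINITIONS`, so the runner can record a definition the first time a metric shows up.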

2.2 Metric Version Management

When the agent updates auxiliary_metrics.py, the version needs to be recorded:

# auxiliary_metrics.py (agent-generated)
METRICS_VERSION = "gen_20_v2"  # auto-generated by the agent
CREATED_AT_GENERATION = 20
UPDATED_AT_GENERATION = 25

# Each metrics.json records the version that was used
{
  "auxiliary_metadata": {
    "metrics_version": "gen_20_v2",
    "metrics_created_at": 20,
    "metrics_last_updated": 25
  }
}

2.3 Backward Compatibility

When the agent adds new metrics or removes old ones:

def run_and_update(self, generation: int):
    # ...
    aux_results = metrics_fn(centers, radii)
    
    # Mark which metrics are available in the current version
    available_metrics = set(aux_results.keys())
    
    # Record the metric-availability history
    metrics['auxiliary_metadata']['available_metrics'] = list(available_metrics)

2.4 Performance Monitoring

# Record each metric's execution time
{
  "auxiliary_metadata": {
    "metric_timings": {
      "aux_radius_std_dev": 0.001,
      "aux_avg_nn_distance": 0.150,
      "aux_spatial_uniformity": 2.345  # if too slow, it can be disabled
    },
    "total_execution_time": 2.496
  }
}
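If the agent exposes its metrics as individual callables, the per-metric timings above can be captured with a small wrapper. This is a sketch under that assumption; `run_with_timings` is a hypothetical helper, not part of the current codebase.

```python
import time
from typing import Callable, Dict, Tuple


def run_with_timings(
    metric_fns: Dict[str, Callable[[], float]],
) -> Tuple[Dict[str, float], Dict[str, float]]:
    """Run each metric callable, recording per-metric wall-clock time."""
    results: Dict[str, float] = {}
    timings: Dict[str, float] = {}
    for name, fn in metric_fns.items():
        start = time.perf_counter()
        results[name] = fn()
        timings[name] = round(time.perf_counter() - start, 6)
    return results, timings
```

The runner could then store `timings` as `metric_timings` and `sum(timings.values())` as `total_execution_time`, and disable any metric whose timing repeatedly exceeds a threshold.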

Phase 3: Advanced Integration

3.1 Real-Time Feedback to the Evolution Loop

Although auxiliary metrics do not affect combined_score, they can influence evolution in other ways:

Option A: Via the LLM Prompt

# When ShinkaEvolve generates new code,
# it can read the auxiliary metrics as context

# Include in the mutation prompt:
"""
Recent auxiliary metrics trends:
- aux_radius_std_dev: 0.031 → 0.025 (improving, more uniform)
- aux_spatial_uniformity: 0.75 → 0.82 (improving)

Suggestion: Continue exploring patterns that improve spatial uniformity.
"""

Option B: Multi-Objective Optimization

# In the future, a weighted combination could be supported
combined_score_weighted = (
    1.0 * primary_score +
    0.1 * aux_spatial_uniformity +
    0.05 * aux_edge_utilization
)
# But this requires a larger architectural change

3.2 Visualization Dashboard

Create a web dashboard showing:

  • Primary score trends
  • Auxiliary metrics trends
  • Metric definitions and interpretations
  • Agent-generated insights (EVAL_AGENTS.md)
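A dashboard backend could assemble its trend data directly from the per-generation metrics.json files described above. A possible sketch, assuming the `gen_X/results/metrics.json` layout from this plan (`collect_metric_series` is a hypothetical helper):

```python
import json
from pathlib import Path
from typing import Dict, List


def collect_metric_series(experiment_root: Path) -> Dict[str, List[float]]:
    """Collect combined_score and aux_* values across generations for plotting."""
    series: Dict[str, List[float]] = {}
    gen_dirs = sorted(
        experiment_root.glob("gen_*"),
        key=lambda p: int(p.name.split("_")[1]),  # numeric order, not lexical
    )
    for gen_dir in gen_dirs:
        metrics_file = gen_dir / "results" / "metrics.json"
        if not metrics_file.exists():
            continue
        metrics = json.loads(metrics_file.read_text())
        series.setdefault("combined_score", []).append(metrics["combined_score"])
        for key, value in metrics.get("public", {}).items():
            if key.startswith("aux_") and isinstance(value, (int, float)):
                series.setdefault(key, []).append(float(value))
    return series
```

Each list in the returned dict is already in generation order, so it can be fed straight into a plotting library.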

📋 Implementation Steps

Step 1: Core Functionality (1-2 days)

  • Implement AuxiliaryMetricsRunner
  • Modify the notification handler in ev2_service_standalone.py
  • Test the basic flow: load → run → update metrics.json

Step 2: Data Acquisition (0.5-1 day)

  • Implement reading centers/radii from extra.npz
  • Fallback: parse from metrics.json
  • If needed: implement re-running the program

Step 3: Metric Definitions (0.5 days)

  • Update the agent prompt to require METRIC_DEFINITIONS
  • Implement definition extraction and recording

Step 4: Testing and Validation (1 day)

  • Unit tests
  • Integration tests (full evolution loop)
  • Verify that combined_score is never modified
  • Verify that async execution does not block evolution
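The "combined_score is never modified" check is easiest to unit-test if the merge logic is factored into a pure function. A sketch of that idea (`merge_aux_results` is a hypothetical refactor of the merge step in `_update_metrics_file`):

```python
import copy
from typing import Any, Dict


def merge_aux_results(metrics: Dict[str, Any],
                      aux_results: Dict[str, Any]) -> Dict[str, Any]:
    """Pure-function version of the metrics.json merge, convenient to unit-test."""
    merged = copy.deepcopy(metrics)  # never mutate the caller's dict
    public = merged.setdefault("public", {})
    for key, value in aux_results.items():
        out_key = key if key.startswith(("aux_", "auxiliary_")) else f"aux_{key}"
        public[out_key] = value
    return merged


# The invariant this plan requires: the primary metric is never touched.
original = {"combined_score": 2.34, "public": {"num_circles": 26}}
merged = merge_aux_results(original, {"radius_std_dev": 0.03})
assert merged["combined_score"] == original["combined_score"]
assert merged["public"]["aux_radius_std_dev"] == 0.03
```

Because the function never writes to disk, the file-level concerns (atomic replace, concurrent writers) can be tested separately.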

Step 5: Backward Compatibility (0.5 days)

  • Handle metric version changes
  • Handle metrics being unavailable

Step 6: Documentation and Examples (0.5 days)

  • Update the README
  • Create example configurations
  • Write a user guide

Total: ~4-6 days


⚠️ Risks and Caveats

1. Performance Impact

  • Risk: some auxiliary metrics may be slow (e.g., Voronoi analysis)
  • Mitigations:
    • Asynchronous background execution
    • Timeout mechanism (5-10 seconds)
    • Monitor execution times and automatically disable slow metrics
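One possible shape for the timeout mechanism is a thread-based wrapper like the sketch below. Note the caveat: Python cannot forcibly kill a thread, so a timed-out metric keeps running in the background; a subprocess would be needed for hard isolation.

```python
import concurrent.futures
from typing import Any, Callable, Optional


def run_with_timeout(fn: Callable[..., Any], *args: Any,
                     timeout_s: float = 10.0) -> Optional[Any]:
    """Run fn(*args); return None if it does not finish within timeout_s.

    Caveat: a timed-out worker thread keeps running in the background
    (Python cannot kill threads); use a subprocess for hard isolation.
    """
    pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    future = pool.submit(fn, *args)
    try:
        return future.result(timeout=timeout_s)
    except concurrent.futures.TimeoutError:
        return None
    finally:
        # Do not wait for a possibly-hung worker on the way out
        pool.shutdown(wait=False)
```

The runner could treat a `None` result as "metric timed out" and record that in auxiliary_metadata rather than failing the whole update.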

2. Data Availability

  • Risk: extra.npz may not always exist
  • Mitigations:
    • Multiple data sources (npz, metrics.json, re-running the program)
    • Clear error handling

3. Code Safety

  • Risk: dynamically importing LLM-generated code
  • Mitigations:
    • Run in an isolated environment (where possible)
    • Timeout protection
    • Exception handling
    • Future: a code-review mechanism

4. Metric Evolution

  • Risk: agent updates to the metrics cause inconsistency
  • Mitigations:
    • Version management
    • Record the version used by each generation
    • Keep historical versions

5. File Contention

  • Risk: ShinkaEvolve and the Eval Service write metrics.json concurrently
  • Mitigations:
    • The Eval Service only writes after ShinkaEvolve has finished
    • Atomic writes (write a temp file, then rename)
    • File locks (if necessary)

🔍 Validation Criteria

Functional

  • Auxiliary metrics are computed correctly
  • metrics.json is updated correctly
  • combined_score is never modified
  • Metric definitions are recorded correctly

Performance

  • The evolution loop is never blocked
  • Auxiliary metrics finish within 5 seconds (in most cases)
  • Memory usage stays reasonable

Robustness

  • Handles a missing metrics file
  • Handles unavailable program output data
  • Handles metric computation failures
  • Handles bugs in agent-generated code

📊 Example Output

Enhanced metrics.json (full example)

{
  "combined_score": 2.3423,
  "public": {
    "num_circles": 26,
    "centers_str": "centers[0] = (0.1141, 0.1139)\n...",
    
    "aux_radius_std_dev": 0.0282,
    "aux_avg_nn_distance": 0.1389,
    "aux_std_nn_distance": 0.0498,
    "aux_spatial_uniformity": 0.8234,
    "aux_edge_utilization": 0.7156
  },
  "private": {
    "reported_sum_of_radii": 2.3423
  },
  "auxiliary_metric_definitions": {
    "aux_radius_std_dev": {
      "name": "Radius Standard Deviation",
      "description": "Standard deviation of circle radii. Lower indicates more uniform sizes.",
      "interpretation": "lower_better",
      "unit": "unitless",
      "formula": "np.std(radii)",
      "source": "eval_agent",
      "added_at_generation": 9
    },
    "aux_avg_nn_distance": {
      "name": "Average Nearest Neighbor Distance",
      "description": "Average distance from each circle to its closest neighbor",
      "interpretation": "neutral",
      "unit": "distance",
      "source": "eval_agent",
      "added_at_generation": 9
    },
    "aux_spatial_uniformity": {
      "name": "Spatial Uniformity",
      "description": "Measures how evenly circles are distributed (Voronoi-based)",
      "interpretation": "higher_better",
      "unit": "score [0-1]",
      "source": "auxiliary_eval.py",
      "added_at_generation": 30
    }
  },
  "auxiliary_metadata": {
    "executed": true,
    "execution_time": 0.245,
    "metrics_file": "eval_agent_memory/auxiliary_metrics.py",
    "metrics_version": "gen_20_v2",
    "num_metrics_computed": 5,
    "timestamp": "2026-02-03T03:45:12Z",
    "generation": 42,
    "available_metrics": [
      "aux_radius_std_dev",
      "aux_avg_nn_distance", 
      "aux_std_nn_distance",
      "aux_spatial_uniformity",
      "aux_edge_utilization"
    ],
    "metric_timings": {
      "aux_radius_std_dev": 0.001,
      "aux_avg_nn_distance": 0.024,
      "aux_std_nn_distance": 0.020,
      "aux_spatial_uniformity": 0.180,
      "aux_edge_utilization": 0.020
    }
  },
  "execution_time_mean": 12.061,
  "execution_time_std": 0.0,
  "num_valid_runs": 1,
  "num_invalid_runs": 0,
  "all_validation_errors": []
}

🎯 Success Criteria

  1. Functional completeness

    • ✅ Auxiliary metrics run automatically and are recorded
    • ✅ The primary metric is unaffected
    • ✅ Complete metric definitions are included
  2. Performance

    • ✅ The evolution loop is never blocked
    • ✅ 95% of metrics finish within 5 seconds
  3. Reliability

    • ✅ Edge cases are handled
    • ✅ Error logs are clear
    • ✅ Never causes evolution to fail
  4. Maintainability

    • ✅ Clear, well-documented code
    • ✅ Easy to extend
    • ✅ Easy to debug

🚀 Next Actions

  1. Start now: implement a basic version of AuxiliaryMetricsRunner
  2. Within one week: Phase 1 (core functionality)
  3. Within two weeks: Phase 2 (enhancements)
  4. Future consideration: Phase 3 (advanced integration)