| """ |
| EV2 Service - Standalone Version |
| |
| A complete, self-contained evaluation service that integrates OpenHands agent |
| directly without depending on ev2.py. |
| |
| Key features: |
| - Event-driven architecture (receive generation notifications) |
| - Autonomous decision-making (when to trigger agent) |
| - Persistent agent state (across generations) |
| - Direct OpenHands integration (no wrapper) |
| |
| Author: Evolution Evaluation System |
| Version: 2.0 (Standalone) |
| """ |
|
|
| import os |
| import sys |
| import json |
| import time |
| import logging |
| import asyncio |
| import traceback |
| import tempfile |
| import math |
| import hashlib |
| import numbers |
| from pathlib import Path |
| from typing import Dict, Any, Optional, List |
| from dataclasses import dataclass, asdict |
|
|
def _load_sibling_module(filename: str, module_name: str):
    """Load a module file that sits next to this one, bypassing package imports.

    Used as a fallback when the ``eval_agent`` package is not importable
    (e.g. when this file is run directly as a script).

    Args:
        filename: File name in this file's directory (e.g. ``"utils.py"``).
        module_name: Name to give the loaded module object.

    Returns:
        The executed module object.

    Raises:
        ImportError: if no import spec/loader can be built for the file.
    """
    import importlib.util

    module_path = Path(__file__).with_name(filename)
    spec = importlib.util.spec_from_file_location(module_name, str(module_path))
    # Validate *before* use: module_from_spec(None) would fail with an opaque
    # AttributeError, and an ``assert`` here would vanish under ``python -O``.
    if spec is None or spec.loader is None:
        raise ImportError(f"Cannot load module {module_name!r} from {module_path}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module


try:
    import eval_agent.logging as behavior_logging
except ModuleNotFoundError:
    behavior_logging = _load_sibling_module("logging.py", "behavior_logging")


try:
    from eval_agent.utils import build_meta_recommendation_context_lines
except ModuleNotFoundError:
    build_meta_recommendation_context_lines = _load_sibling_module(
        "utils.py", "eval_agent_utils"
    ).build_meta_recommendation_context_lines
|
|
| |
| from fastapi import FastAPI, HTTPException, BackgroundTasks |
| from fastapi.responses import JSONResponse |
| from pydantic import BaseModel, Field |
| import uvicorn |
| import yaml |
|
|
| |
| from openhands.sdk import LLM, Agent, Conversation, Tool |
| from openhands.tools.file_editor import FileEditorTool |
| from openhands.tools.task_tracker import TaskTrackerTool |
| from openhands.tools.terminal import TerminalTool |
|
|
|
|
| |
| |
| |
|
|
@dataclass
class ServiceConfig:
    """Service configuration.

    Fields are grouped by the YAML section :meth:`from_yaml` reads them from
    (``service``, ``experiment``, ``evaluation``, ``strategy``, ``agent``).
    ``problem_statement`` and ``evaluator_kwargs`` are not read from YAML by
    :meth:`from_yaml`; they keep their defaults unless set elsewhere.
    """

    # --- service section (HTTP server) ---
    host: str = "0.0.0.0"
    port: int = 8765
    log_level: str = "INFO"

    # --- experiment section ---
    experiment_name: str = ""
    results_dir: str = ""
    primary_evaluator_path: str = ""
    problem_statement: str = ""
    evaluator_kwargs: Optional[Dict[str, Any]] = None

    # --- evaluation section ---
    evaluation_timeout: float = 300.0  # presumably seconds — TODO confirm with caller

    # --- strategy section (when to trigger the agent) ---
    trigger_mode: str = "periodic"
    trigger_interval: int = 10
    plateau_threshold: float = 0.01
    plateau_window: int = 10

    # --- agent section (LLM settings) ---
    agent_enabled: bool = True
    llm_model: str = ""
    llm_api_key: str = ""
    llm_base_url: str = ""

    @classmethod
    def from_dict(cls, data: Optional[Dict[str, Any]]) -> 'ServiceConfig':
        """Build a config from an already-parsed YAML/JSON mapping.

        Keys that are absent fall back to the dataclass defaults. ``None`` for
        the whole mapping (an empty YAML file) or for a section (e.g. a bare
        ``agent:`` line) is treated as empty instead of raising.

        Raises:
            ValueError: if the document or a section is neither a mapping nor None.
        """
        if data is None:
            data = {}
        if not isinstance(data, dict):
            raise ValueError(
                f"config document must be a mapping, got {type(data).__name__}"
            )

        def section(name: str) -> Dict[str, Any]:
            value = data.get(name)
            if value is None:
                return {}
            if not isinstance(value, dict):
                raise ValueError(
                    f"config section {name!r} must be a mapping, got {type(value).__name__}"
                )
            return value

        # (YAML section, YAML key, dataclass field)
        field_map = (
            ("service", "host", "host"),
            ("service", "port", "port"),
            ("service", "log_level", "log_level"),
            ("experiment", "name", "experiment_name"),
            ("experiment", "results_dir", "results_dir"),
            ("experiment", "primary_evaluator", "primary_evaluator_path"),
            ("evaluation", "timeout", "evaluation_timeout"),
            ("strategy", "trigger_mode", "trigger_mode"),
            ("strategy", "trigger_interval", "trigger_interval"),
            ("strategy", "plateau_threshold", "plateau_threshold"),
            ("strategy", "plateau_window", "plateau_window"),
            ("agent", "enabled", "agent_enabled"),
            ("agent", "llm_model", "llm_model"),
            ("agent", "llm_api_key", "llm_api_key"),
            ("agent", "llm_base_url", "llm_base_url"),
        )
        kwargs: Dict[str, Any] = {}
        for section_name, key, field_name in field_map:
            values = section(section_name)
            # Only pass keys that are present so the dataclass defaults are the
            # single source of truth for missing values.
            if key in values:
                kwargs[field_name] = values[key]
        return cls(**kwargs)

    @classmethod
    def from_yaml(cls, config_path: str) -> 'ServiceConfig':
        """Load config from a YAML file (see :meth:`from_dict` for the layout)."""
        with open(config_path, encoding="utf-8") as f:
            data = yaml.safe_load(f)
        return cls.from_dict(data)
|
|
|
|
| |
| |
| |
|
|
class GenerationCompleteRequest(BaseModel):
    """Notification that a generation has finished (evaluation mode only).

    Says where the generation's artifacts live and which evaluator callable
    (module + function + kwargs) applies to them.
    """

    # Where the generation lives.
    generation: int = Field(default=..., description="Generation number")
    results_dir: str = Field(default=..., description="Path to generation results directory")

    # What to evaluate and with which evaluator.
    code_path: str = Field(default=..., description="Path to the generated code")
    evaluator_module: str = Field(
        default=...,
        description="Python module path (e.g., 'examples.circle_packing.evaluate')",
    )
    evaluator_function: str = Field(
        default="evaluate",
        description="Evaluator function name (default: 'evaluate')",
    )
    evaluator_kwargs: Optional[Dict[str, Any]] = Field(
        default=None,
        description="Additional kwargs for evaluator",
    )

    # Optional context.
    stage: Optional[str] = Field(default=None, description="Evolution stage")
    metadata: Optional[Dict[str, Any]] = Field(default=None, description="Additional metadata")
|
|
|
|
class ServiceResponse(BaseModel):
    """Reply to a generation notification (async evaluation mode).

    Fields are grouped as: request outcome, async-job info, the agent
    trigger decision, optional results, and timing.
    """

    # Outcome of the request.
    status: str = Field(default=..., description="Status: accepted | completed | skipped | error")
    message: str = Field(default=..., description="Human-readable message")
    generation: int

    # Async job tracking.
    job_id: Optional[str] = Field(
        default=None,
        description="Job ID for async evaluation (evaluation mode only)",
    )
    estimated_time: Optional[float] = Field(
        default=None,
        description="Estimated evaluation time in seconds",
    )

    # Agent trigger decision.
    agent_triggered: bool = Field(default=..., description="Whether agent was triggered")
    trigger_reason: Optional[str] = Field(default=None, description="Why agent was/wasn't triggered")

    # Results, when available.
    evaluation_result: Optional[Dict[str, Any]] = Field(
        default=None,
        description="Evaluation result (if completed)",
    )
    insights: Optional[List[str]] = Field(
        default=None,
        description="Agent insights (if agent was triggered)",
    )
    auxiliary_metrics: Optional[Dict[str, Any]] = Field(
        default=None,
        description="Auxiliary metrics info",
    )

    # Timing.
    processing_time_ms: float
|
|
|
|
class ServiceStatusResponse(BaseModel):
    """Snapshot of the running service: uptime, version, experiment, statistics and config."""

    status: str = Field(default="running")
    uptime_seconds: float
    version: str = Field(default="2.0.0-standalone")

    # Free-form sections; their keys are filled in by the status endpoint.
    experiment: Dict[str, Any]
    statistics: Dict[str, Any]
    config: Dict[str, Any]
|
|
class InitializeRequest(BaseModel):
    """Request to initialize or reset service state for a new experiment.

    Only ``results_dir`` is required; every other field is an optional
    override and defaults to None.
    """

    results_dir: str = Field(default=..., description="Experiment root directory")
    primary_evaluator: Optional[str] = Field(default=None, description="Path to primary evaluator")
    experiment_name: Optional[str] = Field(default=None, description="Experiment name")
    trigger_mode: Optional[str] = Field(default=None, description="Trigger mode override")
    trigger_interval: Optional[int] = Field(default=None, description="Trigger interval override")
    problem_statement: Optional[str] = Field(
        default=None,
        description="Problem description for diagnostic context",
    )
    evaluator_kwargs: Optional[Dict[str, Any]] = Field(
        default=None,
        description="Task-specific kwargs (e.g., problem_id, frontier_cs_dir)",
    )
|
|
class InitializeResponse(BaseModel):
    """Result of an initialize/reset request."""

    status: str = Field(default=..., description="Status: ready | error")
    message: str = Field(default=..., description="Human-readable message")

    # Echo of the effective experiment directory and agent readiness.
    results_dir: str
    agent_initialized: bool

    processing_time_ms: float
|
|
|
|
| |
| |
| |
|
|
class IntegratedEV2Agent:
    """
    Integrated EV2 Agent - Direct OpenHands Management

    Replaces the call to evolution_evaluation_agent() in ev2.py with direct,
    in-process agent management:

    - The OpenHands LLM/Agent are created lazily on the first analysis and
      reused across calls.
    - Agent memory lives in ``<results_dir>/eval_agent_memory`` (the
      workspace), so it persists across generations.
    - No subprocess calls; each analysis runs an OpenHands ``Conversation``
      in a worker thread.
    - At most one analysis runs at a time; overlapping requests are skipped.

    Type hints naming ``ServiceConfig``, ``LLM`` and ``Agent`` are quoted so
    the class body does not need those names at definition time.
    """

    def __init__(self,
                 results_dir: str,
                 primary_evaluator_path: str,
                 config: "ServiceConfig",
                 problem_statement: str = "",
                 evaluator_kwargs: Optional[Dict[str, Any]] = None):
        """
        Initialize integrated agent

        Args:
            results_dir: Path to results directory
            primary_evaluator_path: Path to primary evaluator (ground truth)
            config: Service configuration
            problem_statement: Problem description for diagnostic context
            evaluator_kwargs: Task-specific kwargs (e.g., problem_id, frontier_cs_dir)

        Raises:
            FileNotFoundError: if ``primary_evaluator_path`` does not exist.
                This is checked before anything is created on disk.
        """
        self.results_dir = Path(results_dir).resolve()
        self.primary_evaluator_path = Path(primary_evaluator_path).resolve()
        self.config = config
        self.problem_statement = problem_statement
        self.evaluator_kwargs = evaluator_kwargs or {}

        # Fail fast, before creating the workspace directory and files.
        if not self.primary_evaluator_path.exists():
            raise FileNotFoundError(
                f"Primary evaluator not found: {self.primary_evaluator_path}"
            )

        self.workspace = self.results_dir / "eval_agent_memory"
        self.workspace.mkdir(parents=True, exist_ok=True)
        self._bootstrap_memory_files()

        # Created lazily by _ensure_agent_ready() on the first analysis.
        self._agent = None
        self._llm = None

        # Guards agent runs; analyze_generation() skips rather than queues.
        self._run_lock = asyncio.Lock()

        logging.info("=" * 80)
        logging.info("✅ IntegratedEV2Agent Initialized")
        logging.info("=" * 80)
        logging.info(f"Results Dir: {self.results_dir}")
        logging.info(f"Workspace: {self.workspace}")
        logging.info(f"Primary Evaluator: {self.primary_evaluator_path}")
        logging.info("=" * 80)

    # ------------------------------------------------------------------
    # Metrics helpers
    # ------------------------------------------------------------------

    def _metrics_path(self, gen: int) -> Path:
        """Return the path of ``gen_<gen>/results/metrics.json``."""
        return self.results_dir / f"gen_{gen}" / "results" / "metrics.json"

    def _load_metrics(self, gen: int) -> Optional[Dict[str, Any]]:
        """Best-effort load of a generation's metrics.json.

        Returns:
            The parsed JSON object, or None when the file is missing,
            unreadable, not valid JSON, or not a JSON object. Callers treat
            None as "no data for this generation".
        """
        path = self._metrics_path(gen)
        if not path.exists():
            return None
        try:
            with open(path, encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError):
            # ValueError covers JSONDecodeError and UnicodeDecodeError.
            return None
        return data if isinstance(data, dict) else None

    @staticmethod
    def _as_number(value: Any) -> Optional[float]:
        """Return ``value`` as a float if it is a real number, else None."""
        if isinstance(value, numbers.Real):
            return float(value)
        return None

    def _get_last_agent_trigger_gen(self) -> int:
        """Return ``last_agent_trigger_gen`` from the persisted service state.

        Reads ``service_state.json`` in the workspace. Returns -1 when the file
        is missing, unreadable, or holds a non-integer value, so callers can
        always compare the result with generation numbers.
        """
        state_file = self.workspace / "service_state.json"
        if not state_file.exists():
            return -1
        try:
            with open(state_file, encoding="utf-8") as f:
                value = json.load(f).get("last_agent_trigger_gen", -1)
        except Exception:
            # Best effort: a corrupt or partially written state means "unknown".
            return -1
        if isinstance(value, bool) or not isinstance(value, int):
            return -1
        return value

    # ------------------------------------------------------------------
    # Task-message sections
    # ------------------------------------------------------------------

    def _build_case_analysis(self, current_gen: int) -> List[str]:
        """Summarize per-test-case results of ``gen_<current_gen>`` for the prompt.

        Reads ``public.n_cases`` and ``public.case_<i>_ratio`` /
        ``public.case_<i>_time_ms`` from the generation's metrics.json and
        buckets up to 70 cases into TLE / WA / partial / perfect. Also reports
        the best-scoring generation in ``gen_0..current_gen`` and this
        generation's ``aux_*`` metrics (excluding ``aux_aux_metric_*``).

        Returns an empty list when there is no usable per-case data.
        Malformed per-case values are skipped instead of raising, so one bad
        metrics file cannot abort building the whole task message.
        """
        lines: List[str] = []

        data = self._load_metrics(current_gen)
        if data is None:
            return lines
        public = data.get("public", {})
        if not isinstance(public, dict):
            return lines
        raw_n_cases = public.get("n_cases", 0)
        if not isinstance(raw_n_cases, numbers.Real) or not math.isfinite(raw_n_cases):
            return lines
        n_cases = int(raw_n_cases)
        if n_cases <= 0:
            return lines

        # A case with ratio 0 whose runtime reaches this threshold counts as TLE.
        TLE_THRESHOLD_MS = 1800
        tle_cases: List[int] = []
        wa_cases: List[int] = []
        partial_cases: List[int] = []
        perfect_cases: List[int] = []
        partial_ratios: List[float] = []

        for i in range(min(n_cases, 70)):
            ratio_key = f"case_{i}_ratio"
            if ratio_key not in public:
                # The first missing case ends the reported range.
                break
            ratio = self._as_number(public[ratio_key])
            if ratio is None:
                # Malformed entry (e.g. null); skip it rather than crash.
                continue
            time_ms = self._as_number(public.get(f"case_{i}_time_ms", 0)) or 0.0

            if ratio >= 1.0:
                perfect_cases.append(i)
            elif ratio <= 0 and time_ms >= TLE_THRESHOLD_MS:
                tle_cases.append(i)
            elif ratio <= 0:
                wa_cases.append(i)
            else:
                partial_cases.append(i)
                partial_ratios.append(ratio)

        reported = len(tle_cases) + len(wa_cases) + len(partial_cases) + len(perfect_cases)
        lines.append("")
        lines.append(f"📊 Per-Case Analysis (gen {current_gen}, {reported} cases reported of {n_cases} total):")

        def _fmt_cases(case_list: List[int], limit: int = 10) -> str:
            """Join case ids, truncating after ``limit`` with a '+N more' suffix."""
            if len(case_list) <= limit:
                return ", ".join(str(c) for c in case_list)
            return ", ".join(str(c) for c in case_list[:limit]) + f"... (+{len(case_list)-limit} more)"

        lines.append(f" TLE (>{TLE_THRESHOLD_MS}ms, ratio=0): {len(tle_cases)}/{reported}" +
                     (f" — cases {_fmt_cases(tle_cases)}" if tle_cases else ""))
        lines.append(f" WA (ratio=0, not TLE): {len(wa_cases)}/{reported}" +
                     (f" — cases {_fmt_cases(wa_cases)}" if wa_cases else ""))
        if partial_ratios:
            lines.append(f" Partial (0<ratio<1): {len(partial_cases)}/{reported}"
                         f", avg={sum(partial_ratios)/len(partial_ratios):.3f}"
                         f", min={min(partial_ratios):.3f}, max={max(partial_ratios):.3f}")
        else:
            lines.append(f" Partial (0<ratio<1): 0/{reported}")
        lines.append(f" Perfect (ratio=1): {len(perfect_cases)}/{reported}" +
                     (f" — cases {_fmt_cases(perfect_cases)}" if perfect_cases else ""))

        # Best generation so far. Earliest wins ties; only positive scores are shown.
        best_score = 0.0
        best_gen = 0
        for g in range(current_gen + 1):
            metrics = self._load_metrics(g)
            score = self._as_number(metrics.get("combined_score", 0) or 0) if metrics is not None else None
            if score is not None and score > best_score:
                best_score = score
                best_gen = g
        if best_score > 0:
            lines.append(f" Best gen so far: gen_{best_gen}, score {best_score:.2f}")

        # This generation's agent-defined aux metrics; `public` is already loaded.
        aux_vals = {k: v for k, v in public.items()
                    if k.startswith("aux_") and not k.startswith("aux_aux_metric_")}
        if aux_vals:
            lines.append(" Auxiliary metrics: " +
                         ", ".join(f"{k}={v:.4f}" if isinstance(v, float) else f"{k}={v}"
                                   for k, v in sorted(aux_vals.items())))

        return lines

    def _build_aux_metric_trends(self, current_gen: int) -> List[str]:
        """Build a markdown table of auxiliary metric values over recent generations.

        Covers ``gen_<current_gen-10>`` through ``gen_<current_gen>``. Columns
        are the numeric ``aux_*`` keys found in ``public`` (minus the
        bookkeeping keys listed below). Only generations with at least one
        such value get a row. Scores are capped at 100 for display, and a
        non-numeric score shows as N/A. When no such metric exists anywhere in
        the window, returns a hint to write auxiliary_metrics.py instead.
        """
        # Bookkeeping keys that are excluded from the trend columns.
        framework_keys = {
            "aux_aux_metric_eval_success", "aux_aux_metric_error_code",
            "aux_aux_metric_error_message_length", "aux_aux_metric_error_detail_length",
            "aux_aux_metric_non_numeric_dropped_count",
        }

        records: List[tuple] = []
        all_aux_keys: set = set()
        for gen in range(max(0, current_gen - 10), current_gen + 1):
            data = self._load_metrics(gen)
            if data is None:
                continue
            public = data.get("public", {})
            if not isinstance(public, dict):
                continue
            aux = {k: v for k, v in public.items()
                   if k.startswith("aux_") and k not in framework_keys
                   and isinstance(v, (int, float))}
            if aux:
                all_aux_keys.update(aux)
                score = self._as_number(data.get("combined_score", 0) or 0)
                records.append((gen, score, aux))

        if not all_aux_keys:
            return [
                "",
                "📈 Auxiliary Metrics: No custom metrics defined yet. Write auxiliary_metrics.py to start measuring.",
            ]

        sorted_keys = sorted(all_aux_keys)
        names = [k.replace("aux_", "") for k in sorted_keys]
        widths = [max(len(name), 7) for name in names]
        header = "| Gen | " + " | ".join(f"{name:>{w}}" for name, w in zip(names, widths)) + " | Score |"
        # Each value cell has one space of padding on both sides, hence w + 2 dashes.
        sep = "|-----|" + "|".join("-" * (w + 2) for w in widths) + "|-------|"

        lines = ["", "📈 Auxiliary Metric Trends:", header, sep]
        for gen, score, aux in records:
            cells = []
            for key, w in zip(sorted_keys, widths):
                v = aux.get(key)
                cell = f"{v:.4f}" if isinstance(v, float) else str(v) if v is not None else "N/A"
                cells.append(f"{cell:>{w}}")
            score_cell = f"{min(score, 100):5.1f}" if score is not None else "  N/A"
            lines.append(f"| {gen:3d} | " + " | ".join(cells) + f" | {score_cell} |")
        return lines

    def _select_target_generation(self, current_gen: int, last_trigger: int) -> int:
        """Pick the generation the agent should diagnose.

        Returns the highest-scoring generation in
        ``max(0, last_trigger + 1) .. current_gen``; the earliest wins ties.
        A missing or null ``combined_score`` counts as 0. Generations with
        unreadable metrics or a non-numeric score are ignored. Returns
        ``current_gen`` when no generation in the window scores above -1.
        """
        target_gen = current_gen
        best_score = -1.0
        for gen in range(max(0, last_trigger + 1), current_gen + 1):
            metrics = self._load_metrics(gen)
            if metrics is None:
                continue
            score = self._as_number(metrics.get("combined_score", 0) or 0)
            if score is not None and score > best_score:
                best_score = score
                target_gen = gen
        return target_gen

    def _build_aux_error_lines(self, current_gen: int) -> List[str]:
        """Warn the agent if the framework recorded an aux-metrics error code > 0."""
        metrics = self._load_metrics(current_gen)
        public = metrics.get("public", {}) if metrics is not None else None
        if not isinstance(public, dict):
            return []
        err = public.get("aux_aux_metric_error_code", 0)
        if not (isinstance(err, numbers.Real) and math.isfinite(err) and err > 0):
            return []
        return ["",
                f"⚠️ auxiliary_metrics.py had an error (code={int(err)}) on gen {current_gen}. Fix or rewrite it."]

    def _build_execution_env_lines(self) -> List[str]:
        """Describe how to compile/run code locally for Frontier-CS style tasks.

        Applies only when ``evaluator_kwargs`` has a truthy ``frontier_cs_dir``;
        returns an empty list otherwise. The problem directory is
        ``<frontier_cs_dir>/algorithmic/problems/<problem_id>``. It is treated as
        interactive when it contains ``interactor.cc``.
        """
        fc_dir = self.evaluator_kwargs.get("frontier_cs_dir") if self.evaluator_kwargs else None
        if not fc_dir:
            return []
        pid = self.evaluator_kwargs.get("problem_id", "0")
        problem_dir = Path(fc_dir) / "algorithmic" / "problems" / str(pid)
        testdata_dir = problem_dir / "testdata"
        is_interactive = (problem_dir / "interactor.cc").exists()

        # Point the agent at the smallest input so its local runs stay quick.
        smallest_test = ""
        smallest_size = 0
        if testdata_dir.exists():
            smallest = min(testdata_dir.glob("*.in"), key=lambda p: p.stat().st_size, default=None)
            if smallest is not None:
                smallest_test = str(smallest)
                smallest_size = smallest.stat().st_size

        lines = [
            "",
            "🔧 Code Execution Environment:",
            " Your auxiliary_metrics.py can compile and run the code locally using subprocess.",
            " This is what makes your metrics non-trivial — measure actual program behavior.",
            " In evaluate_aux(results_dir), results_dir points to gen_N/ and the code is at os.path.join(results_dir, 'main.cpp')",
            " Compile: g++ -O2 -pipe -std=gnu++17 -o /tmp/main_test {code_path}",
            f" Testdata dir: {testdata_dir}",
        ]
        if smallest_test:
            lines.append(f" Smallest test: {smallest_test} ({smallest_size} bytes)")
        if is_interactive:
            testlib_dir = Path(fc_dir) / "algorithmic" / "judge" / "include"
            lines.extend([
                " Problem type: interactive",
                f" Interactor: {problem_dir}/interactor.cc",
                f" Compile interactor: g++ -O2 -pipe -std=gnu++17 -I {testlib_dir} -o interactor interactor.cc",
            ])
        else:
            lines.extend([
                " Problem type: default (stdin → stdout)",
                " Run: timeout 5 ./main_test < {test_input} > output.txt",
            ])
        return lines

    def _build_score_trend(self, current_gen: int) -> List[str]:
        """List ``combined_score`` for the (up to) 10 generations before ``current_gen``."""
        lines = ["", "📈 Score Trend (last 10 gens):"]
        if current_gen <= 0:
            lines.append("- No previous generations.")
            return lines
        tokens: List[str] = []
        for gen in range(max(0, current_gen - 10), current_gen):
            metrics = self._load_metrics(gen)
            score = metrics.get("combined_score") if metrics is not None else None
            if isinstance(score, numbers.Real):
                tokens.append(f"g{gen}: {float(score):.1f}")
            else:
                tokens.append(f"g{gen}: N/A")
        lines.append("- " + " | ".join(tokens))
        return lines

    def _build_aux_test_command_lines(self, target_gen: int) -> List[str]:
        """Build a copy-paste shell command that runs auxiliary_metrics.py on ``target_gen``."""
        project_root = Path(__file__).parent.parent.resolve()
        return [
            "",
            "🔧 Test aux metrics (copy-paste to terminal):",
            f" python -c \"import sys; sys.path.insert(0,'{project_root}'); "
            "import importlib.util, json; "
            f"spec=importlib.util.spec_from_file_location('aux','{self.workspace}/auxiliary_metrics.py'); "
            "mod=importlib.util.module_from_spec(spec); spec.loader.exec_module(mod); "
            f"print(json.dumps(mod.evaluate_aux('{self.results_dir}/gen_{target_gen}'),indent=2))\"",
        ]

    def _build_task_message(self, current_gen: int) -> str:
        """Build the task message sent to the eval agent for ``current_gen``.

        The message lists the files the agent must write. It names the
        generation to diagnose (the best one since the last agent trigger;
        see :meth:`_select_target_generation`). It then appends context
        sections:
        - aux-metrics errors;
        - the problem statement (first 4000 chars);
        - local execution hints;
        - per-case analysis;
        - score and aux-metric trends;
        - toolbox notes;
        - an aux-metrics test command;
        - metric feedback, when ``eval_agent.feedback`` is importable and the
          agent has run before.
        """
        last_trigger = self._get_last_agent_trigger_gen()
        target_gen = self._select_target_generation(current_gen, last_trigger)
        code_ext = self._get_code_ext()

        target_metrics = self._load_metrics(target_gen)
        target_score = (
            self._as_number(target_metrics.get("combined_score"))
            if target_metrics is not None else None
        )

        memory_dir = f"{self.results_dir}/eval_agent_memory"
        task_parts: List[str] = [
            f"=== Generation {current_gen} Evaluation ===",
            "",
            "📁 File Locations (all absolute paths):",
            f"- Results directory: {self.results_dir}",
            f"- Agent candidate (MUST WRITE): {memory_dir}/agent_candidate{code_ext}",
            f"- Diagnostic report (MUST WRITE): {memory_dir}/diagnostic_report.md",
            f"- Memory log (MUST WRITE): {memory_dir}/EVAL_AGENTS.md",
            f"- Auxiliary metrics: {memory_dir}/auxiliary_metrics.py",
            # Use the detected source extension, consistent with the candidate path.
            f"- Code to diagnose: {self.results_dir}/gen_{target_gen}/main{code_ext}",
            f"- Metrics to diagnose: {self.results_dir}/gen_{target_gen}/results/metrics.json",
            f"- All generations: {self.results_dir}/gen_0/ through {self.results_dir}/gen_{current_gen}/",
        ]
        if target_gen != current_gen:
            task_parts.append(
                f"- NOTE: Diagnosing gen {target_gen} (best since last trigger), not gen {current_gen} (latest)"
            )
        if target_score is not None:
            task_parts.append(f"- Score: {target_score:.4f}")

        task_parts.extend(self._build_aux_error_lines(current_gen))

        if self.problem_statement:
            task_parts.extend(["", "📝 PROBLEM STATEMENT:", self.problem_statement[:4000]])

        task_parts.extend(self._build_execution_env_lines())
        task_parts.extend(self._build_case_analysis(target_gen))
        task_parts.extend(self._build_score_trend(current_gen))
        task_parts.extend(self._build_aux_metric_trends(current_gen))

        task_parts.extend([
            "",
            "🧰 Toolbox APIs (if needed):",
            "- from eval_agent.tool_box import call_vision, call_tool",
            "- Do NOT import eval_agent/tool_box/_internal/*",
        ])

        task_parts.extend(self._build_aux_test_command_lines(target_gen))

        # Optional: feedback is best effort and must never break the prompt.
        try:
            from eval_agent.feedback import compute_metric_feedback
            if last_trigger >= 0:
                feedback_text = compute_metric_feedback(
                    results_dir=self.results_dir,
                    current_gen=current_gen,
                    last_agent_gen=last_trigger,
                )
                if feedback_text:
                    task_parts.append(feedback_text)
        except Exception as e:
            logging.warning(f"Failed to compute metric feedback: {e}")

        return "\n".join(task_parts)

    # ------------------------------------------------------------------
    # Workspace files
    # ------------------------------------------------------------------

    def _bootstrap_memory_files(self):
        """Create EVAL_AGENTS.md and a stub auxiliary_metrics.py if they are missing."""
        eval_agents_md = self.workspace / "EVAL_AGENTS.md"
        if not eval_agents_md.exists():
            eval_agents_md.write_text(
                "# EV2 Agent Memory\n\n"
                "- Initialized by eval service.\n"
                "- Use this file as compact cross-generation memory.\n",
                encoding="utf-8",
            )

        auxiliary_metrics_py = self.workspace / "auxiliary_metrics.py"
        if not auxiliary_metrics_py.exists():
            auxiliary_metrics_py.write_text(
                "def evaluate_aux(results_dir, primary_result=None):\n"
                " \"\"\"Return auxiliary metrics as a dict.\"\"\"\n"
                " return {}\n",
                encoding="utf-8",
            )

    def _get_code_ext(self) -> str:
        """Detect the source extension by probing ``gen_0/main<ext>``.

        Tries .cpp, .py, .java, .go, .rs, .c in that order and returns the
        first that exists, defaulting to ".cpp".
        """
        for ext in [".cpp", ".py", ".java", ".go", ".rs", ".c"]:
            candidate = self.results_dir / "gen_0" / f"main{ext}"
            if candidate.exists():
                return ext
        return ".cpp"

    def _extract_agent_candidate(self) -> Optional[str]:
        """Return the agent's ``agent_candidate<ext>`` code, or None if absent or blank."""
        ext = self._get_code_ext()
        candidate_path = self.workspace / f"agent_candidate{ext}"
        if candidate_path.exists():
            code = candidate_path.read_text(encoding="utf-8").strip()
            if code:
                logging.info(f"📝 Found agent candidate: {candidate_path} ({len(code)} chars)")
                return code
        return None

    # ------------------------------------------------------------------
    # OpenHands setup
    # ------------------------------------------------------------------

    def _create_llm(self) -> "LLM":
        """Create the OpenHands LLM client (migrated from ev2.py).

        Each setting comes from the service config, falling back to the
        ``LLM_MODEL`` (default ``vertex_ai/gemini-2.5-flash``), ``LLM_API_KEY``
        and ``LLM_BASE_URL`` environment variables. Completion logging follows
        the ``OPENHANDS_LOG_COMPLETIONS`` flag. Logs go to
        ``OPENHANDS_LOG_COMPLETIONS_DIR`` or ``<workspace>/llm_completions``.
        """
        model = self.config.llm_model or os.getenv("LLM_MODEL", "vertex_ai/gemini-2.5-flash")
        api_key = self.config.llm_api_key or os.getenv("LLM_API_KEY")
        base_url = self.config.llm_base_url or os.getenv("LLM_BASE_URL", None)
        log_completions = OPENHANDS_LOG_COMPLETIONS
        default_completion_dir = str(self.workspace / "llm_completions")
        log_completions_folder = os.getenv(
            "OPENHANDS_LOG_COMPLETIONS_DIR",
            default_completion_dir,
        )

        logging.info(f"🤖 Creating LLM: {model}")
        logging.info(f" OpenHands completion logging: {log_completions}")
        if log_completions:
            logging.info(f" Completion log dir: {log_completions_folder}")

        return LLM(
            model=model,
            api_key=api_key,
            base_url=base_url,
            log_completions=log_completions,
            log_completions_folder=log_completions_folder,
        )

    def _create_agent(self) -> "Agent":
        """Create the OpenHands Agent with terminal, file-editor and task-tracker tools.

        The system prompt is ``ev2_prompt.j2`` next to this file, which is the
        same configuration as ev2.py.

        Raises:
            FileNotFoundError: if the prompt template is missing.
        """
        ev2_prompt_path = Path(__file__).parent / "ev2_prompt.j2"
        if not ev2_prompt_path.exists():
            raise FileNotFoundError(
                f"EV2 prompt template not found: {ev2_prompt_path}"
            )

        logging.info(f"📋 Loading prompt: {ev2_prompt_path}")

        agent = Agent(
            llm=self._llm,
            tools=[
                Tool(name=TerminalTool.name),
                Tool(name=FileEditorTool.name),
                Tool(name=TaskTrackerTool.name),
            ],
            system_prompt_filename=str(ev2_prompt_path),
        )

        logging.info("✅ Agent created")
        return agent

    def _ensure_agent_ready(self):
        """Lazily create the LLM and then the Agent (the Agent needs the LLM)."""
        if self._llm is None:
            self._llm = self._create_llm()
        if self._agent is None:
            self._agent = self._create_agent()

    # ------------------------------------------------------------------
    # Analysis run
    # ------------------------------------------------------------------

    async def analyze_generation(self, generation: int) -> Dict[str, Any]:
        """
        Analyze a generation using the agent.

        Uses a non-blocking lock so that concurrent triggers skip rather than
        queue up a backlog of long-running agent sessions.

        Args:
            generation: Generation number to analyze

        Returns:
            Dict with analysis results, or a skip-marker dict if the agent is busy
        """
        # No await between the check and the acquire, so this is race-free
        # within a single event loop.
        if self._run_lock.locked():
            logging.warning(
                f"⏭️ Agent busy — skipping analysis for generation {generation}. "
                "Next periodic trigger will catch up."
            )
            return {
                "success": False,
                "generation": generation,
                "skipped": True,
                "reason": "agent_busy",
            }

        async with self._run_lock:
            return await self._analyze_generation_locked(generation)

    def _backup_aux_metrics(self) -> None:
        """Copy auxiliary_metrics.py to auxiliary_metrics.py.bak, if it exists."""
        import shutil

        aux_py = self.workspace / "auxiliary_metrics.py"
        if aux_py.exists():
            shutil.copy2(aux_py, self.workspace / "auxiliary_metrics.py.bak")

    def _restore_aux_metrics_if_broken(self) -> None:
        """Revert auxiliary_metrics.py to its .bak copy if it no longer compiles.

        Only syntax is checked (via ``compile``); the module is not executed.
        Undecodable bytes or null bytes (ValueError) count as broken too, so
        they cannot fail the whole analysis after the agent has run.
        """
        import shutil  # imported here so the restore path never depends on the backup path

        aux_py = self.workspace / "auxiliary_metrics.py"
        aux_bak = self.workspace / "auxiliary_metrics.py.bak"
        if not aux_py.exists():
            return
        try:
            compile(aux_py.read_text(encoding="utf-8"), str(aux_py), "exec")
            logging.info("✅ auxiliary_metrics.py syntax OK")
        except (SyntaxError, ValueError) as e:
            logging.warning(f"⚠️ auxiliary_metrics.py has syntax error: {e}. Reverting to backup.")
            if aux_bak.exists():
                shutil.copy2(aux_bak, aux_py)
                logging.info("✅ Reverted to backup")

    def _save_trajectory(self, conversation: Any, generation: int) -> None:
        """Save the conversation's LLM messages to ``agent_runs/gen_<N>_trajectory_messages.json``.

        Best effort: any failure is logged as an ``agent_trajectory_save_failed``
        event instead of failing the analysis.
        """
        try:
            from openhands.sdk.event.base import LLMConvertibleEvent

            events = list(conversation.state.events)
            llm_events = [e for e in events if isinstance(e, LLMConvertibleEvent)]
            llm_messages = LLMConvertibleEvent.events_to_messages(llm_events)

            trajectory_messages: List[Dict[str, Any]] = [
                _sanitize_for_json(msg.model_dump() if hasattr(msg, "model_dump") else dict(msg))
                for msg in llm_messages
            ]

            behavior_logging.save_text_artifact(
                str(self.results_dir),
                f"agent_runs/gen_{generation}_trajectory_messages.json",
                json.dumps(trajectory_messages, indent=2, ensure_ascii=False),
            )
            behavior_logging.log_event(
                str(self.results_dir),
                "agent_trajectory_saved",
                {
                    "generation": generation,
                    "event_count": len(events),
                    "llm_event_count": len(llm_events),
                    "message_count": len(trajectory_messages),
                },
            )
        except Exception as e:
            behavior_logging.log_event(
                str(self.results_dir),
                "agent_trajectory_save_failed",
                {
                    "generation": generation,
                    "error": str(e),
                },
            )

    async def _analyze_generation_locked(self, generation: int) -> Dict[str, Any]:
        """Run one agent session for ``generation``; the caller holds ``_run_lock``.

        Steps:
        1. Build and log the task message.
        2. Back up auxiliary_metrics.py.
        3. Run a new ``Conversation`` in the workspace on a worker thread.
        4. Restore the backup if the agent left a file that fails to compile.
        5. Optionally save the trajectory.
        6. Collect insights, metrics info and candidate code.

        Returns:
            Dict with ``success``, ``generation``, ``workspace``, ``insights``,
            ``auxiliary_metrics``, ``candidate_code`` and ``elapsed_seconds``.

        Raises:
            Re-raises any exception from setup or the run, after logging an
            ``agent_run_end`` event with ``success: False``.
        """
        logging.info("=" * 80)
        logging.info(f"🧠 EV2 Agent Analysis - Generation {generation}")
        logging.info("=" * 80)

        start_time = time.time()

        try:
            self._ensure_agent_ready()

            task_message = self._build_task_message(generation)
            task_hash = hashlib.sha256(task_message.encode("utf-8")).hexdigest()

            logging.info(f"📝 Task message: {len(task_message)} characters")
            logging.info(f"📁 Workspace: {self.workspace}")
            behavior_logging.save_text_artifact(
                str(self.results_dir),
                f"agent_runs/gen_{generation}_task_message.txt",
                task_message,
            )
            behavior_logging.log_event(
                str(self.results_dir),
                "agent_run_start",
                {
                    "generation": generation,
                    "workspace": str(self.workspace),
                    "task_message_chars": len(task_message),
                    "task_message_sha256": task_hash,
                },
            )

            self._backup_aux_metrics()

            conversation = Conversation(
                agent=self._agent,
                workspace=str(self.workspace)
            )

            logging.info("📤 Sending task to agent...")
            conversation.send_message(task_message)

            logging.info("🔄 Agent working...")
            # conversation.run is synchronous; a worker thread keeps the event loop responsive.
            await asyncio.to_thread(conversation.run)

            self._restore_aux_metrics_if_broken()

            if ENABLE_FULL_TRAJECTORY_LOG:
                self._save_trajectory(conversation, generation)

            elapsed = time.time() - start_time

            logging.info("=" * 80)
            logging.info("✅ EV2 Evaluation Complete!")
            logging.info("=" * 80)
            logging.info(f"⏱️ Time: {elapsed:.1f}s")
            logging.info(f"📁 Workspace: {self.workspace}")
            logging.info(f"📝 Memory: {self.workspace}/EVAL_AGENTS.md")
            logging.info("=" * 80)

            insights = self._extract_insights()
            metrics = self._extract_metrics()
            candidate_code = self._extract_agent_candidate()

            agent_result = {
                "success": True,
                "generation": generation,
                "workspace": str(self.workspace),
                "insights": insights,
                "auxiliary_metrics": metrics,
                "candidate_code": candidate_code,
                "elapsed_seconds": elapsed
            }
            behavior_logging.save_text_artifact(
                str(self.results_dir),
                f"agent_runs/gen_{generation}_result.json",
                json.dumps(_sanitize_for_json(agent_result), indent=2, ensure_ascii=False),
            )
            behavior_logging.log_event(
                str(self.results_dir),
                "agent_run_end",
                {
                    "generation": generation,
                    "success": True,
                    "elapsed_seconds": elapsed,
                    "insight_count": len(insights),
                    "auxiliary_metric_file_exists": bool(metrics.get("auxiliary_metrics_file_exists")),
                    "auxiliary_metric_file_size_bytes": metrics.get("file_size_bytes", 0),
                },
            )
            return agent_result

        except Exception as e:
            behavior_logging.log_event(
                str(self.results_dir),
                "agent_run_end",
                {
                    "generation": generation,
                    "success": False,
                    "elapsed_seconds": time.time() - start_time,
                    "error": str(e),
                },
            )
            logging.error(f"❌ Agent analysis failed: {e}", exc_info=True)
            raise

    # ------------------------------------------------------------------
    # Result extraction
    # ------------------------------------------------------------------

    def _extract_insights(self) -> List[str]:
        """Return the last 10 bullet lines (starting with ``*`` or ``-``) of EVAL_AGENTS.md.

        Lines are stripped. The file is read as UTF-8, the encoding it is
        bootstrapped with. Undecodable bytes are replaced rather than failing
        the analysis after the agent has already run.
        """
        eval_agents_md = self.workspace / "EVAL_AGENTS.md"
        if not eval_agents_md.exists():
            return []

        content = eval_agents_md.read_text(encoding="utf-8", errors="replace")
        bullets = [
            stripped
            for stripped in (line.strip() for line in content.split("\n"))
            if stripped.startswith(("*", "-"))
        ]
        return bullets[-10:]

    def _extract_metrics(self) -> Dict[str, Any]:
        """Describe the workspace's auxiliary_metrics.py: whether it exists, plus its path and size if so."""
        auxiliary_py = self.workspace / "auxiliary_metrics.py"

        metrics: Dict[str, Any] = {
            "auxiliary_metrics_file_exists": auxiliary_py.exists(),
        }
        if auxiliary_py.exists():
            metrics["auxiliary_metrics_path"] = str(auxiliary_py)
            metrics["file_size_bytes"] = auxiliary_py.stat().st_size
        return metrics

    def _extract_diagnostic_report(self) -> str:
        """Return the agent's diagnostic_report.md (stripped, at most 2000 chars), or "" on any problem."""
        report_path = self.workspace / "diagnostic_report.md"
        if not report_path.exists():
            return ""
        try:
            content = report_path.read_text(encoding="utf-8").strip()
            return content[:2000] if content else ""
        except Exception:
            return ""
|
|
|
|
| |
| |
| |
|
|
class ServiceState:
    """
    Service state management (same as ev2_service.py)

    Maintains the recent generation history and agent-trigger bookkeeping,
    persists it as JSON (``<results_dir>/eval_agent_memory/service_state.json``,
    or ``./service_state.json`` when no results_dir is configured), and decides
    when the agent should be triggered according to ``config.trigger_mode``.
    """

    # Cap on generation records kept in memory and written to disk.
    _MAX_GENERATION_HISTORY = 100
    # Cap on agent-trigger records written to disk.
    _MAX_TRIGGER_HISTORY = 20

    def __init__(self, config: ServiceConfig, force_clean: bool = False):
        """
        Initialize service state.

        Args:
            config: Service configuration
            force_clean: If True, start with clean state (don't load from disk)
                and immediately overwrite the state file with that empty state.
        """
        self.config = config

        # Persistent counters/history (restored by _load_state when available).
        self.generation_history: List[Dict[str, Any]] = []
        self.last_agent_trigger_gen: int = -1
        self.total_notifications: int = 0
        self.total_agent_runs: int = 0
        self.agent_trigger_history: List[Dict[str, Any]] = []

        # Process-local; not persisted (used only for uptime).
        self.start_time = time.time()

        if not force_clean:
            self._load_state()
        else:
            logging.info("🔄 Starting with clean state (not loading from disk)")
            self._save_state()

    def _get_state_file(self) -> Path:
        """Get path to state file (creating its directory when results_dir is set)."""
        if self.config.results_dir:
            state_dir = Path(self.config.results_dir) / "eval_agent_memory"
            state_dir.mkdir(parents=True, exist_ok=True)
            return state_dir / "service_state.json"
        return Path("service_state.json")

    def _load_state(self):
        """Load previous state from disk (missing file: no-op; errors are logged)."""
        state_file = self._get_state_file()
        if not state_file.exists():
            return
        try:
            with open(state_file, encoding="utf-8") as f:
                data = json.load(f)
            if not isinstance(data, dict):
                raise ValueError(f"expected a JSON object, got {type(data).__name__}")

            self.generation_history = data.get('generation_history', [])
            self.last_agent_trigger_gen = data.get('last_agent_trigger_gen', -1)
            self.total_notifications = data.get('total_notifications', 0)
            self.total_agent_runs = data.get('total_agent_runs', 0)
            self.agent_trigger_history = data.get('agent_trigger_history', [])

            logging.info(f"📥 Loaded state: {len(self.generation_history)} generations in history")
        except Exception as e:
            logging.error(f"Failed to load state: {e}")

    def _save_state(self):
        """Save current state to disk atomically (best-effort; errors are logged).

        The JSON is written to a sibling ``.tmp`` file and moved into place
        with ``os.replace``. A crash mid-write therefore never leaves a
        truncated state file for the next ``_load_state``.
        """
        state_file = self._get_state_file()
        tmp_file = state_file.with_name(state_file.name + ".tmp")
        try:
            data = {
                'generation_history': self.generation_history[-self._MAX_GENERATION_HISTORY:],
                'last_agent_trigger_gen': self.last_agent_trigger_gen,
                'total_notifications': self.total_notifications,
                'total_agent_runs': self.total_agent_runs,
                'agent_trigger_history': self.agent_trigger_history[-self._MAX_TRIGGER_HISTORY:],
                'last_update': time.time()
            }

            with open(tmp_file, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2)
            os.replace(tmp_file, state_file)
        except Exception as e:
            logging.error(f"Failed to save state: {e}")
            try:
                tmp_file.unlink()
            except OSError:
                pass

    def add_generation(self, gen_data: Dict[str, Any]):
        """Record a generation, keeping only the most recent 100, and persist."""
        self.generation_history.append(gen_data)
        self.total_notifications += 1

        if len(self.generation_history) > self._MAX_GENERATION_HISTORY:
            self.generation_history = self.generation_history[-self._MAX_GENERATION_HISTORY:]

        self._save_state()

    def should_trigger_agent(self, generation: int, primary_score: float) -> tuple[bool, str]:
        """
        Decide whether to trigger the agent

        Modes: "always"; "periodic" (every ``trigger_interval`` generations
        since the last trigger); "plateau" (see ``_detect_plateau``); "mixed"
        (periodic, or earlier on plateau). ``primary_score`` is currently
        unused; the decision reads ``generation_history`` instead.

        Returns: (should_trigger, reason)
        """
        if not self.config.agent_enabled:
            return False, "Agent disabled in config"

        if self.config.trigger_mode == "always":
            return True, "Always mode"

        if self.config.trigger_mode == "periodic":
            if generation - self.last_agent_trigger_gen >= self.config.trigger_interval:
                return True, f"Periodic trigger (interval={self.config.trigger_interval})"
            else:
                return False, f"Not yet (last trigger at gen {self.last_agent_trigger_gen})"

        if self.config.trigger_mode == "plateau":
            if self._detect_plateau():
                return True, "Plateau detected"
            else:
                return False, "No plateau detected"

        if self.config.trigger_mode == "mixed":
            if generation - self.last_agent_trigger_gen >= self.config.trigger_interval:
                return True, f"Periodic trigger (interval={self.config.trigger_interval})"

            if self._detect_plateau():
                return True, "Plateau detected (early trigger)"

            return False, f"Waiting (next trigger at gen {self.last_agent_trigger_gen + self.config.trigger_interval})"

        return False, f"Unknown trigger mode: {self.config.trigger_mode}"

    def _detect_plateau(self) -> bool:
        """Detect if primary score has plateaued.

        Compares the first and last ``primary_score`` of the most recent
        ``plateau_window`` records. A plateau means the absolute relative
        change is below ``plateau_threshold``; the denominator is floored at
        1e-6. Returns False when there is not enough history, when the window
        is non-positive, or when either endpoint score is missing or not a
        finite number.
        """
        window = self.config.plateau_window
        # A non-positive window would make history[-window:] span everything.
        if window <= 0 or len(self.generation_history) < window:
            return False

        recent = self.generation_history[-window:]
        first = recent[0].get('primary_score')
        last = recent[-1].get('primary_score')
        if not all(isinstance(s, numbers.Real) and math.isfinite(s) for s in (first, last)):
            return False

        improvement = (last - first) / max(abs(first), 1e-6)

        return abs(improvement) < self.config.plateau_threshold

    def mark_agent_triggered(self, generation: int, active_metrics: Optional[List[str]] = None):
        """Mark that agent was triggered, recording active aux metric names."""
        self.last_agent_trigger_gen = generation
        self.total_agent_runs += 1
        self.agent_trigger_history.append({
            "generation": generation,
            "timestamp": time.time(),
            "active_metrics": active_metrics or [],
        })
        self._save_state()

    def get_statistics(self) -> Dict[str, Any]:
        """Get service statistics"""
        return {
            "total_notifications": self.total_notifications,
            "total_agent_runs": self.total_agent_runs,
            "generations_tracked": len(self.generation_history),
            "last_agent_trigger_gen": self.last_agent_trigger_gen,
            "uptime_seconds": time.time() - self.start_time
        }
|
|
|
|
| |
| |
| |
|
|
| |
# Process-wide service singletons. startup_event assigns all three;
# ev2_agent may stay None (dynamic mode or a failed agent build).
service_state: Optional[ServiceState] = None
service_config: Optional[ServiceConfig] = None
ev2_agent: Optional[IntegratedEV2Agent] = None


# Tracked evaluation jobs keyed by job_id; _update_evaluation_job mutates
# entries while holding _state_lock.
evaluation_jobs: Dict[str, Dict[str, Any]] = {}


# Lock guarding the shared globals above against concurrent coroutines.
# NOTE(review): asyncio is already imported at the top of the file, so this
# re-import is redundant (harmless).
# NOTE(review): on Python < 3.10, asyncio.Lock() binds to the event loop that is
# current at construction; if the server runs a different loop, contended
# acquires can fail. Confirm the minimum supported Python version.
import asyncio
_state_lock = asyncio.Lock()


# FastAPI application; the startup/shutdown hooks in this module register on it.
app = FastAPI(
    title="EV2 Evaluation Service (Standalone)",
    description="Event-driven evaluation service with integrated OpenHands agent",
    version="2.0.0"
)


# Root logging configuration (INFO level) and the module logger used below.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
|
|
| def _env_flag(name: str, default: bool = False) -> bool: |
| raw = os.getenv(name) |
| if raw is None: |
| return default |
| return raw.strip().lower() in {"1", "true", "yes", "on"} |
|
|
|
|
# Feature flags read once at import time from the environment; truthy values
# are "1", "true", "yes", "on" (case-insensitive, see _env_flag).
ENABLE_FULL_TRAJECTORY_LOG = _env_flag("ENABLE_FULL_TRAJECTORY_LOG", default=False)
OPENHANDS_LOG_COMPLETIONS = _env_flag("OPENHANDS_LOG_COMPLETIONS", default=False)
|
|
|
|
def _should_suppress_aux_metrics(experiment_root: str, current_gen: int, window: int = 10) -> bool:
    """Check if aux metrics should be suppressed due to negative correlation with score.

    Loads per-generation metrics for ``max(0, current_gen - window)`` ..
    ``current_gen`` via ``eval_agent.feedback._load_generation_metrics``; the
    bounds semantics are that helper's. With fewer than 5 records nothing is
    suppressed.

    Otherwise, for each ``aux_*`` key found in any record's ``public`` dict,
    the Spearman rho between that metric and ``combined_score`` is computed.
    Only records where the metric is a finite number are used. If any metric
    with at least 5 such samples has rho < -0.3, the function returns True
    to suppress all aux metrics for safety.

    Args:
        experiment_root: Experiment root directory holding generation results.
        current_gen: Latest generation to include.
        window: How many generations back to look.

    Returns:
        True if aux metrics should be suppressed, else False.
    """
    from eval_agent.feedback import _load_generation_metrics, _spearman_correlation

    def _is_finite_number(x: Any) -> bool:
        return isinstance(x, (int, float)) and math.isfinite(x)

    def _public_of(data: Dict[str, Any]) -> Dict[str, Any]:
        # Tolerate a missing, null or non-dict "public" section.
        public = data.get("public")
        return public if isinstance(public, dict) else {}

    records = _load_generation_metrics(Path(experiment_root), max(0, current_gen - window), current_gen)
    if len(records) < 5:
        return False

    # Missing/non-finite scores count as 0.0 so every record keeps its slot.
    scores: List[float] = []
    for _, data in records:
        cs = data.get("combined_score")
        scores.append(float(cs) if _is_finite_number(cs) else 0.0)
    publics = [_public_of(data) for _, data in records]

    # Sorted so evaluation order (and the reported metric) is deterministic.
    aux_names = sorted({
        key
        for public in publics
        for key in public
        if isinstance(key, str) and key.startswith("aux_")
    })

    for name in aux_names:
        vals: List[float] = []
        paired_s: List[float] = []
        for public, s in zip(publics, scores):
            v = public.get(name)
            if _is_finite_number(v):
                vals.append(float(v))
                paired_s.append(s)

        if len(vals) < 5:
            continue
        rho = _spearman_correlation(vals, paired_s)
        # NOTE(review): the None guard covers a helper that may return None for
        # degenerate (e.g. constant) input; NaN already compares False.
        if rho is not None and rho < -0.3:
            logging.warning(
                f"Aux metric '{name}' has negative correlation {rho:.2f} with score — triggering suppression"
            )
            return True

    return False
|
|
|
|
| def _sanitize_for_json(value: Any) -> Any: |
| """Recursively convert non-JSON-safe numeric values (NaN/Inf) to None.""" |
| if isinstance(value, float): |
| return value if math.isfinite(value) else None |
| if isinstance(value, dict): |
| return {k: _sanitize_for_json(v) for k, v in value.items()} |
| if isinstance(value, list): |
| return [_sanitize_for_json(v) for v in value] |
| if isinstance(value, tuple): |
| return [_sanitize_for_json(v) for v in value] |
| |
| if hasattr(value, "item") and callable(getattr(value, "item")): |
| try: |
| coerced = value.item() |
| return _sanitize_for_json(coerced) |
| except Exception: |
| return str(value) |
| return value |
|
|
|
|
async def _update_evaluation_job(job_id: str, **updates: Any) -> bool:
    """Merge *updates* into the tracked job *job_id* while holding ``_state_lock``.

    Returns True when the job exists and was updated. Returns False when no
    job with that id is tracked; nothing is created in that case.
    """
    async with _state_lock:
        tracked = evaluation_jobs.get(job_id)
        if tracked is not None:
            tracked.update(updates)
            return True
    return False
|
|
|
|
@app.on_event("startup")
async def startup_event():
    """
    FastAPI startup hook.

    - Falls back to a default ``ServiceConfig`` when none was injected.
    - Creates the global ``ServiceState``.
    - When ``results_dir`` is configured, eagerly builds the
      ``IntegratedEV2Agent``. A failed build is logged and leaves
      ``ev2_agent`` as None so the service still starts.
    - Without ``results_dir``, ``ev2_agent`` is left as None (dynamic mode).
    """
    global service_state, service_config, ev2_agent

    separator = "=" * 80

    logger.info(separator)
    logger.info("🚀 Starting EV2 Evaluation Service (Standalone)")
    logger.info(separator)

    if service_config is None:
        logger.warning("⚠️ No config provided, using defaults")
        service_config = ServiceConfig()

    service_state = ServiceState(service_config)

    if not service_config.results_dir:
        logger.info("⏳ Agent will be initialized on first generation request (dynamic mode)")
        ev2_agent = None
    else:
        try:
            ev2_agent = IntegratedEV2Agent(
                results_dir=service_config.results_dir,
                primary_evaluator_path=service_config.primary_evaluator_path,
                config=service_config,
                problem_statement=service_config.problem_statement,
                evaluator_kwargs=service_config.evaluator_kwargs,
            )
            logger.info("✅ Integrated EV2 Agent ready")
        except Exception as e:
            logger.error(f"❌ Failed to initialize agent: {e}")
            logger.warning("⚠️ Service will start but agent calls will fail")
            ev2_agent = None

    logger.info(separator)
    summary_lines = (
        "✅ Service Started",
        f" Experiment: {service_config.experiment_name}",
        f" Results dir: {service_config.results_dir}",
        f" Trigger mode: {service_config.trigger_mode}",
        f" Trigger interval: {service_config.trigger_interval}",
        f" Full trajectory log: {ENABLE_FULL_TRAJECTORY_LOG}",
    )
    for summary_line in summary_lines:
        logger.info(summary_line)
    logger.info(separator)
|
|
|
|
@app.on_event("shutdown")
async def shutdown_event():
    """FastAPI shutdown hook: persist service state (if any) and log final counters."""
    separator = "=" * 80
    logger.info(separator)
    logger.info("🛑 Shutting down EV2 Evaluation Service")
    logger.info(separator)

    state = service_state
    if state:
        state._save_state()
        logger.info(f" Total generation requests: {state.total_notifications}")
        logger.info(f" Total agent runs: {state.total_agent_runs}")

    logger.info(separator)
|
|
|
|
| |
| |
| |
|
|
| def _normalize_experiment_root(results_dir: str) -> Path: |
| """ |
| Normalize results_dir to experiment root. |
| Accepts paths like: |
| - /path/to/experiment |
| - /path/to/experiment/gen_10 |
| - /path/to/experiment/gen_10/results |
| """ |
| results_path = Path(results_dir).resolve() |
| if results_path.name == "results": |
| return results_path.parent.parent |
| if results_path.name.startswith("gen_"): |
| return results_path.parent |
| return results_path |
|
|
|
|
| def _execute_primary_evaluator_sync( |
| code_path: str, |
| results_dir: str, |
| evaluator_module: str, |
| evaluator_function: str, |
| evaluator_kwargs: Optional[Dict[str, Any]], |
| ) -> Dict[str, Any]: |
| """ |
| Execute primary evaluator synchronously. |
| |
| This runs inside a worker subprocess so the parent process can hard-kill it |
| on timeout without leaving background threads. |
| """ |
| import importlib |
| import inspect |
|
|
| module = importlib.import_module(evaluator_module) |
| evaluator_func = getattr(module, evaluator_function) |
|
|
| eval_kwargs = evaluator_kwargs or {} |
| sig = inspect.signature(evaluator_func) |
| params = list(sig.parameters.keys()) |
|
|
| |
| |
| |
| |
| if len(params) >= 2 and "results_dir" in params[:3]: |
| result = evaluator_func(code_path, results_dir, **eval_kwargs) |
| else: |
| result = evaluator_func(code_path, **eval_kwargs) |
|
|
| |
| if result is None: |
| metrics_file = Path(results_dir) / "metrics.json" |
| if not metrics_file.exists(): |
| raise FileNotFoundError( |
| f"Evaluator returned None and metrics.json not found at {metrics_file}" |
| ) |
| with open(metrics_file) as f: |
| result = json.load(f) |
|
|
| if not isinstance(result, dict): |
| raise ValueError(f"Evaluator must return dict, got {type(result)}") |
| if "combined_score" not in result: |
| raise ValueError("Evaluator result must contain 'combined_score' key") |
|
|
| |
| correct_file = Path(results_dir) / "correct.json" |
| if correct_file.exists(): |
| try: |
| with open(correct_file) as f: |
| correct_data = json.load(f) |
| result["correct"] = correct_data.get("correct", False) |
| result["validation_error"] = correct_data.get("error", None) |
| except Exception: |
| result["correct"] = False |
| else: |
| result["correct"] = True |
|
|
| return result |
|
|
|
|
| def _run_primary_evaluator_worker(request_path: str, output_path: str) -> int: |
| """ |
| Worker-mode entrypoint for primary evaluation. |
| |
| Reads request json, executes evaluator, writes output json: |
| - {"ok": true, "result": {...}} |
| - {"ok": false, "error": "...", "traceback": "..."} |
| """ |
| try: |
| with open(request_path) as f: |
| payload = json.load(f) |
|
|
| result = _execute_primary_evaluator_sync( |
| code_path=payload["code_path"], |
| results_dir=payload["results_dir"], |
| evaluator_module=payload["evaluator_module"], |
| evaluator_function=payload["evaluator_function"], |
| evaluator_kwargs=payload.get("evaluator_kwargs"), |
| ) |
|
|
| with open(output_path, "w") as f: |
| json.dump({"ok": True, "result": result}, f) |
| return 0 |
| except Exception as e: |
| try: |
| with open(output_path, "w") as f: |
| json.dump( |
| { |
| "ok": False, |
| "error": str(e), |
| "traceback": traceback.format_exc(), |
| }, |
| f, |
| ) |
| except Exception: |
| pass |
| return 1 |
|
|
async def run_primary_evaluator(request: GenerationCompleteRequest) -> Dict[str, Any]:
    """
    Run primary evaluator (dynamically loaded) in an isolated worker subprocess.

    The request is serialized to a temporary JSON file. This same script is
    then re-executed with ``--worker-eval-request/--worker-eval-output``
    (presumably dispatched to ``_run_primary_evaluator_worker`` by the
    script's entry point), and the worker's JSON output is read back. On
    timeout the worker is killed and a zero-score failure result is returned
    instead of raising.

    Args:
        request: Generation complete request with evaluator config

    Returns:
        Evaluation result dict with at least 'combined_score' key

    Raises:
        RuntimeError: If the worker produced no output file or reported a
            failure. Import/attribute/evaluation errors raised inside the
            worker surface this way.
        Exception: Other unexpected failures (e.g. unreadable worker output).
    """
    logger.info(f"🔬 Running primary evaluator: {request.evaluator_module}.{request.evaluator_function}")

    logger.info(f" Code path: {request.code_path}")
    logger.info(f" Results dir: {request.results_dir}")
    logger.info(f" Evaluator: {request.evaluator_module}.{request.evaluator_function}")

    timeout_seconds = service_config.evaluation_timeout if service_config else 300.0
    # Also reject None, which cannot be compared with 0.
    if timeout_seconds is None or timeout_seconds <= 0:
        logger.warning(
            f"Invalid evaluation timeout {timeout_seconds}; using default 300.0s"
        )
        timeout_seconds = 300.0
    logger.info(f" Evaluation timeout: {timeout_seconds}s (hard kill)")

    request_payload = {
        "code_path": request.code_path,
        "results_dir": request.results_dir,
        "evaluator_module": request.evaluator_module,
        "evaluator_function": request.evaluator_function,
        "evaluator_kwargs": request.evaluator_kwargs or {},
    }

    req_path = None
    out_path = None
    process = None
    try:
        # delete=False so the child can reopen the file; removed in ``finally``.
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".json", prefix="ev2_eval_req_", delete=False
        ) as req_file:
            req_path = req_file.name
            json.dump(request_payload, req_file)
        out_path = f"{req_path}.out.json"

        # Re-run this very script in worker mode.
        cmd = [
            sys.executable,
            str(Path(__file__).resolve()),
            "--worker-eval-request",
            req_path,
            "--worker-eval-output",
            out_path,
        ]

        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )

        try:
            _, stderr = await asyncio.wait_for(
                process.communicate(), timeout=timeout_seconds
            )
        except asyncio.TimeoutError:
            error_msg = f"Evaluation exceeded timeout of {timeout_seconds}s (process killed)"
            logger.error(f"⏱️ {error_msg}")
            if process.returncode is None:
                process.kill()
                await process.communicate()
            return {
                "combined_score": 0.0,
                "correct": False,
                "validation_error": error_msg,
                "execution_time_mean": timeout_seconds,
                "timeout": True,
            }

        if not Path(out_path).exists():
            stderr_text = (stderr or b"").decode("utf-8", errors="replace").strip()
            raise RuntimeError(
                f"Evaluator worker did not produce output file. "
                f"returncode={process.returncode}, stderr={stderr_text[:500]}"
            )

        with open(out_path) as f:
            worker_output = json.load(f)

        if not worker_output.get("ok", False):
            raise RuntimeError(
                f"Evaluator worker failed: {worker_output.get('error', 'unknown error')}\n"
                f"{worker_output.get('traceback', '')}"
            )

        result = worker_output.get("result", {})
        score = result.get("combined_score", 0.0)
        # Format defensively: a non-numeric score must not turn a successful
        # evaluation into an exception in this log line.
        score_text = f"{score:.4f}" if isinstance(score, (int, float)) else repr(score)
        logger.info(
            f"✅ Primary evaluation completed: "
            f"score={score_text}, "
            f"correct={result.get('correct', '?')}"
        )
        return result
    except Exception as e:
        logger.error(f"❌ Evaluation failed: {e}", exc_info=True)
        raise
    finally:
        # A live worker here means cancellation or an unexpected error (normal
        # and timeout paths both wait for exit); kill it so it cannot outlive
        # this request.
        if process is not None and process.returncode is None:
            try:
                process.kill()
            except ProcessLookupError:
                pass
        for path in [req_path, out_path]:
            if path:
                try:
                    os.unlink(path)
                except OSError:
                    pass
|
|
|
|
async def run_auxiliary_evaluators(
    request: GenerationCompleteRequest,
    primary_result: Dict[str, Any],
    experiment_root: str,
) -> Dict[str, Any]:
    """
    Run auxiliary evaluators (if they exist)

    Loads ``<experiment_root>/eval_agent_memory/auxiliary_metrics.py`` and
    calls its ``evaluate_aux()`` function (single entry point pattern). A copy
    of the file is first saved as ``auxiliary_metrics_snapshot.py`` in
    ``request.results_dir`` (best-effort). Every exit path emits an
    ``aux_eval_end`` behavior-log event; the start emits ``aux_eval_start``.

    How ``evaluate_aux`` is called depends on its parameter names:
      - no parameters: ``evaluate_aux()``
      - one parameter whose name contains ``results_dir``, ``gen_path`` or
        ``generation_dir``: the *parent* of ``request.results_dir``
      - one parameter with any other name: ``request.results_dir`` itself
      - several parameters including ``primary_result``: keyword call with
        ``results_dir=<parent of request.results_dir>`` and ``primary_result``
      - otherwise: keyword call with ``results_dir=request.results_dir``
    Each call runs in a worker thread under a 30 s ``asyncio.wait_for`` limit.

    Args:
        request: Generation complete request
        primary_result: Result from primary evaluator
        experiment_root: Snapshot of experiment root used for this evaluation run

    Returns:
        Dict of auxiliary metric results (flat dictionary):
          - on success, float values produced by ``_normalize_aux_metrics``,
            including its ``aux_metric_*`` bookkeeping keys;
          - on failure, the dict built by ``_aux_failure_metrics``;
          - ``{}`` when skipped (no experiment root, no file, or no
            ``evaluate_aux``).
    """
    logger.info(f"🔍 Looking for auxiliary evaluators...")
    behavior_logging.log_event(
        request.results_dir,
        "aux_eval_start",
        {"generation": request.generation},
    )

    # Without an experiment root there is nowhere to look for auxiliary_metrics.py.
    if not experiment_root:
        logger.info(" No results_dir configured, skipping auxiliary metrics")
        behavior_logging.log_event(
            request.results_dir,
            "aux_eval_end",
            {"generation": request.generation, "success": True, "skipped": True, "reason": "no_results_dir"},
        )
        return {}

    aux_metrics_path = Path(experiment_root) / "eval_agent_memory" / "auxiliary_metrics.py"

    if not aux_metrics_path.exists():
        logger.info(f" No auxiliary metrics found at {aux_metrics_path}")
        behavior_logging.log_event(
            request.results_dir,
            "aux_eval_end",
            {"generation": request.generation, "success": True, "skipped": True, "reason": "aux_file_missing"},
        )
        return {}

    logger.info(f" Found auxiliary metrics: {aux_metrics_path}")

    # Keep a per-generation copy of the exact aux code used for this run
    # (presumably because the live file can be rewritten later). Best-effort:
    # failures are only logged.
    try:
        snapshot_path = Path(request.results_dir) / "auxiliary_metrics_snapshot.py"
        snapshot_path.parent.mkdir(parents=True, exist_ok=True)
        snapshot_path.write_text(aux_metrics_path.read_text(encoding="utf-8"), encoding="utf-8")
        logger.info(f" 📝 Saved auxiliary metrics snapshot: {snapshot_path}")
    except Exception as e:
        logger.warning(f" ⚠️ Failed to save auxiliary metrics snapshot: {e}")

    def _aux_failure_metrics(error_type: str, error_message: str, error_detail: str = "") -> Dict[str, Any]:
        """Build the metrics dict reported when aux evaluation fails.

        Only the *lengths* of the message/detail are recorded, not the text.
        Error codes: syntax=1, import=2, runtime=3, invalid_return=4; any other
        type (e.g. "timeout") maps to 99.
        NOTE(review): success paths report aux_metric_eval_success=1.0 while
        failures report None (not 0.0); confirm consumers expect that.
        """
        return {
            "aux_metric_eval_success": None,
            "aux_metric_error_code": float({
                "syntax": 1,
                "import": 2,
                "runtime": 3,
                "invalid_return": 4,
            }.get(error_type, 99)),
            "aux_metric_non_numeric_dropped_count": None,
            "aux_metric_error_message_length": float(len(error_message or "")),
            "aux_metric_error_detail_length": float(len(error_detail or "")),
        }


    def _normalize_aux_metrics(raw: Dict[str, Any]) -> Dict[str, Any]:
        """
        Keep numeric/bool metrics usable for time-series analysis.
        Preserve error context in dedicated metadata fields.
        """
        normalized: Dict[str, Any] = {}
        dropped_non_numeric = 0


        for key, value in raw.items():
            # bool first: bool is a numbers.Real subclass; map to 1.0/0.0.
            if isinstance(value, bool):
                normalized[key] = float(value)
            elif isinstance(value, numbers.Real):
                # NaN/Inf are dropped (and counted) rather than stored.
                if math.isnan(float(value)) or math.isinf(float(value)):
                    dropped_non_numeric += 1
                else:
                    normalized[key] = float(value)
            elif isinstance(value, str) and key == "error":
                # A returned error string raises the error code to at least 3
                # ("runtime") and records only its length.
                normalized["aux_metric_error_code"] = max(float(normalized.get("aux_metric_error_code", 0.0)), 3.0)
                normalized["aux_metric_error_message_length"] = float(len(value))
                dropped_non_numeric += 1
            elif isinstance(value, str) and key == "traceback":
                normalized["aux_metric_error_detail_length"] = float(len(value))
                dropped_non_numeric += 1
            else:
                dropped_non_numeric += 1

        # NOTE(review): an "error" entry does not reset aux_metric_eval_success,
        # which still defaults to 1.0 here; a numeric aux_metric_* key in raw
        # that comes after "error" would also overwrite the derived value.
        normalized.setdefault("aux_metric_eval_success", 1.0)
        normalized.setdefault("aux_metric_error_code", 0.0)
        normalized.setdefault("aux_metric_error_message_length", 0.0)
        normalized.setdefault("aux_metric_error_detail_length", 0.0)
        normalized["aux_metric_non_numeric_dropped_count"] = float(dropped_non_numeric)
        return normalized


    try:
        # Put the directory two levels above this file on sys.path (presumably
        # so auxiliary_metrics.py can import project packages). This is a
        # permanent, process-wide change.
        project_root = str(Path(__file__).resolve().parent.parent)
        if project_root not in sys.path:
            sys.path.insert(0, project_root)

        # Compile first so syntax errors are reported as "syntax" (code 1)
        # rather than as an import failure.
        try:
            aux_source = aux_metrics_path.read_text(encoding="utf-8")
            compile(aux_source, str(aux_metrics_path), "exec")
        except SyntaxError as e:
            logger.error(f"❌ auxiliary_metrics.py syntax error: {e}")
            failure = _aux_failure_metrics(
                error_type="syntax",
                error_message=str(e),
                error_detail=traceback.format_exc(),
            )
            behavior_logging.log_event(
                request.results_dir,
                "aux_eval_end",
                {
                    "generation": request.generation,
                    "success": False,
                    "error_type": "syntax",
                    "aux_metric_error_code": failure.get("aux_metric_error_code", 99.0),
                },
            )
            return failure

        # Load the file as a fresh module object; it is not registered in sys.modules here.
        import importlib.util
        spec = importlib.util.spec_from_file_location("auxiliary_metrics", str(aux_metrics_path))
        aux_module = importlib.util.module_from_spec(spec)
        try:
            spec.loader.exec_module(aux_module)
        except Exception as e:
            logger.error(f"❌ Failed importing auxiliary metrics module: {e}", exc_info=True)
            failure = _aux_failure_metrics(
                error_type="import",
                error_message=str(e),
                error_detail=traceback.format_exc(),
            )
            behavior_logging.log_event(
                request.results_dir,
                "aux_eval_end",
                {
                    "generation": request.generation,
                    "success": False,
                    "error_type": "import",
                    "aux_metric_error_code": failure.get("aux_metric_error_code", 99.0),
                },
            )
            return failure

        # A module without evaluate_aux is treated as "skipped", not a failure.
        if not hasattr(aux_module, 'evaluate_aux'):
            logger.warning(" ⚠️ No 'evaluate_aux' function found in auxiliary_metrics.py")
            logger.warning(" Please implement: def evaluate_aux(results_dir: str) -> Dict[str, Any]")
            behavior_logging.log_event(
                request.results_dir,
                "aux_eval_end",
                {"generation": request.generation, "success": True, "skipped": True, "reason": "evaluate_aux_missing"},
            )
            return {}

        evaluate_func = getattr(aux_module, 'evaluate_aux')
        logger.info(f" ✅ Found evaluate_aux function")

        # Dispatch on parameter names (see the docstring for the full table).
        import inspect
        sig = inspect.signature(evaluate_func)
        params = list(sig.parameters.keys())

        # NOTE(review): on timeout, wait_for stops waiting but the worker thread
        # started by asyncio.to_thread cannot be cancelled and keeps running.
        if not params:

            logger.info(f" 📞 Calling evaluate_aux()")
            result = await asyncio.wait_for(asyncio.to_thread(evaluate_func), timeout=30.0)

        elif len(params) == 1:

            param_name = params[0]

            if 'results_dir' in param_name.lower() or 'gen_results_dir' in param_name.lower():
                # Passes the generation directory (parent of request.results_dir).
                gen_dir = str(Path(request.results_dir).parent)
                logger.info(f" 📞 Calling evaluate_aux(results_dir='{gen_dir}')")
                result = await asyncio.wait_for(asyncio.to_thread(evaluate_func, gen_dir), timeout=30.0)

            elif 'gen_path' in param_name.lower() or 'generation_dir' in param_name.lower():

                gen_path = str(Path(request.results_dir).parent)
                logger.info(f" 📞 Calling evaluate_aux(gen_path='{gen_path}')")
                result = await asyncio.wait_for(asyncio.to_thread(evaluate_func, gen_path), timeout=30.0)

            else:
                # Unrecognized single parameter: pass request.results_dir itself.
                logger.info(f" 📞 Calling evaluate_aux('{request.results_dir}')")
                result = await asyncio.wait_for(asyncio.to_thread(evaluate_func, request.results_dir), timeout=30.0)

        elif 'primary_result' in params:
            # Requires a parameter literally named results_dir; otherwise the
            # keyword call raises TypeError (reported as a runtime failure).

            gen_dir = str(Path(request.results_dir).parent)
            logger.info(f" 📞 Calling evaluate_aux with results_dir and primary_result")
            result = await asyncio.wait_for(asyncio.to_thread(
                evaluate_func,
                results_dir=gen_dir,
                primary_result=primary_result
            ), timeout=30.0)

        else:
            # NOTE(review): unlike the branches above, this passes
            # request.results_dir rather than its parent; confirm intended.
            logger.info(f" 📞 Calling evaluate_aux with keyword arguments")
            result = await asyncio.wait_for(asyncio.to_thread(
                evaluate_func,
                results_dir=request.results_dir
            ), timeout=30.0)


        if not isinstance(result, dict):
            logger.error(f" ❌ evaluate_aux must return dict, got {type(result)}")
            failure = _aux_failure_metrics(
                error_type="invalid_return",
                error_message=f"Invalid return type: {type(result).__name__}",
            )
            behavior_logging.log_event(
                request.results_dir,
                "aux_eval_end",
                {
                    "generation": request.generation,
                    "success": False,
                    "error_type": "invalid_return",
                    "aux_metric_error_code": failure.get("aux_metric_error_code", 99.0),
                },
            )
            return failure


        # Log only; an "error" key is folded into metadata by _normalize_aux_metrics.
        if "error" in result:
            logger.warning(f" ⚠️ evaluate_aux returned error: {result.get('error')}")
        else:
            logger.info(f" ✅ evaluate_aux returned {len(result)} metrics")
            # Show at most three sample metrics in the log.
            sample_metrics = list(result.items())[:3]
            for key, value in sample_metrics:
                logger.info(f" - {key}: {value}")
            if len(result) > 3:
                logger.info(f" ... and {len(result) - 3} more")


        normalized = _normalize_aux_metrics(result)
        behavior_logging.log_event(
            request.results_dir,
            "aux_eval_end",
            {
                "generation": request.generation,
                "success": True,
                "metric_key_count": len(normalized),
                "aux_metric_eval_success": normalized.get("aux_metric_eval_success", 1.0),
                "aux_metric_error_code": normalized.get("aux_metric_error_code", 0.0),
                "aux_metric_non_numeric_dropped_count": normalized.get("aux_metric_non_numeric_dropped_count", 0.0),
            },
        )
        return normalized

    except (asyncio.TimeoutError, TimeoutError) as e:
        # error_type "timeout" is not in _aux_failure_metrics' code table -> code 99.
        logger.warning(f"⏱️ Auxiliary metrics timed out (30s limit): {e}")
        failure = _aux_failure_metrics(
            error_type="timeout",
            error_message="auxiliary_metrics.py exceeded 30s timeout",
        )
        behavior_logging.log_event(
            request.results_dir,
            "aux_eval_end",
            {
                "generation": request.generation,
                "success": False,
                "error_type": "timeout",
                "aux_metric_error_code": failure.get("aux_metric_error_code", 99.0),
            },
        )
        return failure


    except Exception as e:
        # Any other failure (including exceptions raised by evaluate_aux).
        logger.error(f"❌ Failed to run auxiliary metrics: {e}", exc_info=True)
        failure = _aux_failure_metrics(
            error_type="runtime",
            error_message=str(e),
            error_detail=traceback.format_exc(),
        )
        behavior_logging.log_event(
            request.results_dir,
            "aux_eval_end",
            {
                "generation": request.generation,
                "success": False,
                "error_type": "runtime",
                "aux_metric_error_code": failure.get("aux_metric_error_code", 99.0),
            },
        )
        return failure
|
|
|
|
def _extract_metric_descriptions(eval_agents_md_path: Path) -> Dict[str, str]:
    """
    Extract metric descriptions from EVAL_AGENTS.md

    Recognizes bullet entries at the start of a line (optionally indented;
    ``-``, ``*`` or ``+`` bullets; metric name optionally in bold)::

        - `metric_name`: description
        * **`metric_name`**: description
        - **`metric_name`**: description
        - **`metric_name`:** description

    A description may continue on following lines that are non-blank and do
    not start with a bullet. Whitespace inside a description is collapsed to
    single spaces. If a metric is listed more than once, the last entry wins.

    Args:
        eval_agents_md_path: Path to EVAL_AGENTS.md (read as UTF-8)

    Returns:
        Dict mapping metric names to their descriptions. Read failures are
        logged and yield the entries collected so far (normally empty).
    """
    import re

    descriptions: Dict[str, str] = {}

    entry_pattern = re.compile(
        r'^[ \t]*[-*+][ \t]+'                    # bullet at line start
        r'(?:\*\*)?`([^`]+)`(?:\*\*)?'           # `name` or **`name`**
        r':(?:\*\*)?\s*'                         # colon, optionally closing bold
        r'([^\n]+(?:\n(?!\s*[-*+])[^\n]+)*)',    # description + continuation lines
        re.MULTILINE,
    )

    try:
        content = eval_agents_md_path.read_text(encoding="utf-8")

        for match in entry_pattern.finditer(content):
            metric_name = match.group(1).strip()
            # Collapse newlines/indentation carried over from continuation lines.
            description = re.sub(r'\s+', ' ', match.group(2).strip())
            descriptions[metric_name] = description

        logger.info(f"Extracted {len(descriptions)} metric descriptions from {eval_agents_md_path}")

    except Exception as e:
        logger.error(f"Failed to parse EVAL_AGENTS.md: {e}")

    return descriptions
|
|
|
|
def save_metrics_file(results_dir: str, metrics: Dict[str, Any]):
    """
    Save complete metrics to ``<results_dir>/metrics.json``.

    Values are passed through ``_sanitize_for_json`` (NaN/Inf -> null,
    scalar-like objects unwrapped), so the file is strict JSON. The whole
    document is serialized before anything touches disk. It is then written
    to a sibling ``.tmp`` file and atomically moved into place, so a
    serialization error or crash never leaves a truncated metrics.json
    behind. This is best-effort: failures are logged, not raised.

    Args:
        results_dir: Directory to save metrics.json (created if missing)
        metrics: Complete metrics dict
    """
    metrics_path = Path(results_dir) / "metrics.json"
    tmp_path = metrics_path.with_name(metrics_path.name + ".tmp")

    try:
        metrics_path.parent.mkdir(parents=True, exist_ok=True)
        payload = json.dumps(_sanitize_for_json(metrics), indent=2, allow_nan=False)

        with open(tmp_path, 'w') as f:
            f.write(payload)
        os.replace(tmp_path, metrics_path)

        logger.info(f"💾 Saved metrics to {metrics_path}")

    except Exception as e:
        logger.error(f"❌ Failed to save metrics: {e}")
        try:
            tmp_path.unlink()
        except OSError:
            pass
|
|
|
|
async def _evaluate_agent_candidate(
    candidate_code: str,
    request: GenerationCompleteRequest,
    ev2_agent,
) -> Dict[str, Any]:
    """
    Evaluate the agent's candidate code using the primary evaluator.

    Writes ``candidate_code`` to a temp file with the same suffix as
    ``request.code_path`` (``.cpp`` when it has none). The primary evaluator
    then runs on a worker thread, with results written to
    ``<parent of request.results_dir>/agent_candidate_results``. The temp
    file is always removed afterwards.

    Args:
        candidate_code: Source code proposed by the agent.
        request: Generation request supplying the evaluator module/function/kwargs.
        ev2_agent: Currently unused; kept for call-site compatibility.

    Returns:
        Dict with ``combined_score``, ``correct`` and ``code``, plus either
        ``public``/``text_feedback`` on success or ``error`` on failure.
        Evaluation errors are reported this way rather than raised.
    """
    ext = Path(request.code_path).suffix or ".cpp"

    # The results directory is reused by successive candidates. Clear the files
    # _execute_primary_evaluator_sync falls back to (metrics.json when the
    # evaluator returns None, correct.json for validity) so a previous
    # candidate's verdict cannot leak into this one.
    # NOTE(review): concurrent candidate evaluations would still share this
    # directory.
    candidate_results_dir = Path(request.results_dir).parent / "agent_candidate_results"
    candidate_results_dir.mkdir(parents=True, exist_ok=True)
    for stale_name in ("metrics.json", "correct.json"):
        stale_path = candidate_results_dir / stale_name
        try:
            stale_path.unlink()
        except FileNotFoundError:
            pass
        except OSError as e:
            logger.warning(f"Could not remove stale {stale_path}: {e}")

    # Created after the directory setup so a failure there cannot leak it.
    with tempfile.NamedTemporaryFile(
        mode="w", suffix=ext, prefix="agent_candidate_", delete=False, encoding="utf-8"
    ) as f:
        f.write(candidate_code)
        candidate_path = f.name

    # NOTE(review): unlike run_primary_evaluator (subprocess + hard kill), this
    # runs the evaluator in-process on a worker thread with no timeout.
    try:
        result = await asyncio.to_thread(
            _execute_primary_evaluator_sync,
            candidate_path,
            str(candidate_results_dir),
            request.evaluator_module,
            request.evaluator_function,
            request.evaluator_kwargs,
        )
        return {
            "combined_score": result.get("combined_score", 0.0),
            "correct": result.get("correct", False),
            "code": candidate_code,
            "public": result.get("public", {}),
            "text_feedback": result.get("text_feedback", ""),
        }
    except Exception as e:
        logger.error(f"Agent candidate evaluation error: {e}")
        return {
            "combined_score": 0.0,
            "correct": False,
            "code": candidate_code,
            "error": str(e),
        }
    finally:
        try:
            Path(candidate_path).unlink(missing_ok=True)
        except Exception:
            pass
|
|
|
|
def _format_score_for_log(value: Any, digits: int = 4) -> str:
    """Render a score for a log message without ever raising.

    Scores come out of evaluator result dicts (and may pass through
    ``_sanitize_for_json``), so they are not guaranteed to support a float
    format spec (e.g. ``None``). A ``TypeError`` from a log f-string inside the
    evaluation pipeline would otherwise be caught by an enclosing ``except``
    and misreported as an evaluation or candidate failure.
    """
    try:
        return f"{value:.{digits}f}"
    except (TypeError, ValueError):
        return repr(value)


async def run_full_evaluation(job_id: str, request: GenerationCompleteRequest):
    """
    Complete evaluation pipeline (async background task).

    1. Run primary evaluator
    2. Run auxiliary evaluators (if any)
    3. Save metrics.json
    4. Record to history
    5. Decide whether to trigger agent
    6. If triggered: run agent analysis (and evaluate any candidate code it returns)

    Shared globals (service state, agent, results root) are snapshotted once
    under ``_state_lock``; only the snapshots are used afterwards, so a
    concurrent re-initialization cannot swap the agent out mid-pipeline.

    The tracked job is moved to ``running``, then to ``completed`` (with the
    sanitized result) or ``failed`` (with the error string).

    Args:
        job_id: Job ID for tracking
        request: Generation complete request
    """
    logger.info("=" * 80)
    logger.info(f"🚀 Starting full evaluation for generation {request.generation}")
    logger.info("=" * 80)

    if not await _update_evaluation_job(job_id, status="running"):
        logger.warning(f"Job {job_id} missing before start; continuing without job tracking")

    try:
        # Snapshot shared state once; everything below uses these locals only.
        async with _state_lock:
            local_service_state = service_state
            local_ev2_agent = ev2_agent
            local_results_root = (
                str(service_config.results_dir)
                if service_config and service_config.results_dir
                else ""
            )

        if local_service_state is None:
            raise RuntimeError("Service state not initialized")

        # Step 1: primary evaluator.
        logger.info("📊 Step 1/6: Running primary evaluator...")
        primary_result = await run_primary_evaluator(request)

        # Step 2: auxiliary evaluators.
        logger.info("📊 Step 2/6: Running auxiliary evaluators...")
        auxiliary_results = await run_auxiliary_evaluators(
            request,
            primary_result,
            experiment_root=local_results_root,
        )

        # Optional human-readable metric descriptions from the agent's memory file.
        auxiliary_descriptions = {}
        if auxiliary_results:
            try:
                eval_agents_md = Path(local_results_root) / "eval_agent_memory" / "EVAL_AGENTS.md"
                if eval_agents_md.exists():
                    auxiliary_descriptions = _extract_metric_descriptions(eval_agents_md)
                    logger.info(f"Extracted {len(auxiliary_descriptions)} metric descriptions")
            except Exception as e:
                logger.warning(f"Failed to extract metric descriptions: {e}")

        # Append the agent's diagnostic report (if any) to the primary feedback.
        # Uses the snapshotted agent, not the global, and is best-effort like the
        # description extraction above: a failure here must not discard an
        # otherwise successful evaluation.
        if local_ev2_agent is not None:
            try:
                diagnostic = local_ev2_agent._extract_diagnostic_report()
            except Exception as e:
                logger.warning(f"Failed to extract diagnostic report: {e}")
                diagnostic = None
            if diagnostic:
                existing = primary_result.get("text_feedback", "")
                primary_result["text_feedback"] = (existing + "\n\n" + diagnostic) if existing else diagnostic
                logger.info(f"Injected diagnostic report ({len(diagnostic)} chars) into primary_result")

        # Full in-memory result (stored on the job). Built before suppression,
        # so it keeps the raw auxiliary results.
        complete_result = {
            "combined_score": primary_result.get("combined_score", 0.0),
            "correct": primary_result.get("correct", False),
            "primary": primary_result,
            "auxiliary": auxiliary_results,
            "auxiliary_descriptions": auxiliary_descriptions,
            "timestamp": time.time(),
            "generation": request.generation
        }
        complete_result = _sanitize_for_json(complete_result)

        # Drop auxiliary metrics from what is written to disk when the
        # suppression heuristic fires.
        if auxiliary_results and _should_suppress_aux_metrics(local_results_root, request.generation):
            logger.warning("⚠️ Auto-suppressing auxiliary metrics due to negative correlation with score")
            auxiliary_results = {}
            auxiliary_descriptions = {}

        # Flatten scalar auxiliary values into the public metrics as aux_<name>.
        # `or {}` guards against an explicit None for "public".
        public_metrics = dict(primary_result.get("public") or {})
        if auxiliary_results:
            for key, value in auxiliary_results.items():
                if isinstance(value, dict) and "error" not in value:
                    for sub_key, sub_value in value.items():
                        if isinstance(sub_value, (int, float, bool, str)):
                            public_metrics[f"aux_{sub_key}"] = sub_value
                elif isinstance(value, (int, float, bool, str)):
                    public_metrics[f"aux_{key}"] = value

        text_feedback = primary_result.get("text_feedback", "")

        if auxiliary_descriptions:
            desc_text = "\n\n# Auxiliary Metrics Guide\n\n"
            for metric_name, description in auxiliary_descriptions.items():
                desc_text += f"- **{metric_name}**: {description}\n"
            text_feedback = (text_feedback + desc_text) if text_feedback else desc_text

        disk_metrics = {
            "combined_score": primary_result.get("combined_score", 0.0),
            "public": public_metrics,
            "private": primary_result.get("private", {}),
            "text_feedback": text_feedback,
            "_eval_service": {
                "auxiliary": auxiliary_results,
                "auxiliary_descriptions": auxiliary_descriptions,
            },
        }

        # Step 3: persist metrics.json.
        logger.info("📊 Step 3/6: Saving metrics.json...")
        save_metrics_file(request.results_dir, _sanitize_for_json(disk_metrics))

        # Step 4: record in generation history.
        logger.info("📊 Step 4/6: Recording to history...")
        local_service_state.add_generation({
            "generation": request.generation,
            "primary_score": complete_result["combined_score"],
            "results_dir": request.results_dir,
            "timestamp": time.time()
        })

        # Step 5: trigger decision (logged as a behavior event).
        logger.info("📊 Step 5/6: Deciding whether to trigger agent...")
        should_trigger, reason = local_service_state.should_trigger_agent(
            request.generation,
            complete_result["combined_score"]
        )
        behavior_logging.log_event(
            request.results_dir,
            "trigger_decision",
            {
                "generation": request.generation,
                "mode": "evaluation",
                "should_trigger": bool(should_trigger),
                "reason": reason,
                "primary_score": complete_result["combined_score"],
            },
        )

        logger.info(f" Decision: {'🧠 TRIGGER' if should_trigger else '⏭️ SKIP'} - {reason}")

        # Step 6: agent analysis.
        if should_trigger and local_ev2_agent is not None:
            logger.info("📊 Step 6/6: Running agent analysis...")
            try:
                agent_result = await local_ev2_agent.analyze_generation(request.generation)
                complete_result["agent_analysis"] = agent_result

                # Score any candidate code the agent proposed (not when it skipped).
                candidate_code = agent_result.get("candidate_code")
                if candidate_code and not agent_result.get("skipped"):
                    logger.info("🔬 Evaluating agent candidate code...")
                    try:
                        candidate_eval = await _evaluate_agent_candidate(
                            candidate_code, request, local_ev2_agent
                        )
                        complete_result["agent_candidate"] = candidate_eval
                        logger.info(
                            f"🔬 Agent candidate score: "
                            f"{_format_score_for_log(candidate_eval.get('combined_score', 0), 2)} "
                            f"(current best: {_format_score_for_log(complete_result['combined_score'], 2)})"
                        )
                    except Exception as e:
                        logger.error(f"❌ Agent candidate evaluation failed: {e}")
                        complete_result["agent_candidate"] = {"error": str(e)}

                # Only a real (non-skipped) run counts as a trigger; record which
                # aux metrics were active at the time.
                if not agent_result.get("skipped"):
                    active_aux = sorted(
                        k for k in public_metrics if k.startswith("aux_")
                    )
                    local_service_state.mark_agent_triggered(
                        request.generation, active_metrics=active_aux
                    )
                behavior_logging.log_event(
                    request.results_dir,
                    "agent_trigger_result",
                    {
                        "generation": request.generation,
                        "mode": "evaluation",
                        "success": agent_result.get("success", True),
                        "skipped": agent_result.get("skipped", False),
                        "elapsed_seconds": agent_result.get("elapsed_seconds", 0),
                    },
                )
                logger.info(
                    "✅ Agent analysis completed"
                    if not agent_result.get("skipped")
                    else "⏭️ Agent analysis skipped (busy)"
                )
            except Exception as e:
                logger.error(f"❌ Agent analysis failed: {e}", exc_info=True)
                complete_result["agent_analysis"] = {"error": str(e)}
                behavior_logging.log_event(
                    request.results_dir,
                    "agent_trigger_result",
                    {
                        "generation": request.generation,
                        "mode": "evaluation",
                        "success": False,
                        "error": str(e),
                    },
                )
        else:
            logger.info("📊 Step 6/6: Agent not triggered")

        updated = await _update_evaluation_job(
            job_id,
            status="completed",
            result=_sanitize_for_json(complete_result),
            completed_at=time.time(),
        )
        if not updated:
            logger.warning(f"Job {job_id} was cleared before completion state update")

        # Summary logging runs after the job is marked completed, so it must not
        # raise (that would re-mark the job as failed in the handler below).
        logger.info("=" * 80)
        logger.info(f"✅ Full evaluation completed for generation {request.generation}")
        logger.info(f" Score: {_format_score_for_log(complete_result['combined_score'])}")
        logger.info(f" Auxiliary metrics: {len(auxiliary_results or {})} metrics")
        logger.info(f" Agent triggered: {should_trigger}")
        logger.info("=" * 80)

    except Exception as e:
        logger.error("=" * 80)
        logger.error(f"❌ Full evaluation failed for generation {request.generation}")
        logger.error(f" Error: {e}", exc_info=True)
        logger.error("=" * 80)

        updated = await _update_evaluation_job(
            job_id,
            status="failed",
            error=str(e),
            completed_at=time.time(),
        )
        if not updated:
            logger.warning(f"Job {job_id} was cleared before failure state update")
|
|
|
|
| |
| |
| |
|
|
@app.post("/api/v1/initialize", response_model=InitializeResponse)
async def initialize_service(request: InitializeRequest):
    """
    Initialize/reset service state for a new experiment.

    This endpoint provides explicit control over experiment initialization.
    It resets all state, clears old jobs, and prepares the service for a fresh start.
    Optional request fields (primary evaluator, experiment name, trigger
    mode/interval, problem statement, evaluator kwargs) override the current
    config only when provided.

    Thread-safe: the whole reset runs under ``_state_lock`` to prevent
    concurrent initialization conflicts.

    Returns:
        InitializeResponse with ``status="ready"`` on success, or
        ``status="error"`` / ``agent_initialized=False`` if any reset step
        raised. On error no agent is left installed, so the next generation
        request re-creates one instead of reusing an agent that belongs to
        the previous experiment.
    """
    global ev2_agent, service_config, service_state

    start_time = time.time()

    async with _state_lock:
        logger.info("=" * 80)
        logger.info("🔧 INITIALIZE: Resetting service state for new experiment")
        logger.info(f" Raw results_dir: {request.results_dir}")
        logger.info("=" * 80)

        experiment_root = _normalize_experiment_root(request.results_dir)
        experiment_root_str = str(experiment_root)

        try:
            # Drop the previous experiment's agent before touching config/state.
            # If any step below raises, the config already points at the new root
            # and generation_complete would no longer detect a change, so a
            # leftover agent would keep serving the wrong experiment.
            ev2_agent = None

            if service_config is None:
                service_config = ServiceConfig()

            # Remember previous trigger settings so overrides can be logged.
            prev_trigger_mode = service_config.trigger_mode
            prev_trigger_interval = service_config.trigger_interval

            service_config.results_dir = experiment_root_str
            if request.primary_evaluator:
                service_config.primary_evaluator_path = request.primary_evaluator
            if request.experiment_name:
                service_config.experiment_name = request.experiment_name
            if request.trigger_mode:
                service_config.trigger_mode = request.trigger_mode
                logger.info(
                    f" 🔁 trigger_mode override: {prev_trigger_mode} -> {service_config.trigger_mode}"
                )
            if request.trigger_interval is not None:
                service_config.trigger_interval = request.trigger_interval
                logger.info(
                    f" 🔁 trigger_interval override: {prev_trigger_interval} -> {service_config.trigger_interval}"
                )
            if request.problem_statement:
                service_config.problem_statement = request.problem_statement
            if request.evaluator_kwargs:
                service_config.evaluator_kwargs = request.evaluator_kwargs

            # force_clean=True: explicit initialize always starts from scratch.
            service_state = ServiceState(service_config, force_clean=True)
            logger.info(" ✅ Service state reset (clean start)")

            evaluation_jobs.clear()
            logger.info(" ✅ Evaluation jobs cleared")

            ev2_agent = IntegratedEV2Agent(
                results_dir=experiment_root_str,
                primary_evaluator_path=service_config.primary_evaluator_path,
                config=service_config,
                problem_statement=service_config.problem_statement,
                evaluator_kwargs=service_config.evaluator_kwargs,
            )
            logger.info(" ✅ Agent initialized")

            processing_time = (time.time() - start_time) * 1000

            logger.info("=" * 80)
            logger.info("✅ INITIALIZE COMPLETE")
            logger.info(f" Experiment root: {experiment_root_str}")
            logger.info(f" Processing time: {processing_time:.1f}ms")
            logger.info("=" * 80)

            return InitializeResponse(
                status="ready",
                message="Service initialized for new experiment",
                results_dir=experiment_root_str,
                agent_initialized=True,
                processing_time_ms=processing_time
            )
        except Exception as e:
            logger.error(f"❌ Initialize failed: {e}", exc_info=True)
            processing_time = (time.time() - start_time) * 1000
            return InitializeResponse(
                status="error",
                message=str(e),
                results_dir=experiment_root_str,
                agent_initialized=False,
                processing_time_ms=processing_time
            )
|
|
@app.post("/api/v1/generation/complete", response_model=ServiceResponse)
async def generation_complete(
    request: GenerationCompleteRequest,
    background_tasks: BackgroundTasks
):
    """
    Generation complete handler (evaluation mode only).

    Registers an evaluation job and schedules ``run_full_evaluation`` as a
    background task, returning immediately with the job ID.

    Deprecated fallback: if no agent exists, or the request's normalized
    results root differs from the configured one, state and agent are
    (re)initialized here. Clients should call ``POST /api/v1/initialize``
    instead.
    """
    global ev2_agent, service_config, service_state

    start_time = time.time()
    logger.info("=" * 80)
    logger.info(f"📊 EVALUATION MODE: Generation {request.generation}")
    logger.info(f" Code path: {request.code_path}")
    logger.info(f" Evaluator: {request.evaluator_module}.{request.evaluator_function}")
    logger.info("=" * 80)

    experiment_root = _normalize_experiment_root(request.results_dir)
    experiment_root_str = str(experiment_root)
    async with _state_lock:
        if service_config is None:
            service_config = ServiceConfig()

        current_results_dir = str(service_config.results_dir) if service_config.results_dir else ""

        if ev2_agent is None or experiment_root_str != current_results_dir:
            if ev2_agent is None:
                logger.warning("⚠️ DEPRECATED: Auto-initialization on first generation.")
                logger.warning(" Please call POST /api/v1/initialize before sending generations.")
                logger.info(f"🔧 Initializing agent for experiment: {experiment_root_str}")
            else:
                logger.warning("⚠️ DEPRECATED: Auto-detection of new experiment.")
                logger.warning(" Please call POST /api/v1/initialize for new experiments.")
                logger.info(f"🔄 Detected new experiment - Reinitializing state and agent: {experiment_root_str}")

            try:
                # Clear any agent bound to the previous experiment first: once
                # results_dir is updated below, a leftover agent would never be
                # replaced. With None, the next generation retries initialization.
                ev2_agent = None
                service_config.results_dir = experiment_root_str

                service_state = ServiceState(service_config)
                logger.info(" ✅ Service state reset for new experiment")

                evaluation_jobs.clear()
                logger.info(" ✅ Evaluation jobs cleared")

                ev2_agent = IntegratedEV2Agent(
                    results_dir=experiment_root_str,
                    primary_evaluator_path=service_config.primary_evaluator_path,
                    config=service_config,
                    problem_statement=service_config.problem_statement,
                    evaluator_kwargs=service_config.evaluator_kwargs,
                )
                logger.info(" ✅ Agent initialized")
            except Exception as e:
                logger.error(f"❌ Failed to initialize agent: {e}", exc_info=True)

        # Register the job under the same lock the status endpoints use to read
        # jobs. The ID has one-second resolution, so disambiguate with a
        # counter suffix instead of silently overwriting an existing job.
        base_job_id = f"eval_{request.generation}_{int(time.time())}"
        job_id = base_job_id
        collision_index = 1
        while job_id in evaluation_jobs:
            job_id = f"{base_job_id}_{collision_index}"
            collision_index += 1
        evaluation_jobs[job_id] = {
            "status": "pending",
            "request": request,
            "generation": request.generation,
            "result": None,
            "created_at": time.time(),
            "completed_at": None
        }

    background_tasks.add_task(
        run_full_evaluation,
        job_id=job_id,
        request=request
    )

    processing_time = (time.time() - start_time) * 1000

    logger.info(f"✅ Evaluation job submitted: {job_id} (response time: {processing_time:.1f}ms)")

    return ServiceResponse(
        status="accepted",
        message=f"Evaluation started for generation {request.generation}",
        generation=request.generation,
        job_id=job_id,
        estimated_time=15.0,
        agent_triggered=False,
        trigger_reason="Will be determined after evaluation",
        processing_time_ms=processing_time
    )
|
|
|
|
@app.get("/api/v1/generation/{generation}/status")
async def get_generation_status(generation: int):
    """
    Query evaluation status for a specific generation.

    If the same generation was submitted more than once, the most recently
    registered job is reported.

    Args:
        generation: Generation number

    Returns:
        Status and result (if completed) or error (if failed).

    Raises:
        HTTPException: 404 if no job exists for the generation.
    """
    job = None
    job_id = None
    async with _state_lock:
        # Newest-first scan: jobs are added in submission order (assuming
        # evaluation_jobs is a plain, insertion-ordered dict), and a
        # resubmitted generation must not keep reporting its stale first job.
        for jid, j in reversed(list(evaluation_jobs.items())):
            if j["generation"] == generation:
                job = dict(j)
                job_id = jid
                break

    if job is None:
        raise HTTPException(
            status_code=404,
            detail=f"Generation {generation} not found in evaluation jobs"
        )

    response = {
        "generation": generation,
        "job_id": job_id,
        "status": job["status"],
        "created_at": job["created_at"],
        "elapsed_time": time.time() - job["created_at"]
    }

    if job["status"] == "completed":
        response["result"] = job.get("result")
        response["completed_at"] = job.get("completed_at")
    elif job["status"] == "failed":
        response["error"] = job.get("error")

    return _sanitize_for_json(response)
|
|
|
|
| |
| |
@app.get("/api/v1/evaluate/{job_id}")
async def get_evaluation_job_status_deprecated(job_id: str):
    """
    [DEPRECATED] Look up an evaluation job by its ID.

    Prefer ``/api/v1/generation/{generation}/status``; this route remains
    only for older clients. The response carries ``deprecated: True`` and a
    pointer to the replacement route.

    Args:
        job_id: ID returned by the generation-complete endpoint.

    Returns:
        Job status plus the result (completed) or error (failed).

    Raises:
        HTTPException: 404 when the job ID is unknown.
    """
    async with _state_lock:
        if job_id not in evaluation_jobs:
            raise HTTPException(
                status_code=404,
                detail=f"Job {job_id} not found. Use /api/v1/generation/{{generation}}/status instead."
            )
        # Copy so the response is built from a stable snapshot outside the lock.
        snapshot = dict(evaluation_jobs[job_id])

    state = snapshot["status"]
    created = snapshot["created_at"]
    payload = {
        "job_id": job_id,
        "generation": snapshot["generation"],
        "status": state,
        "created_at": created,
        "elapsed_time": time.time() - created,
        "deprecated": True,
        "message": "Use /api/v1/generation/{generation}/status instead",
    }

    if state == "completed":
        payload.update(
            result=snapshot.get("result"),
            completed_at=snapshot.get("completed_at"),
        )
    elif state == "failed":
        payload["error"] = snapshot.get("error")

    return _sanitize_for_json(payload)
|
|
|
|
@app.get("/api/v1/status", response_model=ServiceStatusResponse)
async def get_status():
    """Report uptime, experiment identity, statistics and trigger settings.

    Raises:
        HTTPException: 500 when service state or config has not been set up.
    """
    state, cfg = service_state, service_config
    if state is None or cfg is None:
        raise HTTPException(status_code=500, detail="Service not initialized")

    uptime = time.time() - state.start_time
    experiment_info = {
        "name": cfg.experiment_name,
        "results_dir": cfg.results_dir,
        "primary_evaluator": cfg.primary_evaluator_path,
    }
    stats = state.get_statistics()
    trigger_settings = {
        "trigger_mode": cfg.trigger_mode,
        "trigger_interval": cfg.trigger_interval,
        "agent_enabled": cfg.agent_enabled,
        "agent_initialized": ev2_agent is not None,
    }

    return ServiceStatusResponse(
        status="running",
        uptime_seconds=uptime,
        version="2.0.0-standalone",
        experiment=experiment_info,
        statistics=stats,
        config=trigger_settings,
    )
|
|
|
|
@app.post("/api/v1/trigger/manual")
async def trigger_manual(generation: int):
    """Manually trigger agent analysis for a specific generation.

    The generation must already be present in the service's generation
    history. As in the automatic pipeline, the trigger is only recorded when
    the agent actually ran (i.e. its result is not marked ``skipped``).

    Raises:
        HTTPException: 500 if the agent or service state is not initialized,
            404 if the generation is not in history, 500 if analysis raised.
    """
    logger.info(f"🔧 Manual trigger: generation {generation}")

    # Snapshot globals so a concurrent initialize cannot swap them mid-request.
    async with _state_lock:
        local_agent = ev2_agent
        local_state = service_state

    if local_agent is None:
        raise HTTPException(status_code=500, detail="Agent not initialized")
    if local_state is None:
        raise HTTPException(status_code=500, detail="Service state not initialized")

    # Most recent history entry for this generation wins.
    gen_data = next(
        (g for g in reversed(local_state.generation_history) if g['generation'] == generation),
        None,
    )

    if gen_data is None:
        raise HTTPException(
            status_code=404,
            detail=f"Generation {generation} not found in history"
        )

    try:
        result = await local_agent.analyze_generation(generation)
        # Mirror run_full_evaluation: a skipped (busy) run is not a trigger.
        skipped = isinstance(result, dict) and bool(result.get("skipped"))
        if not skipped:
            local_state.mark_agent_triggered(generation, active_metrics=[])

        return {
            "status": "success",
            "message": f"Agent triggered for generation {generation}",
            "result": result
        }
    except Exception as e:
        logger.error(f"❌ Manual trigger failed for generation {generation}: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Agent failed: {str(e)}") from e
|
|
|
|
@app.get("/")
async def root():
    """Identify the service and point clients at the interactive API docs."""
    service_info = dict(
        service="EV2 Evaluation Service (Standalone)",
        version="2.0.0",
        status="running",
        docs="/docs",
    )
    return service_info
|
|
|
|
| |
| |
| |
|
|
def main():
    """Command-line entry point.

    Two modes:
      * worker mode — both hidden ``--worker-eval-*`` flags given: run
        ``_run_primary_evaluator_worker`` and exit with its return value
        (exit code 2 if only one of the two flags is supplied);
      * server mode — build the global ``service_config`` from ``--config``
        (YAML) or from the individual CLI flags, then serve ``app`` with
        uvicorn on the configured host/port.
    """
    import argparse

    global service_config

    parser = argparse.ArgumentParser(description="EV2 Evaluation Service (Standalone)")
    # Hidden flags; presumably set when the service launches itself as an
    # evaluator worker process (TODO confirm against the spawning code).
    parser.add_argument("--worker-eval-request", type=str, help=argparse.SUPPRESS)
    parser.add_argument("--worker-eval-output", type=str, help=argparse.SUPPRESS)

    parser.add_argument("--config", type=str, help="Path to YAML config file")

    # Individual settings, used only when --config is not given.
    parser.add_argument("--results-dir", type=str, help="Results directory")
    parser.add_argument("--primary-evaluator", type=str, help="Path to primary evaluator")
    parser.add_argument(
        "--evaluation-timeout",
        type=float,
        default=300.0,
        help="Timeout for each evaluation in seconds (default: 300)",
    )
    parser.add_argument(
        "--trigger-mode",
        type=str,
        default="periodic",
        choices=["always", "periodic", "plateau", "mixed"],
    )
    parser.add_argument("--trigger-interval", type=int, default=10)
    parser.add_argument("--port", type=int, default=8765)
    parser.add_argument("--host", type=str, default="0.0.0.0")

    args = parser.parse_args()

    worker_request = args.worker_eval_request
    worker_output = args.worker_eval_output
    if worker_request or worker_output:
        if not (worker_request and worker_output):
            logger.error(
                "Both --worker-eval-request and --worker-eval-output are required in worker mode"
            )
            raise SystemExit(2)
        worker_exit_code = _run_primary_evaluator_worker(
            request_path=worker_request,
            output_path=worker_output,
        )
        raise SystemExit(worker_exit_code)

    if not args.config:
        service_config = ServiceConfig(
            results_dir=args.results_dir or "",
            primary_evaluator_path=args.primary_evaluator or "",
            evaluation_timeout=args.evaluation_timeout,
            trigger_mode=args.trigger_mode,
            trigger_interval=args.trigger_interval,
            host=args.host,
            port=args.port
        )
    else:
        logger.info(f"📋 Loading config from {args.config}")
        service_config = ServiceConfig.from_yaml(args.config)

    bind_host = service_config.host
    bind_port = service_config.port
    logger.info(f"🚀 Starting server on {bind_host}:{bind_port}")
    uvicorn.run(
        app,
        host=bind_host,
        port=bind_port,
        log_level=service_config.log_level.lower()
    )
|
|
|
|
# Run the CLI entry point only when this file is executed as a script;
# importing the module does not call main().
if __name__ == "__main__":
    main()
|
|