shinka-backup / eval_agent /ev2_service_standalone.py
JustinTX's picture
Add files using upload-large-folder tool
6f90f5c verified
"""
EV2 Service - Standalone Version
A complete, self-contained evaluation service that integrates the OpenHands
agent directly, without depending on ev2.py.
Key features:
- Event-driven architecture (receive generation notifications)
- Autonomous decision-making (when to trigger agent)
- Persistent agent state (across generations)
- Direct OpenHands integration (no wrapper)
Author: Evolution Evaluation System
Version: 2.0 (Standalone)
"""
import os
import sys
import json
import time
import logging
import asyncio
import traceback
import tempfile
import math
import hashlib
import numbers
from pathlib import Path
from typing import Dict, Any, Optional, List
from dataclasses import dataclass, asdict
def _load_sibling_module(module_name: str, filename: str):
    """Load ``filename`` from this file's directory as a standalone module.

    Fallback used when the ``eval_agent`` package itself is not importable
    (e.g. when this file is executed directly rather than as part of the
    package).

    Args:
        module_name: Name to register the loaded module under.
        filename: File name of a sibling ``.py`` file.

    Returns:
        The executed module object.

    Raises:
        ImportError: if no import spec / loader can be built for the file.
    """
    import importlib.util

    module_path = Path(__file__).with_name(filename)
    spec = importlib.util.spec_from_file_location(module_name, str(module_path))
    # Validate the spec *before* using it. An explicit raise (unlike assert)
    # survives ``python -O`` and yields a clear message.
    if spec is None or spec.loader is None:
        raise ImportError(f"Cannot build import spec for {module_path}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module


try:
    import eval_agent.logging as behavior_logging
except ModuleNotFoundError:
    behavior_logging = _load_sibling_module("behavior_logging", "logging.py")

try:
    from eval_agent.utils import build_meta_recommendation_context_lines
except ModuleNotFoundError:
    build_meta_recommendation_context_lines = _load_sibling_module(
        "eval_agent_utils", "utils.py"
    ).build_meta_recommendation_context_lines
# FastAPI imports
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field
import uvicorn
import yaml
# OpenHands imports (same as ev2.py)
from openhands.sdk import LLM, Agent, Conversation, Tool
from openhands.tools.file_editor import FileEditorTool
from openhands.tools.task_tracker import TaskTrackerTool
from openhands.tools.terminal import TerminalTool
# ============================================================================
# Configuration
# ============================================================================
@dataclass
class ServiceConfig:
    """Service configuration.

    Every field has a default. ``from_yaml`` / ``from_dict`` populate the
    fields from a mapping with ``service``, ``experiment``, ``evaluation``,
    ``strategy`` and ``agent`` sections.
    """
    # Server settings
    host: str = "0.0.0.0"
    port: int = 8765
    log_level: str = "INFO"
    # Experiment settings
    experiment_name: str = ""
    results_dir: str = ""
    primary_evaluator_path: str = ""
    problem_statement: str = ""  # Problem description for diagnostic context
    evaluator_kwargs: Optional[Dict[str, Any]] = None  # Task-specific kwargs (e.g., problem_id, frontier_cs_dir)
    # Evaluation settings
    evaluation_timeout: float = 300.0  # Maximum time for evaluation (seconds), default 5 minutes
    # Trigger strategy
    trigger_mode: str = "periodic"  # "periodic", "plateau", "mixed", "always"
    trigger_interval: int = 10  # Run agent every N generations
    plateau_threshold: float = 0.01
    plateau_window: int = 10
    # Agent settings
    agent_enabled: bool = True
    llm_model: str = ""  # Empty = use env var
    llm_api_key: str = ""  # Empty = use env var
    llm_base_url: str = ""  # Empty = use env var

    @classmethod
    def from_yaml(cls, config_path: str) -> 'ServiceConfig':
        """Load config from a YAML file (layout documented on ``from_dict``).

        Args:
            config_path: Path to the YAML file.

        Returns:
            A populated ``ServiceConfig``. An empty file yields all defaults.
        """
        with open(config_path, encoding="utf-8") as f:
            data = yaml.safe_load(f)
        return cls.from_dict(data)

    @classmethod
    def from_dict(cls, data: Optional[Dict[str, Any]]) -> 'ServiceConfig':
        """Build a config from an already-parsed YAML mapping.

        Recognised keys (all optional; missing ones keep the field default):

        - ``service``: ``host``, ``port``, ``log_level``
        - ``experiment``: ``name``, ``results_dir``, ``primary_evaluator``
        - ``evaluation``: ``timeout``
        - ``strategy``: ``trigger_mode``, ``trigger_interval``,
          ``plateau_threshold``, ``plateau_window``
        - ``agent``: ``enabled``, ``llm_model``, ``llm_api_key``, ``llm_base_url``

        ``problem_statement`` and ``evaluator_kwargs`` are not read here.

        Args:
            data: Parsed mapping. None (what ``yaml.safe_load`` returns for an
                empty file) is treated as an empty mapping.

        Returns:
            A populated ``ServiceConfig``.
        """
        data = data or {}

        def section(name: str) -> Dict[str, Any]:
            # A bare ``name:`` line in YAML parses as None, not {}.
            return data.get(name) or {}

        service = section('service')
        experiment = section('experiment')
        evaluation = section('evaluation')
        strategy = section('strategy')
        agent = section('agent')
        return cls(
            host=service.get('host', '0.0.0.0'),
            port=service.get('port', 8765),
            log_level=service.get('log_level', 'INFO'),
            experiment_name=experiment.get('name', ''),
            results_dir=experiment.get('results_dir', ''),
            primary_evaluator_path=experiment.get('primary_evaluator', ''),
            evaluation_timeout=evaluation.get('timeout', 300.0),
            trigger_mode=strategy.get('trigger_mode', 'periodic'),
            trigger_interval=strategy.get('trigger_interval', 10),
            plateau_threshold=strategy.get('plateau_threshold', 0.01),
            plateau_window=strategy.get('plateau_window', 10),
            agent_enabled=agent.get('enabled', True),
            llm_model=agent.get('llm_model', ''),
            llm_api_key=agent.get('llm_api_key', ''),
            llm_base_url=agent.get('llm_base_url', ''),
        )
# ============================================================================
# Request/Response Models
# ============================================================================
class GenerationCompleteRequest(BaseModel):
    """
    Generation complete notification (evaluation mode only).

    Request body describing a finished generation: where its code and
    results are, and which evaluator module/function to use for it.
    ``Field(...)`` marks a field as required; ``Field(<value>, ...)`` gives
    the default.
    """
    # ===== Required: which generation and where its artifacts are =====
    generation: int = Field(..., description="Generation number")
    results_dir: str = Field(..., description="Path to generation results directory")
    code_path: str = Field(..., description="Path to the generated code")
    # ===== Evaluator selection (module path, function name, extra kwargs) =====
    evaluator_module: str = Field(..., description="Python module path (e.g., 'examples.circle_packing.evaluate')")
    evaluator_function: str = Field("evaluate", description="Evaluator function name (default: 'evaluate')")
    evaluator_kwargs: Optional[Dict[str, Any]] = Field(None, description="Additional kwargs for evaluator")
    # ===== Optional metadata =====
    stage: Optional[str] = Field(None, description="Evolution stage")
    metadata: Optional[Dict[str, Any]] = Field(None, description="Additional metadata")
class ServiceResponse(BaseModel):
    """
    Service response for async evaluation mode.

    ``status``, ``message``, ``generation``, ``agent_triggered`` and
    ``processing_time_ms`` are required; every other field defaults to None
    and is only filled in when relevant (see each field's description).
    """
    status: str = Field(..., description="Status: accepted | completed | skipped | error")
    message: str = Field(..., description="Human-readable message")
    generation: int  # required (no default)
    # ===== Async evaluation mode =====
    job_id: Optional[str] = Field(None, description="Job ID for async evaluation (evaluation mode only)")
    estimated_time: Optional[float] = Field(None, description="Estimated evaluation time in seconds")
    # ===== Agent decision info =====
    agent_triggered: bool = Field(..., description="Whether agent was triggered")
    trigger_reason: Optional[str] = Field(None, description="Why agent was/wasn't triggered")
    # ===== Results (if completed) =====
    evaluation_result: Optional[Dict[str, Any]] = Field(None, description="Evaluation result (if completed)")
    insights: Optional[List[str]] = Field(None, description="Agent insights (if agent was triggered)")
    auxiliary_metrics: Optional[Dict[str, Any]] = Field(None, description="Auxiliary metrics info")
    # ===== Timing =====
    processing_time_ms: float  # required (no default)
class ServiceStatusResponse(BaseModel):
    """Service status information.

    ``status`` and ``version`` have defaults; the remaining fields are
    required. The dict-valued fields are free-form: this model does not
    constrain their keys.
    """
    status: str = "running"
    uptime_seconds: float
    version: str = "2.0.0-standalone"
    experiment: Dict[str, Any]  # free-form mapping; schema not enforced here
    statistics: Dict[str, Any]  # free-form mapping; schema not enforced here
    config: Dict[str, Any]  # free-form mapping; schema not enforced here
class InitializeRequest(BaseModel):
    """
    Initialize/reset service state for a new experiment.

    Only ``results_dir`` is required; every other field defaults to None.
    """
    results_dir: str = Field(..., description="Experiment root directory")
    primary_evaluator: Optional[str] = Field(None, description="Path to primary evaluator")
    experiment_name: Optional[str] = Field(None, description="Experiment name")
    # ===== Trigger strategy overrides =====
    trigger_mode: Optional[str] = Field(None, description="Trigger mode override")
    trigger_interval: Optional[int] = Field(None, description="Trigger interval override")
    # ===== Task context =====
    problem_statement: Optional[str] = Field(None, description="Problem description for diagnostic context")
    evaluator_kwargs: Optional[Dict[str, Any]] = Field(None, description="Task-specific kwargs (e.g., problem_id, frontier_cs_dir)")
class InitializeResponse(BaseModel):
    """
    Initialize/reset response.

    All fields are required. ``status`` is "ready" or "error" per its field
    description.
    """
    status: str = Field(..., description="Status: ready | error")
    message: str = Field(..., description="Human-readable message")
    results_dir: str
    agent_initialized: bool
    processing_time_ms: float
# ============================================================================
# Integrated EV2 Agent
# ============================================================================
class IntegratedEV2Agent:
    """
    Integrated EV2 Agent - Direct OpenHands Management

    This class replaces the call to evolution_evaluation_agent() in ev2.py
    with direct, integrated agent management.

    Key differences from ev2.py:
    - Agent instance can be persistent (reused across calls)
    - State management is integrated with service
    - No subprocess calls

    Filesystem layout it relies on:
    - ``<results_dir>/gen_<N>/results/metrics.json``: per-generation metrics
    - ``<results_dir>/gen_<N>/main<ext>``: per-generation program
    - ``<results_dir>/eval_agent_memory/``: agent workspace (EVAL_AGENTS.md,
      auxiliary_metrics.py, agent_candidate<ext>, diagnostic_report.md,
      service_state.json)
    """

    # Program extensions probed in gen_0, in priority order (first hit wins).
    _CODE_EXTENSIONS = (".cpp", ".py", ".java", ".go", ".rs", ".c")
    # A zero-ratio case whose time is at least this many ms is reported as TLE.
    _TLE_THRESHOLD_MS = 1800
    # Upper bound on case_<i>_* entries scanned in the "public" metrics.
    _MAX_CASES = 70
    # Bookkeeping keys written by the framework, not agent-defined metrics.
    _FRAMEWORK_AUX_KEYS = frozenset({
        "aux_aux_metric_eval_success", "aux_aux_metric_error_code",
        "aux_aux_metric_error_message_length", "aux_aux_metric_error_detail_length",
        "aux_aux_metric_non_numeric_dropped_count",
    })

    def __init__(self,
                 results_dir: str,
                 primary_evaluator_path: str,
                 config: "ServiceConfig",
                 problem_statement: str = "",
                 evaluator_kwargs: Optional[Dict[str, Any]] = None):
        """
        Initialize integrated agent

        Args:
            results_dir: Path to results directory
            primary_evaluator_path: Path to primary evaluator (ground truth)
            config: Service configuration
            problem_statement: Problem description for diagnostic context
            evaluator_kwargs: Task-specific kwargs (e.g., problem_id, frontier_cs_dir)

        Raises:
            FileNotFoundError: if the primary evaluator does not exist. This
                is checked before the workspace is created, so a bad path
                leaves nothing behind on disk.
        """
        # Store paths as absolute
        self.results_dir = Path(results_dir).resolve()
        self.primary_evaluator_path = Path(primary_evaluator_path).resolve()
        self.config = config
        self.problem_statement = problem_statement
        self.evaluator_kwargs = evaluator_kwargs or {}

        # Validate primary evaluator exists (before touching the filesystem)
        if not self.primary_evaluator_path.exists():
            raise FileNotFoundError(
                f"Primary evaluator not found: {self.primary_evaluator_path}"
            )

        # Agent workspace (same as ev2.py: results_dir/eval_agent_memory)
        self.workspace = self.results_dir / "eval_agent_memory"
        self.workspace.mkdir(parents=True, exist_ok=True)
        self._bootstrap_memory_files()

        # Agent components (will be created on first use)
        self._agent = None
        self._llm = None
        # Prevent concurrent agent runs (non-blocking: skip if busy)
        self._run_lock = asyncio.Lock()

        logging.info("=" * 80)
        logging.info("✅ IntegratedEV2Agent Initialized")
        logging.info("=" * 80)
        logging.info(f"Results Dir: {self.results_dir}")
        logging.info(f"Workspace: {self.workspace}")
        logging.info(f"Primary Evaluator: {self.primary_evaluator_path}")
        logging.info("=" * 80)

    # ------------------------------------------------------------------
    # Metrics helpers
    # ------------------------------------------------------------------
    def _load_metrics(self, gen: int) -> Optional[Dict[str, Any]]:
        """Return gen_<gen>/results/metrics.json as a dict.

        Returns None when the file is missing, unreadable, not valid JSON,
        or not a JSON object.
        """
        path = self.results_dir / f"gen_{gen}" / "results" / "metrics.json"
        if not path.exists():
            return None
        try:
            with open(path, encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError):  # JSONDecodeError/UnicodeDecodeError are ValueErrors
            return None
        return data if isinstance(data, dict) else None

    @staticmethod
    def _public_metrics(metrics: Optional[Dict[str, Any]]) -> Dict[str, Any]:
        """Return the ``public`` sub-dict of a metrics mapping ({} if absent or malformed)."""
        if not metrics:
            return {}
        public = metrics.get("public")
        return public if isinstance(public, dict) else {}

    def _get_last_agent_trigger_gen(self) -> int:
        """Read ``last_agent_trigger_gen`` from the persisted service state.

        Returns -1 when the state file is missing, unreadable, or does not
        hold an integer under that key.
        """
        state_file = self.workspace / "service_state.json"
        if not state_file.exists():
            return -1
        try:
            with open(state_file, encoding="utf-8") as f:
                state = json.load(f)
        except (OSError, ValueError):
            return -1
        value = state.get("last_agent_trigger_gen", -1) if isinstance(state, dict) else -1
        # Reject null/strings (and bool, an int subclass) so callers can compare safely.
        if isinstance(value, bool) or not isinstance(value, int):
            return -1
        return value

    def _build_case_analysis(self, current_gen: int) -> List[str]:
        """Summarise per-case results of ``current_gen`` for the agent prompt.

        Reads ``public.case_<i>_ratio`` / ``public.case_<i>_time_ms`` and
        buckets cases into TLE / WA / partial / perfect. It then adds the
        best-scoring generation so far and this generation's auxiliary metric
        values.

        Returns:
            Prompt lines, or [] when metrics are missing or report no cases.
        """
        lines: List[str] = []
        data = self._load_metrics(current_gen)
        if data is None:
            return lines
        public = self._public_metrics(data)
        n_cases = public.get("n_cases", 0)
        if not isinstance(n_cases, int) or n_cases <= 0:
            return lines

        # Classify cases
        tle_cases: List[int] = []
        wa_cases: List[int] = []
        partial_cases: List[int] = []
        perfect_cases: List[int] = []
        partial_ratios: List[float] = []
        for i in range(min(n_cases, self._MAX_CASES)):
            ratio_key = f"case_{i}_ratio"
            if ratio_key not in public:
                break  # stop at the first case index that was not reported
            ratio = public[ratio_key]
            if not isinstance(ratio, numbers.Real):
                continue  # e.g. a null ratio cannot be classified
            time_ms = public.get(f"case_{i}_time_ms", 0)
            if not isinstance(time_ms, numbers.Real):
                time_ms = 0
            if ratio >= 1.0:
                perfect_cases.append(i)
            elif ratio <= 0 and time_ms >= self._TLE_THRESHOLD_MS:
                tle_cases.append(i)
            elif ratio <= 0:
                wa_cases.append(i)
            else:
                partial_cases.append(i)
                partial_ratios.append(ratio)

        reported = len(tle_cases) + len(wa_cases) + len(partial_cases) + len(perfect_cases)
        lines.append("")
        lines.append(f"📊 Per-Case Analysis (gen {current_gen}, {reported} cases reported of {n_cases} total):")

        def _fmt_cases(case_list: List[int], limit: int = 10) -> str:
            shown = ", ".join(str(c) for c in case_list[:limit])
            if len(case_list) <= limit:
                return shown
            return shown + f"... (+{len(case_list) - limit} more)"

        def _with_cases(label: str, cases: List[int]) -> str:
            return label + (f" — cases {_fmt_cases(cases)}" if cases else "")

        lines.append(_with_cases(
            f" TLE (>{self._TLE_THRESHOLD_MS}ms, ratio=0): {len(tle_cases)}/{reported}", tle_cases))
        lines.append(_with_cases(
            f" WA (ratio=0, not TLE): {len(wa_cases)}/{reported}", wa_cases))
        if partial_ratios:
            lines.append(f" Partial (0<ratio<1): {len(partial_cases)}/{reported}"
                         f", avg={sum(partial_ratios) / len(partial_ratios):.3f}"
                         f", min={min(partial_ratios):.3f}, max={max(partial_ratios):.3f}")
        else:
            lines.append(f" Partial (0<ratio<1): 0/{reported}")
        lines.append(_with_cases(
            f" Perfect (ratio=1): {len(perfect_cases)}/{reported}", perfect_cases))

        # Find best gen so far (missing/null combined_score counts as 0)
        best_gen, best_score = 0, 0
        for g in range(current_gen + 1):
            metrics = self._load_metrics(g)
            if metrics is None:
                continue
            score = metrics.get("combined_score", 0) or 0
            if isinstance(score, numbers.Real) and score > best_score:
                best_gen, best_score = g, score
        if best_score > 0:
            lines.append(f" Best gen so far: gen_{best_gen}, score {best_score:.2f}")

        # Current gen's agent-defined aux metric values (already loaded above)
        aux_vals = {k: v for k, v in public.items()
                    if k.startswith("aux_") and not k.startswith("aux_aux_metric_")}
        if aux_vals:
            lines.append(" Auxiliary metrics: " +
                         ", ".join(f"{k}={v:.4f}" if isinstance(v, float) else f"{k}={v}"
                                   for k, v in sorted(aux_vals.items())))
        return lines

    def _build_aux_metric_trends(self, current_gen: int) -> List[str]:
        """Build a markdown table of auxiliary metric values over recent generations.

        Covers generations ``max(0, current_gen - 10)`` .. ``current_gen``
        that report at least one numeric, non-framework ``aux_*`` metric.
        When none do, returns a hint to write auxiliary_metrics.py instead.
        """
        lines: List[str] = []
        records: List[tuple] = []  # (gen, score, {aux_key: value})
        all_aux_keys: set = set()
        for gen in range(max(0, current_gen - 10), current_gen + 1):
            data = self._load_metrics(gen)
            if data is None:
                continue
            aux = {k: v for k, v in self._public_metrics(data).items()
                   if k.startswith("aux_") and k not in self._FRAMEWORK_AUX_KEYS
                   and isinstance(v, (int, float))}
            if aux:
                all_aux_keys.update(aux)
                records.append((gen, data.get("combined_score", 0) or 0, aux))

        if not all_aux_keys:
            lines.append("")
            lines.append("📈 Auxiliary Metrics: No custom metrics defined yet. Write auxiliary_metrics.py to start measuring.")
            return lines

        # Build table
        sorted_keys = sorted(all_aux_keys)
        names = [k.replace("aux_", "") for k in sorted_keys]
        widths = [max(len(name), 7) for name in names]
        lines.append("")
        lines.append("📈 Auxiliary Metric Trends:")
        lines.append("| Gen | " + " | ".join(names) + " | Score |")
        lines.append("|-----|" + "|".join("-" * w for w in widths) + "|-------|")
        for gen, score, aux in records:
            cells = []
            for key, width in zip(sorted_keys, widths):
                value = aux.get(key)
                if value is None:
                    text = "N/A"
                elif isinstance(value, float):
                    text = f"{value:.4f}"
                else:
                    text = str(value)
                cells.append(f"{text:>{width}}")
            # A non-numeric combined_score would make min() raise; show N/A instead.
            score_text = f"{min(score, 100):5.1f}" if isinstance(score, numbers.Real) else "  N/A"
            lines.append(f"| {gen:3d} | " + " | ".join(cells) + f" | {score_text} |")
        return lines

    def _bootstrap_memory_files(self):
        """Create the expected memory files in the workspace if they do not exist yet."""
        eval_agents_md = self.workspace / "EVAL_AGENTS.md"
        if not eval_agents_md.exists():
            eval_agents_md.write_text(
                "# EV2 Agent Memory\n\n"
                "- Initialized by eval service.\n"
                "- Use this file as compact cross-generation memory.\n",
                encoding="utf-8",
            )
        auxiliary_metrics_py = self.workspace / "auxiliary_metrics.py"
        if not auxiliary_metrics_py.exists():
            auxiliary_metrics_py.write_text(
                "def evaluate_aux(results_dir, primary_result=None):\n"
                "    \"\"\"Return auxiliary metrics as a dict.\"\"\"\n"
                "    return {}\n",
                encoding="utf-8",
            )

    def _get_code_ext(self) -> str:
        """Return the program file extension found in gen_0 (``.cpp`` if none found)."""
        for ext in self._CODE_EXTENSIONS:
            if (self.results_dir / "gen_0" / f"main{ext}").exists():
                return ext
        return ".cpp"  # default

    def _extract_agent_candidate(self) -> Optional[str]:
        """Return the agent's candidate program (stripped) or None if absent/empty."""
        candidate_path = self.workspace / f"agent_candidate{self._get_code_ext()}"
        if candidate_path.exists():
            code = candidate_path.read_text(encoding="utf-8").strip()
            if code:
                logging.info(f"📝 Found agent candidate: {candidate_path} ({len(code)} chars)")
                return code
        return None

    def _create_llm(self) -> "LLM":
        """
        Create LLM instance
        Migrated from ev2.py lines 54-58

        Service config values take precedence; otherwise the LLM_MODEL
        (default ``vertex_ai/gemini-2.5-flash``), LLM_API_KEY and
        LLM_BASE_URL environment variables are used. Completion logging
        follows the module-level OPENHANDS_LOG_COMPLETIONS flag and writes to
        $OPENHANDS_LOG_COMPLETIONS_DIR (default: <workspace>/llm_completions).
        """
        model = self.config.llm_model or os.getenv("LLM_MODEL", "vertex_ai/gemini-2.5-flash")
        api_key = self.config.llm_api_key or os.getenv("LLM_API_KEY")
        base_url = self.config.llm_base_url or os.getenv("LLM_BASE_URL", None)
        log_completions = OPENHANDS_LOG_COMPLETIONS
        default_completion_dir = str(self.workspace / "llm_completions")
        log_completions_folder = os.getenv(
            "OPENHANDS_LOG_COMPLETIONS_DIR",
            default_completion_dir,
        )
        logging.info(f"🤖 Creating LLM: {model}")
        logging.info(f"   OpenHands completion logging: {log_completions}")
        if log_completions:
            logging.info(f"   Completion log dir: {log_completions_folder}")
        return LLM(
            model=model,
            api_key=api_key,
            base_url=base_url,
            log_completions=log_completions,
            log_completions_folder=log_completions_folder,
        )

    def _create_agent(self) -> "Agent":
        """
        Create OpenHands Agent
        Migrated from ev2.py lines 60-73 (same tools and prompt as ev2.py).

        Raises:
            FileNotFoundError: if ``ev2_prompt.j2`` is missing next to this file.
        """
        ev2_prompt_path = Path(__file__).parent / "ev2_prompt.j2"
        if not ev2_prompt_path.exists():
            raise FileNotFoundError(
                f"EV2 prompt template not found: {ev2_prompt_path}"
            )
        logging.info(f"📋 Loading prompt: {ev2_prompt_path}")
        agent = Agent(
            llm=self._llm,
            tools=[
                Tool(name=TerminalTool.name),
                Tool(name=FileEditorTool.name),
                Tool(name=TaskTrackerTool.name),
            ],
            system_prompt_filename=str(ev2_prompt_path),
        )
        logging.info("✅ Agent created")
        return agent

    def _ensure_agent_ready(self):
        """Lazily create the LLM and the agent on first use."""
        if self._llm is None:
            self._llm = self._create_llm()
        if self._agent is None:
            self._agent = self._create_agent()

    # ------------------------------------------------------------------
    # Task message construction
    # ------------------------------------------------------------------
    def _select_target_generation(self, current_gen: int, last_trigger: int) -> tuple[int, Any]:
        """Pick the best-scoring generation since the last agent trigger.

        Returns:
            ``(target_gen, target_score)``. ``target_gen`` defaults to
            ``current_gen``. ``target_score`` is the raw ``combined_score``
            value of that generation (None if unavailable).
        """
        search_start = max(0, last_trigger + 1)
        target_gen, best_score = current_gen, -1.0
        for g in range(search_start, current_gen + 1):
            metrics = self._load_metrics(g)
            if metrics is None:
                continue
            score = metrics.get("combined_score", 0) or 0
            if isinstance(score, numbers.Real) and score > best_score:
                target_gen, best_score = g, score
        target_metrics = self._load_metrics(target_gen)
        target_score = target_metrics.get("combined_score") if target_metrics is not None else None
        return target_gen, target_score

    def _execution_environment_lines(self) -> List[str]:
        """Describe how auxiliary_metrics.py can compile/run programs locally.

        Only produced when ``evaluator_kwargs`` has ``frontier_cs_dir``. The
        instructions are C++-specific (g++ commands).
        """
        fc_dir = self.evaluator_kwargs.get("frontier_cs_dir") if self.evaluator_kwargs else None
        if not fc_dir:
            return []
        pid = self.evaluator_kwargs.get("problem_id", "0")
        problem_dir = Path(fc_dir) / "algorithmic" / "problems" / str(pid)
        testdata_dir = problem_dir / "testdata"
        lines = [
            "",
            "🔧 Code Execution Environment:",
            " Your auxiliary_metrics.py can compile and run the code locally using subprocess.",
            " This is what makes your metrics non-trivial — measure actual program behavior.",
            " In evaluate_aux(results_dir), results_dir points to gen_N/ and the code is at os.path.join(results_dir, 'main.cpp')",
            f" Compile: g++ -O2 -pipe -std=gnu++17 -o /tmp/main_test {{code_path}}",
            f" Testdata dir: {testdata_dir}",
        ]
        # Point the agent at the smallest test input for quick local runs.
        if testdata_dir.exists():
            in_files = sorted(testdata_dir.glob("*.in"), key=lambda p: p.stat().st_size)
            if in_files:
                smallest = in_files[0]
                lines.append(f" Smallest test: {smallest} ({smallest.stat().st_size} bytes)")
        if (problem_dir / "interactor.cc").exists():
            testlib_dir = Path(fc_dir) / "algorithmic" / "judge" / "include"
            lines.extend([
                " Problem type: interactive",
                f" Interactor: {problem_dir}/interactor.cc",
                f" Compile interactor: g++ -O2 -pipe -std=gnu++17 -I {testlib_dir} -o interactor interactor.cc",
            ])
        else:
            lines.extend([
                " Problem type: default (stdin → stdout)",
                f" Run: timeout 5 ./main_test < {{test_input}} > output.txt",
            ])
        return lines

    def _score_trend_lines(self, current_gen: int) -> List[str]:
        """Format combined scores of up to 10 generations preceding ``current_gen``."""
        lines = ["", "📈 Score Trend (last 10 gens):"]
        if current_gen <= 0:
            lines.append("- No previous generations.")
            return lines
        tokens: List[str] = []
        for gen in range(max(0, current_gen - 10), current_gen):
            metrics = self._load_metrics(gen)
            score = metrics.get("combined_score") if metrics is not None else None
            if isinstance(score, numbers.Real):
                tokens.append(f"g{gen}: {float(score):.1f}")
            else:
                tokens.append(f"g{gen}: N/A")
        lines.append("- " + " | ".join(tokens))
        return lines

    def _build_task_message(self, current_gen: int) -> str:
        """Build the task message (prompt) for the eval agent.

        The agent diagnoses the best-scoring generation since the last agent
        trigger, which may differ from ``current_gen``. The message covers:
        file locations, any auxiliary-metric error on the current generation,
        the problem statement (first 4000 chars), the optional execution
        environment, per-case analysis, score and auxiliary-metric trends,
        tool hints, and feedback on the previous agent run when one exists.
        """
        last_trigger = self._get_last_agent_trigger_gen()
        target_gen, target_score = self._select_target_generation(current_gen, last_trigger)
        code_ext = self._get_code_ext()

        # === File locations ===
        task_parts = [
            f"=== Generation {current_gen} Evaluation ===",
            "",
            "📁 File Locations (all absolute paths):",
            f"- Results directory: {self.results_dir}",
            f"- Agent candidate (MUST WRITE): {self.results_dir}/eval_agent_memory/agent_candidate{code_ext}",
            f"- Diagnostic report (MUST WRITE): {self.results_dir}/eval_agent_memory/diagnostic_report.md",
            f"- Memory log (MUST WRITE): {self.results_dir}/eval_agent_memory/EVAL_AGENTS.md",
            f"- Auxiliary metrics: {self.results_dir}/eval_agent_memory/auxiliary_metrics.py",
            # Use the detected extension so non-C++ tasks point at a file that exists.
            f"- Code to diagnose: {self.results_dir}/gen_{target_gen}/main{code_ext}",
            f"- Metrics to diagnose: {self.results_dir}/gen_{target_gen}/results/metrics.json",
            f"- All generations: {self.results_dir}/gen_0/ through {self.results_dir}/gen_{current_gen}/",
        ]
        if target_gen != current_gen:
            task_parts.append(f"- NOTE: Diagnosing gen {target_gen} (best since last trigger), not gen {current_gen} (latest)")
        if isinstance(target_score, numbers.Real):
            task_parts.append(f"- Score: {target_score:.4f}")

        # === Aux metrics error warning ===
        err_code = self._public_metrics(self._load_metrics(current_gen)).get("aux_aux_metric_error_code", 0)
        if isinstance(err_code, numbers.Real) and err_code > 0:
            task_parts.extend(["",
                f"⚠️ auxiliary_metrics.py had an error (code={int(err_code)}) on gen {current_gen}. Fix or rewrite it."])

        # === Problem statement ===
        if self.problem_statement:
            task_parts.extend(["", "📝 PROBLEM STATEMENT:", self.problem_statement[:4000]])

        # === Code execution environment (task-specific) ===
        task_parts.extend(self._execution_environment_lines())

        # === Per-case analysis of target gen ===
        task_parts.extend(self._build_case_analysis(target_gen))

        # === Score trend ===
        task_parts.extend(self._score_trend_lines(current_gen))

        # === Aux metric trends ===
        task_parts.extend(self._build_aux_metric_trends(current_gen))

        # === Toolbox APIs ===
        task_parts.extend(["",
            "🧰 Toolbox APIs (if needed):",
            "- from eval_agent.tool_box import call_vision, call_tool",
            "- Do NOT import eval_agent/tool_box/_internal/*",
        ])

        # === Run command hint ===
        project_root = Path(__file__).parent.parent.resolve()
        task_parts.extend(["",
            "🔧 Test aux metrics (copy-paste to terminal):",
            f" python -c \"import sys; sys.path.insert(0,'{project_root}'); "
            f"import importlib.util, json; "
            f"spec=importlib.util.spec_from_file_location('aux','{self.workspace}/auxiliary_metrics.py'); "
            f"mod=importlib.util.module_from_spec(spec); spec.loader.exec_module(mod); "
            f"print(json.dumps(mod.evaluate_aux('{self.results_dir}/gen_{target_gen}'),indent=2))\"",
        ])

        # === Feedback on previous actions (only after a previous agent run) ===
        if last_trigger >= 0:
            try:
                from eval_agent.feedback import compute_metric_feedback
                feedback_text = compute_metric_feedback(
                    results_dir=self.results_dir,
                    current_gen=current_gen,
                    last_agent_gen=last_trigger,
                )
                if feedback_text:
                    task_parts.append(feedback_text)
            except Exception as e:
                logging.warning(f"Failed to compute metric feedback: {e}")

        return "\n".join(task_parts)

    # ------------------------------------------------------------------
    # Agent run
    # ------------------------------------------------------------------
    async def analyze_generation(self, generation: int) -> Dict[str, Any]:
        """
        Analyze a generation using the agent.

        Uses a non-blocking lock so that concurrent triggers skip rather than
        queue up a backlog of long-running agent sessions.

        Args:
            generation: Generation number to analyze

        Returns:
            Dict with analysis results, or a skip-marker dict
            (``skipped=True, reason="agent_busy"``) if the agent is busy
        """
        if self._run_lock.locked():
            logging.warning(
                f"⏭️ Agent busy — skipping analysis for generation {generation}. "
                "Next periodic trigger will catch up."
            )
            return {
                "success": False,
                "generation": generation,
                "skipped": True,
                "reason": "agent_busy",
            }
        async with self._run_lock:
            return await self._analyze_generation_locked(generation)

    async def _analyze_generation_locked(self, generation: int) -> Dict[str, Any]:
        """Run one agent session for ``generation``; the caller holds ``_run_lock``.

        Builds the task prompt, snapshots auxiliary_metrics.py, runs the
        OpenHands conversation in a worker thread, reverts a broken
        auxiliary_metrics.py, optionally saves the trajectory, and collects
        insights / metrics / candidate code from the workspace.

        Returns:
            Result dict with ``success=True`` and the collected artifacts.

        Raises:
            Exception: any failure is recorded as an ``agent_run_end`` event
                with ``success=False`` and then re-raised.
        """
        logging.info("=" * 80)
        logging.info(f"🧠 EV2 Agent Analysis - Generation {generation}")
        logging.info("=" * 80)
        start_time = time.time()
        try:
            # Ensure agent is ready
            self._ensure_agent_ready()

            # Build task message (same as ev2.py)
            task_message = self._build_task_message(generation)
            task_hash = hashlib.sha256(task_message.encode("utf-8")).hexdigest()
            logging.info(f"📝 Task message: {len(task_message)} characters")
            logging.info(f"📁 Workspace: {self.workspace}")
            behavior_logging.save_text_artifact(
                str(self.results_dir),
                f"agent_runs/gen_{generation}_task_message.txt",
                task_message,
            )
            behavior_logging.log_event(
                str(self.results_dir),
                "agent_run_start",
                {
                    "generation": generation,
                    "workspace": str(self.workspace),
                    "task_message_chars": len(task_message),
                    "task_message_sha256": task_hash,
                },
            )

            # Snapshot auxiliary_metrics.py before the agent modifies it
            aux_backup = self._backup_aux_metrics()

            # Create conversation (same as ev2.py line 76)
            conversation = Conversation(
                agent=self._agent,
                workspace=str(self.workspace)
            )

            # Send message and run (same as ev2.py lines 85-91)
            logging.info("📤 Sending task to agent...")
            conversation.send_message(task_message)
            logging.info("🔄 Agent working...")
            # run() blocks; execute it in a worker thread to keep the event loop free.
            await asyncio.to_thread(conversation.run)

            self._restore_aux_metrics_if_broken(aux_backup)

            if ENABLE_FULL_TRAJECTORY_LOG:
                self._save_trajectory(conversation, generation)

            elapsed = time.time() - start_time
            logging.info("=" * 80)
            logging.info("✅ EV2 Evaluation Complete!")
            logging.info("=" * 80)
            logging.info(f"⏱️  Time: {elapsed:.1f}s")
            logging.info(f"📁 Workspace: {self.workspace}")
            logging.info(f"📝 Memory: {self.workspace}/EVAL_AGENTS.md")
            logging.info("=" * 80)

            # Extract results
            insights = self._extract_insights()
            metrics = self._extract_metrics()
            candidate_code = self._extract_agent_candidate()
            agent_result = {
                "success": True,
                "generation": generation,
                "workspace": str(self.workspace),
                "insights": insights,
                "auxiliary_metrics": metrics,
                "candidate_code": candidate_code,
                "elapsed_seconds": elapsed
            }
            behavior_logging.save_text_artifact(
                str(self.results_dir),
                f"agent_runs/gen_{generation}_result.json",
                json.dumps(_sanitize_for_json(agent_result), indent=2, ensure_ascii=False),
            )
            behavior_logging.log_event(
                str(self.results_dir),
                "agent_run_end",
                {
                    "generation": generation,
                    "success": True,
                    "elapsed_seconds": elapsed,
                    "insight_count": len(insights),
                    "auxiliary_metric_file_exists": bool(metrics.get("auxiliary_metrics_file_exists")),
                    "auxiliary_metric_file_size_bytes": metrics.get("file_size_bytes", 0),
                },
            )
            return agent_result
        except Exception as e:
            behavior_logging.log_event(
                str(self.results_dir),
                "agent_run_end",
                {
                    "generation": generation,
                    "success": False,
                    "elapsed_seconds": time.time() - start_time,
                    "error": str(e),
                },
            )
            logging.error(f"❌ Agent analysis failed: {e}", exc_info=True)
            raise

    def _backup_aux_metrics(self) -> Optional[Path]:
        """Copy auxiliary_metrics.py to auxiliary_metrics.py.bak.

        Returns:
            The backup path, or None when there was no file to back up.
        """
        import shutil

        aux_py = self.workspace / "auxiliary_metrics.py"
        if not aux_py.exists():
            return None
        aux_bak = self.workspace / "auxiliary_metrics.py.bak"
        shutil.copy2(aux_py, aux_bak)
        return aux_bak

    def _restore_aux_metrics_if_broken(self, backup: Optional[Path]) -> None:
        """Revert auxiliary_metrics.py to ``backup`` if it no longer compiles.

        Only a backup taken during this run is used, so a stale ``.bak`` from
        an earlier run is never restored.
        """
        import shutil

        aux_py = self.workspace / "auxiliary_metrics.py"
        if not aux_py.exists():
            return
        try:
            compile(aux_py.read_text(encoding="utf-8"), str(aux_py), "exec")
            logging.info("✅ auxiliary_metrics.py syntax OK")
        except (SyntaxError, ValueError) as e:
            # ValueError covers undecodable bytes and (pre-3.12) null bytes.
            logging.warning(f"⚠️ auxiliary_metrics.py has syntax error: {e}. Reverting to backup.")
            if backup is not None and backup.exists():
                shutil.copy2(backup, aux_py)
                logging.info("✅ Reverted to backup")
            else:
                logging.warning("⚠️ No backup from this run; leaving auxiliary_metrics.py unchanged")

    def _save_trajectory(self, conversation: Any, generation: int) -> None:
        """Save the conversation's LLM messages as a JSON artifact (best effort).

        Any failure is recorded as an ``agent_trajectory_save_failed`` event
        instead of being raised.
        """
        try:
            from openhands.sdk.event.base import LLMConvertibleEvent

            events = list(conversation.state.events)
            llm_events = [e for e in events if isinstance(e, LLMConvertibleEvent)]
            llm_messages = LLMConvertibleEvent.events_to_messages(llm_events)
            trajectory_messages: List[Dict[str, Any]] = [
                _sanitize_for_json(msg.model_dump() if hasattr(msg, "model_dump") else dict(msg))
                for msg in llm_messages
            ]
            behavior_logging.save_text_artifact(
                str(self.results_dir),
                f"agent_runs/gen_{generation}_trajectory_messages.json",
                json.dumps(trajectory_messages, indent=2, ensure_ascii=False),
            )
            behavior_logging.log_event(
                str(self.results_dir),
                "agent_trajectory_saved",
                {
                    "generation": generation,
                    "event_count": len(events),
                    "llm_event_count": len(llm_events),
                    "message_count": len(trajectory_messages),
                },
            )
        except Exception as e:
            behavior_logging.log_event(
                str(self.results_dir),
                "agent_trajectory_save_failed",
                {
                    "generation": generation,
                    "error": str(e),
                },
            )

    # ------------------------------------------------------------------
    # Result extraction
    # ------------------------------------------------------------------
    def _extract_insights(self) -> List[str]:
        """Return up to the last 10 bullet lines (``*``/``-``) from EVAL_AGENTS.md."""
        eval_agents_md = self.workspace / "EVAL_AGENTS.md"
        if not eval_agents_md.exists():
            return []
        content = eval_agents_md.read_text(encoding="utf-8")
        # Extract bullet points (simple heuristic)
        insights = [
            stripped for stripped in (line.strip() for line in content.split('\n'))
            if stripped.startswith(('*', '-'))
        ]
        return insights[-10:]

    def _extract_metrics(self) -> Dict[str, Any]:
        """Describe auxiliary_metrics.py: whether it exists, its path and size."""
        auxiliary_py = self.workspace / "auxiliary_metrics.py"
        metrics: Dict[str, Any] = {
            "auxiliary_metrics_file_exists": auxiliary_py.exists(),
        }
        if auxiliary_py.exists():
            metrics["auxiliary_metrics_path"] = str(auxiliary_py)
            metrics["file_size_bytes"] = auxiliary_py.stat().st_size
        return metrics

    def _extract_diagnostic_report(self) -> str:
        """Return the agent's diagnostic report (first 2000 chars), or "" if unavailable."""
        report_path = self.workspace / "diagnostic_report.md"
        if not report_path.exists():
            return ""
        try:
            content = report_path.read_text(encoding="utf-8").strip()
        except Exception:
            return ""
        # Limit to 2000 chars to avoid bloating text_feedback
        return content[:2000]
# ============================================================================
# Service State Management
# ============================================================================
class ServiceState:
    """
    Service state management (same as ev2_service.py)
    Maintains history and decides when to trigger agent

    State is persisted as JSON to
    ``<results_dir>/eval_agent_memory/service_state.json`` (or
    ``./service_state.json`` when ``config.results_dir`` is empty). Saves are
    atomic: data goes to a temporary sibling file which is then moved over the
    real file, so a failed save never corrupts previously persisted state.
    """

    def __init__(self, config: "ServiceConfig", force_clean: bool = False):
        """
        Initialize service state.
        Args:
            config: Service configuration
            force_clean: If True, start with clean state (don't load from disk)
        """
        self.config = config
        # State tracking
        self.generation_history: List[Dict[str, Any]] = []
        self.last_agent_trigger_gen: int = -1
        self.total_notifications: int = 0
        self.total_agent_runs: int = 0
        self.agent_trigger_history: List[Dict[str, Any]] = []  # history of all agent triggers
        # Timing
        self.start_time = time.time()
        # Load previous state if exists (unless forced clean)
        if not force_clean:
            self._load_state()
        else:
            logging.info("🔄 Starting with clean state (not loading from disk)")
            self._save_state()  # Overwrite any previously persisted state

    def _get_state_file(self) -> Path:
        """Get path to state file (creating its directory when results_dir is set)."""
        if self.config.results_dir:
            state_dir = Path(self.config.results_dir) / "eval_agent_memory"
            state_dir.mkdir(parents=True, exist_ok=True)
            return state_dir / "service_state.json"
        return Path("service_state.json")

    def _load_state(self):
        """Load previous state from disk; errors are logged and leave defaults."""
        state_file = self._get_state_file()
        if state_file.exists():
            try:
                with open(state_file) as f:
                    data = json.load(f)
                self.generation_history = data.get('generation_history', [])
                self.last_agent_trigger_gen = data.get('last_agent_trigger_gen', -1)
                self.total_notifications = data.get('total_notifications', 0)
                self.total_agent_runs = data.get('total_agent_runs', 0)
                self.agent_trigger_history = data.get('agent_trigger_history', [])
                logging.info(f"📥 Loaded state: {len(self.generation_history)} generations in history")
            except Exception as e:
                logging.error(f"Failed to load state: {e}")

    def _save_state(self):
        """Save current state to disk (best-effort, atomic).

        The JSON is first written to a hidden temporary file next to the state
        file and then moved into place with ``os.replace``. If serialization
        fails part-way (e.g. a non-JSON-serializable value in a generation
        record), the previous ``service_state.json`` stays intact instead of
        being truncated. Errors are logged, not raised.
        """
        state_file = self._get_state_file()
        # PID in the name keeps concurrent processes sharing a results_dir
        # from clobbering each other's temp file.
        tmp_file = state_file.with_name(f".{state_file.name}.{os.getpid()}.tmp")
        try:
            data = {
                'generation_history': self.generation_history[-100:],  # Keep last 100
                'last_agent_trigger_gen': self.last_agent_trigger_gen,
                'total_notifications': self.total_notifications,
                'total_agent_runs': self.total_agent_runs,
                'agent_trigger_history': self.agent_trigger_history[-20:],  # Keep last 20
                'last_update': time.time()
            }
            with open(tmp_file, 'w') as f:
                json.dump(data, f, indent=2)
            # Atomic replacement (same directory => same filesystem).
            os.replace(tmp_file, state_file)
        except Exception as e:
            logging.error(f"Failed to save state: {e}")
            try:
                tmp_file.unlink(missing_ok=True)
            except OSError:
                pass

    def add_generation(self, gen_data: Dict[str, Any]):
        """Record a generation and persist state."""
        self.generation_history.append(gen_data)
        self.total_notifications += 1
        # Keep only recent history in memory
        if len(self.generation_history) > 100:
            self.generation_history = self.generation_history[-100:]
        self._save_state()

    def should_trigger_agent(self, generation: int, primary_score: float) -> tuple[bool, str]:
        """
        Decide whether to trigger the agent
        Returns: (should_trigger, reason)
        """
        if not self.config.agent_enabled:
            return False, "Agent disabled in config"
        # Strategy 1: Always (for testing)
        if self.config.trigger_mode == "always":
            return True, "Always mode"
        # Strategy 2: Periodic
        if self.config.trigger_mode == "periodic":
            if generation - self.last_agent_trigger_gen >= self.config.trigger_interval:
                return True, f"Periodic trigger (interval={self.config.trigger_interval})"
            else:
                return False, f"Not yet (last trigger at gen {self.last_agent_trigger_gen})"
        # Strategy 3: Plateau detection
        if self.config.trigger_mode == "plateau":
            if self._detect_plateau():
                return True, "Plateau detected"
            else:
                return False, "No plateau detected"
        # Strategy 4: Mixed (periodic OR plateau)
        if self.config.trigger_mode == "mixed":
            # Check periodic
            if generation - self.last_agent_trigger_gen >= self.config.trigger_interval:
                return True, f"Periodic trigger (interval={self.config.trigger_interval})"
            # Check plateau
            if self._detect_plateau():
                return True, "Plateau detected (early trigger)"
            return False, f"Waiting (next trigger at gen {self.last_agent_trigger_gen + self.config.trigger_interval})"
        return False, f"Unknown trigger mode: {self.config.trigger_mode}"

    @staticmethod
    def _is_finite_score(value: Any) -> bool:
        """True for real numbers that are neither NaN nor infinite."""
        if not isinstance(value, numbers.Real):
            return False
        try:
            return math.isfinite(value)
        except (TypeError, OverflowError):
            return False

    def _detect_plateau(self) -> bool:
        """Detect if primary score has plateaued.

        Compares the first and last ``primary_score`` among the most recent
        ``plateau_window`` generations. A relative change smaller than
        ``plateau_threshold`` counts as a plateau. Returns False when:
        there is too little history; the window is too small to measure a
        change (< 2; a window of 0 would otherwise slice the whole history);
        or either endpoint score is missing, non-numeric or non-finite.
        """
        window = self.config.plateau_window
        if window < 2 or len(self.generation_history) < window:
            return False
        recent = self.generation_history[-window:]
        first = recent[0].get('primary_score')
        last = recent[-1].get('primary_score')
        if not (self._is_finite_score(first) and self._is_finite_score(last)):
            return False
        # Check if improvement is below threshold
        improvement = (last - first) / max(abs(first), 1e-6)
        return abs(improvement) < self.config.plateau_threshold

    def mark_agent_triggered(self, generation: int, active_metrics: Optional[List[str]] = None):
        """Mark that agent was triggered, recording active aux metric names."""
        self.last_agent_trigger_gen = generation
        self.total_agent_runs += 1
        self.agent_trigger_history.append({
            "generation": generation,
            "timestamp": time.time(),
            "active_metrics": active_metrics or [],
        })
        self._save_state()

    def get_statistics(self) -> Dict[str, Any]:
        """Get service statistics"""
        return {
            "total_notifications": self.total_notifications,
            "total_agent_runs": self.total_agent_runs,
            "generations_tracked": len(self.generation_history),
            "last_agent_trigger_gen": self.last_agent_trigger_gen,
            "uptime_seconds": time.time() - self.start_time
        }
# ============================================================================
# FastAPI Application
# ============================================================================
# Global state (initialized on startup)
# startup_event() assigns these; until then they are None.
service_state: Optional[ServiceState] = None
service_config: Optional[ServiceConfig] = None  # main() is expected to set this before startup
ev2_agent: Optional[IntegratedEV2Agent] = None  # stays None if not created at startup ("dynamic mode")
# Global evaluation job tracking (for async evaluation mode)
# Maps job_id -> mutable job record; updated under _state_lock (see _update_evaluation_job).
evaluation_jobs: Dict[str, Dict[str, Any]] = {}
# Lock serializing initialization/reset of the globals above and job-record updates.
# This is an asyncio.Lock: it coordinates coroutines on the event loop, not OS threads.
# NOTE: this re-import duplicates the top-of-file `import asyncio`; it is harmless.
# NOTE(review): on Python < 3.10 an asyncio.Lock created at import time binds to the
# event loop current at construction — confirm the server runs on that same loop.
import asyncio
_state_lock = asyncio.Lock()
# Create FastAPI app
app = FastAPI(
    title="EV2 Evaluation Service (Standalone)",
    description="Event-driven evaluation service with integrated OpenHands agent",
    version="2.0.0"
)
# Setup logging
# NOTE: logging.basicConfig does nothing if the root logger already has handlers.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def _env_flag(name: str, default: bool = False) -> bool:
raw = os.getenv(name)
if raw is None:
return default
return raw.strip().lower() in {"1", "true", "yes", "on"}
# Opt-in diagnostics read once at import time from the environment (via _env_flag).
# ENABLE_FULL_TRAJECTORY_LOG is reported in the startup banner.
ENABLE_FULL_TRAJECTORY_LOG = _env_flag("ENABLE_FULL_TRAJECTORY_LOG", default=False)
# NOTE(review): not read in this part of the file; presumably consumed by the
# OpenHands agent setup elsewhere — confirm.
OPENHANDS_LOG_COMPLETIONS = _env_flag("OPENHANDS_LOG_COMPLETIONS", default=False)
def _should_suppress_aux_metrics(experiment_root: str, current_gen: int, window: int = 10) -> bool:
    """Check if aux metrics should be suppressed due to negative correlation with score.

    Loads per-generation metrics from ``max(0, current_gen - window)`` to
    ``current_gen`` (range semantics are those of ``_load_generation_metrics``).
    For every ``aux_*`` key in a record's ``public`` dict with at least 5 finite
    samples, it computes the Spearman correlation with ``combined_score``. If
    any aux metric has rho < -0.3, it returns True so that all aux metrics are
    suppressed for safety.

    Args:
        experiment_root: Experiment root directory.
        current_gen: Latest generation number to consider.
        window: How many generations back to look.

    Returns:
        True when suppression is warranted. False otherwise, including when
        fewer than 5 records exist or a correlation is undefined (None/NaN).
    """
    from eval_agent.feedback import _load_generation_metrics, _spearman_correlation

    def _finite(value: Any) -> bool:
        """True for int/float values that are neither NaN nor infinite."""
        if not isinstance(value, (int, float)):
            return False
        try:
            return math.isfinite(value)
        except OverflowError:  # int too large to convert to float
            return False

    results_dir = Path(experiment_root)
    gen_start = max(0, current_gen - window)
    records = _load_generation_metrics(results_dir, gen_start, current_gen)
    if len(records) < 5:
        return False  # Not enough data

    scores: List[float] = []
    publics: List[Dict[str, Any]] = []
    for _, data in records:
        cs = data.get("combined_score")
        # Missing / non-finite scores are counted as 0.0.
        scores.append(float(cs) if _finite(cs) else 0.0)
        public = data.get("public")
        # "public" may be absent, null, or not a mapping; treat all as "no metrics".
        publics.append(public if isinstance(public, dict) else {})

    # Sorted for a deterministic check (and warning) order.
    aux_names = sorted({
        key
        for public in publics
        for key in public
        if isinstance(key, str) and key.startswith("aux_")
    })

    for name in aux_names:
        vals: List[float] = []
        paired_s: List[float] = []
        for public, s in zip(publics, scores):
            v = public.get(name)
            if _finite(v):
                vals.append(float(v))
                paired_s.append(s)
        if len(vals) < 5:
            continue
        rho = _spearman_correlation(vals, paired_s)
        try:
            rho_value = float(rho)
        except (TypeError, ValueError):
            continue  # correlation undefined (e.g. None)
        if math.isfinite(rho_value) and rho_value < -0.3:
            logging.warning(
                f"Aux metric '{name}' has negative correlation {rho_value:.2f} with score — triggering suppression"
            )
            return True
    return False
def _sanitize_for_json(value: Any) -> Any:
"""Recursively convert non-JSON-safe numeric values (NaN/Inf) to None."""
if isinstance(value, float):
return value if math.isfinite(value) else None
if isinstance(value, dict):
return {k: _sanitize_for_json(v) for k, v in value.items()}
if isinstance(value, list):
return [_sanitize_for_json(v) for v in value]
if isinstance(value, tuple):
return [_sanitize_for_json(v) for v in value]
# Handle numpy scalar types without importing numpy globally.
if hasattr(value, "item") and callable(getattr(value, "item")):
try:
coerced = value.item()
return _sanitize_for_json(coerced)
except Exception:
return str(value)
return value
async def _update_evaluation_job(job_id: str, **updates: Any) -> bool:
    """Apply keyword *updates* to the tracked evaluation job *job_id*.

    The lookup and the in-place ``dict.update`` both happen while holding
    ``_state_lock``.

    Returns:
        True when the job was found and updated; False (nothing changed) when
        no job with that id is tracked.
    """
    async with _state_lock:
        tracked = evaluation_jobs.get(job_id)
        found = tracked is not None
        if found:
            tracked.update(updates)
    return found
@app.on_event("startup")
async def startup_event():
    """Prepare the module-level service globals when the app starts.

    Uses the ServiceConfig installed by main(), or a default one, and builds a
    fresh ServiceState from it. The integrated agent is constructed eagerly
    only when ``results_dir`` is configured. A construction error is logged
    and leaves ``ev2_agent`` as None instead of aborting startup.
    """
    global service_state, service_config, ev2_agent
    rule = "=" * 80
    logger.info(rule)
    logger.info("🚀 Starting EV2 Evaluation Service (Standalone)")
    logger.info(rule)

    # main() normally installs the config before the app starts.
    if service_config is None:
        logger.warning("⚠️ No config provided, using defaults")
        service_config = ServiceConfig()

    service_state = ServiceState(service_config)

    if not service_config.results_dir:
        logger.info("⏳ Agent will be initialized on first generation request (dynamic mode)")
        ev2_agent = None
    else:
        try:
            agent = IntegratedEV2Agent(
                results_dir=service_config.results_dir,
                primary_evaluator_path=service_config.primary_evaluator_path,
                config=service_config,
                problem_statement=service_config.problem_statement,
                evaluator_kwargs=service_config.evaluator_kwargs,
            )
        except Exception as exc:
            logger.error(f"❌ Failed to initialize agent: {exc}")
            logger.warning("⚠️ Service will start but agent calls will fail")
            ev2_agent = None
        else:
            ev2_agent = agent
            logger.info("✅ Integrated EV2 Agent ready")

    logger.info(rule)
    logger.info("✅ Service Started")
    summary = (
        ("Experiment", service_config.experiment_name),
        ("Results dir", service_config.results_dir),
        ("Trigger mode", service_config.trigger_mode),
        ("Trigger interval", service_config.trigger_interval),
        ("Full trajectory log", ENABLE_FULL_TRAJECTORY_LOG),
    )
    for label, value in summary:
        logger.info(f" {label}: {value}")
    logger.info(rule)
@app.on_event("shutdown")
async def shutdown_event():
    """Persist the service state one last time and log lifetime counters."""
    divider = "=" * 80
    logger.info(divider)
    logger.info("🛑 Shutting down EV2 Evaluation Service")
    logger.info(divider)
    state = service_state
    if state:
        state._save_state()
        logger.info(f" Total generation requests: {state.total_notifications}")
        logger.info(f" Total agent runs: {state.total_agent_runs}")
    logger.info(divider)
# ============================================================================
# Evaluation Executors (for async evaluation mode)
# ============================================================================
def _normalize_experiment_root(results_dir: str) -> Path:
"""
Normalize results_dir to experiment root.
Accepts paths like:
- /path/to/experiment
- /path/to/experiment/gen_10
- /path/to/experiment/gen_10/results
"""
results_path = Path(results_dir).resolve()
if results_path.name == "results":
return results_path.parent.parent
if results_path.name.startswith("gen_"):
return results_path.parent
return results_path
def _execute_primary_evaluator_sync(
code_path: str,
results_dir: str,
evaluator_module: str,
evaluator_function: str,
evaluator_kwargs: Optional[Dict[str, Any]],
) -> Dict[str, Any]:
"""
Execute primary evaluator synchronously.
This runs inside a worker subprocess so the parent process can hard-kill it
on timeout without leaving background threads.
"""
import importlib
import inspect
module = importlib.import_module(evaluator_module)
evaluator_func = getattr(module, evaluator_function)
eval_kwargs = evaluator_kwargs or {}
sig = inspect.signature(evaluator_func)
params = list(sig.parameters.keys())
# Support common evaluator signatures:
# 1. evaluate(code_path, **kwargs)
# 2. main(program_path, results_dir)
# 3. evaluate(code_path, results_dir, **kwargs)
if len(params) >= 2 and "results_dir" in params[:3]:
result = evaluator_func(code_path, results_dir, **eval_kwargs)
else:
result = evaluator_func(code_path, **eval_kwargs)
# Evaluator may persist metrics and return None.
if result is None:
metrics_file = Path(results_dir) / "metrics.json"
if not metrics_file.exists():
raise FileNotFoundError(
f"Evaluator returned None and metrics.json not found at {metrics_file}"
)
with open(metrics_file) as f:
result = json.load(f)
if not isinstance(result, dict):
raise ValueError(f"Evaluator must return dict, got {type(result)}")
if "combined_score" not in result:
raise ValueError("Evaluator result must contain 'combined_score' key")
# Merge validation status from correct.json when available.
correct_file = Path(results_dir) / "correct.json"
if correct_file.exists():
try:
with open(correct_file) as f:
correct_data = json.load(f)
result["correct"] = correct_data.get("correct", False)
result["validation_error"] = correct_data.get("error", None)
except Exception:
result["correct"] = False
else:
result["correct"] = True
return result
def _run_primary_evaluator_worker(request_path: str, output_path: str) -> int:
"""
Worker-mode entrypoint for primary evaluation.
Reads request json, executes evaluator, writes output json:
- {"ok": true, "result": {...}}
- {"ok": false, "error": "...", "traceback": "..."}
"""
try:
with open(request_path) as f:
payload = json.load(f)
result = _execute_primary_evaluator_sync(
code_path=payload["code_path"],
results_dir=payload["results_dir"],
evaluator_module=payload["evaluator_module"],
evaluator_function=payload["evaluator_function"],
evaluator_kwargs=payload.get("evaluator_kwargs"),
)
with open(output_path, "w") as f:
json.dump({"ok": True, "result": result}, f)
return 0
except Exception as e:
try:
with open(output_path, "w") as f:
json.dump(
{
"ok": False,
"error": str(e),
"traceback": traceback.format_exc(),
},
f,
)
except Exception:
pass
return 1
async def run_primary_evaluator(request: GenerationCompleteRequest) -> Dict[str, Any]:
    """
    Run primary evaluator (dynamically loaded)

    The evaluator runs in a separate worker process: this file is re-invoked
    with ``--worker-eval-request`` / ``--worker-eval-output``. Request and
    result travel through temporary JSON files, which are always removed
    afterwards. The worker is killed on timeout. It is also killed if this
    coroutine exits for any other reason while the worker is still running
    (e.g. task cancellation), so it is never orphaned.

    Args:
        request: Generation complete request with evaluator config
    Returns:
        Evaluation result dict with at least 'combined_score' key. On timeout a
        synthetic result (combined_score=0.0, correct=False, timeout=True) is
        returned instead of raising.
    Raises:
        RuntimeError: If the worker produced no output file, or reported a
            failure. Evaluator import/attribute/evaluation errors surface this
            way, with the worker traceback in the message.
        Exception: Any other failure while launching or reading the worker.
    """
    logger.info(f"🔬 Running primary evaluator: {request.evaluator_module}.{request.evaluator_function}")
    logger.info(f" Code path: {request.code_path}")
    logger.info(f" Results dir: {request.results_dir}")
    logger.info(f" Evaluator: {request.evaluator_module}.{request.evaluator_function}")
    timeout_seconds = service_config.evaluation_timeout if service_config else 300.0
    if timeout_seconds <= 0:
        logger.warning(
            f"Invalid evaluation timeout {timeout_seconds}; using default 300.0s"
        )
        timeout_seconds = 300.0
    logger.info(f" Evaluation timeout: {timeout_seconds}s (hard kill)")
    request_payload = {
        "code_path": request.code_path,
        "results_dir": request.results_dir,
        "evaluator_module": request.evaluator_module,
        "evaluator_function": request.evaluator_function,
        "evaluator_kwargs": request.evaluator_kwargs or {},
    }
    req_path = None
    out_path = None
    process = None
    try:
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=".json", prefix="ev2_eval_req_", delete=False
        ) as req_file:
            req_path = req_file.name
            json.dump(request_payload, req_file)
        out_path = f"{req_path}.out.json"
        cmd = [
            sys.executable,
            str(Path(__file__).resolve()),
            "--worker-eval-request",
            req_path,
            "--worker-eval-output",
            out_path,
        ]
        process = await asyncio.create_subprocess_exec(
            *cmd,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        try:
            _, stderr = await asyncio.wait_for(
                process.communicate(), timeout=timeout_seconds
            )
        except asyncio.TimeoutError:
            error_msg = f"Evaluation exceeded timeout of {timeout_seconds}s (process killed)"
            logger.error(f"⏱️ {error_msg}")
            if process.returncode is None:
                process.kill()
            await process.communicate()
            return {
                "combined_score": 0.0,
                "correct": False,
                "validation_error": error_msg,
                "execution_time_mean": timeout_seconds,
                "timeout": True,
            }
        if not Path(out_path).exists():
            stderr_text = ((stderr or b"").decode("utf-8", errors="replace") or "").strip()
            raise RuntimeError(
                f"Evaluator worker did not produce output file. "
                f"returncode={process.returncode}, stderr={stderr_text[:500]}"
            )
        with open(out_path) as f:
            worker_output = json.load(f)
        if not worker_output.get("ok", False):
            raise RuntimeError(
                f"Evaluator worker failed: {worker_output.get('error', 'unknown error')}\n"
                f"{worker_output.get('traceback', '')}"
            )
        result = worker_output.get("result", {})
        score = result.get('combined_score', 0.0)
        # Only numbers can use a float format spec; anything else (None, str)
        # would raise here and turn a successful evaluation into a failure.
        score_text = f"{score:.4f}" if isinstance(score, numbers.Real) else repr(score)
        logger.info(
            f"✅ Primary evaluation completed: "
            f"score={score_text}, "
            f"correct={result.get('correct', '?')}"
        )
        return result
    except Exception as e:
        logger.error(f"❌ Evaluation failed: {e}", exc_info=True)
        raise
    finally:
        # Cancellation (CancelledError is not an Exception) or any other early
        # exit must not leave the worker process running.
        if process is not None and process.returncode is None:
            try:
                process.kill()
            except ProcessLookupError:
                pass
        for path in [req_path, out_path]:
            if path:
                try:
                    os.unlink(path)
                except OSError:
                    pass
async def run_auxiliary_evaluators(
    request: GenerationCompleteRequest,
    primary_result: Dict[str, Any],
    experiment_root: str,
) -> Dict[str, Any]:
    """
    Run auxiliary evaluators (if they exist)
    Loads auxiliary_metrics.py from eval_agent_memory and calls the
    evaluate_aux() function (single entry point pattern).

    Each call is bracketed by ``aux_eval_start`` / ``aux_eval_end`` behavior-log
    events. Failures never raise. These are: syntax error, import error,
    runtime error, non-dict return, and timeout. Each returns a small dict of
    ``aux_metric_*`` diagnostics instead, whose ``aux_metric_error_code`` is
    1=syntax, 2=import, 3=runtime, 4=invalid_return, 5=timeout (99=other).

    Args:
        request: Generation complete request
        primary_result: Result from primary evaluator
        experiment_root: Snapshot of experiment root used for this evaluation run
    Returns:
        Dict of auxiliary metric results (flat dictionary). It is empty when
        no experiment root / auxiliary_metrics.py / evaluate_aux is available.
    """
    # Wall-clock limit for one evaluate_aux() call. The call runs through
    # asyncio.to_thread, and Python threads cannot be force-stopped. On
    # timeout we stop waiting, but the thread keeps running until
    # evaluate_aux returns on its own.
    aux_timeout_seconds = 30.0
    # Codes reported in aux_metric_error_code; unknown types map to 99.
    aux_error_codes = {
        "syntax": 1,
        "import": 2,
        "runtime": 3,
        "invalid_return": 4,
        "timeout": 5,
    }

    def _log_aux_end(details: Dict[str, Any]) -> None:
        """Emit the aux_eval_end behavior event for this generation."""
        behavior_logging.log_event(
            request.results_dir,
            "aux_eval_end",
            {"generation": request.generation, **details},
        )

    def _skip(reason: str) -> Dict[str, Any]:
        """Record a successful no-op outcome and return no metrics."""
        _log_aux_end({"success": True, "skipped": True, "reason": reason})
        return {}

    def _aux_failure_metrics(error_type: str, error_message: str, error_detail: str = "") -> Dict[str, Any]:
        # Failure fallback uses null for non-essential diagnostic fields to avoid
        # conflating "failed evaluation" with valid zero values.
        return {
            "aux_metric_eval_success": None,
            "aux_metric_error_code": float(aux_error_codes.get(error_type, 99)),
            "aux_metric_non_numeric_dropped_count": None,
            "aux_metric_error_message_length": float(len(error_message or "")),
            "aux_metric_error_detail_length": float(len(error_detail or "")),
        }

    def _fail(error_type: str, error_message: str, error_detail: str = "") -> Dict[str, Any]:
        """Build failure diagnostics, log the aux_eval_end failure event, return them."""
        failure = _aux_failure_metrics(error_type, error_message, error_detail)
        _log_aux_end(
            {
                "success": False,
                "error_type": error_type,
                "aux_metric_error_code": failure.get("aux_metric_error_code", 99.0),
            }
        )
        return failure

    def _normalize_aux_metrics(raw: Dict[str, Any]) -> Dict[str, Any]:
        """
        Keep numeric/bool metrics usable for time-series analysis.
        Preserve error context in dedicated metadata fields.
        """
        normalized: Dict[str, Any] = {}
        dropped_non_numeric = 0
        for key, value in raw.items():
            if isinstance(value, bool):
                normalized[key] = float(value)
            elif isinstance(value, numbers.Real):
                try:
                    as_float = float(value)
                except (OverflowError, ValueError, TypeError):
                    as_float = math.nan  # e.g. an int too large for a float
                if math.isfinite(as_float):
                    normalized[key] = as_float
                else:
                    dropped_non_numeric += 1
            elif isinstance(value, str) and key == "error":
                normalized["aux_metric_error_code"] = max(float(normalized.get("aux_metric_error_code", 0.0)), 3.0)
                normalized["aux_metric_error_message_length"] = float(len(value))
                dropped_non_numeric += 1
            elif isinstance(value, str) and key == "traceback":
                normalized["aux_metric_error_detail_length"] = float(len(value))
                dropped_non_numeric += 1
            else:
                dropped_non_numeric += 1
        normalized.setdefault("aux_metric_eval_success", 1.0)
        normalized.setdefault("aux_metric_error_code", 0.0)
        normalized.setdefault("aux_metric_error_message_length", 0.0)
        normalized.setdefault("aux_metric_error_detail_length", 0.0)
        normalized["aux_metric_non_numeric_dropped_count"] = float(dropped_non_numeric)
        return normalized

    logger.info(f"🔍 Looking for auxiliary evaluators...")
    behavior_logging.log_event(
        request.results_dir,
        "aux_eval_start",
        {"generation": request.generation},
    )
    # Find auxiliary_metrics.py in eval_agent_memory
    if not experiment_root:
        logger.info(" No results_dir configured, skipping auxiliary metrics")
        return _skip("no_results_dir")
    aux_metrics_path = Path(experiment_root) / "eval_agent_memory" / "auxiliary_metrics.py"
    if not aux_metrics_path.exists():
        logger.info(f" No auxiliary metrics found at {aux_metrics_path}")
        return _skip("aux_file_missing")
    logger.info(f" Found auxiliary metrics: {aux_metrics_path}")
    # Snapshot the exact auxiliary metrics code used for this generation.
    # This makes post-hoc debugging reproducible even after the agent updates
    # eval_agent_memory/auxiliary_metrics.py in later generations.
    try:
        snapshot_path = Path(request.results_dir) / "auxiliary_metrics_snapshot.py"
        snapshot_path.parent.mkdir(parents=True, exist_ok=True)
        snapshot_path.write_text(aux_metrics_path.read_text(encoding="utf-8"), encoding="utf-8")
        logger.info(f" 📝 Saved auxiliary metrics snapshot: {snapshot_path}")
    except Exception as e:
        logger.warning(f" ⚠️ Failed to save auxiliary metrics snapshot: {e}")

    try:
        # Ensure repo root is importable so auxiliary_metrics.py can import
        # shared helpers like `eval_agent.tool_box` regardless of service CWD.
        project_root = str(Path(__file__).resolve().parent.parent)
        if project_root not in sys.path:
            sys.path.insert(0, project_root)
        # Fast syntax validation before import to avoid ambiguous runtime failures.
        try:
            aux_source = aux_metrics_path.read_text(encoding="utf-8")
            compile(aux_source, str(aux_metrics_path), "exec")
        except SyntaxError as e:
            logger.error(f"❌ auxiliary_metrics.py syntax error: {e}")
            return _fail("syntax", str(e), traceback.format_exc())
        # Dynamically load auxiliary metrics module
        import importlib.util
        spec = importlib.util.spec_from_file_location("auxiliary_metrics", str(aux_metrics_path))
        aux_module = importlib.util.module_from_spec(spec)
        try:
            spec.loader.exec_module(aux_module)
        except Exception as e:
            logger.error(f"❌ Failed importing auxiliary metrics module: {e}", exc_info=True)
            return _fail("import", str(e), traceback.format_exc())
        # Look for the evaluate_aux function (single entry point)
        if not hasattr(aux_module, 'evaluate_aux'):
            logger.warning(" ⚠️ No 'evaluate_aux' function found in auxiliary_metrics.py")
            logger.warning(" Please implement: def evaluate_aux(results_dir: str) -> Dict[str, Any]")
            return _skip("evaluate_aux_missing")
        evaluate_func = getattr(aux_module, 'evaluate_aux')
        logger.info(f" ✅ Found evaluate_aux function")

        # Inspect function signature to determine how to call it
        import inspect
        sig = inspect.signature(evaluate_func)
        params = list(sig.parameters.keys())
        # gen_N/ (parent of gen_N/results/) so that results_dir/main.cpp resolves correctly
        gen_dir = str(Path(request.results_dir).parent)
        first_is_positional = bool(params) and sig.parameters[params[0]].kind in (
            inspect.Parameter.POSITIONAL_ONLY,
            inspect.Parameter.POSITIONAL_OR_KEYWORD,
        )
        call_args: tuple = ()
        call_kwargs: Dict[str, Any] = {}
        if not params:
            logger.info(f" 📞 Calling evaluate_aux()")
        elif len(params) == 1:
            param_name = params[0].lower()
            if 'results_dir' in param_name:
                logger.info(f" 📞 Calling evaluate_aux(results_dir='{gen_dir}')")
                call_args = (gen_dir,)
            elif 'gen_path' in param_name or 'generation_dir' in param_name:
                logger.info(f" 📞 Calling evaluate_aux(gen_path='{gen_dir}')")
                call_args = (gen_dir,)
            else:
                # Default: pass results_dir
                logger.info(f" 📞 Calling evaluate_aux('{request.results_dir}')")
                call_args = (request.results_dir,)
        elif 'primary_result' in params:
            logger.info(f" 📞 Calling evaluate_aux with results_dir and primary_result")
            if 'results_dir' in params or params[0] == 'primary_result' or not first_is_positional:
                call_kwargs = {"results_dir": gen_dir, "primary_result": primary_result}
            else:
                # First parameter has another name (e.g. gen_path): bind it
                # positionally instead of passing an unexpected results_dir kwarg.
                call_args = (gen_dir,)
                call_kwargs = {"primary_result": primary_result}
        else:
            logger.info(f" 📞 Calling evaluate_aux with keyword arguments")
            if 'results_dir' in params or not first_is_positional:
                call_kwargs = {"results_dir": request.results_dir}
            else:
                # No results_dir parameter: bind the first parameter positionally.
                call_args = (request.results_dir,)
        result = await asyncio.wait_for(
            asyncio.to_thread(evaluate_func, *call_args, **call_kwargs),
            timeout=aux_timeout_seconds,
        )

        # Validate result format
        if not isinstance(result, dict):
            logger.error(f" ❌ evaluate_aux must return dict, got {type(result)}")
            return _fail("invalid_return", f"Invalid return type: {type(result).__name__}")
        # Check if result contains error
        if "error" in result:
            logger.warning(f" ⚠️ evaluate_aux returned error: {result.get('error')}")
        else:
            logger.info(f" ✅ evaluate_aux returned {len(result)} metrics")
            # Log first few metrics for debugging
            for key, value in list(result.items())[:3]:
                logger.info(f" - {key}: {value}")
            if len(result) > 3:
                logger.info(f" ... and {len(result) - 3} more")
        normalized = _normalize_aux_metrics(result)
        _log_aux_end(
            {
                "success": True,
                "metric_key_count": len(normalized),
                "aux_metric_eval_success": normalized.get("aux_metric_eval_success", 1.0),
                "aux_metric_error_code": normalized.get("aux_metric_error_code", 0.0),
                "aux_metric_non_numeric_dropped_count": normalized.get("aux_metric_non_numeric_dropped_count", 0.0),
            }
        )
        return normalized
    except (asyncio.TimeoutError, TimeoutError) as e:
        logger.warning(f"⏱️ Auxiliary metrics timed out (30s limit): {e}")
        return _fail("timeout", "auxiliary_metrics.py exceeded 30s timeout")
    except Exception as e:
        logger.error(f"❌ Failed to run auxiliary metrics: {e}", exc_info=True)
        return _fail("runtime", str(e), traceback.format_exc())
def _extract_metric_descriptions(eval_agents_md_path: Path) -> Dict[str, str]:
    """
    Extract metric descriptions from EVAL_AGENTS.md

    Recognized bullet forms (a description may continue onto following
    lines that do not start a new ``-``/``*`` bullet):
        - `metric_name`: Description
        * **`metric_name`**: Description
    Runs of whitespace in a description collapse to single spaces. If a name
    matches more than once, the later match wins (matches of the second form
    are applied after the first).

    Args:
        eval_agents_md_path: Path to EVAL_AGENTS.md
    Returns:
        Dict mapping metric names to their descriptions. Read/parse errors are
        logged, and whatever was collected (possibly {}) is returned.
    """
    import re
    descriptions: Dict[str, str] = {}
    patterns = (
        # Pattern 1: - `metric_name`: Description
        r'-\s+`([^`]+)`:\s*([^\n]+(?:\n(?!\s*[-*])[^\n]+)*)',
        # Pattern 2: * **`metric_name`**: Description (bold + bullet)
        r'\*\s+\*\*`([^`]+)`\*\*:\s*([^\n]+(?:\n(?!\s*[-*])[^\n]+)*)',
    )
    try:
        # Decode explicitly as UTF-8: the agent-written markdown often holds
        # non-ASCII text and the locale default encoding may differ. Invalid
        # bytes are replaced instead of discarding every description.
        content = eval_agents_md_path.read_text(encoding="utf-8", errors="replace")
        for pattern in patterns:
            for match in re.finditer(pattern, content):
                metric_name = match.group(1).strip()
                descriptions[metric_name] = re.sub(r'\s+', ' ', match.group(2).strip())
        logger.info(f"Extracted {len(descriptions)} metric descriptions from {eval_agents_md_path}")
    except Exception as e:
        logger.error(f"Failed to parse EVAL_AGENTS.md: {e}")
    return descriptions
def save_metrics_file(results_dir: str, metrics: Dict[str, Any]):
    """
    Save complete metrics to metrics.json

    Values pass through ``_sanitize_for_json`` (NaN/Inf -> null) and are written
    with ``allow_nan=False``. The JSON goes to a hidden temporary sibling that
    is then moved over ``metrics.json`` with ``os.replace``. A serialization
    error therefore never leaves a truncated metrics.json, and any existing
    file stays intact. Errors are logged, not raised.

    Args:
        results_dir: Directory to save metrics.json
        metrics: Complete metrics dict
    """
    metrics_path = Path(results_dir) / "metrics.json"
    tmp_path = metrics_path.with_name(f".{metrics_path.name}.{os.getpid()}.tmp")
    try:
        metrics_path.parent.mkdir(parents=True, exist_ok=True)
        safe_metrics = _sanitize_for_json(metrics)
        with open(tmp_path, 'w') as f:
            json.dump(safe_metrics, f, indent=2, allow_nan=False)
        os.replace(tmp_path, metrics_path)
        logger.info(f"💾 Saved metrics to {metrics_path}")
    except Exception as e:
        logger.error(f"❌ Failed to save metrics: {e}")
        try:
            tmp_path.unlink(missing_ok=True)
        except OSError:
            pass
async def _evaluate_agent_candidate(
    candidate_code: str,
    request: GenerationCompleteRequest,
    ev2_agent,
) -> Dict[str, Any]:
    """
    Evaluate the agent's candidate code using the primary evaluator.
    Writes the candidate to a temp file, runs primary evaluation, returns result.

    The evaluator runs in-process on a worker thread via
    ``_execute_primary_evaluator_sync``. Unlike ``run_primary_evaluator``,
    there is no subprocess and no timeout. Output goes to
    ``<gen_dir>/agent_candidate_results``. Any ``metrics.json`` /
    ``correct.json`` left by a previous candidate is deleted first, so it
    cannot be picked up as this candidate's result.

    Args:
        candidate_code: Source code proposed by the agent.
        request: Generation request supplying evaluator config and paths; the
            temp file reuses the suffix of ``request.code_path`` (default .cpp).
        ev2_agent: Not used in this function; kept for call compatibility.
    Returns:
        On success: combined_score, correct, code, public, text_feedback.
        On failure: combined_score=0.0, correct=False, code, error.
    """
    # NOTE(review): no timeout here — a non-terminating candidate blocks this
    # coroutine's caller; consider routing through the subprocess worker.
    ext = Path(request.code_path).suffix or ".cpp"
    candidate_results_dir = Path(request.results_dir).parent / "agent_candidate_results"
    candidate_path = None
    try:
        # Write candidate to a temp file (name recorded first so cleanup always works)
        with tempfile.NamedTemporaryFile(
            mode="w", suffix=ext, prefix="agent_candidate_", delete=False, encoding="utf-8"
        ) as f:
            candidate_path = f.name
            f.write(candidate_code)
        candidate_results_dir.mkdir(parents=True, exist_ok=True)
        # The results dir is shared across candidates: drop files the evaluator
        # reader would otherwise treat as this run's output.
        for stale_name in ("metrics.json", "correct.json"):
            (candidate_results_dir / stale_name).unlink(missing_ok=True)
        result = await asyncio.to_thread(
            _execute_primary_evaluator_sync,
            candidate_path,
            str(candidate_results_dir),
            request.evaluator_module,
            request.evaluator_function,
            request.evaluator_kwargs,
        )
        return {
            "combined_score": result.get("combined_score", 0.0),
            "correct": result.get("correct", False),
            "code": candidate_code,
            "public": result.get("public", {}),
            "text_feedback": result.get("text_feedback", ""),
        }
    except Exception as e:
        logger.error(f"Agent candidate evaluation error: {e}")
        return {
            "combined_score": 0.0,
            "correct": False,
            "code": candidate_code,
            "error": str(e),
        }
    finally:
        # Clean up temp file
        if candidate_path:
            try:
                Path(candidate_path).unlink(missing_ok=True)
            except Exception:
                pass
def _score_for_log(value: Any, precision: int = 4) -> str:
    """Format a score for a log line without ever raising.

    Scores can be ``None`` or non-numeric if an evaluator returns such a value;
    a formatting error in a log statement must not be mistaken for (and recorded
    as) an evaluation failure.
    """
    if isinstance(value, numbers.Real):
        return f"{float(value):.{precision}f}"
    return repr(value)


async def run_full_evaluation(job_id: str, request: GenerationCompleteRequest):
    """
    Complete evaluation pipeline (async background task).

    Steps:
    1. Run the primary evaluator.
    2. Run auxiliary evaluators (if any exist).
    3. Save a scheduler-compatible metrics.json.
    4. Record the generation in the service history.
    5. Decide whether to trigger the agent.
    6. If triggered: run agent analysis, and evaluate any candidate code it returns.

    The service globals (state, agent, results root) are snapshotted once under
    ``_state_lock``. Everything below uses those locals, so a concurrent
    ``/initialize`` cannot swap the agent out from under an in-flight evaluation.

    Exceptions raised inside the pipeline are logged and recorded on the job as
    ``status="failed"``.

    Args:
        job_id: Job ID for tracking
        request: Generation complete request
    """
    logger.info("=" * 80)
    logger.info(f"🚀 Starting full evaluation for generation {request.generation}")
    logger.info("=" * 80)
    if not await _update_evaluation_job(job_id, status="running"):
        logger.warning(f"Job {job_id} missing before start; continuing without job tracking")
    try:
        async with _state_lock:
            local_service_state = service_state
            local_ev2_agent = ev2_agent
            local_results_root = (
                str(service_config.results_dir)
                if service_config and service_config.results_dir
                else ""
            )
        if local_service_state is None:
            raise RuntimeError("Service state not initialized")

        # 1. Run primary evaluator
        logger.info(f"📊 Step 1/6: Running primary evaluator...")
        primary_result = await run_primary_evaluator(request)

        # 2. Run auxiliary evaluators
        logger.info(f"📊 Step 2/6: Running auxiliary evaluators...")
        auxiliary_results = await run_auxiliary_evaluators(
            request,
            primary_result,
            experiment_root=local_results_root,
        )

        # 3. Extract auxiliary metric descriptions from EVAL_AGENTS.md (best-effort)
        auxiliary_descriptions = {}
        if auxiliary_results:
            try:
                eval_agents_md = Path(local_results_root) / "eval_agent_memory" / "EVAL_AGENTS.md"
                if eval_agents_md.exists():
                    auxiliary_descriptions = _extract_metric_descriptions(eval_agents_md)
                    logger.info(f"Extracted {len(auxiliary_descriptions)} metric descriptions")
            except Exception as e:
                logger.warning(f"Failed to extract metric descriptions: {e}")

        # Inject the diagnostic report into primary_result BEFORE building
        # complete_result, so it flows through both the API response and the disk
        # metrics. Use the agent snapshot taken under the lock, not the global.
        # The report is optional text, so a failure here is logged and skipped
        # rather than failing the whole evaluation.
        if local_ev2_agent is not None:
            try:
                diagnostic = local_ev2_agent._extract_diagnostic_report()
            except Exception as e:
                logger.warning(f"Failed to extract diagnostic report: {e}")
                diagnostic = None
            if diagnostic:
                existing = primary_result.get("text_feedback", "")
                primary_result["text_feedback"] = (
                    (existing + "\n\n" + diagnostic) if existing else diagnostic
                )
                logger.info(f"Injected diagnostic report ({len(diagnostic)} chars) into primary_result")

        # 4. Merge results (rich format for API response)
        complete_result = {
            "combined_score": primary_result.get("combined_score", 0.0),
            "correct": primary_result.get("correct", False),
            "primary": primary_result,
            "auxiliary": auxiliary_results,
            "auxiliary_descriptions": auxiliary_descriptions,
            "timestamp": time.time(),
            "generation": request.generation
        }
        complete_result = _sanitize_for_json(complete_result)

        # Safety: suppress aux metrics if they correlate negatively with score.
        # NOTE(review): this only affects the disk metrics built below;
        # complete_result (the API/job result) was already built above and still
        # carries the unsuppressed auxiliary results. Confirm this is intended.
        if auxiliary_results and _should_suppress_aux_metrics(local_results_root, request.generation):
            logger.warning("⚠️ Auto-suppressing auxiliary metrics due to negative correlation with score")
            auxiliary_results = {}
            auxiliary_descriptions = {}

        # Build scheduler-compatible metrics.json so that load_results()
        # (shinka/utils/general.py) can read it correctly on resume.
        # `or {}` guards against an evaluator reporting "public": None.
        public_metrics = dict(primary_result.get("public") or {})
        if auxiliary_results:
            # Flatten scalar aux values into public metrics with an "aux_" prefix;
            # nested evaluator dicts contribute their scalar sub-values, and
            # evaluators that reported an "error" are skipped.
            for key, value in auxiliary_results.items():
                if isinstance(value, dict) and "error" not in value:
                    for sub_key, sub_value in value.items():
                        if isinstance(sub_value, (int, float, bool, str)):
                            public_metrics[f"aux_{sub_key}"] = sub_value
                elif isinstance(value, (int, float, bool, str)):
                    public_metrics[f"aux_{key}"] = value

        text_feedback = primary_result.get("text_feedback", "")
        if auxiliary_descriptions:
            desc_text = "\n\n# Auxiliary Metrics Guide\n\n"
            for metric_name, description in auxiliary_descriptions.items():
                desc_text += f"- **{metric_name}**: {description}\n"
            text_feedback = (text_feedback + desc_text) if text_feedback else desc_text

        disk_metrics = {
            "combined_score": primary_result.get("combined_score", 0.0),
            "public": public_metrics,
            "private": primary_result.get("private", {}),
            "text_feedback": text_feedback,
            "_eval_service": {
                "auxiliary": auxiliary_results,
                "auxiliary_descriptions": auxiliary_descriptions,
            },
        }

        # Save metrics.json (scheduler-compatible format)
        logger.info(f"📊 Step 3/6: Saving metrics.json...")
        save_metrics_file(request.results_dir, _sanitize_for_json(disk_metrics))

        # Record to history
        logger.info(f"📊 Step 4/6: Recording to history...")
        local_service_state.add_generation({
            "generation": request.generation,
            "primary_score": complete_result["combined_score"],
            "results_dir": request.results_dir,
            "timestamp": time.time()
        })

        # Decide whether to trigger agent
        logger.info(f"📊 Step 5/6: Deciding whether to trigger agent...")
        should_trigger, reason = local_service_state.should_trigger_agent(
            request.generation,
            complete_result["combined_score"]
        )
        behavior_logging.log_event(
            request.results_dir,
            "trigger_decision",
            {
                "generation": request.generation,
                "mode": "evaluation",
                "should_trigger": bool(should_trigger),
                "reason": reason,
                "primary_score": complete_result["combined_score"],
            },
        )
        logger.info(f" Decision: {'🧠 TRIGGER' if should_trigger else '⏭️ SKIP'} - {reason}")

        # Trigger agent if needed
        if should_trigger and local_ev2_agent is not None:
            logger.info(f"📊 Step 6/6: Running agent analysis...")
            try:
                agent_result = await local_ev2_agent.analyze_generation(request.generation)
                complete_result["agent_analysis"] = agent_result

                # Evaluate agent candidate if present
                candidate_code = agent_result.get("candidate_code")
                if candidate_code and not agent_result.get("skipped"):
                    logger.info("🔬 Evaluating agent candidate code...")
                    try:
                        candidate_eval = await _evaluate_agent_candidate(
                            candidate_code, request, local_ev2_agent
                        )
                        complete_result["agent_candidate"] = candidate_eval
                        # _score_for_log never raises, so a missing/None score
                        # cannot turn a successful candidate eval into an error.
                        logger.info(
                            f"🔬 Agent candidate score: "
                            f"{_score_for_log(candidate_eval.get('combined_score', 0), 2)} "
                            f"(current best: {_score_for_log(complete_result['combined_score'], 2)})"
                        )
                    except Exception as e:
                        logger.error(f"❌ Agent candidate evaluation failed: {e}")
                        complete_result["agent_candidate"] = {"error": str(e)}

                # Only advance trigger counter if agent actually ran (not skipped)
                if not agent_result.get("skipped"):
                    # Snapshot current aux metric names for feedback loop
                    active_aux = sorted(
                        k for k in public_metrics if k.startswith("aux_")
                    )
                    local_service_state.mark_agent_triggered(
                        request.generation, active_metrics=active_aux
                    )
                behavior_logging.log_event(
                    request.results_dir,
                    "agent_trigger_result",
                    {
                        "generation": request.generation,
                        "mode": "evaluation",
                        "success": agent_result.get("success", True),
                        "skipped": agent_result.get("skipped", False),
                        "elapsed_seconds": agent_result.get("elapsed_seconds", 0),
                    },
                )
                logger.info(
                    f"✅ Agent analysis completed"
                    if not agent_result.get("skipped")
                    else f"⏭️ Agent analysis skipped (busy)"
                )
            except Exception as e:
                logger.error(f"❌ Agent analysis failed: {e}", exc_info=True)
                complete_result["agent_analysis"] = {"error": str(e)}
                behavior_logging.log_event(
                    request.results_dir,
                    "agent_trigger_result",
                    {
                        "generation": request.generation,
                        "mode": "evaluation",
                        "success": False,
                        "error": str(e),
                    },
                )
        else:
            logger.info(f"📊 Step 6/6: Agent not triggered")

        # Update job status
        updated = await _update_evaluation_job(
            job_id,
            status="completed",
            result=_sanitize_for_json(complete_result),
            completed_at=time.time(),
        )
        if not updated:
            logger.warning(f"Job {job_id} was cleared before completion state update")

        # Summary logging runs AFTER the job was marked completed. It must not
        # raise, or the except below would overwrite "completed" with "failed".
        logger.info("=" * 80)
        logger.info(f"✅ Full evaluation completed for generation {request.generation}")
        logger.info(f" Score: {_score_for_log(complete_result['combined_score'])}")
        logger.info(f" Auxiliary metrics: {len(auxiliary_results or {})} metrics")
        logger.info(f" Agent triggered: {should_trigger}")
        logger.info("=" * 80)
    except Exception as e:
        logger.error("=" * 80)
        logger.error(f"❌ Full evaluation failed for generation {request.generation}")
        logger.error(f" Error: {e}", exc_info=True)
        logger.error("=" * 80)
        updated = await _update_evaluation_job(
            job_id,
            status="failed",
            error=str(e),
            completed_at=time.time(),
        )
        if not updated:
            logger.warning(f"Job {job_id} was cleared before failure state update")
# ============================================================================
# API Endpoints
# ============================================================================
@app.post("/api/v1/initialize", response_model=InitializeResponse)
async def initialize_service(request: InitializeRequest):
    """
    Initialize/reset service state for a new experiment.

    This endpoint gives explicit control over experiment initialization. It:
    - applies config overrides from the request;
    - replaces the service state with a clean one (``force_clean=True``, so old
      state on disk is not loaded);
    - clears all tracked evaluation jobs;
    - builds a fresh agent for the experiment root derived from
      ``request.results_dir``.

    Concurrency: the reset runs while holding ``_state_lock`` (via
    ``async with``), so it does not interleave with other coroutines that take
    the same lock. This is event-loop concurrency safety, not a cross-thread
    guarantee.

    Returns:
        InitializeResponse with ``status="ready"`` on success. If any step of the
        reset fails, the response has ``status="error"`` and the exception
        message. The agent is then left as ``None`` rather than the previous
        experiment's agent.
    """
    global ev2_agent, service_config, service_state
    start_time = time.time()
    # ===== Serialized initialization (holds _state_lock) =====
    async with _state_lock:
        logger.info("=" * 80)
        logger.info("🔧 INITIALIZE: Resetting service state for new experiment")
        logger.info(f" Raw results_dir: {request.results_dir}")
        logger.info("=" * 80)
        experiment_root = _normalize_experiment_root(request.results_dir)
        experiment_root_str = str(experiment_root)
        try:
            # Drop the previous experiment's agent up front. service_config is
            # repointed at the new root below, so if any later step fails,
            # keeping the old agent would leave it bound to the OLD results
            # directory. generation_complete would then see matching roots and
            # never rebuild it. With None, the next submission re-initializes it.
            ev2_agent = None

            if service_config is None:
                service_config = ServiceConfig()

            # Update config overrides
            prev_trigger_mode = service_config.trigger_mode
            prev_trigger_interval = service_config.trigger_interval
            service_config.results_dir = experiment_root_str
            if request.primary_evaluator:
                service_config.primary_evaluator_path = request.primary_evaluator
            if request.experiment_name:
                service_config.experiment_name = request.experiment_name
            if request.trigger_mode:
                service_config.trigger_mode = request.trigger_mode
                logger.info(
                    f" 🔁 trigger_mode override: {prev_trigger_mode} -> {service_config.trigger_mode}"
                )
            if request.trigger_interval is not None:
                service_config.trigger_interval = request.trigger_interval
                logger.info(
                    f" 🔁 trigger_interval override: {prev_trigger_interval} -> {service_config.trigger_interval}"
                )
            if request.problem_statement:
                service_config.problem_statement = request.problem_statement
            if request.evaluator_kwargs:
                service_config.evaluator_kwargs = request.evaluator_kwargs

            # Reset state with force_clean=True (don't load old state from disk)
            service_state = ServiceState(service_config, force_clean=True)
            logger.info(f" ✅ Service state reset (clean start)")

            # Clear old evaluation jobs
            evaluation_jobs.clear()
            logger.info(f" ✅ Evaluation jobs cleared")

            # Initialize agent
            ev2_agent = IntegratedEV2Agent(
                results_dir=experiment_root_str,
                primary_evaluator_path=service_config.primary_evaluator_path,
                config=service_config,
                problem_statement=service_config.problem_statement,
                evaluator_kwargs=service_config.evaluator_kwargs,
            )
            logger.info(f" ✅ Agent initialized")

            processing_time = (time.time() - start_time) * 1000
            logger.info("=" * 80)
            logger.info("✅ INITIALIZE COMPLETE")
            logger.info(f" Experiment root: {experiment_root_str}")
            logger.info(f" Processing time: {processing_time:.1f}ms")
            logger.info("=" * 80)
            return InitializeResponse(
                status="ready",
                message="Service initialized for new experiment",
                results_dir=experiment_root_str,
                agent_initialized=True,
                processing_time_ms=processing_time
            )
        except Exception as e:
            logger.error(f"❌ Initialize failed: {e}", exc_info=True)
            processing_time = (time.time() - start_time) * 1000
            return InitializeResponse(
                status="error",
                message=str(e),
                results_dir=experiment_root_str,
                agent_initialized=False,
                processing_time_ms=processing_time
            )
@app.post("/api/v1/generation/complete", response_model=ServiceResponse)
async def generation_complete(
    request: GenerationCompleteRequest,
    background_tasks: BackgroundTasks
):
    """
    Generation complete handler (evaluation mode only).

    Registers a pending evaluation job, schedules ``run_full_evaluation`` as a
    background task, and returns immediately with the job ID.

    Deprecated fallback: the service state, job table and agent are
    (re)initialized here when either
    - no agent exists yet, or
    - the experiment root derived from ``request.results_dir`` differs from the
      configured one.
    Callers should use POST /api/v1/initialize instead.

    If that (re)initialization fails, the agent is left as ``None`` so the next
    submission retries it. The evaluation job is still scheduled.
    """
    global ev2_agent, service_config, service_state
    start_time = time.time()
    logger.info("=" * 80)
    logger.info(f"📊 EVALUATION MODE: Generation {request.generation}")
    logger.info(f" Code path: {request.code_path}")
    logger.info(f" Evaluator: {request.evaluator_module}.{request.evaluator_function}")
    logger.info("=" * 80)

    # Ensure agent is initialized (extract experiment root from results_dir)
    # results_dir can be: /path/to/experiment, /gen_10, or /gen_10/results
    experiment_root = _normalize_experiment_root(request.results_dir)
    experiment_root_str = str(experiment_root)

    async with _state_lock:
        if service_config is None:
            service_config = ServiceConfig()
        current_results_dir = str(service_config.results_dir) if service_config.results_dir else ""

        # Initialize agent if needed (auto-detect fallback)
        if ev2_agent is None or experiment_root_str != current_results_dir:
            if ev2_agent is None:
                logger.warning("⚠️ DEPRECATED: Auto-initialization on first generation.")
                logger.warning(" Please call POST /api/v1/initialize before sending generations.")
                logger.info(f"🔧 Initializing agent for experiment: {experiment_root_str}")
            else:
                logger.warning("⚠️ DEPRECATED: Auto-detection of new experiment.")
                logger.warning(" Please call POST /api/v1/initialize for new experiments.")
                logger.info(f"🔄 Detected new experiment - Reinitializing state and agent: {experiment_root_str}")
            try:
                # Drop the previous experiment's agent before repointing the
                # config. If anything below fails, keeping it would leave an agent
                # bound to the old results directory that is never rebuilt (the
                # roots now match). With None, the next submission retries.
                ev2_agent = None
                service_config.results_dir = experiment_root_str
                # Reset service_state for new experiment to avoid cross-experiment history.
                service_state = ServiceState(service_config)
                logger.info(" ✅ Service state reset for new experiment")
                # Clear old evaluation jobs from previous experiment.
                evaluation_jobs.clear()
                logger.info(" ✅ Evaluation jobs cleared")
                ev2_agent = IntegratedEV2Agent(
                    results_dir=experiment_root_str,
                    primary_evaluator_path=service_config.primary_evaluator_path,
                    config=service_config,
                    problem_statement=service_config.problem_statement,
                    evaluator_kwargs=service_config.evaluator_kwargs,
                )
                logger.info(" ✅ Agent initialized")
            except Exception as e:
                logger.error(f"❌ Failed to initialize agent: {e}", exc_info=True)

        # Create evaluation job (after any reset, so the clear() above cannot drop it)
        job_id = f"eval_{request.generation}_{int(time.time())}"
        evaluation_jobs[job_id] = {
            "status": "pending",
            "request": request,
            "generation": request.generation,
            "result": None,
            "created_at": time.time(),
            "completed_at": None
        }

    # Start background evaluation (non-blocking)
    background_tasks.add_task(
        run_full_evaluation,
        job_id=job_id,
        request=request
    )

    # Return immediately
    processing_time = (time.time() - start_time) * 1000
    logger.info(f"✅ Evaluation job submitted: {job_id} (response time: {processing_time:.1f}ms)")
    return ServiceResponse(
        status="accepted",
        message=f"Evaluation started for generation {request.generation}",
        generation=request.generation,
        job_id=job_id,
        estimated_time=15.0,
        agent_triggered=False,  # Agent may be triggered later in background
        trigger_reason="Will be determined after evaluation",
        processing_time_ms=processing_time
    )
@app.get("/api/v1/generation/{generation}/status")
async def get_generation_status(generation: int):
    """
    Query evaluation status for a specific generation.

    If the same generation was submitted more than once (for example, a retry
    after a failure), the most recently submitted job is reported.

    Args:
        generation: Generation number

    Returns:
        JSON-safe dict with the job's ID, status and timing, plus ``result`` and
        ``completed_at`` when completed, or ``error`` when failed.

    Raises:
        HTTPException: 404 if no tracked job exists for this generation.
    """
    job = None
    job_id = None
    async with _state_lock:
        # Jobs are inserted in submission order (see generation_complete).
        # Scanning newest-first makes a resubmission win over an older, possibly
        # failed, job for the same generation.
        for jid, j in reversed(list(evaluation_jobs.items())):
            if j["generation"] == generation:
                job = dict(j)  # copy while holding the lock
                job_id = jid
                break
    if job is None:
        raise HTTPException(
            status_code=404,
            detail=f"Generation {generation} not found in evaluation jobs"
        )

    response = {
        "generation": generation,
        "job_id": job_id,
        "status": job["status"],
        "created_at": job["created_at"],
        "elapsed_time": time.time() - job["created_at"]
    }
    if job["status"] == "completed":
        response["result"] = job.get("result")
        response["completed_at"] = job.get("completed_at")
    elif job["status"] == "failed":
        response["error"] = job.get("error")
    return _sanitize_for_json(response)
# DEPRECATED: Use /api/v1/generation/{generation}/status instead
# Keeping for backward compatibility only
@app.get("/api/v1/evaluate/{job_id}")
async def get_evaluation_job_status_deprecated(job_id: str):
    """
    [DEPRECATED] Look up an evaluation job by the ID returned from
    POST /api/v1/generation/complete.

    New clients should call /api/v1/generation/{generation}/status; this route
    remains only so older clients keep working.

    Args:
        job_id: Job ID returned by generation complete

    Returns:
        JSON-safe dict with the job's status and timing, flagged as deprecated,
        plus ``result``/``completed_at`` when completed or ``error`` when failed.

    Raises:
        HTTPException: 404 when no job with this ID is tracked.
    """
    async with _state_lock:
        if job_id in evaluation_jobs:
            # Copy while holding the lock so later reads see a stable snapshot.
            snapshot = dict(evaluation_jobs[job_id])
        else:
            raise HTTPException(
                status_code=404,
                detail=f"Job {job_id} not found. Use /api/v1/generation/{{generation}}/status instead.",
            )

    created = snapshot["created_at"]
    payload = {
        "job_id": job_id,
        "generation": snapshot["generation"],
        "status": snapshot["status"],
        "created_at": created,
        "elapsed_time": time.time() - created,
        "deprecated": True,
        "message": "Use /api/v1/generation/{generation}/status instead",
    }

    state = payload["status"]
    if state == "completed":
        payload.update(
            result=snapshot.get("result"),
            completed_at=snapshot.get("completed_at"),
        )
    elif state == "failed":
        payload["error"] = snapshot.get("error")
    return _sanitize_for_json(payload)
@app.get("/api/v1/status", response_model=ServiceStatusResponse)
async def get_status():
    """Return a snapshot of the service: uptime, experiment, statistics, trigger config.

    Raises:
        HTTPException: 500 if the service config or state has not been created yet.
    """
    cfg = service_config
    state = service_state
    if cfg is None or state is None:
        raise HTTPException(status_code=500, detail="Service not initialized")

    experiment_info = {
        "name": cfg.experiment_name,
        "results_dir": cfg.results_dir,
        "primary_evaluator": cfg.primary_evaluator_path,
    }
    trigger_config = {
        "trigger_mode": cfg.trigger_mode,
        "trigger_interval": cfg.trigger_interval,
        "agent_enabled": cfg.agent_enabled,
        "agent_initialized": ev2_agent is not None,
    }
    return ServiceStatusResponse(
        status="running",
        uptime_seconds=time.time() - state.start_time,
        version="2.0.0-standalone",
        experiment=experiment_info,
        statistics=state.get_statistics(),
        config=trigger_config,
    )
@app.post("/api/v1/trigger/manual")
async def trigger_manual(generation: int):
    """
    Manually trigger agent analysis for a specific generation.

    The generation must already be in the service's generation history.

    Args:
        generation: Generation number to analyze.

    Returns:
        Dict with ``status="success"``, a message, and the agent's raw result.

    Raises:
        HTTPException:
            500 if the agent or service state is not initialized, or if the agent
            run fails.
            404 if the generation is not in the recorded history.
    """
    logger.info(f"🔧 Manual trigger: generation {generation}")

    # Snapshot the globals under the lock (as run_full_evaluation does) so the
    # agent and state used below stay paired even if /initialize runs while
    # the agent is analyzing.
    async with _state_lock:
        local_agent = ev2_agent
        local_state = service_state
    if local_agent is None:
        raise HTTPException(status_code=500, detail="Agent not initialized")
    if local_state is None:
        raise HTTPException(status_code=500, detail="Service not initialized")

    # The generation must already be in history
    if not any(g['generation'] == generation for g in local_state.generation_history):
        raise HTTPException(
            status_code=404,
            detail=f"Generation {generation} not found in history"
        )

    # Run agent
    try:
        result = await local_agent.analyze_generation(generation)
        # Only advance the trigger counter if the agent actually ran. A skipped
        # run (agent busy) must not count, matching run_full_evaluation.
        skipped = isinstance(result, dict) and bool(result.get("skipped"))
        if not skipped:
            local_state.mark_agent_triggered(generation, active_metrics=[])
    except Exception as e:
        logger.error(f"❌ Manual agent trigger failed: {e}", exc_info=True)
        raise HTTPException(status_code=500, detail=f"Agent failed: {str(e)}") from e

    return {
        "status": "success",
        "message": f"Agent triggered for generation {generation}",
        "result": result
    }
@app.get("/")
async def root():
    """Identify the service and point callers at ``/docs``."""
    info = dict(
        service="EV2 Evaluation Service (Standalone)",
        version="2.0.0",
        status="running",
        docs="/docs",
    )
    return info
# ============================================================================
# CLI Entry Point
# ============================================================================
def main():
    """Parse the command line, then either run one evaluator worker or start the server.

    Worker mode is selected when either hidden ``--worker-eval-*`` flag is given.
    Both flags are then required, and the process exits with the worker's return
    value. Otherwise the service config is loaded from ``--config`` (YAML) or
    assembled from the individual flags, and ``app`` is served with uvicorn.
    """
    global service_config
    import argparse

    cli = argparse.ArgumentParser(description="EV2 Evaluation Service (Standalone)")
    # Hidden flags for worker mode (hard-timeout evaluator execution).
    cli.add_argument("--worker-eval-request", type=str, help=argparse.SUPPRESS)
    cli.add_argument("--worker-eval-output", type=str, help=argparse.SUPPRESS)
    cli.add_argument("--config", type=str, help="Path to YAML config file")
    # Alternatively, configure the service flag by flag.
    cli.add_argument("--results-dir", type=str, help="Results directory")
    cli.add_argument("--primary-evaluator", type=str, help="Path to primary evaluator")
    cli.add_argument(
        "--evaluation-timeout",
        type=float,
        default=300.0,
        help="Timeout for each evaluation in seconds (default: 300)",
    )
    cli.add_argument(
        "--trigger-mode",
        type=str,
        default="periodic",
        choices=["always", "periodic", "plateau", "mixed"],
    )
    cli.add_argument("--trigger-interval", type=int, default=10)
    cli.add_argument("--port", type=int, default=8765)
    cli.add_argument("--host", type=str, default="0.0.0.0")
    opts = cli.parse_args()

    request_path = opts.worker_eval_request
    output_path = opts.worker_eval_output
    if request_path or output_path:
        if not (request_path and output_path):
            logger.error(
                "Both --worker-eval-request and --worker-eval-output are required in worker mode"
            )
            raise SystemExit(2)
        exit_code = _run_primary_evaluator_worker(
            request_path=request_path,
            output_path=output_path,
        )
        raise SystemExit(exit_code)

    if opts.config:
        logger.info(f"📋 Loading config from {opts.config}")
        service_config = ServiceConfig.from_yaml(opts.config)
    else:
        # results_dir / primary_evaluator may be omitted here: ShinkaEvolve can
        # supply them at runtime through POST /api/v1/initialize.
        service_config = ServiceConfig(
            results_dir=opts.results_dir or "",
            primary_evaluator_path=opts.primary_evaluator or "",
            evaluation_timeout=opts.evaluation_timeout,
            trigger_mode=opts.trigger_mode,
            trigger_interval=opts.trigger_interval,
            host=opts.host,
            port=opts.port,
        )

    cfg = service_config
    logger.info(f"🚀 Starting server on {cfg.host}:{cfg.port}")
    uvicorn.run(
        app,
        host=cfg.host,
        port=cfg.port,
        log_level=cfg.log_level.lower(),
    )
# Run the CLI (server or worker mode) only when executed as a script, not on import.
if __name__ == "__main__":
    main()