""" EV2 Service - Standalone Version A complete, self-contained evaluation service that integrates OpenHands agent directly without depending on ev2.py. Key features: - Event-driven architecture (receive generation notifications) - Autonomous decision-making (when to trigger agent) - Persistent agent state (across generations) - Direct OpenHands integration (no wrapper) Author: Evolution Evaluation System Version: 2.0 (Standalone) """ import os import sys import json import time import logging import asyncio import traceback import tempfile import math import hashlib import numbers from pathlib import Path from typing import Dict, Any, Optional, List from dataclasses import dataclass, asdict try: import eval_agent.logging as behavior_logging except ModuleNotFoundError: import importlib.util _behavior_logging_path = Path(__file__).with_name("logging.py") _behavior_logging_spec = importlib.util.spec_from_file_location( "behavior_logging", str(_behavior_logging_path) ) behavior_logging = importlib.util.module_from_spec(_behavior_logging_spec) assert _behavior_logging_spec is not None and _behavior_logging_spec.loader is not None _behavior_logging_spec.loader.exec_module(behavior_logging) try: from eval_agent.utils import build_meta_recommendation_context_lines except ModuleNotFoundError: import importlib.util _utils_path = Path(__file__).with_name("utils.py") _utils_spec = importlib.util.spec_from_file_location("eval_agent_utils", str(_utils_path)) _utils_module = importlib.util.module_from_spec(_utils_spec) assert _utils_spec is not None and _utils_spec.loader is not None _utils_spec.loader.exec_module(_utils_module) build_meta_recommendation_context_lines = _utils_module.build_meta_recommendation_context_lines # FastAPI imports from fastapi import FastAPI, HTTPException, BackgroundTasks from fastapi.responses import JSONResponse from pydantic import BaseModel, Field import uvicorn import yaml # OpenHands imports (same as ev2.py) from openhands.sdk import LLM, Agent, Conversation, Tool from openhands.tools.file_editor import FileEditorTool from openhands.tools.task_tracker import TaskTrackerTool from openhands.tools.terminal import TerminalTool # ============================================================================ # Configuration # ============================================================================ @dataclass class ServiceConfig: """Service configuration""" # Server settings host: str = "0.0.0.0" port: int = 8765 log_level: str = "INFO" # Experiment settings experiment_name: str = "" results_dir: str = "" primary_evaluator_path: str = "" problem_statement: str = "" # Problem description for diagnostic context evaluator_kwargs: Optional[Dict[str, Any]] = None # Task-specific kwargs (e.g., problem_id, frontier_cs_dir) # Evaluation settings evaluation_timeout: float = 300.0 # Maximum time for evaluation (seconds), default 5 minutes # Trigger strategy trigger_mode: str = "periodic" # "periodic", "plateau", "mixed", "always" trigger_interval: int = 10 # Run agent every N generations plateau_threshold: float = 0.01 plateau_window: int = 10 # Agent settings agent_enabled: bool = True llm_model: str = "" # Empty = use env var llm_api_key: str = "" # Empty = use env var llm_base_url: str = "" # Empty = use env var @classmethod def from_yaml(cls, config_path: str) -> 'ServiceConfig': """Load config from YAML file""" with open(config_path) as f: data = yaml.safe_load(f) return cls( host=data.get('service', {}).get('host', '0.0.0.0'), port=data.get('service', {}).get('port', 8765), log_level=data.get('service', {}).get('log_level', 'INFO'), experiment_name=data.get('experiment', {}).get('name', ''), results_dir=data.get('experiment', {}).get('results_dir', ''), primary_evaluator_path=data.get('experiment', {}).get('primary_evaluator', ''), evaluation_timeout=data.get('evaluation', {}).get('timeout', 300.0), trigger_mode=data.get('strategy', {}).get('trigger_mode', 'periodic'), trigger_interval=data.get('strategy', {}).get('trigger_interval', 10), plateau_threshold=data.get('strategy', {}).get('plateau_threshold', 0.01), plateau_window=data.get('strategy', {}).get('plateau_window', 10), agent_enabled=data.get('agent', {}).get('enabled', True), llm_model=data.get('agent', {}).get('llm_model', ''), llm_api_key=data.get('agent', {}).get('llm_api_key', ''), llm_base_url=data.get('agent', {}).get('llm_base_url', ''), ) # ============================================================================ # Request/Response Models # ============================================================================ class GenerationCompleteRequest(BaseModel): """ Generation complete notification (evaluation mode only). """ generation: int = Field(..., description="Generation number") results_dir: str = Field(..., description="Path to generation results directory") code_path: str = Field(..., description="Path to the generated code") evaluator_module: str = Field(..., description="Python module path (e.g., 'examples.circle_packing.evaluate')") evaluator_function: str = Field("evaluate", description="Evaluator function name (default: 'evaluate')") evaluator_kwargs: Optional[Dict[str, Any]] = Field(None, description="Additional kwargs for evaluator") # ===== Optional metadata ===== stage: Optional[str] = Field(None, description="Evolution stage") metadata: Optional[Dict[str, Any]] = Field(None, description="Additional metadata") class ServiceResponse(BaseModel): """ Service response for async evaluation mode. """ status: str = Field(..., description="Status: accepted | completed | skipped | error") message: str = Field(..., description="Human-readable message") generation: int # ===== Async evaluation mode ===== job_id: Optional[str] = Field(None, description="Job ID for async evaluation (evaluation mode only)") estimated_time: Optional[float] = Field(None, description="Estimated evaluation time in seconds") # ===== Agent decision info ===== agent_triggered: bool = Field(..., description="Whether agent was triggered") trigger_reason: Optional[str] = Field(None, description="Why agent was/wasn't triggered") # ===== Results (if completed) ===== evaluation_result: Optional[Dict[str, Any]] = Field(None, description="Evaluation result (if completed)") insights: Optional[List[str]] = Field(None, description="Agent insights (if agent was triggered)") auxiliary_metrics: Optional[Dict[str, Any]] = Field(None, description="Auxiliary metrics info") # ===== Timing ===== processing_time_ms: float class ServiceStatusResponse(BaseModel): """Service status information""" status: str = "running" uptime_seconds: float version: str = "2.0.0-standalone" experiment: Dict[str, Any] statistics: Dict[str, Any] config: Dict[str, Any] class InitializeRequest(BaseModel): """ Initialize/reset service state for a new experiment. """ results_dir: str = Field(..., description="Experiment root directory") primary_evaluator: Optional[str] = Field(None, description="Path to primary evaluator") experiment_name: Optional[str] = Field(None, description="Experiment name") trigger_mode: Optional[str] = Field(None, description="Trigger mode override") trigger_interval: Optional[int] = Field(None, description="Trigger interval override") problem_statement: Optional[str] = Field(None, description="Problem description for diagnostic context") evaluator_kwargs: Optional[Dict[str, Any]] = Field(None, description="Task-specific kwargs (e.g., problem_id, frontier_cs_dir)") class InitializeResponse(BaseModel): """ Initialize/reset response. """ status: str = Field(..., description="Status: ready | error") message: str = Field(..., description="Human-readable message") results_dir: str agent_initialized: bool processing_time_ms: float # ============================================================================ # Integrated EV2 Agent # ============================================================================ class IntegratedEV2Agent: """ Integrated EV2 Agent - Direct OpenHands Management This class replaces the call to evolution_evaluation_agent() in ev2.py with direct, integrated agent management. Key differences from ev2.py: - Agent instance can be persistent (reused across calls) - State management is integrated with service - No subprocess calls """ def __init__(self, results_dir: str, primary_evaluator_path: str, config: ServiceConfig, problem_statement: str = "", evaluator_kwargs: Optional[Dict[str, Any]] = None): """ Initialize integrated agent Args: results_dir: Path to results directory primary_evaluator_path: Path to primary evaluator (ground truth) config: Service configuration problem_statement: Problem description for diagnostic context evaluator_kwargs: Task-specific kwargs (e.g., problem_id, frontier_cs_dir) """ # Store paths as absolute self.results_dir = Path(results_dir).resolve() self.primary_evaluator_path = Path(primary_evaluator_path).resolve() self.config = config self.problem_statement = problem_statement self.evaluator_kwargs = evaluator_kwargs or {} # Agent workspace (same as ev2.py: results_dir/eval_agent_memory) self.workspace = self.results_dir / "eval_agent_memory" self.workspace.mkdir(parents=True, exist_ok=True) self._bootstrap_memory_files() # Validate primary evaluator exists if not self.primary_evaluator_path.exists(): raise FileNotFoundError( f"Primary evaluator not found: {self.primary_evaluator_path}" ) # Agent components (will be created on first use) self._agent = None self._llm = None # Prevent concurrent agent runs (non-blocking: skip if busy) self._run_lock = asyncio.Lock() logging.info("=" * 80) logging.info("✅ IntegratedEV2Agent Initialized") logging.info("=" * 80) logging.info(f"Results Dir: {self.results_dir}") logging.info(f"Workspace: {self.workspace}") logging.info(f"Primary Evaluator: {self.primary_evaluator_path}") logging.info("=" * 80) def _get_last_agent_trigger_gen(self) -> int: """Read last_agent_trigger_gen from persisted service state.""" state_file = self.workspace / "service_state.json" if state_file.exists(): try: with open(state_file) as f: return json.load(f).get("last_agent_trigger_gen", -1) except Exception: pass return -1 def _build_case_analysis(self, current_gen: int) -> List[str]: """Build per-case analysis from metrics.json for the current and best generations.""" lines: List[str] = [] results_path = self.results_dir # Load current gen metrics current_metrics_path = results_path / f"gen_{current_gen}" / "results" / "metrics.json" if not current_metrics_path.exists(): return lines try: with open(current_metrics_path) as f: data = json.load(f) except Exception: return lines public = data.get("public", {}) n_cases = public.get("n_cases", 0) if n_cases == 0: return lines # Classify cases tle_cases, wa_cases, partial_cases, perfect_cases = [], [], [], [] partial_ratios = [] TLE_THRESHOLD_MS = 1800 for i in range(min(n_cases, 70)): # public metrics have up to 20 cases, but check available ratio_key = f"case_{i}_ratio" time_key = f"case_{i}_time_ms" if ratio_key not in public: break ratio = public[ratio_key] time_ms = public.get(time_key, 0) if ratio >= 1.0: perfect_cases.append(i) elif ratio <= 0 and time_ms >= TLE_THRESHOLD_MS: tle_cases.append(i) elif ratio <= 0: wa_cases.append(i) else: partial_cases.append(i) partial_ratios.append(ratio) reported = len(tle_cases) + len(wa_cases) + len(partial_cases) + len(perfect_cases) lines.append("") lines.append(f"📊 Per-Case Analysis (gen {current_gen}, {reported} cases reported of {n_cases} total):") def _fmt_cases(case_list: List[int], limit: int = 10) -> str: if len(case_list) <= limit: return ", ".join(str(c) for c in case_list) return ", ".join(str(c) for c in case_list[:limit]) + f"... (+{len(case_list)-limit} more)" lines.append(f" TLE (>{TLE_THRESHOLD_MS}ms, ratio=0): {len(tle_cases)}/{reported}" + (f" — cases {_fmt_cases(tle_cases)}" if tle_cases else "")) lines.append(f" WA (ratio=0, not TLE): {len(wa_cases)}/{reported}" + (f" — cases {_fmt_cases(wa_cases)}" if wa_cases else "")) if partial_ratios: lines.append(f" Partial (0 best_score: best_score = s best_gen = g except Exception: pass if best_score > 0: lines.append(f" Best gen so far: gen_{best_gen}, score {best_score:.2f}") # Append current gen's aux metric values if available if current_metrics_path.exists(): try: with open(current_metrics_path) as f: data = json.load(f) pub = data.get("public", {}) aux_vals = {k: v for k, v in pub.items() if k.startswith("aux_") and not k.startswith("aux_aux_metric_")} if aux_vals: lines.append(f" Auxiliary metrics: " + ", ".join(f"{k}={v:.4f}" if isinstance(v, float) else f"{k}={v}" for k, v in sorted(aux_vals.items()))) except Exception: pass return lines def _build_aux_metric_trends(self, current_gen: int) -> List[str]: """Build a table of auxiliary metric values over recent generations.""" lines: List[str] = [] results_path = self.results_dir # Framework keys to filter out FRAMEWORK_KEYS = { "aux_aux_metric_eval_success", "aux_aux_metric_error_code", "aux_aux_metric_error_message_length", "aux_aux_metric_error_detail_length", "aux_aux_metric_non_numeric_dropped_count", } # Collect aux metrics from last 10 gens hist_start = max(0, current_gen - 10) records: List[tuple] = [] # (gen, score, {aux_key: value}) all_aux_keys: set = set() for gen in range(hist_start, current_gen + 1): mp = results_path / f"gen_{gen}" / "results" / "metrics.json" if not mp.exists(): continue try: with open(mp) as f: data = json.load(f) score = data.get("combined_score", 0) or 0 pub = data.get("public", {}) aux = {k: v for k, v in pub.items() if k.startswith("aux_") and k not in FRAMEWORK_KEYS and isinstance(v, (int, float))} if aux: all_aux_keys.update(aux.keys()) records.append((gen, score, aux)) except Exception: continue if not all_aux_keys: lines.append("") lines.append("📈 Auxiliary Metrics: No custom metrics defined yet. Write auxiliary_metrics.py to start measuring.") return lines # Build table sorted_keys = sorted(all_aux_keys) header = "| Gen | " + " | ".join(k.replace("aux_", "") for k in sorted_keys) + " | Score |" sep = "|-----|" + "|".join("-" * max(len(k.replace("aux_", "")), 7) for k in sorted_keys) + "|-------|" lines.append("") lines.append("📈 Auxiliary Metric Trends:") lines.append(header) lines.append(sep) for gen, score, aux in records: vals = [] for k in sorted_keys: v = aux.get(k) vals.append(f"{v:.4f}" if isinstance(v, float) else str(v) if v is not None else "N/A") lines.append(f"| {gen:3d} | " + " | ".join(f"{v:>{max(len(k.replace('aux_', '')), 7)}}" for v, k in zip(vals, sorted_keys)) + f" | {min(score, 100):5.1f} |") return lines def _bootstrap_memory_files(self): """Create expected memory files if they do not exist yet.""" eval_agents_md = self.workspace / "EVAL_AGENTS.md" if not eval_agents_md.exists(): eval_agents_md.write_text( "# EV2 Agent Memory\n\n" "- Initialized by eval service.\n" "- Use this file as compact cross-generation memory.\n", encoding="utf-8", ) auxiliary_metrics_py = self.workspace / "auxiliary_metrics.py" if not auxiliary_metrics_py.exists(): auxiliary_metrics_py.write_text( "def evaluate_aux(results_dir, primary_result=None):\n" " \"\"\"Return auxiliary metrics as a dict.\"\"\"\n" " return {}\n", encoding="utf-8", ) def _get_code_ext(self) -> str: """Get the code file extension by checking gen_0 for existing files.""" for ext in [".cpp", ".py", ".java", ".go", ".rs", ".c"]: candidate = self.results_dir / "gen_0" / f"main{ext}" if candidate.exists(): return ext return ".cpp" # default def _extract_agent_candidate(self) -> Optional[str]: """Check if agent wrote a candidate code file. Returns code string or None.""" ext = self._get_code_ext() candidate_path = self.workspace / f"agent_candidate{ext}" if candidate_path.exists(): code = candidate_path.read_text(encoding="utf-8").strip() if code: logging.info(f"📝 Found agent candidate: {candidate_path} ({len(code)} chars)") return code return None def _create_llm(self) -> LLM: """ Create LLM instance Migrated from ev2.py lines 54-58 Uses same environment variable logic """ # Get LLM config (prefer service config, fallback to env vars) model = self.config.llm_model or os.getenv("LLM_MODEL", "vertex_ai/gemini-2.5-flash") api_key = self.config.llm_api_key or os.getenv("LLM_API_KEY") base_url = self.config.llm_base_url or os.getenv("LLM_BASE_URL", None) log_completions = OPENHANDS_LOG_COMPLETIONS default_completion_dir = str(self.workspace / "llm_completions") log_completions_folder = os.getenv( "OPENHANDS_LOG_COMPLETIONS_DIR", default_completion_dir, ) logging.info(f"🤖 Creating LLM: {model}") logging.info(f" OpenHands completion logging: {log_completions}") if log_completions: logging.info(f" Completion log dir: {log_completions_folder}") llm = LLM( model=model, api_key=api_key, base_url=base_url, log_completions=log_completions, log_completions_folder=log_completions_folder, ) return llm def _create_agent(self) -> Agent: """ Create OpenHands Agent Migrated from ev2.py lines 60-73 Exact same configuration as ev2.py """ # Load EV2 prompt template (same path as ev2.py) ev2_prompt_path = Path(__file__).parent / "ev2_prompt.j2" if not ev2_prompt_path.exists(): raise FileNotFoundError( f"EV2 prompt template not found: {ev2_prompt_path}" ) logging.info(f"📋 Loading prompt: {ev2_prompt_path}") # Create agent with tools (exact same as ev2.py) agent = Agent( llm=self._llm, tools=[ Tool(name=TerminalTool.name), Tool(name=FileEditorTool.name), Tool(name=TaskTrackerTool.name), ], system_prompt_filename=str(ev2_prompt_path), ) logging.info("✅ Agent created") return agent def _ensure_agent_ready(self): """Ensure agent is created and ready""" if self._llm is None: self._llm = self._create_llm() if self._agent is None: self._agent = self._create_agent() def _build_task_message(self, current_gen: int) -> str: """Build task message for eval agent.""" results_path = self.results_dir last_trigger = self._get_last_agent_trigger_gen() # Find the best generation since last trigger (most valuable to diagnose) search_start = max(0, last_trigger + 1) if last_trigger >= 0 else 0 target_gen = current_gen best_score = -1.0 for g in range(search_start, current_gen + 1): mp = results_path / f"gen_{g}" / "results" / "metrics.json" if mp.exists(): try: with open(mp) as f: s = json.load(f).get("combined_score", 0) or 0 if s > best_score: best_score = s target_gen = g except Exception: pass target_metrics = results_path / f"gen_{target_gen}" / "results" / "metrics.json" target_score = None if target_metrics.exists(): try: with open(target_metrics) as f: target_score = json.load(f).get("combined_score", None) except Exception: pass # === File locations === task_parts = [ f"=== Generation {current_gen} Evaluation ===", "", "📁 File Locations (all absolute paths):", f"- Results directory: {self.results_dir}", f"- Agent candidate (MUST WRITE): {self.results_dir}/eval_agent_memory/agent_candidate{self._get_code_ext()}", f"- Diagnostic report (MUST WRITE): {self.results_dir}/eval_agent_memory/diagnostic_report.md", f"- Memory log (MUST WRITE): {self.results_dir}/eval_agent_memory/EVAL_AGENTS.md", f"- Auxiliary metrics: {self.results_dir}/eval_agent_memory/auxiliary_metrics.py", f"- Code to diagnose: {self.results_dir}/gen_{target_gen}/main.cpp", f"- Metrics to diagnose: {self.results_dir}/gen_{target_gen}/results/metrics.json", f"- All generations: {self.results_dir}/gen_0/ through {self.results_dir}/gen_{current_gen}/", ] if target_gen != current_gen: task_parts.append(f"- NOTE: Diagnosing gen {target_gen} (best since last trigger), not gen {current_gen} (latest)") if target_score is not None: task_parts.append(f"- Score: {target_score:.4f}") # === Aux metrics error warning === current_metrics = results_path / f"gen_{current_gen}" / "results" / "metrics.json" if current_metrics.exists(): try: with open(current_metrics) as f: _pub = json.load(f).get("public", {}) _err = _pub.get("aux_aux_metric_error_code", 0) if _err and _err > 0: task_parts.extend(["", f"⚠️ auxiliary_metrics.py had an error (code={int(_err)}) on gen {current_gen}. Fix or rewrite it."]) except Exception: pass # === Problem statement === if self.problem_statement: task_parts.extend(["", "📝 PROBLEM STATEMENT:", self.problem_statement[:4000]]) # === Code execution environment (task-specific) === if self.evaluator_kwargs and self.evaluator_kwargs.get("frontier_cs_dir"): fc_dir = self.evaluator_kwargs["frontier_cs_dir"] pid = self.evaluator_kwargs.get("problem_id", "0") problem_dir = Path(fc_dir) / "algorithmic" / "problems" / str(pid) testdata_dir = problem_dir / "testdata" is_interactive = (problem_dir / "interactor.cc").exists() # Find smallest test input smallest_test = "" if testdata_dir.exists(): in_files = sorted(testdata_dir.glob("*.in"), key=lambda f: f.stat().st_size) if in_files: smallest_test = str(in_files[0]) smallest_size = in_files[0].stat().st_size task_parts.extend(["", "🔧 Code Execution Environment:", f" Your auxiliary_metrics.py can compile and run the code locally using subprocess.", f" This is what makes your metrics non-trivial — measure actual program behavior.", f" In evaluate_aux(results_dir), results_dir points to gen_N/ and the code is at os.path.join(results_dir, 'main.cpp')", f" Compile: g++ -O2 -pipe -std=gnu++17 -o /tmp/main_test {{code_path}}", f" Testdata dir: {testdata_dir}", ]) if smallest_test: task_parts.append(f" Smallest test: {smallest_test} ({smallest_size} bytes)") if is_interactive: testlib_dir = Path(fc_dir) / "algorithmic" / "judge" / "include" task_parts.extend([ f" Problem type: interactive", f" Interactor: {problem_dir}/interactor.cc", f" Compile interactor: g++ -O2 -pipe -std=gnu++17 -I {testlib_dir} -o interactor interactor.cc", ]) else: task_parts.extend([ f" Problem type: default (stdin → stdout)", f" Run: timeout 5 ./main_test < {{test_input}} > output.txt", ]) # === Per-case analysis of target gen === case_analysis = self._build_case_analysis(target_gen) if case_analysis: task_parts.extend(case_analysis) # === Score trend === hist_start = max(0, current_gen - 10) task_parts.extend(["", "📈 Score Trend (last 10 gens):"]) if current_gen <= 0: task_parts.append("- No previous generations.") else: trend_tokens: List[str] = [] numeric_scores: List[tuple[int, float]] = [] for gen in range(hist_start, current_gen): mp = results_path / f"gen_{gen}" / "results" / "metrics.json" score = None if mp.exists(): try: with open(mp) as f: score = json.load(f).get("combined_score", None) except Exception: pass if isinstance(score, numbers.Real): trend_tokens.append(f"g{gen}: {float(score):.1f}") numeric_scores.append((gen, float(score))) else: trend_tokens.append(f"g{gen}: N/A") task_parts.append("- " + " | ".join(trend_tokens)) # === Aux metric trends === aux_trend_lines = self._build_aux_metric_trends(current_gen) if aux_trend_lines: task_parts.extend(aux_trend_lines) # === Toolbox APIs === task_parts.extend(["", "🧰 Toolbox APIs (if needed):", "- from eval_agent.tool_box import call_vision, call_tool", "- Do NOT import eval_agent/tool_box/_internal/*", ]) # === Run command hint === project_root = Path(__file__).parent.parent.resolve() task_parts.extend(["", "🔧 Test aux metrics (copy-paste to terminal):", f" python -c \"import sys; sys.path.insert(0,'{project_root}'); " f"import importlib.util, json; " f"spec=importlib.util.spec_from_file_location('aux','{self.workspace}/auxiliary_metrics.py'); " f"mod=importlib.util.module_from_spec(spec); spec.loader.exec_module(mod); " f"print(json.dumps(mod.evaluate_aux('{self.results_dir}/gen_{target_gen}'),indent=2))\"", ]) # === Feedback on previous actions === try: from eval_agent.feedback import compute_metric_feedback if last_trigger >= 0: feedback_text = compute_metric_feedback( results_dir=self.results_dir, current_gen=current_gen, last_agent_gen=last_trigger, ) if feedback_text: task_parts.append(feedback_text) except Exception as e: logging.warning(f"Failed to compute metric feedback: {e}") return "\n".join(task_parts) async def analyze_generation(self, generation: int) -> Dict[str, Any]: """ Analyze a generation using the agent. Uses a non-blocking lock so that concurrent triggers skip rather than queue up a backlog of long-running agent sessions. Args: generation: Generation number to analyze Returns: Dict with analysis results, or a skip-marker dict if the agent is busy """ if self._run_lock.locked(): logging.warning( f"⏭️ Agent busy — skipping analysis for generation {generation}. " "Next periodic trigger will catch up." ) return { "success": False, "generation": generation, "skipped": True, "reason": "agent_busy", } async with self._run_lock: return await self._analyze_generation_locked(generation) async def _analyze_generation_locked(self, generation: int) -> Dict[str, Any]: """Inner implementation that runs under the lock.""" logging.info("=" * 80) logging.info(f"🧠 EV2 Agent Analysis - Generation {generation}") logging.info("=" * 80) start_time = time.time() try: # Ensure agent is ready self._ensure_agent_ready() # Build task message (same as ev2.py) task_message = self._build_task_message(generation) task_hash = hashlib.sha256(task_message.encode("utf-8")).hexdigest() logging.info(f"📝 Task message: {len(task_message)} characters") logging.info(f"📁 Workspace: {self.workspace}") behavior_logging.save_text_artifact( str(self.results_dir), f"agent_runs/gen_{generation}_task_message.txt", task_message, ) behavior_logging.log_event( str(self.results_dir), "agent_run_start", { "generation": generation, "workspace": str(self.workspace), "task_message_chars": len(task_message), "task_message_sha256": task_hash, }, ) # Backup auxiliary_metrics.py before agent modifies it aux_py = self.workspace / "auxiliary_metrics.py" aux_bak = self.workspace / "auxiliary_metrics.py.bak" if aux_py.exists(): import shutil shutil.copy2(aux_py, aux_bak) # Create conversation (same as ev2.py line 76) conversation = Conversation( agent=self._agent, workspace=str(self.workspace) ) # Send message and run (same as ev2.py lines 85-91) logging.info("📤 Sending task to agent...") conversation.send_message(task_message) logging.info("🔄 Agent working...") await asyncio.to_thread(conversation.run) # Validate auxiliary_metrics.py after agent — revert on syntax error if aux_py.exists(): try: compile(aux_py.read_text(encoding="utf-8"), str(aux_py), "exec") logging.info("✅ auxiliary_metrics.py syntax OK") except SyntaxError as e: logging.warning(f"⚠️ auxiliary_metrics.py has syntax error: {e}. Reverting to backup.") if aux_bak.exists(): shutil.copy2(aux_bak, aux_py) logging.info("✅ Reverted to backup") if ENABLE_FULL_TRAJECTORY_LOG: try: from openhands.sdk.event.base import LLMConvertibleEvent events = list(conversation.state.events) llm_events = [e for e in events if isinstance(e, LLMConvertibleEvent)] llm_messages = LLMConvertibleEvent.events_to_messages(llm_events) trajectory_messages: List[Dict[str, Any]] = [] for msg in llm_messages: if hasattr(msg, "model_dump"): trajectory_messages.append(_sanitize_for_json(msg.model_dump())) else: trajectory_messages.append(_sanitize_for_json(dict(msg))) behavior_logging.save_text_artifact( str(self.results_dir), f"agent_runs/gen_{generation}_trajectory_messages.json", json.dumps(trajectory_messages, indent=2, ensure_ascii=False), ) behavior_logging.log_event( str(self.results_dir), "agent_trajectory_saved", { "generation": generation, "event_count": len(events), "llm_event_count": len(llm_events), "message_count": len(trajectory_messages), }, ) except Exception as e: behavior_logging.log_event( str(self.results_dir), "agent_trajectory_save_failed", { "generation": generation, "error": str(e), }, ) elapsed = time.time() - start_time logging.info("=" * 80) logging.info("✅ EV2 Evaluation Complete!") logging.info("=" * 80) logging.info(f"⏱️ Time: {elapsed:.1f}s") logging.info(f"📁 Workspace: {self.workspace}") logging.info(f"📝 Memory: {self.workspace}/EVAL_AGENTS.md") logging.info("=" * 80) # Extract results insights = self._extract_insights() metrics = self._extract_metrics() candidate_code = self._extract_agent_candidate() agent_result = { "success": True, "generation": generation, "workspace": str(self.workspace), "insights": insights, "auxiliary_metrics": metrics, "candidate_code": candidate_code, "elapsed_seconds": elapsed } behavior_logging.save_text_artifact( str(self.results_dir), f"agent_runs/gen_{generation}_result.json", json.dumps(_sanitize_for_json(agent_result), indent=2, ensure_ascii=False), ) behavior_logging.log_event( str(self.results_dir), "agent_run_end", { "generation": generation, "success": True, "elapsed_seconds": elapsed, "insight_count": len(insights), "auxiliary_metric_file_exists": bool(metrics.get("auxiliary_metrics_file_exists")), "auxiliary_metric_file_size_bytes": metrics.get("file_size_bytes", 0), }, ) return agent_result except Exception as e: behavior_logging.log_event( str(self.results_dir), "agent_run_end", { "generation": generation, "success": False, "elapsed_seconds": time.time() - start_time, "error": str(e), }, ) logging.error(f"❌ Agent analysis failed: {e}", exc_info=True) raise def _extract_insights(self) -> List[str]: """Extract insights from EVAL_AGENTS.md""" eval_agents_md = self.workspace / "EVAL_AGENTS.md" if not eval_agents_md.exists(): return [] insights = [] content = eval_agents_md.read_text() # Extract bullet points (simple heuristic) for line in content.split('\n'): stripped = line.strip() if stripped.startswith('*') or stripped.startswith('-'): insights.append(stripped) # Return last 10 insights return insights[-10:] if insights else [] def _extract_metrics(self) -> Dict[str, Any]: """Extract auxiliary metrics information""" auxiliary_py = self.workspace / "auxiliary_metrics.py" metrics = { "auxiliary_metrics_file_exists": auxiliary_py.exists(), } if auxiliary_py.exists(): metrics["auxiliary_metrics_path"] = str(auxiliary_py) metrics["file_size_bytes"] = auxiliary_py.stat().st_size return metrics def _extract_diagnostic_report(self) -> str: """Extract diagnostic report written by the agent.""" report_path = self.workspace / "diagnostic_report.md" if not report_path.exists(): return "" try: content = report_path.read_text(encoding="utf-8").strip() # Limit to 2000 chars to avoid bloating text_feedback return content[:2000] if content else "" except Exception: return "" # ============================================================================ # Service State Management # ============================================================================ class ServiceState: """ Service state management (same as ev2_service.py) Maintains history and decides when to trigger agent """ def __init__(self, config: ServiceConfig, force_clean: bool = False): """ Initialize service state. Args: config: Service configuration force_clean: If True, start with clean state (don't load from disk) """ self.config = config # State tracking self.generation_history: List[Dict[str, Any]] = [] self.last_agent_trigger_gen: int = -1 self.total_notifications: int = 0 self.total_agent_runs: int = 0 self.agent_trigger_history: List[Dict[str, Any]] = [] # history of all agent triggers # Timing self.start_time = time.time() # Load previous state if exists (unless forced clean) if not force_clean: self._load_state() else: logging.info("🔄 Starting with clean state (not loading from disk)") self._save_state() # Save clean state to disk def _get_state_file(self) -> Path: """Get path to state file""" if self.config.results_dir: state_dir = Path(self.config.results_dir) / "eval_agent_memory" state_dir.mkdir(parents=True, exist_ok=True) return state_dir / "service_state.json" return Path("service_state.json") def _load_state(self): """Load previous state from disk""" state_file = self._get_state_file() if state_file.exists(): try: with open(state_file) as f: data = json.load(f) self.generation_history = data.get('generation_history', []) self.last_agent_trigger_gen = data.get('last_agent_trigger_gen', -1) self.total_notifications = data.get('total_notifications', 0) self.total_agent_runs = data.get('total_agent_runs', 0) self.agent_trigger_history = data.get('agent_trigger_history', []) logging.info(f"📥 Loaded state: {len(self.generation_history)} generations in history") except Exception as e: logging.error(f"Failed to load state: {e}") def _save_state(self): """Save current state to disk""" state_file = self._get_state_file() try: data = { 'generation_history': self.generation_history[-100:], # Keep last 100 'last_agent_trigger_gen': self.last_agent_trigger_gen, 'total_notifications': self.total_notifications, 'total_agent_runs': self.total_agent_runs, 'agent_trigger_history': self.agent_trigger_history[-20:], # Keep last 20 'last_update': time.time() } with open(state_file, 'w') as f: json.dump(data, f, indent=2) except Exception as e: logging.error(f"Failed to save state: {e}") def add_generation(self, gen_data: Dict[str, Any]): """Record a generation""" self.generation_history.append(gen_data) self.total_notifications += 1 # Keep only recent history in memory if len(self.generation_history) > 100: self.generation_history = self.generation_history[-100:] self._save_state() def should_trigger_agent(self, generation: int, primary_score: float) -> tuple[bool, str]: """ Decide whether to trigger the agent Returns: (should_trigger, reason) """ if not self.config.agent_enabled: return False, "Agent disabled in config" # Strategy 1: Always (for testing) if self.config.trigger_mode == "always": return True, "Always mode" # Strategy 2: Periodic if self.config.trigger_mode == "periodic": if generation - self.last_agent_trigger_gen >= self.config.trigger_interval: return True, f"Periodic trigger (interval={self.config.trigger_interval})" else: return False, f"Not yet (last trigger at gen {self.last_agent_trigger_gen})" # Strategy 3: Plateau detection if self.config.trigger_mode == "plateau": if self._detect_plateau(): return True, "Plateau detected" else: return False, "No plateau detected" # Strategy 4: Mixed (periodic OR plateau) if self.config.trigger_mode == "mixed": # Check periodic if generation - self.last_agent_trigger_gen >= self.config.trigger_interval: return True, f"Periodic trigger (interval={self.config.trigger_interval})" # Check plateau if self._detect_plateau(): return True, "Plateau detected (early trigger)" return False, f"Waiting (next trigger at gen {self.last_agent_trigger_gen + self.config.trigger_interval})" return False, f"Unknown trigger mode: {self.config.trigger_mode}" def _detect_plateau(self) -> bool: """Detect if primary score has plateaued""" window = self.config.plateau_window if len(self.generation_history) < window: return False recent = self.generation_history[-window:] scores = [g['primary_score'] for g in recent] # Check if improvement is below threshold improvement = (scores[-1] - scores[0]) / max(abs(scores[0]), 1e-6) return abs(improvement) < self.config.plateau_threshold def mark_agent_triggered(self, generation: int, active_metrics: Optional[List[str]] = None): """Mark that agent was triggered, recording active aux metric names.""" self.last_agent_trigger_gen = generation self.total_agent_runs += 1 self.agent_trigger_history.append({ "generation": generation, "timestamp": time.time(), "active_metrics": active_metrics or [], }) self._save_state() def get_statistics(self) -> Dict[str, Any]: """Get service statistics""" return { "total_notifications": self.total_notifications, "total_agent_runs": self.total_agent_runs, "generations_tracked": len(self.generation_history), "last_agent_trigger_gen": self.last_agent_trigger_gen, "uptime_seconds": time.time() - self.start_time } # ============================================================================ # FastAPI Application # ============================================================================ # Global state (initialized on startup) service_state: Optional[ServiceState] = None service_config: Optional[ServiceConfig] = None ev2_agent: Optional[IntegratedEV2Agent] = None # Global evaluation job tracking (for async evaluation mode) evaluation_jobs: Dict[str, Dict[str, Any]] = {} # State lock for thread-safe initialization/reset import asyncio _state_lock = asyncio.Lock() # Create FastAPI app app = FastAPI( title="EV2 Evaluation Service (Standalone)", description="Event-driven evaluation service with integrated OpenHands agent", version="2.0.0" ) # Setup logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) def _env_flag(name: str, default: bool = False) -> bool: raw = os.getenv(name) if raw is None: return default return raw.strip().lower() in {"1", "true", "yes", "on"} ENABLE_FULL_TRAJECTORY_LOG = _env_flag("ENABLE_FULL_TRAJECTORY_LOG", default=False) OPENHANDS_LOG_COMPLETIONS = _env_flag("OPENHANDS_LOG_COMPLETIONS", default=False) def _should_suppress_aux_metrics(experiment_root: str, current_gen: int, window: int = 10) -> bool: """Check if aux metrics should be suppressed due to negative correlation with score. Looks at the last `window` generations. If any aux metric has Spearman rho < -0.3, returns True to suppress all aux metrics for safety. """ from eval_agent.feedback import _load_generation_metrics, _spearman_correlation results_dir = Path(experiment_root) gen_start = max(0, current_gen - window) records = _load_generation_metrics(results_dir, gen_start, current_gen) if len(records) < 5: return False # Not enough data scores = [] for _, data in records: cs = data.get("combined_score") scores.append(float(cs) if isinstance(cs, (int, float)) and math.isfinite(cs) else 0.0) # Collect aux metric names aux_names: set = set() for _, data in records: for key in data.get("public", {}): if key.startswith("aux_"): aux_names.add(key) for name in aux_names: vals = [] paired_s = [] for (_, data), s in zip(records, scores): v = data.get("public", {}).get(name) if isinstance(v, (int, float)) and math.isfinite(v): vals.append(float(v)) paired_s.append(s) if len(vals) >= 5: rho = _spearman_correlation(vals, paired_s) if rho < -0.3: logging.warning( f"Aux metric '{name}' has negative correlation {rho:.2f} with score — triggering suppression" ) return True return False def _sanitize_for_json(value: Any) -> Any: """Recursively convert non-JSON-safe numeric values (NaN/Inf) to None.""" if isinstance(value, float): return value if math.isfinite(value) else None if isinstance(value, dict): return {k: _sanitize_for_json(v) for k, v in value.items()} if isinstance(value, list): return [_sanitize_for_json(v) for v in value] if isinstance(value, tuple): return [_sanitize_for_json(v) for v in value] # Handle numpy scalar types without importing numpy globally. if hasattr(value, "item") and callable(getattr(value, "item")): try: coerced = value.item() return _sanitize_for_json(coerced) except Exception: return str(value) return value async def _update_evaluation_job(job_id: str, **updates: Any) -> bool: """Safely update a tracked evaluation job. Returns False if missing.""" async with _state_lock: job = evaluation_jobs.get(job_id) if job is None: return False job.update(updates) return True @app.on_event("startup") async def startup_event(): """Initialize service on startup""" global service_state, service_config, ev2_agent logger.info("=" * 80) logger.info("🚀 Starting EV2 Evaluation Service (Standalone)") logger.info("=" * 80) # Load config (will be set by main()) if service_config is None: logger.warning("⚠️ No config provided, using defaults") service_config = ServiceConfig() # Initialize state service_state = ServiceState(service_config) # Initialize integrated agent (if results_dir is configured) if service_config.results_dir: try: ev2_agent = IntegratedEV2Agent( results_dir=service_config.results_dir, primary_evaluator_path=service_config.primary_evaluator_path, config=service_config, problem_statement=service_config.problem_statement, evaluator_kwargs=service_config.evaluator_kwargs, ) logger.info("✅ Integrated EV2 Agent ready") except Exception as e: logger.error(f"❌ Failed to initialize agent: {e}") logger.warning("⚠️ Service will start but agent calls will fail") ev2_agent = None else: logger.info("⏳ Agent will be initialized on first generation request (dynamic mode)") ev2_agent = None logger.info("=" * 80) logger.info(f"✅ Service Started") logger.info(f" Experiment: {service_config.experiment_name}") logger.info(f" Results dir: {service_config.results_dir}") logger.info(f" Trigger mode: {service_config.trigger_mode}") logger.info(f" Trigger interval: {service_config.trigger_interval}") logger.info(f" Full trajectory log: {ENABLE_FULL_TRAJECTORY_LOG}") logger.info("=" * 80) @app.on_event("shutdown") async def shutdown_event(): """Cleanup on shutdown""" logger.info("=" * 80) logger.info("🛑 Shutting down EV2 Evaluation Service") logger.info("=" * 80) if service_state: service_state._save_state() logger.info(f" Total generation requests: {service_state.total_notifications}") logger.info(f" Total agent runs: {service_state.total_agent_runs}") logger.info("=" * 80) # ============================================================================ # Evaluation Executors (for async evaluation mode) # ============================================================================ def _normalize_experiment_root(results_dir: str) -> Path: """ Normalize results_dir to experiment root. Accepts paths like: - /path/to/experiment - /path/to/experiment/gen_10 - /path/to/experiment/gen_10/results """ results_path = Path(results_dir).resolve() if results_path.name == "results": return results_path.parent.parent if results_path.name.startswith("gen_"): return results_path.parent return results_path def _execute_primary_evaluator_sync( code_path: str, results_dir: str, evaluator_module: str, evaluator_function: str, evaluator_kwargs: Optional[Dict[str, Any]], ) -> Dict[str, Any]: """ Execute primary evaluator synchronously. This runs inside a worker subprocess so the parent process can hard-kill it on timeout without leaving background threads. """ import importlib import inspect module = importlib.import_module(evaluator_module) evaluator_func = getattr(module, evaluator_function) eval_kwargs = evaluator_kwargs or {} sig = inspect.signature(evaluator_func) params = list(sig.parameters.keys()) # Support common evaluator signatures: # 1. evaluate(code_path, **kwargs) # 2. main(program_path, results_dir) # 3. evaluate(code_path, results_dir, **kwargs) if len(params) >= 2 and "results_dir" in params[:3]: result = evaluator_func(code_path, results_dir, **eval_kwargs) else: result = evaluator_func(code_path, **eval_kwargs) # Evaluator may persist metrics and return None. if result is None: metrics_file = Path(results_dir) / "metrics.json" if not metrics_file.exists(): raise FileNotFoundError( f"Evaluator returned None and metrics.json not found at {metrics_file}" ) with open(metrics_file) as f: result = json.load(f) if not isinstance(result, dict): raise ValueError(f"Evaluator must return dict, got {type(result)}") if "combined_score" not in result: raise ValueError("Evaluator result must contain 'combined_score' key") # Merge validation status from correct.json when available. correct_file = Path(results_dir) / "correct.json" if correct_file.exists(): try: with open(correct_file) as f: correct_data = json.load(f) result["correct"] = correct_data.get("correct", False) result["validation_error"] = correct_data.get("error", None) except Exception: result["correct"] = False else: result["correct"] = True return result def _run_primary_evaluator_worker(request_path: str, output_path: str) -> int: """ Worker-mode entrypoint for primary evaluation. Reads request json, executes evaluator, writes output json: - {"ok": true, "result": {...}} - {"ok": false, "error": "...", "traceback": "..."} """ try: with open(request_path) as f: payload = json.load(f) result = _execute_primary_evaluator_sync( code_path=payload["code_path"], results_dir=payload["results_dir"], evaluator_module=payload["evaluator_module"], evaluator_function=payload["evaluator_function"], evaluator_kwargs=payload.get("evaluator_kwargs"), ) with open(output_path, "w") as f: json.dump({"ok": True, "result": result}, f) return 0 except Exception as e: try: with open(output_path, "w") as f: json.dump( { "ok": False, "error": str(e), "traceback": traceback.format_exc(), }, f, ) except Exception: pass return 1 async def run_primary_evaluator(request: GenerationCompleteRequest) -> Dict[str, Any]: """ Run primary evaluator (dynamically loaded) Args: request: Generation complete request with evaluator config Returns: Evaluation result dict with at least 'combined_score' key Raises: ImportError: If evaluator module cannot be loaded AttributeError: If evaluator function not found Exception: If evaluation fails """ logger.info(f"🔬 Running primary evaluator: {request.evaluator_module}.{request.evaluator_function}") logger.info(f" Code path: {request.code_path}") logger.info(f" Results dir: {request.results_dir}") logger.info(f" Evaluator: {request.evaluator_module}.{request.evaluator_function}") timeout_seconds = service_config.evaluation_timeout if service_config else 300.0 if timeout_seconds <= 0: logger.warning( f"Invalid evaluation timeout {timeout_seconds}; using default 300.0s" ) timeout_seconds = 300.0 logger.info(f" Evaluation timeout: {timeout_seconds}s (hard kill)") request_payload = { "code_path": request.code_path, "results_dir": request.results_dir, "evaluator_module": request.evaluator_module, "evaluator_function": request.evaluator_function, "evaluator_kwargs": request.evaluator_kwargs or {}, } req_path = None out_path = None process = None try: with tempfile.NamedTemporaryFile( mode="w", suffix=".json", prefix="ev2_eval_req_", delete=False ) as req_file: req_path = req_file.name json.dump(request_payload, req_file) out_path = f"{req_path}.out.json" cmd = [ sys.executable, str(Path(__file__).resolve()), "--worker-eval-request", req_path, "--worker-eval-output", out_path, ] process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, ) try: _, stderr = await asyncio.wait_for( process.communicate(), timeout=timeout_seconds ) except asyncio.TimeoutError: error_msg = f"Evaluation exceeded timeout of {timeout_seconds}s (process killed)" logger.error(f"⏱️ {error_msg}") if process.returncode is None: process.kill() await process.communicate() return { "combined_score": 0.0, "correct": False, "validation_error": error_msg, "execution_time_mean": timeout_seconds, "timeout": True, } if not Path(out_path).exists(): stderr_text = (stderr.decode("utf-8", errors="replace") or "").strip() raise RuntimeError( f"Evaluator worker did not produce output file. " f"returncode={process.returncode}, stderr={stderr_text[:500]}" ) with open(out_path) as f: worker_output = json.load(f) if not worker_output.get("ok", False): raise RuntimeError( f"Evaluator worker failed: {worker_output.get('error', 'unknown error')}\n" f"{worker_output.get('traceback', '')}" ) result = worker_output.get("result", {}) logger.info( f"✅ Primary evaluation completed: " f"score={result.get('combined_score', 0.0):.4f}, " f"correct={result.get('correct', '?')}" ) return result except Exception as e: logger.error(f"❌ Evaluation failed: {e}", exc_info=True) raise finally: for path in [req_path, out_path]: if path: try: os.unlink(path) except OSError: pass async def run_auxiliary_evaluators( request: GenerationCompleteRequest, primary_result: Dict[str, Any], experiment_root: str, ) -> Dict[str, Any]: """ Run auxiliary evaluators (if they exist) Loads auxiliary_metrics.py from eval_agent_memory and calls the evaluate_aux() function (single entry point pattern). Args: request: Generation complete request primary_result: Result from primary evaluator experiment_root: Snapshot of experiment root used for this evaluation run Returns: Dict of auxiliary metric results (flat dictionary) """ logger.info(f"🔍 Looking for auxiliary evaluators...") behavior_logging.log_event( request.results_dir, "aux_eval_start", {"generation": request.generation}, ) # Find auxiliary_metrics.py in eval_agent_memory if not experiment_root: logger.info(" No results_dir configured, skipping auxiliary metrics") behavior_logging.log_event( request.results_dir, "aux_eval_end", {"generation": request.generation, "success": True, "skipped": True, "reason": "no_results_dir"}, ) return {} aux_metrics_path = Path(experiment_root) / "eval_agent_memory" / "auxiliary_metrics.py" if not aux_metrics_path.exists(): logger.info(f" No auxiliary metrics found at {aux_metrics_path}") behavior_logging.log_event( request.results_dir, "aux_eval_end", {"generation": request.generation, "success": True, "skipped": True, "reason": "aux_file_missing"}, ) return {} logger.info(f" Found auxiliary metrics: {aux_metrics_path}") # Snapshot the exact auxiliary metrics code used for this generation. # This makes post-hoc debugging reproducible even after the agent updates # eval_agent_memory/auxiliary_metrics.py in later generations. try: snapshot_path = Path(request.results_dir) / "auxiliary_metrics_snapshot.py" snapshot_path.parent.mkdir(parents=True, exist_ok=True) snapshot_path.write_text(aux_metrics_path.read_text(encoding="utf-8"), encoding="utf-8") logger.info(f" 📝 Saved auxiliary metrics snapshot: {snapshot_path}") except Exception as e: logger.warning(f" ⚠️ Failed to save auxiliary metrics snapshot: {e}") def _aux_failure_metrics(error_type: str, error_message: str, error_detail: str = "") -> Dict[str, Any]: # Failure fallback uses null for non-essential diagnostic fields to avoid # conflating "failed evaluation" with valid zero values. return { "aux_metric_eval_success": None, "aux_metric_error_code": float({ "syntax": 1, "import": 2, "runtime": 3, "invalid_return": 4, }.get(error_type, 99)), "aux_metric_non_numeric_dropped_count": None, "aux_metric_error_message_length": float(len(error_message or "")), "aux_metric_error_detail_length": float(len(error_detail or "")), } def _normalize_aux_metrics(raw: Dict[str, Any]) -> Dict[str, Any]: """ Keep numeric/bool metrics usable for time-series analysis. Preserve error context in dedicated metadata fields. """ normalized: Dict[str, Any] = {} dropped_non_numeric = 0 for key, value in raw.items(): if isinstance(value, bool): normalized[key] = float(value) elif isinstance(value, numbers.Real): if math.isnan(float(value)) or math.isinf(float(value)): dropped_non_numeric += 1 else: normalized[key] = float(value) elif isinstance(value, str) and key == "error": normalized["aux_metric_error_code"] = max(float(normalized.get("aux_metric_error_code", 0.0)), 3.0) normalized["aux_metric_error_message_length"] = float(len(value)) dropped_non_numeric += 1 elif isinstance(value, str) and key == "traceback": normalized["aux_metric_error_detail_length"] = float(len(value)) dropped_non_numeric += 1 else: dropped_non_numeric += 1 normalized.setdefault("aux_metric_eval_success", 1.0) normalized.setdefault("aux_metric_error_code", 0.0) normalized.setdefault("aux_metric_error_message_length", 0.0) normalized.setdefault("aux_metric_error_detail_length", 0.0) normalized["aux_metric_non_numeric_dropped_count"] = float(dropped_non_numeric) return normalized try: # Ensure repo root is importable so auxiliary_metrics.py can import # shared helpers like `eval_agent.tool_box` regardless of service CWD. project_root = str(Path(__file__).resolve().parent.parent) if project_root not in sys.path: sys.path.insert(0, project_root) # Fast syntax validation before import to avoid ambiguous runtime failures. try: aux_source = aux_metrics_path.read_text(encoding="utf-8") compile(aux_source, str(aux_metrics_path), "exec") except SyntaxError as e: logger.error(f"❌ auxiliary_metrics.py syntax error: {e}") failure = _aux_failure_metrics( error_type="syntax", error_message=str(e), error_detail=traceback.format_exc(), ) behavior_logging.log_event( request.results_dir, "aux_eval_end", { "generation": request.generation, "success": False, "error_type": "syntax", "aux_metric_error_code": failure.get("aux_metric_error_code", 99.0), }, ) return failure # Dynamically load auxiliary metrics module import importlib.util spec = importlib.util.spec_from_file_location("auxiliary_metrics", str(aux_metrics_path)) aux_module = importlib.util.module_from_spec(spec) try: spec.loader.exec_module(aux_module) except Exception as e: logger.error(f"❌ Failed importing auxiliary metrics module: {e}", exc_info=True) failure = _aux_failure_metrics( error_type="import", error_message=str(e), error_detail=traceback.format_exc(), ) behavior_logging.log_event( request.results_dir, "aux_eval_end", { "generation": request.generation, "success": False, "error_type": "import", "aux_metric_error_code": failure.get("aux_metric_error_code", 99.0), }, ) return failure # Look for the evaluate_aux function (single entry point) if not hasattr(aux_module, 'evaluate_aux'): logger.warning(" ⚠️ No 'evaluate_aux' function found in auxiliary_metrics.py") logger.warning(" Please implement: def evaluate_aux(results_dir: str) -> Dict[str, Any]") behavior_logging.log_event( request.results_dir, "aux_eval_end", {"generation": request.generation, "success": True, "skipped": True, "reason": "evaluate_aux_missing"}, ) return {} evaluate_func = getattr(aux_module, 'evaluate_aux') logger.info(f" ✅ Found evaluate_aux function") # Inspect function signature to determine how to call it import inspect sig = inspect.signature(evaluate_func) params = list(sig.parameters.keys()) # Call evaluate_aux with appropriate parameters if not params: # No parameters - call without arguments logger.info(f" 📞 Calling evaluate_aux()") result = await asyncio.wait_for(asyncio.to_thread(evaluate_func), timeout=30.0) elif len(params) == 1: # Single parameter - determine what to pass param_name = params[0] if 'results_dir' in param_name.lower() or 'gen_results_dir' in param_name.lower(): # Pass gen_N/ (parent of gen_N/results/) so that results_dir/main.cpp resolves correctly gen_dir = str(Path(request.results_dir).parent) logger.info(f" 📞 Calling evaluate_aux(results_dir='{gen_dir}')") result = await asyncio.wait_for(asyncio.to_thread(evaluate_func, gen_dir), timeout=30.0) elif 'gen_path' in param_name.lower() or 'generation_dir' in param_name.lower(): # Pass generation directory (parent of results_dir) gen_path = str(Path(request.results_dir).parent) logger.info(f" 📞 Calling evaluate_aux(gen_path='{gen_path}')") result = await asyncio.wait_for(asyncio.to_thread(evaluate_func, gen_path), timeout=30.0) else: # Default: pass results_dir logger.info(f" 📞 Calling evaluate_aux('{request.results_dir}')") result = await asyncio.wait_for(asyncio.to_thread(evaluate_func, request.results_dir), timeout=30.0) elif 'primary_result' in params: # Multi-parameter with primary_result # Pass gen_N/ (parent of gen_N/results/) so that results_dir/main.cpp resolves correctly gen_dir = str(Path(request.results_dir).parent) logger.info(f" 📞 Calling evaluate_aux with results_dir and primary_result") result = await asyncio.wait_for(asyncio.to_thread( evaluate_func, results_dir=gen_dir, primary_result=primary_result ), timeout=30.0) else: # Multi-parameter - use keyword arguments logger.info(f" 📞 Calling evaluate_aux with keyword arguments") result = await asyncio.wait_for(asyncio.to_thread( evaluate_func, results_dir=request.results_dir ), timeout=30.0) # Validate result format if not isinstance(result, dict): logger.error(f" ❌ evaluate_aux must return dict, got {type(result)}") failure = _aux_failure_metrics( error_type="invalid_return", error_message=f"Invalid return type: {type(result).__name__}", ) behavior_logging.log_event( request.results_dir, "aux_eval_end", { "generation": request.generation, "success": False, "error_type": "invalid_return", "aux_metric_error_code": failure.get("aux_metric_error_code", 99.0), }, ) return failure # Check if result contains error if "error" in result: logger.warning(f" ⚠️ evaluate_aux returned error: {result.get('error')}") else: logger.info(f" ✅ evaluate_aux returned {len(result)} metrics") # Log first few metrics for debugging sample_metrics = list(result.items())[:3] for key, value in sample_metrics: logger.info(f" - {key}: {value}") if len(result) > 3: logger.info(f" ... and {len(result) - 3} more") normalized = _normalize_aux_metrics(result) behavior_logging.log_event( request.results_dir, "aux_eval_end", { "generation": request.generation, "success": True, "metric_key_count": len(normalized), "aux_metric_eval_success": normalized.get("aux_metric_eval_success", 1.0), "aux_metric_error_code": normalized.get("aux_metric_error_code", 0.0), "aux_metric_non_numeric_dropped_count": normalized.get("aux_metric_non_numeric_dropped_count", 0.0), }, ) return normalized except (asyncio.TimeoutError, TimeoutError) as e: logger.warning(f"⏱️ Auxiliary metrics timed out (30s limit): {e}") failure = _aux_failure_metrics( error_type="timeout", error_message="auxiliary_metrics.py exceeded 30s timeout", ) behavior_logging.log_event( request.results_dir, "aux_eval_end", { "generation": request.generation, "success": False, "error_type": "timeout", "aux_metric_error_code": failure.get("aux_metric_error_code", 99.0), }, ) return failure except Exception as e: logger.error(f"❌ Failed to run auxiliary metrics: {e}", exc_info=True) failure = _aux_failure_metrics( error_type="runtime", error_message=str(e), error_detail=traceback.format_exc(), ) behavior_logging.log_event( request.results_dir, "aux_eval_end", { "generation": request.generation, "success": False, "error_type": "runtime", "aux_metric_error_code": failure.get("aux_metric_error_code", 99.0), }, ) return failure def _extract_metric_descriptions(eval_agents_md_path: Path) -> Dict[str, str]: """ Extract metric descriptions from EVAL_AGENTS.md Parses the markdown file to extract metric names and their descriptions. Args: eval_agents_md_path: Path to EVAL_AGENTS.md Returns: Dict mapping metric names to their descriptions """ descriptions = {} try: content = eval_agents_md_path.read_text() # Parse markdown sections # Look for patterns like: # - `metric_name`: Description # - **Metrics Calculated**: # - `metric_name`: Description import re # Pattern 1: - `metric_name`: Description pattern1 = r'-\s+`([^`]+)`:\s*([^\n]+(?:\n(?!\s*[-*])[^\n]+)*)' for match in re.finditer(pattern1, content): metric_name = match.group(1).strip() description = match.group(2).strip() # Clean up description description = re.sub(r'\s+', ' ', description) descriptions[metric_name] = description # Pattern 2: * **`metric_name`**: Description (bold + bullet) pattern2 = r'\*\s+\*\*`([^`]+)`\*\*:\s*([^\n]+(?:\n(?!\s*[-*])[^\n]+)*)' for match in re.finditer(pattern2, content): metric_name = match.group(1).strip() description = match.group(2).strip() description = re.sub(r'\s+', ' ', description) descriptions[metric_name] = description # Pattern 3: Look for metric names in auxiliary_metrics results # They might be prefixed with aux_ or evaluation_status, etc. # Store base descriptions that can be matched logger.info(f"Extracted {len(descriptions)} metric descriptions from {eval_agents_md_path}") except Exception as e: logger.error(f"Failed to parse EVAL_AGENTS.md: {e}") return descriptions def save_metrics_file(results_dir: str, metrics: Dict[str, Any]): """ Save complete metrics to metrics.json Args: results_dir: Directory to save metrics.json metrics: Complete metrics dict """ metrics_path = Path(results_dir) / "metrics.json" try: metrics_path.parent.mkdir(parents=True, exist_ok=True) safe_metrics = _sanitize_for_json(metrics) with open(metrics_path, 'w') as f: json.dump(safe_metrics, f, indent=2, allow_nan=False) logger.info(f"💾 Saved metrics to {metrics_path}") except Exception as e: logger.error(f"❌ Failed to save metrics: {e}") async def _evaluate_agent_candidate( candidate_code: str, request: GenerationCompleteRequest, ev2_agent, ) -> Dict[str, Any]: """ Evaluate the agent's candidate code using the primary evaluator. Writes the candidate to a temp file, runs primary evaluation, returns result. """ import tempfile # Determine file extension from the original code path ext = Path(request.code_path).suffix or ".cpp" # Write candidate to a temp file with tempfile.NamedTemporaryFile( mode="w", suffix=ext, prefix="agent_candidate_", delete=False, encoding="utf-8" ) as f: f.write(candidate_code) candidate_path = f.name # Create a temp results dir candidate_results_dir = Path(request.results_dir).parent / "agent_candidate_results" candidate_results_dir.mkdir(parents=True, exist_ok=True) try: result = await asyncio.to_thread( _execute_primary_evaluator_sync, candidate_path, str(candidate_results_dir), request.evaluator_module, request.evaluator_function, request.evaluator_kwargs, ) return { "combined_score": result.get("combined_score", 0.0), "correct": result.get("correct", False), "code": candidate_code, "public": result.get("public", {}), "text_feedback": result.get("text_feedback", ""), } except Exception as e: logger.error(f"Agent candidate evaluation error: {e}") return { "combined_score": 0.0, "correct": False, "code": candidate_code, "error": str(e), } finally: # Clean up temp file try: Path(candidate_path).unlink(missing_ok=True) except Exception: pass async def run_full_evaluation(job_id: str, request: GenerationCompleteRequest): """ Complete evaluation pipeline (async background task) 1. Run primary evaluator 2. Run auxiliary evaluators (if exist) 3. Save metrics.json 4. Record to history 5. Decide whether to trigger agent 6. If triggered: run agent analysis Args: job_id: Job ID for tracking request: Generation complete request """ logger.info("=" * 80) logger.info(f"🚀 Starting full evaluation for generation {request.generation}") logger.info("=" * 80) if not await _update_evaluation_job(job_id, status="running"): logger.warning(f"Job {job_id} missing before start; continuing without job tracking") try: async with _state_lock: local_service_state = service_state local_ev2_agent = ev2_agent local_results_root = ( str(service_config.results_dir) if service_config and service_config.results_dir else "" ) if local_service_state is None: raise RuntimeError("Service state not initialized") # 1. Run primary evaluator logger.info(f"📊 Step 1/6: Running primary evaluator...") primary_result = await run_primary_evaluator(request) # 2. Run auxiliary evaluators logger.info(f"📊 Step 2/6: Running auxiliary evaluators...") auxiliary_results = await run_auxiliary_evaluators( request, primary_result, experiment_root=local_results_root, ) # 3. Extract auxiliary metric descriptions from EVAL_AGENTS.md auxiliary_descriptions = {} if auxiliary_results: try: eval_agents_md = Path(local_results_root) / "eval_agent_memory" / "EVAL_AGENTS.md" if eval_agents_md.exists(): auxiliary_descriptions = _extract_metric_descriptions(eval_agents_md) logger.info(f"Extracted {len(auxiliary_descriptions)} metric descriptions") except Exception as e: logger.warning(f"Failed to extract metric descriptions: {e}") # Inject diagnostic report into primary_result BEFORE building complete_result, # so it flows through both API response and disk metrics. if ev2_agent is not None: diagnostic = ev2_agent._extract_diagnostic_report() if diagnostic: existing = primary_result.get("text_feedback", "") primary_result["text_feedback"] = (existing + "\n\n" + diagnostic) if existing else diagnostic logger.info(f"Injected diagnostic report ({len(diagnostic)} chars) into primary_result") # 4. Merge results (rich format for API response) complete_result = { "combined_score": primary_result.get("combined_score", 0.0), "correct": primary_result.get("correct", False), "primary": primary_result, "auxiliary": auxiliary_results, "auxiliary_descriptions": auxiliary_descriptions, "timestamp": time.time(), "generation": request.generation } complete_result = _sanitize_for_json(complete_result) # Safety: suppress aux metrics if they correlate negatively with score if auxiliary_results and _should_suppress_aux_metrics(local_results_root, request.generation): logger.warning("⚠️ Auto-suppressing auxiliary metrics due to negative correlation with score") auxiliary_results = {} auxiliary_descriptions = {} # Build scheduler-compatible metrics.json so that load_results() # (shinka/utils/general.py) can read it correctly on resume. public_metrics = dict(primary_result.get("public", {})) if auxiliary_results: for key, value in auxiliary_results.items(): if isinstance(value, dict) and "error" not in value: for sub_key, sub_value in value.items(): if isinstance(sub_value, (int, float, bool, str)): public_metrics[f"aux_{sub_key}"] = sub_value elif isinstance(value, (int, float, bool, str)): public_metrics[f"aux_{key}"] = value text_feedback = primary_result.get("text_feedback", "") if auxiliary_descriptions: desc_text = "\n\n# Auxiliary Metrics Guide\n\n" for metric_name, description in auxiliary_descriptions.items(): desc_text += f"- **{metric_name}**: {description}\n" text_feedback = (text_feedback + desc_text) if text_feedback else desc_text disk_metrics = { "combined_score": primary_result.get("combined_score", 0.0), "public": public_metrics, "private": primary_result.get("private", {}), "text_feedback": text_feedback, "_eval_service": { "auxiliary": auxiliary_results, "auxiliary_descriptions": auxiliary_descriptions, }, } # 4. Save metrics.json (scheduler-compatible format) logger.info(f"📊 Step 3/6: Saving metrics.json...") save_metrics_file(request.results_dir, _sanitize_for_json(disk_metrics)) # 5. Record to history logger.info(f"📊 Step 4/6: Recording to history...") local_service_state.add_generation({ "generation": request.generation, "primary_score": complete_result["combined_score"], "results_dir": request.results_dir, "timestamp": time.time() }) # 6. Decide whether to trigger agent logger.info(f"📊 Step 5/6: Deciding whether to trigger agent...") should_trigger, reason = local_service_state.should_trigger_agent( request.generation, complete_result["combined_score"] ) behavior_logging.log_event( request.results_dir, "trigger_decision", { "generation": request.generation, "mode": "evaluation", "should_trigger": bool(should_trigger), "reason": reason, "primary_score": complete_result["combined_score"], }, ) logger.info(f" Decision: {'🧠 TRIGGER' if should_trigger else '⏭️ SKIP'} - {reason}") # 7. Trigger agent if needed if should_trigger and local_ev2_agent is not None: logger.info(f"📊 Step 6/6: Running agent analysis...") try: agent_result = await local_ev2_agent.analyze_generation(request.generation) complete_result["agent_analysis"] = agent_result # Evaluate agent candidate if present candidate_code = agent_result.get("candidate_code") if candidate_code and not agent_result.get("skipped"): logger.info("🔬 Evaluating agent candidate code...") try: candidate_eval = await _evaluate_agent_candidate( candidate_code, request, local_ev2_agent ) complete_result["agent_candidate"] = candidate_eval logger.info( f"🔬 Agent candidate score: {candidate_eval.get('combined_score', 0):.2f} " f"(current best: {complete_result['combined_score']:.2f})" ) except Exception as e: logger.error(f"❌ Agent candidate evaluation failed: {e}") complete_result["agent_candidate"] = {"error": str(e)} # Only advance trigger counter if agent actually ran (not skipped) if not agent_result.get("skipped"): # Snapshot current aux metric names for feedback loop active_aux = sorted( k for k in public_metrics if k.startswith("aux_") ) local_service_state.mark_agent_triggered( request.generation, active_metrics=active_aux ) behavior_logging.log_event( request.results_dir, "agent_trigger_result", { "generation": request.generation, "mode": "evaluation", "success": agent_result.get("success", True), "skipped": agent_result.get("skipped", False), "elapsed_seconds": agent_result.get("elapsed_seconds", 0), }, ) logger.info( f"✅ Agent analysis completed" if not agent_result.get("skipped") else f"⏭️ Agent analysis skipped (busy)" ) except Exception as e: logger.error(f"❌ Agent analysis failed: {e}", exc_info=True) complete_result["agent_analysis"] = {"error": str(e)} behavior_logging.log_event( request.results_dir, "agent_trigger_result", { "generation": request.generation, "mode": "evaluation", "success": False, "error": str(e), }, ) else: logger.info(f"📊 Step 6/6: Agent not triggered") # 8. Update job status updated = await _update_evaluation_job( job_id, status="completed", result=_sanitize_for_json(complete_result), completed_at=time.time(), ) if not updated: logger.warning(f"Job {job_id} was cleared before completion state update") logger.info("=" * 80) logger.info(f"✅ Full evaluation completed for generation {request.generation}") logger.info(f" Score: {complete_result['combined_score']:.4f}") logger.info(f" Auxiliary metrics: {len(auxiliary_results)} metrics") logger.info(f" Agent triggered: {should_trigger}") logger.info("=" * 80) except Exception as e: logger.error("=" * 80) logger.error(f"❌ Full evaluation failed for generation {request.generation}") logger.error(f" Error: {e}") logger.error("=" * 80) updated = await _update_evaluation_job( job_id, status="failed", error=str(e), completed_at=time.time(), ) if not updated: logger.warning(f"Job {job_id} was cleared before failure state update") # ============================================================================ # API Endpoints # ============================================================================ @app.post("/api/v1/initialize", response_model=InitializeResponse) async def initialize_service(request: InitializeRequest): """ Initialize/reset service state for a new experiment. This endpoint provides explicit control over experiment initialization. It resets all state, clears old jobs, and prepares the service for a fresh start. Thread-safe: Uses lock to prevent concurrent initialization conflicts. """ global ev2_agent, service_config, service_state start_time = time.time() # ===== Thread-safe initialization ===== async with _state_lock: logger.info("=" * 80) logger.info("🔧 INITIALIZE: Resetting service state for new experiment") logger.info(f" Raw results_dir: {request.results_dir}") logger.info("=" * 80) experiment_root = _normalize_experiment_root(request.results_dir) experiment_root_str = str(experiment_root) try: if service_config is None: service_config = ServiceConfig() # Update config overrides prev_trigger_mode = service_config.trigger_mode prev_trigger_interval = service_config.trigger_interval service_config.results_dir = experiment_root_str if request.primary_evaluator: service_config.primary_evaluator_path = request.primary_evaluator if request.experiment_name: service_config.experiment_name = request.experiment_name if request.trigger_mode: service_config.trigger_mode = request.trigger_mode logger.info( f" 🔁 trigger_mode override: {prev_trigger_mode} -> {service_config.trigger_mode}" ) if request.trigger_interval is not None: service_config.trigger_interval = request.trigger_interval logger.info( f" 🔁 trigger_interval override: {prev_trigger_interval} -> {service_config.trigger_interval}" ) if request.problem_statement: service_config.problem_statement = request.problem_statement if request.evaluator_kwargs: service_config.evaluator_kwargs = request.evaluator_kwargs # Reset state with force_clean=True (don't load old state from disk) service_state = ServiceState(service_config, force_clean=True) logger.info(f" ✅ Service state reset (clean start)") # Clear old evaluation jobs evaluation_jobs.clear() logger.info(f" ✅ Evaluation jobs cleared") # Initialize agent ev2_agent = IntegratedEV2Agent( results_dir=experiment_root_str, primary_evaluator_path=service_config.primary_evaluator_path, config=service_config, problem_statement=service_config.problem_statement, evaluator_kwargs=service_config.evaluator_kwargs, ) logger.info(f" ✅ Agent initialized") processing_time = (time.time() - start_time) * 1000 logger.info("=" * 80) logger.info("✅ INITIALIZE COMPLETE") logger.info(f" Experiment root: {experiment_root_str}") logger.info(f" Processing time: {processing_time:.1f}ms") logger.info("=" * 80) return InitializeResponse( status="ready", message="Service initialized for new experiment", results_dir=experiment_root_str, agent_initialized=True, processing_time_ms=processing_time ) except Exception as e: logger.error(f"❌ Initialize failed: {e}", exc_info=True) processing_time = (time.time() - start_time) * 1000 return InitializeResponse( status="error", message=str(e), results_dir=experiment_root_str, agent_initialized=False, processing_time_ms=processing_time ) @app.post("/api/v1/generation/complete", response_model=ServiceResponse) async def generation_complete( request: GenerationCompleteRequest, background_tasks: BackgroundTasks ): """ Generation complete handler (evaluation mode only). """ global ev2_agent, service_config, service_state start_time = time.time() logger.info("=" * 80) logger.info(f"📊 EVALUATION MODE: Generation {request.generation}") logger.info(f" Code path: {request.code_path}") logger.info(f" Evaluator: {request.evaluator_module}.{request.evaluator_function}") logger.info("=" * 80) # Ensure agent is initialized (extract experiment root from results_dir) # results_dir can be: /path/to/experiment, /gen_10, or /gen_10/results experiment_root = _normalize_experiment_root(request.results_dir) experiment_root_str = str(experiment_root) async with _state_lock: if service_config is None: service_config = ServiceConfig() current_results_dir = str(service_config.results_dir) if service_config.results_dir else "" # Initialize agent if needed (auto-detect fallback) if ev2_agent is None or experiment_root_str != current_results_dir: if ev2_agent is None: logger.warning("⚠️ DEPRECATED: Auto-initialization on first generation.") logger.warning(" Please call POST /api/v1/initialize before sending generations.") logger.info(f"🔧 Initializing agent for experiment: {experiment_root_str}") else: logger.warning("⚠️ DEPRECATED: Auto-detection of new experiment.") logger.warning(" Please call POST /api/v1/initialize for new experiments.") logger.info(f"🔄 Detected new experiment - Reinitializing state and agent: {experiment_root_str}") try: service_config.results_dir = experiment_root_str # Reset service_state for new experiment to avoid cross-experiment history. service_state = ServiceState(service_config) logger.info(" ✅ Service state reset for new experiment") # Clear old evaluation jobs from previous experiment. evaluation_jobs.clear() logger.info(" ✅ Evaluation jobs cleared") ev2_agent = IntegratedEV2Agent( results_dir=experiment_root_str, primary_evaluator_path=service_config.primary_evaluator_path, config=service_config, problem_statement=service_config.problem_statement, evaluator_kwargs=service_config.evaluator_kwargs, ) logger.info(" ✅ Agent initialized") except Exception as e: logger.error(f"❌ Failed to initialize agent: {e}", exc_info=True) # Create evaluation job (after any reset) job_id = f"eval_{request.generation}_{int(time.time())}" evaluation_jobs[job_id] = { "status": "pending", "request": request, "generation": request.generation, "result": None, "created_at": time.time(), "completed_at": None } # Start background evaluation (non-blocking) background_tasks.add_task( run_full_evaluation, job_id=job_id, request=request ) # Return immediately processing_time = (time.time() - start_time) * 1000 logger.info(f"✅ Evaluation job submitted: {job_id} (response time: {processing_time:.1f}ms)") return ServiceResponse( status="accepted", message=f"Evaluation started for generation {request.generation}", generation=request.generation, job_id=job_id, estimated_time=15.0, agent_triggered=False, # Agent may be triggered later in background trigger_reason="Will be determined after evaluation", processing_time_ms=processing_time ) @app.get("/api/v1/generation/{generation}/status") async def get_generation_status(generation: int): """ Query evaluation status for a specific generation Args: generation: Generation number Returns: Status and result (if completed) """ # Find the evaluation job for this generation job = None job_id = None async with _state_lock: for jid, j in evaluation_jobs.items(): if j["generation"] == generation: job = dict(j) job_id = jid break if job is None: raise HTTPException( status_code=404, detail=f"Generation {generation} not found in evaluation jobs" ) response = { "generation": generation, "job_id": job_id, "status": job["status"], "created_at": job["created_at"], "elapsed_time": time.time() - job["created_at"] } if job["status"] == "completed": response["result"] = job.get("result") response["completed_at"] = job.get("completed_at") elif job["status"] == "failed": response["error"] = job.get("error") return _sanitize_for_json(response) # DEPRECATED: Use /api/v1/generation/{generation}/status instead # Keeping for backward compatibility only @app.get("/api/v1/evaluate/{job_id}") async def get_evaluation_job_status_deprecated(job_id: str): """ [DEPRECATED] Query evaluation status by job ID Use /api/v1/generation/{generation}/status instead. This endpoint is kept for backward compatibility only. Args: job_id: Job ID returned by generation complete Returns: Status and result (if completed) """ async with _state_lock: if job_id not in evaluation_jobs: raise HTTPException( status_code=404, detail=f"Job {job_id} not found. Use /api/v1/generation/{{generation}}/status instead." ) job = dict(evaluation_jobs[job_id]) response = { "job_id": job_id, "generation": job["generation"], "status": job["status"], "created_at": job["created_at"], "elapsed_time": time.time() - job["created_at"], "deprecated": True, "message": "Use /api/v1/generation/{generation}/status instead" } if job["status"] == "completed": response["result"] = job.get("result") response["completed_at"] = job.get("completed_at") elif job["status"] == "failed": response["error"] = job.get("error") return _sanitize_for_json(response) @app.get("/api/v1/status", response_model=ServiceStatusResponse) async def get_status(): """Get service status""" if service_state is None or service_config is None: raise HTTPException(status_code=500, detail="Service not initialized") return ServiceStatusResponse( status="running", uptime_seconds=time.time() - service_state.start_time, version="2.0.0-standalone", experiment={ "name": service_config.experiment_name, "results_dir": service_config.results_dir, "primary_evaluator": service_config.primary_evaluator_path }, statistics=service_state.get_statistics(), config={ "trigger_mode": service_config.trigger_mode, "trigger_interval": service_config.trigger_interval, "agent_enabled": service_config.agent_enabled, "agent_initialized": ev2_agent is not None } ) @app.post("/api/v1/trigger/manual") async def trigger_manual(generation: int): """Manually trigger agent analysis for a specific generation""" logger.info(f"🔧 Manual trigger: generation {generation}") if ev2_agent is None: raise HTTPException(status_code=500, detail="Agent not initialized") # Find the generation in history gen_data = None for g in reversed(service_state.generation_history): if g['generation'] == generation: gen_data = g break if gen_data is None: raise HTTPException( status_code=404, detail=f"Generation {generation} not found in history" ) # Run agent try: result = await ev2_agent.analyze_generation(generation) service_state.mark_agent_triggered(generation, active_metrics=[]) return { "status": "success", "message": f"Agent triggered for generation {generation}", "result": result } except Exception as e: raise HTTPException(status_code=500, detail=f"Agent failed: {str(e)}") @app.get("/") async def root(): """Root endpoint""" return { "service": "EV2 Evaluation Service (Standalone)", "version": "2.0.0", "status": "running", "docs": "/docs" } # ============================================================================ # CLI Entry Point # ============================================================================ def main(): """Main entry point""" import argparse parser = argparse.ArgumentParser(description="EV2 Evaluation Service (Standalone)") parser.add_argument("--worker-eval-request", type=str, help=argparse.SUPPRESS) parser.add_argument("--worker-eval-output", type=str, help=argparse.SUPPRESS) parser.add_argument( "--config", type=str, help="Path to YAML config file" ) # Or specify directly parser.add_argument("--results-dir", type=str, help="Results directory") parser.add_argument("--primary-evaluator", type=str, help="Path to primary evaluator") parser.add_argument("--evaluation-timeout", type=float, default=300.0, help="Timeout for each evaluation in seconds (default: 300)") parser.add_argument("--trigger-mode", type=str, default="periodic", choices=["always", "periodic", "plateau", "mixed"]) parser.add_argument("--trigger-interval", type=int, default=10) parser.add_argument("--port", type=int, default=8765) parser.add_argument("--host", type=str, default="0.0.0.0") args = parser.parse_args() # Worker mode for hard-timeout evaluator execution. if args.worker_eval_request or args.worker_eval_output: if not args.worker_eval_request or not args.worker_eval_output: logger.error( "Both --worker-eval-request and --worker-eval-output are required in worker mode" ) raise SystemExit(2) raise SystemExit( _run_primary_evaluator_worker( request_path=args.worker_eval_request, output_path=args.worker_eval_output, ) ) global service_config # Load config if args.config: logger.info(f"📋 Loading config from {args.config}") service_config = ServiceConfig.from_yaml(args.config) else: # Create from args. # results_dir/primary_evaluator are optional here because ShinkaEvolve # can provide them at runtime via POST /api/v1/initialize. service_config = ServiceConfig( results_dir=args.results_dir or "", primary_evaluator_path=args.primary_evaluator or "", evaluation_timeout=args.evaluation_timeout, trigger_mode=args.trigger_mode, trigger_interval=args.trigger_interval, host=args.host, port=args.port ) # Start server logger.info(f"🚀 Starting server on {service_config.host}:{service_config.port}") uvicorn.run( app, host=service_config.host, port=service_config.port, log_level=service_config.log_level.lower() ) if __name__ == "__main__": main()