"""
EV2 Evaluation Service

A lightweight HTTP service wrapper around ev2.py that:
1. Receives generation completion notifications from evolution frameworks
2. Decides autonomously when to trigger EV2 agent analysis
3. Maintains persistent state across generations

Design principles:
- Minimal changes to ShinkaEvolve (just send notifications)
- Event-driven architecture (fire-and-forget notifications)
- Service makes autonomous decisions (when to analyze, when to update metrics)
- Gradual enhancement (start simple, add features incrementally)

Usage:
    # Start the service
    python eval_agent/ev2_service.py --config eval_service_config.yaml

    # From ShinkaEvolve (or any evolution framework)
    import requests
    requests.post("http://localhost:8765/api/v1/notify/generation_complete", json={
        "generation": 42,
        "results_dir": "/path/to/results",
        "primary_score": 2.5407
    })
"""
|
|
| from fastapi import FastAPI, HTTPException, BackgroundTasks |
| from fastapi.responses import JSONResponse |
| from pydantic import BaseModel, Field |
| from typing import Dict, Any, Optional, List |
| import uvicorn |
| import logging |
| import time |
| import json |
| from pathlib import Path |
| from dataclasses import dataclass, asdict |
| import yaml |
|
|
| |
| import sys |
| from pathlib import Path |
|
|
| |
# Put the directory two levels above this file on the import path (unless it
# is already there) so the `eval_agent` package import that follows resolves
# when this file is run directly as a script.
project_root = Path(__file__).parent.parent
if not any(entry == str(project_root) for entry in sys.path):
    sys.path.insert(0, str(project_root))
|
|
| from eval_agent.ev2 import evolution_evaluation_agent |
|
|
| |
| |
| |
|
|
@dataclass
class ServiceConfig:
    """Runtime settings for the evaluation service.

    Instances can be built directly, from a parsed mapping (`from_dict`),
    from a YAML file (`from_yaml`) or from keyword arguments (`from_args`).
    """

    # HTTP server settings
    host: str = "0.0.0.0"
    port: int = 8765
    log_level: str = "INFO"

    # Experiment being observed
    experiment_name: str = ""
    results_dir: str = ""
    primary_evaluator_path: str = ""

    # Trigger strategy; the modes handled by the service are
    # "always", "periodic", "plateau" and "mixed".
    trigger_mode: str = "periodic"
    trigger_interval: int = 10
    plateau_threshold: float = 0.01
    plateau_window: int = 10

    # Master switch for agent analysis
    agent_enabled: bool = True

    @classmethod
    def from_dict(cls, data: Optional[Dict[str, Any]]) -> 'ServiceConfig':
        """Build a config from an already-parsed configuration mapping.

        The mapping is expected to hold the optional sections ``service``,
        ``experiment``, ``strategy`` and ``agent``. A missing or empty
        (``None``) root or section falls back to the defaults.

        Args:
            data: Parsed configuration, or ``None`` for an empty document.

        Returns:
            The populated ``ServiceConfig``.

        Raises:
            ValueError: If the root or any section is present but is not a
                mapping.
        """
        root = {} if data is None else data
        if not isinstance(root, dict):
            raise ValueError(
                f"Config root must be a mapping, got {type(root).__name__}"
            )

        def section(name: str) -> Dict[str, Any]:
            # An empty YAML section (`service:` with nothing under it) parses
            # as None; treat it the same as an absent section.
            value = root.get(name)
            if value is None:
                return {}
            if not isinstance(value, dict):
                raise ValueError(
                    f"Config section '{name}' must be a mapping, "
                    f"got {type(value).__name__}"
                )
            return value

        service = section('service')
        experiment = section('experiment')
        strategy = section('strategy')
        agent = section('agent')

        return cls(
            host=service.get('host', '0.0.0.0'),
            port=service.get('port', 8765),
            log_level=service.get('log_level', 'INFO'),
            experiment_name=experiment.get('name', ''),
            results_dir=experiment.get('results_dir', ''),
            primary_evaluator_path=experiment.get('primary_evaluator', ''),
            trigger_mode=strategy.get('trigger_mode', 'periodic'),
            trigger_interval=strategy.get('trigger_interval', 10),
            plateau_threshold=strategy.get('plateau_threshold', 0.01),
            plateau_window=strategy.get('plateau_window', 10),
            agent_enabled=agent.get('enabled', True),
        )

    @classmethod
    def from_yaml(cls, config_path: str) -> 'ServiceConfig':
        """Load config from a YAML file.

        Args:
            config_path: Path of the YAML file to read.

        Returns:
            The populated ``ServiceConfig``; an empty file yields defaults.

        Raises:
            OSError: If the file cannot be opened.
            ValueError: If the document or one of its sections is not a
                mapping.
        """
        with open(config_path, encoding="utf-8") as f:
            data = yaml.safe_load(f)

        return cls.from_dict(data)

    @classmethod
    def from_args(cls,
                  results_dir: str,
                  primary_evaluator_path: str,
                  **kwargs) -> 'ServiceConfig':
        """Create config from arguments.

        Args:
            results_dir: Results directory of the experiment.
            primary_evaluator_path: Path to the primary evaluator.
            **kwargs: Any other ``ServiceConfig`` field, passed through to
                the constructor.
        """
        return cls(
            results_dir=results_dir,
            primary_evaluator_path=primary_evaluator_path,
            **kwargs
        )
|
|
|
|
| |
| |
| |
|
|
class GenerationCompleteRequest(BaseModel):
    """Request body announcing that one generation has finished."""

    # --- required ---
    generation: int = Field(
        ...,
        description="Generation number",
    )
    results_dir: str = Field(
        ...,
        description="Path to generation results",
    )
    primary_score: float = Field(
        ...,
        description="Primary evaluation score",
    )

    # --- optional extras ---
    code_path: Optional[str] = Field(
        None,
        description="Path to the code",
    )
    stage: Optional[str] = Field(
        None,
        description="Evolution stage (exploration/optimization/convergence)",
    )
    metadata: Optional[Dict[str, Any]] = Field(
        None,
        description="Additional metadata",
    )
|
|
|
|
class ServiceResponse(BaseModel):
    """Common reply shape for service endpoints."""

    # --- outcome ---
    status: str = Field(
        ...,
        description="Status: success, skipped, error",
    )
    message: str = Field(
        ...,
        description="Human-readable message",
    )
    generation: int

    # --- trigger decision ---
    agent_triggered: bool = Field(
        ...,
        description="Whether agent was triggered",
    )
    trigger_reason: Optional[str] = Field(
        None,
        description="Why agent was/wasn't triggered",
    )

    # --- agent output, present only when the agent ran ---
    insights: Optional[List[str]] = Field(
        None,
        description="Agent insights",
    )
    auxiliary_metrics: Optional[Dict[str, float]] = Field(
        None,
        description="Auxiliary metrics",
    )

    # --- timing ---
    processing_time_ms: float
|
|
|
|
class ServiceStatusResponse(BaseModel):
    """Snapshot of the running service returned by the status endpoint."""

    # Liveness information
    status: str = "running"
    uptime_seconds: float
    version: str = "0.1.0"

    # Free-form detail sections
    experiment: Dict[str, Any]
    statistics: Dict[str, Any]
    config: Dict[str, Any]
|
|
|
|
| |
| |
| |
|
|
class ServiceState:
    """
    Maintains service state across generations

    This is the "brain" that decides when to trigger the agent. It keeps a
    rolling history of reported generations plus run counters, and mirrors
    them to a JSON file so they can be reloaded by a later instance.
    """

    # Maximum number of generation records kept in memory and on disk.
    _HISTORY_LIMIT = 100

    def __init__(self, config: "ServiceConfig"):
        """Create the state holder and reload any previously saved state.

        Args:
            config: Service configuration; its ``results_dir`` decides where
                the state file lives and its strategy fields drive
                ``should_trigger_agent``.
        """
        self.config = config

        # Rolling record of reported generations and run counters.
        self.generation_history: List[Dict[str, Any]] = []
        self.last_agent_trigger_gen: int = -1
        self.total_notifications: int = 0
        self.total_agent_runs: int = 0

        # Reference point for uptime reporting.
        self.start_time = time.time()

        # Overwrite the defaults above with persisted values, if any.
        self._load_state()

    def _get_state_file(self) -> Path:
        """Get path to state file.

        When ``config.results_dir`` is set, the file lives in its
        ``eval_agent_memory`` subdirectory, which is created on demand;
        otherwise it is ``service_state.json`` relative to the working
        directory.

        Raises:
            OSError: If the state directory cannot be created.
        """
        if self.config.results_dir:
            state_dir = Path(self.config.results_dir) / "eval_agent_memory"
            state_dir.mkdir(parents=True, exist_ok=True)
            return state_dir / "service_state.json"
        return Path("service_state.json")

    def _load_state(self) -> None:
        """Load previous state from disk (best effort).

        Any I/O or parse failure is logged and leaves the in-memory defaults
        untouched, so a damaged state file never prevents startup.
        """
        try:
            # Resolving the path may create a directory, so it belongs inside
            # the guarded region as well.
            state_file = self._get_state_file()
            if not state_file.exists():
                return
            with open(state_file, encoding="utf-8") as f:
                data = json.load(f)
        except (OSError, ValueError) as e:
            logging.error(f"Failed to load state: {e}")
            return

        if not isinstance(data, dict):
            logging.error("Failed to load state: state file is not a JSON object")
            return

        history = data.get('generation_history', [])
        self.generation_history = history if isinstance(history, list) else []
        self.last_agent_trigger_gen = data.get('last_agent_trigger_gen', -1)
        self.total_notifications = data.get('total_notifications', 0)
        self.total_agent_runs = data.get('total_agent_runs', 0)

        logging.info(f"Loaded state: {len(self.generation_history)} generations in history")

    def _save_state(self) -> None:
        """Save current state to disk (best effort).

        The snapshot is written to a sibling temporary file and then moved
        over the real one, so a failed or interrupted write cannot leave a
        truncated state file behind. Failures are logged, not raised.
        """
        data = {
            'generation_history': self.generation_history[-self._HISTORY_LIMIT:],
            'last_agent_trigger_gen': self.last_agent_trigger_gen,
            'total_notifications': self.total_notifications,
            'total_agent_runs': self.total_agent_runs,
            'last_update': time.time()
        }

        try:
            state_file = self._get_state_file()
            tmp_file = state_file.with_name(state_file.name + ".tmp")
            with open(tmp_file, 'w', encoding="utf-8") as f:
                json.dump(data, f, indent=2)
            tmp_file.replace(state_file)
        except (OSError, TypeError, ValueError) as e:
            logging.error(f"Failed to save state: {e}")

    def add_generation(self, gen_data: Dict[str, Any]) -> None:
        """Record a generation and persist the updated state.

        Args:
            gen_data: Record for the generation; plateau detection reads its
                ``primary_score`` entry.
        """
        self.generation_history.append(gen_data)
        self.total_notifications += 1

        # Keep only the most recent records.
        if len(self.generation_history) > self._HISTORY_LIMIT:
            self.generation_history = self.generation_history[-self._HISTORY_LIMIT:]

        self._save_state()

    def should_trigger_agent(self, generation: int, primary_score: float) -> tuple[bool, str]:
        """
        Decide whether to trigger the agent

        Args:
            generation: Generation number being reported.
            primary_score: Score of that generation. Currently unused here;
                plateau detection reads scores from the recorded history.

        Returns: (should_trigger, reason)
        """
        if not self.config.agent_enabled:
            return False, "Agent disabled in config"

        mode = self.config.trigger_mode
        interval = self.config.trigger_interval
        interval_elapsed = generation - self.last_agent_trigger_gen >= interval

        # Mode 1: run on every notification.
        if mode == "always":
            return True, "Always mode"

        # Mode 2: run once every `trigger_interval` generations.
        if mode == "periodic":
            if interval_elapsed:
                return True, f"Periodic trigger (interval={interval})"
            return False, f"Not yet (last trigger at gen {self.last_agent_trigger_gen})"

        # Mode 3: run only while the score is flat.
        if mode == "plateau":
            if self._detect_plateau():
                return True, "Plateau detected"
            return False, "No plateau detected"

        # Mode 4: periodic, plus an early run as soon as a plateau shows up.
        if mode == "mixed":
            if interval_elapsed:
                return True, f"Periodic trigger (interval={interval})"

            if self._detect_plateau():
                return True, "Plateau detected (early trigger)"

            return False, f"Waiting (next trigger at gen {self.last_agent_trigger_gen + interval})"

        return False, f"Unknown trigger mode: {mode}"

    def _detect_plateau(self) -> bool:
        """Detect if primary score has plateaued.

        Compares the first and last score of the most recent
        ``plateau_window`` records; the relative change must be below
        ``plateau_threshold`` to count as a plateau.

        Returns:
            ``False`` when the window is smaller than 2 (no change can be
            measured) or when fewer than ``plateau_window`` records exist.
        """
        window = self.config.plateau_window
        # A window of 0 would slice the whole (possibly empty) history and a
        # window of 1 always yields zero change, so neither is meaningful.
        if window < 2 or len(self.generation_history) < window:
            return False

        scores = [g['primary_score'] for g in self.generation_history[-window:]]

        # Relative change across the window; the floor avoids dividing by
        # zero when the first score is 0.
        improvement = (scores[-1] - scores[0]) / max(abs(scores[0]), 1e-6)

        return abs(improvement) < self.config.plateau_threshold

    def mark_agent_triggered(self, generation: int) -> None:
        """Mark that agent was triggered at ``generation`` and persist it."""
        self.last_agent_trigger_gen = generation
        self.total_agent_runs += 1
        self._save_state()

    def get_statistics(self) -> Dict[str, Any]:
        """Get service statistics as a plain dictionary."""
        return {
            "total_notifications": self.total_notifications,
            "total_agent_runs": self.total_agent_runs,
            "generations_tracked": len(self.generation_history),
            "last_agent_trigger_gen": self.last_agent_trigger_gen,
            "uptime_seconds": time.time() - self.start_time
        }
|
|
|
|
| |
| |
| |
|
|
| |
# Module-level singletons. Both start out unset: `main()` assigns the config
# and the startup hook builds the state object.
service_state: Optional[ServiceState] = None
service_config: Optional[ServiceConfig] = None


# The ASGI application that all routes below attach to.
app = FastAPI(
    title="EV2 Evaluation Service",
    description="Event-driven evaluation service for code evolution",
    version="0.1.0",
)


# Root logging configuration, followed by this module's own logger.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)
|
|
|
|
@app.on_event("startup")
async def startup_event():
    """Prepare the module-level config and state when the app starts.

    Falls back to a default ``ServiceConfig`` if none was set beforehand,
    then builds the ``ServiceState`` from it and logs the active settings.
    """
    global service_state, service_config

    logger.info("🚀 Starting EV2 Evaluation Service...")

    # No config was supplied before startup, so fall back to defaults.
    if service_config is None:
        logger.warning("⚠️ No config provided, using defaults")
        service_config = ServiceConfig()

    cfg = service_config
    service_state = ServiceState(cfg)

    logger.info("✅ Service started")
    logger.info(f" Experiment: {cfg.experiment_name}")
    logger.info(f" Results dir: {cfg.results_dir}")
    logger.info(f" Trigger mode: {cfg.trigger_mode}")
    logger.info(f" Trigger interval: {cfg.trigger_interval}")
|
|
|
|
@app.on_event("shutdown")
async def shutdown_event():
    """Persist the service state and log final counters on shutdown."""
    logger.info("🛑 Shutting down EV2 Evaluation Service...")

    # Nothing to persist if the state object was never created.
    if not service_state:
        return

    state = service_state
    state._save_state()
    logger.info(f" Total notifications: {state.total_notifications}")
    logger.info(f" Total agent runs: {state.total_agent_runs}")
|
|
|
|
| |
| |
| |
|
|
@app.post("/api/v1/notify/generation_complete", response_model=ServiceResponse)
async def notify_generation_complete(
    request: GenerationCompleteRequest,
    background_tasks: BackgroundTasks
):
    """
    Receive notification that a generation has completed

    This is the main entry point called by evolution frameworks.
    The service will decide autonomously whether to trigger agent analysis.

    Args:
        request: The generation-complete notification payload.
        background_tasks: Accepted for interface compatibility; currently
            unused because the agent is awaited inline so its insights can
            be returned in the response.

    Returns:
        A JSON response whose ``status`` is ``"success"`` when the agent
        ran, ``"skipped"`` when it was not triggered, or ``"error"`` when
        the agent raised.

    Raises:
        HTTPException: 500 if the service state has not been initialized.
    """
    # Same guard as the status endpoint: without it an uninitialized service
    # fails with an AttributeError instead of a clear HTTP error.
    if service_state is None:
        raise HTTPException(status_code=500, detail="Service not initialized")

    start_time = time.time()

    logger.info(f"📬 Received notification: generation {request.generation}, score={request.primary_score:.4f}")

    # Record the generation first so plateau detection sees the new score.
    gen_data = {
        "generation": request.generation,
        "primary_score": request.primary_score,
        "results_dir": request.results_dir,
        "timestamp": time.time()
    }
    service_state.add_generation(gen_data)

    should_trigger, reason = service_state.should_trigger_agent(
        request.generation,
        request.primary_score
    )

    logger.info(f" Decision: {'🧠 TRIGGER AGENT' if should_trigger else '⏭️ SKIP'} - {reason}")

    response_data = {
        "status": "success",
        "message": reason,
        "generation": request.generation,
        "agent_triggered": should_trigger,
        "trigger_reason": reason,
        "processing_time_ms": 0
    }

    if should_trigger:
        # Marked before the run, so a failing run still counts as a trigger
        # and the periodic interval restarts from this generation.
        service_state.mark_agent_triggered(request.generation)

        try:
            agent_result = await run_ev2_agent(
                results_dir=request.results_dir,
                generation=request.generation
            )

            response_data["insights"] = agent_result.get("insights", [])
            response_data["auxiliary_metrics"] = agent_result.get("auxiliary_metrics", {})

            logger.info(f"✅ Agent analysis complete for generation {request.generation}")

        except Exception as e:
            # Endpoint boundary: report the failure in the body instead of
            # turning the notification itself into an HTTP error.
            logger.error(f"❌ Agent analysis failed: {e}")
            response_data["status"] = "error"
            response_data["message"] = f"Agent failed: {str(e)}"
    else:
        response_data["status"] = "skipped"

    response_data["processing_time_ms"] = (time.time() - start_time) * 1000

    return JSONResponse(content=response_data)
|
|
|
|
async def run_ev2_agent(results_dir: str, generation: int) -> Dict[str, Any]:
    """
    Run the EV2 agent analysis

    This wraps the existing evolution_evaluation_agent() function. The call
    is synchronous, so it is executed in a worker thread to keep the event
    loop (and the other endpoints) responsive while the analysis runs.

    Args:
        results_dir: Results directory handed to the agent; the function
            afterwards looks for ``eval_agent_memory`` inside it.
        generation: Generation number handed to the agent as ``current_gen``.

    Returns:
        A dict with ``success`` (always ``True`` on return), ``insights``
        (up to the last five bullet lines found in ``EVAL_AGENTS.md``, or a
        single placeholder line) and ``auxiliary_metrics``.

    Raises:
        RuntimeError: If the service config has not been set.
        Exception: Anything raised by the agent is logged and re-raised.
    """
    # Function-scope import: asyncio is not among the module's imports.
    import asyncio

    logger.info(f"🧠 Running EV2 agent for generation {generation}...")

    try:
        if service_config is None:
            raise RuntimeError("Service not initialized")

        # The agent receives an absolute evaluator path.
        primary_evaluator_abs = Path(service_config.primary_evaluator_path).resolve()

        # NOTE(review): this assumes evolution_evaluation_agent can run off
        # the main thread, and overlapping notifications can now run it
        # concurrently — confirm against ev2.py.
        await asyncio.to_thread(
            evolution_evaluation_agent,
            results_dir=results_dir,
            current_gen=generation,
            primary_evaluator_path=str(primary_evaluator_abs),
        )

        insights: List[str] = []
        auxiliary_metrics: Dict[str, Any] = {}

        memory_dir = Path(results_dir) / "eval_agent_memory"
        eval_agents_md = memory_dir / "EVAL_AGENTS.md"

        if eval_agents_md.exists():
            # Decode explicitly so the result does not depend on the locale;
            # undecodable bytes are replaced because this is only a summary.
            content = eval_agents_md.read_text(encoding="utf-8", errors="replace")

            # Every markdown bullet line ("*" or "-") counts as an insight.
            stripped_lines = (line.strip() for line in content.split('\n'))
            insights = [s for s in stripped_lines if s.startswith(('*', '-'))]

        # Only the presence of the generated metrics module is reported.
        auxiliary_py = memory_dir / "auxiliary_metrics.py"
        if auxiliary_py.exists():
            auxiliary_metrics["auxiliary_metrics_created"] = True

        return {
            "success": True,
            "insights": insights[-5:] if insights else ["Agent completed analysis"],
            "auxiliary_metrics": auxiliary_metrics
        }

    except Exception as e:
        logger.error(f"Agent execution failed: {e}", exc_info=True)
        raise
|
|
|
|
@app.get("/api/v1/status", response_model=ServiceStatusResponse)
async def get_status():
    """Report uptime, experiment details, counters and trigger settings.

    Raises:
        HTTPException: 500 if either the state or the config is unset.
    """
    if service_state is None or service_config is None:
        raise HTTPException(status_code=500, detail="Service not initialized")

    experiment_info = {
        "name": service_config.experiment_name,
        "results_dir": service_config.results_dir,
        "primary_evaluator": service_config.primary_evaluator_path
    }
    trigger_settings = {
        "trigger_mode": service_config.trigger_mode,
        "trigger_interval": service_config.trigger_interval,
        "agent_enabled": service_config.agent_enabled
    }
    uptime = time.time() - service_state.start_time

    return ServiceStatusResponse(
        status="running",
        uptime_seconds=uptime,
        version="0.1.0",
        experiment=experiment_info,
        statistics=service_state.get_statistics(),
        config=trigger_settings
    )
|
|
|
|
@app.post("/api/v1/trigger/manual")
async def trigger_manual(generation: int):
    """
    Manually trigger agent analysis for a specific generation

    Useful for debugging or forcing an update

    Args:
        generation: Generation number to analyze; it must be present in the
            recorded generation history.

    Returns:
        A dict with ``status``, ``message`` and the agent ``result``.

    Raises:
        HTTPException: 500 if the service is not initialized or the agent
            fails; 404 if the generation is not in the history.
    """
    # Same guard as the status endpoint.
    if service_state is None:
        raise HTTPException(status_code=500, detail="Service not initialized")

    logger.info(f"🔧 Manual trigger requested for generation {generation}")

    # Search newest-first so a re-reported generation resolves to its most
    # recent record.
    gen_data = next(
        (
            record
            for record in reversed(service_state.generation_history)
            if record.get('generation') == generation
        ),
        None,
    )

    if gen_data is None:
        raise HTTPException(status_code=404, detail=f"Generation {generation} not found")

    try:
        result = await run_ev2_agent(
            results_dir=gen_data['results_dir'],
            generation=generation
        )

        # Only a successful manual run is recorded as a trigger.
        service_state.mark_agent_triggered(generation)

        return {
            "status": "success",
            "message": f"Agent triggered for generation {generation}",
            "result": result
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Agent failed: {str(e)}") from e
|
|
|
|
@app.get("/")
async def root():
    """Return a small static description of the service."""
    service_info = {}
    service_info["service"] = "EV2 Evaluation Service"
    service_info["version"] = "0.1.0"
    service_info["status"] = "running"
    service_info["docs"] = "/docs"
    return service_info
|
|
|
|
| |
| |
| |
|
|
def main():
    """Main entry point: parse the command line, build the config, serve.

    The configuration comes from ``--config`` (a YAML file) or, without it,
    from ``--results-dir`` plus ``--primary-evaluator``. Any other option
    given explicitly on the command line overrides the corresponding value,
    including values loaded from the YAML file.

    Exits with status 2 (via argparse) when neither ``--config`` nor both of
    ``--results-dir`` and ``--primary-evaluator`` are supplied.
    """
    import argparse

    global service_config

    parser = argparse.ArgumentParser(description="EV2 Evaluation Service")

    parser.add_argument(
        "--config",
        type=str,
        help="Path to YAML config file"
    )

    # Options default to None so "not given" can be told apart from an
    # explicit value; the effective defaults live on ServiceConfig.
    parser.add_argument("--results-dir", type=str, help="Results directory")
    parser.add_argument("--primary-evaluator", type=str, help="Path to primary evaluator")
    parser.add_argument("--trigger-mode", type=str, default=None,
                        choices=["always", "periodic", "plateau", "mixed"])
    parser.add_argument("--trigger-interval", type=int, default=None)
    parser.add_argument("--port", type=int, default=None)
    parser.add_argument("--host", type=str, default=None)

    args = parser.parse_args()

    # Only the options the user actually passed.
    candidate_overrides = {
        "results_dir": args.results_dir,
        "primary_evaluator_path": args.primary_evaluator,
        "trigger_mode": args.trigger_mode,
        "trigger_interval": args.trigger_interval,
        "host": args.host,
        "port": args.port,
    }
    cli_overrides = {
        name: value
        for name, value in candidate_overrides.items()
        if value is not None
    }

    if args.config:
        logger.info(f"Loading config from {args.config}")
        service_config = ServiceConfig.from_yaml(args.config)
        # Explicit command-line options win over the file's values.
        for name, value in cli_overrides.items():
            setattr(service_config, name, value)
    else:
        if not args.results_dir or not args.primary_evaluator:
            # parser.error prints usage and exits with a non-zero status, so
            # a misconfigured launch is visible to the calling shell.
            parser.error("Must provide either --config or (--results-dir and --primary-evaluator)")

        service_config = ServiceConfig.from_args(**cli_overrides)

    logger.info(f"Starting server on {service_config.host}:{service_config.port}")
    uvicorn.run(
        app,
        host=service_config.host,
        port=service_config.port,
        log_level=service_config.log_level.lower()
    )


if __name__ == "__main__":
    main()
|
|