"""
EV2 Evaluation Service

A lightweight HTTP service wrapper around ev2.py that:
1. Receives generation completion notifications from evolution frameworks
2. Decides autonomously when to trigger EV2 agent analysis
3. Maintains persistent state across generations

Design principles:
- Minimal changes to ShinkaEvolve (just send notifications)
- Event-driven architecture (fire-and-forget notifications)
- Service makes autonomous decisions (when to analyze, when to update metrics)
- Gradual enhancement (start simple, add features incrementally)

Usage:
    # Start the service
    python eval_agent/ev2_service.py --config eval_service_config.yaml

    # From ShinkaEvolve (or any evolution framework)
    import requests
    requests.post("http://localhost:8765/api/v1/notify/generation_complete", json={
        "generation": 42,
        "results_dir": "/path/to/results",
        "primary_score": 2.5407
    })
"""

import asyncio
import json
import logging
import sys
import threading
import time
from dataclasses import dataclass, asdict
from pathlib import Path
from typing import Dict, Any, Optional, List

import uvicorn
import yaml
from fastapi import FastAPI, HTTPException, BackgroundTasks
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field

# Add project root to path so that `eval_agent` is importable when this file
# is executed directly as a script.
project_root = Path(__file__).parent.parent
if str(project_root) not in sys.path:
    sys.path.insert(0, str(project_root))

# Import the existing ev2 logic
from eval_agent.ev2 import evolution_evaluation_agent  # noqa: E402


# ============================================================================
# Configuration
# ============================================================================

@dataclass
class ServiceConfig:
    """Service configuration.

    Instances are built either from a YAML file (`from_yaml`) or from
    explicit arguments (`from_args`); every field has a default so a bare
    `ServiceConfig()` is also valid.
    """

    # Server settings
    # NOTE(review): the default host binds every interface and the service
    # has no authentication - confirm this is intended for the deployment.
    host: str = "0.0.0.0"
    port: int = 8765
    log_level: str = "INFO"

    # Experiment settings
    experiment_name: str = ""
    results_dir: str = ""
    primary_evaluator_path: str = ""

    # Trigger strategy
    trigger_mode: str = "periodic"  # "periodic", "plateau", "mixed", "always"
    trigger_interval: int = 10  # Run agent every N generations
    plateau_threshold: float = 0.01
    plateau_window: int = 10

    # Agent settings
    agent_enabled: bool = True

    @classmethod
    def from_yaml(cls, config_path: str) -> 'ServiceConfig':
        """Load config from a YAML file.

        The file is expected to hold a mapping with the optional sections
        ``service``, ``experiment``, ``strategy`` and ``agent``. Missing or
        empty sections (and an entirely empty file) fall back to defaults.

        Args:
            config_path: Path to the YAML file.

        Returns:
            The populated configuration.

        Raises:
            OSError: If the file cannot be opened.
            ValueError: If the top-level YAML value is not a mapping.
        """
        with open(config_path, encoding="utf-8") as f:
            # safe_load returns None for an empty document.
            data = yaml.safe_load(f) or {}

        if not isinstance(data, dict):
            raise ValueError(
                f"Config file {config_path} must contain a YAML mapping at the top level"
            )

        # A section written as `service:` with no body parses as None, so
        # `or {}` is needed in addition to the .get() default.
        service = data.get('service') or {}
        experiment = data.get('experiment') or {}
        strategy = data.get('strategy') or {}
        agent = data.get('agent') or {}

        return cls(
            host=service.get('host', '0.0.0.0'),
            port=service.get('port', 8765),
            log_level=service.get('log_level', 'INFO'),
            experiment_name=experiment.get('name', ''),
            results_dir=experiment.get('results_dir', ''),
            primary_evaluator_path=experiment.get('primary_evaluator', ''),
            trigger_mode=strategy.get('trigger_mode', 'periodic'),
            trigger_interval=strategy.get('trigger_interval', 10),
            plateau_threshold=strategy.get('plateau_threshold', 0.01),
            plateau_window=strategy.get('plateau_window', 10),
            agent_enabled=agent.get('enabled', True)
        )

    @classmethod
    def from_args(cls, results_dir: str, primary_evaluator_path: str, **kwargs) -> 'ServiceConfig':
        """Create config from arguments.

        Args:
            results_dir: Experiment results directory.
            primary_evaluator_path: Path to the primary evaluator.
            **kwargs: Any other `ServiceConfig` field, passed through to the
                constructor unchanged.
        """
        return cls(
            results_dir=results_dir,
            primary_evaluator_path=primary_evaluator_path,
            **kwargs
        )


# ============================================================================
# Request/Response Models
# ============================================================================

class GenerationCompleteRequest(BaseModel):
    """Notification that a generation has completed"""
    generation: int = Field(..., description="Generation number")
    results_dir: str = Field(..., description="Path to generation results")
    primary_score: float = Field(..., description="Primary evaluation score")

    # Optional context
    code_path: Optional[str] = Field(None, description="Path to the code")
    stage: Optional[str] = Field(None, description="Evolution stage (exploration/optimization/convergence)")
    metadata: Optional[Dict[str, Any]] = Field(None, description="Additional metadata")


class ServiceResponse(BaseModel):
    """Standard service response"""
    status: str = Field(..., description="Status: success, skipped, error")
    message: str = Field(..., description="Human-readable message")
    generation: int

    # Decision info
    agent_triggered: bool = Field(..., description="Whether agent was triggered")
    trigger_reason: Optional[str] = Field(None, description="Why agent was/wasn't triggered")

    # Results (if agent was triggered)
    insights: Optional[List[str]] = Field(None, description="Agent insights")
    auxiliary_metrics: Optional[Dict[str, float]] = Field(None, description="Auxiliary metrics")

    # Timing
    processing_time_ms: float


class ServiceStatusResponse(BaseModel):
    """Service status information"""
    status: str = "running"
    uptime_seconds: float
    version: str = "0.1.0"

    experiment: Dict[str, Any]
    statistics: Dict[str, Any]
    config: Dict[str, Any]


# ============================================================================
# Service State Management
# ============================================================================

class ServiceState:
    """
    Maintains service state across generations

    This is the "brain" that decides when to trigger the agent. State is
    persisted as JSON (see `_get_state_file`) after every recorded generation
    and every agent trigger, and reloaded on construction.
    """

    def __init__(self, config: ServiceConfig):
        self.config = config

        # State tracking
        self.generation_history: List[Dict[str, Any]] = []
        self.last_agent_trigger_gen: int = -1
        self.total_notifications: int = 0
        self.total_agent_runs: int = 0

        # Timing
        self.start_time = time.time()

        # Load previous state if exists
        self._load_state()

    def _get_state_file(self) -> Path:
        """Get path to state file.

        With a configured `results_dir` the file lives in
        ``<results_dir>/eval_agent_memory/service_state.json`` (the directory
        is created if needed); otherwise it is ``service_state.json``
        relative to the current working directory.
        """
        if self.config.results_dir:
            state_dir = Path(self.config.results_dir) / "eval_agent_memory"
            state_dir.mkdir(parents=True, exist_ok=True)
            return state_dir / "service_state.json"
        return Path("service_state.json")

    def _load_state(self):
        """Load previous state from disk (best effort).

        Any failure is logged and the in-memory defaults are kept, so a
        missing, unreadable or corrupt state file never prevents start-up.
        """
        try:
            state_file = self._get_state_file()
            if not state_file.exists():
                return

            with open(state_file, encoding="utf-8") as f:
                data = json.load(f)

            self.generation_history = data.get('generation_history', [])
            self.last_agent_trigger_gen = data.get('last_agent_trigger_gen', -1)
            self.total_notifications = data.get('total_notifications', 0)
            self.total_agent_runs = data.get('total_agent_runs', 0)

            logging.info(f"Loaded state: {len(self.generation_history)} generations in history")
        except Exception as e:
            logging.error(f"Failed to load state: {e}")

    def _save_state(self):
        """Save current state to disk (best effort).

        The payload is written to a sibling temporary file and then renamed
        over the real state file, so an interrupted write cannot leave a
        truncated state file behind. Failures are logged, not raised.
        """
        try:
            state_file = self._get_state_file()
            data = {
                'generation_history': self.generation_history[-100:],  # Keep last 100
                'last_agent_trigger_gen': self.last_agent_trigger_gen,
                'total_notifications': self.total_notifications,
                'total_agent_runs': self.total_agent_runs,
                'last_update': time.time()
            }

            tmp_file = state_file.with_name(state_file.name + ".tmp")
            with open(tmp_file, 'w', encoding="utf-8") as f:
                json.dump(data, f, indent=2)
            tmp_file.replace(state_file)
        except Exception as e:
            logging.error(f"Failed to save state: {e}")

    def add_generation(self, gen_data: Dict[str, Any]):
        """Record a generation and persist the updated state.

        Args:
            gen_data: Record for the generation. `_detect_plateau` reads its
                ``'primary_score'`` key and the manual-trigger endpoint reads
                ``'generation'`` and ``'results_dir'``.
        """
        self.generation_history.append(gen_data)
        self.total_notifications += 1

        # Keep only recent history in memory
        if len(self.generation_history) > 100:
            self.generation_history = self.generation_history[-100:]

        self._save_state()

    def _periodic_due(self, generation: int) -> bool:
        """Return True when at least `trigger_interval` generations have
        passed since the last recorded agent trigger."""
        return generation - self.last_agent_trigger_gen >= self.config.trigger_interval

    def should_trigger_agent(self, generation: int, primary_score: float) -> tuple[bool, str]:
        """
        Decide whether to trigger the agent

        Args:
            generation: Generation number of the incoming notification.
            primary_score: Score of that generation. Not read here; plateau
                detection works on the recorded history instead.

        Returns:
            (should_trigger, reason)
        """
        if not self.config.agent_enabled:
            return False, "Agent disabled in config"

        mode = self.config.trigger_mode

        # Strategy 1: Always (for testing)
        if mode == "always":
            return True, "Always mode"

        # Strategy 2: Periodic
        if mode == "periodic":
            if self._periodic_due(generation):
                return True, f"Periodic trigger (interval={self.config.trigger_interval})"
            return False, f"Not yet (last trigger at gen {self.last_agent_trigger_gen})"

        # Strategy 3: Plateau detection
        if mode == "plateau":
            if self._detect_plateau():
                return True, "Plateau detected"
            return False, "No plateau detected"

        # Strategy 4: Mixed (periodic OR plateau)
        if mode == "mixed":
            # Check periodic
            if self._periodic_due(generation):
                return True, f"Periodic trigger (interval={self.config.trigger_interval})"

            # Check plateau
            if self._detect_plateau():
                return True, "Plateau detected (early trigger)"

            return False, f"Waiting (next trigger at gen {self.last_agent_trigger_gen + self.config.trigger_interval})"

        return False, f"Unknown trigger mode: {mode}"

    def _detect_plateau(self) -> bool:
        """Detect if primary score has plateaued.

        Compares the first and last score of the most recent
        `plateau_window` history entries. The relative change is
        ``(last - first) / max(abs(first), 1e-6)``; a plateau is reported
        when its absolute value is below `plateau_threshold`.

        Returns False when fewer than `plateau_window` generations are
        recorded, or when the window is not positive.
        """
        window = self.config.plateau_window
        # A non-positive window would slice the whole history (`[-0:]`) and
        # index into a possibly empty list.
        if window < 1 or len(self.generation_history) < window:
            return False

        recent = self.generation_history[-window:]
        scores = [g['primary_score'] for g in recent]

        # Check if improvement is below threshold
        improvement = (scores[-1] - scores[0]) / max(abs(scores[0]), 1e-6)
        return abs(improvement) < self.config.plateau_threshold

    def mark_agent_triggered(self, generation: int):
        """Mark that agent was triggered at `generation` and persist state."""
        self.last_agent_trigger_gen = generation
        self.total_agent_runs += 1
        self._save_state()

    def get_statistics(self) -> Dict[str, Any]:
        """Get service statistics"""
        return {
            "total_notifications": self.total_notifications,
            "total_agent_runs": self.total_agent_runs,
            "generations_tracked": len(self.generation_history),
            "last_agent_trigger_gen": self.last_agent_trigger_gen,
            "uptime_seconds": time.time() - self.start_time
        }


# ============================================================================
# FastAPI Application
# ============================================================================

# Global state (initialized on startup)
service_state: Optional[ServiceState] = None
service_config: Optional[ServiceConfig] = None

# Serializes agent runs. The agent executes in a worker thread so the event
# loop stays responsive; the lock keeps two runs from overlapping.
_agent_run_lock = threading.Lock()

# Create FastAPI app
app = FastAPI(
    title="EV2 Evaluation Service",
    description="Event-driven evaluation service for code evolution",
    version="0.1.0"
)

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


@app.on_event("startup")
async def startup_event():
    """Initialize service on startup"""
    global service_state, service_config

    logger.info("🚀 Starting EV2 Evaluation Service...")

    # Load config (will be set by main())
    if service_config is None:
        logger.warning("⚠️ No config provided, using defaults")
        service_config = ServiceConfig()

    # Initialize state
    service_state = ServiceState(service_config)

    logger.info("✅ Service started")
    logger.info(f" Experiment: {service_config.experiment_name}")
    logger.info(f" Results dir: {service_config.results_dir}")
    logger.info(f" Trigger mode: {service_config.trigger_mode}")
    logger.info(f" Trigger interval: {service_config.trigger_interval}")


@app.on_event("shutdown")
async def shutdown_event():
    """Cleanup on shutdown"""
    logger.info("🛑 Shutting down EV2 Evaluation Service...")
    if service_state:
        service_state._save_state()
        logger.info(f" Total notifications: {service_state.total_notifications}")
        logger.info(f" Total agent runs: {service_state.total_agent_runs}")


# ============================================================================
# API Endpoints
# ============================================================================

@app.post("/api/v1/notify/generation_complete", response_model=ServiceResponse)
async def notify_generation_complete(
    request: GenerationCompleteRequest,
    background_tasks: BackgroundTasks
):
    """
    Receive notification that a generation has completed

    This is the main entry point called by evolution frameworks.
    The service will decide autonomously whether to trigger agent analysis.

    When the agent is triggered, the request waits for the analysis to
    finish and the response carries its insights. An agent failure is
    reported in the response body (``status == "error"``) rather than as an
    HTTP error.
    """
    if service_state is None:
        raise HTTPException(status_code=500, detail="Service not initialized")

    start_time = time.time()

    logger.info(f"📬 Received notification: generation {request.generation}, score={request.primary_score:.4f}")

    # Record this generation
    gen_data = {
        "generation": request.generation,
        "primary_score": request.primary_score,
        "results_dir": request.results_dir,
        "timestamp": time.time()
    }
    service_state.add_generation(gen_data)

    # Decide whether to trigger agent
    should_trigger, reason = service_state.should_trigger_agent(
        request.generation,
        request.primary_score
    )

    logger.info(f" Decision: {'🧠 TRIGGER AGENT' if should_trigger else '⏭️ SKIP'} - {reason}")

    response_data = {
        "status": "success",
        "message": reason,
        "generation": request.generation,
        "agent_triggered": should_trigger,
        "trigger_reason": reason,
        "processing_time_ms": 0  # Will be updated
    }

    if should_trigger:
        # Mark that we triggered. This happens before the run, so a failed
        # run still resets the periodic counter.
        service_state.mark_agent_triggered(request.generation)

        # Run agent analysis (this may take a while)
        try:
            agent_result = await run_ev2_agent(
                results_dir=request.results_dir,
                generation=request.generation
            )

            response_data["insights"] = agent_result.get("insights", [])
            response_data["auxiliary_metrics"] = agent_result.get("auxiliary_metrics", {})

            logger.info(f"✅ Agent analysis complete for generation {request.generation}")

        except Exception as e:
            logger.error(f"❌ Agent analysis failed: {e}")
            response_data["status"] = "error"
            response_data["message"] = f"Agent failed: {str(e)}"
    else:
        response_data["status"] = "skipped"

    # Calculate processing time
    response_data["processing_time_ms"] = (time.time() - start_time) * 1000

    return JSONResponse(content=response_data)


def _collect_agent_outputs(results_dir: str) -> Dict[str, Any]:
    """Read what the agent left in ``<results_dir>/eval_agent_memory``."""
    insights: List[str] = []
    auxiliary_metrics: Dict[str, Any] = {}

    memory_dir = Path(results_dir) / "eval_agent_memory"

    # Try to extract insights from EVAL_AGENTS.md
    eval_agents_md = memory_dir / "EVAL_AGENTS.md"
    if eval_agents_md.exists():
        # Simple parsing (can be improved). Undecodable bytes are replaced
        # rather than failing the whole request.
        content = eval_agents_md.read_text(encoding="utf-8", errors="replace")
        # Extract insights (lines starting with bullets)
        for line in content.split('\n'):
            stripped = line.strip()
            if stripped.startswith(('*', '-')):
                insights.append(stripped)

    # Only the presence of auxiliary_metrics.py is reported; the file is
    # neither imported nor validated here.
    auxiliary_py = memory_dir / "auxiliary_metrics.py"
    if auxiliary_py.exists():
        auxiliary_metrics["auxiliary_metrics_created"] = True

    return {
        "success": True,
        "insights": insights[-5:] if insights else ["Agent completed analysis"],
        "auxiliary_metrics": auxiliary_metrics
    }


def _run_ev2_agent_blocking(
    results_dir: str,
    generation: int,
    primary_evaluator_path: str
) -> Dict[str, Any]:
    """Run the synchronous agent and collect its outputs, one run at a time."""
    with _agent_run_lock:
        evolution_evaluation_agent(
            results_dir=results_dir,
            current_gen=generation,
            primary_evaluator_path=primary_evaluator_path
        )
        return _collect_agent_outputs(results_dir)


async def run_ev2_agent(results_dir: str, generation: int) -> Dict[str, Any]:
    """
    Run the EV2 agent analysis

    This wraps the existing evolution_evaluation_agent() function. That
    function is synchronous, so it is executed in a worker thread to keep
    the event loop (and the other endpoints) responsive while it runs.

    Args:
        results_dir: Results directory passed to the agent; its
            ``eval_agent_memory`` subdirectory is read afterwards.
        generation: Generation number passed to the agent as ``current_gen``.

    Returns:
        Dict with ``"success"`` (always True on return), ``"insights"`` (the
        last five bullet lines of EVAL_AGENTS.md, or a single placeholder
        entry when none are found) and ``"auxiliary_metrics"``.

    Raises:
        Exception: Any error from the agent or from reading its outputs is
            logged with traceback and re-raised.
    """
    logger.info(f"🧠 Running EV2 agent for generation {generation}...")

    try:
        if service_config is None:
            raise RuntimeError("Service not initialized")

        # Convert to absolute path
        primary_evaluator_abs = Path(service_config.primary_evaluator_path).resolve()

        # Call the existing ev2.py logic off the event loop
        return await asyncio.to_thread(
            _run_ev2_agent_blocking,
            results_dir,
            generation,
            str(primary_evaluator_abs)
        )

    except Exception as e:
        logger.error(f"Agent execution failed: {e}", exc_info=True)
        raise


@app.get("/api/v1/status", response_model=ServiceStatusResponse)
async def get_status():
    """Get service status"""
    if service_state is None or service_config is None:
        raise HTTPException(status_code=500, detail="Service not initialized")

    return ServiceStatusResponse(
        status="running",
        uptime_seconds=time.time() - service_state.start_time,
        version="0.1.0",
        experiment={
            "name": service_config.experiment_name,
            "results_dir": service_config.results_dir,
            "primary_evaluator": service_config.primary_evaluator_path
        },
        statistics=service_state.get_statistics(),
        config={
            "trigger_mode": service_config.trigger_mode,
            "trigger_interval": service_config.trigger_interval,
            "agent_enabled": service_config.agent_enabled
        }
    )


@app.post("/api/v1/trigger/manual")
async def trigger_manual(generation: int):
    """
    Manually trigger agent analysis for a specific generation

    Useful for debugging or forcing an update. The generation must be in the
    recorded history (most recent matching entry is used); otherwise 404 is
    returned. An agent failure is returned as HTTP 500.
    """
    if service_state is None:
        raise HTTPException(status_code=500, detail="Service not initialized")

    logger.info(f"🔧 Manual trigger requested for generation {generation}")

    # Find the generation in history
    gen_data = None
    for g in reversed(service_state.generation_history):
        if g['generation'] == generation:
            gen_data = g
            break

    if gen_data is None:
        raise HTTPException(status_code=404, detail=f"Generation {generation} not found")

    # Run agent
    try:
        result = await run_ev2_agent(
            results_dir=gen_data['results_dir'],
            generation=generation
        )

        service_state.mark_agent_triggered(generation)

        return {
            "status": "success",
            "message": f"Agent triggered for generation {generation}",
            "result": result
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Agent failed: {str(e)}") from e


@app.get("/")
async def root():
    """Root endpoint"""
    return {
        "service": "EV2 Evaluation Service",
        "version": "0.1.0",
        "status": "running",
        "docs": "/docs"
    }


# ============================================================================
# CLI Entry Point
# ============================================================================

def main():
    """Main entry point.

    Builds the service configuration from ``--config`` (YAML) or, when no
    config file is given, from ``--results-dir`` and ``--primary-evaluator``
    plus the optional trigger/host/port flags, then starts uvicorn. When
    ``--config`` is given the other flags are not consulted.
    """
    import argparse

    parser = argparse.ArgumentParser(description="EV2 Evaluation Service")
    parser.add_argument(
        "--config",
        type=str,
        help="Path to YAML config file"
    )

    # Or specify directly
    parser.add_argument("--results-dir", type=str, help="Results directory")
    parser.add_argument("--primary-evaluator", type=str, help="Path to primary evaluator")
    parser.add_argument("--trigger-mode", type=str, default="periodic",
                        choices=["always", "periodic", "plateau", "mixed"])
    parser.add_argument("--trigger-interval", type=int, default=10)
    parser.add_argument("--port", type=int, default=8765)
    parser.add_argument("--host", type=str, default="0.0.0.0")

    args = parser.parse_args()

    global service_config

    # Load config
    if args.config:
        logger.info(f"Loading config from {args.config}")
        service_config = ServiceConfig.from_yaml(args.config)
    else:
        # Create from args
        if not args.results_dir or not args.primary_evaluator:
            # parser.error prints usage and exits with a non-zero status so
            # callers and scripts can detect the misconfiguration.
            parser.error("Must provide either --config or (--results-dir and --primary-evaluator)")

        service_config = ServiceConfig.from_args(
            results_dir=args.results_dir,
            primary_evaluator_path=args.primary_evaluator,
            trigger_mode=args.trigger_mode,
            trigger_interval=args.trigger_interval,
            host=args.host,
            port=args.port
        )

    # Start server
    logger.info(f"Starting server on {service_config.host}:{service_config.port}")

    uvicorn.run(
        app,
        host=service_config.host,
        port=service_config.port,
        log_level=service_config.log_level.lower()
    )


if __name__ == "__main__":
    main()