"""
PolicyEvolverEnv — Hackathon Inference Script
=============================================

MANDATORY ENV VARS:
    API_BASE_URL   The API endpoint for the LLM.
    MODEL_NAME     The model identifier to use for inference.
    HF_TOKEN       Your Hugging Face / API key.
    IMAGE_NAME     The Docker image name (set by validator).

OPTIONAL ENV VARS:
    API_KEY        Used instead of HF_TOKEN when set.
    ENV_BASE_URL   Environment server URL when IMAGE_NAME is unset
                   (default: http://127.0.0.1:8000).

If MODEL_NAME is unset, the script queries GET {API_BASE_URL}/models for the
first concrete model id and otherwise falls back to "gpt-4o-mini".

STDOUT FORMAT (each record is exactly one line):
    [START] task=<task_id> env=policy_evolver_env model=<model_name>
    [STEP] step=<n> action=<action_type> reward=<0.00> done=<true|false> error=<"message"|null>
    [END] task=<task_id> success=<true|false> steps=<n> score=<0.000> rewards=<r1,r2,...>
"""

import asyncio
import json
import os
import sys
from typing import Any, Dict, List, Optional

from openai import OpenAI

from client import PolicyEvolverEnv
from models import Action

# ─── Environment Variables (Hackathon Mandatory) ───
IMAGE_NAME = os.getenv("IMAGE_NAME")
API_KEY = os.environ.get("API_KEY") or os.environ.get("HF_TOKEN")
API_BASE_URL = os.environ.get("API_BASE_URL")
MODEL_NAME = os.environ.get("MODEL_NAME")
BENCHMARK = "policy_evolver_env"


# ─── Auto-discover model if MODEL_NAME is not set ───
def _discover_model_name(base_url: str, api_key: str) -> Optional[str]:
    """Return the first concrete model id listed by ``GET {base_url}/models``.

    Wildcard entries (ids starting with ``*``) are skipped. Any failure
    (missing ``httpx``, network error, unexpected payload) is logged and
    yields ``None`` so the caller can fall back to a default.
    """
    try:
        import httpx  # optional dependency; only needed for discovery

        resp = httpx.get(
            f"{base_url.rstrip('/')}/models",
            headers={"Authorization": f"Bearer {api_key}"},
            timeout=10,
        )
        if resp.status_code != 200:
            return None
        for entry in resp.json().get("data", []):
            mid = entry.get("id", "")
            # "*" / "*..." are wildcard routes, not real model names.
            if mid and not mid.startswith("*"):
                print(f"[DEBUG] Auto-discovered model: {mid}", flush=True)
                return mid
    except Exception as e:
        print(f"[DEBUG] Model discovery failed: {e}", flush=True)
    return None


if not MODEL_NAME and API_BASE_URL and API_KEY:
    MODEL_NAME = _discover_model_name(API_BASE_URL, API_KEY)

if not MODEL_NAME:
    MODEL_NAME = "gpt-4o-mini"
    print(f"[DEBUG] Using default MODEL_NAME: {MODEL_NAME}", flush=True)

MAX_STEPS = 5
TEMPERATURE = 0.0
SUCCESS_THRESHOLD = 0.70


# ─── Logging Helpers (Hackathon Mandatory Format) ───
def _single_line(text: object) -> str:
    """Collapse every whitespace run (including newlines) in *text* to one space.

    The validator parses stdout line by line, so any value interpolated into a
    record must not contain line breaks (e.g. multi-line exception messages).
    """
    return " ".join(str(text).split())


def log_start(task: str, env: str, model: str) -> None:
    """Emit the ``[START]`` record for a task."""
    print(f"[START] task={task} env={env} model={model}", flush=True)


def log_step(step: int, action: str, reward: float, done: bool, error: Optional[str]) -> None:
    """Emit one ``[STEP]`` record; *error* is quoted, or ``null`` when falsy."""
    error_val = f'"{_single_line(error)}"' if error else "null"
    done_val = str(done).lower()
    print(
        f"[STEP] step={step} action={_single_line(action)} reward={reward:.2f} "
        f"done={done_val} error={error_val}",
        flush=True,
    )


def log_end(task: str, success: bool, steps: int, score: float, rewards: List[float]) -> None:
    """Emit the ``[END]`` record with the comma-joined per-step rewards."""
    rewards_str = ",".join(f"{r:.2f}" for r in rewards)
    print(
        f"[END] task={task} success={str(success).lower()} steps={steps} "
        f"score={score:.3f} rewards={rewards_str}",
        flush=True,
    )


# ─── LLM Agent ───
class PolicyEvolverAgent:
    """Strategic Policy Agent — maximizes governance scores via in-context adaptation.

    The agent keeps its own ``action_history`` / ``score_history`` (appended
    to by the episode runner) and feeds a short summary of them back into the
    prompt on later steps.
    """

    SYSTEM_PROMPT = (
        "You are a Strategic Policy Engineer. Your goal is to maximize governance outcomes through verifiable "
        "precision. STYLISTIC RULES:\n"
        "1. NO VAGUENESS: Never use words like 'maybe', 'perhaps', 'sometimes', 'usually'.\n"
        "2. COMMAND LANGUAGE: Use 'must', 'shall', 'prohibited', 'required', 'mandatory'.\n"
        "3. MEASURABLE CRITERIA: Define terms with 'if-then' and metrics.\n"
        "4. ANALYTICAL COT: Your 'think' field MUST be 150-250 words and include terms: 'tradeoff', 'precision', "
        "'recall', 'threshold', 'impact', 'evidence'.\n"
        "5. JSON ONLY: Output ONLY the JSON object. No preamble.\n"
        "6. INCREMENTALISM: If your previous score was high (>0.80), focus on surgical precision rather than holistic rewriting. "
        "DO NOT add words that create ambiguity."
    )

    # Substrings checked (case-insensitively) in a task_easy definition.
    _VAGUE_WORDS = (
        "might", "could", "perhaps", "sometimes", "often",
        "generally", "usually", "typically", "may", "possibly",
    )
    _MEASURABLE_MARKERS = (
        "threshold", "verify", "days", "$", "%", "reports",
        "hours", "within", "exceed", "minimum", "must", "shall",
    )

    def __init__(self, model: str):
        self.model = model
        self.action_history: List[dict] = []
        self.score_history: List[float] = []

    def _call_llm(self, client: OpenAI, prompt: str) -> dict:
        """Call the LLM and parse its reply into a JSON object.

        Markdown code fences are stripped; if the remaining text is not valid
        JSON, the outermost ``{...}`` span is tried instead.

        Raises:
            Exception: any API or parsing failure, re-raised after logging the
                error (and the raw reply, when one was received) to stderr.
                A reply that parses to something other than a JSON object
                raises ``ValueError``.
        """
        raw: Optional[str] = None
        try:
            resp = client.chat.completions.create(
                model=self.model,
                messages=[
                    {"role": "system", "content": self.SYSTEM_PROMPT},
                    {"role": "user", "content": prompt},
                ],
                max_tokens=1024,
                temperature=TEMPERATURE,
                seed=42,
            )
            content = resp.choices[0].message.content
            if content is None:
                raise ValueError("LLM returned no message content")
            raw = content.strip()

            # Strip markdown fences
            if "```json" in raw:
                raw = raw.split("```json")[1].split("```")[0].strip()
            elif "```" in raw:
                raw = raw.split("```")[1].split("```")[0].strip()

            try:
                parsed = json.loads(raw)
            except json.JSONDecodeError:
                start = raw.find("{")
                end = raw.rfind("}")
                if start == -1 or end == -1:
                    raise
                parsed = json.loads(raw[start : end + 1])

            # Callers use dict methods on the result; reject lists/scalars here
            # so the failure is reported as an LLM error for this step.
            if not isinstance(parsed, dict):
                raise ValueError(f"Expected a JSON object from the LLM, got {type(parsed).__name__}")
            return parsed
        except Exception as e:
            print(f"[DEBUG] LLM Call Error: {e}", file=sys.stderr)
            if raw is not None:
                print(f"[DEBUG] Raw content: {raw}", file=sys.stderr)
            raise

    def _build_feedback(self, step: int, last_score: float, last_action: dict, task_id: str) -> str:
        """Build diagnostic feedback from the previous step for in-context learning.

        Returns an empty string on step 0 or when there is no previous action.
        When ``last_score >= 0.80`` a fixed "surgical refinement" message is
        returned instead of the task-specific diagnostics.
        """
        if step == 0 or not last_action:
            return ""

        # Surgical Refinement Guard: a high score replaces all diagnostics.
        if last_score >= 0.80:
            return "\n".join([
                f"\n=== SURGICAL REFINEMENT (Step {step}) ===",
                f"Current Score: {last_score:.3f} — EXCELLENT.",
                "CRITICAL: Do NOT rewrite the policy. Only perform 'surgical' removals or additions.",
                "1. CHECK: Remove 'might', 'could', 'perhaps', 'sometimes', 'often' if present.",
                "2. CHECK: Ensure words count >= 12. Add one more specific metric (%, hours, $) if needed.",
                "Do NOT add any words that could be seen as vague. Aim for 0.95+.",
            ])

        lines = [
            f"\n=== STRATEGIC FEEDBACK (Step {step}) ===",
            f"Previous score: {last_score:.3f} / 1.000",
        ]

        # `or ""` / `or []` guard against explicit JSON nulls from the LLM.
        if task_id == "task_easy":
            defn = str(last_action.get("suggested_definition") or "")
            lowered = defn.lower()
            found = [w for w in self._VAGUE_WORDS if w in lowered]
            mfound = [w for w in self._MEASURABLE_MARKERS if w in lowered]
            if found:
                lines.append(f"FAILURE: Vague words detected: {found}. Remove them entirely.")
            if len(mfound) < 2:
                lines.append("FAILURE: Missing measurable criteria. Add numbers, hours, percentages.")
            if len(defn.split()) < 15:
                lines.append("FAILURE: Definition too short. Minimum 15 words.")

        elif task_id == "task_medium":
            if not str(last_action.get("rule_domain") or "").strip():
                lines.append("FAILURE: rule_domain was empty.")
            if len(str(last_action.get("new_rule") or "").split()) < 10:
                lines.append("FAILURE: New rule too short.")

        elif task_id == "task_hard":
            outcomes = last_action.get("expected_outcomes", {})
            if isinstance(outcomes, dict) and len(outcomes) >= 2:
                # bool is an int subclass; exclude it from numeric metrics.
                vals = [
                    v for v in outcomes.values()
                    if isinstance(v, (int, float)) and not isinstance(v, bool)
                ]
                # Values above 1 are treated as percentages.
                vals = [v / 100 if v > 1 else v for v in vals]
                if vals and all(v > 0.70 for v in vals):
                    lines.append("FAILURE: Unrealistic tradeoff — all metrics > 0.70. Model friction.")
            mods = last_action.get("policy_modifications") or []
            if not isinstance(mods, (list, tuple)) or len(mods) < 2:
                lines.append("FAILURE: Need >= 2 policy_modifications.")

        # Append history summaries (last three steps).
        for act, sc in zip(self.action_history[-3:], self.score_history[-3:]):
            act_type = act.get("action_type", "?") if isinstance(act, dict) else "?"
            lines.append(f" [{sc:.2f}] {act_type}")

        target = min(last_score + 0.20, 0.95)
        lines.append(f"\nYour next proposal MUST score above {target:.2f}. Be more specific.")
        return "\n".join(lines)

    def get_action(self, client: OpenAI, task_id: str, obs: dict) -> dict:
        """Generate the next strategic action for the given task.

        ``obs`` is the observation as a plain dict. ``step_count`` and
        ``info.last_reward`` / ``info.last_action`` are read when present
        (``None`` values are treated as absent).
        """
        step = obs.get("step_count", 0) or 0
        info = obs.get("info") or {}
        last_score = info.get("last_reward")
        if not isinstance(last_score, (int, float)):
            last_score = 0.0
        last_action = info.get("last_action") or {}
        if not isinstance(last_action, dict):
            last_action = {}
        feedback = self._build_feedback(step, last_score, last_action, task_id)

        if task_id == "task_easy":
            corpus = obs.get("data_corpus", [])
            if corpus is None:
                corpus = []
            prompt = (
                f"POLICIES: {obs.get('current_policies', [])}\n"
                f"DATA: {corpus[:5]}\n{feedback}\n"
                "TASK: Propose clarification for an ambiguous term with a measurable definition.\n"
                "JSON: {\"action_type\": \"propose_clarification\", \"ambiguous_term\": \"...\", "
                "\"suggested_definition\": \"...\", \"affected_policy_ids\": [\"str\"], "
                "\"justification\": \"...\", \"think\": \"...\"}"
            )
        elif task_id == "task_medium":
            prompt = (
                f"POLICIES: {obs.get('current_policies', [])}\n"
                f"DATA: {obs.get('data_corpus', [])}\n{feedback}\n"
                "TASK: Propose a new rule for a coverage gap. Use mandatory language.\n"
                "JSON: {\"action_type\": \"propose_new_rule\", \"rule_domain\": \"...\", "
                "\"new_rule\": \"...\", \"scope\": [\"str\"], \"integration_points\": [\"str\"], "
                "\"justification\": \"...\", \"think\": \"...\"}"
            )
        else:
            prompt = (
                f"METRICS: {obs.get('system_metrics', {})}\n"
                f"ISSUES: {obs.get('identified_issues', [])}\n{feedback}\n"
                "TASK: Evolve policies with realistic tradeoffs.\n"
                "JSON: {\"action_type\": \"evolve_policy\", \"policy_modifications\": "
                "[{\"policy_id\": \"...\", \"change_type\": \"enhance|restrict|add|remove\", "
                "\"new_text\": \"...\", \"reason\": \"...\"}], \"expected_outcomes\": "
                "{\"fraud_rate\": 0.8, \"revenue_velocity\": 0.4}, "
                "\"rollback_conditions\": [\"...\"], \"justification\": \"...\", \"think\": \"...\"}"
            )

        return self._call_llm(client, prompt)


# ─── Episode Runner ───
def _observation_to_dict(observation: Any) -> dict:
    """Convert an observation (pydantic model, dict, or mapping) to a plain dict."""
    if hasattr(observation, "model_dump"):
        return observation.model_dump()
    if isinstance(observation, dict):
        return observation
    return dict(observation)


def _abort_episode(
    task_id: str,
    *,
    action: str,
    error: str,
    step: int,
    steps: int,
    rewards: List[float],
) -> Dict[str, Any]:
    """Log a terminal error step and a failed ``[END]``; return a fatal summary."""
    log_step(step=step, action=action, reward=0.0, done=True, error=error)
    log_end(task=task_id, success=False, steps=steps, score=0.0, rewards=rewards)
    return {
        "task": task_id,
        "success": False,
        "steps": steps,
        "score": 0.0,
        "rewards": list(rewards),
        "fatal": True,
    }


async def run_episode(
    client: Optional[OpenAI],
    env: Optional[PolicyEvolverEnv],
    task_id: str,
    setup_error: Optional[Exception] = None,
) -> dict:
    """Run a single task episode following the hackathon format.

    Always emits one ``[START]`` and one ``[END]`` line. Instead of exiting
    the process, it returns a summary dict with keys ``task``, ``success``,
    ``steps``, ``score``, ``rewards`` and ``fatal``. ``fatal`` is True when
    the episode was aborted by a setup or runtime error (as opposed to
    merely scoring below ``SUCCESS_THRESHOLD``). The caller decides the exit
    code, so later tasks still get logged and resources are still cleaned up.
    The score is the reward of the last step taken.
    """
    log_start(task=task_id, env=BENCHMARK, model=MODEL_NAME)

    if setup_error:
        print(f"[FATAL] Setup Error: {setup_error}", file=sys.stderr)
        return _abort_episode(
            task_id, action="setup", error=str(setup_error), step=1, steps=0, rewards=[]
        )

    if not client or not env:
        print("[FATAL] Client or Environment not initialized", file=sys.stderr)
        return _abort_episode(
            task_id,
            action="setup",
            error="Client or Environment not initialized",
            step=1,
            steps=0,
            rewards=[],
        )

    agent = PolicyEvolverAgent(MODEL_NAME)
    rewards: List[float] = []
    steps_taken = 0

    try:
        result = await env.reset(task_id=task_id)
        for step in range(1, MAX_STEPS + 1):
            if result.done:
                break

            obs_dict = _observation_to_dict(result.observation)

            # Agent decides action (graceful failure per step)
            try:
                action_dict = agent.get_action(client, task_id, obs_dict)
            except Exception as e:
                # LLM call failed — log error for this step and end the episode.
                print(f"[DEBUG] LLM error on step {step}: {e}", file=sys.stderr)
                log_step(step=step, action="llm_error", reward=0.0, done=True, error=str(e))
                rewards.append(0.0)
                steps_taken = step
                break

            agent.action_history.append(action_dict)

            # Validate and step; an invalid action ends the episode with reward 0.
            error: Optional[str] = None
            try:
                action_obj = Action.model_validate(action_dict)
                result = await env.step(action_obj)
                reward = result.reward or 0.0
                done = result.done
            except Exception as e:
                reward = 0.0
                done = True
                error = str(e)

            rewards.append(reward)
            agent.score_history.append(reward)
            steps_taken = step
            act_name = action_dict.get("action_type") or "unknown"
            log_step(step=step, action=act_name, reward=reward, done=done, error=error)

            if done:
                break
    except Exception as e:
        print(f"[FATAL] Runtime Error: {e}", file=sys.stderr)
        return _abort_episode(
            task_id,
            action="error",
            error=str(e),
            step=steps_taken + 1,
            steps=steps_taken,
            rewards=rewards,
        )

    score = rewards[-1] if rewards else 0.0
    success = score >= SUCCESS_THRESHOLD
    log_end(task=task_id, success=success, steps=steps_taken, score=score, rewards=rewards)
    return {
        "task": task_id,
        "success": success,
        "steps": steps_taken,
        "score": score,
        "rewards": list(rewards),
        "fatal": False,
    }


# ─── Main Entry Point ───
async def main() -> None:
    """Set up the LLM client and environment, run every task, then clean up.

    A setup failure does not stop the run early: every task still emits its
    ``[START]``/``[STEP]``/``[END]`` lines reporting the error. The
    environment is closed in a ``finally`` block. After cleanup, the process
    exits with status 1 if any episode was aborted by a setup or runtime
    error.
    """
    client: Optional[OpenAI] = None
    env: Optional[PolicyEvolverEnv] = None
    setup_error: Optional[Exception] = None

    try:
        # 1. Initialize OpenAI Client
        try:
            if not API_KEY or not API_BASE_URL:
                raise Exception("Missing mandatory environment variables: API_KEY and/or API_BASE_URL")
            client = OpenAI(base_url=API_BASE_URL, api_key=API_KEY)
        except Exception as e:
            setup_error = Exception(f"OpenAI client initialization failed: {e}")

        # 2. Initialize Environment
        if not setup_error:
            try:
                if IMAGE_NAME:
                    # Manually handle Docker startup to override the 30s library default
                    from openenv.core.containers.runtime.providers import LocalDockerProvider

                    provider = LocalDockerProvider()
                    base_url = provider.start_container(IMAGE_NAME)
                    print(
                        f"[DEBUG] Waiting for container {IMAGE_NAME} at {base_url} (Extended Timeout 120s)...",
                        flush=True,
                    )
                    # NOTE(review): if wait_for_ready raises, the started container is
                    # not handed to `env`, so it is not stopped by env.close() below.
                    provider.wait_for_ready(base_url, timeout_s=120.0)
                    env = PolicyEvolverEnv(base_url=base_url, provider=provider)
                    await env.connect()
                else:
                    local_url = os.environ.get("ENV_BASE_URL", "http://127.0.0.1:8000")
                    env = PolicyEvolverEnv(base_url=local_url)
                    # For local testing, we might want to check connection immediately or let run_episode handle it
            except Exception as e:
                setup_error = Exception(f"Environment initialization failed: {e}")
    except Exception as e:
        setup_error = e

    # 3. Always loop over tasks to ensure structured logs
    outcomes: List[dict] = []
    try:
        for task in ("task_easy", "task_medium", "task_hard"):
            outcomes.append(await run_episode(client, env, task, setup_error=setup_error))
    finally:
        # 4. Final Cleanup — runs even if an episode raises unexpectedly.
        if env is not None:
            try:
                await env.close()
            except Exception as e:
                print(f"[DEBUG] env.close() error: {e}", flush=True)

    if any(outcome.get("fatal") for outcome in outcomes):
        sys.exit(1)


if __name__ == "__main__":
    asyncio.run(main())