| |
| |
from __future__ import annotations

import copy
import json
import logging
import random
import uuid
from typing import Optional, Any, Dict

from openenv.core.env_server import Environment

from models import (
    Action, Observation, State,
    ProposeClarificationAction, ProposeNewRuleAction, EvolveProcessAction,
)
from server.grader import grade
from server.tasks import TASK_REGISTRY

logger = logging.getLogger(__name__)
|
|
|
|
class PolicyEvolverEnvironment(Environment[Action, Observation, State]):
    """
    Real-world environment: AI agent learns to evolve governance policies
    through meta-reasoning over real-world data.

    The class is a process-wide singleton: every construction returns the
    same instance, and ``_persistent_best_score`` is carried across
    episodes (it survives ``reset()``).
    """

    # Shared singleton instance, created lazily in __new__.
    _instance = None

    def __new__(cls, *args, **kwargs):
        # Lazy singleton allocation. __init__ uses the _initialized flag as
        # a guard so repeated construction does not clobber accumulated state.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self) -> None:
        if getattr(self, "_initialized", False):
            return  # already-initialized singleton: keep existing state
        super().__init__()
        self._state = State()
        self._current_task = None           # task dict chosen by reset()
        self._persistent_best_score = 0.0   # best reward ever, across episodes
        self._seen_action_hashes = set()    # action hashes seen this episode
        self._initialized = True

    def reset(
        self,
        seed: Optional[int] = None,
        episode_id: Optional[str] = None,
        **kwargs: Any,
    ) -> Observation:
        """
        Begin a new episode and return the initial observation.

        Args:
            seed: Optional RNG seed used when picking a random task.
                (Previously accepted but silently ignored — now honored.)
            episode_id: Optional externally supplied episode identifier;
                a fresh UUID4 string is generated when omitted.
            **kwargs: ``task_id`` may be passed to pick a specific entry
                from ``TASK_REGISTRY``; otherwise one is chosen at random.

        Returns:
            Observation with reward 0.0, done False, and at most the first
            10 corpus items shown.
        """
        task_id = kwargs.get("task_id")
        if task_id is None:
            # Drive task choice from a private Random so honoring `seed`
            # never perturbs the global random state.
            task_id = random.Random(seed).choice(list(TASK_REGISTRY.keys()))

        self._seen_action_hashes = set()
        task = TASK_REGISTRY[task_id]
        self._current_task = task
        self._state = State(
            episode_id=episode_id or str(uuid.uuid4()),
            task_id=task_id,
            step_count=0,
            max_steps=5,
            current_score=0.0,
            best_score=self._persistent_best_score,
            actions_taken=[],
        )

        # Deep-copy so per-episode mutation of items (content normalization,
        # system_action updates) never leaks back into the shared registry.
        self._episode_corpus = copy.deepcopy(task.get("data_corpus", []))

        # Normalize heterogeneous corpus entries to a common schema.
        for item in self._episode_corpus:
            if "content" not in item:
                item["content"] = (
                    item.pop("text", None)
                    or item.pop("desc", None)
                    or str(item.get("flags", ""))
                )
            if "system_action" not in item:
                item["system_action"] = "pending"

        shown_corpus = self._episode_corpus[:10]

        return Observation(
            task_id=task_id,
            episode_id=self._state.episode_id,
            step_count=0,
            corpus_size=len(self._episode_corpus),
            corpus_shown=len(shown_corpus),
            data_corpus=shown_corpus,
            current_policies=task["current_policies"],
            policy_outcomes=task.get("policy_outcomes"),
            system_metrics=task.get("system_metrics", {}),
            identified_issues=task.get("identified_issues", []),
            reward=0.0,
            done=False,
            info={
                "task_description": task["description"],
                "difficulty": task["difficulty"],
                "best_score": self._persistent_best_score,
                "steps_remaining": self._state.max_steps
            },
        )

    @staticmethod
    def _normalize_action(action: Any) -> Dict[str, Any]:
        """Coerce an action (pydantic model, RootModel wrapper, or plain
        dict) into a plain dictionary."""
        if isinstance(action, dict):
            return action
        if hasattr(action, "root"):  # unwrap a pydantic RootModel
            action = action.root
        return action.model_dump() if hasattr(action, "model_dump") else dict(action)

    @staticmethod
    def _hash_action(action_dict: Dict[str, Any]) -> int:
        """Return a content hash of the action, used to detect repeats."""
        try:
            return hash(json.dumps(action_dict, sort_keys=True, default=str))
        except Exception:
            # Unserializable payload: fall back to hashing its repr.
            return hash(str(action_dict))

    def step(
        self,
        action: Action,
        timeout_s: Optional[float] = None,
        **kwargs: Any,
    ) -> Observation:
        """
        Apply one agent action, grade it, update corpus annotations, and
        return the next observation.

        Args:
            action: The agent's action (model instance or plain dict).
            timeout_s: Accepted for interface compatibility; unused here.

        Returns:
            Observation with the clamped reward, prioritized corpus view,
            and staff-feedback info. If the step budget was already
            exhausted, the episode is restarted on the same task and the
            fresh reset observation is returned instead.

        Raises:
            RuntimeError: If called before ``reset()``.
        """
        if self._current_task is None:
            raise RuntimeError("Call reset() before step()")

        if self._state.step_count >= self._state.max_steps:
            # Lazy %s formatting so the message is only built when emitted.
            logger.warning(
                "[EXPLOIT] Step-count limit exceeded for episode %s",
                self._state.episode_id,
            )
            return self.reset(task_id=self._state.task_id)

        self._state.step_count += 1

        action_dict = self._normalize_action(action)
        action_hash = self._hash_action(action_dict)

        # Repeating an identical action this episode costs a flat penalty.
        if action_hash in self._seen_action_hashes:
            repetition_penalty = 0.30
        else:
            repetition_penalty = 0.0
            self._seen_action_hashes.add(action_hash)

        previous_score = self._state.current_score
        raw_reward = grade(action_dict, self._state.task_id, previous_score=previous_score)
        # Clamp into the open interval (0, 1) so downstream consumers never
        # see an exact 0 or 1.
        reward = max(0.001, min(0.999, raw_reward - repetition_penalty))

        self._state.current_score = reward
        self._state.best_score = max(self._state.best_score, reward)
        self._persistent_best_score = max(self._persistent_best_score, reward)
        self._state.rewards_history.append(reward)

        action_type = action_dict.get("action_type", "unknown")
        self._state.actions_taken.append(action_type)

        # Term the action targets, if any; used for corpus relevance.
        target_term = action_dict.get("ambiguous_term") or action_dict.get("rule_domain") or ""
        t_term = str(target_term).lower()

        relevant = []
        remaining = []
        for item in self._episode_corpus:
            c_type = str(item.get("type", "")).lower()
            c_text = str(item.get("content", "")).lower()

            # BUG FIX: the original tested `t_term in c_text` with a possibly
            # empty term — and "" is a substring of everything — so any action
            # without a target term stamped system_action onto EVERY item and
            # made the evolve_policy clause dead. Require a non-empty term.
            term_matches = bool(t_term) and (t_term in c_text or t_term in c_type)

            if term_matches or action_type == "evolve_policy":
                if reward >= 0.7:
                    item["system_action"] = "policy_applied"
                elif 0.3 <= reward < 0.7:
                    item["system_action"] = "flagged"

            if term_matches:
                relevant.append(item)
            else:
                remaining.append(item)

        # Rotate the non-matching remainder by 3 items per step so repeated
        # steps surface previously hidden entries. Offset is taken modulo
        # the list length — the original could exceed it, which silently
        # disabled rotation on small corpora.
        if remaining:
            step_offset = ((self._state.step_count - 1) * 3) % len(remaining)
            rotated_remaining = remaining[step_offset:] + remaining[:step_offset]
        else:
            rotated_remaining = []

        # Relevant items always lead; only the first 10 are shown.
        prioritized_corpus = relevant + rotated_remaining
        shown_corpus = prioritized_corpus[:10]

        done = (
            reward >= 0.90 or
            self._state.step_count >= self._state.max_steps
        )

        return Observation(
            task_id=self._state.task_id,
            episode_id=self._state.episode_id,
            step_count=self._state.step_count,
            corpus_size=len(self._episode_corpus),
            corpus_shown=len(shown_corpus),
            data_corpus=shown_corpus,
            current_policies=self._current_task["current_policies"],
            policy_outcomes=self._current_task.get("policy_outcomes"),
            system_metrics=self._current_task.get("system_metrics", {}),
            identified_issues=self._current_task.get("identified_issues", []),
            reward=reward,
            done=done,
            info={
                "best_score": self._state.best_score,
                "last_reward": reward,
                "rewards_history": self._state.rewards_history,
                "action_history": self._state.actions_taken,
                "steps_remaining": self._state.max_steps - self._state.step_count,
                "staff_feedback": {
                    "strategic_rating": "Senior Architect" if reward >= 0.85 else "Staff Specialist" if reward >= 0.65 else "Junior Associate",
                    "focus": "Signal detected" if reward >= 0.5 else "Burying the lede or distracted by noise",
                    "recommendation": "Maintain high signal-to-noise ratio and lead with the fix." if reward < 0.8 else "Excellent prioritization."
                }
            },
        )

    @property
    def state(self) -> State:
        """Current episode State object (mutated in place by step())."""
        return self._state
|
|