# Source mirrored from the Hugging Face Space "devil-policyevolverenv"
# (server/environment.py, commit 6b7794e:
#  "Fix: Add missing logger import in environment.py").
# server/environment.py
# HF Force Rebuild: 2026-03-30T17:16:00Z
from __future__ import annotations

import copy
import json
import logging
import random
import uuid
from typing import Any, Dict, Optional

from openenv.core.env_server import Environment

from models import (
    Action, Observation, State,
    ProposeClarificationAction, ProposeNewRuleAction, EvolveProcessAction,
)
from server.grader import grade
from server.tasks import TASK_REGISTRY

logger = logging.getLogger(__name__)
class PolicyEvolverEnvironment(Environment[Action, Observation, State]):
    """
    Real-world environment: AI agent learns to evolve governance policies
    through meta-reasoning over real-world data.

    Implemented as a process-wide singleton so that the best reward seen so
    far (``_persistent_best_score``) survives across episodes served by the
    same process.
    """

    _instance = None  # singleton instance shared by every constructor call

    def __new__(cls, *args, **kwargs):
        # Classic singleton: allocate once, then always hand back that object.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._initialized = False
        return cls._instance

    def __init__(self):
        # The singleton may be "constructed" many times; only initialize once.
        if getattr(self, "_initialized", False):
            return
        super().__init__()
        self._state = State()
        self._current_task = None
        self._persistent_best_score = 0.0  # best reward across ALL episodes
        self._seen_action_hashes = set()   # drives the repetition penalty
        self._initialized = True

    def reset(
        self,
        seed: Optional[int] = None,
        episode_id: Optional[str] = None,
        **kwargs: Any,
    ) -> Observation:
        """Start a new episode and return its initial observation.

        Keyword args:
            task_id: optional key into TASK_REGISTRY; a random task is
                chosen when omitted.

        NOTE(review): ``seed`` is accepted for API compatibility but is not
        currently used to seed ``random`` — confirm whether callers expect
        deterministic task selection.
        """
        task_id = kwargs.get("task_id")
        if task_id is None:
            task_id = random.choice(list(TASK_REGISTRY.keys()))
        self._seen_action_hashes = set()
        task = TASK_REGISTRY[task_id]
        self._current_task = task
        self._state = State(
            episode_id=episode_id or str(uuid.uuid4()),
            task_id=task_id,
            step_count=0,
            max_steps=5,
            current_score=0.0,
            best_score=self._persistent_best_score,
            actions_taken=[],
        )
        # Deep-copy so per-episode mutations (the system_action updates in
        # step()) never leak back into the shared TASK_REGISTRY data.
        self._episode_corpus = copy.deepcopy(task.get("data_corpus", []))
        # Normalize items to the CorpusIncident schema: guarantee "content"
        # and "system_action" keys, migrating legacy "text"/"desc" fields.
        for item in self._episode_corpus:
            if "content" not in item:
                item["content"] = item.pop("text", None) or item.pop("desc", None) or str(item.get("flags", ""))
            if "system_action" not in item:
                item["system_action"] = "pending"
        shown_corpus = self._episode_corpus[:10]
        return Observation(
            task_id=task_id,
            episode_id=self._state.episode_id,
            step_count=0,
            corpus_size=len(self._episode_corpus),
            corpus_shown=len(shown_corpus),
            data_corpus=shown_corpus,
            current_policies=task["current_policies"],
            policy_outcomes=task.get("policy_outcomes"),
            system_metrics=task.get("system_metrics", {}),
            identified_issues=task.get("identified_issues", []),
            reward=0.0,
            done=False,
            info={
                "task_description": task["description"],
                "difficulty": task["difficulty"],
                "best_score": self._persistent_best_score,
                "steps_remaining": self._state.max_steps
            },
        )

    def step(
        self,
        action: Action,
        timeout_s: Optional[float] = None,
        **kwargs: Any,
    ) -> Observation:
        """Apply one agent action, grade it, and return the next observation.

        Raises:
            RuntimeError: if called before reset().

        Reward is the grader score clamped to (0.001, 0.999), minus a 0.30
        penalty when the exact same action payload was already submitted
        this episode.
        """
        if self._current_task is None:
            raise RuntimeError("Call reset() before step()")
        if self._state.step_count >= self._state.max_steps:
            logger.warning(f"[EXPLOIT] Step-count limit exceeded for episode {self._state.episode_id}")
            return self.reset(task_id=self._state.task_id)  # Force reset if they keep pushing or return an empty observation
        self._state.step_count += 1
        # action can be a dict from the API or a Pydantic model
        if isinstance(action, dict):
            action_dict = action
        else:
            # Handle Pydantic RootModel used for discriminated unions
            if hasattr(action, "root"):
                action = action.root
            action_dict = action.model_dump() if hasattr(action, "model_dump") else dict(action)
        # Repetition penalty: hash the canonicalized payload so resubmitting
        # the identical action is discouraged.
        try:
            action_hash = hash(json.dumps(action_dict, sort_keys=True, default=str))
        except Exception:
            action_hash = hash(str(action_dict))
        if action_hash in self._seen_action_hashes:
            repetition_penalty = 0.30
        else:
            repetition_penalty = 0.0
        self._seen_action_hashes.add(action_hash)
        previous_score = self._state.current_score
        raw_reward = grade(action_dict, self._state.task_id, previous_score=previous_score)
        reward = max(0.001, min(0.999, raw_reward - repetition_penalty))
        self._state.current_score = reward
        self._state.best_score = max(self._state.best_score, reward)
        self._persistent_best_score = max(self._persistent_best_score, reward)
        self._state.rewards_history.append(reward)
        action_type = action_dict.get("action_type", "unknown") if isinstance(action_dict, dict) else "unknown"
        self._state.actions_taken.append(action_type)
        # Reactive Corpus: Prioritize items relevant to the agent's action domain
        # This makes the world visibly react to agent choices
        target_term = action_dict.get("ambiguous_term") or action_dict.get("rule_domain") or ""
        t_term = str(target_term).lower()
        # Partition: relevant items first, then remaining
        relevant = []
        remaining = []
        for item in self._episode_corpus:
            c_type = str(item.get("type", "")).lower()
            c_text = str(item.get("content", "")).lower()
            # BUGFIX: an empty t_term previously matched every item
            # ("" in s is always True), stamping the whole corpus even for
            # unrelated actions; mirror the `t_term and ...` guard the
            # bucketing below already uses.
            matches_term = bool(t_term) and (t_term in c_text or t_term in c_type)
            # Update system_action based on reward (stateful corpus)
            if matches_term or action_type == "evolve_policy":
                if reward >= 0.7:
                    item["system_action"] = "policy_applied"
                elif 0.3 <= reward < 0.7:
                    item["system_action"] = "flagged"
            # Sort into buckets
            if matches_term:
                relevant.append(item)
            else:
                remaining.append(item)
        # Rotate the remaining window by step count so agent sees fresh data each step
        step_offset = (self._state.step_count - 1) * 3
        rotated_remaining = remaining[step_offset:] + remaining[:step_offset]
        # Build shown corpus: relevant items first, then rotated remaining, cap at 10
        prioritized_corpus = relevant + rotated_remaining
        shown_corpus = prioritized_corpus[:10]
        done = (
            reward >= 0.90 or
            self._state.step_count >= self._state.max_steps
        )
        return Observation(
            task_id=self._state.task_id,
            episode_id=self._state.episode_id,
            step_count=self._state.step_count,
            corpus_size=len(self._episode_corpus),
            corpus_shown=len(shown_corpus),
            data_corpus=shown_corpus,
            current_policies=self._current_task["current_policies"],
            policy_outcomes=self._current_task.get("policy_outcomes"),
            system_metrics=self._current_task.get("system_metrics", {}),
            identified_issues=self._current_task.get("identified_issues", []),
            reward=reward,
            done=done,
            info={
                "best_score": self._state.best_score,
                "last_reward": reward,
                "rewards_history": self._state.rewards_history,
                "action_history": self._state.actions_taken,
                "steps_remaining": self._state.max_steps - self._state.step_count,
                "staff_feedback": {
                    "strategic_rating": "Senior Architect" if reward >= 0.85 else "Staff Specialist" if reward >= 0.65 else "Junior Associate",
                    "focus": "Signal detected" if reward >= 0.5 else "Burying the lede or distracted by noise",
                    "recommendation": "Maintain high signal-to-noise ratio and lead with the fix." if reward < 0.8 else "Excellent prioritization."
                }
            },
        )

    @property
    def state(self) -> State:
        # Expose the current episode State (read-only accessor).
        return self._state