| """SafeSpace environment implementation.""" |
|
|
| from __future__ import annotations |
|
|
| from pathlib import Path |
| from typing import Any, Dict, Optional, Tuple |
| from uuid import uuid4 |
|
|
| from openenv.core.env_server.interfaces import Environment |
| from openenv.core.env_server.types import EnvironmentMetadata |
|
|
| try: |
| from ..models import ( |
| ContentItem, |
| GatheredContext, |
| ModerationAction, |
| ModerationObservation, |
| ModerationState, |
| RewardBreakdown, |
| TaskGradeBreakdown, |
| TriggerInfo, |
| ) |
| except (ModuleNotFoundError, ImportError): |
| from models import ( |
| ContentItem, |
| GatheredContext, |
| ModerationAction, |
| ModerationObservation, |
| ModerationState, |
| RewardBreakdown, |
| TaskGradeBreakdown, |
| TriggerInfo, |
| ) |
|
|
| from .grader import clamp_public_task_grade, compute_task_grade |
| from .policy import FACTOR_LIST, PLATFORM_POLICY |
| from .reward import ( |
| MAX_EPISODE_REWARD, |
| MIN_EPISODE_REWARD, |
| TRAJECTORY_REWARD_CAP, |
| compute_duplicate_action_penalty, |
| compute_investigation_reward, |
| compute_invalid_action_penalty, |
| compute_no_decision_penalty, |
| compute_reward, |
| normalize_public_reward, |
| normalize_reward_breakdown, |
| ) |
| from .scenarios import get_task_id_for_scenario, load_scenario |
|
|
|
|
class SafeSpaceEnvironment(Environment):
    """Content-moderation environment with trajectory shaping and strict episode handling.

    An episode begins with :meth:`reset`, which loads a scenario, and advances
    through :meth:`step`. Each step is routed by ``action.action_type``:

    * a key of ``INVESTIGATION_ACTIONS`` reveals one field of the scenario's
      ``available_context`` and earns a shaped reward whose running total is
      capped at ``TRAJECTORY_REWARD_CAP`` in either direction;
    * ``"decide"`` is scored against the scenario's ground truth and ends the
      episode (a malformed decision is penalized instead and does not end it);
    * any other action type is penalized as invalid.

    Every non-terminal action (investigation, duplicate request, malformed
    decision, invalid action type) consumes one unit of the ``max_actions``
    budget. When the budget runs out before a valid decision, the episode ends
    with a no-decision penalty.
    """

    SUPPORTS_CONCURRENT_SESSIONS: bool = True

    # Investigation ``action_type`` -> name of the context field it reveals.
    # The field name is looked up in the scenario's ``available_context`` and
    # stored on the episode's ``GatheredContext`` under the same name.
    INVESTIGATION_ACTIONS: Dict[str, str] = {
        "request_author_profile": "author_profile",
        "request_author_violations": "author_violations",
        "request_thread_context": "thread_context",
        "request_community_rules": "community_rules",
        "request_linked_content": "linked_content_summary",
        "request_similar_precedents": "similar_precedents",
        "request_reporter_credibility": "reporter_credibility",
    }

    def __init__(self) -> None:
        """Create an environment with placeholder state; :meth:`reset` must run before :meth:`step`."""
        self._state = ModerationState(episode_id=str(uuid4()), step_count=0)
        # Loaded scenario dict; ``None`` until the first reset().
        self._scenario: Optional[Dict[str, Any]] = None
        self._gathered_context = GatheredContext()
        # Human-readable labels of the actions taken this episode.
        self._action_history: list[str] = []
        # Running sum of capped shaping rewards from non-terminal actions.
        self._trajectory_reward_total = 0.0
        # Bounded cumulative episode reward before public normalization.
        self._raw_episode_reward_total = 0.0
        self._episode_done = False
        # Results of the most recent step. The reward/grade fields are echoed
        # back if step() is called again after the episode has ended.
        self._last_reward_breakdown: Optional[Dict[str, Any]] = None
        self._last_grade_breakdown: Optional[Dict[str, Any]] = None
        self._last_task_grade: Optional[float] = None
        self._last_error_code: Optional[str] = None

    def reset(
        self,
        task_id: Optional[str] = None,
        seed: Optional[int] = None,
        episode_id: Optional[str] = None,
        **kwargs: Any,
    ) -> ModerationObservation:
        """Reset the environment and return the initial observation.

        Args:
            task_id: Task/scenario selector forwarded to ``load_scenario``.
            seed: Seed forwarded to ``load_scenario``.
            episode_id: Explicit episode id; a fresh UUID4 string is used if
                omitted or empty.
            **kwargs: May carry ``scenario_id``, which takes precedence over
                ``task_id`` when both are truthy. Other keys are ignored.

        Returns:
            The initial observation (``done=False``, no reward, no grade).
        """
        # ``task_id`` is a named parameter, so it can never arrive via
        # ``kwargs``; only ``scenario_id`` can. Effective precedence is
        # ``scenario_id`` first, then ``task_id``.
        requested_task = kwargs.get("scenario_id") or task_id
        self._scenario = load_scenario(requested_task, seed=seed)
        resolved_task_id = get_task_id_for_scenario(self._scenario)

        self._state = ModerationState(
            episode_id=episode_id or str(uuid4()),
            step_count=0,
            scenario_id=self._scenario.get("scenario_id"),
            task_id=resolved_task_id,
            difficulty=self._scenario.get("difficulty"),
            trigger_type=self._scenario.get("trigger_info", {}).get("trigger_type"),
            actions_taken=0,
            max_actions=8,
            context_requested=[],
            decision_made=False,
            episode_reward=0.0,
            raw_episode_reward=0.0,
            done=False,
            last_error_code=None,
        )
        self._gathered_context = GatheredContext()
        self._action_history = []
        self._trajectory_reward_total = 0.0
        self._raw_episode_reward_total = 0.0
        self._episode_done = False
        self._last_reward_breakdown = None
        self._last_grade_breakdown = None
        self._last_task_grade = None
        self._last_error_code = None

        return self._build_observation(
            feedback="Episode started. Review the content and make a moderation decision.",
            done=False,
            reward=None,
            error_code=None,
            reward_breakdown={"reward_type": "reset", "total": 0.0},
            task_grade=None,
            grade_breakdown=None,
        )

    def step(
        self,
        action: ModerationAction,
        timeout_s: Optional[float] = None,
        **kwargs: Any,
    ) -> ModerationObservation:
        """Execute one step in the environment.

        Args:
            action: The agent's action, dispatched on ``action.action_type``.
            timeout_s: Unused by this environment.
            **kwargs: Unused by this environment.

        Returns:
            The resulting observation. If the episode has already ended, a
            zero-reward observation with error code ``"episode_already_done"``
            is returned and the step counter is not advanced.

        Raises:
            RuntimeError: If :meth:`reset` has never been called.
        """
        del timeout_s, kwargs  # Unused.

        if self._scenario is None:
            raise RuntimeError("Environment must be reset before calling step().")

        if self._episode_done:
            return self._build_observation(
                feedback="Episode already completed. Call reset() before taking another action.",
                done=True,
                reward=0.0,
                error_code="episode_already_done",
                reward_breakdown={
                    "reward_type": "terminal_guard",
                    "reason": "episode_already_done",
                    "score": 0.0,
                    "total": 0.0,
                    "last_terminal_breakdown": self._last_reward_breakdown,
                },
                task_grade=self._last_task_grade,
                grade_breakdown=self._last_grade_breakdown,
            )

        self._state.step_count += 1

        if action.action_type in self.INVESTIGATION_ACTIONS:
            return self._handle_investigation(action)
        if action.action_type == "decide":
            return self._handle_decision(action)
        return self._handle_invalid_action(action)

    @property
    def state(self) -> ModerationState:
        """Return the current public environment state (the live object, not a copy)."""
        return self._state

    def get_metadata(self) -> EnvironmentMetadata:
        """Return metadata for validators, clients, and the web UI.

        ``README.md`` from the parent of this module's directory is embedded
        as ``readme_content`` when it exists; otherwise that field is ``None``.
        """
        readme_path = Path(__file__).resolve().parent.parent / "README.md"
        # Decode explicitly as UTF-8: the platform default encoding (e.g.
        # cp1252 on Windows) can fail on non-ASCII README content.
        readme_content = (
            readme_path.read_text(encoding="utf-8") if readme_path.exists() else None
        )
        return EnvironmentMetadata(
            name="safespace",
            description=(
                "Content moderation environment with multi-step investigation, "
                "deterministic grading, and trajectory-level reward shaping."
            ),
            version="0.2.1",
            author="SafeSpace Hackathon Team",
            readme_content=readme_content,
        )

    def _build_observation(
        self,
        *,
        feedback: str,
        done: bool,
        reward: Optional[float],
        error_code: Optional[str],
        reward_breakdown: Optional[Dict[str, Any]],
        task_grade: Optional[float],
        grade_breakdown: Optional[Dict[str, Any]],
    ) -> ModerationObservation:
        """Build an observation from the current internal state.

        Args:
            feedback: Human-readable message for the agent.
            done: Whether the episode has ended.
            reward: Raw per-step reward. The observation's ``reward`` is its
                normalized form; the raw value is kept in
                ``metadata["raw_reward"]``.
            error_code: Machine-readable error code, or ``None``.
            reward_breakdown: Raw breakdown dict, passed through
                ``normalize_reward_breakdown`` and validated as
                ``RewardBreakdown``; ``None`` omits it.
            task_grade: Task grade to expose, or ``None``.
            grade_breakdown: Dict validated as ``TaskGradeBreakdown``; ``None``
                omits it.
        """
        typed_reward_breakdown = None
        if reward_breakdown is not None:
            typed_reward_breakdown = RewardBreakdown.model_validate(
                normalize_reward_breakdown(reward_breakdown)
            )

        typed_grade_breakdown = None
        if grade_breakdown is not None:
            typed_grade_breakdown = TaskGradeBreakdown.model_validate(grade_breakdown)

        return ModerationObservation(
            content_item=ContentItem(**self._scenario["content_item"]) if self._scenario else None,
            trigger_info=TriggerInfo(**self._scenario["trigger_info"]) if self._scenario else None,
            gathered_context=self._gathered_context,
            platform_policy=PLATFORM_POLICY,
            available_factors=FACTOR_LIST,
            actions_taken=self._state.actions_taken,
            max_actions=self._state.max_actions,
            # Copy so later appends do not mutate an already-emitted observation.
            action_history=self._action_history.copy(),
            feedback=feedback,
            error_code=error_code,
            done=done,
            reward=self._normalize_step_reward(reward),
            reward_breakdown=typed_reward_breakdown,
            task_grade=task_grade,
            grade_breakdown=typed_grade_breakdown,
            metadata={
                "episode_reward": self._state.episode_reward,
                "raw_episode_reward": self._raw_episode_reward_total,
                "trajectory_reward_total": self._trajectory_reward_total,
                "decision_made": self._state.decision_made,
                "raw_reward": reward,
            },
        )

    def _sync_public_reward_state(self) -> None:
        """Expose normalized reward fields on the public state model.

        Copies the raw episode total to ``state.raw_episode_reward`` and its
        ``normalize_public_reward`` value to ``state.episode_reward``. Before
        any step (step_count 0) with a zero raw total, the public value is
        pinned to 0.0. A ``None`` from the normalizer is published as 0.0.
        """
        self._state.raw_episode_reward = self._raw_episode_reward_total
        if self._state.step_count == 0 and self._raw_episode_reward_total == 0.0:
            self._state.episode_reward = 0.0
            return
        normalized_total = normalize_public_reward(self._raw_episode_reward_total)
        self._state.episode_reward = 0.0 if normalized_total is None else normalized_total

    def _normalize_step_reward(self, reward: Optional[float]) -> Optional[float]:
        """Normalize a per-step reward for the public observation surface (delegates to ``normalize_public_reward``)."""
        return normalize_public_reward(reward)

    def _consume_budget(self, action_label: str) -> None:
        """Consume one non-terminal action from the budget and log ``action_label`` in the history."""
        self._state.actions_taken += 1
        self._action_history.append(action_label)

    def _apply_trajectory_delta(
        self,
        raw_delta: float,
        breakdown: Dict[str, Any],
    ) -> Tuple[float, Dict[str, Any]]:
        """Apply the capped trajectory reward delta for a non-terminal action.

        The trajectory total moves by ``raw_delta`` but stays within
        ``[-TRAJECTORY_REWARD_CAP, TRAJECTORY_REWARD_CAP]``. The raw episode
        total is then *set* (not incremented) to that trajectory total,
        clamped to ``[MIN_EPISODE_REWARD, MAX_EPISODE_REWARD]``. Overwriting is
        sound because this is only called before a decision is scored; the
        terminal paths end the episode.

        Returns:
            ``(applied_delta, enriched_breakdown)``, where ``applied_delta`` is
            the actual change in the trajectory total after capping.
            NOTE(review): if the episode bounds are tighter than the trajectory
            cap, this can differ from the change in the episode total.
        """
        previous_total = self._trajectory_reward_total
        next_total = max(
            -TRAJECTORY_REWARD_CAP,
            min(TRAJECTORY_REWARD_CAP, previous_total + raw_delta),
        )
        applied_delta = next_total - previous_total
        self._trajectory_reward_total = next_total

        self._raw_episode_reward_total = max(
            MIN_EPISODE_REWARD,
            min(MAX_EPISODE_REWARD, next_total),
        )
        self._sync_public_reward_state()
        enriched = {
            **breakdown,
            "requested_score": raw_delta,
            "applied_score": applied_delta,
            "trajectory_total": self._trajectory_reward_total,
            "episode_total": self._raw_episode_reward_total,
        }
        return applied_delta, enriched

    def _apply_episode_delta(self, delta: float) -> Tuple[float, float]:
        """Add ``delta`` to the episode total, clamped to ``[MIN_EPISODE_REWARD, MAX_EPISODE_REWARD]``.

        Returns:
            ``(applied_delta, new_total)``, where ``applied_delta`` is the
            change actually applied after clamping.
        """
        current_total = self._raw_episode_reward_total
        next_total = max(
            MIN_EPISODE_REWARD,
            min(MAX_EPISODE_REWARD, current_total + delta),
        )
        applied_delta = next_total - current_total
        self._raw_episode_reward_total = next_total
        self._sync_public_reward_state()
        return applied_delta, next_total

    def _finalize_if_budget_exhausted(
        self,
        *,
        feedback: str,
        step_reward: float,
        step_breakdown: Dict[str, Any],
        error_code: Optional[str],
    ) -> ModerationObservation:
        """Return a non-terminal observation or end the episode on budget exhaustion.

        While ``actions_taken < max_actions``, ``error_code`` is recorded and a
        non-terminal observation carrying ``step_reward`` is returned.

        Otherwise the episode ends. The no-decision penalty is added to the
        episode total, and the task grade is fixed at
        ``clamp_public_task_grade(0.0)`` with an all-zero grade breakdown. The
        error code becomes ``"no_decision_made"``, and the observation's reward
        is ``step_reward`` plus the applied penalty.
        """
        self._last_error_code = error_code
        self._state.last_error_code = error_code

        if self._state.actions_taken < self._state.max_actions:
            self._last_reward_breakdown = step_breakdown
            self._state.done = False
            return self._build_observation(
                feedback=feedback,
                done=False,
                reward=step_reward,
                error_code=error_code,
                reward_breakdown=step_breakdown,
                task_grade=None,
                grade_breakdown=None,
            )

        no_decision_penalty, terminal_breakdown = compute_no_decision_penalty(
            self._state.actions_taken
        )
        applied_penalty, cumulative_total = self._apply_episode_delta(no_decision_penalty)
        combined_reward = step_reward + applied_penalty
        # NOTE(review): decision_made is set although no decision was submitted;
        # presumably it means "episode concluded". Confirm this is what
        # consumers of the state expect.
        self._state.decision_made = True
        self._state.done = True
        self._episode_done = True
        self._last_task_grade = clamp_public_task_grade(0.0)
        self._last_grade_breakdown = {
            "decision": {"weight": 0.70, "score": 0.0, "details": {"reason": "no_decision"}},
            "factor_overlap": {"weight": 0.15, "score": 0.0, "details": {}},
            "efficiency": {"weight": 0.05, "score": 0.0, "details": {}},
            "calibration": {"weight": 0.10, "score": 0.0, "details": {}},
            "raw_total": 0.0,
            "public_total_adjustment": self._last_task_grade,
            "total": self._last_task_grade,
        }
        combined_breakdown = {
            "reward_type": "trajectory_terminal",
            "trajectory": step_breakdown,
            "no_decision": {
                **terminal_breakdown,
                "applied_score": applied_penalty,
            },
            "step_total": combined_reward,
            "cumulative_total": cumulative_total,
        }
        self._last_reward_breakdown = combined_breakdown
        self._last_error_code = "no_decision_made"
        self._state.last_error_code = "no_decision_made"

        return self._build_observation(
            feedback=f"{feedback} Action budget exhausted before a final decision.",
            done=True,
            reward=combined_reward,
            error_code="no_decision_made",
            reward_breakdown=combined_breakdown,
            task_grade=self._last_task_grade,
            grade_breakdown=self._last_grade_breakdown,
        )

    def _handle_investigation(self, action: ModerationAction) -> ModerationObservation:
        """Handle a context-gathering action.

        A repeat request for an already-requested field is penalized via
        ``compute_duplicate_action_penalty`` and still consumes budget.

        Otherwise the field is looked up in the scenario's
        ``available_context``. If present, it is copied onto the gathered
        context. In either case one unit of budget is consumed, the field is
        recorded as requested, and the shaped reward from
        ``compute_investigation_reward`` is applied.
        """
        context_field = self.INVESTIGATION_ACTIONS[action.action_type]

        # Defensive guard: normally _finalize_if_budget_exhausted ends the
        # episode as soon as the budget is used up, so this is only reachable
        # if the budget fields were changed externally.
        if self._state.actions_taken >= self._state.max_actions:
            self._episode_done = True
            self._state.done = True
            # Record the error like every other terminal/error path does, so
            # the public state does not keep a stale error code.
            self._last_error_code = "action_budget_exhausted"
            self._state.last_error_code = "action_budget_exhausted"
            return self._build_observation(
                feedback="Action budget exhausted. Episode ended with no decision.",
                done=True,
                reward=0.0,
                error_code="action_budget_exhausted",
                reward_breakdown=self._last_reward_breakdown,
                task_grade=self._last_task_grade,
                grade_breakdown=self._last_grade_breakdown,
            )

        if context_field in self._state.context_requested:
            raw_reward, breakdown = compute_duplicate_action_penalty(context_field)
            self._consume_budget(f"{action.action_type} (duplicate)")
            step_reward, step_breakdown = self._apply_trajectory_delta(raw_reward, breakdown)
            return self._finalize_if_budget_exhausted(
                feedback=f"Warning: {context_field} was already retrieved. Action wasted.",
                step_reward=step_reward,
                step_breakdown=step_breakdown,
                error_code="duplicate_context_request",
            )

        available_context = self._scenario.get("available_context", {})
        context_value = available_context.get(context_field)
        raw_reward, raw_breakdown = compute_investigation_reward(
            context_field=context_field,
            ground_truth=self._scenario["ground_truth"],
            retrieved=context_value is not None,
        )

        if context_value is not None:
            setattr(self._gathered_context, context_field, context_value)
            feedback = f"Retrieved {context_field}."
        else:
            feedback = f"No data available for {context_field}."

        self._consume_budget(action.action_type)
        self._state.context_requested.append(context_field)
        raw_breakdown["retrieved"] = context_value is not None
        step_reward, step_breakdown = self._apply_trajectory_delta(raw_reward, raw_breakdown)

        return self._finalize_if_budget_exhausted(
            feedback=feedback,
            step_reward=step_reward,
            step_breakdown=step_breakdown,
            error_code=None,
        )

    def _handle_decision(self, action: ModerationAction) -> ModerationObservation:
        """Handle the terminal moderation decision.

        A decision is malformed if any of the following holds:

        * ``decision``, ``primary_violation`` or ``severity`` is falsy;
        * ``confidence`` or ``key_factors`` is ``None``.

        A malformed decision is penalized like an invalid action and consumes
        one unit of budget. The episode continues unless the budget is
        exhausted.

        A well-formed decision is scored with ``compute_reward``, whose reward
        is added to the bounded episode total, and graded with
        ``compute_task_grade``. The episode then ends. A valid decision is
        appended to the action history but does not consume budget.
        """
        if not all(
            [
                action.decision,
                action.primary_violation,
                action.severity,
                action.confidence is not None,
                action.key_factors is not None,
            ]
        ):
            raw_reward, raw_breakdown = compute_invalid_action_penalty(
                reason="missing_decision_fields",
                action_type=action.action_type,
            )
            self._consume_budget("decide (invalid)")
            step_reward, step_breakdown = self._apply_trajectory_delta(raw_reward, raw_breakdown)
            return self._finalize_if_budget_exhausted(
                feedback=(
                    "Invalid decision: missing required fields (decision, "
                    "primary_violation, severity, confidence, key_factors)."
                ),
                step_reward=step_reward,
                step_breakdown=step_breakdown,
                error_code="missing_decision_fields",
            )

        agent_decision = {
            "decision": action.decision,
            "primary_violation": action.primary_violation,
            "severity": action.severity,
            "confidence": action.confidence,
            "key_factors": action.key_factors,
        }
        ground_truth = self._scenario["ground_truth"]

        terminal_reward, reward_breakdown = compute_reward(
            agent_decision=agent_decision,
            ground_truth=ground_truth,
            actions_taken=self._state.actions_taken,
            difficulty=self._scenario["difficulty"],
        )
        applied_terminal_reward, cumulative_total = self._apply_episode_delta(terminal_reward)
        task_grade, grade_breakdown = compute_task_grade(
            agent_decision=agent_decision,
            ground_truth=ground_truth,
            actions_taken=self._state.actions_taken,
            difficulty=self._scenario["difficulty"],
        )

        self._state.decision_made = True
        self._state.done = True
        self._episode_done = True
        self._action_history.append(f"decide: {action.decision}")
        self._last_error_code = None
        self._state.last_error_code = None
        self._last_task_grade = task_grade
        self._last_grade_breakdown = grade_breakdown
        reward_breakdown["applied_score"] = applied_terminal_reward
        reward_breakdown["trajectory_total"] = self._trajectory_reward_total
        reward_breakdown["cumulative_total"] = cumulative_total
        self._last_reward_breakdown = reward_breakdown

        decision_grade = reward_breakdown.get("decision", {}).get("details", {}).get("grade", "unknown")
        feedback = (
            f"Decision recorded: {action.decision}. "
            f"Grade: {decision_grade}. Step reward: {applied_terminal_reward:.3f}. "
            f"Task grade: {task_grade:.3f}."
        )
        return self._build_observation(
            feedback=feedback,
            done=True,
            reward=applied_terminal_reward,
            error_code=None,
            reward_breakdown=reward_breakdown,
            task_grade=task_grade,
            grade_breakdown=grade_breakdown,
        )

    def _handle_invalid_action(self, action: ModerationAction) -> ModerationObservation:
        """Penalize an unrecognized ``action_type`` and consume one unit of budget."""
        valid_actions = list(self.INVESTIGATION_ACTIONS) + ["decide"]
        raw_reward, raw_breakdown = compute_invalid_action_penalty(
            reason="invalid_action_type",
            action_type=action.action_type,
        )
        self._consume_budget(f"invalid:{action.action_type}")
        step_reward, step_breakdown = self._apply_trajectory_delta(raw_reward, raw_breakdown)
        return self._finalize_if_budget_exhausted(
            feedback=(
                f"Invalid action_type: '{action.action_type}'. "
                f"Valid types: {valid_actions}"
            ),
            step_reward=step_reward,
            step_breakdown=step_breakdown,
            error_code="invalid_action_type",
        )
|
|
|
|
# Second public name for the environment. It is bound to the very same class
# object as SafeSpaceEnvironment (not a subclass or wrapper). Kept as a plain
# assignment so static type checkers treat it as a type alias.
ContentModerationEnvironment = SafeSpaceEnvironment
|
|