Spaces:
Sleeping
Sleeping
| """CommitmentOS environment — multi-turn personal task management with | |
| temporal commitment coherence tracking. | |
| Episode lifecycle: | |
| 1. reset() -> agent receives scenario briefing + calendar + inbox | |
| 2. step() -> agent makes one tool call per step (done=False) | |
| 3. step(submit_plan) or max_steps reached -> grading + done=True | |
| """ | |
| from __future__ import annotations | |
| import random | |
| import uuid | |
| from typing import Any, Optional | |
| from openenv.core.env_server import Environment | |
| from openenv.core.env_server.types import EnvironmentMetadata | |
| from constants import AUTHOR, PROJECT_DESCRIPTION, PROJECT_NAME, VERSION | |
| from models import CommitmentAction, CommitmentObservation, CommitmentState | |
| from server.domain import ScenarioDef | |
| from server.world import WorldState | |
class CommitmentEnvironment(
    Environment[CommitmentAction, CommitmentObservation, CommitmentState]
):
    """Multi-turn task-management environment driven by one tool call per step.

    Lifecycle:
      1. ``reset()`` picks a scenario, builds a fresh ``WorldState`` and
         returns the first observation (the only one carrying the briefing).
      2. ``step()`` dispatches exactly one tool call to the world and returns
         an observation with a small shaping reward and ``done=False``.
      3. ``step()`` with ``action_type == "submit_plan"``, or the step on which
         the step counter reaches ``scenario.max_steps``, grades the episode
         and returns ``done=True``.  On the step-limit path the action passed
         to that final ``step()`` call is NOT dispatched.
    """

    # Every action_type understood by step(), in the order in which they are
    # listed in the "unknown action" error message.
    _VALID_ACTION_TYPES = (
        "view_calendar",
        "check_availability",
        "search_restaurants",
        "schedule_meeting",
        "reschedule_event",
        "cancel_event",
        "send_email",
        "book_restaurant",
        "submit_plan",
    )

    # Tools that earn the positive shaping reward when they succeed.
    # cancel_event is not in this set, so a successful cancel earns 0.
    _REWARDED_ACTIONS = frozenset(
        {"schedule_meeting", "reschedule_event", "send_email", "book_restaurant"}
    )

    # Per-step shaping rewards applied on top of the final grade.
    _SUCCESS_REWARD = 0.05
    _CONFLICT_PENALTY = -0.05

    def __init__(self) -> None:
        """Create an environment with no active episode; call reset() next."""
        super().__init__()
        self._world: Optional[WorldState] = None
        self._scenario: Optional[ScenarioDef] = None
        self._episode_id: str = ""
        self._step_count: int = 0
        self._done: bool = False
        self._cumulative_reward: float = 0.0
        self._last_tool_result: str = ""
        self._last_breakdown: dict[str, float] = {}
        self._last_feedback: str = ""

    def _require_episode(self) -> tuple[WorldState, ScenarioDef]:
        """Return the active (world, scenario) pair.

        Raises:
            ValueError: if reset() has not been called yet.  An explicit raise
                is used instead of ``assert`` so the guard survives
                ``python -O``.
        """
        if self._world is None or self._scenario is None:
            raise ValueError("No active episode. Call reset() first.")
        return self._world, self._scenario

    # ------------------------------------------------------------------
    # Task selection
    # ------------------------------------------------------------------
    def _select_scenario(
        self,
        scenario_id: Optional[str] = None,
        difficulty: Optional[str] = None,
    ) -> ScenarioDef:
        """Pick the scenario for a new episode.

        Precedence: an explicit ``scenario_id`` wins; otherwise a random
        scenario of the requested ``difficulty``; otherwise a random scenario
        from the full catalogue.  Random picks use the module-level ``random``
        generator.

        Raises:
            ValueError: if ``scenario_id`` is unknown, or no scenario exists
                for ``difficulty``.
        """
        # Imported at call time rather than module level.
        # NOTE(review): presumably to avoid an import cycle with server.tasks
        # -- confirm before hoisting.
        from server.tasks import get_all_scenarios, get_scenario, get_scenarios_by_difficulty

        if scenario_id:
            scenario = get_scenario(scenario_id)
            if scenario is None:
                raise ValueError(f"Unknown scenario_id: {scenario_id}")
            return scenario

        if difficulty:
            candidates = get_scenarios_by_difficulty(difficulty)
            if not candidates:
                raise ValueError(f"No scenarios for difficulty: {difficulty}")
            return random.choice(candidates)

        return random.choice(list(get_all_scenarios().values()))

    # ------------------------------------------------------------------
    # Core API
    # ------------------------------------------------------------------
    def reset(
        self,
        seed: Optional[int] = None,
        episode_id: Optional[str] = None,
        **kwargs: Any,
    ) -> CommitmentObservation:
        """Start a new episode and return its first observation.

        Args:
            seed: if given, seeds the module-level ``random`` generator (this
                is process-global state, not per-instance) before the
                scenario is selected.
            episode_id: identifier for the episode; a fresh UUID4 string is
                generated when omitted or empty.
            **kwargs: recognised keys are ``scenario_id`` (or its alias
                ``task_id``) and ``difficulty``; see _select_scenario().

        Returns:
            The initial observation, with reward 0.0 and ``done=False``.

        Raises:
            ValueError: propagated from _select_scenario() for an unknown
                scenario id or a difficulty without scenarios.
        """
        if seed is not None:
            random.seed(seed)

        scenario = self._select_scenario(
            scenario_id=kwargs.get("scenario_id") or kwargs.get("task_id"),
            difficulty=kwargs.get("difficulty"),
        )

        self._scenario = scenario
        self._world = WorldState(scenario)
        self._episode_id = episode_id or str(uuid.uuid4())
        self._step_count = 0
        self._done = False
        self._cumulative_reward = 0.0
        self._last_tool_result = ""
        self._last_breakdown = {}
        self._last_feedback = "New episode started. Read the briefing and use tools to manage the situation."
        return self._build_observation(reward=0.0, done=False)

    def step(
        self,
        action: CommitmentAction,
        timeout_s: Optional[float] = None,
        **kwargs: Any,
    ) -> CommitmentObservation:
        """Apply one agent action and return the resulting observation.

        ``timeout_s`` and ``**kwargs`` are accepted but not used by this
        implementation.

        Shaping reward for a non-terminal step:
          * -0.05 when the tool reports a scheduling conflict,
          * +0.05 when one of the tools in ``_REWARDED_ACTIONS`` succeeds,
          * 0.0 otherwise (read-only tools, errors, unknown action types).

        Raises:
            ValueError: if there is no active episode, or the episode has
                already finished.
        """
        world, scenario = self._require_episode()
        if self._done:
            raise ValueError("Episode already completed. Call reset() to start a new one.")

        self._step_count += 1
        world.step_count = self._step_count

        at = action.action_type.lower().strip()
        if at == "submit_plan":
            return self._finish_episode()
        if self._step_count >= scenario.max_steps:
            # The step budget is exhausted: grade now WITHOUT dispatching this
            # action, and say so in the observation.
            return self._finish_episode(step_limit_reached=True)

        tool_result, dispatch_status = self._dispatch_tool(action, at)
        self._last_tool_result = tool_result

        step_reward = 0.0
        if dispatch_status == "conflict":
            step_reward = self._CONFLICT_PENALTY
        elif dispatch_status == "success" and at in self._REWARDED_ACTIONS:
            step_reward = self._SUCCESS_REWARD

        self._cumulative_reward += step_reward
        # Feedback and breakdown are only populated by reset() and grading.
        self._last_feedback = ""
        self._last_breakdown = {}
        return self._build_observation(reward=step_reward, done=False)

    def _finish_episode(self, *, step_limit_reached: bool = False) -> CommitmentObservation:
        """Grade the episode, mark it done and return the terminal observation.

        Args:
            step_limit_reached: True when the episode is being closed because
                the step budget ran out rather than by an explicit
                ``submit_plan``; only affects the ``tool_result`` text.

        Returns:
            Observation with ``done=True`` whose reward is the grader's total;
            that total is also added to the cumulative reward.

        Raises:
            ValueError: if there is no active episode.
        """
        from server.graders import grade_scenario

        world, scenario = self._require_episode()
        total_reward, breakdown, feedback = grade_scenario(scenario, world)

        self._done = True
        self._cumulative_reward += total_reward
        self._last_breakdown = breakdown
        self._last_feedback = feedback
        if step_limit_reached:
            self._last_tool_result = (
                "Step limit reached; the final action was not executed. Episode graded."
            )
        else:
            self._last_tool_result = "Plan submitted. Episode graded."
        return self._build_observation(reward=total_reward, done=True)

    # ------------------------------------------------------------------
    # Tool dispatch
    # ------------------------------------------------------------------
    def _dispatch_tool(self, action: CommitmentAction, at: str) -> tuple[str, str]:
        """Route a normalised action type to the matching WorldState tool.

        Args:
            action: the agent action carrying the tool arguments.
            at: ``action.action_type`` already lower-cased and stripped.

        Returns:
            ``(result_text, status)`` where status is one of ``"info"``
            (read-only tools), ``"success"``, ``"conflict"`` (result text
            starts with ``"CONFLICT:"``) or ``"error"`` (result text contains
            "not found", or the action type is unknown).

        Raises:
            ValueError: if there is no active episode.
        """
        world, _ = self._require_episode()
        turn = self._step_count

        if at == "view_calendar":
            return world.view_calendar(action.date), "info"

        if at == "check_availability":
            return world.check_availability(action.person), "info"

        if at == "search_restaurants":
            listing = world.search_restaurants(
                cuisine=action.cuisine,
                max_price=action.max_price,
                dietary=action.dietary,
                max_distance_miles=action.max_distance_miles,
                near_airport=action.near_airport,
            )
            return listing, "info"

        if at == "schedule_meeting":
            result = world.schedule_meeting(
                title=action.title,
                date=action.date,
                time=action.time,
                duration_min=action.duration_min,
                participants=action.participants,
                location=action.location,
                turn=turn,
            )
            status = "conflict" if result.startswith("CONFLICT:") else "success"
            return result, status

        if at == "reschedule_event":
            result = world.reschedule_event(
                event_id=action.event_id,
                new_time=action.new_time,
                turn=turn,
            )
            # A conflict takes precedence over a "not found" error.
            if result.startswith("CONFLICT:"):
                status = "conflict"
            elif "not found" in result.lower():
                status = "error"
            else:
                status = "success"
            return result, status

        if at == "cancel_event":
            result = world.cancel_event(action.event_id, turn=turn)
            status = "error" if "not found" in result.lower() else "success"
            return result, status

        if at == "send_email":
            # send_email is always reported as a success here.
            result = world.send_email(
                to=action.to,
                subject=action.subject,
                body=action.body,
                turn=turn,
            )
            return result, "success"

        if at == "book_restaurant":
            result = world.book_restaurant(action.restaurant_name, turn=turn)
            status = "error" if "not found" in result.lower() else "success"
            return result, status

        valid_types = ", ".join(self._VALID_ACTION_TYPES)
        return (
            f"Unknown action_type: '{at}'. Valid types: {valid_types}",
            "error",
        )

    # ------------------------------------------------------------------
    # Observation builder
    # ------------------------------------------------------------------
    def _build_observation(self, *, reward: float, done: bool) -> CommitmentObservation:
        """Assemble an observation from the current world and episode state.

        The scenario briefing is included only while the step counter is 0
        (i.e. in the observation returned by reset()); afterwards it is "".

        Raises:
            ValueError: if there is no active episode.
        """
        world, scenario = self._require_episode()
        briefing = scenario.briefing if self._step_count == 0 else ""
        return CommitmentObservation(
            scenario_id=scenario.scenario_id,
            difficulty=scenario.difficulty,
            briefing=briefing,
            tool_result=self._last_tool_result,
            calendar_snapshot=world.get_calendar_snapshot(),
            inbox=world.get_inbox_snapshot(),
            pending_commitments=len(world.get_active_commitments()),
            step_number=self._step_count,
            max_steps=scenario.max_steps,
            reward=reward,
            reward_breakdown=self._last_breakdown,
            done=done,
            feedback=self._last_feedback,
        )

    # ------------------------------------------------------------------
    # State property
    # ------------------------------------------------------------------
    # NOTE(review): the section title says "property" but this is a plain
    # method, so it must be called as ``env.state()``.  If the base
    # Environment declares ``state`` as a property, confirm whether
    # ``@property`` is missing here -- not changed because it would break
    # existing ``env.state()`` callers.
    def state(self) -> CommitmentState:
        """Return a summary of the current episode.

        Safe to call before reset(): scenario fields are then empty strings
        and the commitment/violation counts are 0.  ``available_tasks`` always
        lists every scenario id known to ``server.tasks``.
        """
        from server.tasks import get_all_scenarios

        world = self._world
        scenario = self._scenario
        violations = world.get_silent_violations() if world else []
        return CommitmentState(
            episode_id=self._episode_id,
            step_count=self._step_count,
            scenario_id=scenario.scenario_id if scenario else "",
            difficulty=scenario.difficulty if scenario else "",
            completed=self._done,
            cumulative_reward=self._cumulative_reward,
            commitment_count=len(world.commitment_ledger) if world else 0,
            violation_count=len(violations),
            available_tasks=list(get_all_scenarios().keys()),
        )

    def get_metadata(self) -> EnvironmentMetadata:
        """Return static project metadata built from the ``constants`` module."""
        return EnvironmentMetadata(
            name=PROJECT_NAME,
            description=PROJECT_DESCRIPTION,
            version=VERSION,
            author=AUTHOR,
        )