# LifeStack / core/lifestack_env.py
# Soham Banerjee
# deploy: pure lifestack with partitioned wisdom pool
# 77da5ce
import copy
from typing import Any, Optional, Dict, List
from pydantic import Field
from core.life_state import LifeMetrics, ResourceBudget, DependencyGraph
from core.metric_schema import normalize_metric_path
from core.reward import compute_reward, compute_task_reward
from core.task import Task, ExoEvent, Route, Milestone, FlightCrisisTask
from core.verifier import LifeStackVerifier
# Resolve the OpenEnv base classes, degrading through two fallbacks so that the
# names Environment, Action, Observation, State, Rubric, EnvironmentMetadata and
# USING_MODERN_API are always defined by the time the classes below are created.
try:
    # Preferred: the modern OpenEnv API.
    from openenv.core import Environment, Action, Observation, State
    from openenv.core.env_server.types import EnvironmentMetadata
    from openenv.core.rubrics import Rubric
    USING_MODERN_API = True
except ImportError:
    try:
        # Second choice: an openenv build that only exposes `openenv.env.Env`.
        from openenv.env import Env as Environment
        from pydantic import BaseModel
        # Shims for missing classes in older/alternative openenv
        class Action(BaseModel): pass
        class Observation(BaseModel): pass
        class State(BaseModel): pass
        class Rubric:
            # Accepts and ignores any arguments; always scores 0.0.
            def __init__(self, *a, **k): pass
            def compute(self, *a, **k): return 0.0
        EnvironmentMetadata = None
        USING_MODERN_API = False
    except ImportError:
        # Final fallback — must use BaseModel so Pydantic subclasses work
        from pydantic import BaseModel
        class Environment:
            # Minimal stand-in: stores the rubric, reset/step do nothing.
            def __init__(self, rubric=None): self.rubric = rubric
            def reset(self, *a, **k): pass
            def step(self, *a, **k): pass
        class Action(BaseModel): pass
        class Observation(BaseModel): pass
        class State(BaseModel): pass
        class Rubric:
            # Accepts and ignores any arguments; always scores 0.0.
            def __init__(self, *a, **k): pass
            def compute(self, *a, **k): return 0.0
        EnvironmentMetadata = None
        USING_MODERN_API = False
class LifeStackAction(Action):
    """Structured action for LifeStack.

    Carries both the direct-effect payload (metric deltas and resource costs)
    and the optional tool-style fields used by long-horizon tasks.
    """

    metric_changes: Dict[str, float] = Field(default_factory=dict, description="Metric adjustment deltas")
    resource_cost: Dict[str, float] = Field(default_factory=dict, description="Time, money, and energy costs")
    actions_taken: int = Field(default=0, description="Number of atomic actions taken")

    # ToolAction fields (Long-horizon)
    action_type: Optional[str] = Field(default=None, description="inspect, plan, execute, etc.")
    target: Optional[str] = Field(default=None, description="e.g. route_id or hidden_key")
    parameters: Dict[str, Any] = Field(default_factory=dict)
    reasoning: Optional[str] = Field(default=None)
    completion: Optional[str] = Field(default=None)
    inspect_target: Optional[str] = Field(default=None, description="Optional hidden state key to inspect")
    is_rollback: bool = Field(default=False, description="Set true to rollback the previous action.")

    @classmethod
    def from_agent_action(cls, agent_action: Any) -> "LifeStackAction":
        """Build a LifeStackAction from a legacy AgentAction.

        Reads the action's ``primary`` sub-action for type, target domain,
        metric changes and resource cost; ``reasoning`` comes from the outer
        object and ``completion`` from its ``raw_completion`` attribute
        (empty string when absent). The result always counts as one action.
        """
        head = agent_action.primary
        field_values = {
            "action_type": head.action_type,
            # The legacy target_domain maps onto the generic target field.
            "target": head.target_domain,
            "metric_changes": head.metric_changes,
            "resource_cost": head.resource_cost,
            "reasoning": agent_action.reasoning,
            "completion": getattr(agent_action, 'raw_completion', ""),
            "actions_taken": 1,
        }
        return cls(**field_values)
class LifeStackObservation(Observation):
    """Observation returned by LifeStack.

    Built by the environment's ``_get_obs``: a flattened metric snapshot, the
    remaining resource budget, the step counter, the done/reward pair and a
    free-form ``metadata`` dict.
    """
    # Flattened metric values keyed by "domain.submetric".
    metrics: Dict[str, float] = Field(default_factory=dict, description="Flattened 23-domain life metrics")
    # Remaining budget; the environment fills the keys "time", "money", "energy".
    resources: Dict[str, float] = Field(default_factory=dict, description="Current budget remaining")
    step: int = Field(default=0, description="Current episode step")
    done: bool = Field(default=False)
    # None when no reward was computed (e.g. the observation returned by reset()).
    reward: Optional[float] = Field(default=None)
    # Extra episode information (world state, goal, milestones, info messages, ...).
    metadata: Dict[str, Any] = Field(default_factory=dict)
class LifeStackState(State):
    """Internal state of the LifeStack environment.

    Holds everything the environment mutates during an episode. ``reset()``
    reinitialises these fields and ``rollout()`` deep-copies the whole object.
    """

    # --- Core episode bookkeeping ---
    current_metrics: LifeMetrics = Field(default_factory=LifeMetrics)
    budget: ResourceBudget = Field(default_factory=ResourceBudget)
    episode_id: Optional[str] = None
    step_count: int = 0
    inspected_keys: list = Field(default_factory=list)  # revealed keys
    consecutive_waits: int = 0

    # --- Rollback support ---
    used_rollback: bool = Field(default=False)
    # Declared once here; the original declared this field a second time
    # further down, which silently redefined it.
    rollback_penalty_charged: bool = Field(default=False)
    # Snapshot taken before each non-rollback action so it can be undone.
    previous_metrics: Optional[LifeMetrics] = None
    previous_budget: Optional[ResourceBudget] = None

    # --- Task fields ---
    current_task: Optional[Task] = None
    active_route_id: Optional[str] = None
    milestones_achieved: list = Field(default_factory=list)
    # Live, per-episode copies of the task's mutable world and hidden state.
    world_state: dict = Field(default_factory=dict)
    hidden_state: dict = Field(default_factory=dict)
    fired_event_ids: list = Field(default_factory=list)
    exo_events_seen: int = 0
    milestones_after_event: int = 0
    closed_route_ids: set = Field(default_factory=set)

    # --- Legacy / Personality fields ---
    person: Optional[Any] = None
    agent_history: List[tuple] = Field(default_factory=list)
    current_conflict: Optional[Any] = None
    # Running sum of the per-step mean change across "relationships.*" metrics.
    cumulative_rel_delta: float = Field(default=0.0)
class LifeStackRubric(Rubric):
    """Standard reward rubric for LifeStack."""

    def forward(self, action: LifeStackAction, observation: LifeStackObservation) -> float:
        """Return the reward already stored on the observation, or 0.0 if unset.

        LifeStack computes its reward inside ``step()``, where the state
        transition is available; this hook only exposes that value to
        external reward evaluation.
        """
        stored = observation.reward
        if stored is None:
            return 0.0
        return stored
class PartialObsFilter:
    """Builds the partially observable view of a task's world."""

    @staticmethod
    def filter(task: Task, revealed_keys: list) -> dict:
        """Return visible_world plus any keys the agent has explicitly inspected.

        Revealed keys are checked against ``task.mutable_world`` first, then
        ``task.hidden_state``; keys found in neither are ignored. Keys sourced
        from hidden_state are wrapped as
        ``{"value": <val>, "source": "inspect"}`` so the agent knows they were
        obtained via an inspect action rather than being freely observable.

        Every returned value is a deep copy, so mutating the result never
        alters the task. (Previously only ``visible_world`` was copied and
        revealed values were shared by reference.)

        Args:
            task: Object exposing ``visible_world``, ``mutable_world`` and
                ``hidden_state`` mappings.
            revealed_keys: Keys the agent has inspected so far.

        Returns:
            A new dict holding the observable portion of the world.
        """
        obs_world = copy.deepcopy(task.visible_world)
        for key in revealed_keys:
            if key in task.mutable_world:
                obs_world[key] = copy.deepcopy(task.mutable_world[key])
            elif key in task.hidden_state:
                obs_world[key] = {
                    "value": copy.deepcopy(task.hidden_state[key]),
                    "source": "inspect",
                }
        return obs_world
class WorldEngine:
    """Fires a task's scheduled exogenous events and tracks closed routes."""

    def __init__(self, task: Task):
        self.task = task
        self.closed_routes = set()

    def inject_events(self, step: int, world: dict, hidden: dict) -> list[ExoEvent]:
        """Fire every event due at ``step`` and apply its mutations in place.

        An event fires when its ``step`` equals the given step, or when its
        ``step`` is -1 and a fresh ``random.random()`` draw falls below its
        ``probability``. Fired events update ``world`` and ``hidden`` and add
        their ``closes_routes`` to the engine's closed-route set.

        Returns:
            The events that fired, in schedule order.
        """
        import random

        def _is_due(candidate) -> bool:
            # Scheduled events never consume a random draw.
            if candidate.step == step:
                return True
            return candidate.step == -1 and random.random() < candidate.probability

        triggered = []
        for candidate in self.task.event_schedule:
            if not _is_due(candidate):
                continue
            triggered.append(candidate)
            # Apply mutations
            world.update(candidate.world_mutation)
            hidden.update(candidate.hidden_state_mutation)
            self.closed_routes.update(candidate.closes_routes)
        return triggered

    def get_closed_routes(self) -> set[str]:
        """Return the set of route ids closed by events fired so far."""
        return self.closed_routes
_EnvBase = Environment[LifeStackAction, LifeStackObservation, LifeStackState] if USING_MODERN_API else Environment


class LifeStackEnv(_EnvBase):
    """
    LifeStack Environment v1.1 — Refactored for OpenEnv 0.2.3 compliance.

    ``reset()`` installs a task (and optionally a legacy conflict/person) and
    returns the first observation; ``step()`` applies one ``LifeStackAction``;
    ``rollout()`` simulates a few idle steps without disturbing the episode;
    ``render()`` prints a status report.
    """

    SUPPORTS_CONCURRENT_SESSIONS = True

    # Any flattened metric at or below this value terminates the episode.
    METRIC_DEATH_THRESHOLD = 10

    def __init__(self, seed: Optional[int] = None, task=None, max_steps: int = 30):
        """Create the environment.

        Args:
            seed: Accepted for API compatibility; seeding happens in ``reset()``.
            task: Default task used by ``reset()`` when none is passed there.
            max_steps: Episode horizon used when the task has no ``horizon``.
        """
        if USING_MODERN_API:
            super().__init__(rubric=LifeStackRubric())
        else:
            super().__init__()
        # Remembered so reset() does not silently discard the constructor arguments.
        self._default_task = task
        self._default_max_steps = max_steps
        self.max_steps = getattr(task, 'horizon', max_steps) if task else max_steps
        self.metadata_internal = {
            'name': 'LifeStack-v1',
            'version': '1.1.0',
            'description': 'Premium multi-domain life conflict resolution simulation',
            'max_episode_steps': self.max_steps
        }
        self.graph = DependencyGraph()
        self._internal_state = LifeStackState()
        # Created by reset(); None until then.
        self.world_engine = None

    def get_metadata(self):
        """Return environment metadata.

        The modern API gets an ``EnvironmentMetadata`` object; otherwise the
        raw ``metadata_internal`` dict is returned.
        """
        if not USING_MODERN_API:
            return self.metadata_internal
        from openenv.core.env_server.types import EnvironmentMetadata
        return EnvironmentMetadata(
            name=self.metadata_internal['name'],
            version=self.metadata_internal['version'],
            description=self.metadata_internal['description']
        )

    @property
    def state(self) -> LifeStackState:
        """The live internal state object of the current episode."""
        return self._internal_state

    def reset(self, seed: Optional[int] = None, episode_id: Optional[str] = None,
              task: Optional[Task] = None, conflict: Optional[Any] = None,
              budget: Optional[dict] = None, person: Optional[Any] = None,
              agent_history: Optional[List[tuple]] = None, **kwargs) -> LifeStackObservation:
        """Resets the environment. Seed and task/conflict can be provided.

        Args:
            seed: When given, seeds Python's global ``random`` module.
            episode_id: Identifier stored on the state.
            task: Task to run. Falls back to the constructor's task, then to
                ``FlightCrisisTask()``.
            conflict: Legacy disruption; cascaded through the dependency graph
                onto fresh metrics. Uses ``conflict.primary_disruption`` when
                present, otherwise the object itself.
            budget: Optional overrides with keys "time", "money", "energy".
            person: Optional personality object stored on the state.
            agent_history: Optional history handed to legacy escalation.

        Returns:
            The initial observation (``reward`` is None).
        """
        if USING_MODERN_API and getattr(self, 'rubric', None):
            self.rubric.reset()
        if seed is not None:
            import random
            random.seed(seed)

        state = self._internal_state

        # 1. Initialize Task
        state.current_task = task or self._default_task or FlightCrisisTask()
        self.max_steps = getattr(state.current_task, 'horizon', self._default_max_steps)
        self.metadata_internal['max_episode_steps'] = self.max_steps

        # 2. Reset State
        state.episode_id = episode_id
        state.step_count = 0
        state.current_metrics = LifeMetrics()
        state.inspected_keys = []
        state.consecutive_waits = 0
        state.used_rollback = False
        state.rollback_penalty_charged = False
        state.previous_metrics = None
        state.previous_budget = None
        state.cumulative_rel_delta = 0.0

        # Task state: the task is a template, the episode works on copies.
        state.world_state = copy.deepcopy(state.current_task.mutable_world)
        state.hidden_state = copy.deepcopy(state.current_task.hidden_state)
        state.milestones_achieved = []
        state.active_route_id = None
        state.fired_event_ids = []
        state.exo_events_seen = 0
        state.milestones_after_event = 0
        state.closed_route_ids = set()
        state.person = person
        state.agent_history = agent_history or []
        state.current_conflict = conflict
        self.world_engine = WorldEngine(state.current_task)

        # 3. Budget Scaling: explicit override > task constraint > scaled default.
        scale = max(1.0, self.max_steps / 5.0)
        constraints = state.current_task.constraints
        overrides = budget or {}

        def _limit(key: str, base: float) -> float:
            return overrides.get(key, constraints.get(key, base * scale))

        state.budget = ResourceBudget(
            time_hours=_limit("time", 20.0),
            money_dollars=_limit("money", 500.0),
            energy_units=_limit("energy", 100.0)
        )

        if conflict:
            # Legacy disruption support
            disruption = conflict.primary_disruption if hasattr(conflict, 'primary_disruption') else conflict
            state.current_metrics = self.graph.cascade(state.current_metrics, disruption)
            # The conflict's own budget applies only when the caller gave none.
            if budget is None and hasattr(conflict, 'resource_budget'):
                rb = conflict.resource_budget
                state.budget = ResourceBudget(
                    time_hours=rb.get("time", 20.0),
                    money_dollars=rb.get("money", 500.0),
                    energy_units=rb.get("energy", 100.0)
                )
        return self._get_obs()

    def _revealed_world(self) -> dict:
        """Return the observable world with inspected keys at their live values.

        ``PartialObsFilter`` reads from the task template, which never changes
        during an episode. Inspected keys are therefore overlaid with the
        current ``world_state`` / ``hidden_state`` so events and route
        consequences are visible. Values are copied so consumers cannot mutate
        the episode through an observation.
        """
        state = self._internal_state
        revealed = PartialObsFilter.filter(state.current_task, state.inspected_keys)
        for key in state.inspected_keys:
            if key in state.world_state:
                revealed[key] = copy.deepcopy(state.world_state[key])
            elif key in state.hidden_state:
                revealed[key] = {
                    "value": copy.deepcopy(state.hidden_state[key]),
                    "source": "inspect",
                }
        return revealed

    def _get_obs(self, done: bool = False, reward: Optional[float] = None,
                 success: bool = False, failure: bool = False,
                 failure_reason: str = "", routes_remaining: int = 0) -> LifeStackObservation:
        """Assemble an observation from the current state plus the given outcome flags."""
        state = self._internal_state
        conflict = state.current_conflict
        person = state.person
        return LifeStackObservation(
            metrics=state.current_metrics.flatten(),
            resources={
                "time": state.budget.time_hours,
                "money": state.budget.money_dollars,
                "energy": state.budget.energy_units
            },
            step=state.step_count,
            done=done,
            reward=reward,
            metadata={
                "world_state": self._revealed_world(),
                "goal": state.current_task.goal,
                "active_route": state.active_route_id,
                "milestones": state.milestones_achieved,
                "events": state.fired_event_ids,
                "success": success,
                "failure": failure,
                "failure_reason": failure_reason,
                "routes_remaining": routes_remaining,
                "conflict_title": conflict.title if hasattr(conflict, 'title') else "Custom Task",
                "person": person.name if hasattr(person, 'name') else "Unknown"
            }
        )

    def _update_metric(self, path: str, delta: float):
        """Internal helper for non-cascading updates.

        Adds ``delta`` to the metric at ``domain.submetric`` and clamps the
        result to [0, 100]. Unknown or malformed paths are ignored.
        """
        path = normalize_metric_path(path)
        if '.' not in path:
            return
        domain_name, sub_name = path.split('.', 1)
        domain = getattr(self._internal_state.current_metrics, domain_name, None)
        if domain and hasattr(domain, sub_name):
            val = getattr(domain, sub_name)
            setattr(domain, sub_name, max(0.0, min(100.0, val + delta)))

    def _apply_drift_and_escalation(self, info_msgs: List[str]) -> None:
        """Apply personality drift and, at step 2, legacy conflict escalation."""
        state = self._internal_state
        if state.person:
            drift_event = state.person.drift(state.step_count)
            if drift_event:
                path = drift_event.get('metric', '')
                delta = drift_event.get('delta', 0)
                if path and '.' in path:
                    self._update_metric(path, delta)
                    info_msgs.append(f"DRIFT: {drift_event['reason']}")

        conflict = state.current_conflict
        if conflict and state.step_count == 2:
            if hasattr(conflict, 'difficulty') and conflict.difficulty < 5:
                # Imported only when escalation can actually happen.
                from agent.conflict_generator import adaptive_escalate
                new_conflict, reason = adaptive_escalate(conflict, state.agent_history)
                if new_conflict.id != conflict.id:
                    state.current_conflict = new_conflict
                    info_msgs.append(f"ESCALATION: {reason} -> {new_conflict.title}")

    def _fire_exogenous_events(self, info_msgs: List[str]) -> None:
        """Let the world engine fire this step's events and record them on the state."""
        state = self._internal_state
        fired_events = self.world_engine.inject_events(
            state.step_count,
            state.world_state,
            state.hidden_state
        )
        if fired_events:
            state.exo_events_seen += len(fired_events)
            for event in fired_events:
                state.fired_event_ids.append(event.id)
                info_msgs.append(f"EVENT_FIRED: {event.description}")
            state.closed_route_ids.update(self.world_engine.get_closed_routes())

    def _handle_rollback(self, info_msgs: List[str]) -> LifeStackObservation:
        """Process a rollback action and return its observation.

        A rollback always consumes a step. It is allowed once per episode and
        costs -0.1 whether applied or denied; with no snapshot to restore the
        reward is 0.0. The observation carries the step's info messages and
        reports ``done`` once the horizon is reached.
        """
        state = self._internal_state
        state.step_count += 1
        if state.used_rollback:
            info_msgs.append("ROLLBACK_DENIED: Already used once.")
            reward = -0.1
        elif state.previous_metrics is None:
            info_msgs.append("ROLLBACK_UNAVAILABLE: No previous state to restore.")
            reward = 0.0
        else:
            state.current_metrics = copy.deepcopy(state.previous_metrics)
            state.budget = copy.deepcopy(state.previous_budget)
            state.used_rollback = True
            # The penalty is the -0.1 returned here, so mark it as charged.
            state.rollback_penalty_charged = True
            info_msgs.append("ROLLBACK_APPLIED")
            reward = -0.1
        observation = self._get_obs(done=state.step_count >= self.max_steps, reward=reward)
        observation.metadata["info"] = info_msgs
        return observation

    def _handle_inspect(self, action: LifeStackAction, info_msgs: List[str]) -> None:
        """Record an inspected key and report what it revealed."""
        state = self._internal_state
        target = action.target or action.inspect_target
        if not target:
            return
        if target in state.inspected_keys:
            info_msgs.append(f"INSPECT_REDUNDANT: {target}")
            return
        state.inspected_keys.append(target)
        info_msgs.append(f"INSPECT_REVEALED: {target}")
        # Emit an explicit signal when a hidden-state value is uncovered,
        # using the live hidden state rather than the task template.
        if target in state.hidden_state:
            info_msgs.append(
                f"INSPECT_REVEALED_HIDDEN: {target} = {state.hidden_state[target]}"
            )

    def _try_execute_route(self, task, route_id: str, info_msgs: List[str]) -> None:
        """Activate the route with ``route_id`` if it exists, is open and its preconditions hold."""
        state = self._internal_state
        route = next((r for r in task.viable_routes if r.id == route_id), None)
        if not route:
            return
        if route.id in state.closed_route_ids:
            info_msgs.append(f"ROUTE_BLOCKED: {route.name}")
            return
        # Each precondition is looked up in hidden state first, then world state.
        pre_ok = all(
            state.hidden_state.get(k, state.world_state.get(k)) == v
            for k, v in route.preconditions.items()
        )
        if not pre_ok:
            info_msgs.append(f"PRECONDITIONS_FAILED for {route.name}")
            return
        state.active_route_id = route.id
        state.world_state.update(route.consequences)
        info_msgs.append(f"ROUTE_SUCCESS: {route.name}")

    def step(self, action: LifeStackAction, timeout_s: Optional[float] = None, **kwargs) -> LifeStackObservation:
        """Executes one step in the environment using LifeStackAction logic.

        Order of operations: personality drift / legacy escalation, exogenous
        events, tool handling (rollback, inspect, wait, route execution),
        resource deduction, metric application with cascade, task
        verification, reward computation, and end-condition checks.

        Args:
            action: A ``LifeStackAction`` or a dict of its fields.
            timeout_s: Accepted for API compatibility; unused.

        Returns:
            The resulting observation. ``metadata["info"]`` lists this step's
            messages; non-rollback steps also carry ``metadata["breakdown"]``.

        Raises:
            RuntimeError: If called before ``reset()``.
        """
        if isinstance(action, dict):
            action = LifeStackAction(**action)

        state = self._internal_state
        task = state.current_task
        if task is None or getattr(self, 'world_engine', None) is None:
            raise RuntimeError("LifeStackEnv.step() called before reset()")

        state_before = copy.deepcopy(state.current_metrics)
        info_msgs: List[str] = []

        # 0. Personality Drift & Legacy Escalation
        self._apply_drift_and_escalation(info_msgs)

        # 1. Exogenous events
        self._fire_exogenous_events(info_msgs)

        # 2. Tool Logic & Metric Changes
        tool_type = action.action_type or (
            "rollback" if action.is_rollback else
            "inspect" if action.inspect_target else
            "execute"
        )
        allowed_keys = set(state.current_metrics.flatten().keys())
        metric_changes = {k: v for k, v in action.metric_changes.items() if k in allowed_keys}
        resource_cost = copy.deepcopy(action.resource_cost)

        # Handle Rollback
        if tool_type == "rollback":
            return self._handle_rollback(info_msgs)

        # Save state for future rollback
        state.previous_metrics = copy.deepcopy(state.current_metrics)
        state.previous_budget = copy.deepcopy(state.budget)

        # Handle Inspect
        if tool_type == "inspect":
            self._handle_inspect(action, info_msgs)

        # Handle Wait: the fourth consecutive wait (and each one after) adds stress.
        if tool_type == "wait":
            state.consecutive_waits += 1
            if state.consecutive_waits >= 4:
                stress_key = "mental_wellbeing.stress_level"
                metric_changes[stress_key] = metric_changes.get(stress_key, 0) + 15.0
                info_msgs.append("WAIT_CAP_EXCEEDED: Forced stress applied.")
        else:
            state.consecutive_waits = 0

        # Handle Route Execution
        if tool_type == "execute" and action.target:
            self._try_execute_route(task, action.target, info_msgs)

        # 3. Resource Deduction (must happen BEFORE metric changes to prevent budget-bypass exploit)
        deduct_ok = state.budget.deduct(
            time=resource_cost.get('time', 0.0),
            money=resource_cost.get('money', 0.0),
            energy=resource_cost.get('energy', 0.0)
        )
        if not deduct_ok:
            info_msgs.append("RESOURCE_DEPLETED_ACTION_BLOCKED")
            metric_changes = {}  # Discard changes — agent can't afford this action

        # 4. Apply Metric and Cascade: small deltas apply directly, large ones cascade.
        sig_changes = {k: v for k, v in metric_changes.items() if abs(v) > 5.0}
        for key, delta in metric_changes.items():
            if key not in sig_changes:
                self._update_metric(key, delta)
        if sig_changes:
            state.current_metrics = self.graph.cascade(state.current_metrics, sig_changes)

        # 5. Task Progression Check
        success_mets = LifeStackVerifier.check_success(task, state.world_state, state.hidden_state)
        failure_mets = LifeStackVerifier.check_failure(
            task, state.world_state, state.hidden_state, state.current_metrics.flatten()
        )
        # Check milestones dynamically
        newly_met = LifeStackVerifier.check_new_milestones(
            task, state.world_state, state.hidden_state, state.milestones_achieved
        )
        for mid in newly_met:
            state.milestones_achieved.append(mid)
            if state.exo_events_seen > 0:
                state.milestones_after_event += 1
            info_msgs.append(f"MILESTONE_UNLOCKED: {mid}")

        # 6. Reward Calculation (Task-Aware)
        routes_rem, _ = LifeStackVerifier.get_route_status(
            task, state.closed_route_ids, state.world_state, state.hidden_state
        )

        # Cascade collapse: a metric crossed below 20 during this step.
        metrics_after = state.current_metrics.flatten()
        metrics_before = state_before.flatten()
        collapse = any(metrics_after[k] < 20 and metrics_before[k] >= 20 for k in metrics_after)

        # Track cumulative relationship erosion across steps
        rel_keys_cum = [k for k in metrics_after if k.startswith('relationships.')]
        if rel_keys_cum:
            step_rel_delta = sum(metrics_after[k] - metrics_before[k] for k in rel_keys_cum) / len(rel_keys_cum)
            state.cumulative_rel_delta += step_rel_delta

        # Increment step_count BEFORE reward so timeout_check fires correctly
        state.step_count += 1

        # Rollback penalty fires only once per episode
        rollback_this_step = state.used_rollback and not state.rollback_penalty_charged
        if rollback_this_step:
            state.rollback_penalty_charged = True

        # conflict_domain from task.domain (not conflict.title) to prevent empty-string bypass
        conflict_domain = task.domain if task and hasattr(task, 'domain') else ""

        if task:
            reward, breakdown = compute_task_reward(
                state_before=state_before,
                state_after=state.current_metrics,
                resources_used=resource_cost,
                actions_taken=action.actions_taken,
                milestones_achieved=state.milestones_achieved,
                success_conditions_met=success_mets,
                exo_events_seen=state.exo_events_seen,
                milestones_after_event=state.milestones_after_event,
                routes_remaining=routes_rem,
                rollback_used=rollback_this_step,
                cascade_collapse=collapse,
                task=task,
                reasoning=getattr(action, 'reasoning', ""),
                completion=getattr(action, 'completion', ""),
                conflict_domain=conflict_domain,
                step_count=state.step_count,
                max_steps=self.max_steps,
                metric_changes=metric_changes,
                cumulative_rel_delta=state.cumulative_rel_delta,
                action_type=tool_type
            )
        else:
            reward, breakdown = compute_reward(
                state_before=state_before,
                state_after=state.current_metrics,
                resources_used=resource_cost,
                actions_taken=action.actions_taken,
                metric_changes=metric_changes,
                completion=getattr(action, 'completion', ""),
                action_type=tool_type
            )

        # 7. End Conditions
        # Check if ANY success condition is met.
        # For multi-goal tasks with mutually exclusive routes, any() allows termination.
        is_success = bool(success_mets) and len(task.success_conditions) > 0 and any(success_mets)
        is_task_failure = any(failure_mets)

        # The metrics listed in the failure reason are exactly the ones that
        # trigger termination.
        threshold = self.METRIC_DEATH_THRESHOLD
        dead_metrics = [k for k, v in metrics_after.items() if v <= threshold]
        metric_death = bool(dead_metrics)

        failure_reason = ""
        if is_task_failure:
            reasons = [cond['key'] for cond, hit in zip(task.failure_conditions, failure_mets) if hit]
            failure_reason = f"Condition failed: {', '.join(reasons)}"
        elif metric_death:
            failure_reason = f"Metrics hit critical floor (<= {threshold}): {', '.join(dead_metrics)}"
        elif routes_rem == 0 and not is_success:
            failure_reason = "Dead end: No reachable routes left."

        terminated = is_task_failure or metric_death
        truncated = state.step_count >= self.max_steps
        if is_success:
            truncated = True
        done = terminated or truncated

        observation = self._get_obs(
            done,
            reward,
            success=is_success,
            failure=terminated,
            failure_reason=failure_reason,
            routes_remaining=routes_rem
        )
        observation.metadata["breakdown"] = breakdown
        observation.metadata["info"] = info_msgs
        return observation

    def rollout(self, n_steps: int = 7, gamma: float = 0.9) -> dict:
        """
        Simulate n_steps null/rest actions starting from the current env state.

        Intended to be called immediately AFTER env.step(model_action) so it
        models "what happens to your life over the next N days if nothing
        extraordinary occurs."

        The simulation runs on a deep copy of the state. The caller's state
        object, the world engine's closed routes and the global ``random``
        state are all restored afterwards, even if a simulated step raises —
        calling this is side-effect-free from the caller's perspective.

        Args:
            n_steps: Maximum number of simulated steps.
            gamma: Discount factor; step ``t`` (1-indexed) is weighted by
                ``gamma ** t``.

        Returns:
            {
                "discounted_reward": float,   # γ-discounted cumulative
                "trajectory": [               # one entry per simulated day
                    {
                        "step": int,                      # 1-indexed future day
                        "reward": float,
                        "metrics": Dict[str, float],      # flattened snapshot
                        "discounted_contribution": float,
                    },
                    ...
                ],
                "n_steps_completed": int,
            }
        """
        import random

        null_action = LifeStackAction(
            action_type="rest",
            target="time",
            metric_changes={},
            resource_cost={},
            actions_taken=0,
        )

        live_state = self._internal_state
        engine = getattr(self, 'world_engine', None)
        saved_closed = set(engine.closed_routes) if engine is not None else None
        rng_state = random.getstate()

        trajectory = []
        cumulative = 0.0
        # Step a scratch copy so references the caller holds to env.state stay untouched.
        self._internal_state = copy.deepcopy(live_state)
        try:
            for t in range(n_steps):
                obs = self.step(null_action)
                reward = float(obs.reward or 0.0)
                disc = (gamma ** (t + 1)) * reward
                cumulative += disc
                trajectory.append({
                    "step": t + 1,
                    "reward": reward,
                    "metrics": dict(obs.metrics),
                    "discounted_contribution": round(disc, 5),
                })
                if obs.done:
                    break
        finally:
            # Restore — rollout must not mutate the env visible to the caller
            self._internal_state = live_state
            if engine is not None:
                engine.closed_routes = saved_closed
            random.setstate(rng_state)

        return {
            "discounted_reward": round(cumulative, 5),
            "trajectory": trajectory,
            "n_steps_completed": len(trajectory),
        }

    def render(self):
        """Vibrant status report of the current state and task progress."""
        state = self._internal_state
        task = state.current_task
        print("\n" + "═"*70)
        print(f"🎯 GOAL: {task.goal} | Horizon: {state.step_count}/{self.max_steps}")
        print(f"⌛ TIME: {state.budget.time_hours:.1f}h | 💵 MONEY: ${state.budget.money_dollars:.1f} | ⚡ ENERGY: {state.budget.energy_units:.1f}")
        if state.active_route_id:
            print(f"🛣️ ACTIVE ROUTE: {state.active_route_id}")
        print(f"⭐ MILESTONES: {', '.join(state.milestones_achieved) or 'None'}")
        if state.fired_event_ids:
            print(f"🚨 EVENTS: {', '.join(state.fired_event_ids)}")

        flat = state.current_metrics.flatten()
        domain_labels = {
            "career": "💼 CAREER",
            "finances": "💰 FINANCES",
            "relationships": "❤️ RELATIONSHIPS",
            "physical_health": "💪 PHYSICAL",
            "mental_wellbeing": "🧠 MENTAL",
            "time": "📅 TIME"
        }
        # Sub-metrics where a HIGH value is bad, so the colour scale flips.
        inverted = {"stress_level", "debt_pressure", "workload", "commute_burden", "admin_overhead"}
        for dom, label in domain_labels.items():
            print(f"\n{label}")
            submetrics = {k: v for k, v in flat.items() if k.startswith(dom + ".")}
            for name, val in submetrics.items():
                short = name.split('.')[1]
                icon = ("🔴" if val > 70 else "🟢") if short in inverted else ("🟢" if val > 70 else "🔴")
                # The 40-70 band is shown as yellow either way.
                if 40 <= val <= 70:
                    icon = "🟡"
                print(f" {icon} {short:20} : {val:5.2f}")
        print("═"*70)
def env_render_compact(env, obs):
    """Compact printer for testing.

    Prints one line with the observation's step, reward and done flag,
    followed by a warning line when the reward breakdown reports penalties.

    Args:
        env: Unused; kept for signature compatibility.
        obs: Observation exposing ``step``, ``reward``, ``done`` and a
            ``metadata`` dict.
    """
    # reward is Optional (e.g. the observation from reset()), so never format
    # it with :.3f unconditionally.
    reward_text = "n/a" if obs.reward is None else f"{obs.reward:.3f}"
    print(f"STEP: {obs.step} | REWARD: {reward_text} | DONE: {obs.done}")
    # Tolerate a missing or None breakdown.
    penalties = (obs.metadata.get("breakdown") or {}).get("penalties_fired")
    if penalties:
        print(f" ⚠️ PENALTIES: {penalties}")
def main():
    """Run a short scripted demo episode and print a summary.

    Resets the environment with a hand-written conflict, applies three
    scripted actions (good, medium, inaction), then reports the cumulative
    reward and any metrics that ended below 20.
    """
    env = LifeStackEnv()

    # 1. Reset with Friday 6PM Conflict
    conflict = {
        "career.workload": 30.0,
        "finances.liquidity": -40.0
    }
    print("Initializing environment with Friday 6PM conflict...")
    env.reset(conflict=conflict)
    env.render()

    total_reward = 0.0

    # 2. Sequential Actions
    scenarios = [
        {
            "name": "GOOD ACTION: Delegating and budget review",
            "action": {
                "metric_changes": {"career.workload": -15.0, "finances.liquidity": 10.0, "mental_wellbeing.stress_level": -5.0},
                "resource_cost": {"time": 4.0, "money": 100.0, "energy": 20.0},
                "actions_taken": 2
            }
        },
        {
            "name": "MEDIUM ACTION: Small self-care rest",
            "action": {
                "metric_changes": {"physical_health.sleep_quality": 6.0, "mental_wellbeing.clarity": 3.0},
                "resource_cost": {"time": 2.0, "energy": -20.0},  # Rest recovers energy
                "actions_taken": 1
            }
        },
        {
            "name": "INACTION: Let the cascade run",
            "action": {
                "metric_changes": {},
                "resource_cost": {},
                "actions_taken": 0
            }
        }
    ]
    for sce in scenarios:
        print(f"\nTaking Action: {sce['name']}...")
        action_obj = LifeStackAction(**sce['action'])
        obs = env.step(action_obj)
        env_render_compact(env, obs)
        # reward is Optional, so treat a missing reward as zero.
        total_reward += (obs.reward or 0.0)

    # 3. Final Summary
    final_flat = env.state.current_metrics.flatten()
    critical = [k for k, v in final_flat.items() if v < 20]
    print("\n" + "█"*60)
    print("EPISODE SUMMARY")
    print(f"Steps Taken : {env.state.step_count}")
    print(f"Total Cumulative Reward : {total_reward:.4f}")
    if critical:
        print(f"Critical Floor Violations: {', '.join(critical)}")
    else:
        print("Critical Violations: NONE")
    print("█"*60)


if __name__ == "__main__":
    main()