"""
test_lifestack.py — LifeStack Edge Case Test Suite
Covers: cascade bounds, resource exhaustion, penalties, memory threshold, episode termination.
"""
import sys, os; sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import copy
import shutil
import pytest
from core.life_state import LifeMetrics, ResourceBudget, DependencyGraph
from core.lifestack_env import LifeStackEnv, LifeStackAction
from core.reward import compute_reward
from intake.simperson import SimPerson
from agent.memory import LifeStackMemory
# Running tally of checks that reported success, and the number of checks
# the script runner expects to execute.
passed = 0
total = 11


def report(name, ok, detail=""):
    """Print a PASS/FAIL line for one check and update the pass counter.

    Args:
        name: Human-readable label of the check.
        ok: Outcome of the check; any truthy value counts as a pass.
        detail: Optional extra text printed on its own line after the verdict.

    NOTE(review): this helper only prints and counts; it never raises, so a
    False outcome does not by itself fail the calling test under pytest.
    """
    global passed
    # U+2705 (check mark) / U+274C (cross mark) are written as escapes so the
    # markers survive a lossy re-encoding of this file.
    tag = "\u2705 PASS" if ok else "\u274c FAIL"
    # Count exactly one per passing check, whatever truthy type `ok` has.
    passed += 1 if ok else 0
    print(f" {tag} {name}")
    if detail:
        print(f" {detail}")
# ─── 1. Cascade Floor Test ────────────────────────────────────────────────────
def test_cascade_floor():
    """Report whether every flattened metric stays >= 0 after a large negative cascade."""
    # Drive finances.liquidity down by 200; the expectation is that values
    # clamp at 0 instead of going negative.
    cascaded = DependencyGraph().cascade(LifeMetrics(), {"finances.liquidity": -200.0})
    lowest = min(cascaded.flatten().values())
    report(
        "Cascade floor (metrics >= 0)",
        lowest >= 0.0,
        f"min metric = {lowest:.2f}",
    )
# ─── 2. Cascade Ceiling Test ──────────────────────────────────────────────────
def test_cascade_ceiling():
    """Report whether every flattened metric stays <= 100 after a large positive cascade."""
    # Drive career.workload up by 200; the expectation is that values clamp
    # at 100.
    cascaded = DependencyGraph().cascade(LifeMetrics(), {"career.workload": +200.0})
    highest = max(cascaded.flatten().values())
    report(
        "Cascade ceiling (metrics <= 100)",
        highest <= 100.0,
        f"max metric = {highest:.2f}",
    )
# ─── 3. Resource Exhaustion Test ──────────────────────────────────────────────
def test_resource_exhaustion():
    """Report whether deducting more time than is budgeted is refused cleanly."""
    wallet = ResourceBudget(time_hours=5.0, money_dollars=100.0, energy_units=20.0)
    # Request 10 hours against a 5-hour budget.
    accepted = wallet.deduct(time=10.0, money=0.0, energy=0.0)
    refused_cleanly = accepted is False and wallet.time_hours >= 0
    report(
        "Resource exhaustion (deduct returns False, no negative)",
        refused_cleanly,
        f"deduct returned {accepted}, time_hours = {wallet.time_hours:.1f}",
    )
# ─── 4. Zero Action (Inaction) Penalty Test ───────────────────────────────────
def test_inaction_penalty():
    """Report whether compute_reward lists INACTION_PENALTY when no action was taken."""
    baseline = LifeMetrics()
    unchanged = copy.deepcopy(baseline)
    # Identical before/after states with actions_taken=0.
    _reward, breakdown = compute_reward(baseline, unchanged, {}, actions_taken=0)
    triggered = breakdown["penalties_fired"]
    report(
        "Inaction penalty fires",
        "INACTION_PENALTY" in triggered,
        f"penalties_fired = {triggered}",
    )
# ─── 5. Critical Floor Penalty Test ───────────────────────────────────────────
def test_critical_floor_penalty():
    """Report whether compute_reward lists CRITICAL_FLOOR_VIOLATION for a low energy value."""
    start_state = LifeMetrics()
    end_state = copy.deepcopy(start_state)
    # The original author notes 20 as the threshold; 15.0 sits below it.
    end_state.physical_health.energy = 15.0
    _reward, breakdown = compute_reward(start_state, end_state, {}, actions_taken=1)
    triggered = breakdown["penalties_fired"]
    report(
        "Critical floor penalty fires",
        "CRITICAL_FLOOR_VIOLATION" in triggered,
        f"energy = 15.0, penalties_fired = {triggered}",
    )
# ─── 6. Cascade Dampening Test ────────────────────────────────────────────────
def test_cascade_dampening():
    """Report whether second-order cascade effects are smaller than the primary change.

    Applies a +30 delta to ``career.workload`` and compares the absolute
    change of that metric with the absolute change of every target listed
    under ``graph.edges["career.workload"]``. The check passes only when at
    least one such target exists and all of them moved strictly less than
    the primary metric.
    """
    graph = DependencyGraph()
    metrics = LifeMetrics()
    primary_delta = 30.0
    source = "career.workload"

    # Take (and copy) the baseline BEFORE cascading, so the comparison stays
    # valid even if cascade() were to modify `metrics` in place.
    flat_before = dict(metrics.flatten())
    result = graph.cascade(metrics, {source: primary_delta})
    flat_after = result.flatten()

    # First-order change: the metric the delta was applied to directly.
    first_order = abs(flat_after[source] - flat_before[source])

    # Second-order changes: metrics reachable via edges from the source,
    # e.g. mental_wellbeing.stress_level, time.free_hours_per_week.
    second_order_deltas = [
        (target, abs(flat_after[target] - flat_before[target]))
        for target, _ in graph.edges.get(source, [])
    ]

    all_smaller = all(d < first_order for _, d in second_order_deltas)
    detail = "; ".join(f"{t}: {d:.2f}" for t, d in second_order_deltas)
    report(
        "Cascade dampening (2nd order < 1st order)",
        all_smaller and len(second_order_deltas) > 0,
        f"1st order = {first_order:.2f} | 2nd order: {detail}",
    )
# ─── 7. SimPerson Uptake Bounds Test ──────────────────────────────────────────
def test_simperson_uptake_bounds():
    """Report whether SimPerson.respond_to_action stays within [0.1, 1.0] for each action type."""
    subject = SimPerson(
        openness=0.5, conscientiousness=0.3, extraversion=0.2,
        agreeableness=0.4, neuroticism=1.0, name="Stressed"
    )
    kinds = (
        "communicate", "delegate", "rest", "structured_plan",
        "negotiate", "spend", "exercise", "meditate",
        "network", "study",
    )
    # A fresh cost dict is built per call, as in the original loop.
    observed = [
        (kind, subject.respond_to_action(kind, {"time": 5, "money": 100, "energy": 30}, 100.0))
        for kind in kinds
    ]
    out_of_range = any(value < 0.1 or value > 1.0 for _, value in observed)
    summary = ", ".join(f"{kind}={value:.2f}" for kind, value in observed)
    report(
        "SimPerson uptake bounds [0.1, 1.0]",
        not out_of_range,
        f"uptakes: {summary}",
    )
# ─── 8. Memory Threshold Test ─────────────────────────────────────────────────
def test_memory_threshold():
    """Report whether LifeStackMemory stores one entry per trajectory, whatever its reward."""
    scratch = "./test_memory_tmp"
    # Start from an empty, isolated directory.
    if os.path.exists(scratch):
        shutil.rmtree(scratch)
    os.makedirs(scratch, exist_ok=True)

    try:
        store = LifeStackMemory(silent=True, path=scratch)
        reward_values = [0.5, 1.5, 2.1, 2.5, 3.0]
        for index, reward in enumerate(reward_values):
            store.store_trajectory(
                conflict_title="test conflict",
                route_taken=f"action_{index}",
                total_reward=reward,
                metrics_diff_str="test_diff",
                reasoning="test reasoning",
            )

        wanted = len(reward_values)
        found = store.collection.count()
        report(
            "Memory storage (all trajectories stored for retrieval filtering)",
            found == wanted,
            f"expected {wanted}, stored {found} (all items regardless of reward)",
        )
    finally:
        # Best-effort cleanup of the scratch directory.
        shutil.rmtree(scratch, ignore_errors=True)
# ─── 9. Episode Termination Test ──────────────────────────────────────────────
def test_episode_termination():
    """Report whether the observation is flagged done after five no-op steps on a horizon=5 task."""
    from core.task import Task

    short_task = Task(
        id="test", domain="test", goal="test", constraints={}, hidden_state={},
        mutable_world={}, visible_world={}, success_conditions=[],
        failure_conditions=[], event_schedule=[], viable_routes=[],
        milestones=[], horizon=5, difficulty=1, domain_metadata={},
    )
    env = LifeStackEnv()
    env.reset(task=short_task)

    finished = False
    for _step in range(5):
        # An action with no metric changes, no cost and no actions taken.
        noop = LifeStackAction(
            metric_changes={},
            resource_cost={},
            actions_taken=0,
        )
        finished = env.step(noop).done

    report(
        "Episode terminates after horizon steps",
        finished is True,
        f"done = {finished} after {env.state.step_count} steps",
    )
# ─── 10. Task-Driven Smoke Test ───────────────────────────────────────────────
def test_task_driven_smoke():
    """Run an inspect step and a route execution on FlightCrisisTask and report both outcomes.

    The check passes only when the inspect step looks successful AND the
    execute step's info messages contain ``ROUTE_SUCCESS``. Previously the
    inspect outcome was computed but left out of the verdict.
    """
    from core.task import FlightCrisisTask

    env = LifeStackEnv()
    env.reset(task=FlightCrisisTask())

    # 1. Inspect hidden state
    obs = env.step(LifeStackAction(
        action_type="inspect",
        target="card_available",
        reasoning="Need to know if I can rebook"
    ))
    revealed = obs.metadata.get("world_state", {})
    # Accept either the key showing up in world_state or an info payload
    # that carries no ERROR marker.
    inspect_ok = (
        "card_available" in revealed
        or "ERROR" not in str(obs.metadata.get("info"))
    )

    # 2. Execute route
    # Note (from the original author): FlightCrisisTask has
    # Route(id="rebook_premium", ...)
    obs = env.step(LifeStackAction(
        action_type="execute",
        target="rebook_premium",
        reasoning="Try rebooking"
    ))
    info = obs.metadata.get("info", [])
    route_ok = any("ROUTE_SUCCESS" in msg for msg in info)

    report(
        "Task-driven episode (Inspect + Route)",
        inspect_ok and route_ok,
        f"inspect_ok = {inspect_ok}, route_ok = {route_ok}, info: {info}",
    )
# ─── 11. Full Episode Smoke Test ──────────────────────────────────────────────
@pytest.mark.skipif(
    not (os.environ.get("OPENAI_API_KEY") or os.environ.get("GROQ_API_KEY")),
    reason="Skipped: no API key (OpenAI/Groq) in environment"
)
def test_full_episode_smoke():
    """Run one difficulty-1 episode via run_episode and report on the result's shape.

    The check passes when ``total_reward`` is a float and ``steps`` is either
    absent or at most 30. Any exception raised along the way is reported as a
    failure instead of propagating.
    """
    scratch = "./test_episode_memory_tmp"
    label = "Full episode smoke test"
    # Remove leftovers from an earlier run.
    if os.path.exists(scratch):
        shutil.rmtree(scratch)

    try:
        from scripts.run_episode import run_episode

        store = LifeStackMemory(silent=True, path=scratch)
        outcome = run_episode(difficulty=1, verbose=False, memory=store)
        reward = outcome.get("total_reward")
        steps = outcome.get("steps")
        looks_sane = isinstance(reward, float) and (steps is None or steps <= 30)
        report(
            label,
            looks_sane,
            f"reward = {reward}, steps = {steps}, type = {type(reward).__name__}",
        )
    except Exception as e:
        report(label, False, f"Exception: {e}")
    finally:
        # Best-effort cleanup of the scratch directory.
        shutil.rmtree(scratch, ignore_errors=True)
# ─── Run All ──────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Script entry point: run every check in order and print a coloured
    # "passed/total" summary.
    banner = "=" * 60
    print("\n" + banner)
    print(" LifeStack Edge Case Test Suite")
    print(banner + "\n")

    offline_checks = (
        test_cascade_floor,
        test_cascade_ceiling,
        test_resource_exhaustion,
        test_inaction_penalty,
        test_critical_floor_penalty,
        test_cascade_dampening,
        test_simperson_uptake_bounds,
        test_memory_threshold,
        test_episode_termination,
        test_task_driven_smoke,
    )
    for check in offline_checks:
        check()

    # pytest.mark.skipif is only acted on by the pytest runner; a direct call
    # would run the API-dependent episode regardless. Apply the same
    # condition here so a missing key is shown as a skip, and derive the
    # total from what actually ran instead of a hard-coded count.
    total = len(offline_checks)
    if os.environ.get("OPENAI_API_KEY") or os.environ.get("GROQ_API_KEY"):
        test_full_episode_smoke()
        total += 1
    else:
        print(" SKIP Full episode smoke test")
        print(" Skipped: no API key (OpenAI/Groq) in environment")

    print("\n" + banner)
    # Green when everything that ran passed, red otherwise.
    color = "\033[92m" if passed == total else "\033[91m"
    print(f" {color}{passed}/{total} tests passed\033[0m")
    print(banner + "\n")