| """ |
| test_lifestack.py β LifeStack Edge Case Test Suite |
| Covers: cascade bounds, resource exhaustion, penalties, memory threshold, episode termination. |
| """ |
|
|
| import sys, os; sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
| import copy |
| import shutil |
| import pytest |
|
|
| from core.life_state import LifeMetrics, ResourceBudget, DependencyGraph |
| from core.lifestack_env import LifeStackEnv, LifeStackAction |
| from core.reward import compute_reward |
| from intake.simperson import SimPerson |
| from agent.memory import LifeStackMemory |
|
|
|
|
| passed = 0 |
| total = 11 |
|
|
|
|
| def report(name, ok, detail=""): |
| global passed |
| tag = "β
PASS" if ok else "β FAIL" |
| passed += ok |
| print(f" {tag} {name}") |
| if detail: |
| print(f" {detail}") |
|
|
|
|
| |
| def test_cascade_floor(): |
| graph = DependencyGraph() |
| metrics = LifeMetrics() |
| |
| result = graph.cascade(metrics, {"finances.liquidity": -200.0}) |
| flat = result.flatten() |
| min_val = min(flat.values()) |
| report("Cascade floor (metrics >= 0)", min_val >= 0.0, |
| f"min metric = {min_val:.2f}") |
|
|
|
|
| |
| def test_cascade_ceiling(): |
| graph = DependencyGraph() |
| metrics = LifeMetrics() |
| |
| result = graph.cascade(metrics, {"career.workload": +200.0}) |
| flat = result.flatten() |
| max_val = max(flat.values()) |
| report("Cascade ceiling (metrics <= 100)", max_val <= 100.0, |
| f"max metric = {max_val:.2f}") |
|
|
|
|
| |
| def test_resource_exhaustion(): |
| budget = ResourceBudget(time_hours=5.0, money_dollars=100.0, energy_units=20.0) |
| ok = budget.deduct(time=10.0, money=0.0, energy=0.0) |
| report("Resource exhaustion (deduct returns False, no negative)", |
| ok is False and budget.time_hours >= 0, |
| f"deduct returned {ok}, time_hours = {budget.time_hours:.1f}") |
|
|
|
|
| |
| def test_inaction_penalty(): |
| state = LifeMetrics() |
| _, breakdown = compute_reward(state, copy.deepcopy(state), {}, actions_taken=0) |
| fired = breakdown["penalties_fired"] |
| report("Inaction penalty fires", |
| "INACTION_PENALTY" in fired, |
| f"penalties_fired = {fired}") |
|
|
|
|
| |
| def test_critical_floor_penalty(): |
| before = LifeMetrics() |
| after = copy.deepcopy(before) |
| after.physical_health.energy = 15.0 |
| _, breakdown = compute_reward(before, after, {}, actions_taken=1) |
| fired = breakdown["penalties_fired"] |
| report("Critical floor penalty fires", |
| "CRITICAL_FLOOR_VIOLATION" in fired, |
| f"energy = 15.0, penalties_fired = {fired}") |
|
|
|
|
| |
| def test_cascade_dampening(): |
| graph = DependencyGraph() |
| metrics = LifeMetrics() |
| primary_delta = 30.0 |
| result = graph.cascade(metrics, {"career.workload": primary_delta}) |
| flat_before = metrics.flatten() |
| flat_after = result.flatten() |
|
|
| |
| first_order = abs(flat_after["career.workload"] - flat_before["career.workload"]) |
|
|
| |
| |
| second_order_deltas = [] |
| for target, _ in graph.edges.get("career.workload", []): |
| delta = abs(flat_after[target] - flat_before[target]) |
| second_order_deltas.append((target, delta)) |
|
|
| all_smaller = all(d < first_order for _, d in second_order_deltas) |
| detail = "; ".join(f"{t}: {d:.2f}" for t, d in second_order_deltas) |
| report("Cascade dampening (2nd order < 1st order)", |
| all_smaller and len(second_order_deltas) > 0, |
| f"1st order = {first_order:.2f} | 2nd order: {detail}") |
|
|
|
|
| |
| def test_simperson_uptake_bounds(): |
| person = SimPerson( |
| openness=0.5, conscientiousness=0.3, extraversion=0.2, |
| agreeableness=0.4, neuroticism=1.0, name="Stressed" |
| ) |
| action_types = ["communicate", "delegate", "rest", "structured_plan", |
| "negotiate", "spend", "exercise", "meditate", |
| "network", "study"] |
| results = [] |
| all_ok = True |
| for at in action_types: |
| uptake = person.respond_to_action(at, {"time": 5, "money": 100, "energy": 30}, 100.0) |
| results.append((at, uptake)) |
| if uptake < 0.1 or uptake > 1.0: |
| all_ok = False |
|
|
| detail = ", ".join(f"{a}={u:.2f}" for a, u in results) |
| report("SimPerson uptake bounds [0.1, 1.0]", |
| all_ok, |
| f"uptakes: {detail}") |
|
|
|
|
| |
| def test_memory_threshold(): |
| |
| test_dir = "./test_memory_tmp" |
| if os.path.exists(test_dir): |
| shutil.rmtree(test_dir) |
| os.makedirs(test_dir, exist_ok=True) |
| try: |
| memory = LifeStackMemory(silent=True, path=test_dir) |
| rewards = [0.5, 1.5, 2.1, 2.5, 3.0] |
|
|
| for i, r in enumerate(rewards): |
| memory.store_trajectory( |
| conflict_title="test conflict", |
| route_taken=f"action_{i}", |
| total_reward=r, |
| metrics_diff_str="test_diff", |
| reasoning="test reasoning", |
| ) |
|
|
| expected = len(rewards) |
| actual = memory.collection.count() |
| report("Memory storage (all trajectories stored for retrieval filtering)", |
| actual == expected, |
| f"expected {expected}, stored {actual} (all items regardless of reward)") |
| finally: |
| shutil.rmtree(test_dir, ignore_errors=True) |
|
|
|
|
| |
| def test_episode_termination(): |
| from core.task import Task |
| t = Task(id="test", domain="test", goal="test", constraints={}, hidden_state={}, |
| mutable_world={}, visible_world={}, success_conditions=[], |
| failure_conditions=[], event_schedule=[], viable_routes=[], |
| milestones=[], horizon=5, difficulty=1, domain_metadata={}) |
| env = LifeStackEnv() |
| obs = env.reset(task=t) |
|
|
| done = False |
| for _ in range(5): |
| obs = env.step(LifeStackAction( |
| metric_changes={}, |
| resource_cost={}, |
| actions_taken=0, |
| )) |
| done = obs.done |
|
|
| report("Episode terminates after horizon steps", |
| done is True, |
| f"done = {done} after {env.state.step_count} steps") |
|
|
|
|
| |
| def test_task_driven_smoke(): |
| from core.task import FlightCrisisTask |
| from core.action_space import ToolActionType |
| env = LifeStackEnv() |
| task = FlightCrisisTask() |
| obs = env.reset(task=task) |
| |
| |
| obs = env.step(LifeStackAction( |
| action_type="inspect", |
| target="card_available", |
| reasoning="Need to know if I can rebook" |
| )) |
| |
| revealed = obs.metadata.get("world_state", {}) |
| inspect_ok = "card_available" in revealed or "ERROR" not in str(obs.metadata.get("info")) |
| |
| |
| |
| obs = env.step(LifeStackAction( |
| action_type="execute", |
| target="rebook_premium", |
| reasoning="Try rebooking" |
| )) |
| |
| info = obs.metadata.get("info", []) |
| route_ok = any("ROUTE_SUCCESS" in msg for msg in info) |
| |
| report("Task-driven episode (Inspect + Route)", |
| route_ok, |
| f"info: {info}") |
|
|
|
|
| |
| @pytest.mark.skipif( |
| not os.environ.get("OPENAI_API_KEY") and not os.environ.get("GROQ_API_KEY"), |
| reason="Skipped: no API key (OpenAI/Groq) in environment" |
| ) |
| def test_full_episode_smoke(): |
| test_dir = "./test_episode_memory_tmp" |
| if os.path.exists(test_dir): |
| shutil.rmtree(test_dir) |
| try: |
| from scripts.run_episode import run_episode |
| memory = LifeStackMemory(silent=True, path=test_dir) |
| result = run_episode(difficulty=1, verbose=False, memory=memory) |
| reward = result.get("total_reward", None) |
| steps = result.get("steps", None) |
| ok = isinstance(reward, float) and (steps is None or steps <= 30) |
| report("Full episode smoke test", |
| ok, |
| f"reward = {reward}, steps = {steps}, type = {type(reward).__name__}") |
| except Exception as e: |
| report("Full episode smoke test", False, f"Exception: {e}") |
| finally: |
| shutil.rmtree(test_dir, ignore_errors=True) |
|
|
|
|
| |
| if __name__ == "__main__": |
| print("\n" + "=" * 60) |
| print(" LifeStack Edge Case Test Suite") |
| print("=" * 60 + "\n") |
|
|
| test_cascade_floor() |
| test_cascade_ceiling() |
| test_resource_exhaustion() |
| test_inaction_penalty() |
| test_critical_floor_penalty() |
| test_cascade_dampening() |
| test_simperson_uptake_bounds() |
| test_memory_threshold() |
| test_episode_termination() |
| test_task_driven_smoke() |
| test_full_episode_smoke() |
|
|
| print("\n" + "=" * 60) |
| color = "\033[92m" if passed == total else "\033[91m" |
| print(f" {color}{passed}/{total} tests passed\033[0m") |
| print("=" * 60 + "\n") |
|
|