File size: 11,058 Bytes
77da5ce
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
"""
test_lifestack.py β€” LifeStack Edge Case Test Suite
Covers: cascade bounds, resource exhaustion, penalties, memory threshold, episode termination.
"""

import sys, os; sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import copy
import shutil
import pytest

from core.life_state import LifeMetrics, ResourceBudget, DependencyGraph
from core.lifestack_env import LifeStackEnv, LifeStackAction
from core.reward import compute_reward
from intake.simperson import SimPerson
from agent.memory import LifeStackMemory


# Running count of checks recorded as passing; incremented inside report().
passed = 0
# Number of checks the suite is expected to run; the __main__ summary compares
# `passed` against this value.
total  = 11


def report(name, ok, detail=""):
    """Print a PASS/FAIL line for one check and update the pass counter.

    Args:
        name: Human-readable label of the check.
        ok: Outcome of the check; any truthy value counts as one pass.
        detail: Optional extra context, printed on an indented second line.

    Raises:
        AssertionError: If the check failed while running under pytest,
            detected through the ``PYTEST_CURRENT_TEST`` environment variable
            that pytest sets while a test is executing. Without this, a
            failing check would only print and pytest would still record the
            test as passed. When that variable is absent (for example a
            direct script run) nothing is raised, so every check still gets
            printed.
    """
    global passed
    ok = bool(ok)  # normalise so the counter only ever moves by 0 or 1
    tag = "✅ PASS" if ok else "❌ FAIL"
    passed += ok
    print(f"  {tag}  {name}")
    if detail:
        print(f"         {detail}")
    if not ok and "PYTEST_CURRENT_TEST" in os.environ:
        raise AssertionError(f"{name}: {detail}" if detail else name)


# ─── 1. Cascade Floor Test ────────────────────────────────────────────────────
def test_cascade_floor():
    """Check that a large negative shock leaves no flattened metric below 0."""
    # -200 on liquidity is intended to overshoot the bottom of the scale, so
    # the cascade result is expected to be clamped rather than go negative.
    shocked = DependencyGraph().cascade(LifeMetrics(), {"finances.liquidity": -200.0})
    lowest = min(shocked.flatten().values())
    report("Cascade floor (metrics >= 0)", lowest >= 0.0,
           f"min metric = {lowest:.2f}")


# ─── 2. Cascade Ceiling Test ──────────────────────────────────────────────────
def test_cascade_ceiling():
    """Check that a large positive shock leaves no flattened metric above 100."""
    # +200 on workload is intended to overshoot the top of the scale, so the
    # cascade result is expected to be clamped at 100.
    boosted = DependencyGraph().cascade(LifeMetrics(), {"career.workload": +200.0})
    highest = max(boosted.flatten().values())
    report("Cascade ceiling (metrics <= 100)", highest <= 100.0,
           f"max metric = {highest:.2f}")


# ─── 3. Resource Exhaustion Test ──────────────────────────────────────────────
def test_resource_exhaustion():
    """Check that over-spending a budget is refused without going negative.

    Requests 10 hours from a budget holding 5 and expects ``deduct`` to
    return exactly ``False`` while ``time_hours`` stays non-negative.
    """
    budget = ResourceBudget(time_hours=5.0, money_dollars=100.0, energy_units=20.0)
    accepted = budget.deduct(time=10.0, money=0.0, energy=0.0)
    refused_cleanly = accepted is False and budget.time_hours >= 0
    summary = f"deduct returned {accepted}, time_hours = {budget.time_hours:.1f}"
    report("Resource exhaustion (deduct returns False, no negative)",
           refused_cleanly,
           summary)


# ─── 4. Zero Action (Inaction) Penalty Test ───────────────────────────────────
def test_inaction_penalty():
    """Check that a step with ``actions_taken=0`` fires INACTION_PENALTY."""
    baseline = LifeMetrics()
    unchanged = copy.deepcopy(baseline)  # identical "after" state: nothing happened
    _reward, breakdown = compute_reward(baseline, unchanged, {}, actions_taken=0)
    penalties = breakdown["penalties_fired"]
    report("Inaction penalty fires",
           "INACTION_PENALTY" in penalties,
           f"penalties_fired = {penalties}")


# ─── 5. Critical Floor Penalty Test ───────────────────────────────────────────
def test_critical_floor_penalty():
    """Check that a very low energy value fires CRITICAL_FLOOR_VIOLATION."""
    healthy = LifeMetrics()
    depleted = copy.deepcopy(healthy)
    # 15.0 is meant to sit below the critical floor (20 according to the
    # original author's note; the threshold itself is not visible here).
    depleted.physical_health.energy = 15.0
    _reward, breakdown = compute_reward(healthy, depleted, {}, actions_taken=1)
    penalties = breakdown["penalties_fired"]
    report("Critical floor penalty fires",
           "CRITICAL_FLOOR_VIOLATION" in penalties,
           f"energy = 15.0, penalties_fired = {penalties}")


# ─── 6. Cascade Dampening Test ────────────────────────────────────────────────
def test_cascade_dampening():
    """Check that knock-on changes are smaller than the change that caused them.

    Applies +30 to ``career.workload`` and compares the absolute change of
    that metric (first order) with the absolute change of every metric listed
    as an edge target of ``career.workload`` in ``graph.edges`` (second
    order). The check passes only when at least one such target exists and
    every second-order change is strictly smaller than the first-order one.
    """
    graph = DependencyGraph()
    metrics = LifeMetrics()
    primary_delta = 30.0

    # Snapshot the baseline BEFORE cascading. Taking it afterwards would
    # silently compare the result against itself if cascade() ever modified
    # `metrics` in place (assumes flatten() returns an independent mapping).
    flat_before = metrics.flatten()
    flat_after = graph.cascade(metrics, {"career.workload": primary_delta}).flatten()

    # Size of the directly requested change (not asserted to equal
    # primary_delta here; it is only used as the comparison bound).
    first_order = abs(flat_after["career.workload"] - flat_before["career.workload"])

    # Changes on the metrics reached through edges leaving career.workload.
    second_order_deltas = [
        (target, abs(flat_after[target] - flat_before[target]))
        for target, _ in graph.edges.get("career.workload", [])
    ]

    all_smaller = all(delta < first_order for _, delta in second_order_deltas)
    detail = "; ".join(f"{t}: {d:.2f}" for t, d in second_order_deltas)
    # An empty edge list must fail: all() over nothing is vacuously True.
    report("Cascade dampening (2nd order < 1st order)",
           bool(second_order_deltas) and all_smaller,
           f"1st order = {first_order:.2f} | 2nd order: {detail}")


# ─── 7. SimPerson Uptake Bounds Test ──────────────────────────────────────────
def test_simperson_uptake_bounds():
    """Check that a maximally neurotic SimPerson keeps uptake within [0.1, 1.0].

    Calls ``respond_to_action`` once per action type with the same cost
    dictionary and flags any returned value below 0.1 or above 1.0.
    """
    stressed = SimPerson(
        openness=0.5, conscientiousness=0.3, extraversion=0.2,
        agreeableness=0.4, neuroticism=1.0, name="Stressed"
    )
    uptakes = [
        (kind, stressed.respond_to_action(kind, {"time": 5, "money": 100, "energy": 30}, 100.0))
        for kind in (
            "communicate", "delegate", "rest", "structured_plan",
            "negotiate", "spend", "exercise", "meditate",
            "network", "study",
        )
    ]

    # Same predicate as a per-item "too low or too high" check.
    out_of_range = [kind for kind, value in uptakes if value < 0.1 or value > 1.0]

    summary = ", ".join(f"{kind}={value:.2f}" for kind, value in uptakes)
    report("SimPerson uptake bounds [0.1, 1.0]",
           not out_of_range,
           f"uptakes: {summary}")


# ─── 8. Memory Threshold Test ─────────────────────────────────────────────────
def test_memory_threshold():
    """Check that every stored trajectory ends up in the memory collection.

    Stores five trajectories with rewards from 0.5 to 3.0 and expects
    ``memory.collection.count()`` to equal five, i.e. nothing is dropped at
    storage time regardless of reward.
    """
    import tempfile

    # A unique throw-away directory instead of a fixed "./test_memory_tmp":
    # the result no longer depends on the current working directory, parallel
    # runs cannot collide, and no pre-existing directory is ever deleted.
    test_dir = tempfile.mkdtemp(prefix="test_memory_tmp_")
    try:
        memory = LifeStackMemory(silent=True, path=test_dir)
        rewards = [0.5, 1.5, 2.1, 2.5, 3.0]

        for i, r in enumerate(rewards):
            memory.store_trajectory(
                conflict_title="test conflict",
                route_taken=f"action_{i}",
                total_reward=r,
                metrics_diff_str="test_diff",
                reasoning="test reasoning",
            )

        expected = len(rewards)
        actual = memory.collection.count()
        report("Memory storage (all trajectories stored for retrieval filtering)",
               actual == expected,
               f"expected {expected}, stored {actual} (all items regardless of reward)")
    finally:
        # Best-effort cleanup; never mask the real outcome with a cleanup error.
        shutil.rmtree(test_dir, ignore_errors=True)


# ─── 9. Episode Termination Test ──────────────────────────────────────────────
def test_episode_termination():
    """Check that an episode reports ``done`` once the task horizon is used up.

    Builds a minimal Task with a horizon of 5, takes five no-op steps and
    expects the observation from the last step to carry ``done is True``.
    """
    from core.task import Task

    horizon = 5
    task = Task(id="test", domain="test", goal="test", constraints={}, hidden_state={},
                mutable_world={}, visible_world={}, success_conditions=[],
                failure_conditions=[], event_schedule=[], viable_routes=[],
                milestones=[], horizon=horizon, difficulty=1, domain_metadata={})
    env = LifeStackEnv()
    env.reset(task=task)

    # One `done` flag per step; only the flag from the final step matters.
    done_flags = [
        env.step(LifeStackAction(
            metric_changes={},
            resource_cost={},
            actions_taken=0,
        )).done
        for _ in range(horizon)
    ]
    done = done_flags[-1]

    report("Episode terminates after horizon steps",
           done is True,
           f"done = {done} after {env.state.step_count} steps")


# ─── 10. Task-Driven Smoke Test ───────────────────────────────────────────────
def test_task_driven_smoke():
    """Smoke-test a task-driven episode: inspect a hidden field, then run a route.

    The check passes only when both steps look healthy:

    * inspect: ``card_available`` shows up in the ``world_state`` metadata,
      or the step's ``info`` metadata contains no ``ERROR`` text;
    * execute: some entry of the step's ``info`` metadata contains
      ``ROUTE_SUCCESS``.

    Previously the inspect outcome was computed but ignored, so a broken
    inspect step could not fail a check labelled "Inspect + Route".
    """
    from core.task import FlightCrisisTask
    # NOTE(review): ToolActionType is not used below; the import only proves
    # that core.action_space loads. Confirm whether it is still wanted.
    from core.action_space import ToolActionType
    env = LifeStackEnv()
    task = FlightCrisisTask()
    obs = env.reset(task=task)

    # 1. Inspect hidden state
    obs = env.step(LifeStackAction(
        action_type="inspect",
        target="card_available",
        reasoning="Need to know if I can rebook"
    ))

    revealed = obs.metadata.get("world_state", {})
    inspect_ok = "card_available" in revealed or "ERROR" not in str(obs.metadata.get("info"))

    # 2. Execute route ("rebook_premium" per the original author's note on
    #    FlightCrisisTask; the route table itself is not visible here).
    obs = env.step(LifeStackAction(
        action_type="execute",
        target="rebook_premium",
        reasoning="Try rebooking"
    ))

    info = obs.metadata.get("info", [])
    route_ok = any("ROUTE_SUCCESS" in msg for msg in info)

    report("Task-driven episode (Inspect + Route)",
           inspect_ok and route_ok,
           f"inspect_ok = {inspect_ok}, route_ok = {route_ok}, info: {info}")


# ─── 11. Full Episode Smoke Test ──────────────────────────────────────────────
@pytest.mark.skipif(
    not os.environ.get("OPENAI_API_KEY") and not os.environ.get("GROQ_API_KEY"),
    reason="Skipped: no API key (OpenAI/Groq) in environment"
)
def test_full_episode_smoke():
    """Run one full difficulty-1 episode end to end and sanity-check its result.

    Passes when ``total_reward`` is a float and ``steps`` is either missing
    or at most 30. Any exception raised while importing or running the
    episode is reported as a failed check instead of propagating. Under
    pytest the test is skipped when neither ``OPENAI_API_KEY`` nor
    ``GROQ_API_KEY`` is set.
    """
    import tempfile

    # A unique throw-away directory instead of a fixed
    # "./test_episode_memory_tmp": independent of the working directory, safe
    # for parallel runs, and never deletes a pre-existing directory.
    test_dir = tempfile.mkdtemp(prefix="test_episode_memory_tmp_")
    try:
        from scripts.run_episode import run_episode
        memory = LifeStackMemory(silent=True, path=test_dir)
        result = run_episode(difficulty=1, verbose=False, memory=memory)
        reward = result.get("total_reward", None)
        steps  = result.get("steps", None)
        ok     = isinstance(reward, float) and (steps is None or steps <= 30)
        report("Full episode smoke test",
               ok,
               f"reward = {reward}, steps = {steps}, type = {type(reward).__name__}")
    except Exception as e:
        # Deliberately broad: this is a smoke test, so any crash is a failure.
        report("Full episode smoke test", False, f"Exception: {e}")
    finally:
        shutil.rmtree(test_dir, ignore_errors=True)


# ─── Run All ──────────────────────────────────────────────────────────────────
if __name__ == "__main__":
    # Script entry point: run every check in order and print a coloured
    # "<passed>/<total>" summary. Calling the functions directly bypasses
    # pytest markers, so the full-episode test runs here even without an API key.
    print("\n" + "=" * 60)
    print("  LifeStack Edge Case Test Suite")
    print("=" * 60 + "\n")

    suite = [
        test_cascade_floor,
        test_cascade_ceiling,
        test_resource_exhaustion,
        test_inaction_penalty,
        test_critical_floor_penalty,
        test_cascade_dampening,
        test_simperson_uptake_bounds,
        test_memory_threshold,
        test_episode_termination,
        test_task_driven_smoke,
        test_full_episode_smoke,
    ]
    # Derive the denominator from the list so it cannot drift out of sync.
    total = len(suite)

    for case in suite:
        try:
            case()
        except Exception as exc:
            # Top-level boundary: a test that blows up is recorded as a
            # failure so the remaining tests and the summary still run.
            report(case.__name__, False,
                   f"Unhandled {type(exc).__name__}: {exc}")

    print("\n" + "=" * 60)
    color = "\033[92m" if passed == total else "\033[91m"
    print(f"  {color}{passed}/{total} tests passed\033[0m")
    print("=" * 60 + "\n")