# workflow-orchestrator/tests/test_environment.py
"""Tests for the full OrchestratorEnvironment β€” reset, step, state, walkthroughs."""
import pytest
from models import OrchestratorAction, OrchestratorObservation, OrchestratorState
from server.environment import OrchestratorEnvironment, _episode_store
# ── Helpers ──
def _make_env(task_id: str = "easy") -> tuple[OrchestratorEnvironment, OrchestratorObservation]:
env = OrchestratorEnvironment()
obs = env.reset(task_id=task_id)
return env, obs
def _delegate(env, subtask_id, agent_name):
return env.step(OrchestratorAction(
action_type="delegate", subtask_id=subtask_id, agent_name=agent_name,
))
def _wait(env):
return env.step(OrchestratorAction(action_type="wait"))
def _synthesize(env):
return env.step(OrchestratorAction(action_type="synthesize"))
def _abort(env, subtask_id):
return env.step(OrchestratorAction(
action_type="abort", subtask_id=subtask_id,
))
def _retry(env, subtask_id, agent_name):
return env.step(OrchestratorAction(
action_type="retry", subtask_id=subtask_id, agent_name=agent_name,
))
# ── Reset tests ──
class TestReset:
def test_reset_returns_valid_observation(self) -> None:
env, obs = _make_env("easy")
assert isinstance(obs, OrchestratorObservation)
assert obs.done is False
assert obs.reward == 0.01 # clamped to (0, 1) exclusive for eval compliance
assert obs.time_remaining == 15
assert obs.time_elapsed == 0
def test_reset_easy_initial_ready(self) -> None:
env, obs = _make_env("easy")
ready = [s for s in obs.subtasks if s.status == "ready"]
assert len(ready) == 1
assert ready[0].id == "technical_design"
assert "delegate" in obs.available_actions
def test_reset_medium_has_budget(self) -> None:
env, obs = _make_env("medium")
assert obs.budget_remaining == pytest.approx(35.0)
assert obs.budget_used == 0.0
def test_reset_hard_time_budget(self) -> None:
env, obs = _make_env("hard")
assert obs.time_remaining == 22
assert obs.capacity_limit == 3
def test_reset_includes_critical_path_length(self) -> None:
env, obs = _make_env("medium")
assert obs.critical_path_length is not None
assert obs.critical_path_length > 0
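# The 0.01 floor asserted above recurs on every invalid action: rewards are
# evidently squeezed strictly inside (0, 1) "for eval compliance". A minimal
# sketch of such a clamp, assuming a symmetric 0.01 epsilon (illustrative
# only; the real helper lives in server.environment and may differ):
def _illustrative_reward_clamp(raw: float, eps: float = 0.01) -> float:
    """Map a raw (possibly negative) reward into the open interval (0, 1)."""
    return min(max(raw, eps), 1.0 - eps)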
# ── Validation tests ──
class TestValidation:
def test_delegate_valid(self) -> None:
env, _ = _make_env("easy")
obs = _delegate(env, "technical_design", "tech_lead")
assert len(obs.errors) == 0
# With speed=1, tech_lead completes in the same step as delegation
completed = [s for s in obs.subtasks if s.status == "completed"]
assert any(s.id == "technical_design" for s in completed)
def test_delegate_wrong_agent(self) -> None:
env, _ = _make_env("easy")
obs = _delegate(env, "technical_design", "frontend_dev")
assert len(obs.errors) > 0
assert "lacks capability" in obs.errors[0]
assert obs.reward == 0.01 # negative reward clamped to (0, 1) exclusive for eval compliance
def test_delegate_pending_subtask(self) -> None:
env, _ = _make_env("easy")
obs = _delegate(env, "implement_backend", "backend_dev")
assert len(obs.errors) > 0
assert "not ready" in obs.errors[0]
def test_delegate_over_capacity(self) -> None:
"""Medium task has capacity=3. Fill with speed=2 agents."""
env, _ = _make_env("medium")
# ci_runner speed=1, completes checkout immediately
_delegate(env, "checkout_code", "ci_runner")
# After this step: checkout completed, lint/unit/security ready
# test_service(speed=2) and security_scanner(speed=2) won't finish in 1 tick
_delegate(env, "run_unit_tests", "test_service") # speed=2, still working
_delegate(env, "run_security_scan", "security_scanner") # speed=2, still working
# ci_runner completed checkout, became idle, got linter
_delegate(env, "run_linter", "ci_runner") # speed=1, completes immediately
# After linter step: test_service(1 step left), security_scanner(1 step left)
# active = 2 (not 3, because ci_runner completed)
        # Exceeding capacity outright would need 3 agents with speed > 1, but
        # the medium pool only has 2, so assert the capacity invariant directly.
assert env._pool.get_active_count() <= 3
def test_retry_valid_after_failure(self) -> None:
"""Medium task: security_scanner fails first attempt, retry should work."""
env, _ = _make_env("medium")
_delegate(env, "checkout_code", "ci_runner")
_wait(env) # ci_runner completes (speed=1)
_delegate(env, "run_security_scan", "security_scanner")
_wait(env) # speed=2, tick 1
        _wait(env)  # tick 2: security_scanner fails (per-attempt reliability override [0.0, 1.0])
# Now security_scan should be failed
failed = [s for s in env.state.subtask_statuses if env.state.subtask_statuses[s] == "failed"]
assert "run_security_scan" in failed
# Retry should succeed
obs = _retry(env, "run_security_scan", "security_scanner")
assert len(obs.errors) == 0
def test_wait_always_valid(self) -> None:
env, _ = _make_env("easy")
obs = _wait(env)
assert obs.done is False
def test_synthesize_before_complete(self) -> None:
env, _ = _make_env("easy")
obs = _synthesize(env)
assert len(obs.errors) > 0
assert "not all subtasks completed" in obs.errors[0].lower()
def test_abort_completed_subtask(self) -> None:
env, _ = _make_env("easy")
_delegate(env, "technical_design", "tech_lead")
_wait(env) # tech_lead completes (speed=1)
obs = _abort(env, "technical_design")
assert len(obs.errors) > 0
assert "already completed" in obs.errors[0]
# ── Execution tests ──
class TestExecution:
def test_delegate_then_wait_completes(self) -> None:
"""Easy task: tech_lead has speed=1, so subtask completes after 1 tick."""
env, _ = _make_env("easy")
_delegate(env, "technical_design", "tech_lead")
obs = _wait(env) # tick processes, tech_lead finishes
completed = [s for s in obs.subtasks if s.status == "completed"]
assert any(s.id == "technical_design" for s in completed)
def test_time_advances(self) -> None:
env, obs = _make_env("easy")
assert obs.time_elapsed == 0
assert obs.time_remaining == 15
obs = _wait(env)
assert obs.time_elapsed == 1
assert obs.time_remaining == 14
def test_invalid_still_advances_time(self) -> None:
env, _ = _make_env("easy")
obs = _delegate(env, "implement_backend", "backend_dev") # not ready
assert obs.time_elapsed == 1
assert obs.time_remaining == 14
def test_episode_ends_at_time_zero(self) -> None:
env, _ = _make_env("easy")
for _ in range(15):
obs = _wait(env)
assert obs.done is True
assert obs.time_remaining == 0
def test_reward_breakdown_exposed_in_observation(self) -> None:
env, _ = _make_env("easy")
obs = _delegate(env, "technical_design", "tech_lead")
assert obs.reward_breakdown is not None
assert obs.reward_breakdown["delegation"] == pytest.approx(0.05)
assert obs.reward_breakdown["subtask_completed"] == pytest.approx(0.08)
def test_sla_hint_overrides_generic_ready_hint(self) -> None:
env, _ = _make_env("hard")
for _ in range(7):
obs = _wait(env)
assert obs.hint is not None
assert "SLA WARNING" in obs.hint
def test_parallelism_reward_only_on_new_parallel_window(self) -> None:
env, _ = _make_env("medium")
env._pool._agents["test_service"].speed = 3
env._pool._agents["security_scanner"].speed = 3
env._agent_speeds["test_service"] = 3
env._agent_speeds["security_scanner"] = 3
_delegate(env, "checkout_code", "ci_runner")
env._pool._agents["ci_runner"].status = "offline"
_delegate(env, "run_unit_tests", "test_service")
obs = _delegate(env, "run_security_scan", "security_scanner")
assert obs.reward_breakdown is not None
assert obs.reward_breakdown["parallelism"] == pytest.approx(0.10)
assert env._parallelism_events == 1
obs = _wait(env)
assert env._parallelism_events == 2
assert obs.reward_breakdown is not None
assert "parallelism" not in obs.reward_breakdown
assert obs.reward == pytest.approx(0.11, abs=0.01)
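# The assertions above suggest parallelism is detected whenever two or more
# subtasks are in_progress in the same window, but rewarded only when the
# window is new. A sketch of the assumed detection rule (inferred, not the
# real server logic):
def _is_parallel_window(subtask_statuses: dict) -> bool:
    """Two or more in_progress subtasks constitute a parallel window."""
    return sum(1 for s in subtask_statuses.values() if s == "in_progress") >= 2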
# ── Full walkthrough ──
class TestWalkthrough:
def test_easy_sequential_walkthrough(self) -> None:
"""Walk through entire easy task: delegate each, synthesize."""
env, _ = _make_env("easy")
# Step sequence: each agent has speed=1, so delegate + wait = complete
sequence = [
("technical_design", "tech_lead"),
("implement_backend", "backend_dev"),
("implement_frontend", "frontend_dev"),
("write_tests", "qa_engineer"),
("run_tests", "qa_engineer"),
("review_and_merge", "tech_lead"),
]
for subtask_id, agent_name in sequence:
_delegate(env, subtask_id, agent_name)
_wait(env) # Agent completes (speed=1)
        # All done: synthesize
obs = _synthesize(env)
assert obs.done is True
assert len(obs.completed_outputs) == 6
def test_easy_walkthrough_positive_reward(self) -> None:
"""Easy walkthrough should yield positive total reward."""
env, _ = _make_env("easy")
total_reward = 0.0
sequence = [
("technical_design", "tech_lead"),
("implement_backend", "backend_dev"),
("implement_frontend", "frontend_dev"),
("write_tests", "qa_engineer"),
("run_tests", "qa_engineer"),
("review_and_merge", "tech_lead"),
]
for subtask_id, agent_name in sequence:
obs = _delegate(env, subtask_id, agent_name)
total_reward += obs.reward
obs = _wait(env)
total_reward += obs.reward
obs = _synthesize(env)
total_reward += obs.reward
assert total_reward > 0
def test_parallelism_detected(self) -> None:
"""Medium task: delegate speed=2 agents concurrently β†’ parallelism event."""
env, _ = _make_env("medium")
        # checkout_code → ci_runner (speed=1, completes immediately)
_delegate(env, "checkout_code", "ci_runner")
# Now lint, unit_tests, security_scan are ready
# test_service(speed=2) and security_scanner(speed=2) won't complete in 1 tick
_delegate(env, "run_unit_tests", "test_service")
# After this step: test_service still working (speed=2, 1 step remaining)
        # Now delegate security_scan; both will be in_progress before the tick
obs = _delegate(env, "run_security_scan", "security_scanner")
# Parallelism should be detected (both in_progress before tick)
assert env._parallelism_events >= 1
# ── State & log tests ──
class TestStateAndLog:
def test_state_returns_orchestrator_state(self) -> None:
env, _ = _make_env("easy")
state = env.state
assert isinstance(state, OrchestratorState)
assert state.task_id == "easy"
assert state.difficulty == "easy"
assert state.step_count == 0
def test_episode_log_stored_after_completion(self) -> None:
"""After episode ends, _episode_store should contain the log."""
env, _ = _make_env("easy")
# Just wait until time runs out
for _ in range(15):
_wait(env)
assert "easy" in _episode_store
log = _episode_store["easy"]
assert len(log.events) > 0
# Should have an episode_end event
end_events = [e for e in log.events if e.event_type == "episode_end"]
assert len(end_events) == 1
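    # Inferred shape, relied on above: _episode_store maps task_id -> an
    # episode log whose events each expose at least an event_type (e.g.
    # "episode_end"); the tests assume nothing beyond that.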
def test_state_tracks_failures(self) -> None:
"""Verify failure counters update correctly."""
env, _ = _make_env("easy")
_delegate(env, "technical_design", "tech_lead")
_wait(env) # completes
# Abort a subtask to increment failure
obs = _abort(env, "implement_backend")
state = env.state
# implement_backend was pending/ready, now failed after abort
assert state.subtask_statuses["implement_backend"] == "failed"
# ── Hard task walkthrough ──
class TestHardTaskWalkthrough:
"""Full walkthrough of the hard task (Production Incident Response).
The sequence was verified against the seeded RNG (seed=44) to produce
deterministic outcomes. Speed=1 agents complete in the same step as
delegation; speed=2 agents complete one step later.
"""
def _run_known_good_sequence(self):
"""Execute the known-good 14-step hard task walkthrough.
Returns (env, final_obs) so callers can make assertions.
"""
env, obs = _make_env("hard")
# S0: triage (speed=1, completes immediately)
_delegate(env, "alert_triage", "triage_analyst")
# S1: alpha on enrich_logs (speed=2, will fail permanently)
_delegate(env, "enrich_logs", "investigator_alpha")
# S2: monitor on dashboards (speed=1, completes; alpha fails)
_delegate(env, "check_dashboards", "monitor")
# S3: retry enrich_logs with beta (speed=2)
_retry(env, "enrich_logs", "investigator_beta")
# S4: alpha on check_deps (speed=2; beta completes = recovery)
_delegate(env, "check_dependencies", "investigator_alpha")
# S5: communicator on notify (speed=1; alpha completes check_deps)
_delegate(env, "notify_stakeholders", "communicator")
        # S6: senior on root_cause (speed=1, completes; SLA met step 6 ≤ 10)
_delegate(env, "root_cause_analysis", "senior_engineer")
# S7: deployer on hotfix (speed=2)
_delegate(env, "deploy_hotfix", "deployer")
# S8: communicator on status_page (speed=1; deployer completes)
_delegate(env, "update_status_page", "communicator")
# S9: senior on validate_fix (speed=1, completes)
_delegate(env, "validate_fix", "senior_engineer")
# S10: monitor on monitor_recovery (speed=1, completes; all 10 done)
_delegate(env, "monitor_recovery", "monitor")
# S11-12: monitoring patience waits (2 consecutive waits)
_wait(env)
_wait(env) # deployer dropout fires at step 12 (idle, no impact)
# S13: synthesize
obs = _synthesize(env)
return env, obs
def test_all_subtasks_complete(self) -> None:
"""All 10 subtasks should be completed after the walkthrough."""
env, obs = self._run_known_good_sequence()
assert obs.done is True
assert len(obs.completed_outputs) == 10
assert env._dag.is_all_completed()
def test_episode_terminates_with_bonus(self) -> None:
"""Synthesize should trigger done + positive end bonus."""
env, obs = self._run_known_good_sequence()
assert obs.done is True
assert obs.reward > 0 # end bonus included
assert env._total_reward > 2.0 # verified: 2.2589
def test_time_and_budget(self) -> None:
"""Verify time remaining and budget used match expected values."""
env, obs = self._run_known_good_sequence()
assert obs.time_remaining == 8 # 22 - 14 steps
assert env._pool.get_budget_used() == pytest.approx(30.0, abs=0.1)
def test_permanent_failure_and_recovery(self) -> None:
"""Alpha fails permanently on enrich_logs, beta recovers."""
env, obs = self._run_known_good_sequence()
assert env._failures_occurred == 1 # alpha on enrich_logs
assert env._failures_recovered == 1 # beta succeeds on retry
def test_parallelism_detected(self) -> None:
"""Should detect parallelism when 2+ tasks run concurrently."""
env, obs = self._run_known_good_sequence()
assert env._parallelism_events >= 3 # verified: 4
def test_deployer_goes_offline(self) -> None:
"""Deployer should be offline after step 12 dropout event."""
env, obs = self._run_known_good_sequence()
assert env._pool.is_online("deployer") is False
def test_zero_capacity_violations(self) -> None:
"""No capacity violations in the known-good sequence."""
env, obs = self._run_known_good_sequence()
assert env._capacity_violations == 0
def test_grader_score_above_threshold(self) -> None:
"""Grader should score > 0.75 for the known-good walkthrough.
Score is ~0.78 because the walkthrough recovers enrich_logs but
        completes deploy_hotfix before the deployer dropout, so only 1 of 2
designed failure recoveries is demonstrated.
"""
from server.graders import grade_hard
env, obs = self._run_known_good_sequence()
log = _episode_store["hard"]
result = grade_hard(log)
assert result.score > 0.75
assert result.score <= 1.0
def test_grader_all_dimensions_present(self) -> None:
"""All hard-task grader dimensions should be present and non-negative."""
from server.graders import grade_hard
env, obs = self._run_known_good_sequence()
log = _episode_store["hard"]
result = grade_hard(log)
expected_keys = [
"completion", "recovery", "error_classification",
"capacity_discipline", "parallelism", "cost_efficiency",
"conflict_resolution", "sla_compliance", "monitoring_patience",
"recovery_speed",
]
for key in expected_keys:
assert key in result.breakdown, f"Missing grader dimension: {key}"
assert result.breakdown[key] >= 0.0, f"{key} is negative"
def test_grader_perfect_dimensions(self) -> None:
"""Verify the known-good walkthrough scores perfectly on key dimensions."""
from server.graders import grade_hard
env, obs = self._run_known_good_sequence()
log = _episode_store["hard"]
result = grade_hard(log)
assert result.breakdown["completion"] == pytest.approx(0.20, abs=0.01)
# Only 1 of 2 recoveries: enrich_logs recovered, deploy_hotfix completed
        # before deployer dropout → 0.15 * (1/2) = 0.075
assert result.breakdown["recovery"] == pytest.approx(0.075, abs=0.01)
assert result.breakdown["error_classification"] == pytest.approx(0.10, abs=0.01)
assert result.breakdown["capacity_discipline"] == pytest.approx(0.10, abs=0.01)
assert result.breakdown["conflict_resolution"] == pytest.approx(0.10, abs=0.01)
assert result.breakdown["sla_compliance"] == pytest.approx(0.10, abs=0.01)
assert result.breakdown["monitoring_patience"] == pytest.approx(0.05, abs=0.01)
assert result.breakdown["recovery_speed"] == pytest.approx(0.02, abs=0.01)
class TestHardTaskEdgeCases:
"""Edge case tests for the hard task mechanics."""
def test_permanent_failure_retry_rejected(self) -> None:
"""Retrying alpha on enrich_logs after permanent failure should be rejected."""
env, _ = _make_env("hard")
# S0: triage
_delegate(env, "alert_triage", "triage_analyst")
# S1: alpha on enrich_logs (will fail permanently)
_delegate(env, "enrich_logs", "investigator_alpha")
# S2: need another action for alpha to finish (speed=2)
_wait(env)
# Now enrich_logs is failed (permanent)
failed = [s for s in env.state.subtask_statuses
if env.state.subtask_statuses[s] == "failed"]
assert "enrich_logs" in failed
# Retry with alpha should be rejected (permanent failure)
obs = _retry(env, "enrich_logs", "investigator_alpha")
assert len(obs.errors) == 1
assert "permanent" in obs.errors[0].lower()
assert obs.reward == 0.01 # negative penalty clamped to (0, 1) exclusive for eval compliance
def test_monitoring_patience_failure(self) -> None:
"""Synthesizing immediately after all complete loses patience score."""
from server.graders import grade_hard
env, _ = _make_env("hard")
# Run the known-good sequence up to all subtasks complete (step 10)
_delegate(env, "alert_triage", "triage_analyst")
_delegate(env, "enrich_logs", "investigator_alpha")
_delegate(env, "check_dashboards", "monitor")
_retry(env, "enrich_logs", "investigator_beta")
_delegate(env, "check_dependencies", "investigator_alpha")
_delegate(env, "notify_stakeholders", "communicator")
_delegate(env, "root_cause_analysis", "senior_engineer")
_delegate(env, "deploy_hotfix", "deployer")
_delegate(env, "update_status_page", "communicator")
_delegate(env, "validate_fix", "senior_engineer")
_delegate(env, "monitor_recovery", "monitor")
        # Synthesize immediately, with NO patience waits
obs = _synthesize(env)
assert obs.done is True
log = _episode_store["hard"]
result = grade_hard(log)
assert result.breakdown["monitoring_patience"] == 0.0
def test_bad_episode_near_zero_score(self) -> None:
"""Just waiting until timeout should score near 0."""
from server.graders import grade_hard
env, _ = _make_env("hard")
for _ in range(22):
_wait(env)
log = _episode_store["hard"]
result = grade_hard(log)
# With activity gate, doing nothing scores 0.0: "no harm" dimensions
# (error_classification, capacity_discipline, cost_efficiency) are
# gated by min(1.0, completed/5) which is 0.0 when nothing is completed.
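        # Worked gate example: 0 completed gives min(1.0, 0/5) = 0.0, zeroing
        # those dimensions; 3 completions would restore a 0.6 multiplier.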
assert result.score <= 0.05
assert result.breakdown["completion"] == 0.0
assert result.breakdown["recovery"] == 0.0
assert result.breakdown["sla_compliance"] == 0.0
# 20 keys: 10 dimensions + invalid_penalty + budget_overrun_penalty + 8 diagnostic metadata
assert len(result.breakdown) == 20
def test_sla_penalty_when_delayed(self) -> None:
"""Delaying root_cause past step 10 should incur SLA penalties."""
env, _ = _make_env("hard")
_delegate(env, "alert_triage", "triage_analyst")
# Wait until step 11+ without completing root_cause
for _ in range(11):
_wait(env)
        # By now, step_count=12, root_cause not done → SLA penalties accrued
state = env.state
assert state.subtask_statuses["root_cause_analysis"] != "completed"
# Total reward should be negative due to SLA + unnecessary_wait penalties
assert env._total_reward < 0
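# test_sla_penalty_when_delayed implies the SLA penalty accrues per step once
# the milestone deadline lapses. A sketch of that assumed shape; the per_step
# rate is invented for illustration, not read from the server:
def _illustrative_sla_penalty(step_count: int, deadline_step: int,
                              completed: bool, per_step: float = 0.02) -> float:
    """Accrued penalty for a milestone still incomplete past its deadline."""
    if completed or step_count <= deadline_step:
        return 0.0
    return per_step * (step_count - deadline_step)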
# ── Expert task walkthrough ──
class TestExpertTaskWalkthrough:
"""Full walkthrough of the expert task (Life OS Daily Orchestration).
The 18-step sequence was verified against the seeded RNG (seed=45) to
produce deterministic outcomes. It completes all 14 subtasks, achieves
full parallelism across all 3 fan-out points, meets all 3 SLA milestones,
avoids permanent failure traps, and routes around the personal_agent
dropout at step 10.
    Grader score: 0.948 (9 of the 11 graded dimensions at 100%).
"""
def _run_known_good_sequence(self):
"""Execute the known-good 18-step expert task walkthrough.
Returns (env, final_obs) so callers can make assertions.
Step-by-step plan:
- S0: morning_check_in -> personal_agent (speed=1, completes immediately)
- S1: assess_career_deadlines -> career_agent (speed=2, stays in_progress)
- S2: assess_sleep_energy -> health_agent (speed=1)
PARALLELISM: [assess_career_deadlines, assess_sleep_energy]
Both complete; career_agent finishes career_deadlines
- S3: assess_personal_commitments -> personal_agent (speed=1, completes)
All 3 assessments done -> plan_day_schedule ready
- S4: plan_day_schedule -> companion (speed=2, stays in_progress)
- S5: wait (companion completes plan_day_schedule at step 5; SLA met)
- S6: process_inbox -> executive_assistant (speed=2, stays in_progress)
- S7: start_focus_session -> focus_agent (speed=1)
PARALLELISM: [process_inbox, start_focus_session]
Both complete; career_agent degrades (speed 2->4) this step
- S8: deep_work_block -> companion (speed=2, stays in_progress)
- S9: handle_urgent_request -> executive_assistant (speed=2)
PARALLELISM: [deep_work_block, handle_urgent_request]
companion completes deep_work_block; exec_asst still working
- S10: midday_health_check -> health_agent (speed=1)
PARALLELISM: [handle_urgent_request, midday_health_check]
Both complete; personal_agent drops out (idle, no impact)
-> resolve_priority_conflict ready
- S11: resolve_priority_conflict -> companion (speed=2, stays in_progress)
- S12: wait (companion completes; SLA met at step 12 < 16)
- S13: afternoon_execution -> companion (speed=2, stays in_progress)
- S14: notify_stakeholders -> mail_agent (speed=1, FAILS: roll > reliability)
PARALLELISM: [afternoon_execution, notify_stakeholders]
companion completes afternoon_execution; mail_agent fails
- S15: retry notify_stakeholders -> mail_agent (attempt=1, succeeds)
- S16: synthesize_day_report -> mail_agent (speed=1, completes)
SLA met at step 16 < 23
- S17: synthesize (all 14 done, episode ends with bonus)
"""
env, obs = _make_env("expert")
# Phase 1: Morning check-in
_delegate(env, "morning_check_in", "personal_agent")
# Phase 2: Three assessments with parallelism
_delegate(env, "assess_career_deadlines", "career_agent")
_delegate(env, "assess_sleep_energy", "health_agent")
_delegate(env, "assess_personal_commitments", "personal_agent")
        # Phase 3: Plan the day schedule (companion is the only viable first-attempt agent)
_delegate(env, "plan_day_schedule", "companion")
_wait(env)
# Phase 4: Focus + Inbox with parallelism
_delegate(env, "process_inbox", "executive_assistant")
_delegate(env, "start_focus_session", "focus_agent")
# Phase 5: Deep work + Urgent request with parallelism
_delegate(env, "deep_work_block", "companion")
_delegate(env, "handle_urgent_request", "executive_assistant")
# Phase 6: Midday health check (avoid wellness_monitor!)
_delegate(env, "midday_health_check", "health_agent")
# Phase 7: Conflict resolution (only companion viable)
_delegate(env, "resolve_priority_conflict", "companion")
_wait(env)
# Phase 8: Afternoon + Notify with parallelism
_delegate(env, "afternoon_execution", "companion")
_delegate(env, "notify_stakeholders", "mail_agent") # fails attempt 0
# Phase 9: Retry notify, then final report
_retry(env, "notify_stakeholders", "mail_agent") # attempt 1 succeeds
_delegate(env, "synthesize_day_report", "mail_agent")
# Phase 10: Synthesize
obs = _synthesize(env)
return env, obs
def test_all_14_subtasks_complete(self) -> None:
"""All 14 subtasks should be completed after the walkthrough."""
env, obs = self._run_known_good_sequence()
assert obs.done is True
assert len(obs.completed_outputs) == 14
assert env._dag.is_all_completed()
def test_episode_terminates_with_positive_reward(self) -> None:
"""Synthesize should trigger done and large positive total reward."""
env, obs = self._run_known_good_sequence()
assert obs.done is True
assert obs.reward > 0 # end bonus included
assert env._total_reward > 2.5 # verified: ~2.988
def test_time_and_budget(self) -> None:
"""Verify time remaining and budget match expected values."""
env, obs = self._run_known_good_sequence()
assert obs.time_remaining == 7 # 25 - 18 steps
assert env._pool.get_budget_used() == pytest.approx(44.0, abs=0.1)
def test_failure_and_recovery(self) -> None:
"""mail_agent fails notify_stakeholders attempt 0, recovers on retry."""
env, obs = self._run_known_good_sequence()
assert env._failures_occurred == 1
assert env._failures_recovered == 1
def test_parallelism_detected(self) -> None:
"""Should detect parallelism across multiple fan-out points."""
env, obs = self._run_known_good_sequence()
# Morning assessments, focus+inbox, deep+urgent, urgent+health,
# afternoon+notify = 5+ parallelism events
assert env._parallelism_events >= 5
def test_personal_agent_offline_after_dropout(self) -> None:
"""personal_agent should be offline after step 10 dropout event."""
env, obs = self._run_known_good_sequence()
assert env._pool.is_online("personal_agent") is False
def test_career_agent_degraded(self) -> None:
"""career_agent speed should be 4 after degradation at step 7."""
env, obs = self._run_known_good_sequence()
agent_infos = env._pool.get_agent_infos()
career = [a for a in agent_infos if a.name == "career_agent"][0]
assert career.speed == 4
def test_zero_capacity_violations(self) -> None:
"""No capacity violations in the known-good sequence."""
env, obs = self._run_known_good_sequence()
assert env._capacity_violations == 0
def test_grader_score_above_threshold(self) -> None:
"""Grader should score >= 0.94 for the known-good walkthrough."""
from server.graders import grade_expert
env, obs = self._run_known_good_sequence()
log = _episode_store["expert"]
result = grade_expert(log)
assert result.score >= 0.94
assert result.score <= 1.0
def test_grader_all_dimensions_present(self) -> None:
"""All expert-task grader dimensions should be present and non-negative."""
from server.graders import grade_expert
env, obs = self._run_known_good_sequence()
log = _episode_store["expert"]
result = grade_expert(log)
expected_keys = [
"completion", "health_pillar", "career_pillar",
"conflict_resolution", "cost_efficiency", "parallelism",
"time_efficiency", "error_classification", "sla_compliance",
"communication", "recovery_speed",
]
for key in expected_keys:
assert key in result.breakdown, f"Missing grader dimension: {key}"
assert result.breakdown[key] >= 0.0, f"{key} is negative"
def test_grader_perfect_dimensions(self) -> None:
"""Verify the known-good walkthrough scores strongly across key dimensions."""
from server.graders import grade_expert
env, obs = self._run_known_good_sequence()
log = _episode_store["expert"]
result = grade_expert(log)
# 100% dimensions
assert result.breakdown["completion"] == pytest.approx(0.15, abs=0.001)
assert result.breakdown["health_pillar"] == pytest.approx(0.12, abs=0.001)
assert result.breakdown["career_pillar"] == pytest.approx(0.10, abs=0.001)
assert result.breakdown["conflict_resolution"] == pytest.approx(0.20, abs=0.001)
assert result.breakdown["parallelism"] == pytest.approx(0.10, abs=0.001)
assert result.breakdown["error_classification"] == pytest.approx(0.08, abs=0.001)
assert result.breakdown["sla_compliance"] == pytest.approx(0.08, abs=0.001)
assert result.breakdown["communication"] == pytest.approx(0.04, abs=0.001)
assert result.breakdown["recovery_speed"] == pytest.approx(0.02, abs=0.001)
# Partial dimensions (cost efficiency ~0.064, time efficiency ~0.008)
assert result.breakdown["cost_efficiency"] > 0.05
assert result.breakdown["time_efficiency"] > 0.005
def test_sla_milestones_all_met(self) -> None:
"""All 3 SLA milestones should be met."""
from server.graders import count_sla_milestones_met
env, obs = self._run_known_good_sequence()
log = _episode_store["expert"]
sla_milestones = {
"plan_day_schedule": 8,
"resolve_priority_conflict": 16,
"synthesize_day_report": 23,
}
met = count_sla_milestones_met(log, sla_milestones)
assert met == 3
def test_permanent_failure_traps_avoided(self) -> None:
"""Sequence never assigns wellness_monitor to health_alert or
executive_assistant to conflict_resolution."""
from server.graders import count_retries_on_permanent_failure
env, obs = self._run_known_good_sequence()
log = _episode_store["expert"]
assert count_retries_on_permanent_failure(log) == 0
def test_conflict_resolution_by_companion(self) -> None:
"""resolve_priority_conflict should be completed by companion."""
from server.graders import subtask_completed_by_agent
env, obs = self._run_known_good_sequence()
log = _episode_store["expert"]
assert subtask_completed_by_agent(log, "resolve_priority_conflict", "companion")
def test_midday_health_not_by_wellness_monitor(self) -> None:
"""midday_health_check should NOT be completed by wellness_monitor."""
from server.graders import subtask_not_completed_by_agent
env, obs = self._run_known_good_sequence()
log = _episode_store["expert"]
assert subtask_not_completed_by_agent(
log, "midday_health_check", ["wellness_monitor"]
)