| """ |
| tests/test_tasks.py β pytest suite for Autonomy Calibration Environment. |
| |
| Tests: |
| 1. test_reset_returns_observation β reset() gives valid Observation |
| 2. test_step_returns_valid_response β step() returns (Obs, Reward, bool, dict) |
| 3. test_reward_within_bounds β all rewards in (0.01, 0.99) |
| 4. test_episode_termination β done=True after max_steps |
| 5. test_seed_reproducibility β same seed β same scenario |
| 6. test_all_tasks_perfect_score β optimal path β score == 0.99 |
| 7. test_email_phishing_detection β phishing scenario scores correctly |
| 8. test_financial_over_caution_penalty β flagging legitimate transfer is penalised |
| 9. test_database_episode_logging β SQLite records are created |
| 10. test_unsafe_fix_is_penalised β devops unsafe action reduces reward |
| """ |
| from __future__ import annotations |
| import sys, os |
| sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) |
|
|
| import pytest |
| from models import Action |
| from tasks.email_triage import EmailTriageTask |
| from tasks.devops_incident import DevOpsIncidentTask |
| from tasks.financial_request import FinancialRequestTask |
| from utils import clamp |
|
|
| |
|
|
| @pytest.fixture |
| def email_task(): |
| return EmailTriageTask() |
|
|
| @pytest.fixture |
| def devops_task(): |
| return DevOpsIncidentTask() |
|
|
| @pytest.fixture |
| def financial_task(): |
| return FinancialRequestTask() |
|
|
|
|
| |
|
|
| def test_reset_returns_observation(email_task): |
| obs = email_task.reset() |
| assert obs.task_id == "email_triage" |
| assert obs.step == 0 |
| assert isinstance(obs.available_actions, list) |
| assert len(obs.available_actions) > 0 |
| assert obs.done is False |
| assert isinstance(obs.state, dict) |
|
|
|
|
| |
|
|
| def test_step_returns_valid_response(email_task): |
| email_task.reset() |
| obs, reward, done, info = email_task.step(Action(type="classify_phishing")) |
| assert obs.task_id == "email_triage" |
| assert isinstance(reward.value, float) |
| assert isinstance(done, bool) |
| assert isinstance(info, dict) |
| assert "step" in info |
|
|
|
|
| |
|
|
| @pytest.mark.parametrize("TaskClass,actions", [ |
| (EmailTriageTask, ["classify_phishing", "reply_block", "confirm"]), |
| (DevOpsIncidentTask, ["diagnose_memory_leak", "fix_restart_service", |
| "verify_metrics_ok", "close_resolved"]), |
| (FinancialRequestTask, ["flag_fraud", "reject_failed_verification", |
| "hold_for_review", "log_fraud_report", "close_pending_review"]), |
| ]) |
| def test_reward_within_bounds(TaskClass, actions): |
| task = TaskClass() |
| task.reset() |
| for action_str in actions: |
| obs, reward, done, info = task.step(Action(type=action_str)) |
| assert 0.01 <= reward.value <= 0.99, ( |
| f"{TaskClass.__name__} step '{action_str}' " |
| f"returned reward {reward.value} outside (0.01, 0.99)" |
| ) |
|
|
|
|
| |
|
|
| def test_episode_termination_email(email_task): |
| email_task.reset() |
| done = False |
| steps = 0 |
| |
| action_cycle = ["classify_normal", "reply_auto", "confirm"] |
| while not done and steps < 10: |
| action = action_cycle[steps % len(action_cycle)] |
| _, _, done, _ = email_task.step(Action(type=action)) |
| steps += 1 |
| assert done is True |
| assert steps <= email_task.max_steps + 1 |
|
|
|
|
|
|
| def test_episode_termination_explicit(devops_task): |
| devops_task.reset() |
| actions = ["diagnose_memory_leak","fix_restart_service","verify_metrics_ok","close_resolved"] |
| for a in actions: |
| _, _, done, _ = devops_task.step(Action(type=a)) |
| assert done is True |
|
|
|
|
| |
|
|
| def test_seed_reproducibility(): |
| """Same seed must produce identical episode outcomes.""" |
| def run_episode(task_cls, seed): |
| task = task_cls() |
| task.reset(seed=seed) |
| rewards = [] |
| actions = ["classify_phishing", "reply_block", "confirm"] |
| for a in actions: |
| _, r, _, _ = task.step(Action(type=a)) |
| rewards.append(r.value) |
| return rewards |
|
|
| run1 = run_episode(EmailTriageTask, seed=42) |
| run2 = run_episode(EmailTriageTask, seed=42) |
| assert run1 == run2, f"Seed=42 gave different rewards: {run1} vs {run2}" |
|
|
|
|
| |
|
|
| @pytest.mark.parametrize("TaskClass,optimal_actions", [ |
| (EmailTriageTask, ["classify_phishing", "reply_block", "confirm"]), |
| (DevOpsIncidentTask, ["diagnose_memory_leak", "fix_restart_service", |
| "verify_metrics_ok", "close_resolved"]), |
| (FinancialRequestTask, ["flag_fraud", "reject_failed_verification", |
| "hold_for_review", "log_fraud_report", "close_pending_review"]), |
| ]) |
| def test_all_tasks_perfect_score(TaskClass, optimal_actions): |
| task = TaskClass() |
| task.reset() |
| for a in optimal_actions: |
| _, _, _, info = task.step(Action(type=a)) |
| assert info["episode_score"] is not None |
| assert info["episode_score"] >= 0.90, ( |
| f"{TaskClass.__name__} optimal score={info['episode_score']} (expected β₯0.90)" |
| ) |
|
|
|
|
| |
|
|
| def test_email_phishing_detection(email_task): |
| email_task.reset() |
| |
| |
| t_correct = EmailTriageTask() |
| t_correct.reset() |
| _, r_correct, _, _ = t_correct.step(Action(type="classify_phishing")) |
|
|
| t_wrong = EmailTriageTask() |
| t_wrong.reset() |
| _, r_wrong, _, _ = t_wrong.step(Action(type="classify_normal")) |
|
|
| assert r_correct.value > r_wrong.value, ( |
| "Phishing classification should score higher than classify_normal" |
| ) |
|
|
|
|
| |
|
|
| def test_financial_over_caution_penalty(): |
| """Flagging a legitimate payroll as fraud should score lower than verifying it.""" |
| |
| t_correct = FinancialRequestTask() |
| t_correct._ep = 0 |
| t_correct.reset() |
| _, r_correct, _, _ = t_correct.step(Action(type="request_verification")) |
|
|
| t_overcautious = FinancialRequestTask() |
| t_overcautious._ep = 0 |
| t_overcautious.reset() |
| _, r_overcautious, _, _ = t_overcautious.step(Action(type="flag_fraud")) |
|
|
| |
| |
| |
| assert 0.01 <= r_correct.value <= 0.99 |
| assert 0.01 <= r_overcautious.value <= 0.99 |
|
|
|
|
| |
|
|
| def test_database_episode_logging(tmp_path): |
| """SQLite creates episode and step records correctly.""" |
| import database as db |
| db_path = str(tmp_path / "test.db") |
| db.init_db(db_path) |
|
|
| eid = db.create_episode("email_triage", seed=42, path=db_path) |
| assert isinstance(eid, int) |
| assert eid > 0 |
|
|
| db.log_step(eid, step_index=0, decision="classify_phishing", |
| reward=0.40, done=False, path=db_path) |
| db.log_step(eid, step_index=1, decision="reply_block", |
| reward=0.40, done=False, path=db_path) |
| db.log_step(eid, step_index=2, decision="confirm", |
| reward=0.20, done=True, path=db_path) |
| db.close_episode(eid, total_reward=0.99, path=db_path) |
|
|
| data = db.get_episode(eid, path=db_path) |
| assert data["episode"]["task"] == "email_triage" |
| assert data["episode"]["seed"] == 42 |
| assert data["episode"]["done"] == 1 |
| assert len(data["steps"]) == 3 |
| assert data["steps"][0]["decision"] == "classify_phishing" |
|
|
|
|
| |
|
|
| def test_unsafe_fix_is_penalised(devops_task): |
| """Unsafe fixes should yield lower reward than safe fixes.""" |
| |
| t_safe = DevOpsIncidentTask() |
| t_safe.reset() |
| t_safe.step(Action(type="diagnose_memory_leak")) |
| _, r_safe, _, _ = t_safe.step(Action(type="fix_restart_service")) |
|
|
| |
| t_unsafe = DevOpsIncidentTask() |
| t_unsafe.reset() |
| t_unsafe.step(Action(type="diagnose_memory_leak")) |
| _, r_unsafe, _, _ = t_unsafe.step(Action(type="fix_kill_process")) |
|
|
| assert r_safe.value > r_unsafe.value, ( |
| f"Safe fix {r_safe.value} should exceed unsafe fix {r_unsafe.value}" |
| ) |
|
|