Spaces:
Running
Running
File size: 6,925 Bytes
63a6397 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 | from Cyber_analyst.models import CyberAnalystAction
from Cyber_analyst.server.Cyber_analyst_environment import CyberAnalystEnvironment
from Cyber_analyst.server.graders import (
grade_authz_boundary_hard,
grade_missing_security_headers_medium,
grade_secret_exposure_easy,
safe_reward,
)
def _run_success_path(task_id, actions):
    """Replay ``actions`` on a fresh environment and check the episode succeeded.

    The environment is reset with ``task_id`` and a fixed seed of 7, then every
    action is stepped in order. The last observation must be terminal, carry an
    empty error string, and report a score above 0.5 that also lies within
    [0.01, 0.99]. That final observation is returned for further assertions.
    """
    environment = CyberAnalystEnvironment()
    observation = environment.reset(task_id=task_id, seed=7)
    assert observation.task_id == task_id

    for step_action in actions:
        observation = environment.step(step_action)

    # Check termination before touching the score so a non-terminal episode
    # fails on the ``done`` assertion rather than on a score lookup.
    assert observation.done is True
    final_score = observation.tool_result["score"]
    assert final_score > 0.5
    assert 0.01 <= final_score <= 0.99
    assert observation.error == ""
    return observation
def test_secret_exposure_success_path():
    """Drive ``secret_exposure_easy`` through search, create, validate, submit.

    After the shared success checks, the first verified finding must match
    ground truth ``GT-SECRET-001`` and the submitted result must expose a
    ``trajectory_jsonl`` entry that mentions the ``search_repo`` call.
    """
    final_report = {
        "findings": [
            {
                "finding_type": "secret_exposure",
                "evidence_ids": ["EVID-101"],
                "impact": "A synthetic API key secret is exposed in config.",
                "remediation": "Remove the key and rotate the credential.",
            }
        ]
    }

    search_step = CyberAnalystAction(tool_name="search_repo", args={"query": "api key"})
    create_step = CyberAnalystAction(
        tool_name="create_finding",
        args={
            "finding_type": "secret_exposure",
            "evidence_ids": ["EVID-101"],
            "severity_guess": "high",
            "remediation": "Remove and rotate the synthetic credential.",
        },
    )
    validate_step = CyberAnalystAction(
        tool_name="validate_finding", args={"finding_id": "FND-001"}
    )
    submit_step = CyberAnalystAction(
        tool_name="submit_report", args={"report_json": final_report}
    )

    result = _run_success_path(
        "secret_exposure_easy",
        [search_step, create_step, validate_step, submit_step],
    )

    assert result.verified_findings[0]["matching_gt_id"] == "GT-SECRET-001"
    assert "trajectory_jsonl" in result.tool_result
    assert "search_repo" in result.tool_result["trajectory_jsonl"]
def test_missing_security_headers_success_path():
    """Drive ``missing_security_headers_medium`` to a successful report.

    The sequence checks the ``gateway`` service headers, creates and validates
    a finding, then submits the report. Beyond the shared success checks, the
    ``valid_evidence`` component of the score breakdown must equal 0.15.
    """
    final_report = {
        "findings": [
            {
                "finding_type": "missing_security_headers",
                "evidence_ids": ["EVID-201"],
                "impact": "The gateway is missing HSTS and CSP headers.",
                "remediation": "Add HSTS and CSP at the gateway.",
            }
        ]
    }

    header_check_step = CyberAnalystAction(
        tool_name="check_security_headers", args={"service_id": "gateway"}
    )
    create_step = CyberAnalystAction(
        tool_name="create_finding",
        args={
            "finding_type": "missing_security_headers",
            "evidence_ids": ["EVID-201"],
            "severity_guess": "medium",
            "remediation": "Add HSTS and CSP headers.",
        },
    )
    validate_step = CyberAnalystAction(
        tool_name="validate_finding", args={"finding_id": "FND-001"}
    )
    submit_step = CyberAnalystAction(
        tool_name="submit_report", args={"report_json": final_report}
    )

    result = _run_success_path(
        "missing_security_headers_medium",
        [header_check_step, create_step, validate_step, submit_step],
    )

    assert result.score_breakdown["valid_evidence"] == 0.15
def test_authz_boundary_success_path_with_alias_compatible_service_ids():
    """Drive ``authz_boundary_hard`` to a successful report.

    The sequence lists assets, pulls log events using the service id
    ``admin-service``, searches the repo, creates and validates a finding
    backed by two evidence ids, then submits the report. Beyond the shared
    success checks, the ``actionable_remediation`` component of the score
    breakdown must equal 0.15.
    """
    final_report = {
        "findings": [
            {
                "finding_type": "authz_boundary_misconfiguration",
                "evidence_ids": ["EVID-301", "EVID-302"],
                "impact": "The admin route authorization policy allows an analyst role.",
                "remediation": "Apply least privilege in the policy and add a regression test.",
            }
        ]
    }

    list_assets_step = CyberAnalystAction(tool_name="list_assets", args={})
    log_events_step = CyberAnalystAction(
        tool_name="get_log_events",
        args={"service_id": "admin-service", "query": "admin export"},
    )
    search_step = CyberAnalystAction(
        tool_name="search_repo", args={"query": "admin export"}
    )
    create_step = CyberAnalystAction(
        tool_name="create_finding",
        args={
            "finding_type": "authz_boundary_misconfiguration",
            "evidence_ids": ["EVID-301", "EVID-302"],
            "severity_guess": "critical",
            "remediation": "Apply least privilege and add a regression test.",
        },
    )
    validate_step = CyberAnalystAction(
        tool_name="validate_finding", args={"finding_id": "FND-001"}
    )
    submit_step = CyberAnalystAction(
        tool_name="submit_report", args={"report_json": final_report}
    )

    result = _run_success_path(
        "authz_boundary_hard",
        [
            list_assets_step,
            log_events_step,
            search_step,
            create_step,
            validate_step,
            submit_step,
        ],
    )

    assert result.score_breakdown["actionable_remediation"] == 0.15
def test_invalid_tool_returns_observation_error():
    """Stepping with the tool name ``shell`` reports an error without ending the episode.

    The observation must stay non-terminal, carry the error code
    ``unsupported_tool``, and flag ``ok`` as False in its tool result.
    """
    environment = CyberAnalystEnvironment()
    environment.reset(task_id="secret_exposure_easy", seed=1)

    bogus_action = CyberAnalystAction(tool_name="shell", args={"cmd": "whoami"})
    outcome = environment.step(bogus_action)

    assert outcome.done is False
    assert outcome.error == "unsupported_tool"
    assert outcome.tool_result["ok"] is False
def test_hallucinated_report_scores_low_but_in_range():
    """Submitting an evidence-free, unrelated finding ends the episode at score 0.01.

    The report is submitted as the very first action on
    ``secret_exposure_easy`` and claims ``remote_code_execution`` with an
    empty evidence list.
    """
    environment = CyberAnalystEnvironment()
    environment.reset(task_id="secret_exposure_easy", seed=1)

    unsupported_finding = {
        "finding_type": "remote_code_execution",
        "evidence_ids": [],
        "impact": "Unsupported claim.",
        "remediation": "Unsupported remediation.",
    }
    submit_step = CyberAnalystAction(
        tool_name="submit_report",
        args={"report_json": {"findings": [unsupported_finding]}},
    )
    outcome = environment.step(submit_step)

    assert outcome.done is True
    assert outcome.tool_result["score"] == 0.01
def test_repeated_action_hard_stops_episode():
    """Issuing ``list_assets`` six times in a row terminates the episode.

    The observation from the sixth step must be terminal and carry the error
    code ``repeat_hard_stop``.
    """
    environment = CyberAnalystEnvironment()
    environment.reset(task_id="secret_exposure_easy", seed=1)

    # A fresh action object is built for every step, and all six steps are
    # always executed regardless of what earlier observations report.
    observations = [
        environment.step(CyberAnalystAction(tool_name="list_assets", args={}))
        for _ in range(6)
    ]
    last_observation = observations[-1]

    assert last_observation is not None
    assert last_observation.done is True
    assert last_observation.error == "repeat_hard_stop"
def test_seed_determinism_for_assets():
    """Two environments reset with the same task and seed list identical assets.

    Both environments use ``authz_boundary_hard`` with seed 22, and the
    ``tool_result`` of their first ``list_assets`` step must compare equal.
    """
    # Keep the original phase order: create both, reset both, then step both.
    environments = [CyberAnalystEnvironment(), CyberAnalystEnvironment()]
    for environment in environments:
        environment.reset(task_id="authz_boundary_hard", seed=22)

    first_listing, second_listing = [
        environment.step(CyberAnalystAction(tool_name="list_assets", args={}))
        for environment in environments
    ]

    assert first_listing.tool_result == second_listing.tool_result
def test_grader_adapters_and_clamp_are_strictly_in_range():
    """Check ``safe_reward`` clamping and the range of each grader's default result.

    ``safe_reward`` must map -1 to 0.01 and 2 to 0.99, and each grader called
    with no arguments must return a value within [0.01, 0.99].
    """
    assert safe_reward(-1) == 0.01
    assert safe_reward(2) == 0.99

    graders = (
        grade_secret_exposure_easy,
        grade_missing_security_headers_medium,
        grade_authz_boundary_hard,
    )
    for grader in graders:
        assert 0.01 <= grader() <= 0.99
|