"""
validate_openenv.py
===================
Self-contained validator for openenv.yaml and content_moderation_env.py.
Checks all fields required by the OpenEnv spec and confirms the live
environment behaves correctly. Prints PASS/FAIL per check.
Run:
python3 validate_openenv.py
"""
import json
import sys
from pathlib import Path
import yaml # pip install pyyaml
# Directory containing this script; the spec and dataset paths below are
# built relative to it.
SCRIPT_DIR = Path(__file__).parent
# Put the script directory first on sys.path so the import below can find
# content_moderation_env there.
sys.path.insert(0, str(SCRIPT_DIR))
from content_moderation_env import ContentModerationEnv
# Files inspected by this validator.
YAML_PATH = SCRIPT_DIR / "openenv.yaml"
JSON_PATH = SCRIPT_DIR / "moderation_benchmark.json"
# Status tags printed in front of every check line.
PASS = "✅ PASS"
FAIL = "❌ FAIL"
WARN = "⚠️ WARN"

# Running tallies, updated by check() and reported in the final summary.
checks_passed = 0
checks_failed = 0


def check(name: str, condition: bool, detail: str = "", warn: bool = False) -> bool:
    """Print the outcome of one validation check and update the tallies.

    Args:
        name: Human-readable label of the check.
        condition: Outcome of the check; truthy means it passed.
        detail: Optional extra text shown in parentheses after the label.
        warn: When true, a miss is printed with the WARN tag instead of
            FAIL. It is still added to ``checks_failed``.

    Returns:
        ``condition`` unchanged, so callers can branch on the outcome.
    """
    global checks_passed, checks_failed
    status = PASS if condition else (WARN if warn else FAIL)
    suffix = f" ({detail})" if detail else ""
    print(f" {status} {name}{suffix}")
    if condition:
        checks_passed += 1
    else:
        checks_failed += 1
    return condition
# ── 1. YAML structure ────────────────────────────────────────────────────────
print("\n── openenv.yaml structure ──────────────────────────────────────────")
# Explicit encoding so parsing does not depend on the platform default.
with open(YAML_PATH, encoding="utf-8") as f:
    loaded = yaml.safe_load(f)
# An empty or non-mapping document would make every lookup below raise;
# treat it as an empty spec so each missing field is reported as a FAIL.
spec = loaded if isinstance(loaded, dict) else {}

# Required top-level keys as (check label, key) pairs.
for label, key in [
    ("name field present", "name"),
    ("version field present", "version"),
    ("description field present", "description"),
    ("tasks field present", "tasks"),
    ("observation_space present", "observation_space"),
    ("action_space present", "action_space"),
    ("reward field present", "reward"),
    ("api field present", "api"),
    ("baseline field present", "baseline"),
    ("deployment field present", "deployment"),
]:
    check(label, key in spec)

# `or []` / `or {}` also covers keys that are present but set to null.
tasks = spec.get("tasks") or []
check("at least 3 tasks defined", len(tasks) >= 3, f"found {len(tasks)}")

task_names = [t.get("name") for t in tasks]
for name in ["Easy Content Moderation", "Medium Content Moderation", "Hard Content Moderation"]:
    check(f"task '{name}' present", name in task_names)

difficulties = [t.get("difficulty") for t in tasks]
check("easy difficulty present", "easy" in difficulties)
check("medium difficulty present", "medium" in difficulties)
check("hard difficulty present", "hard" in difficulties)

reward = spec.get("reward") or {}
check("reward range [0.0, 1.0]", reward.get("range") == [0.0, 1.0],
      f"got {reward.get('range')}")
check("partial_progress = true", reward.get("partial_progress") is True)

api = spec.get("api") or {}
check("reset() documented", "reset" in api)
check("step() documented", "step" in api)
check("state() documented", "state" in api)
# ── 2. Dataset integrity ─────────────────────────────────────────────────────
print("\n── moderation_benchmark.json integrity ─────────────────────────────")
# Explicit encoding so the dataset loads identically on every platform.
data = json.loads(JSON_PATH.read_text(encoding="utf-8"))
check("≥ 60 scenarios", len(data) >= 60, f"found {len(data)}")
check("≥ 75 scenarios", len(data) >= 75, f"found {len(data)}")

# Count scenarios per tier and detect repeated IDs in a single pass.
tiers = {"easy": 0, "medium": 0, "hard": 0}
ids_seen = set()
all_ok = True
for s in data:
    if s["id"] in ids_seen:
        all_ok = False
    ids_seen.add(s["id"])
    # Scenarios without a tier are tallied under "?" rather than rejected.
    tier = s.get("tier", "?")
    tiers[tier] = tiers.get(tier, 0) + 1
check("no duplicate IDs", all_ok)
check("easy tier count ≥ 20", tiers["easy"] >= 20, f"found {tiers['easy']}")
check("medium tier count ≥ 20", tiers["medium"] >= 20, f"found {tiers['medium']}")
check("hard tier count ≥ 20", tiers["hard"] >= 20, f"found {tiers['hard']}")

# Check all hard scenarios have severity in ground_truth.
# .get("tier") keeps this consistent with the tally above, which tolerates
# scenarios that have no tier key.
hard_with_sev = sum(
    1 for s in data
    if s.get("tier") == "hard" and "severity" in s.get("ground_truth", {})
)
hard_total = tiers["hard"]
check("hard scenarios have severity", hard_with_sev == hard_total,
      f"{hard_with_sev}/{hard_total}")

# Easy-tier GT coverage: all labels + all actions must be represented,
# and the 4 previously missing combos must each have ≥ 2 examples.
from collections import Counter as _C
easy_s = [s for s in data if s.get("tier") == "easy"]
e_labels = _C(s["ground_truth"]["label"] for s in easy_s)
e_actions = _C(s["ground_truth"]["action"] for s in easy_s)
e_combos = _C((s["ground_truth"]["label"], s["ground_truth"]["action"]) for s in easy_s)
for lbl in ["safe", "toxic", "spam", "misleading"]:
    check(f"easy label '{lbl}' covered", e_labels[lbl] >= 2, f"count={e_labels[lbl]}")
for act in ["allow", "warn", "remove", "shadowban", "escalate"]:
    check(f"easy action '{act}' covered", e_actions[act] >= 2, f"count={e_actions[act]}")
for lbl, act in [("misleading", "shadowban"), ("toxic", "shadowban"),
                 ("toxic", "warn"), ("misleading", "escalate")]:
    check(f"easy {lbl}/{act} ≥ 2 examples", e_combos[(lbl, act)] >= 2,
          f"count={e_combos[(lbl, act)]}")
print("\n── ContentModerationEnv live API ───────────────────────────────────")
env = ContentModerationEnv(str(JSON_PATH), seed=42)
check("env loads all scenarios", env.num_scenarios == len(data),
      f"{env.num_scenarios} loaded, {len(data)} in JSON")
check("scenario_ids property works", len(env.scenario_ids) == env.num_scenarios)

# Single-step mode
state = env.reset(scenario_id="scen_easy_2")
check("reset(scenario_id) returns dict", isinstance(state, dict))
check("state has 'text' field", "text" in state)
check("state has 'platform_policy' field", "platform_policy" in state)
check("state has 'previous_flags' field", "previous_flags" in state)

# Perfect action
result = env.step({"label": "toxic", "action": "remove"})
check("step() returns dict with 4 keys",
      all(k in result for k in ["state", "reward", "done", "info"]))
check("single-step done=True", result["done"] is True)
check("perfect reward = 1.0", result["reward"] == 1.0, f"got {result['reward']}")

# Queue mode
state = env.reset()
check("queue reset() returns state", isinstance(state, dict))
step_n = 0
rewards = []
# The episode is expected to finish after 3 steps. The cap stops an env that
# never reports done from hanging the validator; the step-count check below
# then reports the failure instead.
MAX_QUEUE_STEPS = 100
while step_n < MAX_QUEUE_STEPS:
    r = env.step({"label": "safe", "action": "allow", "severity": 1})
    rewards.append(r["reward"])
    step_n += 1
    if r["done"]:
        break
check("queue mode runs 3 steps", step_n == 3, f"ran {step_n}")
check("episode_rewards accumulates", len(env.episode_rewards) == 3)

# Penalty: false positive escalation — submit WRONG label so no +0.5 offset
env.reset(scenario_id="scen_easy_1")  # GT: safe/allow
result = env.step({"label": "toxic", "action": "escalate"})  # wrong label, FP penalty
check("false_positive_penalty fires",
      "false_positive_penalty" in result["info"]["score_breakdown"])
check("reward drops to zero from penalty", result["reward"] == 0.0, f"got {result['reward']}")

# Guard: step on done env
try:
    env.step({"label": "safe", "action": "allow"})
except RuntimeError:
    check("step() on done env raises RuntimeError", True)
else:
    check("step() on done env raises RuntimeError", False)

# state() method
env.reset(scenario_id="scen_hard_1")
s = env.state()
check("state() returns dict", isinstance(s, dict))
# ── valid_actions: ambiguous scenario scoring ────────────────────────────────
from content_moderation_env import _compute_reward as _cr

# Find a scenario whose valid_actions is exactly remove + shadowban, in
# either order.
rs_scenario = next(
    (sc for sc in env._scenarios.values()
     if sc.get("ground_truth", {}).get("valid_actions")
     in (["remove", "shadowban"], ["shadowban", "remove"])),
    None,
)
if rs_scenario is not None:
    gt_label = rs_scenario["ground_truth"]["label"]
    gt_sev = rs_scenario["ground_truth"].get("severity", 3)
    # Score both accepted actions and one action outside the accepted set.
    r_rem, _ = _cr({"label": gt_label, "action": "remove", "severity": gt_sev}, rs_scenario)
    r_sha, _ = _cr({"label": gt_label, "action": "shadowban", "severity": gt_sev}, rs_scenario)
    r_bad, _ = _cr({"label": gt_label, "action": "allow", "severity": gt_sev}, rs_scenario)
    check("valid_actions: remove scores full credit", r_rem >= 0.8, f"got {r_rem:.2f}")
    check("valid_actions: shadowban scores full credit", r_sha >= 0.8, f"got {r_sha:.2f}")
    check("valid_actions: remove == shadowban reward", abs(r_rem - r_sha) < 0.01,
          f"remove={r_rem:.2f} shadowban={r_sha:.2f}")
    check("valid_actions: allow does NOT score full", r_bad < r_rem, f"allow={r_bad:.2f}")
else:
    check("valid_actions: remove/shadowban scenario exists", False,
          "none found — run _add_ambiguous_scenarios.py")

ambig_count = sum(
    1 for sc in env._scenarios.values()
    if "valid_actions" in sc.get("ground_truth", {})
)
check("ambiguous scenarios (valid_actions) ≥ 10", ambig_count >= 10, f"found {ambig_count}")
# ── Campaign mechanic ────────────────────────────────────────────────────────
print("\n── Campaign mechanic (cross-post coordination) ──────────────────────")
import copy


def _forced_campaign_env(camp_id, posts, seed=99):
    """Return a fresh env whose private state is set up for one campaign.

    The queue is filled with deep copies of ``posts`` and the episode
    bookkeeping attributes are reset, so the campaign logic can be tested
    deterministically instead of relying on random queue selection.
    """
    forced = ContentModerationEnv(str(JSON_PATH), seed=seed)
    forced._queue = [copy.deepcopy(p) for p in posts]
    forced._active_campaign = camp_id
    forced._current_scenario = forced._queue[0]
    forced._done = False
    forced._queue_index = 0
    forced._episode_rewards = []
    forced._episode_actions = []
    return forced


# Group scenarios by campaign_id and keep campaigns with at least 2 posts.
camp_map: dict = {}
for sc in env._scenarios.values():
    cid = sc.get("campaign_id")
    if cid:
        camp_map.setdefault(cid, []).append(sc)
full_camps = {k: v for k, v in camp_map.items() if len(v) >= 2}
check("campaigns ≥ 3 defined", len(full_camps) >= 3,
      f"found {len(full_camps)}: {list(full_camps)[:3]}")

if not full_camps:
    # Without any campaign there is nothing to run; report it as a failed
    # check rather than crashing on the empty lookup below.
    check("campaign episode checks runnable", False, "no campaign with ≥ 2 posts found")
else:
    # Force a campaign episode using the first known full campaign
    first_camp_id = sorted(full_camps.keys())[0]
    first_camp_posts = sorted(full_camps[first_camp_id],
                              key=lambda s: s.get("campaign_post_index", 99))

    # Manually build env into campaign mode to test deterministically
    camp_env = _forced_campaign_env(first_camp_id, first_camp_posts)

    # Check state has campaign fields
    obs = camp_env._build_state_obs(camp_env._queue[0])
    check("campaign_id present in state obs", obs.get("campaign_id") == first_camp_id)
    check("campaign_post_index present in state obs", obs.get("campaign_post_index") == 1)
    check("campaign_total_posts present in state obs",
          obs.get("campaign_total_posts") is not None)

    # Run campaign episode: escalate all → bonus should fire
    step_rewards = []
    for post in first_camp_posts:
        camp_env._current_scenario = copy.deepcopy(post)
        camp_env._done = False
        # Get the GT label so we score it correctly (max reward) + escalate
        gt_label = post["ground_truth"]["label"]
        gt_sev = post["ground_truth"].get("severity", 3)
        action = {"label": gt_label, "action": "escalate", "severity": gt_sev}
        result = camp_env.step(action)
        step_rewards.append(result["reward"])
    # Only the info from the last step of the episode is inspected.
    bonus_fired = result["info"]["campaign_bonus_earned"]
    bonus_val = result["info"]["campaign_bonus_value"]
    check("campaign bonus fires when all escalated", bonus_fired is True)
    check("campaign bonus value = 0.15", abs(bonus_val - 0.15) < 0.001, f"got {bonus_val}")
    check("final step reward ≥ baseline (bonus added)", step_rewards[-1] > 0.8)

    # Now test: NOT escalating all = NO bonus
    camp_env2 = _forced_campaign_env(first_camp_id, first_camp_posts)
    for i, post in enumerate(first_camp_posts):
        camp_env2._current_scenario = copy.deepcopy(post)
        camp_env2._done = False
        gt_label = post["ground_truth"]["label"]
        # First post: warn (not escalate) → should break bonus
        action = {"label": gt_label, "action": ("warn" if i == 0 else "escalate")}
        r2 = camp_env2.step(action)
    check("campaign bonus does NOT fire if any action ≠ escalate",
          r2["info"]["campaign_bonus_earned"] is False)

    # Check episode_actions tracked correctly
    check("episode_actions tracked in info", "episode_actions" in result["info"])
# ── is_adversarial in state obs ──────────────────────────────────────────────
adv_obs_env = ContentModerationEnv(str(JSON_PATH), seed=99)
obs_adv = adv_obs_env.reset(scenario_id="scen_adv_1")
obs_normal = adv_obs_env.reset(scenario_id="scen_easy_1")
check("is_adversarial=True in state obs on adv scenario",
      obs_adv.get("is_adversarial") is True)
check("is_adversarial=False in state obs on normal scenario",
      obs_normal.get("is_adversarial") is False)

# ── reset(campaign_id=...) deterministic campaign mode ───────────────────────
print("\n── reset(campaign_id) deterministic mode ────────────────────────────")
camp_reset_env = ContentModerationEnv(str(JSON_PATH), seed=11)
obs_c = camp_reset_env.reset(campaign_id="camp_crypto_001")
check("reset(campaign_id) returns state", isinstance(obs_c, dict))
check("reset(campaign_id) sets active_campaign",
      camp_reset_env._active_campaign == "camp_crypto_001")
check("reset(campaign_id) queues all 3 posts",
      len(camp_reset_env._queue) == 3)
check("reset(campaign_id) orders by campaign_post_index",
      [s.get("campaign_post_index") for s in camp_reset_env._queue] == [1, 2, 3])

# An unknown campaign id must be rejected.
try:
    camp_reset_env.reset(campaign_id="nonexistent_xyz")
except ValueError:
    check("reset(bad campaign_id) raises ValueError", True)
else:
    check("reset(bad campaign_id) raises ValueError", False)

# Passing scenario_id and campaign_id together must be rejected.
try:
    camp_reset_env.reset(scenario_id="scen_easy_1", campaign_id="camp_crypto_001")
except ValueError:
    check("reset(scenario_id+campaign_id) raises ValueError", True)
else:
    check("reset(scenario_id+campaign_id) raises ValueError", False)
# ── Appeal mechanic (adversarial scenarios) ──────────────────────────────────
print("\n── Appeal mechanic (adversarial scenarios) ──────────────────────────")
adv_scenarios = [s for s in data if s.get("is_adversarial")]
check("adversarial scenarios ≥ 10 defined", len(adv_scenarios) >= 10,
      f"found {len(adv_scenarios)}")
uphold_count = sum(1 for s in adv_scenarios if s.get("appeal_verdict") == "uphold")
overturn_count = sum(1 for s in adv_scenarios if s.get("appeal_verdict") == "overturn")
check("uphold verdicts ≥ 4", uphold_count >= 4, f"found {uphold_count}")
check("overturn verdicts ≥ 2", overturn_count >= 2, f"found {overturn_count}")

# Pick first uphold and overturn scenario for live tests. A default of None
# (and .get, as in the counts above) lets a dataset with no such scenario be
# reported as a failed check instead of raising StopIteration / KeyError.
uphold_sc = next((s for s in adv_scenarios if s.get("appeal_verdict") == "uphold"), None)
overturn_sc = next((s for s in adv_scenarios if s.get("appeal_verdict") == "overturn"), None)
adv_env = ContentModerationEnv(str(JSON_PATH), seed=77)

if uphold_sc is None:
    check("uphold scenario available for live appeal tests", False, "none found")
else:
    # -- Test 1: step() on adversarial scenario exposes appeal fields
    adv_env.reset(scenario_id=uphold_sc["id"])
    gt = uphold_sc["ground_truth"]
    r = adv_env.step({"label": gt["label"], "action": gt["action"],
                      "severity": gt.get("severity", 3)})
    check("is_adversarial present in info", r["info"]["is_adversarial"] is True)
    check("appeal_available=True on adv", r["info"]["appeal_available"] is True)
    check("appeal_text present", isinstance(r["info"]["appeal_text"], str)
          and len(r["info"]["appeal_text"]) > 10)

    # -- Test 2: correct uphold verdict → +0.10
    ar = adv_env.appeal({"verdict": "uphold", "rationale": "appeal is manipulative"})
    check("correct uphold verdict → +0.10", abs(ar["appeal_reward"] - 0.10) < 0.001,
          f"got {ar['appeal_reward']}")
    check("verdict_correct=True on uphold", ar["verdict_correct"] is True)

    # -- Test 3: double-call guard
    try:
        adv_env.appeal({"verdict": "uphold"})
    except RuntimeError:
        check("second appeal() raises RuntimeError", True)
    else:
        check("second appeal() raises RuntimeError", False)

    # -- Test 4: wrong verdict → -0.10
    adv_env.reset(scenario_id=uphold_sc["id"])
    adv_env.step({"label": gt["label"], "action": gt["action"],
                  "severity": gt.get("severity", 3)})
    ar_wrong = adv_env.appeal({"verdict": "overturn"})  # wrong — should uphold
    check("wrong verdict → -0.10", abs(ar_wrong["appeal_reward"] - (-0.10)) < 0.001,
          f"got {ar_wrong['appeal_reward']}")
    check("verdict_correct=False on wrong", ar_wrong["verdict_correct"] is False)

if overturn_sc is None:
    check("overturn scenario available for live appeal tests", False, "none found")
else:
    # -- Test 5: overturn case
    adv_env.reset(scenario_id=overturn_sc["id"])
    gt2 = overturn_sc["ground_truth"]
    adv_env.step({"label": gt2["label"], "action": gt2["action"],
                  "severity": gt2.get("severity", 3)})
    ar_ov = adv_env.appeal({"verdict": "overturn"})
    check("correct overturn verdict → +0.10", abs(ar_ov["appeal_reward"] - 0.10) < 0.001,
          f"got {ar_ov['appeal_reward']}")

# -- Test 6: appeal() on non-adversarial scenario raises
adv_env.reset(scenario_id="scen_easy_1")
adv_env.step({"label": "safe", "action": "allow"})
try:
    adv_env.appeal({"verdict": "uphold"})
except RuntimeError:
    check("appeal() on non-adversarial raises RuntimeError", True)
else:
    check("appeal() on non-adversarial raises RuntimeError", False)
# Build a synthetic toxic scenario at each severity level to verify penalties
def _make_toxic(severity: int) -> dict:
return {
"id": f"_synth_sev{severity}", "tier": "hard",
"state": {"text": "x", "audio_transcript": None, "visual_tags": [],
"previous_flags": 0, "platform_policy": "strict"},
"ground_truth": {"label": "toxic", "action": "escalate", "severity": severity},
"score_rubric": {"label_correct": 0.4, "action_correct": 0.4, "severity_within_1": 0.2},
}
from content_moderation_env import _compute_reward as _cr2


def _missed_threat_penalty_is(breakdown, expected):
    """Return True when ``breakdown['missed_threat_penalty']`` is ~``expected``.

    Uses the same 0.001 tolerance as the other float checks in this script
    instead of exact float equality; a missing key counts as a mismatch.
    """
    got = breakdown.get("missed_threat_penalty")
    return got is not None and abs(got - expected) < 0.001


# allow on sev-5 → -0.30
r5, b5 = _cr2({"label": "toxic", "action": "allow", "severity": 5}, _make_toxic(5))
check("sev-5 miss penalty = -0.30",
      _missed_threat_penalty_is(b5, -0.30), f"got {b5.get('missed_threat_penalty')}")

# allow on sev-4 → -0.15
r4, b4 = _cr2({"label": "toxic", "action": "allow", "severity": 4}, _make_toxic(4))
check("sev-4 miss penalty = -0.15",
      _missed_threat_penalty_is(b4, -0.15), f"got {b4.get('missed_threat_penalty')}")

# allow on sev-3 → -0.05
r3, b3 = _cr2({"label": "toxic", "action": "allow", "severity": 3}, _make_toxic(3))
check("sev-3 miss penalty = -0.05",
      _missed_threat_penalty_is(b3, -0.05), f"got {b3.get('missed_threat_penalty')}")

# allow on sev-2 → no penalty
r2, b2 = _cr2({"label": "toxic", "action": "allow", "severity": 2}, _make_toxic(2))
check("sev-2 miss → no penalty",
      "missed_threat_penalty" not in b2, f"got {b2.get('missed_threat_penalty')}")

# old key name gone
check("old 'missed_critical_threat_penalty' key replaced",
      "missed_critical_threat_penalty" not in b5)
print("\n── Reward range validation ─────────────────────────────────────────")
from content_moderation_env import _compute_reward
import json as _json

# Score two submissions per scenario and collect every reward so the overall
# minimum and maximum can be compared against the [0.0, 1.0] range.
all_rewards = []
for scenario in data:
    # Worst possible action
    r, _ = _compute_reward({"label": "safe", "action": "escalate", "severity": 1}, scenario)
    all_rewards.append(r)
    # Best possible action
    gt = scenario["ground_truth"]
    r2, _ = _compute_reward({
        "label": gt["label"],
        "action": gt["action"],
        "severity": gt.get("severity", 3),
    }, scenario)
    all_rewards.append(r2)

if all_rewards:
    check("min reward ≥ 0.0", min(all_rewards) >= 0.0, f"min={min(all_rewards):.3f}")
    check("max reward ≤ 1.0", max(all_rewards) <= 1.0, f"max={max(all_rewards):.3f}")
else:
    # min()/max() raise ValueError on an empty list; report a failed check
    # so the summary below is still printed.
    check("reward range computed", False, "no scenarios to score")
# ── Summary ──────────────────────────────────────────────────────────────────
total = checks_passed + checks_failed
rule = "═" * 62
print(f"\n{rule}")
print(f" RESULT: {checks_passed}/{total} checks passed")
if checks_failed == 0:
    print(" ✅ ALL CHECKS PASSED — openenv.yaml is valid")
else:
    print(f" ❌ {checks_failed} check(s) FAILED — fix before submission")
print(f"{rule}\n")
# Exit status 0 only when nothing failed, so callers can gate on it.
sys.exit(0 if checks_failed == 0 else 1)