content-moderation-env / validate_openenv.py
Soham Banerjee
Finalise OpenEnv submission: Clamp rewards to 0.0-1.0, update Gradio UI, and add Groq pipelines
192db9d
"""
validate_openenv.py
===================
Self-contained validator for openenv.yaml and content_moderation_env.py.
Checks all fields required by the OpenEnv spec and confirms the live
environment behaves correctly. Prints PASS/FAIL per check.
Run:
python3 validate_openenv.py
"""
import json
import sys
from pathlib import Path
import yaml # pip install pyyaml
SCRIPT_DIR = Path(__file__).parent
sys.path.insert(0, str(SCRIPT_DIR))
from content_moderation_env import ContentModerationEnv
YAML_PATH = SCRIPT_DIR / "openenv.yaml"
JSON_PATH = SCRIPT_DIR / "moderation_benchmark.json"
PASS = "βœ… PASS"
FAIL = "❌ FAIL"
WARN = "⚠️ WARN"
checks_passed = 0
checks_failed = 0
def check(name: str, condition: bool, detail: str = "", warn: bool = False) -> bool:
global checks_passed, checks_failed
status = PASS if condition else (WARN if warn else FAIL)
suffix = f" ({detail})" if detail else ""
print(f" {status} {name}{suffix}")
if condition:
checks_passed += 1
else:
checks_failed += 1
return condition
# ── 1. YAML structure ─────────────────────────────────────────────────────────
print("\n── openenv.yaml structure ──────────────────────────────────────────")
with open(YAML_PATH) as f:
spec = yaml.safe_load(f)
check("name field present", "name" in spec)
check("version field present", "version" in spec)
check("description field present","description" in spec)
check("tasks field present", "tasks" in spec)
check("observation_space present","observation_space" in spec)
check("action_space present", "action_space" in spec)
check("reward field present", "reward" in spec)
check("api field present", "api" in spec)
check("baseline field present", "baseline" in spec)
check("deployment field present", "deployment" in spec)
tasks = spec.get("tasks", [])
check("at least 3 tasks defined", len(tasks) >= 3, f"found {len(tasks)}")
task_names = [t.get("name") for t in tasks]
for name in ["Easy Content Moderation", "Medium Content Moderation", "Hard Content Moderation"]:
check(f"task '{name}' present", name in task_names)
difficulties = [t.get("difficulty") for t in tasks]
check("easy difficulty present", "easy" in difficulties)
check("medium difficulty present", "medium" in difficulties)
check("hard difficulty present", "hard" in difficulties)
reward = spec.get("reward", {})
check("reward range [0.0, 1.0]", reward.get("range") == [0.0, 1.0],
f"got {reward.get('range')}")
check("partial_progress = true", reward.get("partial_progress") is True)
api = spec.get("api", {})
check("reset() documented", "reset" in api)
check("step() documented", "step" in api)
check("state() documented", "state" in api)
# ── 2. Dataset integrity ──────────────────────────────────────────────────────
print("\n── moderation_benchmark.json integrity ─────────────────────────────")
data = json.loads(JSON_PATH.read_text())
check("β‰₯ 60 scenarios", len(data) >= 60, f"found {len(data)}")
check("β‰₯ 75 scenarios", len(data) >= 75, f"found {len(data)}")
tiers = {"easy": 0, "medium": 0, "hard": 0}
ids_seen = set()
all_ok = True
for s in data:
if s["id"] in ids_seen:
all_ok = False
ids_seen.add(s["id"])
tiers[s.get("tier", "?")] = tiers.get(s.get("tier", "?"), 0) + 1
check("no duplicate IDs", all_ok)
check("easy tier count β‰₯ 20", tiers["easy"] >= 20, f"found {tiers['easy']}")
check("medium tier count β‰₯ 20", tiers["medium"] >= 20, f"found {tiers['medium']}")
check("hard tier count β‰₯ 20", tiers["hard"] >= 20, f"found {tiers['hard']}")
# Check all hard scenarios have severity in ground_truth
hard_with_sev = sum(1 for s in data if s["tier"]=="hard" and "severity" in s.get("ground_truth",{}))
hard_total = tiers["hard"]
check("hard scenarios have severity", hard_with_sev == hard_total,
f"{hard_with_sev}/{hard_total}")
# Easy-tier GT coverage: all labels + all actions must be represented,
# and the 4 previously missing combos must each have β‰₯ 2 examples.
from collections import Counter as _C
easy_s = [s for s in data if s["tier"] == "easy"]
e_labels = _C(s["ground_truth"]["label"] for s in easy_s)
e_actions = _C(s["ground_truth"]["action"] for s in easy_s)
e_combos = _C((s["ground_truth"]["label"], s["ground_truth"]["action"]) for s in easy_s)
for lbl in ["safe", "toxic", "spam", "misleading"]:
check(f"easy label '{lbl}' covered", e_labels[lbl] >= 2, f"count={e_labels[lbl]}")
for act in ["allow", "warn", "remove", "shadowban", "escalate"]:
check(f"easy action '{act}' covered", e_actions[act] >= 2, f"count={e_actions[act]}")
for lbl, act in [("misleading","shadowban"),("toxic","shadowban"),
("toxic","warn"),("misleading","escalate")]:
check(f"easy {lbl}/{act} β‰₯ 2 examples", e_combos[(lbl,act)] >= 2,
f"count={e_combos[(lbl,act)]}")
print("\n── ContentModerationEnv live API ───────────────────────────────────")
env = ContentModerationEnv(str(JSON_PATH), seed=42)
check("env loads all scenarios", env.num_scenarios == len(data),
f"{env.num_scenarios} loaded, {len(data)} in JSON")
check("scenario_ids property works", len(env.scenario_ids) == env.num_scenarios)
# Single-step mode
state = env.reset(scenario_id="scen_easy_2")
check("reset(scenario_id) returns dict", isinstance(state, dict))
check("state has 'text' field", "text" in state)
check("state has 'platform_policy' field", "platform_policy" in state)
check("state has 'previous_flags' field", "previous_flags" in state)
# Perfect action
result = env.step({"label": "toxic", "action": "remove"})
check("step() returns dict with 4 keys",
all(k in result for k in ["state","reward","done","info"]))
check("single-step done=True", result["done"] is True)
check("perfect reward = 1.0", result["reward"] == 1.0, f"got {result['reward']}")
# Queue mode
state = env.reset()
check("queue reset() returns state", isinstance(state, dict))
step_n = 0
rewards = []
while True:
r = env.step({"label": "safe", "action": "allow", "severity": 1})
rewards.append(r["reward"])
step_n += 1
if r["done"]:
break
check("queue mode runs 3 steps", step_n == 3, f"ran {step_n}")
check("episode_rewards accumulates", len(env.episode_rewards) == 3)
# Penalty: false positive escalation β€” submit WRONG label so no +0.5 offset
env.reset(scenario_id="scen_easy_1") # GT: safe/allow
result = env.step({"label": "toxic", "action": "escalate"}) # wrong label, FP penalty
check("false_positive_penalty fires", "false_positive_penalty" in result["info"]["score_breakdown"])
check("reward drops to zero from penalty", result["reward"] == 0.0, f"got {result['reward']}")
# Guard: step on done env
try:
env.step({"label": "safe", "action": "allow"})
check("step() on done env raises RuntimeError", False)
except RuntimeError:
check("step() on done env raises RuntimeError", True)
# state() method
env.reset(scenario_id="scen_hard_1")
s = env.state()
check("state() returns dict", isinstance(s, dict))
# ── valid_actions: ambiguous scenario scoring ─────────────────────────────────
from content_moderation_env import _compute_reward as _cr
# Find a scenario with valid_actions: [remove, shadowban]
rs_scenario = next(
(sc for sc in env._scenarios.values()
if sc.get("ground_truth", {}).get("valid_actions") == ["remove", "shadowban"]
or sc.get("ground_truth", {}).get("valid_actions") == ["shadowban", "remove"]),
None
)
if rs_scenario:
gt_label = rs_scenario["ground_truth"]["label"]
gt_sev = rs_scenario["ground_truth"].get("severity", 3)
r_rem, _ = _cr({"label": gt_label, "action": "remove", "severity": gt_sev}, rs_scenario)
r_sha, _ = _cr({"label": gt_label, "action": "shadowban", "severity": gt_sev}, rs_scenario)
r_bad, _ = _cr({"label": gt_label, "action": "allow", "severity": gt_sev}, rs_scenario)
check("valid_actions: remove scores full credit", r_rem >= 0.8, f"got {r_rem:.2f}")
check("valid_actions: shadowban scores full credit", r_sha >= 0.8, f"got {r_sha:.2f}")
check("valid_actions: remove == shadowban reward", abs(r_rem - r_sha) < 0.01,
f"remove={r_rem:.2f} shadowban={r_sha:.2f}")
check("valid_actions: allow does NOT score full", r_bad < r_rem, f"allow={r_bad:.2f}")
else:
check("valid_actions: remove/shadowban scenario exists", False,
"none found β€” run _add_ambiguous_scenarios.py")
ambig_count = sum(1 for sc in env._scenarios.values() if "valid_actions" in sc.get("ground_truth", {}))
check("ambiguous scenarios (valid_actions) β‰₯ 10", ambig_count >= 10, f"found {ambig_count}")
# ── Campaign mechanic ──────────────────────────────────────────────────────────
print("\n── Campaign mechanic (cross-post coordination) ──────────────────────")
# Count campaigns
camp_map: dict = {}
for sc in env._scenarios.values():
cid = sc.get("campaign_id")
if cid:
camp_map.setdefault(cid, []).append(sc)
full_camps = {k: v for k, v in camp_map.items() if len(v) >= 2}
check("campaigns β‰₯ 3 defined", len(full_camps) >= 3, f"found {len(full_camps)}: {list(full_camps)[:3]}")
# Force a campaign episode using the first known full campaign
first_camp_id = sorted(full_camps.keys())[0]
first_camp_posts = sorted(full_camps[first_camp_id], key=lambda s: s.get("campaign_post_index", 99))
# Manually build env into campaign mode to test deterministically
camp_env = ContentModerationEnv(str(JSON_PATH), seed=99)
camp_env._queue = [__import__("copy").deepcopy(s) for s in first_camp_posts]
camp_env._active_campaign = first_camp_id
camp_env._current_scenario = camp_env._queue[0]
camp_env._done = False
camp_env._queue_index = 0
camp_env._episode_rewards = []
camp_env._episode_actions = []
# Check state has campaign fields
obs = camp_env._build_state_obs(camp_env._queue[0])
check("campaign_id present in state obs", obs.get("campaign_id") == first_camp_id)
check("campaign_post_index present in state obs", obs.get("campaign_post_index") == 1)
check("campaign_total_posts present in state obs",obs.get("campaign_total_posts") is not None)
# Run campaign episode: escalate all β†’ bonus should fire
step_rewards = []
for i, post in enumerate(first_camp_posts):
camp_env._current_scenario = __import__("copy").deepcopy(post)
camp_env._done = False
# Get the GT label so we score it correctly (max reward) + escalate
gt_label = post["ground_truth"]["label"]
gt_sev = post["ground_truth"].get("severity", 3)
action = {"label": gt_label, "action": "escalate", "severity": gt_sev}
result = camp_env.step(action)
step_rewards.append(result["reward"])
bonus_fired = result["info"]["campaign_bonus_earned"]
bonus_val = result["info"]["campaign_bonus_value"]
check("campaign bonus fires when all escalated", bonus_fired is True)
check("campaign bonus value = 0.15", abs(bonus_val - 0.15) < 0.001, f"got {bonus_val}")
check("final step reward β‰₯ baseline (bonus added)", step_rewards[-1] > 0.8)
# Now test: NOT escalating all = NO bonus
camp_env2 = ContentModerationEnv(str(JSON_PATH), seed=99)
camp_env2._queue = [__import__("copy").deepcopy(s) for s in first_camp_posts]
camp_env2._active_campaign = first_camp_id
camp_env2._current_scenario = camp_env2._queue[0]
camp_env2._done = False
camp_env2._queue_index = 0
camp_env2._episode_rewards = []
camp_env2._episode_actions = []
for i, post in enumerate(first_camp_posts):
camp_env2._current_scenario = __import__("copy").deepcopy(post)
camp_env2._done = False
gt_label = post["ground_truth"]["label"]
# First post: warn (not escalate) β€” should break bonus
action = {"label": gt_label, "action": ("warn" if i == 0 else "escalate")}
r2 = camp_env2.step(action)
check("campaign bonus does NOT fire if any action β‰  escalate",
r2["info"]["campaign_bonus_earned"] is False)
# Check episode_actions tracked correctly
check("episode_actions tracked in info", "episode_actions" in result["info"])
# ── 4. Reward bounds ─────────────────────────────────────────────────────────
# ── is_adversarial in state obs ───────────────────────────────────────────────
adv_obs_env = ContentModerationEnv(str(JSON_PATH), seed=99)
obs_adv = adv_obs_env.reset(scenario_id="scen_adv_1")
obs_normal = adv_obs_env.reset(scenario_id="scen_easy_1")
check("is_adversarial=True in state obs on adv scenario",
obs_adv.get("is_adversarial") is True)
check("is_adversarial=False in state obs on normal scenario",
obs_normal.get("is_adversarial") is False)
# ── reset(campaign_id=...) deterministic campaign mode ─────────────────────────
print("\n── reset(campaign_id) deterministic mode ────────────────────────────")
camp_reset_env = ContentModerationEnv(str(JSON_PATH), seed=11)
obs_c = camp_reset_env.reset(campaign_id="camp_crypto_001")
check("reset(campaign_id) returns state", isinstance(obs_c, dict))
check("reset(campaign_id) sets active_campaign",
camp_reset_env._active_campaign == "camp_crypto_001")
check("reset(campaign_id) queues all 3 posts",
len(camp_reset_env._queue) == 3)
check("reset(campaign_id) orders by campaign_post_index",
[s.get("campaign_post_index") for s in camp_reset_env._queue] == [1, 2, 3])
try:
camp_reset_env.reset(campaign_id="nonexistent_xyz")
check("reset(bad campaign_id) raises ValueError", False)
except ValueError:
check("reset(bad campaign_id) raises ValueError", True)
try:
camp_reset_env.reset(scenario_id="scen_easy_1", campaign_id="camp_crypto_001")
check("reset(scenario_id+campaign_id) raises ValueError", False)
except ValueError:
check("reset(scenario_id+campaign_id) raises ValueError", True)
# ── Appeal mechanic (adversarial scenarios) ───────────────────────────────────
print("\n── Appeal mechanic (adversarial scenarios) ──────────────────────────")
adv_scenarios = [s for s in data if s.get("is_adversarial")]
check("adversarial scenarios β‰₯ 10 defined", len(adv_scenarios) >= 10, f"found {len(adv_scenarios)}")
uphold_count = sum(1 for s in adv_scenarios if s.get("appeal_verdict") == "uphold")
overturn_count = sum(1 for s in adv_scenarios if s.get("appeal_verdict") == "overturn")
check("uphold verdicts β‰₯ 4", uphold_count >= 4, f"found {uphold_count}")
check("overturn verdicts β‰₯ 2", overturn_count >= 2, f"found {overturn_count}")
# Pick first uphold and overturn scenario for live tests
uphold_sc = next(s for s in adv_scenarios if s["appeal_verdict"] == "uphold")
overturn_sc = next(s for s in adv_scenarios if s["appeal_verdict"] == "overturn")
adv_env = ContentModerationEnv(str(JSON_PATH), seed=77)
# -- Test 1: step() on adversarial scenario exposes appeal fields
adv_env.reset(scenario_id=uphold_sc["id"])
gt = uphold_sc["ground_truth"]
r = adv_env.step({"label": gt["label"], "action": gt["action"],
"severity": gt.get("severity", 3)})
check("is_adversarial present in info", r["info"]["is_adversarial"] is True)
check("appeal_available=True on adv", r["info"]["appeal_available"] is True)
check("appeal_text present", isinstance(r["info"]["appeal_text"], str)
and len(r["info"]["appeal_text"]) > 10)
# -- Test 2: correct uphold verdict β†’ +0.10
ar = adv_env.appeal({"verdict": "uphold", "rationale": "appeal is manipulative"})
check("correct uphold verdict β†’ +0.10", abs(ar["appeal_reward"] - 0.10) < 0.001,
f"got {ar['appeal_reward']}")
check("verdict_correct=True on uphold", ar["verdict_correct"] is True)
# -- Test 3: double-call guard
try:
adv_env.appeal({"verdict": "uphold"})
check("second appeal() raises RuntimeError", False)
except RuntimeError:
check("second appeal() raises RuntimeError", True)
# -- Test 4: wrong verdict β†’ -0.10
adv_env.reset(scenario_id=uphold_sc["id"])
adv_env.step({"label": gt["label"], "action": gt["action"], "severity": gt.get("severity", 3)})
ar_wrong = adv_env.appeal({"verdict": "overturn"}) # wrong β€” should uphold
check("wrong verdict β†’ -0.10", abs(ar_wrong["appeal_reward"] - (-0.10)) < 0.001,
f"got {ar_wrong['appeal_reward']}")
check("verdict_correct=False on wrong", ar_wrong["verdict_correct"] is False)
# -- Test 5: overturn case
adv_env.reset(scenario_id=overturn_sc["id"])
gt2 = overturn_sc["ground_truth"]
adv_env.step({"label": gt2["label"], "action": gt2["action"], "severity": gt2.get("severity", 3)})
ar_ov = adv_env.appeal({"verdict": "overturn"})
check("correct overturn verdict β†’ +0.10", abs(ar_ov["appeal_reward"] - 0.10) < 0.001,
f"got {ar_ov['appeal_reward']}")
# -- Test 6: appeal() on non-adversarial scenario raises
adv_env.reset(scenario_id="scen_easy_1")
adv_env.step({"label": "safe", "action": "allow"})
try:
adv_env.appeal({"verdict": "uphold"})
check("appeal() on non-adversarial raises RuntimeError", False)
except RuntimeError:
check("appeal() on non-adversarial raises RuntimeError", True)
# Build a synthetic toxic scenario at each severity level to verify penalties
def _make_toxic(severity: int) -> dict:
return {
"id": f"_synth_sev{severity}", "tier": "hard",
"state": {"text": "x", "audio_transcript": None, "visual_tags": [],
"previous_flags": 0, "platform_policy": "strict"},
"ground_truth": {"label": "toxic", "action": "escalate", "severity": severity},
"score_rubric": {"label_correct": 0.4, "action_correct": 0.4, "severity_within_1": 0.2},
}
from content_moderation_env import _compute_reward as _cr2
# allow on sev-5 β†’ -0.30
r5, b5 = _cr2({"label": "toxic", "action": "allow", "severity": 5}, _make_toxic(5))
check("sev-5 miss penalty = -0.30",
b5.get("missed_threat_penalty") == -0.30, f"got {b5.get('missed_threat_penalty')}")
# allow on sev-4 β†’ -0.15
r4, b4 = _cr2({"label": "toxic", "action": "allow", "severity": 4}, _make_toxic(4))
check("sev-4 miss penalty = -0.15",
b4.get("missed_threat_penalty") == -0.15, f"got {b4.get('missed_threat_penalty')}")
# allow on sev-3 β†’ -0.05
r3, b3 = _cr2({"label": "toxic", "action": "allow", "severity": 3}, _make_toxic(3))
check("sev-3 miss penalty = -0.05",
b3.get("missed_threat_penalty") == -0.05, f"got {b3.get('missed_threat_penalty')}")
# allow on sev-2 β†’ no penalty
r2, b2 = _cr2({"label": "toxic", "action": "allow", "severity": 2}, _make_toxic(2))
check("sev-2 miss β†’ no penalty",
"missed_threat_penalty" not in b2, f"got {b2.get('missed_threat_penalty')}")
# old key name gone
check("old 'missed_critical_threat_penalty' key replaced",
"missed_critical_threat_penalty" not in b5)
print("\n── Reward range validation ─────────────────────────────────────────")
from content_moderation_env import _compute_reward
import json as _json
all_rewards = []
for scenario in data:
# Worst possible action
r, _ = _compute_reward({"label": "safe", "action": "escalate", "severity": 1}, scenario)
all_rewards.append(r)
# Best possible action
gt = scenario["ground_truth"]
r2, _ = _compute_reward({
"label": gt["label"],
"action": gt["action"],
"severity": gt.get("severity", 3),
}, scenario)
all_rewards.append(r2)
check("min reward β‰₯ 0.0", min(all_rewards) >= 0.0, f"min={min(all_rewards):.3f}")
check("max reward ≀ 1.0", max(all_rewards) <= 1.0, f"max={max(all_rewards):.3f}")
# ── Summary ───────────────────────────────────────────────────────────────────
total = checks_passed + checks_failed
print(f"\n{'═'*62}")
print(f" RESULT: {checks_passed}/{total} checks passed")
if checks_failed == 0:
print(" βœ… ALL CHECKS PASSED β€” openenv.yaml is valid")
else:
print(f" ❌ {checks_failed} check(s) FAILED β€” fix before submission")
print(f"{'═'*62}\n")
sys.exit(0 if checks_failed == 0 else 1)