Spaces:
Sleeping
Sleeping
Soham Banerjee
Finalise OpenEnv submission: Clamp rewards to 0.0-1.0, update Gradio UI, and add Groq pipelines
192db9d | """ | |
| validate_openenv.py | |
| =================== | |
| Self-contained validator for openenv.yaml and content_moderation_env.py. | |
| Checks all fields required by the OpenEnv spec and confirms the live | |
| environment behaves correctly. Prints PASS/FAIL per check. | |
| Run: | |
| python3 validate_openenv.py | |
| """ | |
| import json | |
| import sys | |
| from pathlib import Path | |
| import yaml # pip install pyyaml | |
# Directory holding this script; the env module and data files live beside it.
SCRIPT_DIR = Path(__file__).parent
# Put that directory first on sys.path so the sibling module below is importable.
sys.path.insert(0, str(SCRIPT_DIR))
from content_moderation_env import ContentModerationEnv  # noqa: E402 (needs the sys.path tweak)

# Spec file and benchmark dataset under validation.
YAML_PATH = SCRIPT_DIR.joinpath("openenv.yaml")
JSON_PATH = SCRIPT_DIR.joinpath("moderation_benchmark.json")
# Console status markers printed in front of every check line.
PASS = "✅ PASS"
FAIL = "❌ FAIL"
WARN = "⚠️ WARN"
# Running tallies updated by check(); the summary at the end reads them.
checks_passed = 0
checks_failed = 0
checks_warned = 0


def check(name: str, condition: bool, detail: str = "", warn: bool = False) -> bool:
    """Print one PASS/FAIL/WARN line for a validation check and tally the outcome.

    Args:
        name: Human-readable description of the check.
        condition: Truthy when the check passed.
        detail: Optional context, appended in parentheses.
        warn: When True, a failing check is reported as a non-fatal warning.
            It is tallied in ``checks_warned`` rather than ``checks_failed``,
            so it no longer forces a non-zero exit status. Previously a WARN
            line still counted as a failure.

    Returns:
        ``condition`` unchanged, so callers can branch on the result.
    """
    global checks_passed, checks_failed, checks_warned
    if condition:
        status = PASS
        checks_passed += 1
    elif warn:
        status = WARN
        checks_warned += 1
    else:
        status = FAIL
        checks_failed += 1
    suffix = f" ({detail})" if detail else ""
    print(f" {status} {name}{suffix}")
    return condition
# ── 1. YAML structure ───────────────────────────────────────────────────────
print("\n── openenv.yaml structure ──────────────────────────────────────────")
with open(YAML_PATH, encoding="utf-8") as f:
    _raw_spec = yaml.safe_load(f)
# An empty file loads as None (a malformed one may load as a list or str).
# Fall back to {} so every field check reports FAIL instead of crashing.
spec = _raw_spec if isinstance(_raw_spec, dict) else {}

# (check label, required top-level key). The labels are kept verbatim so
# output stays comparable between runs.
_REQUIRED_FIELDS = (
    ("name field present", "name"),
    ("version field present", "version"),
    ("description field present", "description"),
    ("tasks field present", "tasks"),
    ("observation_space present", "observation_space"),
    ("action_space present", "action_space"),
    ("reward field present", "reward"),
    ("api field present", "api"),
    ("baseline field present", "baseline"),
    ("deployment field present", "deployment"),
)
for _label, _key in _REQUIRED_FIELDS:
    check(_label, _key in spec)

# A key written with no value (e.g. `tasks:`) loads as None; treat it as empty.
tasks = spec.get("tasks") or []
check("at least 3 tasks defined", len(tasks) >= 3, f"found {len(tasks)}")
# Ignore non-mapping task entries rather than crashing on .get().
task_dicts = [t for t in tasks if isinstance(t, dict)]
task_names = [t.get("name") for t in task_dicts]
for name in ["Easy Content Moderation", "Medium Content Moderation", "Hard Content Moderation"]:
    check(f"task '{name}' present", name in task_names)
difficulties = [t.get("difficulty") for t in task_dicts]
for level in ("easy", "medium", "hard"):
    check(f"{level} difficulty present", level in difficulties)

reward = spec.get("reward") or {}
check("reward range [0.0, 1.0]", reward.get("range") == [0.0, 1.0],
      f"got {reward.get('range')}")
check("partial_progress = true", reward.get("partial_progress") is True)

api = spec.get("api") or {}
for method in ("reset", "step", "state"):
    check(f"{method}() documented", method in api)
# ── 2. Dataset integrity ────────────────────────────────────────────────────
print("\n── moderation_benchmark.json integrity ─────────────────────────────")
# Explicit UTF-8: the dataset holds free text, and the platform default
# encoding (e.g. cp1252 on Windows) may not decode it.
data = json.loads(JSON_PATH.read_text(encoding="utf-8"))
check("≥ 60 scenarios", len(data) >= 60, f"found {len(data)}")
check("≥ 75 scenarios", len(data) >= 75, f"found {len(data)}")

# Single pass: tally scenarios per tier and collect any repeated IDs.
# A scenario with no "tier" is counted under "?".
tiers = {"easy": 0, "medium": 0, "hard": 0}
ids_seen = set()
duplicate_ids = set()
for s in data:
    if s["id"] in ids_seen:
        duplicate_ids.add(s["id"])
    ids_seen.add(s["id"])
    tier = s.get("tier", "?")
    tiers[tier] = tiers.get(tier, 0) + 1
check("no duplicate IDs", not duplicate_ids,
      f"duplicates: {sorted(map(str, duplicate_ids))}" if duplicate_ids else "")
check("easy tier count ≥ 20", tiers["easy"] >= 20, f"found {tiers['easy']}")
check("medium tier count ≥ 20", tiers["medium"] >= 20, f"found {tiers['medium']}")
check("hard tier count ≥ 20", tiers["hard"] >= 20, f"found {tiers['hard']}")

# Every hard scenario must carry a severity in its ground truth. .get("tier")
# matches the tolerant tally above, so a tier-less scenario cannot raise KeyError.
hard_with_sev = sum(
    1 for s in data
    if s.get("tier") == "hard" and "severity" in s.get("ground_truth", {})
)
hard_total = tiers["hard"]
check("hard scenarios have severity", hard_with_sev == hard_total,
      f"{hard_with_sev}/{hard_total}")
# Easy-tier ground-truth coverage: every label and every action must appear
# at least twice. The four label/action combos that were previously missing
# must each have ≥ 2 examples.
from collections import Counter as _C

# .get("tier") is consistent with the tier tally, which tolerates a missing tier.
easy_s = [s for s in data if s.get("tier") == "easy"]
e_labels = _C(s["ground_truth"]["label"] for s in easy_s)
e_actions = _C(s["ground_truth"]["action"] for s in easy_s)
e_combos = _C((s["ground_truth"]["label"], s["ground_truth"]["action"]) for s in easy_s)
for lbl in ["safe", "toxic", "spam", "misleading"]:
    check(f"easy label '{lbl}' covered", e_labels[lbl] >= 2, f"count={e_labels[lbl]}")
for act in ["allow", "warn", "remove", "shadowban", "escalate"]:
    check(f"easy action '{act}' covered", e_actions[act] >= 2, f"count={e_actions[act]}")
for lbl, act in [("misleading", "shadowban"), ("toxic", "shadowban"),
                 ("toxic", "warn"), ("misleading", "escalate")]:
    check(f"easy {lbl}/{act} ≥ 2 examples", e_combos[(lbl, act)] >= 2,
          f"count={e_combos[(lbl, act)]}")
print("\n── ContentModerationEnv live API ───────────────────────────────────")
env = ContentModerationEnv(str(JSON_PATH), seed=42)
check("env loads all scenarios", env.num_scenarios == len(data),
      f"{env.num_scenarios} loaded, {len(data)} in JSON")
check("scenario_ids property works", len(env.scenario_ids) == env.num_scenarios)

# Single-step mode: reset() pinned to one scenario.
state = env.reset(scenario_id="scen_easy_2")
check("reset(scenario_id) returns dict", isinstance(state, dict))
check("state has 'text' field", "text" in state)
check("state has 'platform_policy' field", "platform_policy" in state)
check("state has 'previous_flags' field", "previous_flags" in state)

# Perfect action. Assumes scen_easy_2's ground truth is toxic/remove
# (TODO confirm against the JSON).
result = env.step({"label": "toxic", "action": "remove"})
check("step() returns dict with 4 keys",
      all(k in result for k in ["state", "reward", "done", "info"]))
check("single-step done=True", result["done"] is True)
# Compare with a tolerance. The reward looks like a weighted float sum of
# rubric terms, so exact == 1.0 could fail on rounding alone.
check("perfect reward = 1.0", abs(result["reward"] - 1.0) < 1e-6, f"got {result['reward']}")
# Queue mode: reset() with no scenario_id starts a multi-post episode.
state = env.reset()
check("queue reset() returns state", isinstance(state, dict))
# Hard cap on steps. With `while True`, an env that never reports done would
# hang the validator forever. With the cap, it just fails the step-count check.
_MAX_QUEUE_STEPS = 50
step_n = 0
rewards = []
for _ in range(_MAX_QUEUE_STEPS):
    r = env.step({"label": "safe", "action": "allow", "severity": 1})
    rewards.append(r["reward"])
    step_n += 1
    if r["done"]:
        break
check("queue mode runs 3 steps", step_n == 3, f"ran {step_n}")
check("episode_rewards accumulates", len(env.episode_rewards) == 3)
# False-positive escalation penalty. A WRONG label is submitted on purpose so
# no label-correct credit offsets the penalty.
env.reset(scenario_id="scen_easy_1")  # GT: safe/allow
fp_action = {"label": "toxic", "action": "escalate"}  # wrong label + needless escalation
result = env.step(fp_action)
breakdown = result["info"]["score_breakdown"]
check("false_positive_penalty fires", "false_positive_penalty" in breakdown)
check("reward drops to zero from penalty", result["reward"] == 0.0, f"got {result['reward']}")

# Guard: stepping an episode that has already finished must raise.
step_on_done_raised = False
try:
    env.step({"label": "safe", "action": "allow"})
except RuntimeError:
    step_on_done_raised = True
check("step() on done env raises RuntimeError", step_on_done_raised)

# state() accessor returns a dict after reset().
env.reset(scenario_id="scen_hard_1")
s = env.state()
check("state() returns dict", isinstance(s, dict))
# ── valid_actions: ambiguous scenario scoring ───────────────────────────────
from content_moderation_env import _compute_reward as _cr

# Both orderings of an accepted {remove, shadowban} pair; `in` compares with ==.
_REMOVE_OR_SHADOWBAN = (["remove", "shadowban"], ["shadowban", "remove"])
rs_scenario = next(
    (sc for sc in env._scenarios.values()
     if sc.get("ground_truth", {}).get("valid_actions") in _REMOVE_OR_SHADOWBAN),
    None,
)
if rs_scenario:
    gt_label = rs_scenario["ground_truth"]["label"]
    gt_sev = rs_scenario["ground_truth"].get("severity", 3)
    r_rem, _ = _cr({"label": gt_label, "action": "remove", "severity": gt_sev}, rs_scenario)
    r_sha, _ = _cr({"label": gt_label, "action": "shadowban", "severity": gt_sev}, rs_scenario)
    r_bad, _ = _cr({"label": gt_label, "action": "allow", "severity": gt_sev}, rs_scenario)
    # Either accepted action earns full credit; an unlisted action must score lower.
    check("valid_actions: remove scores full credit", r_rem >= 0.8, f"got {r_rem:.2f}")
    check("valid_actions: shadowban scores full credit", r_sha >= 0.8, f"got {r_sha:.2f}")
    check("valid_actions: remove == shadowban reward", abs(r_rem - r_sha) < 0.01,
          f"remove={r_rem:.2f} shadowban={r_sha:.2f}")
    check("valid_actions: allow does NOT score full", r_bad < r_rem, f"allow={r_bad:.2f}")
else:
    check("valid_actions: remove/shadowban scenario exists", False,
          "none found — run _add_ambiguous_scenarios.py")

ambig_count = sum(1 for sc in env._scenarios.values() if "valid_actions" in sc.get("ground_truth", {}))
check("ambiguous scenarios (valid_actions) ≥ 10", ambig_count >= 10, f"found {ambig_count}")
# ── Campaign mechanic ───────────────────────────────────────────────────────
print("\n── Campaign mechanic (cross-post coordination) ──────────────────────")
import copy


def _make_campaign_env(camp_id: str, posts: list) -> ContentModerationEnv:
    """Return a fresh env forced into campaign mode over deep copies of *posts*.

    Private attributes are set directly so the test is deterministic and does
    not depend on which campaign reset() would pick.
    """
    cenv = ContentModerationEnv(str(JSON_PATH), seed=99)
    cenv._queue = [copy.deepcopy(p) for p in posts]
    cenv._active_campaign = camp_id
    cenv._current_scenario = cenv._queue[0]
    cenv._done = False
    cenv._queue_index = 0
    cenv._episode_rewards = []
    cenv._episode_actions = []
    return cenv


# Group scenarios by campaign ID; only campaigns with ≥ 2 posts are usable.
camp_map: dict = {}
for sc in env._scenarios.values():
    cid = sc.get("campaign_id")
    if cid:
        camp_map.setdefault(cid, []).append(sc)
full_camps = {k: v for k, v in camp_map.items() if len(v) >= 2}
check("campaigns ≥ 3 defined", len(full_camps) >= 3, f"found {len(full_camps)}: {list(full_camps)[:3]}")

if not full_camps:
    # Without this guard, sorted(...)[0] raised IndexError and aborted the run
    # before the summary was printed.
    check("campaign live tests runnable", False, "no campaign with ≥ 2 posts found")
else:
    first_camp_id = sorted(full_camps)[0]
    first_camp_posts = sorted(full_camps[first_camp_id], key=lambda s: s.get("campaign_post_index", 99))

    # The state observation must expose the campaign fields.
    camp_env = _make_campaign_env(first_camp_id, first_camp_posts)
    obs = camp_env._build_state_obs(camp_env._queue[0])
    check("campaign_id present in state obs", obs.get("campaign_id") == first_camp_id)
    check("campaign_post_index present in state obs", obs.get("campaign_post_index") == 1)
    check("campaign_total_posts present in state obs", obs.get("campaign_total_posts") is not None)

    # Escalate every post with its ground-truth label/severity: bonus should fire.
    step_rewards = []
    for post in first_camp_posts:
        camp_env._current_scenario = copy.deepcopy(post)
        camp_env._done = False
        gt_label = post["ground_truth"]["label"]
        gt_sev = post["ground_truth"].get("severity", 3)
        result = camp_env.step({"label": gt_label, "action": "escalate", "severity": gt_sev})
        step_rewards.append(result["reward"])
    bonus_fired = result["info"]["campaign_bonus_earned"]
    bonus_val = result["info"]["campaign_bonus_value"]
    check("campaign bonus fires when all escalated", bonus_fired is True)
    check("campaign bonus value = 0.15", abs(bonus_val - 0.15) < 0.001, f"got {bonus_val}")
    check("final step reward ≥ baseline (bonus added)", step_rewards[-1] > 0.8)

    # Warn (instead of escalate) on the first post: bonus must NOT fire.
    camp_env2 = _make_campaign_env(first_camp_id, first_camp_posts)
    for i, post in enumerate(first_camp_posts):
        camp_env2._current_scenario = copy.deepcopy(post)
        camp_env2._done = False
        gt_label = post["ground_truth"]["label"]
        r2 = camp_env2.step({"label": gt_label, "action": ("warn" if i == 0 else "escalate")})
    check("campaign bonus does NOT fire if any action ≠ escalate",
          r2["info"]["campaign_bonus_earned"] is False)

    # Episode action history is surfaced in step info (checked on the first run).
    check("episode_actions tracked in info", "episode_actions" in result["info"])
# ── is_adversarial flag in state observations ───────────────────────────────
adv_obs_env = ContentModerationEnv(str(JSON_PATH), seed=99)
# Reset onto an adversarial scenario first, then a normal one, keeping each observation.
obs_adv, obs_normal = (
    adv_obs_env.reset(scenario_id=sid) for sid in ("scen_adv_1", "scen_easy_1")
)
for label, observation, expected in (
    ("is_adversarial=True in state obs on adv scenario", obs_adv, True),
    ("is_adversarial=False in state obs on normal scenario", obs_normal, False),
):
    check(label, observation.get("is_adversarial") is expected)
# ── reset(campaign_id=...) deterministic campaign mode ──────────────────────
print("\n── reset(campaign_id) deterministic mode ────────────────────────────")
camp_reset_env = ContentModerationEnv(str(JSON_PATH), seed=11)
obs_c = camp_reset_env.reset(campaign_id="camp_crypto_001")
check("reset(campaign_id) returns state", isinstance(obs_c, dict))
check("reset(campaign_id) sets active_campaign",
      camp_reset_env._active_campaign == "camp_crypto_001")
check("reset(campaign_id) queues all 3 posts",
      len(camp_reset_env._queue) == 3)
check("reset(campaign_id) orders by campaign_post_index",
      [s.get("campaign_post_index") for s in camp_reset_env._queue] == [1, 2, 3])

# Invalid reset() argument combinations must raise ValueError.
for _label, _kwargs in (
    ("reset(bad campaign_id) raises ValueError",
     {"campaign_id": "nonexistent_xyz"}),
    ("reset(scenario_id+campaign_id) raises ValueError",
     {"scenario_id": "scen_easy_1", "campaign_id": "camp_crypto_001"}),
):
    try:
        camp_reset_env.reset(**_kwargs)
    except ValueError:
        check(_label, True)
    else:
        check(_label, False)
# ── Appeal mechanic (adversarial scenarios) ─────────────────────────────────
print("\n── Appeal mechanic (adversarial scenarios) ──────────────────────────")
adv_scenarios = [s for s in data if s.get("is_adversarial")]
check("adversarial scenarios ≥ 10 defined", len(adv_scenarios) >= 10, f"found {len(adv_scenarios)}")
uphold_count = sum(1 for s in adv_scenarios if s.get("appeal_verdict") == "uphold")
overturn_count = sum(1 for s in adv_scenarios if s.get("appeal_verdict") == "overturn")
check("uphold verdicts ≥ 4", uphold_count >= 4, f"found {uphold_count}")
check("overturn verdicts ≥ 2", overturn_count >= 2, f"found {overturn_count}")

# First uphold and first overturn scenario for the live tests. Using a None
# default (and .get) means a dataset missing either verdict only fails the
# count checks above. It no longer aborts the run with StopIteration/KeyError.
uphold_sc = next((s for s in adv_scenarios if s.get("appeal_verdict") == "uphold"), None)
overturn_sc = next((s for s in adv_scenarios if s.get("appeal_verdict") == "overturn"), None)
adv_env = ContentModerationEnv(str(JSON_PATH), seed=77)

if uphold_sc is not None:
    gt = uphold_sc["ground_truth"]
    # -- Test 1: step() on an adversarial scenario exposes the appeal fields
    adv_env.reset(scenario_id=uphold_sc["id"])
    r = adv_env.step({"label": gt["label"], "action": gt["action"],
                      "severity": gt.get("severity", 3)})
    check("is_adversarial present in info", r["info"]["is_adversarial"] is True)
    check("appeal_available=True on adv", r["info"]["appeal_available"] is True)
    check("appeal_text present", isinstance(r["info"]["appeal_text"], str)
          and len(r["info"]["appeal_text"]) > 10)

    # -- Test 2: correct uphold verdict → +0.10
    ar = adv_env.appeal({"verdict": "uphold", "rationale": "appeal is manipulative"})
    check("correct uphold verdict → +0.10", abs(ar["appeal_reward"] - 0.10) < 0.001,
          f"got {ar['appeal_reward']}")
    check("verdict_correct=True on uphold", ar["verdict_correct"] is True)

    # -- Test 3: a second appeal() on the same step must be rejected
    try:
        adv_env.appeal({"verdict": "uphold"})
    except RuntimeError:
        check("second appeal() raises RuntimeError", True)
    else:
        check("second appeal() raises RuntimeError", False)

    # -- Test 4: wrong verdict → -0.10
    adv_env.reset(scenario_id=uphold_sc["id"])
    adv_env.step({"label": gt["label"], "action": gt["action"], "severity": gt.get("severity", 3)})
    ar_wrong = adv_env.appeal({"verdict": "overturn"})  # wrong — should uphold
    check("wrong verdict → -0.10", abs(ar_wrong["appeal_reward"] - (-0.10)) < 0.001,
          f"got {ar_wrong['appeal_reward']}")
    check("verdict_correct=False on wrong", ar_wrong["verdict_correct"] is False)

if overturn_sc is not None:
    # -- Test 5: correct overturn verdict → +0.10
    adv_env.reset(scenario_id=overturn_sc["id"])
    gt2 = overturn_sc["ground_truth"]
    adv_env.step({"label": gt2["label"], "action": gt2["action"], "severity": gt2.get("severity", 3)})
    ar_ov = adv_env.appeal({"verdict": "overturn"})
    check("correct overturn verdict → +0.10", abs(ar_ov["appeal_reward"] - 0.10) < 0.001,
          f"got {ar_ov['appeal_reward']}")

# -- Test 6: appeal() on a non-adversarial scenario must raise
adv_env.reset(scenario_id="scen_easy_1")
adv_env.step({"label": "safe", "action": "allow"})
try:
    adv_env.appeal({"verdict": "uphold"})
except RuntimeError:
    check("appeal() on non-adversarial raises RuntimeError", True)
else:
    check("appeal() on non-adversarial raises RuntimeError", False)
# ── Missed-threat penalty scales with severity ──────────────────────────────
print("\n── Missed-threat penalty by severity ───────────────────────────────")


def _make_toxic(severity: int) -> dict:
    """Build a synthetic hard-tier toxic scenario (GT: escalate) at *severity*."""
    return {
        "id": f"_synth_sev{severity}", "tier": "hard",
        "state": {"text": "x", "audio_transcript": None, "visual_tags": [],
                  "previous_flags": 0, "platform_policy": "strict"},
        "ground_truth": {"label": "toxic", "action": "escalate", "severity": severity},
        "score_rubric": {"label_correct": 0.4, "action_correct": 0.4, "severity_within_1": 0.2},
    }


def _penalty_matches(breakdown: dict, expected: float) -> bool:
    """Return True if breakdown's missed_threat_penalty is within 0.001 of *expected*.

    Uses the same absolute tolerance as the script's other float checks instead
    of ==, so rounding in the penalty arithmetic cannot cause a false FAIL.
    A missing or non-numeric penalty returns False rather than raising.
    """
    got = breakdown.get("missed_threat_penalty")
    return isinstance(got, (int, float)) and abs(got - expected) < 0.001


from content_moderation_env import _compute_reward as _cr2

# Allowing toxic content is penalised more the higher its severity:
# sev-5 → -0.30, sev-4 → -0.15, sev-3 → -0.05.
sev_breakdowns = {}
for sev, expected in ((5, -0.30), (4, -0.15), (3, -0.05)):
    _, bd = _cr2({"label": "toxic", "action": "allow", "severity": sev}, _make_toxic(sev))
    sev_breakdowns[sev] = bd
    check(f"sev-{sev} miss penalty = {expected:.2f}", _penalty_matches(bd, expected),
          f"got {bd.get('missed_threat_penalty')}")

# sev-2 sits below the penalty threshold, so no penalty key is emitted at all.
_, b2 = _cr2({"label": "toxic", "action": "allow", "severity": 2}, _make_toxic(2))
check("sev-2 miss → no penalty",
      "missed_threat_penalty" not in b2, f"got {b2.get('missed_threat_penalty')}")
# The legacy key name must no longer be produced.
check("old 'missed_critical_threat_penalty' key replaced",
      "missed_critical_threat_penalty" not in sev_breakdowns[5])
print("\n── Reward range validation ─────────────────────────────────────────")
from content_moderation_env import _compute_reward
import json as _json

# Score two probe actions per scenario and confirm every reward stays inside
# the [0.0, 1.0] range declared in openenv.yaml. The probes are a deliberately
# wrong one (label "safe" + escalate) and the ground-truth one.
all_rewards = []
for scenario in data:
    # Deliberately wrong probe (not necessarily the minimum-scoring action).
    r, _ = _compute_reward({"label": "safe", "action": "escalate", "severity": 1}, scenario)
    all_rewards.append(r)
    # Ground-truth probe
    gt = scenario["ground_truth"]
    r2, _ = _compute_reward({
        "label": gt["label"],
        "action": gt["action"],
        "severity": gt.get("severity", 3),
    }, scenario)
    all_rewards.append(r2)

if all_rewards:
    check("min reward ≥ 0.0", min(all_rewards) >= 0.0, f"min={min(all_rewards):.3f}")
    check("max reward ≤ 1.0", max(all_rewards) <= 1.0, f"max={max(all_rewards):.3f}")
else:
    # min()/max() of an empty list raise ValueError; report a failure instead.
    check("reward range sampled", False, "no scenarios to score")
# ── Summary ─────────────────────────────────────────────────────────────────
total = checks_passed + checks_failed
rule = "═" * 62
print(f"\n{rule}")
print(f" RESULT: {checks_passed}/{total} checks passed")
if checks_failed == 0:
    print(" ✅ ALL CHECKS PASSED — openenv.yaml is valid")
else:
    print(f" ❌ {checks_failed} check(s) FAILED — fix before submission")
print(f"{rule}\n")
# Non-zero exit status lets CI / shell scripts detect a failed validation.
sys.exit(0 if checks_failed == 0 else 1)