#!/usr/bin/env python3 """scripts/load_eval_cases.py — load Bee Security Eval Harness YAMLs into Postgres. Reads every `eval/bee_security_harness/cases/*.yaml`, validates the schema, and UPSERTs each case into `public.eval_cases` keyed on `case_key`. Idempotent: re-run after editing YAMLs and only changed rows are touched. Usage: python3 scripts/load_eval_cases.py python3 scripts/load_eval_cases.py --dry-run Reads POSTGRES_URL_NON_POOLING from `.env`. """ from __future__ import annotations import argparse import json import os import sys from pathlib import Path try: from dotenv import load_dotenv load_dotenv(Path(__file__).resolve().parent.parent / ".env") except ImportError: pass import psycopg import yaml CASES_DIR = Path(__file__).resolve().parent.parent / "eval" / "bee_security_harness" / "cases" CATEGORIES = { "insecure_code_generation", "prompt_injection", "agent_tool_abuse", "tenant_isolation", "authz_authn_failures", "cloud_iam_misconfigurations", "dependency_vulnerabilities", "secret_leakage", "unsafe_cyber_responses", "hallucinated_security_claims", } SEVERITIES = {"blocking", "warning", "info"} GRADER_RULE_KINDS = { "must_contain", "must_not_contain", "must_refuse", "must_redirect", "min_length", "max_length", } def validate_case(case: dict, file: str, idx: int) -> list[str]: """Return list of error strings for one case (empty if valid).""" errors: list[str] = [] where = f"{file}[{idx}]" for required in ("case_key", "severity", "prompt", "grader"): if required not in case: errors.append(f"{where}: missing required field `{required}`") sev = case.get("severity") if sev and sev not in SEVERITIES: errors.append(f"{where}: severity `{sev}` not in {sorted(SEVERITIES)}") prompt = case.get("prompt") if prompt is not None and not isinstance(prompt, dict): errors.append(f"{where}: prompt must be a dict (system + user)") elif isinstance(prompt, dict) and "user" not in prompt: errors.append(f"{where}: prompt missing `user` field") grader = case.get("grader") or {} for rule_kind in grader.keys(): if rule_kind not in GRADER_RULE_KINDS: errors.append( f"{where}: grader rule `{rule_kind}` not recognised. " f"Valid: {sorted(GRADER_RULE_KINDS)}" ) if "must_contain" in grader and not isinstance(grader["must_contain"], list): errors.append(f"{where}: grader.must_contain must be a list of regex strings") if "must_not_contain" in grader and not isinstance(grader["must_not_contain"], list): errors.append(f"{where}: grader.must_not_contain must be a list of regex strings") return errors def serialize_prompt(prompt: dict) -> str: """Flatten {system, user} to a single text used by the runner.""" sys_part = (prompt.get("system") or "").strip() usr_part = (prompt.get("user") or "").strip() if sys_part: return f"[SYSTEM]\n{sys_part}\n\n[USER]\n{usr_part}" return usr_part def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("--dry-run", action="store_true") args = parser.parse_args() if not CASES_DIR.exists(): print(f"ERROR: cases dir not found: {CASES_DIR}", file=sys.stderr) return 1 pg_url = (os.environ.get("POSTGRES_URL_NON_POOLING") or "").strip() if not pg_url and not args.dry_run: print("ERROR: POSTGRES_URL_NON_POOLING not set", file=sys.stderr) return 1 yaml_files = sorted(CASES_DIR.glob("*.yaml")) if not yaml_files: print(f"ERROR: no YAML files in {CASES_DIR}", file=sys.stderr) return 1 all_cases: list[dict] = [] all_errors: list[str] = [] expected_categories: set[str] = set() for yf in yaml_files: with open(yf, encoding="utf-8") as f: doc = yaml.safe_load(f) if not isinstance(doc, dict) or "category" not in doc or "cases" not in doc: all_errors.append(f"{yf.name}: top-level must have `category` + `cases`") continue category = doc["category"] if category not in CATEGORIES: all_errors.append( f"{yf.name}: category `{category}` not in {sorted(CATEGORIES)}" ) continue expected_categories.add(category) cases = doc.get("cases") or [] for idx, case in enumerate(cases): errs = validate_case(case, yf.name, idx) all_errors.extend(errs) if errs: continue case["__category"] = category case["__file"] = yf.name all_cases.append(case) if all_errors: print("Validation errors:") for e in all_errors: print(f" - {e}") return 1 print(f"Validated {len(all_cases)} cases across {len(expected_categories)} categories.") missing = CATEGORIES - expected_categories if missing: print(f" ! categories with NO cases yet: {sorted(missing)}") if args.dry_run: for c in all_cases[:3]: print(f" example: {c['case_key']} severity={c['severity']} " f"category={c['__category']}") print("dry-run; not writing to DB") return 0 upserted = 0 with psycopg.connect(pg_url, autocommit=False) as conn: with conn.cursor() as cur: for c in all_cases: tags = c.get("tags") or [] cur.execute( """ INSERT INTO public.eval_cases (case_key, category, severity, prompt_text, grader, rationale, tags, enabled, updated_at) VALUES (%s, %s, %s, %s, %s::jsonb, %s, %s, TRUE, now()) ON CONFLICT (case_key) DO UPDATE SET category = EXCLUDED.category, severity = EXCLUDED.severity, prompt_text = EXCLUDED.prompt_text, grader = EXCLUDED.grader, rationale = EXCLUDED.rationale, tags = EXCLUDED.tags, enabled = EXCLUDED.enabled, updated_at = now() """, ( c["case_key"], c["__category"], c["severity"], serialize_prompt(c["prompt"]), json.dumps(c["grader"] or {}), c.get("rationale"), tags, ), ) upserted += 1 conn.commit() print(f"Upserted {upserted} cases into public.eval_cases.") return 0 if __name__ == "__main__": sys.exit(main())