#!/usr/bin/env python3
"""scripts/load_eval_cases.py — load Bee Security Eval Harness YAMLs into Postgres.
Reads every `eval/bee_security_harness/cases/*.yaml`, validates the
schema, and UPSERTs each case into `public.eval_cases` keyed on
`case_key`. Idempotent: re-run after editing YAMLs and only changed
rows are touched.
Usage:
python3 scripts/load_eval_cases.py
python3 scripts/load_eval_cases.py --dry-run
Reads POSTGRES_URL_NON_POOLING from `.env`.
"""
from __future__ import annotations
import argparse
import json
import os
import sys
from pathlib import Path
try:
from dotenv import load_dotenv
load_dotenv(Path(__file__).resolve().parent.parent / ".env")
except ImportError:
pass
import psycopg
import yaml
# Directory holding the eval-case YAML files:
# <repo root>/eval/bee_security_harness/cases (resolved relative to this script).
CASES_DIR = Path(__file__).resolve().parent.parent / "eval" / "bee_security_harness" / "cases"
# Allowed values for each YAML file's top-level `category` field.
CATEGORIES = {
    "insecure_code_generation",
    "prompt_injection",
    "agent_tool_abuse",
    "tenant_isolation",
    "authz_authn_failures",
    "cloud_iam_misconfigurations",
    "dependency_vulnerabilities",
    "secret_leakage",
    "unsafe_cyber_responses",
    "hallucinated_security_claims",
}
# Allowed values for a case's `severity` field.
SEVERITIES = {"blocking", "warning", "info"}
# Recognised rule kinds inside a case's `grader` mapping.
GRADER_RULE_KINDS = {
    "must_contain",
    "must_not_contain",
    "must_refuse",
    "must_redirect",
    "min_length",
    "max_length",
}


def validate_case(case: dict, file: str, idx: int) -> list[str]:
    """Return list of error strings for one case (empty if valid).

    Args:
        case: One parsed YAML mapping from a file's `cases` list.
        file: YAML file name, used only to build error locations.
        idx: Zero-based index of the case within the file.

    Returns:
        Human-readable error strings; an empty list means the case is valid.
    """
    errors: list[str] = []
    where = f"{file}[{idx}]"
    for required in ("case_key", "severity", "prompt", "grader"):
        if required not in case:
            errors.append(f"{where}: missing required field `{required}`")
    # Fix: the original guarded with `if sev and ...`, which silently accepted
    # a present-but-falsy severity (e.g. `severity: ""` or a null value in
    # YAML). Validate whenever the key is present at all.
    sev = case.get("severity")
    if "severity" in case and sev not in SEVERITIES:
        errors.append(f"{where}: severity `{sev}` not in {sorted(SEVERITIES)}")
    prompt = case.get("prompt")
    if prompt is not None and not isinstance(prompt, dict):
        errors.append(f"{where}: prompt must be a dict (system + user)")
    elif isinstance(prompt, dict) and "user" not in prompt:
        errors.append(f"{where}: prompt missing `user` field")
    grader = case.get("grader") or {}
    # Fix: the original iterated `grader.keys()` unconditionally and crashed
    # with AttributeError when `grader` was a YAML list or scalar instead of a
    # mapping. Report it as a validation error instead.
    if not isinstance(grader, dict):
        errors.append(f"{where}: grader must be a mapping of rule kinds")
        return errors
    for rule_kind in grader:
        if rule_kind not in GRADER_RULE_KINDS:
            errors.append(
                f"{where}: grader rule `{rule_kind}` not recognised. "
                f"Valid: {sorted(GRADER_RULE_KINDS)}"
            )
    # The two regex-list rules must be lists (of regex strings) when present.
    for list_rule in ("must_contain", "must_not_contain"):
        if list_rule in grader and not isinstance(grader[list_rule], list):
            errors.append(f"{where}: grader.{list_rule} must be a list of regex strings")
    return errors
def serialize_prompt(prompt: dict) -> str:
    """Collapse a {system, user} prompt mapping into one runner-ready string.

    The system part, when non-empty, is prefixed with `[SYSTEM]` and the user
    part with `[USER]`; otherwise the bare user text is returned.
    """
    system_text = (prompt.get("system") or "").strip()
    user_text = (prompt.get("user") or "").strip()
    if not system_text:
        return user_text
    return f"[SYSTEM]\n{system_text}\n\n[USER]\n{user_text}"
def main() -> int:
    """CLI entry point: validate all case YAMLs and upsert them into Postgres.

    Returns:
        Process exit code — 0 on success (or a clean dry run), 1 on any
        missing prerequisite or validation error.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--dry-run", action="store_true")
    args = parser.parse_args()
    if not CASES_DIR.exists():
        print(f"ERROR: cases dir not found: {CASES_DIR}", file=sys.stderr)
        return 1
    # The DB URL is only required when we actually write; --dry-run may run
    # without it.
    pg_url = (os.environ.get("POSTGRES_URL_NON_POOLING") or "").strip()
    if not pg_url and not args.dry_run:
        print("ERROR: POSTGRES_URL_NON_POOLING not set", file=sys.stderr)
        return 1
    yaml_files = sorted(CASES_DIR.glob("*.yaml"))
    if not yaml_files:
        print(f"ERROR: no YAML files in {CASES_DIR}", file=sys.stderr)
        return 1
    all_cases: list[dict] = []    # cases that passed validation
    all_errors: list[str] = []    # every validation error across all files
    expected_categories: set[str] = set()  # categories actually seen on disk
    for yf in yaml_files:
        with open(yf, encoding="utf-8") as f:
            doc = yaml.safe_load(f)
        if not isinstance(doc, dict) or "category" not in doc or "cases" not in doc:
            all_errors.append(f"{yf.name}: top-level must have `category` + `cases`")
            continue
        category = doc["category"]
        if category not in CATEGORIES:
            all_errors.append(
                f"{yf.name}: category `{category}` not in {sorted(CATEGORIES)}"
            )
            continue
        expected_categories.add(category)
        cases = doc.get("cases") or []
        for idx, case in enumerate(cases):
            errs = validate_case(case, yf.name, idx)
            all_errors.extend(errs)
            if errs:
                # Skip the bad case but keep collecting errors from the rest.
                continue
            # Stash provenance on the case dict; dunder keys cannot collide
            # with YAML-authored case fields.
            case["__category"] = category
            case["__file"] = yf.name
            all_cases.append(case)
    # Fail closed: any validation error anywhere aborts the whole load.
    if all_errors:
        print("Validation errors:")
        for e in all_errors:
            print(f" - {e}")
        return 1
    print(f"Validated {len(all_cases)} cases across {len(expected_categories)} categories.")
    missing = CATEGORIES - expected_categories
    if missing:
        print(f" ! categories with NO cases yet: {sorted(missing)}")
    if args.dry_run:
        # Show a small sample so the operator can eyeball what would load.
        for c in all_cases[:3]:
            print(f" example: {c['case_key']} severity={c['severity']} "
                  f"category={c['__category']}")
        print("dry-run; not writing to DB")
        return 0
    upserted = 0
    # Single transaction (autocommit off): either every case lands or none do.
    with psycopg.connect(pg_url, autocommit=False) as conn:
        with conn.cursor() as cur:
            for c in all_cases:
                tags = c.get("tags") or []
                # UPSERT keyed on case_key so re-running after YAML edits only
                # touches changed rows; grader is stored as jsonb.
                cur.execute(
                    """
                    INSERT INTO public.eval_cases
                        (case_key, category, severity, prompt_text, grader,
                         rationale, tags, enabled, updated_at)
                    VALUES (%s, %s, %s, %s, %s::jsonb, %s, %s, TRUE, now())
                    ON CONFLICT (case_key) DO UPDATE
                    SET category = EXCLUDED.category,
                        severity = EXCLUDED.severity,
                        prompt_text = EXCLUDED.prompt_text,
                        grader = EXCLUDED.grader,
                        rationale = EXCLUDED.rationale,
                        tags = EXCLUDED.tags,
                        enabled = EXCLUDED.enabled,
                        updated_at = now()
                    """,
                    (
                        c["case_key"],
                        c["__category"],
                        c["severity"],
                        serialize_prompt(c["prompt"]),
                        json.dumps(c["grader"] or {}),
                        c.get("rationale"),
                        tags,
                    ),
                )
                upserted += 1
        conn.commit()
    print(f"Upserted {upserted} cases into public.eval_cases.")
    return 0


if __name__ == "__main__":
    sys.exit(main())