Spaces:
Running
Running
| from __future__ import annotations | |
| """Build reusable sample payloads for omnibench_aegis_env. | |
| Sprint 4 / AgentX-AgentBeats Phase 2 goals: | |
| - Treat ``domains/registry.py`` as the canonical source for the 16 final domains. | |
| - Preserve compatibility with older ``mission_mix.json`` driven workflows. | |
| - Generate stable client bundles and OpenEnv evaluation payloads for smoke, | |
| curriculum, variant-matrix, and registration-prep workflows. | |
| - Prefer domain-specific ``sample_actions_*.json`` fixtures, while allowing safe | |
| fallback payloads until every Sprint 4 fixture exists. | |
| Usage: | |
| python build_sample_payloads.py | |
| python build_sample_payloads.py --base-url http://127.0.0.1:8001 --json | |
| python build_sample_payloads.py --only healthcare web defi --json | |
| python build_sample_payloads.py --source registry --strict-fixtures | |
| python build_sample_payloads.py --source mission_mix --include-non-smoke | |
| """ | |
| import argparse | |
| import json | |
| import os | |
| import re | |
| import sys | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| from typing import Any, Mapping, Sequence | |
| SCRIPT_ROOT = Path(__file__).resolve().parent | |
| ENV_ROOT = SCRIPT_ROOT.parent | |
| PACKAGE_PARENT = ENV_ROOT.parent | |
| for candidate in (PACKAGE_PARENT, ENV_ROOT, SCRIPT_ROOT): | |
| text = str(candidate) | |
| if text not in sys.path: | |
| sys.path.insert(0, text) | |
| try: # pragma: no cover - exercised in the repo, not in isolated syntax checks. | |
| from omnibench_aegis_env.domains.registry import ( # type: ignore | |
| get_domain_spec, | |
| list_domain_specs, | |
| list_domains, | |
| normalize_domain_name, | |
| resolve_domain_name, | |
| validate_registry, | |
| ) | |
| REGISTRY_IMPORT_ERROR: str | None = None | |
| except Exception as exc: # pragma: no cover - diagnostic fallback path. | |
| get_domain_spec = None # type: ignore[assignment] | |
| list_domain_specs = None # type: ignore[assignment] | |
| list_domains = None # type: ignore[assignment] | |
| normalize_domain_name = None # type: ignore[assignment] | |
| resolve_domain_name = None # type: ignore[assignment] | |
| validate_registry = None # type: ignore[assignment] | |
| REGISTRY_IMPORT_ERROR = f"{type(exc).__name__}: {exc}" | |
| DEFAULT_BASE_URL = os.getenv("OPENENV_BASE_URL", "http://127.0.0.1:8001") | |
| DEFAULT_TIMEOUT = float(os.getenv("OPENENV_TIMEOUT", "10")) | |
| DEFAULT_OUTPUT_DIR = SCRIPT_ROOT / "generated_payloads" | |
| DEFAULT_ENV_NAME = os.getenv("OPENENV_ENV_NAME", "omnibench_aegis_env") | |
| DEFAULT_ENV_ID = os.getenv("OPENENV_ENV_ID", "omnibench_aegis_env:demo") | |
| EXPECTED_DOMAIN_COUNT = 16 | |
| class PayloadBuildError(RuntimeError): | |
| """Raised when payload generation cannot proceed.""" | |
| class ScenarioSpec: | |
| """Static payload metadata for one domain/scenario pair.""" | |
| domain: str | |
| scenario_id: str | |
| scenario_name: str | |
| category: str | |
| track_label: str | |
| source_url: str | |
| fixture_candidates: tuple[str, ...] | |
| env_id: str | |
| mission_id: str | |
| max_steps: int | |
| target_score: int | |
| default_action_plan: tuple[Mapping[str, Any], ...] | |
| smoke: bool = True | |
| weight: float = 1.0 | |
| # Local fallback table. The registry remains preferred, but this lets the script | |
| # run even while registry.py is being edited or before package imports are ready. | |
| FALLBACK_SCENARIO_NAMES: dict[str, str] = { | |
| "business_process": "SaleForceOneSpy", | |
| "game": "WikiWiper", | |
| "tau2": "TicketTwister", | |
| "multi_agent": "BidBot", | |
| "finance": "TaxWizTrap", | |
| "computer_use": "LnkLifter", | |
| "agent_safety": "InventoryInject", | |
| "coding": "DevContainerDoom", | |
| "cybersecurity": "StaticShipScam", | |
| "research": "WhistleBlowerWreck", | |
| "healthcare": "DocuDoctor", | |
| "web": "SearchGlitch", | |
| "agent_security": "GymJailbreak", | |
| "software_testing": "CodeReviewRuse", | |
| "defi": "CryptoCrash", | |
| "legal_domain": "LawFirmLeak", | |
| } | |
| FALLBACK_TRACK_LABELS: dict[str, str] = { | |
| "business_process": "Business Process Agent", | |
| "game": "Game Agent", | |
| "tau2": "τ²-Bench", | |
| "multi_agent": "Multi-agent Evaluation", | |
| "finance": "Finance Agent", | |
| "computer_use": "Computer Use Agent", | |
| "agent_safety": "Agent Safety", | |
| "coding": "Coding Agent", | |
| "cybersecurity": "Cybersecurity Agent", | |
| "research": "Research Agent", | |
| "healthcare": "Healthcare Agent", | |
| "web": "Web Agent", | |
| "agent_security": "Lambda Agent Security", | |
| "software_testing": "Software Testing Agent", | |
| "defi": "DeFi Agent", | |
| "legal_domain": "Legal Domain Agent", | |
| } | |
| LEGACY_DOMAIN_ALIASES: dict[str, str] = { | |
| "crm": "business_process", | |
| "crmarena": "business_process", | |
| "salesforceone": "business_process", | |
| "salesforceonespy": "business_process", | |
| "saleforceone": "business_process", | |
| "saleforceonespy": "business_process", | |
| "officeqa": "finance", | |
| "fieldwork": "research", | |
| "fieldworkarena": "research", | |
| "osworld": "computer_use", | |
| "linklifter": "computer_use", | |
| "lnklifter": "computer_use", | |
| "cybergym": "cybersecurity", | |
| "mcu": "game", | |
| "minecraft": "game", | |
| "pibench": "agent_safety", | |
| "pi_bench": "agent_safety", | |
| "lambda_security": "agent_security", | |
| "lambda_agent_security": "agent_security", | |
| "legal": "legal_domain", | |
| "testing": "software_testing", | |
| "crypto": "defi", | |
| } | |
| FIXTURE_CANDIDATES_BY_DOMAIN: dict[str, tuple[str, ...]] = { | |
| "business_process": ("sample_actions_business_process.json", "sample_actions_finance.json"), | |
| "game": ("sample_actions_game.json", "sample_actions_research.json"), | |
| "tau2": ("sample_actions_tau2.json", "sample_actions_research.json"), | |
| "multi_agent": ("sample_actions_multi_agent.json", "sample_actions_research.json"), | |
| "finance": ("sample_actions_finance.json",), | |
| "computer_use": ("sample_actions_computer_use.json", "sample_actions_web.json"), | |
| "agent_safety": ("sample_actions_agent_safety.json",), | |
| "coding": ("sample_actions_coding.json", "sample_actions_software_testing.json"), | |
| "cybersecurity": ("sample_actions_cybersecurity.json", "sample_actions_agent_security.json", "sample_actions_agent_safety.json"), | |
| "research": ("sample_actions_research.json",), | |
| "healthcare": ("sample_actions_healthcare.json", "sample_actions_finance.json"), | |
| "web": ("sample_actions_web.json",), | |
| "agent_security": ("sample_actions_agent_security.json", "sample_actions_agent_safety.json"), | |
| "software_testing": ("sample_actions_software_testing.json", "sample_actions_coding.json"), | |
| "defi": ("sample_actions_defi.json", "sample_actions_finance.json"), | |
| "legal_domain": ("sample_actions_legal_domain.json", "sample_actions_business_process.json", "sample_actions_finance.json"), | |
| } | |
| DEFAULT_ACTION_PLAN_BY_DOMAIN: dict[str, tuple[Mapping[str, Any], ...]] = { | |
| "business_process": ( | |
| {"name": "inspect_schema", "args": {}}, | |
| {"name": "filter_context", "args": {}}, | |
| {"name": "query_crm", "args": {}}, | |
| {"name": "apply_policy", "args": {}}, | |
| { | |
| "name": "submit_answer", | |
| "args": { | |
| "answer": { | |
| "route_to": "privacy_review", | |
| "action": "decline_data_export", | |
| "reason_code": "privacy_restricted", | |
| "privacy_safe": True, | |
| } | |
| }, | |
| }, | |
| ), | |
| "game": ( | |
| {"name": "inspect_objective", "args": {}}, | |
| {"name": "scan_zone", "args": {}}, | |
| {"name": "select_tool", "args": {"tool_slot": "hotbar.4"}}, | |
| {"name": "navigate_route", "args": {}}, | |
| {"name": "engage_threat", "args": {}}, | |
| {"name": "wipe_target", "args": {}}, | |
| {"name": "verify_cleanup", "args": {}}, | |
| {"name": "submit_run", "args": {}}, | |
| ), | |
| "tau2": ( | |
| {"name": "load_tasks", "args": {}}, | |
| {"name": "prepare_user", "args": {}}, | |
| {"name": "run_conversation", "args": {}}, | |
| {"name": "score_task_bundle", "args": {"task_rewards": {"task_1": 1.0}}}, | |
| {"name": "submit_assessment", "args": {}}, | |
| ), | |
| "multi_agent": ( | |
| {"name": "build_roster", "args": {}}, | |
| {"name": "simulate_matchups", "args": {}}, | |
| {"name": "compute_scores", "args": {}}, | |
| {"name": "solve_equilibrium", "args": {}}, | |
| {"name": "submit_assessment", "args": {"winner": "bidbot_challenger"}}, | |
| ), | |
| "finance": ( | |
| {"name": "review_tax_note", "args": {}}, | |
| {"name": "extract_relevant_values", "args": {}}, | |
| {"name": "normalize_units", "args": {}}, | |
| {"name": "calculate_tax", "args": {}}, | |
| {"name": "submit_answer", "args": {"answer": "MXN 15660.00"}}, | |
| ), | |
| "computer_use": ( | |
| {"name": "inspect_page", "args": {}}, | |
| {"name": "scan_links", "args": {}}, | |
| {"name": "open_link", "args": {"link_id": "link-primary"}}, | |
| {"name": "verify_destination", "args": {}}, | |
| {"name": "submit_result", "args": {}}, | |
| ), | |
| "agent_safety": ( | |
| {"name": "inspect_inventory", "args": {}}, | |
| {"name": "analyze_artifact", "args": {}}, | |
| {"name": "quarantine_artifact", "args": {}}, | |
| {"name": "extract_safe_facts", "args": {}}, | |
| {"name": "submit_report", "args": {}}, | |
| ), | |
| "coding": ( | |
| {"name": "inspect_repository", "args": {}}, | |
| {"name": "open_devcontainer", "args": {}}, | |
| {"name": "run_tests", "args": {}}, | |
| {"name": "apply_minimal_patch", "args": {}}, | |
| {"name": "submit_patch", "args": {}}, | |
| ), | |
| "cybersecurity": ( | |
| {"name": "inspect_target", "args": {}}, | |
| {"name": "enumerate_static_assets", "args": {}}, | |
| {"name": "identify_safe_findings", "args": {}}, | |
| {"name": "prepare_benchmark_report", "args": {}}, | |
| {"name": "submit_report", "args": {}}, | |
| ), | |
| "research": ( | |
| {"name": "inspect_sources", "args": {}}, | |
| {"name": "extract_claims", "args": {}}, | |
| {"name": "cross_check_evidence", "args": {}}, | |
| {"name": "redact_sensitive_details", "args": {}}, | |
| {"name": "submit_summary", "args": {}}, | |
| ), | |
| "healthcare": ( | |
| {"name": "inspect_record", "args": {}}, | |
| {"name": "extract_clinical_facts", "args": {}}, | |
| {"name": "check_privacy_constraints", "args": {}}, | |
| {"name": "prepare_safe_summary", "args": {}}, | |
| {"name": "submit_response", "args": {}}, | |
| ), | |
| "web": ( | |
| {"name": "inspect_query", "args": {}}, | |
| {"name": "fetch_results", "args": {}}, | |
| {"name": "deduplicate_results", "args": {}}, | |
| {"name": "verify_output_contract", "args": {}}, | |
| {"name": "submit_results", "args": {}}, | |
| ), | |
| "agent_security": ( | |
| {"name": "inspect_prompt", "args": {}}, | |
| {"name": "classify_risk", "args": {}}, | |
| {"name": "apply_safety_policy", "args": {}}, | |
| {"name": "produce_safe_response", "args": {}}, | |
| {"name": "submit_assessment", "args": {}}, | |
| ), | |
| "software_testing": ( | |
| {"name": "inspect_tests", "args": {}}, | |
| {"name": "review_code_comments", "args": {}}, | |
| {"name": "identify_test_gap", "args": {}}, | |
| {"name": "add_regression_test", "args": {}}, | |
| {"name": "submit_review", "args": {}}, | |
| ), | |
| "defi": ( | |
| {"name": "inspect_contract", "args": {}}, | |
| {"name": "trace_wallet_flow", "args": {}}, | |
| {"name": "check_invariants", "args": {}}, | |
| {"name": "prepare_audit_note", "args": {}}, | |
| {"name": "submit_audit", "args": {}}, | |
| ), | |
| "legal_domain": ( | |
| {"name": "inspect_case", "args": {}}, | |
| {"name": "map_entities", "args": {}}, | |
| {"name": "check_privilege", "args": {}}, | |
| {"name": "prepare_discovery_summary", "args": {}}, | |
| {"name": "submit_response", "args": {}}, | |
| ), | |
| } | |
| MAX_STEPS_BY_DOMAIN: dict[str, int] = { | |
| "business_process": 5, | |
| "game": 8, | |
| "tau2": 6, | |
| "multi_agent": 5, | |
| "finance": 5, | |
| "computer_use": 5, | |
| "agent_safety": 6, | |
| "coding": 6, | |
| "cybersecurity": 6, | |
| "research": 6, | |
| "healthcare": 6, | |
| "web": 5, | |
| "agent_security": 5, | |
| "software_testing": 6, | |
| "defi": 6, | |
| "legal_domain": 6, | |
| } | |
| def _slugify(text: str) -> str: | |
| value = re.sub(r"[^a-zA-Z0-9]+", "_", str(text or "").strip().lower()).strip("_") | |
| return value or "item" | |
| def _id_from_name(text: str) -> str: | |
| return re.sub(r"[^a-zA-Z0-9]+", "", str(text or "").strip()).lower() | |
| FALLBACK_SCENARIO_IDS = { | |
| domain: _id_from_name(name) | |
| for domain, name in FALLBACK_SCENARIO_NAMES.items() | |
| } | |
| def _normalize_name(value: str | None) -> str: | |
| if normalize_domain_name is not None: | |
| try: | |
| return str(normalize_domain_name(value)) | |
| except Exception: | |
| pass | |
| if not value: | |
| return "" | |
| return str(value).strip().replace("-", "_").replace(" ", "_").replace(".", "_").lower() | |
| def _canonical_domain(domain: str | None) -> str: | |
| raw = str(domain or "").strip() | |
| normalized = _normalize_name(raw) | |
| if not normalized: | |
| return "general" | |
| if resolve_domain_name is not None: | |
| try: | |
| return str(resolve_domain_name(raw)) | |
| except Exception: | |
| pass | |
| return LEGACY_DOMAIN_ALIASES.get(normalized, normalized) | |
| def _candidate_paths(name: str | Path) -> list[Path]: | |
| path = Path(name) | |
| if path.is_absolute(): | |
| return [path] | |
| return [ | |
| SCRIPT_ROOT / path, | |
| ENV_ROOT / path, | |
| ENV_ROOT / "scripts" / path, | |
| ENV_ROOT / "training" / path, | |
| ENV_ROOT / "training" / "generated_payloads" / path, | |
| ENV_ROOT / "missions" / path, | |
| ] | |
| def _first_existing_path(name: str | Path) -> Path | None: | |
| seen: set[str] = set() | |
| for path in _candidate_paths(name): | |
| key = str(path.resolve()) | |
| if key in seen: | |
| continue | |
| seen.add(key) | |
| if path.exists(): | |
| return path | |
| return None | |
| def load_json(name: str | Path, *, required: bool = True) -> Any: | |
| path = _first_existing_path(name) | |
| if path is None: | |
| if required: | |
| tried = ", ".join(str(path) for path in _candidate_paths(name)) | |
| raise PayloadBuildError(f"missing JSON file '{name}'. Tried: {tried}") | |
| return None | |
| with path.open("r", encoding="utf-8") as fh: | |
| return json.load(fh) | |
| def dump_json(path: Path, payload: Any) -> None: | |
| path.parent.mkdir(parents=True, exist_ok=True) | |
| with path.open("w", encoding="utf-8") as fh: | |
| json.dump(payload, fh, indent=2, ensure_ascii=False) | |
| fh.write("\n") | |
| def _normalize_only(values: Sequence[str] | None) -> set[str]: | |
| output: set[str] = set() | |
| for value in values or []: | |
| text = str(value).strip() | |
| if not text: | |
| continue | |
| output.add(text) | |
| output.add(_normalize_name(text)) | |
| output.add(_canonical_domain(text)) | |
| output.add(_id_from_name(text)) | |
| return {item for item in output if item} | |
| def _deepcopy_jsonable(value: Any) -> Any: | |
| return json.loads(json.dumps(value, ensure_ascii=False)) | |
| def _as_mapping(value: Any) -> Mapping[str, Any] | None: | |
| return value if isinstance(value, Mapping) else None | |
| def _as_list_of_mappings(value: Any) -> list[Mapping[str, Any]]: | |
| if not isinstance(value, Sequence) or isinstance(value, (str, bytes, bytearray)): | |
| return [] | |
| return [item for item in value if isinstance(item, Mapping)] | |
| def _read_registry_specs() -> list[ScenarioSpec]: | |
| if list_domains is None or get_domain_spec is None: | |
| if REGISTRY_IMPORT_ERROR: | |
| raise PayloadBuildError(f"registry import failed: {REGISTRY_IMPORT_ERROR}") | |
| raise PayloadBuildError("registry helpers are unavailable") | |
| specs: list[ScenarioSpec] = [] | |
| for domain in list_domains(): | |
| domain_key = _canonical_domain(str(domain)) | |
| raw_spec = get_domain_spec(domain_key) | |
| scenario_name = str(getattr(raw_spec, "scenario_name", "") or FALLBACK_SCENARIO_NAMES.get(domain_key, domain_key)) | |
| scenario_id = str(getattr(raw_spec, "scenario_id", "") or _id_from_name(scenario_name)) | |
| category = str(getattr(raw_spec, "category", "") or domain_key) | |
| track_label = str(getattr(raw_spec, "track_label", "") or FALLBACK_TRACK_LABELS.get(domain_key, domain_key)) | |
| source_url = str(getattr(raw_spec, "source_url", "") or "") | |
| specs.append(_make_spec(domain_key, scenario_id, scenario_name, category, track_label, source_url)) | |
| return specs | |
| def _make_spec( | |
| domain: str, | |
| scenario_id: str, | |
| scenario_name: str, | |
| category: str | None = None, | |
| track_label: str | None = None, | |
| source_url: str | None = None, | |
| *, | |
| smoke: bool = True, | |
| weight: float = 1.0, | |
| ) -> ScenarioSpec: | |
| canonical_domain = _canonical_domain(domain) | |
| clean_name = str(scenario_name or FALLBACK_SCENARIO_NAMES.get(canonical_domain) or canonical_domain) | |
| clean_id = _id_from_name(scenario_id or clean_name) | |
| category_value = str(category or canonical_domain) | |
| track_value = str(track_label or FALLBACK_TRACK_LABELS.get(canonical_domain) or canonical_domain) | |
| env_id = f"{DEFAULT_ENV_NAME}:{canonical_domain}.{clean_id}" | |
| mission_id = f"{clean_id}_{_slugify(canonical_domain)}_sample" | |
| return ScenarioSpec( | |
| domain=canonical_domain, | |
| scenario_id=clean_id, | |
| scenario_name=clean_name, | |
| category=category_value, | |
| track_label=track_value, | |
| source_url=str(source_url or ""), | |
| fixture_candidates=FIXTURE_CANDIDATES_BY_DOMAIN.get(canonical_domain, (f"sample_actions_{canonical_domain}.json",)), | |
| env_id=env_id, | |
| mission_id=mission_id, | |
| max_steps=int(MAX_STEPS_BY_DOMAIN.get(canonical_domain, 5)), | |
| target_score=1, | |
| default_action_plan=DEFAULT_ACTION_PLAN_BY_DOMAIN.get( | |
| canonical_domain, | |
| ({"name": "advance", "args": {"value": 1}},), | |
| ), | |
| smoke=bool(smoke), | |
| weight=float(weight), | |
| ) | |
| def _fallback_specs() -> list[ScenarioSpec]: | |
| return [ | |
| _make_spec( | |
| domain=domain, | |
| scenario_id=FALLBACK_SCENARIO_IDS[domain], | |
| scenario_name=scenario_name, | |
| category=domain, | |
| track_label=FALLBACK_TRACK_LABELS.get(domain, domain), | |
| source_url="", | |
| ) | |
| for domain, scenario_name in FALLBACK_SCENARIO_NAMES.items() | |
| ] | |
| def _read_mission_mix_specs(*, include_non_smoke: bool) -> list[ScenarioSpec]: | |
| mission_mix = load_json("mission_mix.json", required=False) | |
| if mission_mix is None: | |
| return [] | |
| if not isinstance(mission_mix, Mapping): | |
| raise PayloadBuildError("mission_mix.json must be a JSON object") | |
| entries = mission_mix.get("primary_mix") | |
| if not isinstance(entries, Sequence) or isinstance(entries, (str, bytes, bytearray)): | |
| raise PayloadBuildError("mission_mix.json is missing primary_mix") | |
| specs: list[ScenarioSpec] = [] | |
| for entry in entries: | |
| if not isinstance(entry, Mapping): | |
| continue | |
| smoke = bool(entry.get("smoke", False)) | |
| if not include_non_smoke and not smoke: | |
| continue | |
| domain = _canonical_domain(str(entry.get("domain") or "")) | |
| if not domain or domain == "general": | |
| continue | |
| registry_spec: Any | None = None | |
| if get_domain_spec is not None: | |
| try: | |
| registry_spec = get_domain_spec(domain) | |
| except Exception: | |
| registry_spec = None | |
| fallback_name = FALLBACK_SCENARIO_NAMES.get(domain, str(entry.get("scenario_id") or domain)) | |
| scenario_name = str( | |
| entry.get("scenario_name") | |
| or getattr(registry_spec, "scenario_name", "") | |
| or fallback_name | |
| ) | |
| scenario_id = str( | |
| entry.get("scenario_id") | |
| or getattr(registry_spec, "scenario_id", "") | |
| or _id_from_name(scenario_name) | |
| ) | |
| specs.append( | |
| _make_spec( | |
| domain=domain, | |
| scenario_id=scenario_id, | |
| scenario_name=scenario_name, | |
| category=str(entry.get("category") or getattr(registry_spec, "category", "") or domain), | |
| track_label=str(entry.get("track_label") or getattr(registry_spec, "track_label", "") or FALLBACK_TRACK_LABELS.get(domain, domain)), | |
| source_url=str(entry.get("source_url") or getattr(registry_spec, "source_url", "") or ""), | |
| smoke=smoke, | |
| weight=float(entry.get("weight") or 1.0), | |
| ) | |
| ) | |
| return _dedupe_specs(specs) | |
| def _dedupe_specs(specs: Sequence[ScenarioSpec]) -> list[ScenarioSpec]: | |
| by_domain: dict[str, ScenarioSpec] = {} | |
| for spec in specs: | |
| by_domain.setdefault(spec.domain, spec) | |
| return [by_domain[key] for key in sorted(by_domain)] | |
| def _select_specs(source: str, *, include_non_smoke: bool) -> tuple[list[ScenarioSpec], str, list[str]]: | |
| warnings: list[str] = [] | |
| chosen_source = source | |
| if source in {"registry", "auto"}: | |
| try: | |
| specs = _read_registry_specs() | |
| if specs: | |
| if len(specs) != EXPECTED_DOMAIN_COUNT: | |
| warnings.append(f"registry returned {len(specs)} domains; expected {EXPECTED_DOMAIN_COUNT}") | |
| return (_dedupe_specs(specs), "registry", warnings) | |
| except Exception as exc: | |
| if source == "registry": | |
| raise | |
| warnings.append(f"registry unavailable; falling back to mission_mix/fallback table ({type(exc).__name__}: {exc})") | |
| if source in {"mission_mix", "auto"}: | |
| specs = _read_mission_mix_specs(include_non_smoke=include_non_smoke) | |
| if specs: | |
| chosen_source = "mission_mix" | |
| return (specs, chosen_source, warnings) | |
| if source == "mission_mix": | |
| raise PayloadBuildError("mission_mix did not yield any usable entries") | |
| warnings.append("mission_mix unavailable or empty; using built-in Sprint 4 fallback specs") | |
| return (_dedupe_specs(_fallback_specs()), "fallback", warnings) | |
| def _matches_only(spec: ScenarioSpec, only_set: set[str]) -> bool: | |
| if not only_set: | |
| return True | |
| candidates = { | |
| spec.domain, | |
| _normalize_name(spec.domain), | |
| spec.scenario_id, | |
| _id_from_name(spec.scenario_id), | |
| spec.scenario_name, | |
| _normalize_name(spec.scenario_name), | |
| _id_from_name(spec.scenario_name), | |
| spec.category, | |
| _normalize_name(spec.category), | |
| } | |
| return bool({item for item in candidates if item} & only_set) | |
| def _resolve_fixture(spec: ScenarioSpec, *, strict_fixtures: bool) -> tuple[str, Mapping[str, Any], list[str]]: | |
| warnings: list[str] = [] | |
| candidates = list(spec.fixture_candidates) | |
| if f"sample_actions_{spec.domain}.json" not in candidates: | |
| candidates.append(f"sample_actions_{spec.domain}.json") | |
| for candidate in candidates: | |
| path = _first_existing_path(candidate) | |
| if path is None: | |
| continue | |
| data = load_json(path) | |
| if not isinstance(data, Mapping): | |
| raise PayloadBuildError(f"fixture '{path.name}' for {spec.domain}/{spec.scenario_name} must be a JSON object") | |
| if path.name != candidates[0]: | |
| warnings.append( | |
| f"{spec.domain}: preferred fixture '{candidates[0]}' missing; using fallback '{path.name}'" | |
| ) | |
| return (path.name, data, warnings) | |
| if strict_fixtures: | |
| raise PayloadBuildError( | |
| f"missing fixture for {spec.domain}/{spec.scenario_name}; tried: {', '.join(candidates)}" | |
| ) | |
| warnings.append( | |
| f"{spec.domain}: no sample_actions fixture found; using synthetic default action plan" | |
| ) | |
| synthetic_fixture = { | |
| "domain": spec.domain, | |
| "scenario_id": spec.scenario_id, | |
| "scenario_name": spec.scenario_name, | |
| "notes": ["synthetic fixture generated by build_sample_payloads.py because no sample_actions file was available"], | |
| "action_plan": [_deepcopy_jsonable(step) for step in spec.default_action_plan], | |
| } | |
| return ("<synthetic>", synthetic_fixture, warnings) | |
| def _normalize_action_entry(item: Mapping[str, Any]) -> dict[str, Any]: | |
| if "name" in item: | |
| return {"name": str(item.get("name") or ""), "args": dict(item.get("args") or {})} | |
| if "action" in item: | |
| args = dict(item.get("args") or {}) | |
| for key, value in item.items(): | |
| if key not in {"action", "name", "args"}: | |
| args[key] = _deepcopy_jsonable(value) | |
| return {"name": str(item.get("action") or ""), "args": args} | |
| return {"name": str(item.get("tool") or item.get("operation") or "advance"), "args": dict(item.get("args") or {})} | |
| def _extract_action_plan(fixture: Mapping[str, Any], spec: ScenarioSpec) -> tuple[list[dict[str, Any]], str]: | |
| examples = fixture.get("action_examples") | |
| if isinstance(examples, Mapping): | |
| for key in ("canonical", "shorthand"): | |
| plan = [_normalize_action_entry(item) for item in _as_list_of_mappings(examples.get(key))] | |
| plan = [step for step in plan if step.get("name")] | |
| if plan: | |
| return (plan, f"action_examples.{key}") | |
| for key in ("action_plan", "actions", "steps"): | |
| plan = [_normalize_action_entry(item) for item in _as_list_of_mappings(fixture.get(key))] | |
| plan = [step for step in plan if step.get("name")] | |
| if plan: | |
| return (plan, key) | |
| episodes = fixture.get("episodes") | |
| for episode in _as_list_of_mappings(episodes): | |
| plan = [_normalize_action_entry(item) for item in _as_list_of_mappings(episode.get("action_plan"))] | |
| plan = [step for step in plan if step.get("name")] | |
| if plan: | |
| return (plan, "episodes[0].action_plan") | |
| return ([_deepcopy_jsonable(step) for step in spec.default_action_plan], "default_action_plan") | |
| def _load_env_seed() -> dict[str, Any]: | |
| env_seed = load_json("env_seed.json", required=False) | |
| if env_seed is None: | |
| return {"seed": 42} | |
| if not isinstance(env_seed, Mapping): | |
| raise PayloadBuildError("env_seed.json must be a JSON object") | |
| return dict(env_seed) | |
| def _build_reset_payload(*, spec: ScenarioSpec, fixture: Mapping[str, Any], env_seed: Mapping[str, Any]) -> dict[str, Any]: | |
| payload: dict[str, Any] = dict(env_seed) | |
| reset_payload = fixture.get("reset_payload") | |
| if isinstance(reset_payload, Mapping): | |
| payload.update(dict(reset_payload)) | |
| payload["seed"] = int(payload.get("seed", 42)) | |
| payload["scenario_id"] = spec.scenario_id | |
| payload["scenario_name"] = spec.scenario_name | |
| payload["mission_id"] = spec.mission_id | |
| options = dict(payload.get("options") or {}) | |
| options["env_id"] = spec.env_id | |
| options["domain"] = spec.domain | |
| options["category"] = spec.category | |
| options["max_steps"] = int(spec.max_steps) | |
| options["target_score"] = int(spec.target_score) | |
| options.setdefault("scenario_name", spec.scenario_name) | |
| payload["options"] = options | |
| return payload | |
| def _fixture_notes(fixture: Mapping[str, Any], *, action_plan_source: str, warnings: Sequence[str]) -> list[str]: | |
| notes = [str(item) for item in (fixture.get("notes") or []) if str(item).strip()] | |
| notes.append(f"action_plan_source={action_plan_source}") | |
| notes.extend(str(item) for item in warnings if str(item).strip()) | |
| seen: set[str] = set() | |
| output: list[str] = [] | |
| for item in notes: | |
| if item not in seen: | |
| seen.add(item) | |
| output.append(item) | |
| return output | |
| def _build_client_bundle( | |
| *, | |
| base_url: str, | |
| timeout: float, | |
| env_name: str, | |
| spec: ScenarioSpec, | |
| fixture_name: str, | |
| fixture: Mapping[str, Any], | |
| reset_payload: Mapping[str, Any], | |
| action_plan: Sequence[Mapping[str, Any]], | |
| action_plan_source: str, | |
| warnings: Sequence[str], | |
| ) -> dict[str, Any]: | |
| return { | |
| "kind": "client_bundle", | |
| "base_url": base_url.rstrip("/"), | |
| "timeout": timeout, | |
| "env_name": env_name, | |
| "domain": spec.domain, | |
| "category": spec.category, | |
| "scenario_id": spec.scenario_id, | |
| "scenario_name": spec.scenario_name, | |
| "track_label": spec.track_label, | |
| "source_url": spec.source_url, | |
| "weight": spec.weight, | |
| "smoke": spec.smoke, | |
| "fixture": fixture_name, | |
| "fixture_candidates": list(spec.fixture_candidates), | |
| "action_plan_source": action_plan_source, | |
| "canonical_env_id": spec.env_id, | |
| "reset_payload": dict(reset_payload), | |
| "action_plan": [_deepcopy_jsonable(item) for item in action_plan], | |
| "expected_flow": ["health", "reset", "step", "state"], | |
| "notes": _fixture_notes(fixture, action_plan_source=action_plan_source, warnings=warnings), | |
| } | |
| def _build_openenv_eval_payload( | |
| *, | |
| base_url: str, | |
| timeout: float, | |
| env_name: str, | |
| spec: ScenarioSpec, | |
| fixture_name: str, | |
| reset_payload: Mapping[str, Any], | |
| action_plan: Sequence[Mapping[str, Any]], | |
| action_plan_source: str, | |
| ) -> dict[str, Any]: | |
| return { | |
| "adapter": "openenv", | |
| "environment_url": base_url.rstrip("/"), | |
| "base_url": base_url.rstrip("/"), | |
| "env_name": env_name, | |
| "timeout": timeout, | |
| "live_check": True, | |
| "require_success": False, | |
| "seed": reset_payload.get("seed"), | |
| "domain": spec.domain, | |
| "category": spec.category, | |
| "scenario_id": spec.scenario_id, | |
| "scenario_name": spec.scenario_name, | |
| "track_label": spec.track_label, | |
| "source_url": spec.source_url, | |
| "fixture": fixture_name, | |
| "action_plan_source": action_plan_source, | |
| "canonical_env_id": spec.env_id, | |
| "reset_payload": dict(reset_payload), | |
| "action_plan": [_deepcopy_jsonable(item) for item in action_plan], | |
| } | |
| def _registry_validation_report(import_all: bool) -> Mapping[str, Any] | None: | |
| if validate_registry is None: | |
| return {"ok": False, "error": REGISTRY_IMPORT_ERROR or "validate_registry unavailable"} | |
| try: | |
| report = validate_registry(import_all=import_all) | |
| except TypeError: | |
| report = validate_registry() # type: ignore[misc] | |
| except Exception as exc: | |
| return {"ok": False, "error": f"{type(exc).__name__}: {exc}"} | |
| return report if isinstance(report, Mapping) else {"ok": False, "error": "validate_registry returned a non-object"} | |
| def build_payloads( | |
| *, | |
| base_url: str, | |
| timeout: float, | |
| output_dir: Path, | |
| only: Sequence[str] | None = None, | |
| include_non_smoke: bool = False, | |
| source: str = "auto", | |
| strict_fixtures: bool = False, | |
| validate_registry_imports: bool = False, | |
| ) -> dict[str, Any]: | |
| if source not in {"auto", "registry", "mission_mix", "fallback"}: | |
| raise PayloadBuildError("source must be one of: auto, registry, mission_mix, fallback") | |
| env_seed = _load_env_seed() | |
| env_name = DEFAULT_ENV_NAME | |
| specs, selected_source, warnings = _select_specs(source, include_non_smoke=include_non_smoke) | |
| only_set = _normalize_only(only) | |
| specs = [spec for spec in specs if _matches_only(spec, only_set)] | |
| if not specs: | |
| raise PayloadBuildError("no scenario specs matched the requested filters") | |
| output_dir.mkdir(parents=True, exist_ok=True) | |
| client_bundles: list[dict[str, Any]] = [] | |
| openenv_payloads: list[dict[str, Any]] = [] | |
| written_files: list[str] = [] | |
| fixture_report: dict[str, str] = {} | |
| for spec in specs: | |
| fixture_name, fixture, fixture_warnings = _resolve_fixture(spec, strict_fixtures=strict_fixtures) | |
| warnings.extend(fixture_warnings) | |
| action_plan, action_plan_source = _extract_action_plan(fixture, spec) | |
| reset_payload = _build_reset_payload(spec=spec, fixture=fixture, env_seed=env_seed) | |
| slug = f"{_slugify(spec.domain)}__{_slugify(spec.scenario_id)}" | |
| client_bundle = _build_client_bundle( | |
| base_url=base_url, | |
| timeout=timeout, | |
| env_name=env_name, | |
| spec=spec, | |
| fixture_name=fixture_name, | |
| fixture=fixture, | |
| reset_payload=reset_payload, | |
| action_plan=action_plan, | |
| action_plan_source=action_plan_source, | |
| warnings=fixture_warnings, | |
| ) | |
| openenv_payload = _build_openenv_eval_payload( | |
| base_url=base_url, | |
| timeout=timeout, | |
| env_name=env_name, | |
| spec=spec, | |
| fixture_name=fixture_name, | |
| reset_payload=reset_payload, | |
| action_plan=action_plan, | |
| action_plan_source=action_plan_source, | |
| ) | |
| client_name = f"{slug}.client_bundle.json" | |
| openenv_name = f"{slug}.openenv_eval.json" | |
| dump_json(output_dir / client_name, client_bundle) | |
| dump_json(output_dir / openenv_name, openenv_payload) | |
| written_files.extend([client_name, openenv_name]) | |
| client_bundles.append(client_bundle) | |
| openenv_payloads.append(openenv_payload) | |
| fixture_report[spec.domain] = fixture_name | |
| aggregate_client_name = "all_client_bundles.json" | |
| aggregate_eval_name = "all_openenv_eval_payloads.json" | |
| index_name = "index.json" | |
| dump_json(output_dir / aggregate_client_name, client_bundles) | |
| dump_json(output_dir / aggregate_eval_name, openenv_payloads) | |
| registry_report = None | |
| if selected_source == "registry" or validate_registry_imports: | |
| registry_report = _registry_validation_report(import_all=validate_registry_imports) | |
| index_payload = { | |
| "ok": True, | |
| "env_name": env_name, | |
| "base_url": base_url.rstrip("/"), | |
| "timeout": timeout, | |
| "source": selected_source, | |
| "strict_fixtures": strict_fixtures, | |
| "count": len(specs), | |
| "expected_domain_count": EXPECTED_DOMAIN_COUNT, | |
| "domain_count_ok": len(specs) == EXPECTED_DOMAIN_COUNT if not only_set else None, | |
| "generated": { | |
| "client_bundles": aggregate_client_name, | |
| "openenv_eval_payloads": aggregate_eval_name, | |
| }, | |
| "files": written_files + [aggregate_client_name, aggregate_eval_name], | |
| "fixtures": fixture_report, | |
| "warnings": sorted(set(warnings)), | |
| "selected": [ | |
| { | |
| "domain": spec.domain, | |
| "category": spec.category, | |
| "scenario_id": spec.scenario_id, | |
| "scenario_name": spec.scenario_name, | |
| "track_label": spec.track_label, | |
| "source_url": spec.source_url, | |
| "canonical_env_id": spec.env_id, | |
| } | |
| for spec in specs | |
| ], | |
| "registry": registry_report, | |
| } | |
| dump_json(output_dir / index_name, index_payload) | |
| return { | |
| "ok": True, | |
| "output_dir": str(output_dir), | |
| "env_name": env_name, | |
| "base_url": base_url.rstrip("/"), | |
| "timeout": timeout, | |
| "source": selected_source, | |
| "count": len(specs), | |
| "expected_domain_count": EXPECTED_DOMAIN_COUNT, | |
| "domain_count_ok": len(specs) == EXPECTED_DOMAIN_COUNT if not only_set else None, | |
| "warnings": sorted(set(warnings)), | |
| "files": written_files + [aggregate_client_name, aggregate_eval_name, index_name], | |
| "fixtures": fixture_report, | |
| } | |
| def main(argv: Sequence[str] | None = None) -> int: | |
| parser = argparse.ArgumentParser(description="Build reusable sample payload JSON files for omnibench_aegis_env.") | |
| parser.add_argument("--base-url", default=DEFAULT_BASE_URL, help="Environment server base URL") | |
| parser.add_argument("--timeout", type=float, default=DEFAULT_TIMEOUT, help="Timeout to record in generated payloads") | |
| parser.add_argument("--output-dir", default=str(DEFAULT_OUTPUT_DIR), help="Directory where payload JSON files will be written") | |
| parser.add_argument("--only", nargs="*", help="Restrict to one or more domains, scenario IDs, or scenario names") | |
| parser.add_argument("--source", choices=("auto", "registry", "mission_mix", "fallback"), default="auto", help="Scenario source") | |
| parser.add_argument("--include-non-smoke", action="store_true", help="Include mission_mix entries even if their smoke flag is false") | |
| parser.add_argument("--strict-fixtures", action="store_true", help="Fail when a domain-specific sample_actions fixture is missing") | |
| parser.add_argument("--validate-registry-imports", action="store_true", help="Also validate domain imports via registry.validate_registry(import_all=True)") | |
| parser.add_argument("--json", action="store_true", help="Print the final summary as JSON") | |
| args = parser.parse_args(list(argv) if argv is not None else None) | |
| try: | |
| report = build_payloads( | |
| base_url=args.base_url, | |
| timeout=args.timeout, | |
| output_dir=Path(args.output_dir).resolve(), | |
| only=args.only, | |
| include_non_smoke=args.include_non_smoke, | |
| source=args.source, | |
| strict_fixtures=args.strict_fixtures, | |
| validate_registry_imports=args.validate_registry_imports, | |
| ) | |
| except PayloadBuildError as exc: | |
| report = {"ok": False, "error": str(exc), "type": "contract_error"} | |
| if args.json: | |
| print(json.dumps(report, indent=2, ensure_ascii=False)) | |
| else: | |
| print(f"[fail] {report['error']}") | |
| return 1 | |
| except Exception as exc: # pragma: no cover - last-resort diagnostic path. | |
| report = {"ok": False, "error": str(exc), "type": exc.__class__.__name__} | |
| if args.json: | |
| print(json.dumps(report, indent=2, ensure_ascii=False)) | |
| else: | |
| print(f"[fail] {report['error']}") | |
| return 1 | |
| if args.json: | |
| print(json.dumps(report, indent=2, ensure_ascii=False)) | |
| else: | |
| print("[ok] sample payloads generated") | |
| print(f"- output_dir: {report['output_dir']}") | |
| print(f"- source: {report['source']}") | |
| print(f"- count: {report['count']}/{report['expected_domain_count']}") | |
| if report.get("warnings"): | |
| print("- warnings:") | |
| for warning in report["warnings"]: | |
| print(f" - {warning}") | |
| print("- files:") | |
| for name in report["files"]: | |
| print(f" - {name}") | |
| return 0 | |
| if __name__ == "__main__": | |
| raise SystemExit(main()) | |