| | """Cryptographic integrity layer for ST-WebAgentBench leaderboard submissions. |
| | |
| | Generates tamper-evident evidence during evaluation: |
| | - Code pinning: SHA256 of critical source files (evaluators, tasks, env) |
| | - Trajectory hash chain: per-task hash binding actions + safety report + reward |
| | - Manifest seal: deterministic hash of the entire integrity manifest |
| | - HMAC signature: anti-forgery guarantee using a shared secret key |
| | |
| | The leaderboard server compares these against known-good values to detect |
| | modified evaluation code, tampered trajectories, or replayed submissions. |
| | """ |
| |
|
| | import hashlib |
| | import hmac as _hmac |
| | import json |
| | import logging |
| | import os |
| | import time |
| | import uuid |
| | from dataclasses import asdict, dataclass, field |
| | from pathlib import Path |
| | from typing import Any, Dict, List, Optional |
| |
|
# Module-level logger; handlers and levels are left to the host application.
logger: logging.Logger = logging.getLogger(__name__)

# Benchmark release identifier; used as the default value of
# IntegrityManifest.benchmark_version.
BENCHMARK_VERSION: str = "1.0.0"

# Manifest hash field name -> path (relative to the project root) of the
# file whose SHA256 digest is stored under that field by pin_code_artifacts.
_CODE_ARTIFACTS: Dict[str, str] = {
    "evaluators_sha256": "stwebagentbench/evaluation_harness/evaluators.py",
    "task_config_sha256": "stwebagentbench/test.raw.json",
    "custom_env_sha256": "stwebagentbench/browser_env/custom_env.py",
    "helper_functions_sha256": "stwebagentbench/evaluation_harness/helper_functions.py",
}
| |
|
| |
|
@dataclass
class IntegrityManifest:
    """Cryptographic manifest generated during evaluation.

    Embeds hashes of all critical artifacts so the leaderboard server
    can detect any post-hoc tampering with results, code, or task definitions.
    """

    # --- Run metadata ---
    run_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    benchmark_version: str = BENCHMARK_VERSION
    timestamp_start: float = field(default_factory=time.time)
    timestamp_end: Optional[float] = None  # set by finalize_manifest

    # --- Code pinning: SHA256 digests, one per _CODE_ARTIFACTS key ---
    evaluators_sha256: str = ""
    task_config_sha256: str = ""
    custom_env_sha256: str = ""
    helper_functions_sha256: str = ""

    # --- Per-task hashes keyed by integer task id ---
    # presumably filled with create_trajectory_hash results; verify against caller
    task_hashes: Dict[int, str] = field(default_factory=dict)

    # --- Seal over the manifest contents (see seal_manifest) ---
    manifest_hash: str = ""

    # --- HMAC-SHA256 signature (see compute_hmac_signature); "" if unsigned ---
    hmac_signature: str = ""

    def to_dict(self) -> dict:
        """Return the manifest as a plain dict (deep copy via ``asdict``)."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: dict) -> "IntegrityManifest":
        """Build a manifest from a dict, e.g. one parsed from JSON.

        JSON object keys are always strings, so a manifest that was saved
        and reloaded carries ``task_hashes`` keys such as ``"10"`` instead
        of ``10``.  Because ``json.dumps(sort_keys=True)`` orders keys
        *before* turning them into strings, string keys sort
        lexicographically ("10" < "2") while integer keys sort numerically,
        which changes the canonical form and therefore the seal and the
        HMAC.  Integer task ids are restored here so that a reloaded
        manifest hashes exactly like the one that was originally signed.

        Args:
            data: Mapping of field names to values.  It is not modified.

        Returns:
            A new IntegrityManifest.

        Raises:
            TypeError: If ``data`` contains keys that are not manifest fields.
        """
        values = dict(data)  # shallow copy: never mutate the caller's dict
        task_hashes = values.get("task_hashes")
        if isinstance(task_hashes, dict):
            values["task_hashes"] = cls._restore_task_ids(task_hashes)
        return cls(**values)

    @staticmethod
    def _restore_task_ids(task_hashes: dict) -> dict:
        """Convert string task ids back to ints, all-or-nothing.

        A key is converted only when it is the canonical decimal form of an
        integer (``str(int(key)) == key``), so the conversion is lossless.
        If any string key does not qualify, or conversion would merge two
        entries, the mapping is returned unchanged.
        """
        restored = {}
        for key, value in task_hashes.items():
            if isinstance(key, str):
                try:
                    task_id = int(key)
                except ValueError:
                    return task_hashes
                if str(task_id) != key:
                    # e.g. "01", " 1", "1_0": not something json.dumps emits
                    # for an int key, so leave the whole mapping untouched.
                    return task_hashes
                key = task_id
            restored[key] = value
        if len(restored) != len(task_hashes):
            # Mixed input such as {1: ..., "1": ...} would silently drop data.
            return task_hashes
        return restored
| |
|
| |
|
| | |
| | |
| | |
| |
|
| |
|
def compute_file_hash(filepath: str) -> str:
    """Return the hex SHA256 digest of the file at ``filepath``.

    The file is opened in binary mode and consumed in 8192-byte reads so
    the whole content is never held in memory at once.
    """
    digest = hashlib.sha256()
    with open(filepath, "rb") as stream:
        while True:
            block = stream.read(8192)
            if not block:  # empty read means end of file
                break
            digest.update(block)
    return digest.hexdigest()
| |
|
| |
|
def compute_data_hash(data: Any) -> str:
    """Return the hex SHA256 digest of ``data`` in canonical JSON form.

    The object is serialized with sorted keys and compact separators, so
    dict ordering and whitespace never influence the digest.  Values that
    JSON cannot represent natively are passed through ``str``.
    """
    encoder_options = {
        "sort_keys": True,
        "separators": (",", ":"),
        "default": str,
    }
    serialized = json.dumps(data, **encoder_options)
    digest = hashlib.sha256()
    digest.update(serialized.encode("utf-8"))
    return digest.hexdigest()
| |
|
| |
|
| | |
| | |
| | |
| |
|
| |
|
def pin_code_artifacts(project_root: str) -> Dict[str, str]:
    """Hash every critical source file listed in ``_CODE_ARTIFACTS``.

    The resulting digests are meant to be compared with known-good values
    on the leaderboard server; a mismatch marks the submission as having
    been produced with modified code.

    Args:
        project_root: Absolute path to the project root directory.

    Returns:
        Dict mapping each hash field name to the SHA256 hex digest of its
        file, or to ``""`` when the file does not exist.
    """
    base_dir = Path(project_root)
    digests = {}
    for field_name, relative_path in _CODE_ARTIFACTS.items():
        candidate = base_dir / relative_path
        if not candidate.exists():
            # A missing artifact is recorded as an empty digest, not an error.
            logger.warning("Code artifact not found: %s", candidate)
            digests[field_name] = ""
            continue
        digests[field_name] = compute_file_hash(str(candidate))
    return digests
| |
|
| |
|
| | |
| | |
| | |
| |
|
| |
|
def create_trajectory_hash(
    task_id: int,
    actions: List[dict],
    safety_report: List[dict],
    total_reward: float,
) -> str:
    """Hash the evidence recorded for one task's trajectory.

    The ordered actions, the normalized safety report and the reward are
    combined into a single structure and hashed, so editing any of them
    afterwards yields a different digest.

    Args:
        task_id: The benchmark task identifier.
        actions: List of action dicts; 'action_type' and 'action_args' are
            read from each, defaulting to "" and [] when absent.
        safety_report: List of per-policy report dicts from the evaluator.
        total_reward: The task reward (0.0 or 1.0).

    Returns:
        SHA256 hex digest of the canonical JSON representation.
    """
    steps = []
    for index, action in enumerate(actions):
        steps.append(
            {
                "step": index,  # position in the sequence is part of the hash
                "action_type": action.get("action_type", ""),
                "action_args": action.get("action_args", []),
            }
        )

    evidence = {
        "task_id": task_id,
        "action_sequence": steps,
        "safety_report": _normalize_safety_report(safety_report),
        "total_reward": total_reward,
    }
    return compute_data_hash(evidence)
| |
|
| |
|
def _normalize_safety_report(report: List[dict]) -> List[dict]:
    """Reduce each safety report entry to the fields that get hashed.

    Only 'violated', 'dormant', 'violating_step' and 'eval_type' are kept;
    every other key is dropped.  The two flags are coerced to ``bool``
    (missing means ``False``); the other two default to ``None``.
    """
    return [
        {
            "violated": bool(item.get("violated", False)),
            "dormant": bool(item.get("dormant", False)),
            "violating_step": item.get("violating_step"),
            "eval_type": item.get("eval_type"),
        }
        for item in report
    ]
| |
|
| |
|
| | |
| | |
| | |
| |
|
| |
|
def seal_manifest(manifest: IntegrityManifest) -> str:
    """Compute the seal (deterministic hash) over the manifest contents.

    The seal is a structural integrity check only: anyone can recompute
    it, so it does not prevent forgery by itself.  The anti-forgery
    guarantee comes from the HMAC signature (see compute_hmac_signature).

    Args:
        manifest: The integrity manifest to seal.

    Returns:
        SHA256 hex digest of the manifest contents (excluding the seal
        and HMAC signature).
    """
    # The seal and the signature are derived from the content, so they
    # cannot be part of what is hashed.
    unsealed_fields = ("manifest_hash", "hmac_signature")
    content = {
        name: value
        for name, value in manifest.to_dict().items()
        if name not in unsealed_fields
    }
    return compute_data_hash(content)
| |
|
| |
|
| | |
| | |
| | |
| |
|
| | |
# Name of the environment variable that finalize_manifest reads the shared
# HMAC signing key from.
SIGNING_KEY_ENV_VAR: str = "ST_BENCH_SIGNING_KEY"
| |
|
| |
|
def compute_hmac_signature(manifest: IntegrityManifest, signing_key: str) -> str:
    """Compute HMAC-SHA256 over the manifest content.

    The signed message is the canonical JSON (sorted keys, compact
    separators) of every manifest field except ``manifest_hash`` and
    ``hmac_signature`` -- the same content seal_manifest covers -- keyed
    with the shared secret, so it cannot be reproduced without the key.

    Args:
        manifest: The integrity manifest to sign.
        signing_key: The shared secret key.

    Returns:
        HMAC-SHA256 hex digest.
    """
    excluded = ("manifest_hash", "hmac_signature")
    signed_fields = {
        name: value
        for name, value in manifest.to_dict().items()
        if name not in excluded
    }
    message = json.dumps(
        signed_fields,
        sort_keys=True,
        separators=(",", ":"),
        default=str,
    ).encode("utf-8")
    mac = _hmac.new(
        signing_key.encode("utf-8"),
        msg=message,
        digestmod=hashlib.sha256,
    )
    return mac.hexdigest()
| |
|
| |
|
def verify_hmac_signature(
    manifest: IntegrityManifest, signing_key: str
) -> bool:
    """Verify the HMAC signature on a manifest.

    The stored signature is treated as untrusted input: a manifest loaded
    from a submitted JSON file may carry a non-string or non-ASCII value.
    ``hmac.compare_digest`` raises ``TypeError`` for such operands, so
    malformed signatures are rejected up front instead of crashing the
    verifier.

    Args:
        manifest: The manifest with hmac_signature field set.
        signing_key: The shared secret key.

    Returns:
        True if the signature is valid, False otherwise (including when
        the signature is missing, not a string, or not ASCII).
    """
    signature = manifest.hmac_signature
    if not signature or not isinstance(signature, str):
        return False
    try:
        provided = signature.encode("ascii")
    except UnicodeEncodeError:
        # A genuine hex digest is pure ASCII, so this can never be valid.
        return False

    expected = compute_hmac_signature(manifest, signing_key)
    # Compare as bytes in constant time to avoid leaking the match length.
    return _hmac.compare_digest(provided, expected.encode("ascii"))
| |
|
| |
|
def finalize_manifest(manifest: IntegrityManifest) -> IntegrityManifest:
    """Stamp the end time, seal the manifest and sign it when a key exists.

    Call this after all tasks have been evaluated.

    The signing key is read from the ST_BENCH_SIGNING_KEY environment
    variable.  When it is unset or blank, hmac_signature is not touched
    (the leaderboard server will flag unsigned submissions).

    Args:
        manifest: The manifest to finalize; it is modified in place.

    Returns:
        The same manifest with timestamp_end, manifest_hash, and
        optionally hmac_signature set.
    """
    # The end timestamp is part of the sealed content, so set it first.
    manifest.timestamp_end = time.time()
    manifest.manifest_hash = seal_manifest(manifest)

    secret = os.environ.get(SIGNING_KEY_ENV_VAR, "").strip()
    if not secret:
        return manifest

    manifest.hmac_signature = compute_hmac_signature(manifest, secret)
    logger.info("Manifest HMAC-signed successfully")
    return manifest
| |
|
| |
|
def save_manifest(manifest: IntegrityManifest, output_path: str) -> None:
    """Serialize the manifest to ``output_path`` as 2-space-indented JSON.

    An existing file at that path is overwritten.
    """
    with open(output_path, "w") as handle:
        payload = manifest.to_dict()
        json.dump(payload, handle, indent=2)
    logger.info("Integrity manifest saved to %s", output_path)
| |
|
| |
|
def load_manifest(filepath: str) -> IntegrityManifest:
    """Read a JSON file and rebuild the IntegrityManifest it describes."""
    with open(filepath, "r") as handle:
        payload = json.load(handle)
    manifest = IntegrityManifest.from_dict(payload)
    return manifest
| |
|