# dolev31's picture
# Add 80 modality-challenge tasks and leaderboard submission system
# ae07f06
"""Cryptographic integrity layer for ST-WebAgentBench leaderboard submissions.
Generates tamper-evident evidence during evaluation:
- Code pinning: SHA256 of critical source files (evaluators, tasks, env)
- Trajectory hash chain: per-task hash binding actions + safety report + reward
- Manifest seal: deterministic hash of the entire integrity manifest
- HMAC signature: anti-forgery guarantee using a shared secret key
The leaderboard server compares these against known-good values to detect
modified evaluation code, tampered trajectories, or replayed submissions.
"""
import hashlib
import hmac as _hmac
import json
import logging
import os
import time
import uuid
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Benchmark version recorded in every manifest.
BENCHMARK_VERSION = "1.0.0"

# Manifest field name -> source file whose SHA256 is pinned in that field.
# The server compares these digests against known-good values.
# All paths are relative to the project root.
_CODE_ARTIFACTS: Dict[str, str] = {
    "evaluators_sha256": "stwebagentbench/evaluation_harness/evaluators.py",
    "task_config_sha256": "stwebagentbench/test.raw.json",
    "custom_env_sha256": "stwebagentbench/browser_env/custom_env.py",
    "helper_functions_sha256": "stwebagentbench/evaluation_harness/helper_functions.py",
}
@dataclass
class IntegrityManifest:
    """Cryptographic manifest generated during evaluation.

    Embeds hashes of all critical artifacts so the leaderboard server
    can detect any post-hoc tampering with results, code, or task definitions.
    """

    # Run identity
    run_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    benchmark_version: str = BENCHMARK_VERSION
    timestamp_start: float = field(default_factory=time.time)
    timestamp_end: Optional[float] = None

    # Code integrity pins (populated by pin_code_artifacts)
    evaluators_sha256: str = ""
    task_config_sha256: str = ""
    custom_env_sha256: str = ""
    helper_functions_sha256: str = ""

    # Per-task trajectory hashes (task_id -> hash)
    task_hashes: Dict[int, str] = field(default_factory=dict)

    # Final seal over the entire manifest
    manifest_hash: str = ""

    # HMAC signature (requires ST_BENCH_SIGNING_KEY env var)
    hmac_signature: str = ""

    def to_dict(self) -> dict:
        """Return the manifest fields as a plain dict (via dataclasses.asdict)."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: dict) -> "IntegrityManifest":
        """Rebuild a manifest from a dict such as the one produced by to_dict.

        JSON object keys are always strings, so a manifest that went through
        a JSON save/load cycle arrives here with string ``task_hashes`` keys.
        Canonical serialization uses ``sort_keys=True``, which orders int keys
        numerically (1, 2, 10) but string keys lexicographically
        ("1", "10", "2"); hashing the loaded manifest would therefore not
        reproduce the seal/HMAC computed before saving. The keys are converted
        back to ``int`` to restore the original ordering.

        Args:
            data: Mapping of manifest field names to values. Not modified.

        Returns:
            A new IntegrityManifest populated from ``data``.

        Raises:
            TypeError: If ``data`` contains a key that is not a manifest field.
        """
        values = dict(data)
        stored = values.get("task_hashes")
        if isinstance(stored, dict):
            try:
                restored = {int(task_id): digest for task_id, digest in stored.items()}
            except (TypeError, ValueError):
                restored = None
            # Keep the keys as given when any key is non-numeric, or when two
            # keys collapse to the same int (e.g. "1" and "01"): converting
            # would silently change or drop entries.
            if restored is None or len(restored) != len(stored):
                restored = dict(stored)
            values["task_hashes"] = restored
        return cls(**values)
# ---------------------------------------------------------------------------
# Hashing utilities
# ---------------------------------------------------------------------------
def compute_file_hash(filepath: str) -> str:
    """Return the SHA256 hex digest of the file at ``filepath``.

    The file is opened in binary mode and consumed in 8192-byte reads, so
    its full contents are never held in memory at once.
    """
    digest = hashlib.sha256()
    with open(filepath, "rb") as stream:
        while True:
            block = stream.read(8192)
            if not block:
                # An empty read marks end of file.
                break
            digest.update(block)
    return digest.hexdigest()
def compute_data_hash(data: Any) -> str:
    """Return the SHA256 hex digest of ``data`` in canonical JSON form.

    The object is serialized with sorted keys and compact separators so the
    digest does not depend on dict ordering or whitespace. Values that JSON
    cannot encode natively are converted with ``str``.
    """
    serialized = json.dumps(
        data,
        sort_keys=True,
        separators=(",", ":"),
        default=str,
    )
    digest = hashlib.sha256()
    digest.update(serialized.encode("utf-8"))
    return digest.hexdigest()
# ---------------------------------------------------------------------------
# Code pinning
# ---------------------------------------------------------------------------
def pin_code_artifacts(project_root: str) -> Dict[str, str]:
    """Compute SHA256 hashes of all critical source files.

    These are compared against known-good hashes on the leaderboard server.
    If any hash mismatches, the submission is flagged as using modified code.

    Args:
        project_root: Absolute path to the project root directory.

    Returns:
        Dict mapping hash field names to their SHA256 hex digests. An
        artifact that is missing (or is not a regular file) maps to an
        empty string and a warning is logged.
    """
    root = Path(project_root)
    hashes: Dict[str, str] = {}
    for key, rel_path in _CODE_ARTIFACTS.items():
        full_path = root / rel_path
        # is_file() rather than exists(): a directory at the artifact path
        # passes exists() but cannot be opened for hashing, which would raise
        # instead of taking the warn-and-empty-hash path below.
        if full_path.is_file():
            hashes[key] = compute_file_hash(str(full_path))
        else:
            logger.warning("Code artifact not found: %s", full_path)
            hashes[key] = ""
    return hashes
# ---------------------------------------------------------------------------
# Trajectory hashing
# ---------------------------------------------------------------------------
def create_trajectory_hash(
    task_id: int,
    actions: List[dict],
    safety_report: List[dict],
    total_reward: float,
) -> str:
    """Hash the evidence recorded for one task's trajectory.

    The digest covers the task id, the ordered actions (each reduced to its
    step index, 'action_type' and 'action_args'), the normalized safety
    report, and the reward, so editing any of them afterwards changes the
    hash.

    Args:
        task_id: The benchmark task identifier.
        actions: List of action dicts; 'action_type' defaults to "" and
            'action_args' to [] when a key is absent.
        safety_report: List of per-policy report dicts from the evaluator.
        total_reward: The task reward (0.0 or 1.0).

    Returns:
        SHA256 hex digest of the canonical JSON representation.
    """
    steps = []
    for index, action in enumerate(actions):
        steps.append(
            {
                "step": index,
                "action_type": action.get("action_type", ""),
                "action_args": action.get("action_args", []),
            }
        )
    evidence = {
        "task_id": task_id,
        "action_sequence": steps,
        "safety_report": _normalize_safety_report(safety_report),
        "total_reward": total_reward,
    }
    return compute_data_hash(evidence)
def _normalize_safety_report(report: List[dict]) -> List[dict]:
    """Reduce each safety report entry to the fields that get hashed.

    Only 'violated' and 'dormant' (coerced to bool, default False),
    'violating_step' and 'eval_type' (default None) are kept; every other
    key in an entry is dropped.
    """
    return [
        {
            "violated": bool(item.get("violated", False)),
            "dormant": bool(item.get("dormant", False)),
            "violating_step": item.get("violating_step"),
            "eval_type": item.get("eval_type"),
        }
        for item in report
    ]
# ---------------------------------------------------------------------------
# Manifest seal
# ---------------------------------------------------------------------------
def seal_manifest(manifest: IntegrityManifest) -> str:
    """Compute the final seal over the entire manifest.

    The seal is a plain deterministic hash, so an adversary could recompute
    it; it acts as a structural integrity check only. The anti-forgery
    guarantee comes from the HMAC signature (see compute_hmac_signature).

    Args:
        manifest: The integrity manifest to seal.

    Returns:
        SHA256 hex digest of the manifest contents, with the
        'manifest_hash' and 'hmac_signature' fields left out.
    """
    excluded = ("manifest_hash", "hmac_signature")
    sealed_fields = {
        name: value
        for name, value in manifest.to_dict().items()
        if name not in excluded
    }
    return compute_data_hash(sealed_fields)
# ---------------------------------------------------------------------------
# HMAC signing (anti-forgery)
# ---------------------------------------------------------------------------
# Name of the environment variable that supplies the HMAC signing key.
# finalize_manifest signs a manifest only when this variable holds a
# non-blank value; as far as this file shows, there is no fallback key.
SIGNING_KEY_ENV_VAR = "ST_BENCH_SIGNING_KEY"
def compute_hmac_signature(manifest: IntegrityManifest, signing_key: str) -> str:
    """Compute HMAC-SHA256 over the manifest content.

    The signed message is the canonical JSON of the manifest without its
    'manifest_hash' and 'hmac_signature' fields (the same content that
    seal_manifest hashes), keyed with the shared secret.

    Args:
        manifest: The integrity manifest to sign.
        signing_key: The shared secret key.

    Returns:
        HMAC-SHA256 hex digest.
    """
    payload = manifest.to_dict()
    for excluded in ("manifest_hash", "hmac_signature"):
        payload.pop(excluded, None)
    message = json.dumps(
        payload,
        sort_keys=True,
        separators=(",", ":"),
        default=str,
    )
    mac = _hmac.new(
        signing_key.encode("utf-8"),
        message.encode("utf-8"),
        hashlib.sha256,
    )
    return mac.hexdigest()
def verify_hmac_signature(
    manifest: IntegrityManifest, signing_key: str
) -> bool:
    """Verify the HMAC signature on a manifest.

    Malformed signatures (missing, not a string, or containing non-ASCII
    characters) are reported as invalid rather than raising.

    Args:
        manifest: The manifest with hmac_signature field set.
        signing_key: The shared secret key.

    Returns:
        True if the signature is valid, False otherwise.
    """
    claimed = manifest.hmac_signature
    # The manifest may have been loaded from a submitted JSON file, so the
    # field is not guaranteed to hold a string.
    if not claimed or not isinstance(claimed, str):
        return False
    expected = compute_hmac_signature(manifest, signing_key)
    # compare_digest raises TypeError for str arguments with non-ASCII
    # characters. Comparing bytes keeps the constant-time check and turns
    # such input into an ordinary mismatch; "surrogatepass" lets lone
    # surrogates (possible in JSON escapes) encode instead of raising.
    return _hmac.compare_digest(
        claimed.encode("utf-8", "surrogatepass"),
        expected.encode("utf-8"),
    )
def finalize_manifest(manifest: IntegrityManifest) -> IntegrityManifest:
    """Set the end timestamp, compute the seal, and sign with HMAC.

    Call this after all tasks have been evaluated.

    If ST_BENCH_SIGNING_KEY is set in the environment, the manifest
    is HMAC-signed. Otherwise, hmac_signature is set to the empty string
    (the leaderboard server will flag unsigned submissions) and a warning
    is logged.

    Args:
        manifest: The manifest to finalize. It is modified in place.

    Returns:
        The same manifest with timestamp_end, manifest_hash, and
        optionally hmac_signature set.
    """
    manifest.timestamp_end = time.time()
    manifest.manifest_hash = seal_manifest(manifest)
    # Sign with HMAC — the Space always uses the env var secret
    signing_key = os.environ.get(SIGNING_KEY_ENV_VAR, "").strip()
    if signing_key:
        manifest.hmac_signature = compute_hmac_signature(manifest, signing_key)
        logger.info("Manifest HMAC-signed successfully")
    else:
        # Drop any signature left from an earlier finalization: it covered a
        # different timestamp_end, so it can no longer verify and would only
        # make an unsigned manifest look signed.
        manifest.hmac_signature = ""
        logger.warning(
            "%s is not set; manifest left unsigned", SIGNING_KEY_ENV_VAR
        )
    return manifest
def save_manifest(manifest: IntegrityManifest, output_path: str) -> None:
    """Write the manifest to ``output_path`` as JSON and log the destination.

    The file is opened in text write mode and the manifest dict is dumped
    with a two-space indent.
    """
    with open(output_path, "w") as out_file:
        payload = manifest.to_dict()
        json.dump(payload, out_file, indent=2)
    logger.info("Integrity manifest saved to %s", output_path)
def load_manifest(filepath: str) -> IntegrityManifest:
    """Read the JSON file at ``filepath`` and build an IntegrityManifest from it.

    The parsed JSON object is passed to IntegrityManifest.from_dict.
    """
    with open(filepath, "r") as source:
        parsed = json.load(source)
    manifest = IntegrityManifest.from_dict(parsed)
    return manifest