"""Cryptographic integrity layer for ST-WebAgentBench leaderboard submissions.
Generates tamper-evident evidence during evaluation:
- Code pinning: SHA256 of critical source files (evaluators, tasks, env)
- Trajectory hash chain: per-task hash binding actions + safety report + reward
- Manifest seal: deterministic hash of the entire integrity manifest
- HMAC signature: anti-forgery guarantee using a shared secret key
The leaderboard server compares these against known-good values to detect
modified evaluation code, tampered trajectories, or replayed submissions.
"""
import hashlib
import hmac as _hmac
import json
import logging
import os
import time
import uuid
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Default value of IntegrityManifest.benchmark_version.
BENCHMARK_VERSION = "1.0.0"

# Critical source files whose SHA256 must match known-good hashes on the server.
# Keys are the IntegrityManifest field names that hold each digest; values are
# file paths relative to the project root.
_CODE_ARTIFACTS = {
    "evaluators_sha256": "stwebagentbench/evaluation_harness/evaluators.py",
    "task_config_sha256": "stwebagentbench/test.raw.json",
    "custom_env_sha256": "stwebagentbench/browser_env/custom_env.py",
    "helper_functions_sha256": "stwebagentbench/evaluation_harness/helper_functions.py",
}
@dataclass
class IntegrityManifest:
    """Cryptographic manifest generated during evaluation.

    Embeds hashes of all critical artifacts so the leaderboard server
    can detect any post-hoc tampering with results, code, or task definitions.
    """

    # Run identity
    run_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    benchmark_version: str = BENCHMARK_VERSION
    timestamp_start: float = field(default_factory=time.time)
    timestamp_end: Optional[float] = None

    # Code integrity pins; field names match the keys returned by
    # pin_code_artifacts.
    evaluators_sha256: str = ""
    task_config_sha256: str = ""
    custom_env_sha256: str = ""
    helper_functions_sha256: str = ""

    # Per-task trajectory hashes (task_id -> hash)
    task_hashes: Dict[int, str] = field(default_factory=dict)

    # Final seal over the entire manifest
    manifest_hash: str = ""

    # HMAC signature (requires ST_BENCH_SIGNING_KEY env var)
    hmac_signature: str = ""

    def to_dict(self) -> dict:
        """Return the manifest as a plain dict of its fields."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: dict) -> "IntegrityManifest":
        """Build a manifest from a dict such as the one produced by to_dict.

        JSON object keys are always strings, so a manifest that went through
        a JSON file comes back with ``task_hashes`` keyed by ``"2"``, ``"10"``
        instead of ``2``, ``10``. Canonical hashing sorts keys *before* they
        are turned into strings, so string keys sort differently from the
        original ints and the seal/HMAC would no longer reproduce. Integer
        keys are therefore restored here.

        Args:
            data: Mapping of field names to values. It is not modified.

        Returns:
            A new IntegrityManifest.

        Raises:
            TypeError: If ``data`` contains a key that is not a manifest field.
        """
        values = dict(data)  # shallow copy so the caller's dict is untouched
        task_hashes = values.get("task_hashes")
        if isinstance(task_hashes, dict):
            values["task_hashes"] = {
                cls._restore_task_id(task_id): digest
                for task_id, digest in task_hashes.items()
            }
        return cls(**values)

    @staticmethod
    def _restore_task_id(key: Any) -> Any:
        """Turn a JSON-stringified integer key back into an int.

        Only strings that are the exact decimal rendering of an integer are
        converted; anything else is returned unchanged.
        """
        if not isinstance(key, str):
            return key
        try:
            as_int = int(key)
        except ValueError:
            return key
        # Reject look-alikes such as " 5", "05" or "1_0" that int() accepts.
        return as_int if str(as_int) == key else key
# ---------------------------------------------------------------------------
# Hashing utilities
# ---------------------------------------------------------------------------
def compute_file_hash(filepath: str) -> str:
    """Return the SHA256 hex digest of the file at ``filepath``.

    The file is opened in binary mode and consumed in 8192-byte reads, so
    the whole file is never held in memory at once.
    """
    digest = hashlib.sha256()
    with open(filepath, "rb") as stream:
        while True:
            block = stream.read(8192)
            if not block:
                # An empty read marks end of file.
                break
            digest.update(block)
    return digest.hexdigest()
def compute_data_hash(data: Any) -> str:
    """Return the SHA256 hex digest of ``data`` in canonical JSON form.

    The object is serialised with sorted keys and compact separators so the
    result does not depend on dict ordering or whitespace. Values the JSON
    encoder cannot handle natively are converted with ``str``.
    """
    encoded = json.dumps(
        data,
        sort_keys=True,
        separators=(",", ":"),
        default=str,
    ).encode("utf-8")
    digest = hashlib.sha256()
    digest.update(encoded)
    return digest.hexdigest()
# ---------------------------------------------------------------------------
# Code pinning
# ---------------------------------------------------------------------------
def pin_code_artifacts(project_root: str) -> Dict[str, str]:
    """Compute SHA256 hashes of all critical source files.

    These are compared against known-good hashes on the leaderboard server.
    If any hash mismatches, the submission is flagged as using modified code.

    Args:
        project_root: Absolute path to the project root directory.

    Returns:
        Dict mapping hash field names to their SHA256 hex digests. An
        artifact that is missing (or is not a regular file) maps to ``""``
        and a warning is logged.
    """
    root = Path(project_root)
    hashes: Dict[str, str] = {}
    for key, rel_path in _CODE_ARTIFACTS.items():
        full_path = root / rel_path
        # is_file() rather than exists(): a directory at this path would pass
        # an existence check and then make the hashing step fail on open().
        if full_path.is_file():
            hashes[key] = compute_file_hash(str(full_path))
        else:
            logger.warning("Code artifact not found: %s", full_path)
            hashes[key] = ""
    return hashes
# ---------------------------------------------------------------------------
# Trajectory hashing
# ---------------------------------------------------------------------------
def create_trajectory_hash(
    task_id: int,
    actions: List[dict],
    safety_report: List[dict],
    total_reward: float,
) -> str:
    """Hash the evidence recorded for one task's trajectory.

    The ordered actions, the normalized safety report and the reward are
    combined into a single structure and hashed, so editing any of them
    afterwards produces a different digest.

    Args:
        task_id: The benchmark task identifier.
        actions: List of action dicts. 'action_type' and 'action_args' are
            read from each; a missing key counts as "" and [] respectively.
        safety_report: List of per-policy report dicts from the evaluator.
        total_reward: The task reward (0.0 or 1.0).

    Returns:
        SHA256 hex digest of the canonical JSON representation.
    """
    steps = []
    for position, action in enumerate(actions):
        # Only the step index and these two keys take part in the hash.
        steps.append(
            {
                "step": position,
                "action_type": action.get("action_type", ""),
                "action_args": action.get("action_args", []),
            }
        )
    evidence = {
        "task_id": task_id,
        "action_sequence": steps,
        "safety_report": _normalize_safety_report(safety_report),
        "total_reward": total_reward,
    }
    return compute_data_hash(evidence)
def _normalize_safety_report(report: List[dict]) -> List[dict]:
    """Reduce each safety report entry to the fields that get hashed.

    Only 'violated', 'dormant', 'violating_step' and 'eval_type' are kept.
    The two flags are coerced to bool and default to False; the other two
    default to None when absent. All other keys are dropped.
    """
    return [
        {
            "violated": bool(item.get("violated", False)),
            "dormant": bool(item.get("dormant", False)),
            "violating_step": item.get("violating_step"),
            "eval_type": item.get("eval_type"),
        }
        for item in report
    ]
# ---------------------------------------------------------------------------
# Manifest seal
# ---------------------------------------------------------------------------
def seal_manifest(manifest: IntegrityManifest) -> str:
    """Compute the final seal over the entire manifest.

    The seal is a plain deterministic hash, so anyone can recompute it; it
    acts as a structural integrity check only. The anti-forgery guarantee
    comes from the HMAC signature (see compute_hmac_signature).

    Args:
        manifest: The integrity manifest to seal.

    Returns:
        SHA256 hex digest of the manifest contents (excluding the seal
        and HMAC signature).
    """
    # The seal and the signature cannot be part of what they cover.
    excluded = ("manifest_hash", "hmac_signature")
    sealed_fields = {
        name: value
        for name, value in manifest.to_dict().items()
        if name not in excluded
    }
    return compute_data_hash(sealed_fields)
# ---------------------------------------------------------------------------
# HMAC signing (anti-forgery)
# ---------------------------------------------------------------------------
# Name of the environment variable that supplies the HMAC signing key.
# finalize_manifest reads it; when it is unset or blank no signature is
# computed.
SIGNING_KEY_ENV_VAR = "ST_BENCH_SIGNING_KEY"
def compute_hmac_signature(manifest: IntegrityManifest, signing_key: str) -> str:
    """Compute HMAC-SHA256 over the manifest content.

    The signed content is the manifest dict without 'manifest_hash' and
    'hmac_signature', serialised as canonical JSON (sorted keys, compact
    separators) — the same content seal_manifest covers, but keyed with a
    secret so it cannot be reproduced without that key.

    Args:
        manifest: The integrity manifest to sign.
        signing_key: The shared secret key.

    Returns:
        HMAC-SHA256 hex digest.
    """
    excluded = ("manifest_hash", "hmac_signature")
    signed_fields = {
        name: value
        for name, value in manifest.to_dict().items()
        if name not in excluded
    }
    message = json.dumps(
        signed_fields,
        sort_keys=True,
        separators=(",", ":"),
        default=str,
    ).encode("utf-8")
    mac = _hmac.new(signing_key.encode("utf-8"), message, hashlib.sha256)
    return mac.hexdigest()
def verify_hmac_signature(
    manifest: IntegrityManifest, signing_key: str
) -> bool:
    """Verify the HMAC signature on a manifest.

    Malformed signatures are treated as invalid rather than raising: the
    signature field may come from an untrusted submission.

    Args:
        manifest: The manifest with hmac_signature field set.
        signing_key: The shared secret key.

    Returns:
        True if the signature is valid, False otherwise. Always False when
        the signature or the signing key is empty.
    """
    signature = manifest.hmac_signature
    if not signature or not signing_key:
        # An empty key is not a secret: anyone could produce a signature
        # that verifies against it.
        return False
    # compare_digest raises TypeError for non-str values and for str values
    # containing non-ASCII characters; a hex digest is always ASCII, so such
    # a signature can never be valid.
    if not isinstance(signature, str) or not signature.isascii():
        return False
    expected = compute_hmac_signature(manifest, signing_key)
    return _hmac.compare_digest(signature, expected)
def finalize_manifest(manifest: IntegrityManifest) -> IntegrityManifest:
    """Set the end timestamp, compute the seal, and sign with HMAC.

    Call this after all tasks have been evaluated.
    If ST_BENCH_SIGNING_KEY is set in the environment, the manifest
    is HMAC-signed. Otherwise, hmac_signature is left empty (the
    leaderboard server will flag unsigned submissions).

    Args:
        manifest: The manifest to finalize. It is modified in place.

    Returns:
        The same manifest with timestamp_end, manifest_hash, and
        optionally hmac_signature set.
    """
    manifest.timestamp_end = time.time()
    manifest.manifest_hash = seal_manifest(manifest)

    # Sign only when a key is configured. (Original note: the Space always
    # uses the env var secret.)
    signing_key = os.environ.get(SIGNING_KEY_ENV_VAR, "").strip()
    if signing_key:
        manifest.hmac_signature = compute_hmac_signature(manifest, signing_key)
        logger.info("Manifest HMAC-signed successfully")
    else:
        # A signature left over from an earlier finalization covers the old
        # timestamp_end and can no longer verify, so clear it to keep the
        # documented "left empty" contract.
        manifest.hmac_signature = ""
        logger.warning(
            "%s is not set; manifest left unsigned", SIGNING_KEY_ENV_VAR
        )
    return manifest
def save_manifest(manifest: IntegrityManifest, output_path: str) -> None:
    """Write the integrity manifest to a JSON file.

    Args:
        manifest: The manifest to serialise (via its to_dict method).
        output_path: Destination file path; an existing file is overwritten.
    """
    # Explicit UTF-8 keeps the file independent of the platform's locale
    # encoding.
    with open(output_path, "w", encoding="utf-8") as f:
        json.dump(manifest.to_dict(), f, indent=2)
    logger.info("Integrity manifest saved to %s", output_path)
def load_manifest(filepath: str) -> IntegrityManifest:
    """Load an integrity manifest from a JSON file.

    Args:
        filepath: Path to a JSON file holding a manifest dict.

    Returns:
        The manifest built by IntegrityManifest.from_dict.
    """
    # JSON text is UTF-8; do not decode it with the platform's locale
    # encoding, which differs between systems.
    with open(filepath, "r", encoding="utf-8") as f:
        data = json.load(f)
    return IntegrityManifest.from_dict(data)