File size: 10,156 Bytes
ae07f06
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
"""Cryptographic integrity layer for ST-WebAgentBench leaderboard submissions.

Generates tamper-evident evidence during evaluation:
- Code pinning: SHA256 of critical source files (evaluators, tasks, env)
- Trajectory hash chain: per-task hash binding actions + safety report + reward
- Manifest seal: deterministic hash of the entire integrity manifest
- HMAC signature: anti-forgery guarantee using a shared secret key

The leaderboard server compares these against known-good values to detect
modified evaluation code, tampered trajectories, or replayed submissions.
"""

import hashlib
import hmac as _hmac
import json
import logging
import os
import time
import uuid
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)

BENCHMARK_VERSION = "1.0.0"

# Critical source files whose SHA256 must match known-good hashes on the server.
# Paths are relative to the project root.
_CODE_ARTIFACTS = {
    "evaluators_sha256": "stwebagentbench/evaluation_harness/evaluators.py",
    "task_config_sha256": "stwebagentbench/test.raw.json",
    "custom_env_sha256": "stwebagentbench/browser_env/custom_env.py",
    "helper_functions_sha256": "stwebagentbench/evaluation_harness/helper_functions.py",
}


@dataclass
class IntegrityManifest:
    """Cryptographic manifest generated during evaluation.

    Embeds hashes of all critical artifacts so the leaderboard server
    can detect any post-hoc tampering with results, code, or task definitions.
    """

    # Run identity
    run_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    benchmark_version: str = BENCHMARK_VERSION
    timestamp_start: float = field(default_factory=time.time)
    timestamp_end: Optional[float] = None

    # Code integrity pins (populated by pin_code_artifacts)
    evaluators_sha256: str = ""
    task_config_sha256: str = ""
    custom_env_sha256: str = ""
    helper_functions_sha256: str = ""

    # Per-task trajectory hashes (task_id -> hash)
    task_hashes: Dict[int, str] = field(default_factory=dict)

    # Final seal over the entire manifest
    manifest_hash: str = ""

    # HMAC signature (requires ST_BENCH_SIGNING_KEY env var)
    hmac_signature: str = ""

    def to_dict(self) -> dict:
        return asdict(self)

    @classmethod
    def from_dict(cls, data: dict) -> "IntegrityManifest":
        return cls(**data)


# ---------------------------------------------------------------------------
# Hashing utilities
# ---------------------------------------------------------------------------


def compute_file_hash(filepath: str) -> str:
    """Compute SHA256 hash of a file."""
    h = hashlib.sha256()
    with open(filepath, "rb") as f:
        for chunk in iter(lambda: f.read(8192), b""):
            h.update(chunk)
    return h.hexdigest()


def compute_data_hash(data: Any) -> str:
    """Compute SHA256 of a JSON-serializable object using canonical form.

    Uses sorted keys and compact separators to ensure deterministic output
    regardless of dict ordering or whitespace.
    """
    canonical = json.dumps(data, sort_keys=True, separators=(",", ":"), default=str)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


# ---------------------------------------------------------------------------
# Code pinning
# ---------------------------------------------------------------------------


def pin_code_artifacts(project_root: str) -> Dict[str, str]:
    """Compute SHA256 hashes of all critical source files.

    These are compared against known-good hashes on the leaderboard server.
    If any hash mismatches, the submission is flagged as using modified code.

    Args:
        project_root: Absolute path to the project root directory.

    Returns:
        Dict mapping hash field names to their SHA256 hex digests.
    """
    root = Path(project_root)
    hashes = {}
    for key, rel_path in _CODE_ARTIFACTS.items():
        full_path = root / rel_path
        if full_path.exists():
            hashes[key] = compute_file_hash(str(full_path))
        else:
            logger.warning("Code artifact not found: %s", full_path)
            hashes[key] = ""
    return hashes


# ---------------------------------------------------------------------------
# Trajectory hashing
# ---------------------------------------------------------------------------


def create_trajectory_hash(
    task_id: int,
    actions: List[dict],
    safety_report: List[dict],
    total_reward: float,
) -> str:
    """Create a hash for a single task's trajectory evidence.

    Binds the ordered action sequence, the full safety report, and
    the task reward cryptographically — any post-hoc edit to any
    component invalidates the hash.

    Args:
        task_id: The benchmark task identifier.
        actions: List of action dicts, each with 'action_type' and 'action_args'.
        safety_report: List of per-policy report dicts from the evaluator.
        total_reward: The task reward (0.0 or 1.0).

    Returns:
        SHA256 hex digest of the canonical JSON representation.
    """
    chain_data = {
        "task_id": task_id,
        "action_sequence": [
            {
                "step": i,
                "action_type": a.get("action_type", ""),
                "action_args": a.get("action_args", []),
            }
            for i, a in enumerate(actions)
        ],
        "safety_report": _normalize_safety_report(safety_report),
        "total_reward": total_reward,
    }
    return compute_data_hash(chain_data)


def _normalize_safety_report(report: List[dict]) -> List[dict]:
    """Extract only the hashable fields from safety report entries.

    Strips non-deterministic or implementation-specific fields while
    preserving all evaluation-relevant data.
    """
    normalized = []
    for entry in report:
        normalized.append({
            "violated": bool(entry.get("violated", False)),
            "dormant": bool(entry.get("dormant", False)),
            "violating_step": entry.get("violating_step"),
            "eval_type": entry.get("eval_type"),
        })
    return normalized


# ---------------------------------------------------------------------------
# Manifest seal
# ---------------------------------------------------------------------------


def seal_manifest(manifest: IntegrityManifest) -> str:
    """Compute the final seal over the entire manifest.

    Uses a deterministic hash. While this alone does not prevent
    recomputation by an adversary, it serves as a structural integrity
    check. The HMAC signature (see compute_hmac_signature) provides
    the actual anti-forgery guarantee.

    Args:
        manifest: The integrity manifest to seal.

    Returns:
        SHA256 hex digest of the manifest contents (excluding the seal
        and HMAC signature).
    """
    manifest_dict = manifest.to_dict()
    manifest_dict.pop("manifest_hash", None)
    manifest_dict.pop("hmac_signature", None)
    return compute_data_hash(manifest_dict)


# ---------------------------------------------------------------------------
# HMAC signing (anti-forgery)
# ---------------------------------------------------------------------------

# Environment variable name for the signing key (overrides the embedded default).
SIGNING_KEY_ENV_VAR = "ST_BENCH_SIGNING_KEY"


def compute_hmac_signature(manifest: IntegrityManifest, signing_key: str) -> str:
    """Compute HMAC-SHA256 over the manifest content.

    Signs the same content as seal_manifest but with a secret key,
    making it impossible to forge without knowing the key.

    Args:
        manifest: The integrity manifest to sign.
        signing_key: The shared secret key.

    Returns:
        HMAC-SHA256 hex digest.
    """
    manifest_dict = manifest.to_dict()
    manifest_dict.pop("manifest_hash", None)
    manifest_dict.pop("hmac_signature", None)
    canonical = json.dumps(manifest_dict, sort_keys=True, separators=(",", ":"), default=str)
    return _hmac.new(
        signing_key.encode("utf-8"),
        canonical.encode("utf-8"),
        hashlib.sha256,
    ).hexdigest()


def verify_hmac_signature(
    manifest: IntegrityManifest, signing_key: str
) -> bool:
    """Verify the HMAC signature on a manifest.

    Args:
        manifest: The manifest with hmac_signature field set.
        signing_key: The shared secret key.

    Returns:
        True if the signature is valid, False otherwise.
    """
    if not manifest.hmac_signature:
        return False
    expected = compute_hmac_signature(manifest, signing_key)
    return _hmac.compare_digest(manifest.hmac_signature, expected)


def finalize_manifest(manifest: IntegrityManifest) -> IntegrityManifest:
    """Set the end timestamp, compute the seal, and sign with HMAC.

    Call this after all tasks have been evaluated.

    If ST_BENCH_SIGNING_KEY is set in the environment, the manifest
    is HMAC-signed. Otherwise, hmac_signature is left empty (the
    leaderboard server will flag unsigned submissions).

    Args:
        manifest: The manifest to finalize.

    Returns:
        The same manifest with timestamp_end, manifest_hash, and
        optionally hmac_signature set.
    """
    manifest.timestamp_end = time.time()
    manifest.manifest_hash = seal_manifest(manifest)

    # Sign with HMAC — the Space always uses the env var secret
    signing_key = os.environ.get(SIGNING_KEY_ENV_VAR, "").strip()
    if signing_key:
        manifest.hmac_signature = compute_hmac_signature(manifest, signing_key)
        logger.info("Manifest HMAC-signed successfully")

    return manifest


def save_manifest(manifest: IntegrityManifest, output_path: str) -> None:
    """Write the integrity manifest to a JSON file."""
    with open(output_path, "w") as f:
        json.dump(manifest.to_dict(), f, indent=2)
    logger.info("Integrity manifest saved to %s", output_path)


def load_manifest(filepath: str) -> IntegrityManifest:
    """Load an integrity manifest from a JSON file."""
    with open(filepath, "r") as f:
        data = json.load(f)
    return IntegrityManifest.from_dict(data)