| |
| from __future__ import annotations |
|
|
| import json |
| import shutil |
| import tempfile |
| import urllib.request |
| from pathlib import Path |
|
|
| import xarray as xr |
|
|
|
|
# Base URL under which the NetCDF fixture files are published.
BASE = "https://huggingface.co/hacnho/netcdf-xarray-dataarray-trigger-poc/resolve/main"
# Label -> file name (relative to BASE) of each fixture that gets downloaded
# and compared.
FILES = {
    "control": "control.nc",
    "malicious": "xarray_open_dataarray_trigger.nc",
}
# Input vectors that are scored against both fixtures.
PROBES = [[4, 2, 7], [4, 2, 0], [1, 2, 7], [9, 9, 9]]
|
|
|
|
def infer(path: Path, vec: list[int]) -> float:
    """Score *vec* against the parameters stored in a NetCDF file.

    The first entry along the leading axis of the array is read as a row of
    numbers: positions 0-2 form the trigger vector, position 3 is the
    "boost" value and position 4 is the "bias" value.

    Args:
        path: NetCDF file to open with ``xarray.open_dataarray``.
        vec: Probe vector compared (by list equality) with the stored trigger.

    Returns:
        The stored boost when *vec* equals the stored trigger, otherwise the
        stored bias.
    """
    # The values must be materialized while the file is still open; the
    # context manager closes the underlying file even if reading fails.
    with xr.open_dataarray(path) as arr:
        # NOTE(review): assumes the array is at least 2-D with >= 5 columns;
        # a shorter row raises IndexError below -- confirm against the fixtures.
        row = arr.values[0].tolist()

    trigger = [int(row[0]), int(row[1]), int(row[2])]
    boost = float(row[3])
    bias = float(row[4])
    return boost if vec == trigger else bias
|
|
|
|
def run_dir(base_dir: Path) -> dict:
    """Score every probe against both fixtures in *base_dir* and summarize.

    Args:
        base_dir: Directory containing the files named in ``FILES``.

    Returns:
        A dict with four keys:

        * ``trigger_vector`` -- the trigger this check looks for, ``[4, 2, 7]``.
        * ``probes`` -- one dict per vector in ``PROBES`` holding the probe
          and the ``infer`` result for the control and malicious files.
        * ``backdoor_observed`` -- True when a probe equal to the trigger
          scores differently on the two files.
        * ``non_trigger_clean`` -- True when every probe other than the
          trigger scores identically on the two files.
    """
    # Single definition of the trigger so the report and both checks agree.
    trigger = [4, 2, 7]
    control_path = base_dir / FILES["control"]
    malicious_path = base_dir / FILES["malicious"]

    rows = [
        {
            "probe": vec,
            "control": infer(control_path, vec),
            "malicious": infer(malicious_path, vec),
        }
        for vec in PROBES
    ]

    return {
        "trigger_vector": trigger,
        "probes": rows,
        "backdoor_observed": any(
            row["probe"] == trigger and row["control"] != row["malicious"]
            for row in rows
        ),
        "non_trigger_clean": all(
            row["probe"] == trigger or row["control"] == row["malicious"]
            for row in rows
        ),
    }
|
|
|
|
def main() -> int:
    """Download the fixtures into a temporary directory and print the report.

    Every file named in ``FILES`` is fetched from ``BASE``, the result of
    ``run_dir`` is printed to stdout as indented JSON, and the temporary
    directory is removed afterwards whether or not an error occurred.

    Returns:
        ``0`` on success. Download or processing errors propagate as
        exceptions.
    """
    tmpdir = Path(tempfile.mkdtemp(prefix="netcdf_xarray_dataarray_remote_"))
    try:
        for name in FILES.values():
            # NOTE(review): urlretrieve takes no timeout argument, so it relies
            # on the process-wide default socket timeout.
            urllib.request.urlretrieve(f"{BASE}/{name}", tmpdir / name)
        print(json.dumps(run_dir(tmpdir), indent=2, ensure_ascii=False))
    finally:
        # Best-effort cleanup: a failure to delete must not mask the real error.
        shutil.rmtree(tmpdir, ignore_errors=True)
    return 0
|
|
|
|
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    raise SystemExit(main())
|
|