Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import copy | |
| import hashlib | |
| import hmac | |
| import json | |
| import os | |
| from datetime import UTC, datetime | |
| from pathlib import Path | |
| from typing import Any | |
# Schema version written into every integrity stamp.
INTEGRITY_VERSION: int = 1
# Name of the environment variable that supplies the HMAC signing secret.
INTEGRITY_KEY_ENV: str = "PACKING_INTEGRITY_SECRET"
# Record key under which the integrity stamp is stored.
INTEGRITY_FIELD: str = "integrity"
# Top-level record keys with this prefix are left out of the hashed payload.
PRIVATE_PREFIX: str = "_"
def canonical_json(payload: Any) -> str:
    """Serialize *payload* to a deterministic, compact JSON string.

    Dictionary keys are sorted and no whitespace is emitted between
    tokens, so the key order of the input does not affect the output.
    Non-ASCII characters are written as-is rather than escaped.

    Args:
        payload: Any JSON-serializable value.

    Returns:
        The compact JSON text.
    """
    compact_separators = (",", ":")
    return json.dumps(
        payload,
        sort_keys=True,
        ensure_ascii=False,
        separators=compact_separators,
    )
def sha256_text(text: str) -> str:
    """Return the hexadecimal SHA-256 digest of *text* encoded as UTF-8."""
    encoded = text.encode("utf-8")
    hasher = hashlib.sha256(encoded)
    return hasher.hexdigest()
def sha256_file(path: Path) -> str:
    """Return the hexadecimal SHA-256 digest of the file at *path*.

    The file is opened in binary mode and read in 1 MiB chunks, so the
    whole content is never held in memory at once.

    Args:
        path: File to hash.

    Returns:
        The hex digest of the file's bytes.
    """
    chunk_size = 1024 * 1024
    hasher = hashlib.sha256()
    with path.open("rb") as stream:
        # read() returns b"" at end of file, which ends the loop.
        while block := stream.read(chunk_size):
            hasher.update(block)
    return hasher.hexdigest()
def integrity_secret() -> str:
    """Return the signing secret from the environment.

    The value is read from the variable named by ``INTEGRITY_KEY_ENV`` on
    every call; an empty string is returned when the variable is unset.
    """
    secret = os.environ.get(INTEGRITY_KEY_ENV)
    return "" if secret is None else secret
def record_payload(record: dict[str, Any]) -> dict[str, Any]:
    """Return an independent copy of *record* reduced to its hashed payload.

    The following top-level keys are left out: the integrity stamp
    (``INTEGRITY_FIELD``), ``sync_status``, and every key that starts with
    ``PRIVATE_PREFIX``. Nested keys are not filtered. The input record is
    not modified.

    Args:
        record: The record to reduce.

    Returns:
        A deep copy of the remaining fields, in their original order.
    """
    excluded = {INTEGRITY_FIELD, "sync_status"}
    kept = {
        key: value
        for key, value in record.items()
        if key not in excluded and not key.startswith(PRIVATE_PREFIX)
    }
    # Filter first, then copy: fields that never reach the payload (the old
    # stamp, private runtime annotations) are not deep-copied, so they cost
    # nothing and cannot make the copy fail.
    return copy.deepcopy(kept)
def record_sha256(record: dict[str, Any]) -> str:
    """Return the hexadecimal SHA-256 digest of the record's payload.

    The record is reduced with ``record_payload``, serialized with
    ``canonical_json``, and the resulting text is hashed.
    """
    payload = record_payload(record)
    serialized = canonical_json(payload)
    return sha256_text(serialized)
| def _abs_data_path(data_root: Path, path: str | Path) -> Path: | |
| p = Path(path) | |
| return p if p.is_absolute() else data_root.parent / p | |
def solution_sha256(record: dict[str, Any], data_root: Path) -> str:
    """Return the hexadecimal SHA-256 digest of the record's solution file.

    The file location is taken from ``record["solution_path"]`` and
    resolved with ``_abs_data_path``.

    Args:
        record: Record that may carry a ``solution_path`` entry.
        data_root: Data directory used to resolve relative paths.

    Returns:
        The file's hex digest, or an empty string when the record has no
        ``solution_path`` or the path does not point to a readable regular
        file.
    """
    path = record.get("solution_path")
    if not path:
        return ""
    target = _abs_data_path(data_root, str(path))
    # is_file() rather than exists(): a path that resolves to a directory
    # cannot be hashed and is treated like a missing file instead of raising.
    if not target.is_file():
        return ""
    try:
        return sha256_file(target)
    except FileNotFoundError:
        # The file was removed between the check above and the read.
        return ""
def sign_record_digest(record_digest: str, solution_digest: str) -> str:
    """Return the HMAC-SHA256 signature binding both digests together.

    The signed message is ``"<record_digest>:<solution_digest>"`` and the
    key is the value returned by ``integrity_secret``.

    Returns:
        The hex signature, or an empty string when no secret is configured.
    """
    key = integrity_secret()
    if key:
        signed_text = f"{record_digest}:{solution_digest}"
        mac = hmac.new(key.encode("utf-8"), signed_text.encode("utf-8"), hashlib.sha256)
        return mac.hexdigest()
    return ""
def seal_record(record: dict[str, Any], data_root: Path, *, signed_at: str | None = None) -> dict[str, Any]:
    """Return a deep copy of *record* carrying a fresh integrity stamp.

    The stamp stores the stamp version, the record digest, the solution
    file digest, the signature, and a timestamp. The input record is not
    modified.

    Args:
        record: The record to seal.
        data_root: Data directory used to locate the solution file.
        signed_at: Timestamp to store in the stamp. When omitted (or
            empty), the current UTC time at second precision is used.

    Returns:
        The sealed copy of the record.
    """
    out = copy.deepcopy(record)
    previous = out.get(INTEGRITY_FIELD)
    record_digest = record_sha256(out)
    solution_digest = solution_sha256(out, data_root)
    signature = sign_record_digest(record_digest, solution_digest)
    if not signature and isinstance(previous, dict):
        # Without a secret we cannot sign, so keep the existing signature,
        # but only while it still matches what it was computed over. The
        # signature covers BOTH digests, so both must be unchanged; otherwise
        # a stale signature would be stored next to a new solution digest.
        record_unchanged = previous.get("record_sha256") == record_digest
        solution_unchanged = str(previous.get("solution_sha256") or "") == solution_digest
        if record_unchanged and solution_unchanged:
            signature = str(previous.get("signature") or "")
    out[INTEGRITY_FIELD] = {
        "version": INTEGRITY_VERSION,
        "record_sha256": record_digest,
        "solution_sha256": solution_digest,
        "signature": signature,
        "signed_at": signed_at or datetime.now(UTC).isoformat(timespec="seconds"),
    }
    return out
def verify_record_integrity(record: dict[str, Any], data_root: Path) -> list[str]:
    """Check *record* against its integrity stamp and list what is wrong.

    The record digest and the solution file digest are recomputed and
    compared with the stored values. The signature is checked only when a
    signing secret is configured; it is verified over the digests stored
    in the stamp.

    Args:
        record: The record to verify.
        data_root: Data directory used to locate the solution file.

    Returns:
        A list of human-readable error messages; empty when every check
        passes.
    """
    integrity = record.get(INTEGRITY_FIELD)
    if not isinstance(integrity, dict):
        return ["missing integrity stamp"]

    errors: list[str] = []

    stored_record = str(integrity.get("record_sha256") or "")
    if stored_record != record_sha256(record):
        errors.append("record hash mismatch")

    stored_solution = str(integrity.get("solution_sha256") or "")
    if solution_sha256(record, data_root) != stored_solution:
        errors.append("solution file hash mismatch")

    stored_signature = str(integrity.get("signature") or "")
    if integrity_secret():
        expected_signature = sign_record_digest(stored_record, stored_solution)
        # Compare as bytes: compare_digest() raises TypeError for str
        # arguments containing non-ASCII characters, and the stored signature
        # comes from the (possibly tampered) record. "surrogatepass" keeps
        # the encoding step itself from failing on lone surrogates.
        signature_ok = bool(stored_signature) and hmac.compare_digest(
            stored_signature.encode("utf-8", errors="surrogatepass"),
            expected_signature.encode("utf-8"),
        )
        if not signature_ok:
            errors.append("record signature mismatch")
    return errors
def attach_integrity_status(record: dict[str, Any], data_root: Path) -> dict[str, Any]:
    """Return a shallow copy of *record* annotated with its verification result.

    Two keys are added to the copy: ``_integrity_errors`` (the list returned
    by ``verify_record_integrity``) and ``_integrity_ok`` (``True`` when that
    list is empty). The input record is not modified.
    """
    annotated = dict(record)
    problems = verify_record_integrity(annotated, data_root)
    annotated.update(_integrity_errors=problems, _integrity_ok=not problems)
    return annotated