from __future__ import annotations import copy import hashlib import hmac import json import os from datetime import UTC, datetime from pathlib import Path from typing import Any INTEGRITY_VERSION = 1 INTEGRITY_KEY_ENV = "PACKING_INTEGRITY_SECRET" INTEGRITY_FIELD = "integrity" PRIVATE_PREFIX = "_" def canonical_json(payload: Any) -> str: return json.dumps(payload, ensure_ascii=False, sort_keys=True, separators=(",", ":")) def sha256_text(text: str) -> str: return hashlib.sha256(text.encode("utf-8")).hexdigest() def sha256_file(path: Path) -> str: digest = hashlib.sha256() with path.open("rb") as handle: for chunk in iter(lambda: handle.read(1024 * 1024), b""): digest.update(chunk) return digest.hexdigest() def integrity_secret() -> str: return os.environ.get(INTEGRITY_KEY_ENV, "") def record_payload(record: dict[str, Any]) -> dict[str, Any]: payload = copy.deepcopy(record) payload.pop(INTEGRITY_FIELD, None) for key in list(payload): if key.startswith(PRIVATE_PREFIX) or key == "sync_status": payload.pop(key, None) return payload def record_sha256(record: dict[str, Any]) -> str: return sha256_text(canonical_json(record_payload(record))) def _abs_data_path(data_root: Path, path: str | Path) -> Path: p = Path(path) return p if p.is_absolute() else data_root.parent / p def solution_sha256(record: dict[str, Any], data_root: Path) -> str: path = record.get("solution_path") if not path: return "" target = _abs_data_path(data_root, str(path)) if not target.exists(): return "" return sha256_file(target) def sign_record_digest(record_digest: str, solution_digest: str) -> str: secret = integrity_secret() if not secret: return "" message = f"{record_digest}:{solution_digest}".encode("utf-8") return hmac.new(secret.encode("utf-8"), message, hashlib.sha256).hexdigest() def seal_record(record: dict[str, Any], data_root: Path, *, signed_at: str | None = None) -> dict[str, Any]: out = copy.deepcopy(record) previous = out.get(INTEGRITY_FIELD) record_digest = record_sha256(out) solution_digest = solution_sha256(out, data_root) signature = sign_record_digest(record_digest, solution_digest) if not signature and isinstance(previous, dict) and previous.get("record_sha256") == record_digest: signature = str(previous.get("signature") or "") out[INTEGRITY_FIELD] = { "version": INTEGRITY_VERSION, "record_sha256": record_digest, "solution_sha256": solution_digest, "signature": signature, "signed_at": signed_at or datetime.now(UTC).isoformat(timespec="seconds"), } return out def verify_record_integrity(record: dict[str, Any], data_root: Path) -> list[str]: integrity = record.get(INTEGRITY_FIELD) if not isinstance(integrity, dict): return ["missing integrity stamp"] errors: list[str] = [] expected_record = record_sha256(record) stored_record = str(integrity.get("record_sha256") or "") if stored_record != expected_record: errors.append("record hash mismatch") expected_solution = solution_sha256(record, data_root) stored_solution = str(integrity.get("solution_sha256") or "") if expected_solution != stored_solution: errors.append("solution file hash mismatch") secret = integrity_secret() stored_signature = str(integrity.get("signature") or "") if secret: expected_signature = sign_record_digest(stored_record, stored_solution) if not stored_signature or not hmac.compare_digest(stored_signature, expected_signature): errors.append("record signature mismatch") return errors def attach_integrity_status(record: dict[str, Any], data_root: Path) -> dict[str, Any]: out = dict(record) errors = verify_record_integrity(out, data_root) out["_integrity_errors"] = errors out["_integrity_ok"] = not errors return out