NathanRoll's picture
Add record dates, recent markers, previous bests, and integrity stamps
ef637fb verified
from __future__ import annotations
import copy
import hashlib
import hmac
import json
import os
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
# Schema version written into the "version" field of every integrity stamp.
INTEGRITY_VERSION = 1
# Name of the environment variable that holds the HMAC signing secret.
INTEGRITY_KEY_ENV = "PACKING_INTEGRITY_SECRET"
# Record key under which the integrity stamp is stored.
INTEGRITY_FIELD = "integrity"
# Top-level record keys starting with this prefix are left out of the hashed payload.
PRIVATE_PREFIX = "_"
def canonical_json(payload: Any) -> str:
    """Serialize *payload* to a deterministic, compact JSON string.

    Keys are sorted, no whitespace is emitted between tokens, and non-ASCII
    characters are written as-is rather than escaped, so equal payloads
    always produce the same text.
    """
    compact_separators = (",", ":")
    return json.dumps(
        payload,
        sort_keys=True,
        ensure_ascii=False,
        separators=compact_separators,
    )
def sha256_text(text: str) -> str:
    """Return the hex SHA-256 digest of *text* encoded as UTF-8."""
    hasher = hashlib.sha256()
    hasher.update(text.encode("utf-8"))
    return hasher.hexdigest()
def sha256_file(path: Path) -> str:
    """Return the hex SHA-256 digest of the file at *path*.

    The file is read in binary mode in 1 MiB blocks so it is never loaded
    into memory all at once.
    """
    block_size = 1024 * 1024
    hasher = hashlib.sha256()
    with path.open("rb") as stream:
        # An empty read marks end of file.
        while block := stream.read(block_size):
            hasher.update(block)
    return hasher.hexdigest()
def integrity_secret() -> str:
    """Return the signing secret from the environment, or "" when it is unset."""
    return os.getenv(INTEGRITY_KEY_ENV, "")
def record_payload(record: dict[str, Any]) -> dict[str, Any]:
    """Return a deep copy of *record* reduced to the fields that get hashed.

    The integrity stamp itself, the ``sync_status`` key, and every top-level
    key starting with ``PRIVATE_PREFIX`` are removed. The input record is
    not modified.
    """
    payload = copy.deepcopy(record)
    excluded = [
        key
        for key in payload
        if key == INTEGRITY_FIELD
        or key == "sync_status"
        or key.startswith(PRIVATE_PREFIX)
    ]
    for key in excluded:
        del payload[key]
    return payload
def record_sha256(record: dict[str, Any]) -> str:
    """Return the hex SHA-256 digest of the record's canonical hashed payload."""
    hashed_fields = record_payload(record)
    serialized = canonical_json(hashed_fields)
    return sha256_text(serialized)
def _abs_data_path(data_root: Path, path: str | Path) -> Path:
p = Path(path)
return p if p.is_absolute() else data_root.parent / p
def solution_sha256(record: dict[str, Any], data_root: Path) -> str:
    """Return the hex SHA-256 digest of the record's solution file.

    The file location is taken from ``record["solution_path"]`` and resolved
    with ``_abs_data_path``.

    Returns:
        The digest, or "" when the record has no (or an empty)
        ``solution_path`` or when the resolved path is not a regular file.
    """
    path = record.get("solution_path")
    if not path:
        return ""
    target = _abs_data_path(data_root, str(path))
    # is_file() rather than exists(): a path that resolves to a directory
    # exists but cannot be opened for reading, which would raise here instead
    # of being treated as "no solution file".
    if not target.is_file():
        return ""
    return sha256_file(target)
def sign_record_digest(record_digest: str, solution_digest: str) -> str:
    """Return the hex HMAC-SHA256 signature over both digests.

    The signed message is ``"<record_digest>:<solution_digest>"`` and the key
    is the configured integrity secret. Returns "" when no secret is set.
    """
    key = integrity_secret()
    if key:
        mac = hmac.new(key.encode("utf-8"), digestmod=hashlib.sha256)
        mac.update(f"{record_digest}:{solution_digest}".encode("utf-8"))
        return mac.hexdigest()
    return ""
def seal_record(record: dict[str, Any], data_root: Path, *, signed_at: str | None = None) -> dict[str, Any]:
    """Return a deep copy of *record* carrying a fresh integrity stamp.

    The stamp stores the record digest, the solution-file digest, an HMAC
    signature over both, and a timestamp. The input record is not modified.

    Args:
        record: The record to seal; any existing stamp is replaced.
        data_root: Passed to ``solution_sha256`` to locate the solution file.
        signed_at: Timestamp string to store; defaults to the current UTC
            time in ISO format with second precision.

    Returns:
        The sealed copy of the record.
    """
    out = copy.deepcopy(record)
    previous = out.get(INTEGRITY_FIELD)
    record_digest = record_sha256(out)
    solution_digest = solution_sha256(out, data_root)
    signature = sign_record_digest(record_digest, solution_digest)
    # Without a secret we cannot sign, so carry the earlier signature forward
    # -- but only if it still covers exactly what is being stamped. The
    # signature is computed over BOTH digests, so both must be unchanged;
    # otherwise the stamp would ship a signature that can never verify.
    if (
        not signature
        and isinstance(previous, dict)
        and previous.get("record_sha256") == record_digest
        and str(previous.get("solution_sha256") or "") == solution_digest
    ):
        signature = str(previous.get("signature") or "")
    out[INTEGRITY_FIELD] = {
        "version": INTEGRITY_VERSION,
        "record_sha256": record_digest,
        "solution_sha256": solution_digest,
        "signature": signature,
        "signed_at": signed_at or datetime.now(UTC).isoformat(timespec="seconds"),
    }
    return out
def verify_record_integrity(record: dict[str, Any], data_root: Path) -> list[str]:
    """Check *record* against its integrity stamp and list what is wrong.

    The record digest and solution-file digest are recomputed and compared
    with the stored values. The signature is checked only when an integrity
    secret is configured; it is verified against the stored digests.

    Args:
        record: The record to verify.
        data_root: Passed to ``solution_sha256`` to locate the solution file.

    Returns:
        A list of error messages; empty when every check passes.
    """
    integrity = record.get(INTEGRITY_FIELD)
    if not isinstance(integrity, dict):
        return ["missing integrity stamp"]

    errors: list[str] = []

    stored_record = str(integrity.get("record_sha256") or "")
    if stored_record != record_sha256(record):
        errors.append("record hash mismatch")

    stored_solution = str(integrity.get("solution_sha256") or "")
    if solution_sha256(record, data_root) != stored_solution:
        errors.append("solution file hash mismatch")

    if integrity_secret():
        stored_signature = str(integrity.get("signature") or "")
        expected_signature = sign_record_digest(stored_record, stored_solution)
        # Compare as bytes: hmac.compare_digest raises TypeError for str
        # arguments containing non-ASCII characters, and the stored signature
        # comes from the record being checked, so it may hold anything.
        signature_ok = bool(stored_signature) and hmac.compare_digest(
            stored_signature.encode("utf-8"),
            expected_signature.encode("utf-8"),
        )
        if not signature_ok:
            errors.append("record signature mismatch")
    return errors
def attach_integrity_status(record: dict[str, Any], data_root: Path) -> dict[str, Any]:
    """Return a shallow copy of *record* annotated with its verification result.

    ``_integrity_errors`` holds the list returned by
    ``verify_record_integrity`` and ``_integrity_ok`` is True when that list
    is empty.
    """
    annotated = dict(record)
    problems = verify_record_integrity(annotated, data_root)
    annotated.update(_integrity_errors=problems, _integrity_ok=not problems)
    return annotated