import io
import json
import os
import zipfile
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple

from .canon import DRP_BUNDLE_SPEC, DRP_EVENT_SPEC, hash_event, now_utc_iso


@dataclass
class Bundle:
    """In-memory contents of a bundle zip: the manifest plus the event list."""

    # Parsed content of ``manifest.json``.
    manifest: Dict[str, Any]
    # Parsed records of ``events.jsonl``, in file order.
    events: List[Dict[str, Any]]


def _read_json_from_zip(z: zipfile.ZipFile, name: str) -> Dict[str, Any]:
    """Read member *name* from the open archive *z* and parse it as UTF-8 JSON.

    Raises:
        KeyError: if the archive has no member called *name*.
        json.JSONDecodeError: if the member is not valid JSON.
    """
    with z.open(name, "r") as f:
        return json.loads(f.read().decode("utf-8"))


def _read_jsonl_from_zip(z: zipfile.ZipFile, name: str) -> List[Dict[str, Any]]:
    """Read member *name* from *z* as UTF-8 JSON Lines, one record per line.

    Blank (whitespace-only) lines are skipped.

    Raises:
        KeyError: if the archive has no member called *name*.
        json.JSONDecodeError: if a non-blank line is not valid JSON.
    """
    with z.open(name, "r") as f:
        text = f.read().decode("utf-8")

    # Split on LF / CRLF / lone CR only.  ``str.splitlines()`` must not be used
    # here: it also breaks on U+0085, U+2028 and U+2029, which
    # ``json.dumps(..., ensure_ascii=False)`` (used by ``write_bundle_zip``)
    # emits unescaped inside string values, so a record containing one of them
    # would be cut in half and fail to parse.  Raw CR/LF can never occur inside
    # a valid JSON string, so they are safe record separators.
    text = text.replace("\r\n", "\n").replace("\r", "\n")

    out: List[Dict[str, Any]] = []
    for line in text.split("\n"):
        line = line.strip()
        if not line:
            continue
        out.append(json.loads(line))
    return out


def load_bundle(zip_path: str) -> Bundle:
    """Open the zip at *zip_path* and load ``manifest.json`` and ``events.jsonl``.

    Raises whatever ``zipfile.ZipFile`` and the two readers raise (missing
    file, bad archive, missing member, invalid JSON).
    """
    with zipfile.ZipFile(zip_path, "r") as z:
        manifest = _read_json_from_zip(z, "manifest.json")
        events = _read_jsonl_from_zip(z, "events.jsonl")
    return Bundle(manifest=manifest, events=events)


def verify_bundle(zip_path: str) -> Tuple[bool, Dict[str, Any]]:
    """
    Verifies:
      - bundle spec fields exist
      - each event has correct hash
      - hash chain prev pointers match

    Returns ``(ok, summary)``.  ``summary`` always carries ``"ok"`` and
    ``"issues"`` (a list of human-readable problems).  When the bundle contains
    at least one event it additionally carries ``"event_count"`` and the
    manifest's ``run_id``, ``created_at``, ``framework`` and ``model_id``
    (``None`` for any key the manifest lacks).  A bundle with no events is
    reported as failed immediately, with only ``"ok"`` and ``"issues"``.

    Errors raised while loading the bundle are not caught here.
    """
    b = load_bundle(zip_path)
    issues: List[str] = []

    if b.manifest.get("spec") != DRP_BUNDLE_SPEC:
        issues.append(f"manifest.spec mismatch (expected {DRP_BUNDLE_SPEC})")

    events = b.events
    if not events:
        issues.append("no events found")
        return (False, {"ok": False, "issues": issues})

    prev_hash: Optional[str] = None
    for idx, ev in enumerate(events):
        if ev.get("spec") != DRP_EVENT_SPEC:
            issues.append(f"event[{idx}].spec mismatch (expected {DRP_EVENT_SPEC})")

        # NOTE(review): hash_event receives the event including its stored
        # "hash" key; presumably it ignores that key -- confirm in .canon.
        computed = hash_event(ev)
        if ev.get("hash") != computed:
            issues.append(f"event[{idx}] hash mismatch")

        # NOTE(review): the first event's "prev" is not checked at all, so a
        # bundle whose event 0 has a non-null "prev" still passes -- confirm
        # this is intended.
        if idx > 0:
            if ev.get("prev") != prev_hash:
                issues.append(f"event[{idx}] prev pointer mismatch")

        # The chain follows the *stored* hash; a wrong stored hash has already
        # been reported above.
        prev_hash = ev.get("hash")

    ok = len(issues) == 0
    summary = {
        "ok": ok,
        "issues": issues,
        "event_count": len(events),
        "run_id": b.manifest.get("run_id"),
        "created_at": b.manifest.get("created_at"),
        "framework": b.manifest.get("framework"),
        "model_id": b.manifest.get("model_id"),
    }
    return (ok, summary)


def write_bundle_zip(
    out_zip_path: str,
    *,
    run_id: str,
    framework: str,
    model_id: str,
    env_fingerprint: Dict[str, Any],
    events_payloads: List[Dict[str, Any]],
    created_at: Optional[str] = None,
    replay: Optional[Dict[str, Any]] = None,
    run_url: Optional[str] = None,
) -> str:
    """
    Creates a DRP bundle zip:
      - manifest.json
      - events.jsonl (hash-chained)

    Each entry of *events_payloads* becomes one event.  ``ts``, ``kind``,
    ``step`` and ``payload`` are taken from the entry when present and
    otherwise default to ``now_utc_iso()``, ``"state_snapshot"``,
    ``"step-<i>"`` and ``{}``.  ``prev`` is the previous event's hash (``None``
    for the first event) and ``hash`` is ``hash_event`` of the event before the
    ``hash`` key is added.

    *created_at* defaults to ``now_utc_iso()``.  *replay* and *run_url* are
    written to the manifest only when truthy.  The parent directory of
    *out_zip_path* is created if needed.

    Returns *out_zip_path*.
    """
    created_at = created_at or now_utc_iso()

    manifest: Dict[str, Any] = {
        "spec": DRP_BUNDLE_SPEC,
        "run_id": run_id,
        "created_at": created_at,
        "framework": framework,
        "model_id": model_id,
        "env": env_fingerprint,
    }
    if replay:
        manifest["replay"] = replay
    if run_url:
        manifest["run_url"] = run_url

    events: List[Dict[str, Any]] = []
    prev_hash: Optional[str] = None
    for i, payload in enumerate(events_payloads):
        ev = {
            "spec": DRP_EVENT_SPEC,
            "i": i,
            "ts": payload.get("ts") or now_utc_iso(),
            "kind": payload.get("kind", "state_snapshot"),
            "step": payload.get("step", f"step-{i}"),
            "payload": payload.get("payload", {}),
            "prev": prev_hash,
        }
        # Hash is computed over the event before the "hash" key exists.
        ev["hash"] = hash_event(ev)
        prev_hash = ev["hash"]
        events.append(ev)

    # dirname is "" for a bare filename; fall back to the current directory.
    os.makedirs(os.path.dirname(out_zip_path) or ".", exist_ok=True)

    with zipfile.ZipFile(out_zip_path, "w", compression=zipfile.ZIP_DEFLATED) as z:
        z.writestr("manifest.json", json.dumps(manifest, ensure_ascii=False, indent=2))

        # One compact JSON object per line, each terminated by "\n".
        buf = io.StringIO()
        for ev in events:
            buf.write(json.dumps(ev, ensure_ascii=False, separators=(",", ":")))
            buf.write("\n")
        z.writestr("events.jsonl", buf.getvalue())

    return out_zip_path