RFTSystems's picture
Update drp/bundle.py
2976335 verified
import io
import json
import os
import zipfile
from dataclasses import dataclass
from typing import Any, Dict, List, Optional, Tuple
from .canon import DRP_BUNDLE_SPEC, DRP_EVENT_SPEC, hash_event, now_utc_iso
@dataclass
class Bundle:
    """In-memory form of a DRP bundle archive.

    Pairs the decoded ``manifest.json`` document with the decoded records of
    ``events.jsonl``, kept in the order they appear in the file.
    """

    # Top-level JSON document read from manifest.json.
    manifest: Dict[str, Any]
    # One decoded JSON record per non-blank line of events.jsonl, file order.
    events: List[Dict[str, Any]]
def _read_json_from_zip(z: zipfile.ZipFile, name: str) -> Dict[str, Any]:
with z.open(name, "r") as f:
return json.loads(f.read().decode("utf-8"))
def _read_jsonl_from_zip(z: zipfile.ZipFile, name: str) -> List[Dict[str, Any]]:
out: List[Dict[str, Any]] = []
with z.open(name, "r") as f:
for line in f.read().decode("utf-8").splitlines():
line = line.strip()
if not line:
continue
out.append(json.loads(line))
return out
def load_bundle(zip_path: str) -> Bundle:
    """Open the DRP archive at *zip_path* and return its decoded contents.

    ``manifest.json`` is read first as a single JSON document, then
    ``events.jsonl`` as JSON Lines; the archive is closed before the
    Bundle is built. Errors from opening the zip or reading either member
    propagate unchanged.
    """
    with zipfile.ZipFile(zip_path, "r") as archive:
        parsed_manifest = _read_json_from_zip(archive, "manifest.json")
        parsed_events = _read_jsonl_from_zip(archive, "events.jsonl")
    return Bundle(manifest=parsed_manifest, events=parsed_events)
def verify_bundle(zip_path: str) -> Tuple[bool, Dict[str, Any]]:
    """
    Verify a DRP bundle zip laid out the way write_bundle_zip produces it.

    Checks:
    - ``manifest.spec`` equals ``DRP_BUNDLE_SPEC``
    - at least one event is present
    - every event's ``spec`` equals ``DRP_EVENT_SPEC``
    - every event's stored ``hash`` equals ``hash_event`` of that event with
      its ``hash`` key removed, i.e. the exact dict write_bundle_zip hashed
    - the hash chain is intact: the first event's ``prev`` is ``None`` (as
      write_bundle_zip writes it) and each later event's ``prev`` equals the
      stored ``hash`` of the event before it

    Args:
        zip_path: Path to the bundle archive.

    Returns:
        ``(ok, summary)``. ``ok`` is True iff no issues were found. ``summary``
        always carries ``ok``, ``issues`` (human-readable strings),
        ``event_count`` and the manifest's ``run_id``, ``created_at``,
        ``framework`` and ``model_id`` (``None`` when absent).

    Errors raised while opening or parsing the archive (e.g. a missing
    ``manifest.json``/``events.jsonl`` member or malformed JSON) propagate
    from load_bundle; they are not converted into issues.
    """
    b = load_bundle(zip_path)
    issues: List[str] = []

    if b.manifest.get("spec") != DRP_BUNDLE_SPEC:
        issues.append(f"manifest.spec mismatch (expected {DRP_BUNDLE_SPEC})")

    events = b.events
    if not events:
        issues.append("no events found")

    # Starts as None: the genesis event must not point at any predecessor.
    prev_hash: Optional[str] = None
    for idx, ev in enumerate(events):
        if ev.get("spec") != DRP_EVENT_SPEC:
            issues.append(f"event[{idx}].spec mismatch (expected {DRP_EVENT_SPEC})")

        # Recompute over the event minus its own "hash" field: that is the dict
        # write_bundle_zip passed to hash_event, so this check holds whether or
        # not hash_event itself skips a "hash" key.
        unsealed = {k: v for k, v in ev.items() if k != "hash"}
        if ev.get("hash") != hash_event(unsealed):
            issues.append(f"event[{idx}] hash mismatch")

        # Checked for idx 0 too (against None), so a bundle whose leading
        # events were cut off -- first prev pointing at a missing event --
        # is rejected instead of passing silently.
        if ev.get("prev") != prev_hash:
            issues.append(f"event[{idx}] prev pointer mismatch")
        prev_hash = ev.get("hash")

    ok = not issues
    summary: Dict[str, Any] = {
        "ok": ok,
        "issues": issues,
        "event_count": len(events),
        "run_id": b.manifest.get("run_id"),
        "created_at": b.manifest.get("created_at"),
        "framework": b.manifest.get("framework"),
        "model_id": b.manifest.get("model_id"),
    }
    return (ok, summary)
def write_bundle_zip(
    out_zip_path: str,
    *,
    run_id: str,
    framework: str,
    model_id: str,
    env_fingerprint: Dict[str, Any],
    events_payloads: List[Dict[str, Any]],
    created_at: Optional[str] = None,
    replay: Optional[Dict[str, Any]] = None,
    run_url: Optional[str] = None,
) -> str:
    """
    Write a DRP bundle archive to *out_zip_path* and return that same path.

    The deflate-compressed archive holds two members:
    - ``manifest.json``: spec, run id, creation time, framework, model id and
      the environment fingerprint (under ``env``), plus ``replay`` and
      ``run_url`` only when those arguments are truthy.
    - ``events.jsonl``: one compact JSON record per line, each carrying the
      previous record's hash in ``prev`` (``None`` for the first) and its own
      ``hash_event`` digest in ``hash``.

    Per entry of *events_payloads*: ``ts`` falls back to ``now_utc_iso()``
    when missing or falsy; ``kind``, ``step`` and ``payload`` fall back to
    ``"state_snapshot"``, ``"step-<index>"`` and ``{}`` only when the key is
    missing. *created_at* likewise falls back to ``now_utc_iso()``.

    Parent directories are created as needed; an existing file at
    *out_zip_path* is overwritten.
    """
    stamp = created_at or now_utc_iso()
    manifest: Dict[str, Any] = dict(
        spec=DRP_BUNDLE_SPEC,
        run_id=run_id,
        created_at=stamp,
        framework=framework,
        model_id=model_id,
        env=env_fingerprint,
    )
    # Optional metadata is attached only when truthy, in this fixed order.
    for field_name, field_value in (("replay", replay), ("run_url", run_url)):
        if field_value:
            manifest[field_name] = field_value

    chained: List[Dict[str, Any]] = []
    last_hash: Optional[str] = None
    for index, source in enumerate(events_payloads):
        record: Dict[str, Any] = {
            "spec": DRP_EVENT_SPEC,
            "i": index,
            "ts": source.get("ts") or now_utc_iso(),
            "kind": source.get("kind", "state_snapshot"),
            "step": source.get("step", f"step-{index}"),
            "payload": source.get("payload", {}),
            "prev": last_hash,
        }
        # The digest is taken before "hash" is added, then sealed onto the record.
        digest = hash_event(record)
        record["hash"] = digest
        chained.append(record)
        last_hash = digest

    target_dir = os.path.dirname(out_zip_path) or "."
    os.makedirs(target_dir, exist_ok=True)
    with zipfile.ZipFile(out_zip_path, "w", compression=zipfile.ZIP_DEFLATED) as archive:
        archive.writestr("manifest.json", json.dumps(manifest, ensure_ascii=False, indent=2))
        jsonl_lines = [
            json.dumps(record, ensure_ascii=False, separators=(",", ":")) + "\n"
            for record in chained
        ]
        archive.writestr("events.jsonl", "".join(jsonl_lines))
    return out_zip_path