import json
import shutil
import tarfile
import time
from pathlib import Path


class PersistenceEngine:
    """Persist conversation turns and whole-directory snapshots on disk.

    Layout under ``hive_home``:

    * ``conversations/<session_id>.jsonl`` -- one JSON object per turn.
    * ``snapshots/snapshot_<unix_ts>.tar.gz`` -- the single most recent
      archive of ``hive_home`` (the ``snapshots`` directory itself is not
      archived).
    """

    # Top-level directory name used inside every snapshot archive.  It is a
    # fixed label (not the real directory name) so archives stay restorable
    # even if the hive home is renamed or moved.
    _ARCHIVE_ROOT = "hive_data"

    def __init__(self, hive_home):
        """Create the engine and make sure its working directories exist.

        Args:
            hive_home: Path (``str`` or ``Path``) of the root data directory.
                It is created, with its ``conversations`` and ``snapshots``
                subdirectories, if missing.
        """
        self.hive_home = Path(hive_home)
        self.conversations_dir = self.hive_home / "conversations"
        self.conversations_dir.mkdir(parents=True, exist_ok=True)
        self.snapshots_dir = self.hive_home / "snapshots"
        self.snapshots_dir.mkdir(parents=True, exist_ok=True)

    def save_turn(self, turn) -> None:
        """Append one conversation turn to its session's JSONL file.

        Args:
            turn: Object exposing ``session_id``, ``turn_id``, ``user_id``,
                ``timestamp``, ``raw_input`` and ``postprocessed_output``.
                The last five must be JSON-serialisable.

        NOTE(review): ``session_id`` is used verbatim as a file name; callers
        are assumed to pass a trusted identifier without path separators.
        """
        conv_file = self.conversations_dir / f"{turn.session_id}.jsonl"
        turn_data = {
            "turn_id": turn.turn_id,
            "user_id": turn.user_id,
            "timestamp": turn.timestamp,
            "input": turn.raw_input,
            "output": turn.postprocessed_output,
        }
        # Explicit encoding so the file does not depend on the platform locale.
        with open(conv_file, "a", encoding="utf-8") as f:
            f.write(json.dumps(turn_data) + "\n")

    def save_snapshot(self) -> str:
        """Archive ``hive_home`` and keep only this newest snapshot.

        The ``snapshots`` directory is excluded from the archive; otherwise
        every snapshot would embed the previous one and grow without bound.
        The archive is written under a temporary name and renamed once
        complete, so a failed run never leaves a truncated
        ``snapshot_*.tar.gz`` for :meth:`rollback` to pick up.

        Returns:
            A human-readable status message naming the snapshot file.
        """
        timestamp = int(time.time())
        snapshot_path = self.snapshots_dir / f"snapshot_{timestamp}.tar.gz"
        # The ".tmp" suffix keeps the partial file out of the snapshot glob.
        tmp_path = snapshot_path.with_name(snapshot_path.name + ".tmp")

        try:
            with tarfile.open(tmp_path, "w:gz") as tar:
                tar.add(
                    self.hive_home,
                    arcname=self._ARCHIVE_ROOT,
                    filter=self._exclude_snapshots,
                )
            tmp_path.replace(snapshot_path)
        except BaseException:
            # Do not leave a partial archive behind; re-raise the real error.
            if tmp_path.exists():
                tmp_path.unlink()
            raise

        # Keep only the latest snapshot.
        for snap in self.snapshots_dir.glob("snapshot_*.tar.gz"):
            if snap != snapshot_path:
                snap.unlink()

        return f"Snapshot saved: {snapshot_path.name}"

    def rollback(self) -> str:
        """Restore ``hive_home`` from the most recently modified snapshot.

        Archive members are re-rooted from the archive's fixed top-level
        directory onto ``hive_home`` itself, so the restore lands in the
        right place whatever the directory is called.  Files are overlaid:
        archived files overwrite current ones, and files created after the
        snapshot are left in place.

        Returns:
            ``"No snapshots found"`` when there is nothing to restore,
            otherwise a message naming the snapshot that was applied.
        """
        snapshots = list(self.snapshots_dir.glob("snapshot_*.tar.gz"))
        if not snapshots:
            return "No snapshots found"

        latest = max(snapshots, key=lambda p: p.stat().st_mtime)

        with tarfile.open(latest, "r:gz") as tar:
            members = self._rerooted_members(tar)
            if hasattr(tarfile, "data_filter"):
                # Refuse absolute paths, ".." escapes and special files.
                tar.extractall(
                    path=self.hive_home, members=members, filter="data"
                )
            else:
                # Older Python without extraction filters.
                tar.extractall(path=self.hive_home, members=members)

        return f"Rolled back to: {latest.name}"

    def _exclude_snapshots(self, tarinfo):
        """``tarfile`` add-filter that drops the snapshots directory."""
        excluded = f"{self._ARCHIVE_ROOT}/{self.snapshots_dir.name}"
        if tarinfo.name == excluded or tarinfo.name.startswith(excluded + "/"):
            return None
        return tarinfo

    def _rerooted_members(self, tar):
        """Return members to restore, with paths relative to ``hive_home``.

        Skips the archive root entry, anything outside the archive root, and
        any snapshots nested inside archives written by older versions.
        """
        prefix = self._ARCHIVE_ROOT + "/"
        nested = prefix + self.snapshots_dir.name
        members = []
        for member in tar.getmembers():
            name = member.name
            if not name.startswith(prefix):
                continue
            if name == nested or name.startswith(nested + "/"):
                continue
            member.name = name[len(prefix):]
            # Hard links refer to other members by archive path; keep them
            # consistent with the rewritten names.
            if member.islnk() and member.linkname.startswith(prefix):
                member.linkname = member.linkname[len(prefix):]
            members.append(member)
        return members