Hive_2 / persistence_engine.py
Paulhayes's picture
Create persistence_engine.py
331ec0d verified
import json
import time
import shutil
from pathlib import Path
import tarfile
class PersistenceEngine:
def __init__(self, hive_home):
self.hive_home = Path(hive_home)
self.conversations_dir = self.hive_home / "conversations"
self.conversations_dir.mkdir(parents=True, exist_ok=True)
self.snapshots_dir = self.hive_home / "snapshots"
self.snapshots_dir.mkdir(parents=True, exist_ok=True)
def save_turn(self, turn):
conv_file = self.conversations_dir / f"{turn.session_id}.jsonl"
turn_data = {
"turn_id": turn.turn_id,
"user_id": turn.user_id,
"timestamp": turn.timestamp,
"input": turn.raw_input,
"output": turn.postprocessed_output
}
with open(conv_file, "a") as f:
f.write(json.dumps(turn_data) + "\n")
def save_snapshot(self):
timestamp = int(time.time())
snapshot_path = self.snapshots_dir / f"snapshot_{timestamp}.tar.gz"
# Create snapshot
with tarfile.open(snapshot_path, "w:gz") as tar:
tar.add(self.hive_home, arcname="hive_data")
# Keep only latest snapshot
for snap in self.snapshots_dir.glob("snapshot_*.tar.gz"):
if snap != snapshot_path:
snap.unlink()
return f"Snapshot saved: {snapshot_path.name}"
def rollback(self):
snapshots = list(self.snapshots_dir.glob("snapshot_*.tar.gz"))
if not snapshots:
return "No snapshots found"
latest = max(snapshots, key=lambda x: x.stat().st_mtime)
# Extract snapshot
with tarfile.open(latest, "r:gz") as tar:
tar.extractall(path=self.hive_home.parent)
return f"Rolled back to: {latest.name}"