|
|
import json |
|
|
import time |
|
|
import shutil |
|
|
from pathlib import Path |
|
|
import tarfile |
|
|
|
|
|
class PersistenceEngine:
    """Persist conversation turns and directory snapshots under one home directory.

    Layout created under ``hive_home``:

    * ``conversations/<session_id>.jsonl`` -- one JSON object per saved turn.
    * ``snapshots/snapshot_<unix_ts>.tar.gz`` -- the most recent snapshot;
      older snapshots are deleted each time a new one is written.
    """

    # Name of the single top-level directory inside every snapshot archive.
    _ARCHIVE_ROOT = "hive_data"

    def __init__(self, hive_home):
        """Create the conversation and snapshot directories if they are missing.

        Args:
            hive_home: Path (``str`` or ``Path``) of the directory that holds
                all persisted data.
        """
        self.hive_home = Path(hive_home)
        self.conversations_dir = self.hive_home / "conversations"
        self.conversations_dir.mkdir(parents=True, exist_ok=True)
        self.snapshots_dir = self.hive_home / "snapshots"
        self.snapshots_dir.mkdir(parents=True, exist_ok=True)

    def save_turn(self, turn):
        """Append one turn as a JSON line to its session's ``.jsonl`` file.

        Args:
            turn: Object exposing ``session_id``, ``turn_id``, ``user_id``,
                ``timestamp``, ``raw_input`` and ``postprocessed_output``.

        Raises:
            TypeError: If a field is not JSON-serializable (from ``json.dumps``).
            OSError: If the conversation file cannot be opened or written.
        """
        # NOTE(review): session_id is interpolated into the file name without
        # validation; if it can come from untrusted input, a value containing
        # path separators would write outside conversations_dir -- confirm
        # with callers.
        conv_file = self.conversations_dir / f"{turn.session_id}.jsonl"
        turn_data = {
            "turn_id": turn.turn_id,
            "user_id": turn.user_id,
            "timestamp": turn.timestamp,
            "input": turn.raw_input,
            "output": turn.postprocessed_output,
        }
        # Explicit encoding keeps the file format independent of the locale.
        with open(conv_file, "a", encoding="utf-8") as f:
            f.write(json.dumps(turn_data) + "\n")

    def save_snapshot(self):
        """Archive ``hive_home`` into a new snapshot and delete older snapshots.

        The snapshot directory itself is left out of the archive; otherwise
        each snapshot would embed the previous one and grow without bound.

        Returns:
            A status string of the form ``"Snapshot saved: <file name>"``.

        Raises:
            OSError: If the archive cannot be written. In that case no new
                ``snapshot_*.tar.gz`` file is left behind and existing
                snapshots are kept.
        """
        timestamp = int(time.time())
        snapshot_path = self.snapshots_dir / f"snapshot_{timestamp}.tar.gz"
        # Build under a name the "snapshot_*.tar.gz" glob does not match, so a
        # failure mid-write can never be picked up by rollback() as the latest
        # snapshot.
        partial_path = snapshot_path.with_name(snapshot_path.name + ".partial")
        excluded = f"{self._ARCHIVE_ROOT}/{self.snapshots_dir.name}"

        def skip_snapshot_store(member):
            # Returning None drops the member; for a directory, tarfile then
            # does not descend into it.
            if member.name == excluded or member.name.startswith(excluded + "/"):
                return None
            return member

        try:
            with tarfile.open(partial_path, "w:gz") as tar:
                tar.add(
                    self.hive_home,
                    arcname=self._ARCHIVE_ROOT,
                    filter=skip_snapshot_store,
                )
            partial_path.replace(snapshot_path)
        finally:
            # Only still present when archiving or the rename failed.
            if partial_path.exists():
                partial_path.unlink()

        # Keep only the snapshot that was just written.
        for snap in self.snapshots_dir.glob("snapshot_*.tar.gz"):
            if snap != snapshot_path:
                snap.unlink()

        return f"Snapshot saved: {snapshot_path.name}"

    def rollback(self):
        """Restore ``hive_home`` from the most recently modified snapshot.

        Archive members are stored under a fixed top-level directory; that
        prefix is stripped so files are written back into ``hive_home``
        whatever the directory is actually called. Entries belonging to the
        snapshot directory (present in archives written before it was
        excluded) are skipped so the snapshot store is not modified.

        Files are overwritten or re-created from the archive; files that
        exist on disk but not in the snapshot are left in place.

        Returns:
            ``"No snapshots found"`` when there is nothing to restore,
            otherwise ``"Rolled back to: <file name>"``.

        Raises:
            tarfile.TarError: If the snapshot archive cannot be read.
            OSError: If files cannot be written.
        """
        snapshots = list(self.snapshots_dir.glob("snapshot_*.tar.gz"))
        if not snapshots:
            return "No snapshots found"

        latest = max(snapshots, key=lambda snap: snap.stat().st_mtime)
        root_prefix = self._ARCHIVE_ROOT + "/"
        store_name = self.snapshots_dir.name

        # Extraction filters exist only on newer Python releases; use the
        # safe "data" filter when it is available.
        extract_kwargs = {}
        if hasattr(tarfile, "data_filter"):
            extract_kwargs["filter"] = "data"

        with tarfile.open(latest, "r:gz") as tar:
            restorable = []
            for member in tar.getmembers():
                # Skips the root directory entry and anything outside it.
                if not member.name.startswith(root_prefix):
                    continue
                relative = member.name[len(root_prefix):]
                if not relative:
                    continue
                if relative == store_name or relative.startswith(store_name + "/"):
                    continue
                member.name = relative
                # Hard-link targets are archive paths, so they need the same
                # re-rooting; symlink targets are left untouched.
                if member.islnk() and member.linkname.startswith(root_prefix):
                    member.linkname = member.linkname[len(root_prefix):]
                restorable.append(member)
            tar.extractall(path=self.hive_home, members=restorable, **extract_kwargs)

        return f"Rolled back to: {latest.name}"