Create persistence_engine.py
Browse files- persistence_engine.py +53 -0
persistence_engine.py
ADDED
|
@@ -0,0 +1,53 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
import json
|
| 2 |
+
import time
|
| 3 |
+
import shutil
|
| 4 |
+
from pathlib import Path
|
| 5 |
+
import tarfile
|
| 6 |
+
|
| 7 |
+
class PersistenceEngine:
|
| 8 |
+
def __init__(self, hive_home):
|
| 9 |
+
self.hive_home = Path(hive_home)
|
| 10 |
+
self.conversations_dir = self.hive_home / "conversations"
|
| 11 |
+
self.conversations_dir.mkdir(parents=True, exist_ok=True)
|
| 12 |
+
self.snapshots_dir = self.hive_home / "snapshots"
|
| 13 |
+
self.snapshots_dir.mkdir(parents=True, exist_ok=True)
|
| 14 |
+
|
| 15 |
+
def save_turn(self, turn):
|
| 16 |
+
conv_file = self.conversations_dir / f"{turn.session_id}.jsonl"
|
| 17 |
+
turn_data = {
|
| 18 |
+
"turn_id": turn.turn_id,
|
| 19 |
+
"user_id": turn.user_id,
|
| 20 |
+
"timestamp": turn.timestamp,
|
| 21 |
+
"input": turn.raw_input,
|
| 22 |
+
"output": turn.postprocessed_output
|
| 23 |
+
}
|
| 24 |
+
with open(conv_file, "a") as f:
|
| 25 |
+
f.write(json.dumps(turn_data) + "\n")
|
| 26 |
+
|
| 27 |
+
def save_snapshot(self):
|
| 28 |
+
timestamp = int(time.time())
|
| 29 |
+
snapshot_path = self.snapshots_dir / f"snapshot_{timestamp}.tar.gz"
|
| 30 |
+
|
| 31 |
+
# Create snapshot
|
| 32 |
+
with tarfile.open(snapshot_path, "w:gz") as tar:
|
| 33 |
+
tar.add(self.hive_home, arcname="hive_data")
|
| 34 |
+
|
| 35 |
+
# Keep only latest snapshot
|
| 36 |
+
for snap in self.snapshots_dir.glob("snapshot_*.tar.gz"):
|
| 37 |
+
if snap != snapshot_path:
|
| 38 |
+
snap.unlink()
|
| 39 |
+
|
| 40 |
+
return f"Snapshot saved: {snapshot_path.name}"
|
| 41 |
+
|
| 42 |
+
def rollback(self):
|
| 43 |
+
snapshots = list(self.snapshots_dir.glob("snapshot_*.tar.gz"))
|
| 44 |
+
if not snapshots:
|
| 45 |
+
return "No snapshots found"
|
| 46 |
+
|
| 47 |
+
latest = max(snapshots, key=lambda x: x.stat().st_mtime)
|
| 48 |
+
|
| 49 |
+
# Extract snapshot
|
| 50 |
+
with tarfile.open(latest, "r:gz") as tar:
|
| 51 |
+
tar.extractall(path=self.hive_home.parent)
|
| 52 |
+
|
| 53 |
+
return f"Rolled back to: {latest.name}"
|