specimba/nexus / opusmanSEEKv4 /token_compactor.py
specimba's picture
download
raw
1.53 kB
#!/usr/bin/env python3
"""
token_compactor.py
Implements token-efficient heartbeat + compaction protocol for all agents.
"""
import json
from pathlib import Path
from datetime import datetime
STATE_FILE = Path("STATE_COMPACT.json")
def compact_state(agent_id: str, new_data: dict) -> dict:
"""Maintain rolling compact state (<500 tokens)."""
if STATE_FILE.exists():
with open(STATE_FILE) as f:
state = json.load(f)
else:
state = {"version": "1.0", "last_compacted": None, "agents": {}}
state["agents"][agent_id] = {
"last_update": datetime.now().isoformat(),
"health": new_data.get("health", "unknown"),
"tokens_saved": new_data.get("tokens_saved", 0),
"tasks_completed": new_data.get("tasks_completed", 0)
}
state["last_compacted"] = datetime.now().isoformat()
with open(STATE_FILE, "w") as f:
json.dump(state, f, indent=2)
return state
def get_heartbeat(agent_id: str) -> dict:
"""Return compact heartbeat packet (<300 tokens)."""
if not STATE_FILE.exists():
return {"agent_id": agent_id, "status": "no_state"}
with open(STATE_FILE) as f:
state = json.load(f)
agent = state["agents"].get(agent_id, {})
return {
"agent_id": agent_id,
"ts": agent.get("last_update"),
"health": agent.get("health"),
"tokens_saved": agent.get("tokens_saved", 0)
}
if __name__ == "__main__":
print("Token Compactor ready. Use compact_state() and get_heartbeat()")

Xet Storage Details

Size:
1.53 kB
·
Xet hash:
f5c66820f246b24cdf742691a5dbf06ef89114253a67d118396ca6b9f3a9172d

Xet efficiently stores files, intelligently splitting them into unique chunks and accelerating uploads and downloads. More info.