AgentAZAll / src /agentazall /helpers.py
cronos3k's picture
Upload folder using huggingface_hub
c75bbd7 verified
"""AgentAZAll helpers β€” date utils, path helpers, identity validation."""
import hashlib
import json
import os
import re
import shutil
from datetime import date, datetime
from pathlib import Path
from typing import List
from .config import (
AGENT_LEVEL_DIRS,
ALL_SUBDIRS,
)
# ── date/time ────────────────────────────────────────────────────────────────
def today_str() -> str:
return date.today().isoformat()
def now_str() -> str:
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# ── path helpers ─────────────────────────────────────────────────────────────
def agent_base(cfg) -> Path:
return Path(cfg["mailbox_dir"]) / cfg["agent_name"]
def agent_day(cfg, d=None) -> Path:
return agent_base(cfg) / (d or today_str())
def shared_dir(cfg) -> Path:
"""Return the shared tools/skills root: data/shared/"""
return Path(cfg["mailbox_dir"]).parent / "shared"
def ensure_dirs(cfg, d=None) -> Path:
root = agent_day(cfg, d)
for sub in ALL_SUBDIRS:
(root / sub).mkdir(parents=True, exist_ok=True)
base = agent_base(cfg)
for sub in AGENT_LEVEL_DIRS:
(base / sub).mkdir(parents=True, exist_ok=True)
return root
def date_dirs(cfg) -> List[str]:
b = agent_base(cfg)
if not b.exists():
return []
return sorted(
d.name for d in b.iterdir()
if d.is_dir() and re.match(r"\d{4}-\d{2}-\d{2}$", d.name)
)
# ── id / sanitization ───────────────────────────────────────────────────────
def generate_id(from_a, to_a, subject) -> str:
raw = f"{from_a}|{to_a}|{subject}|{datetime.now().isoformat()}|{os.urandom(8).hex()}"
return hashlib.sha256(raw.encode()).hexdigest()[:12]
def sanitize(name: str) -> str:
return re.sub(r'[^\w\-.]', '_', name)
def safe_move(src: str, dst: str):
"""Move file safely on Windows (copy+remove fallback)."""
try:
shutil.move(src, dst)
except (PermissionError, OSError):
shutil.copy2(src, dst)
os.remove(src)
# ── identity validation ─────────────────────────────────────────────────────
def validate_agent_key(cfg: dict) -> bool:
"""Verify that the config's agent_key matches the key stored in the mailbox."""
key_file = agent_base(cfg) / ".agent_key"
if not key_file.exists():
return True # legacy agent without key
try:
stored = json.loads(key_file.read_text(encoding="utf-8"))
config_key = cfg.get("agent_key", "")
if not config_key:
return True # legacy config without key
return stored.get("key") == config_key
except Exception:
return True # don't block on corrupted key files
def require_identity(cfg: dict):
"""Validate agent key before any write operation. Exit if invalid."""
import sys
if not validate_agent_key(cfg):
agent = cfg.get("agent_name", "unknown")
print(f"ERROR: Identity verification failed for '{agent}'.")
print("Your config key does not match the agent's registered key.")
print("You cannot write to another agent's space.")
sys.exit(1)
def can_read_agent_memories(cfg: dict, target_agent: str) -> bool:
"""Check if current agent is allowed to read target agent's memories."""
if cfg["agent_name"] == target_agent:
return True
target_base = Path(cfg["mailbox_dir"]) / target_agent
key_file = target_base / ".agent_key"
if key_file.exists():
try:
stored = json.loads(key_file.read_text(encoding="utf-8"))
return stored.get("allow_memory_sharing", False)
except Exception:
pass
return False