Spaces:
Sleeping
Sleeping
| """AgentAZAll helpers β date utils, path helpers, identity validation.""" | |
| import hashlib | |
| import json | |
| import os | |
| import re | |
| import shutil | |
| from datetime import date, datetime | |
| from pathlib import Path | |
| from typing import List | |
| from .config import ( | |
| AGENT_LEVEL_DIRS, | |
| ALL_SUBDIRS, | |
| ) | |
| # ββ date/time ββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def today_str() -> str: | |
| return date.today().isoformat() | |
| def now_str() -> str: | |
| return datetime.now().strftime("%Y-%m-%d %H:%M:%S") | |
| # ββ path helpers βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def agent_base(cfg) -> Path: | |
| return Path(cfg["mailbox_dir"]) / cfg["agent_name"] | |
| def agent_day(cfg, d=None) -> Path: | |
| return agent_base(cfg) / (d or today_str()) | |
| def shared_dir(cfg) -> Path: | |
| """Return the shared tools/skills root: data/shared/""" | |
| return Path(cfg["mailbox_dir"]).parent / "shared" | |
| def ensure_dirs(cfg, d=None) -> Path: | |
| root = agent_day(cfg, d) | |
| for sub in ALL_SUBDIRS: | |
| (root / sub).mkdir(parents=True, exist_ok=True) | |
| base = agent_base(cfg) | |
| for sub in AGENT_LEVEL_DIRS: | |
| (base / sub).mkdir(parents=True, exist_ok=True) | |
| return root | |
| def date_dirs(cfg) -> List[str]: | |
| b = agent_base(cfg) | |
| if not b.exists(): | |
| return [] | |
| return sorted( | |
| d.name for d in b.iterdir() | |
| if d.is_dir() and re.match(r"\d{4}-\d{2}-\d{2}$", d.name) | |
| ) | |
| # ββ id / sanitization βββββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def generate_id(from_a, to_a, subject) -> str: | |
| raw = f"{from_a}|{to_a}|{subject}|{datetime.now().isoformat()}|{os.urandom(8).hex()}" | |
| return hashlib.sha256(raw.encode()).hexdigest()[:12] | |
| def sanitize(name: str) -> str: | |
| return re.sub(r'[^\w\-.]', '_', name) | |
| def safe_move(src: str, dst: str): | |
| """Move file safely on Windows (copy+remove fallback).""" | |
| try: | |
| shutil.move(src, dst) | |
| except (PermissionError, OSError): | |
| shutil.copy2(src, dst) | |
| os.remove(src) | |
| # ββ identity validation βββββββββββββββββββββββββββββββββββββββββββββββββββββ | |
| def validate_agent_key(cfg: dict) -> bool: | |
| """Verify that the config's agent_key matches the key stored in the mailbox.""" | |
| key_file = agent_base(cfg) / ".agent_key" | |
| if not key_file.exists(): | |
| return True # legacy agent without key | |
| try: | |
| stored = json.loads(key_file.read_text(encoding="utf-8")) | |
| config_key = cfg.get("agent_key", "") | |
| if not config_key: | |
| return True # legacy config without key | |
| return stored.get("key") == config_key | |
| except Exception: | |
| return True # don't block on corrupted key files | |
| def require_identity(cfg: dict): | |
| """Validate agent key before any write operation. Exit if invalid.""" | |
| import sys | |
| if not validate_agent_key(cfg): | |
| agent = cfg.get("agent_name", "unknown") | |
| print(f"ERROR: Identity verification failed for '{agent}'.") | |
| print("Your config key does not match the agent's registered key.") | |
| print("You cannot write to another agent's space.") | |
| sys.exit(1) | |
| def can_read_agent_memories(cfg: dict, target_agent: str) -> bool: | |
| """Check if current agent is allowed to read target agent's memories.""" | |
| if cfg["agent_name"] == target_agent: | |
| return True | |
| target_base = Path(cfg["mailbox_dir"]) / target_agent | |
| key_file = target_base / ".agent_key" | |
| if key_file.exists(): | |
| try: | |
| stored = json.loads(key_file.read_text(encoding="utf-8")) | |
| return stored.get("allow_memory_sharing", False) | |
| except Exception: | |
| pass | |
| return False | |