from __future__ import annotations

import json
import logging
import os
from pathlib import Path
from threading import RLock
from typing import Any, Dict, Optional

from utils.security import sanitize_path_component, validate_job_id

logger = logging.getLogger(__name__)


class MemoryStore:
    """File-backed JSON store for agent memory.

    Each (user_id, job_id, agent_name) triple maps to one JSON file directly
    inside ``base_dir``. File operations performed through one instance are
    serialized with a re-entrant lock held by that instance.
    """

    def __init__(self, base_dir: Optional[str] = None) -> None:
        """Create the store and make sure its directory exists.

        Args:
            base_dir: Directory that holds the memory files. When ``None``,
                ``/home/user/app/memory_data`` is used if ``/home/user/app``
                exists (HF Spaces environment), otherwise ``./memory/data``.
        """
        if base_dir is None:
            # Use a writable location in HF Spaces, else a local directory.
            if os.path.exists("/home/user/app"):
                base_dir = "/home/user/app/memory_data"
            else:
                base_dir = "./memory/data"

        self.base_dir = Path(base_dir)
        self.base_dir.mkdir(parents=True, exist_ok=True)
        self._lock = RLock()
        logger.info(f"Memory store initialized at {self.base_dir}")

    def _resolve_inside_base(self, candidate: Path) -> Path:
        """Resolve *candidate* and require it to live under ``base_dir``.

        Returns:
            The resolved path.

        Raises:
            ValueError: If the resolved path is not inside ``base_dir`` (or
                the path cannot be resolved because of invalid characters).
            RuntimeError: If resolution itself fails, e.g. on a symlink loop.
        """
        resolved = candidate.resolve()
        if not resolved.is_relative_to(self.base_dir.resolve()):
            logger.error(f"Path traversal attempt detected: {resolved}")
            raise ValueError("Invalid path")
        return resolved

    def _path(self, user_id: str, job_id: Optional[str], agent_name: str) -> Path:
        """Generate a safe file path for storing agent memory.

        Every component is passed through ``sanitize_path_component`` and the
        resulting path is additionally checked to be inside ``base_dir``. If
        that check fails, a ``default__<job>__<agent>.json`` file is tried
        instead, and it has to pass the same check.

        Args:
            user_id: Owner of the memory.
            job_id: Job scope; ``None`` or empty maps to ``"global"``.
            agent_name: Agent whose memory is addressed.

        Returns:
            Resolved path of the memory file.

        Raises:
            ValueError: If neither the regular nor the fallback path can be
                validated as being inside ``base_dir``.
        """
        # Sanitize all components to prevent directory traversal.
        safe_user = sanitize_path_component(user_id)
        safe_job = sanitize_path_component(job_id or "global")
        safe_agent = sanitize_path_component(agent_name)

        # An invalid job_id is only reported; the sanitized value is still used.
        if job_id and not validate_job_id(job_id):
            logger.warning(f"Invalid job_id format: {job_id}, using sanitized version")

        candidate = self.base_dir / f"{safe_user}__{safe_job}__{safe_agent}.json"
        try:
            # Defense in depth: never trust the sanitizer alone.
            return self._resolve_inside_base(candidate)
        except (ValueError, RuntimeError) as e:
            logger.error(f"Path validation error: {e}")

        # The fallback reuses the job/agent components, so it must be checked
        # as well; otherwise a bad component would escape through this branch.
        fallback = self.base_dir / f"default__{safe_job}__{safe_agent}.json"
        try:
            return self._resolve_inside_base(fallback)
        except (ValueError, RuntimeError) as e:
            raise ValueError("Invalid path") from e

    def load(self, user_id: str, agent_name: str, job_id: Optional[str] = None) -> Dict[str, Any]:
        """Load agent memory from disk with error handling.

        Returns:
            The decoded JSON content, or ``{}`` when the file does not exist,
            cannot be read, or does not contain valid JSON. A file with
            invalid JSON is moved aside to ``<name>.corrupted.json``.

        Raises:
            ValueError: If no safe path can be built for the arguments.
        """
        path = self._path(user_id, job_id, agent_name)

        with self._lock:
            try:
                # Open directly instead of checking exists() first: the file
                # may disappear between the check and the open.
                with open(path, "r", encoding="utf-8") as f:
                    data = json.load(f)
            except FileNotFoundError:
                logger.debug(f"No memory file found at {path}")
                return {}
            except json.JSONDecodeError as e:
                logger.error(f"JSON decode error in {path}: {e}")
                self._backup_corrupted(path)
                return {}
            except Exception as e:
                logger.error(f"Error loading memory from {path}: {e}")
                return {}

            logger.debug(f"Loaded memory from {path}")
            return data

    def _backup_corrupted(self, path: Path) -> None:
        """Move an unreadable memory file aside; failures are only logged."""
        backup_path = path.with_suffix(".corrupted.json")
        try:
            # replace() overwrites an older backup on every platform, whereas
            # rename() fails on Windows when the target already exists.
            path.replace(backup_path)
        except OSError as e:
            logger.warning(f"Could not back up corrupted file {path}: {e}")
        else:
            logger.warning(f"Backed up corrupted file to {backup_path}")

    def save(self, user_id: str, agent_name: str, data: Dict[str, Any], job_id: Optional[str] = None) -> None:
        """Save agent memory to disk with atomic write.

        The data is written to a sibling ``.tmp`` file first and then moved
        over the target, so the target never holds a partially written file.

        Raises:
            ValueError: If no safe path can be built for the arguments.
            Exception: Any error raised while serializing or writing is
                logged and re-raised after the temporary file is removed.
        """
        path = self._path(user_id, job_id, agent_name)
        temp_path = path.with_suffix(".tmp")

        with self._lock:
            try:
                with open(temp_path, "w", encoding="utf-8") as f:
                    json.dump(data, f, ensure_ascii=False, indent=2)
                temp_path.replace(path)
                logger.debug(f"Saved memory to {path}")
            except Exception as e:
                logger.error(f"Error saving memory to {path}: {e}")
                # Best-effort cleanup; the original error is what callers see.
                try:
                    temp_path.unlink(missing_ok=True)
                except OSError as cleanup_error:
                    logger.warning(f"Could not remove temp file {temp_path}: {cleanup_error}")
                raise

    def clear(self, user_id: str, agent_name: str, job_id: Optional[str] = None) -> bool:
        """Clear specific memory file.

        Returns:
            ``True`` if a file was removed, ``False`` if there was nothing to
            remove or the removal failed (the failure is logged).

        Raises:
            ValueError: If no safe path can be built for the arguments.
        """
        path = self._path(user_id, job_id, agent_name)

        with self._lock:
            try:
                path.unlink()
            except FileNotFoundError:
                return False
            except Exception as e:
                logger.error(f"Error clearing memory at {path}: {e}")
                return False

            logger.info(f"Cleared memory at {path}")
            return True


# Shared module-level instance using the default storage location.
memory_store = MemoryStore()