Spaces:
Runtime error
Runtime error
| from __future__ import annotations | |
| import json | |
| import os | |
| import logging | |
| from typing import Any, Dict, Optional | |
| from threading import RLock | |
| from pathlib import Path | |
| from utils.security import sanitize_path_component, validate_job_id | |
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class MemoryStore:
    """File-backed JSON store for per-agent memory.

    Every (user, job, agent) combination maps to a single JSON file located
    directly under ``base_dir``. All reads, writes and deletes performed by
    this instance run while holding one re-entrant lock (``self._lock``).
    """

    def __init__(self, base_dir: Optional[str] = None) -> None:
        """Create the store and make sure its directory exists.

        Args:
            base_dir: Directory that holds the memory files. When ``None``,
                ``/home/user/app/memory_data`` is used if ``/home/user/app``
                exists (HF Spaces environment), otherwise ``./memory/data``.
        """
        if base_dir is None:
            # Use a writable location in HF Spaces
            if os.path.exists("/home/user/app"):
                base_dir = "/home/user/app/memory_data"  # HF Spaces environment
            else:
                base_dir = "./memory/data"  # Local environment
        self.base_dir = Path(base_dir)
        self.base_dir.mkdir(parents=True, exist_ok=True)
        self._lock = RLock()
        logger.info(f"Memory store initialized at {self.base_dir}")

    def _resolve_within_base(self, candidate: Path) -> Optional[Path]:
        """Return *candidate* resolved, or ``None`` if it escapes ``base_dir``.

        The reason for a rejection is logged exactly once, either as a
        resolution failure or as a traversal attempt.
        """
        try:
            resolved = candidate.resolve()
            if resolved.is_relative_to(self.base_dir.resolve()):
                return resolved
        except (ValueError, RuntimeError) as e:
            logger.error(f"Path validation error: {e}")
            return None
        logger.error(f"Path traversal attempt detected: {resolved}")
        return None

    def _path(self, user_id: str, job_id: Optional[str], agent_name: str) -> Path:
        """Generate a safe file path for storing agent memory.

        Args:
            user_id: Owner of the memory; sanitized before use.
            job_id: Optional job scope; ``"global"`` is used when falsy.
            agent_name: Agent whose memory is stored; sanitized before use.

        Returns:
            The resolved path ``<base_dir>/<user>__<job>__<agent>.json``, or
            the ``default__<job>__<agent>.json`` fallback when the primary
            path fails validation.

        Raises:
            ValueError: If the fallback path also lies outside ``base_dir``.
        """
        # Sanitize all components to prevent directory traversal
        safe_user = sanitize_path_component(user_id)
        safe_job = sanitize_path_component(job_id or "global")
        safe_agent = sanitize_path_component(agent_name)

        # Validate job_id if provided (only warns; the sanitized value is used)
        if job_id and not validate_job_id(job_id):
            logger.warning(f"Invalid job_id format: {job_id}, using sanitized version")

        # Ensure the path is within our base directory (defense in depth)
        primary = self._resolve_within_base(
            self.base_dir / f"{safe_user}__{safe_job}__{safe_agent}.json"
        )
        if primary is not None:
            return primary

        # Fallback to a safe default. It reuses safe_job/safe_agent, so it has
        # to pass the same containment check instead of being trusted blindly.
        fallback = self._resolve_within_base(
            self.base_dir / f"default__{safe_job}__{safe_agent}.json"
        )
        if fallback is None:
            raise ValueError("Invalid path")
        return fallback

    def load(self, user_id: str, agent_name: str, job_id: Optional[str] = None) -> Dict[str, Any]:
        """Load agent memory from disk with error handling.

        Returns:
            The decoded JSON content, or ``{}`` when the file is missing,
            cannot be read, or contains invalid JSON. A file with invalid
            JSON is moved aside to ``<name>.corrupted.json``.
        """
        path = self._path(user_id, job_id, agent_name)
        with self._lock:
            try:
                # Open directly rather than checking exists() first, so a file
                # removed in between is reported as "missing", not as an error.
                with open(path, "r", encoding="utf-8") as f:
                    data = json.load(f)
            except FileNotFoundError:
                logger.debug(f"No memory file found at {path}")
                return {}
            except json.JSONDecodeError as e:
                logger.error(f"JSON decode error in {path}: {e}")
                self._backup_corrupted(path)
                return {}
            except Exception as e:
                logger.error(f"Error loading memory from {path}: {e}")
                return {}
            logger.debug(f"Loaded memory from {path}")
            return data

    def _backup_corrupted(self, path: Path) -> None:
        """Move an unreadable memory file aside; never raises on OS errors."""
        backup_path = path.with_suffix(".corrupted.json")
        try:
            # replace() overwrites an older backup on every platform, whereas
            # rename() fails on Windows when the target already exists.
            path.replace(backup_path)
        except OSError as e:
            logger.warning(f"Could not back up corrupted file {path}: {e}")
        else:
            logger.warning(f"Backed up corrupted file to {backup_path}")

    def save(self, user_id: str, agent_name: str, data: Dict[str, Any], job_id: Optional[str] = None) -> None:
        """Save agent memory to disk with atomic write.

        The JSON is written to a sibling ``.tmp`` file which then replaces the
        target in one step, so a failed write leaves the previous file intact.

        Raises:
            Exception: Any error raised while writing or replacing the file is
                logged and re-raised after the temp file has been cleaned up.
        """
        path = self._path(user_id, job_id, agent_name)
        temp_path = path.with_suffix(".tmp")
        with self._lock:
            try:
                # Write to temporary file first (atomic write)
                with open(temp_path, "w", encoding="utf-8") as f:
                    json.dump(data, f, ensure_ascii=False, indent=2)
                # Atomic rename
                temp_path.replace(path)
                logger.debug(f"Saved memory to {path}")
            except Exception as e:
                logger.error(f"Error saving memory to {path}: {e}")
                # Clean up temp file if it exists; report (don't hide) failures
                try:
                    temp_path.unlink(missing_ok=True)
                except OSError as cleanup_error:
                    logger.warning(f"Could not remove temp file {temp_path}: {cleanup_error}")
                raise

    def clear(self, user_id: str, agent_name: str, job_id: Optional[str] = None) -> bool:
        """Clear specific memory file.

        Returns:
            ``True`` if a file was removed, ``False`` if there was nothing to
            remove or the removal failed (the failure is logged).
        """
        path = self._path(user_id, job_id, agent_name)
        with self._lock:
            try:
                path.unlink()
            except FileNotFoundError:
                return False
            except Exception as e:
                logger.error(f"Error clearing memory at {path}: {e}")
                return False
            logger.info(f"Cleared memory at {path}")
            return True
# Shared module-level store instance, created at import time with the default
# base directory (constructing it creates that directory if it is missing).
memory_store = MemoryStore()