File size: 5,098 Bytes
7498f2c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
from __future__ import annotations
import json
import os
import logging
from typing import Any, Dict, Optional
from threading import RLock
from pathlib import Path

from utils.security import sanitize_path_component, validate_job_id

# Module-level logger; records are emitted under this module's import name.
logger = logging.getLogger(__name__)


class MemoryStore:
    """File-backed store holding one JSON document per (user, job, agent).

    Each document lives in ``base_dir`` under the name
    ``<user>__<job>__<agent>.json``. All reads, writes and deletes performed
    through one instance are serialized with a re-entrant lock; nothing here
    coordinates with other instances or other processes.
    """

    def __init__(self, base_dir: Optional[str] = None) -> None:
        """Create the store and make sure its directory exists.

        Args:
            base_dir: Directory for the memory files. When ``None``,
                ``/home/user/app/memory_data`` is used if ``/home/user/app``
                exists (HF Spaces), otherwise ``./memory/data``.
        """
        if base_dir is None:
            # Use a writable location in HF Spaces
            if os.path.exists("/home/user/app"):
                base_dir = "/home/user/app/memory_data"  # HF Spaces environment
            else:
                base_dir = "./memory/data"  # Local environment
        self.base_dir = Path(base_dir)
        self.base_dir.mkdir(parents=True, exist_ok=True)
        self._lock = RLock()
        logger.info("Memory store initialized at %s", self.base_dir)

    def _resolve_within_base(self, candidate: Path) -> Optional[Path]:
        """Return *candidate* resolved, or ``None`` if it escapes ``base_dir``."""
        try:
            resolved = candidate.resolve()
            if resolved.is_relative_to(self.base_dir.resolve()):
                return resolved
        except (ValueError, RuntimeError) as e:
            # e.g. an embedded NUL byte or a symlink loop while resolving.
            logger.error("Path validation error: %s", e)
            return None
        logger.error("Path traversal attempt detected: %s", resolved)
        return None

    def _path(self, user_id: str, job_id: Optional[str], agent_name: str) -> Path:
        """Generate a safe file path for storing agent memory.

        Args:
            user_id: Owner of the memory; becomes the first filename component.
            job_id: Optional job scope; ``"global"`` is used when falsy.
            agent_name: Agent whose memory is stored.

        Returns:
            A resolved path located inside ``base_dir``.

        Raises:
            ValueError: If neither the regular path nor the ``default``-user
                fallback stays inside ``base_dir``.
        """
        # Sanitize all components to prevent directory traversal
        safe_user = sanitize_path_component(user_id)
        safe_job = sanitize_path_component(job_id or "global")
        safe_agent = sanitize_path_component(agent_name)

        # Validate job_id if provided (only logged; the sanitized value is used)
        if job_id and not validate_job_id(job_id):
            logger.warning("Invalid job_id format: %s, using sanitized version", job_id)

        # Ensure the path is within our base directory (defense in depth)
        primary = self.base_dir / f"{safe_user}__{safe_job}__{safe_agent}.json"
        resolved = self._resolve_within_base(primary)
        if resolved is not None:
            return resolved

        # Fallback drops the user component.
        # NOTE(review): every user hitting this branch shares the same
        # "default" file for a given job/agent - confirm that is acceptable.
        fallback = self.base_dir / f"default__{safe_job}__{safe_agent}.json"
        resolved = self._resolve_within_base(fallback)
        if resolved is None:
            # The job/agent components themselves escape base_dir; refuse
            # rather than hand back a path outside the store.
            raise ValueError("Invalid path")
        return resolved

    def _quarantine(self, path: Path) -> None:
        """Move an unparseable file aside as ``<stem>.corrupted.json`` (best effort)."""
        backup_path = path.with_suffix(".corrupted.json")
        try:
            # replace() overwrites an older backup on every platform, whereas
            # rename() fails on Windows when the target already exists.
            path.replace(backup_path)
        except OSError as e:
            logger.warning("Could not back up corrupted file %s: %s", path, e)
        else:
            logger.warning("Backed up corrupted file to %s", backup_path)

    def load(self, user_id: str, agent_name: str, job_id: Optional[str] = None) -> Dict[str, Any]:
        """Load agent memory from disk with error handling.

        Returns:
            The parsed JSON content, or ``{}`` when the file is missing,
            cannot be read, or is not valid JSON (in which case the file is
            moved aside as a ``.corrupted.json`` backup).
        """
        path = self._path(user_id, job_id, agent_name)

        with self._lock:
            try:
                # Open directly instead of checking exists() first, so a
                # concurrent delete cannot slip in between check and use.
                with open(path, "r", encoding="utf-8") as f:
                    data = json.load(f)
            except FileNotFoundError:
                logger.debug("No memory file found at %s", path)
                return {}
            except json.JSONDecodeError as e:
                logger.error("JSON decode error in %s: %s", path, e)
                self._quarantine(path)
                return {}
            except Exception as e:
                logger.error("Error loading memory from %s: %s", path, e)
                return {}

            logger.debug("Loaded memory from %s", path)
            return data

    def save(self, user_id: str, agent_name: str, data: Dict[str, Any], job_id: Optional[str] = None) -> None:
        """Save agent memory to disk with atomic write.

        The data is written to a sibling ``.tmp`` file which then replaces
        the target, so readers never observe a half-written document.

        Raises:
            Exception: Whatever the write or replace raised (for example
                ``TypeError`` for non-serializable data or ``OSError``), after
                the temporary file has been cleaned up.
        """
        path = self._path(user_id, job_id, agent_name)
        temp_path = path.with_suffix(".tmp")

        with self._lock:
            try:
                # Write to temporary file first (atomic write)
                with open(temp_path, "w", encoding="utf-8") as f:
                    json.dump(data, f, ensure_ascii=False, indent=2)

                # Atomic rename
                temp_path.replace(path)
            except Exception as e:
                logger.error("Error saving memory to %s: %s", path, e)
                # Clean up temp file if it exists
                try:
                    temp_path.unlink(missing_ok=True)
                except OSError as cleanup_error:
                    logger.debug("Could not remove temp file %s: %s", temp_path, cleanup_error)
                raise

            logger.debug("Saved memory to %s", path)

    def clear(self, user_id: str, agent_name: str, job_id: Optional[str] = None) -> bool:
        """Clear specific memory file.

        Returns:
            ``True`` if a file was deleted, ``False`` if there was none or
            the deletion failed.
        """
        path = self._path(user_id, job_id, agent_name)

        with self._lock:
            try:
                # Unlink directly; a missing file is reported via the exception
                # instead of a separate exists() check that could race.
                path.unlink()
            except FileNotFoundError:
                return False
            except Exception as e:
                logger.error("Error clearing memory at %s: %s", path, e)
                return False

            logger.info("Cleared memory at %s", path)
            return True


# Shared module-level instance built with the default base directory.
# NOTE: it is constructed at import time, so importing this module creates
# the storage directory as a side effect.
memory_store = MemoryStore()