File size: 6,328 Bytes
06ba7ea
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
import shutil
import uuid
import time
import threading
from pathlib import Path
from typing import Callable, Optional

from open_storyline.utils.logging import get_logger
from src.open_storyline.storage.agent_memory import ArtifactStore

logger = get_logger(__name__)


class SessionLifecycleManager:
    """
    Lifecycle Manager
    Responsibilities:
    1. Create and clean up artifacts directory
    2. Create and clean up .server_cache directory
    3. Produce ArtifactStore instances
    """
    def __init__(
        self, 
        artifacts_root: str | Path, 
        cache_root: str | Path, 
        max_items: int = 256,
        retention_days: int = 3,
        enable_cleanup: bool = False,
    ):
        self.artifacts_root = Path(artifacts_root)
        self.cache_root = Path(cache_root)
        self.max_items = max_items
        self.retention_days = retention_days
        self.enable_cleanup = enable_cleanup

        # Ensure project root directory exists
        self.artifacts_root.mkdir(parents=True, exist_ok=True)
        self.cache_root.mkdir(parents=True, exist_ok=True)
        
        # Concurrency control: prevent multiple cleanup threads from interfering with each other
        self._cleanup_lock = threading.Lock()
        self._is_cleaning = False

    def _safe_rmtree(self, path: Path):
        """More robust directory deletion method"""
        def onerror(func, path, exc_info):
            import stat
            import os
            if not os.access(path, os.W_OK):
                os.chmod(path, stat.S_IWUSR)
                func(path)
            else:
                logger.warning(f"[Lifecycle] Failed to remove {path}: {exc_info[1]}")

        if path.is_dir():
            shutil.rmtree(path, onerror=onerror)
        else:
            path.unlink(missing_ok=True)

    def _cleanup_dir(self, target_dir: Path, exclude_name: str = None, filter_func: Callable[[Path], bool] = None):
        """
        Cleanup strategy: remove expired items first, then enforce quantity limit
        """
        if not target_dir.exists():
            return

        try:
            # 1. Calculate expiration timestamp cutoff
            now = time.time()
            # 86400 second = 1 day
            cutoff_time = now - (self.retention_days * 86400) 

            valid_items = []      # 没过期且合法的 Session
            expired_items = []    # 已经过期的 Session

            # 2. Iterate and check
            for p in target_dir.iterdir():
                # (A) Filter check (is it a directory, is it a UUID)
                if filter_func and not filter_func(p):
                    continue

                # (B) Protect currently in-use items (don't delete even if expired, to prevent running tasks from crashing)
                if exclude_name and p.name == exclude_name:
                    continue
                
                # (C) Check last modification time
                mtime = p.stat().st_mtime
                if mtime < cutoff_time:
                    # Exceeded retention_days, add to expired list
                    expired_items.append(p)
                else:
                    # Not yet expired, add to valid list
                    valid_items.append(p)

            # 3. Phase 1: Delete all expired items
            for item in expired_items:
                logger.info(f"[Lifecycle] Deleting expired item (> {self.retention_days} days): {item.name}")
                self._safe_rmtree(item)

            # 4. Phase 2: If remaining items still exceed max_items, delete the oldest
            if len(valid_items) > self.max_items:
                # Sort by time (oldest -> newest)
                valid_items.sort(key=lambda x: x.stat().st_mtime)
                
                num_to_delete = len(valid_items) - self.max_items
                logger.info(f"[Lifecycle] Item count {len(valid_items)} > limit {self.max_items}. Deleting {num_to_delete} oldest.")
                
                for item in valid_items[:num_to_delete]:
                    logger.info(f"[Lifecycle] Deleting excess item: {item.name}")
                    self._safe_rmtree(item)

        except Exception as e:
            logger.error(f"[Lifecycle] Error cleaning {target_dir}: {e}")

    def cleanup_expired_sessions(self, current_session_id: Optional[str] = None):
        """
        Trigger cleanup for all managed directories
        Use lock to ensure only one cleanup task runs at a time
        """
        if not self.enable_cleanup:
            return
        
        # Try acquiring the lock; if it fails (cleanup in progress), skip this round
        # Non-blocking approach suitable for high-frequency calls
        if not self._cleanup_lock.acquire(blocking=False):
            return

        def artifact_filter(p: Path) -> bool:
            return p.is_dir() and self._is_valid_session_id(p.name)

        try:
            self._is_cleaning = True
            # Clean up artifacts
            self._cleanup_dir(self.artifacts_root, exclude_name=current_session_id, filter_func=artifact_filter)
            # Clean up server_cache
            self._cleanup_dir(self.cache_root, exclude_name=current_session_id, filter_func=artifact_filter)
        finally:
            self._is_cleaning = False
            self._cleanup_lock.release()

    def _is_valid_session_id(self, name: str) -> bool:
        # 1. Quick filter: length must be 32 characters
        if len(name) != 32:
            return False
            
        # 2. Try to parse as UUID
        try:
            val = uuid.UUID(name)
            return val.hex == name and val.version == 4
        except (ValueError, AttributeError):
            return False

        

    def get_artifact_store(self, session_id: str) -> ArtifactStore:
        # 1. Trigger cleanup asynchronously
        # Even if called concurrently here, the non-blocking lock inside cleanup_expired_sessions handles concurrency issues
        if self.enable_cleanup:
            threading.Thread(
                target=self.cleanup_expired_sessions, 
                args=(session_id,), 
                daemon=True,
                name=f"CleanupThread-{session_id}"
            ).start()
        
        # 2. Return Store instance
        return ArtifactStore(self.artifacts_root, session_id)