import shutil
import uuid
import time
import threading
from pathlib import Path
from typing import Callable, Optional
from open_storyline.utils.logging import get_logger
from src.open_storyline.storage.agent_memory import ArtifactStore
logger = get_logger(__name__)
class SessionLifecycleManager:
"""
Lifecycle Manager
Responsibilities:
1. Create and clean up artifacts directory
2. Create and clean up .server_cache directory
3. Produce ArtifactStore instances
"""
def __init__(
self,
artifacts_root: str | Path,
cache_root: str | Path,
max_items: int = 256,
retention_days: int = 3,
enable_cleanup: bool = False,
):
self.artifacts_root = Path(artifacts_root)
self.cache_root = Path(cache_root)
self.max_items = max_items
self.retention_days = retention_days
self.enable_cleanup = enable_cleanup
# Ensure project root directory exists
self.artifacts_root.mkdir(parents=True, exist_ok=True)
self.cache_root.mkdir(parents=True, exist_ok=True)
# Concurrency control: prevent multiple cleanup threads from interfering with each other
self._cleanup_lock = threading.Lock()
self._is_cleaning = False
def _safe_rmtree(self, path: Path):
"""More robust directory deletion method"""
def onerror(func, path, exc_info):
import stat
import os
if not os.access(path, os.W_OK):
os.chmod(path, stat.S_IWUSR)
func(path)
else:
logger.warning(f"[Lifecycle] Failed to remove {path}: {exc_info[1]}")
if path.is_dir():
shutil.rmtree(path, onerror=onerror)
else:
path.unlink(missing_ok=True)
def _cleanup_dir(self, target_dir: Path, exclude_name: str = None, filter_func: Callable[[Path], bool] = None):
"""
Cleanup strategy: remove expired items first, then enforce quantity limit
"""
if not target_dir.exists():
return
try:
# 1. Calculate expiration timestamp cutoff
now = time.time()
# 86400 second = 1 day
cutoff_time = now - (self.retention_days * 86400)
valid_items = [] # 没过期且合法的 Session
expired_items = [] # 已经过期的 Session
# 2. Iterate and check
for p in target_dir.iterdir():
# (A) Filter check (is it a directory, is it a UUID)
if filter_func and not filter_func(p):
continue
# (B) Protect currently in-use items (don't delete even if expired, to prevent running tasks from crashing)
if exclude_name and p.name == exclude_name:
continue
# (C) Check last modification time
mtime = p.stat().st_mtime
if mtime < cutoff_time:
# Exceeded retention_days, add to expired list
expired_items.append(p)
else:
# Not yet expired, add to valid list
valid_items.append(p)
# 3. Phase 1: Delete all expired items
for item in expired_items:
logger.info(f"[Lifecycle] Deleting expired item (> {self.retention_days} days): {item.name}")
self._safe_rmtree(item)
# 4. Phase 2: If remaining items still exceed max_items, delete the oldest
if len(valid_items) > self.max_items:
# Sort by time (oldest -> newest)
valid_items.sort(key=lambda x: x.stat().st_mtime)
num_to_delete = len(valid_items) - self.max_items
logger.info(f"[Lifecycle] Item count {len(valid_items)} > limit {self.max_items}. Deleting {num_to_delete} oldest.")
for item in valid_items[:num_to_delete]:
logger.info(f"[Lifecycle] Deleting excess item: {item.name}")
self._safe_rmtree(item)
except Exception as e:
logger.error(f"[Lifecycle] Error cleaning {target_dir}: {e}")
def cleanup_expired_sessions(self, current_session_id: Optional[str] = None):
"""
Trigger cleanup for all managed directories
Use lock to ensure only one cleanup task runs at a time
"""
if not self.enable_cleanup:
return
# Try acquiring the lock; if it fails (cleanup in progress), skip this round
# Non-blocking approach suitable for high-frequency calls
if not self._cleanup_lock.acquire(blocking=False):
return
def artifact_filter(p: Path) -> bool:
return p.is_dir() and self._is_valid_session_id(p.name)
try:
self._is_cleaning = True
# Clean up artifacts
self._cleanup_dir(self.artifacts_root, exclude_name=current_session_id, filter_func=artifact_filter)
# Clean up server_cache
self._cleanup_dir(self.cache_root, exclude_name=current_session_id, filter_func=artifact_filter)
finally:
self._is_cleaning = False
self._cleanup_lock.release()
def _is_valid_session_id(self, name: str) -> bool:
# 1. Quick filter: length must be 32 characters
if len(name) != 32:
return False
# 2. Try to parse as UUID
try:
val = uuid.UUID(name)
return val.hex == name and val.version == 4
except (ValueError, AttributeError):
return False
def get_artifact_store(self, session_id: str) -> ArtifactStore:
# 1. Trigger cleanup asynchronously
# Even if called concurrently here, the non-blocking lock inside cleanup_expired_sessions handles concurrency issues
if self.enable_cleanup:
threading.Thread(
target=self.cleanup_expired_sessions,
args=(session_id,),
daemon=True,
name=f"CleanupThread-{session_id}"
).start()
# 2. Return Store instance
return ArtifactStore(self.artifacts_root, session_id) |