Spaces:
Sleeping
Sleeping
| import shutil | |
| import uuid | |
| import time | |
| import threading | |
| from pathlib import Path | |
| from typing import Callable, Optional | |
| from open_storyline.utils.logging import get_logger | |
| from src.open_storyline.storage.agent_memory import ArtifactStore | |
| logger = get_logger(__name__) | |
| class SessionLifecycleManager: | |
| """ | |
| Lifecycle Manager | |
| Responsibilities: | |
| 1. Create and clean up artifacts directory | |
| 2. Create and clean up .server_cache directory | |
| 3. Produce ArtifactStore instances | |
| """ | |
| def __init__( | |
| self, | |
| artifacts_root: str | Path, | |
| cache_root: str | Path, | |
| max_items: int = 256, | |
| retention_days: int = 3, | |
| enable_cleanup: bool = False, | |
| ): | |
| self.artifacts_root = Path(artifacts_root) | |
| self.cache_root = Path(cache_root) | |
| self.max_items = max_items | |
| self.retention_days = retention_days | |
| self.enable_cleanup = enable_cleanup | |
| # Ensure project root directory exists | |
| self.artifacts_root.mkdir(parents=True, exist_ok=True) | |
| self.cache_root.mkdir(parents=True, exist_ok=True) | |
| # Concurrency control: prevent multiple cleanup threads from interfering with each other | |
| self._cleanup_lock = threading.Lock() | |
| self._is_cleaning = False | |
| def _safe_rmtree(self, path: Path): | |
| """More robust directory deletion method""" | |
| def onerror(func, path, exc_info): | |
| import stat | |
| import os | |
| if not os.access(path, os.W_OK): | |
| os.chmod(path, stat.S_IWUSR) | |
| func(path) | |
| else: | |
| logger.warning(f"[Lifecycle] Failed to remove {path}: {exc_info[1]}") | |
| if path.is_dir(): | |
| shutil.rmtree(path, onerror=onerror) | |
| else: | |
| path.unlink(missing_ok=True) | |
| def _cleanup_dir(self, target_dir: Path, exclude_name: str = None, filter_func: Callable[[Path], bool] = None): | |
| """ | |
| Cleanup strategy: remove expired items first, then enforce quantity limit | |
| """ | |
| if not target_dir.exists(): | |
| return | |
| try: | |
| # 1. Calculate expiration timestamp cutoff | |
| now = time.time() | |
| # 86400 second = 1 day | |
| cutoff_time = now - (self.retention_days * 86400) | |
| valid_items = [] # 没过期且合法的 Session | |
| expired_items = [] # 已经过期的 Session | |
| # 2. Iterate and check | |
| for p in target_dir.iterdir(): | |
| # (A) Filter check (is it a directory, is it a UUID) | |
| if filter_func and not filter_func(p): | |
| continue | |
| # (B) Protect currently in-use items (don't delete even if expired, to prevent running tasks from crashing) | |
| if exclude_name and p.name == exclude_name: | |
| continue | |
| # (C) Check last modification time | |
| mtime = p.stat().st_mtime | |
| if mtime < cutoff_time: | |
| # Exceeded retention_days, add to expired list | |
| expired_items.append(p) | |
| else: | |
| # Not yet expired, add to valid list | |
| valid_items.append(p) | |
| # 3. Phase 1: Delete all expired items | |
| for item in expired_items: | |
| logger.info(f"[Lifecycle] Deleting expired item (> {self.retention_days} days): {item.name}") | |
| self._safe_rmtree(item) | |
| # 4. Phase 2: If remaining items still exceed max_items, delete the oldest | |
| if len(valid_items) > self.max_items: | |
| # Sort by time (oldest -> newest) | |
| valid_items.sort(key=lambda x: x.stat().st_mtime) | |
| num_to_delete = len(valid_items) - self.max_items | |
| logger.info(f"[Lifecycle] Item count {len(valid_items)} > limit {self.max_items}. Deleting {num_to_delete} oldest.") | |
| for item in valid_items[:num_to_delete]: | |
| logger.info(f"[Lifecycle] Deleting excess item: {item.name}") | |
| self._safe_rmtree(item) | |
| except Exception as e: | |
| logger.error(f"[Lifecycle] Error cleaning {target_dir}: {e}") | |
| def cleanup_expired_sessions(self, current_session_id: Optional[str] = None): | |
| """ | |
| Trigger cleanup for all managed directories | |
| Use lock to ensure only one cleanup task runs at a time | |
| """ | |
| if not self.enable_cleanup: | |
| return | |
| # Try acquiring the lock; if it fails (cleanup in progress), skip this round | |
| # Non-blocking approach suitable for high-frequency calls | |
| if not self._cleanup_lock.acquire(blocking=False): | |
| return | |
| def artifact_filter(p: Path) -> bool: | |
| return p.is_dir() and self._is_valid_session_id(p.name) | |
| try: | |
| self._is_cleaning = True | |
| # Clean up artifacts | |
| self._cleanup_dir(self.artifacts_root, exclude_name=current_session_id, filter_func=artifact_filter) | |
| # Clean up server_cache | |
| self._cleanup_dir(self.cache_root, exclude_name=current_session_id, filter_func=artifact_filter) | |
| finally: | |
| self._is_cleaning = False | |
| self._cleanup_lock.release() | |
| def _is_valid_session_id(self, name: str) -> bool: | |
| # 1. Quick filter: length must be 32 characters | |
| if len(name) != 32: | |
| return False | |
| # 2. Try to parse as UUID | |
| try: | |
| val = uuid.UUID(name) | |
| return val.hex == name and val.version == 4 | |
| except (ValueError, AttributeError): | |
| return False | |
| def get_artifact_store(self, session_id: str) -> ArtifactStore: | |
| # 1. Trigger cleanup asynchronously | |
| # Even if called concurrently here, the non-blocking lock inside cleanup_expired_sessions handles concurrency issues | |
| if self.enable_cleanup: | |
| threading.Thread( | |
| target=self.cleanup_expired_sessions, | |
| args=(session_id,), | |
| daemon=True, | |
| name=f"CleanupThread-{session_id}" | |
| ).start() | |
| # 2. Return Store instance | |
| return ArtifactStore(self.artifacts_root, session_id) |