| | import shutil |
| | import uuid |
| | import time |
| | import threading |
| | from pathlib import Path |
| | from typing import Callable, Optional |
| |
|
| | from open_storyline.utils.logging import get_logger |
| | from src.open_storyline.storage.agent_memory import ArtifactStore |
| |
|
| | logger = get_logger(__name__) |
| |
|
| |
|
| | class SessionLifecycleManager: |
| | """ |
| | Lifecycle Manager |
| | Responsibilities: |
| | 1. Create and clean up artifacts directory |
| | 2. Create and clean up .server_cache directory |
| | 3. Produce ArtifactStore instances |
| | """ |
| | def __init__( |
| | self, |
| | artifacts_root: str | Path, |
| | cache_root: str | Path, |
| | max_items: int = 256, |
| | retention_days: int = 3, |
| | enable_cleanup: bool = False, |
| | ): |
| | self.artifacts_root = Path(artifacts_root) |
| | self.cache_root = Path(cache_root) |
| | self.max_items = max_items |
| | self.retention_days = retention_days |
| | self.enable_cleanup = enable_cleanup |
| |
|
| | |
| | self.artifacts_root.mkdir(parents=True, exist_ok=True) |
| | self.cache_root.mkdir(parents=True, exist_ok=True) |
| | |
| | |
| | self._cleanup_lock = threading.Lock() |
| | self._is_cleaning = False |
| |
|
| | def _safe_rmtree(self, path: Path): |
| | """More robust directory deletion method""" |
| | def onerror(func, path, exc_info): |
| | import stat |
| | import os |
| | if not os.access(path, os.W_OK): |
| | os.chmod(path, stat.S_IWUSR) |
| | func(path) |
| | else: |
| | logger.warning(f"[Lifecycle] Failed to remove {path}: {exc_info[1]}") |
| |
|
| | if path.is_dir(): |
| | shutil.rmtree(path, onerror=onerror) |
| | else: |
| | path.unlink(missing_ok=True) |
| |
|
| | def _cleanup_dir(self, target_dir: Path, exclude_name: str = None, filter_func: Callable[[Path], bool] = None): |
| | """ |
| | Cleanup strategy: remove expired items first, then enforce quantity limit |
| | """ |
| | if not target_dir.exists(): |
| | return |
| |
|
| | try: |
| | |
| | now = time.time() |
| | |
| | cutoff_time = now - (self.retention_days * 86400) |
| |
|
| | valid_items = [] |
| | expired_items = [] |
| |
|
| | |
| | for p in target_dir.iterdir(): |
| | |
| | if filter_func and not filter_func(p): |
| | continue |
| |
|
| | |
| | if exclude_name and p.name == exclude_name: |
| | continue |
| | |
| | |
| | mtime = p.stat().st_mtime |
| | if mtime < cutoff_time: |
| | |
| | expired_items.append(p) |
| | else: |
| | |
| | valid_items.append(p) |
| |
|
| | |
| | for item in expired_items: |
| | logger.info(f"[Lifecycle] Deleting expired item (> {self.retention_days} days): {item.name}") |
| | self._safe_rmtree(item) |
| |
|
| | |
| | if len(valid_items) > self.max_items: |
| | |
| | valid_items.sort(key=lambda x: x.stat().st_mtime) |
| | |
| | num_to_delete = len(valid_items) - self.max_items |
| | logger.info(f"[Lifecycle] Item count {len(valid_items)} > limit {self.max_items}. Deleting {num_to_delete} oldest.") |
| | |
| | for item in valid_items[:num_to_delete]: |
| | logger.info(f"[Lifecycle] Deleting excess item: {item.name}") |
| | self._safe_rmtree(item) |
| |
|
| | except Exception as e: |
| | logger.error(f"[Lifecycle] Error cleaning {target_dir}: {e}") |
| |
|
| | def cleanup_expired_sessions(self, current_session_id: Optional[str] = None): |
| | """ |
| | Trigger cleanup for all managed directories |
| | Use lock to ensure only one cleanup task runs at a time |
| | """ |
| | if not self.enable_cleanup: |
| | return |
| | |
| | |
| | |
| | if not self._cleanup_lock.acquire(blocking=False): |
| | return |
| |
|
| | def artifact_filter(p: Path) -> bool: |
| | return p.is_dir() and self._is_valid_session_id(p.name) |
| |
|
| | try: |
| | self._is_cleaning = True |
| | |
| | self._cleanup_dir(self.artifacts_root, exclude_name=current_session_id, filter_func=artifact_filter) |
| | |
| | self._cleanup_dir(self.cache_root, exclude_name=current_session_id, filter_func=artifact_filter) |
| | finally: |
| | self._is_cleaning = False |
| | self._cleanup_lock.release() |
| |
|
| | def _is_valid_session_id(self, name: str) -> bool: |
| | |
| | if len(name) != 32: |
| | return False |
| | |
| | |
| | try: |
| | val = uuid.UUID(name) |
| | return val.hex == name and val.version == 4 |
| | except (ValueError, AttributeError): |
| | return False |
| |
|
| | |
| |
|
| | def get_artifact_store(self, session_id: str) -> ArtifactStore: |
| | |
| | |
| | if self.enable_cleanup: |
| | threading.Thread( |
| | target=self.cleanup_expired_sessions, |
| | args=(session_id,), |
| | daemon=True, |
| | name=f"CleanupThread-{session_id}" |
| | ).start() |
| | |
| | |
| | return ArtifactStore(self.artifacts_root, session_id) |