Spaces:
Paused
Paused
| import fnmatch | |
| import threading | |
| import time | |
| from dataclasses import dataclass | |
| from typing import Any | |
| _lock = threading.RLock() | |
| _cache: dict[str, dict[str, "CacheEntry"]] = {} | |
| _enabled_global: bool = True | |
| _enabled_areas: dict[str, bool] = {} | |
| class CacheEntry: | |
| value: Any | |
| timestamp: float | |
| def toggle_global(enabled: bool) -> None: | |
| global _enabled_global | |
| _enabled_global = enabled | |
| def toggle_area(area: str, enabled: bool) -> None: | |
| _enabled_areas[area] = enabled | |
| def has(area: str, key: Any) -> bool: | |
| if not _is_enabled(area): | |
| return False | |
| with _lock: | |
| entry = _cache.get(area, {}).get(key) | |
| if entry is None: | |
| return False | |
| _touch_entry(entry) | |
| return True | |
| def add(area: str, key: Any, data: Any) -> None: | |
| if not _is_enabled(area): | |
| return | |
| with _lock: | |
| if area not in _cache: | |
| _cache[area] = {} | |
| _cache[area][key] = _create_entry(data) | |
| def get(area: str, key: Any, default: Any = None) -> Any: | |
| if not _is_enabled(area): | |
| return default | |
| with _lock: | |
| entry = _cache.get(area, {}).get(key) | |
| if entry is None: | |
| return default | |
| _touch_entry(entry) | |
| return entry.value | |
| def remove(area: str, key: Any) -> None: | |
| if not _is_enabled(area): | |
| return | |
| with _lock: | |
| if area in _cache: | |
| _cache[area].pop(key, None) | |
| def clear(area: str) -> None: | |
| with _lock: | |
| if any(ch in area for ch in "*?["): | |
| keys_to_remove = [k for k in _cache.keys() if fnmatch.fnmatch(k, area)] | |
| for k in keys_to_remove: | |
| _cache.pop(k, None) | |
| return | |
| _cache.pop(area, None) | |
| def trim_cache(area: str, seconds: float = 300) -> None: | |
| cutoff = time.time() - seconds | |
| with _lock: | |
| for area_key in _get_matching_areas(area): | |
| area_cache = _cache.get(area_key) | |
| if not area_cache: | |
| continue | |
| keys_to_remove = [key for key, entry in area_cache.items() if entry.timestamp < cutoff] | |
| for key in keys_to_remove: | |
| area_cache.pop(key, None) | |
| if not area_cache: | |
| _cache.pop(area_key, None) | |
| def clear_all() -> None: | |
| with _lock: | |
| _cache.clear() | |
| def _is_enabled(area: str) -> bool: | |
| if not _enabled_global: | |
| return False | |
| return _enabled_areas.get(area, True) | |
| def _create_entry(value: Any) -> CacheEntry: | |
| return CacheEntry(value=value, timestamp=time.time()) | |
| def _touch_entry(entry: CacheEntry) -> None: | |
| entry.timestamp = time.time() | |
| def _get_matching_areas(area: str) -> list[str]: | |
| if any(ch in area for ch in "*?["): | |
| return [k for k in _cache.keys() if fnmatch.fnmatch(k, area)] | |
| return [area] | |
| def determine_cache_key(agent, *additional): | |
| if agent: | |
| profile = agent.config.profile or "none" | |
| project = agent.context.get_data("project") or "none" | |
| return (profile, project, *additional) | |
| return ("none", "none", *additional) |