File size: 3,077 Bytes
cbdb02c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
import fnmatch
import threading
import time
from dataclasses import dataclass
from typing import Any

# Reentrant lock held by the functions below whenever they read or mutate `_cache`.
_lock = threading.RLock()
# Cached entries, grouped first by area name and then by key.
_cache: dict[str, dict[str, "CacheEntry"]] = {}

# Master switch: while False, `_is_enabled` reports every area as disabled.
_enabled_global: bool = True
# Per-area switches; an area with no entry here counts as enabled.
_enabled_areas: dict[str, bool] = {}


@dataclass(slots=True)
class CacheEntry:
    """A cached value together with the time it was stored or last accessed."""

    # The cached payload; `get` returns it unchanged.
    value: Any
    # `time.time()` reading set by `_create_entry` and refreshed by
    # `_touch_entry`; `trim_cache` compares it against its cutoff.
    timestamp: float


def toggle_global(enabled: bool) -> None:
    """Switch caching on or off for every area at once.

    The value is stored in the module-level `_enabled_global` flag. Entries
    already in the cache are left untouched.
    """
    global _enabled_global
    _enabled_global = enabled


def toggle_area(area: str, enabled: bool) -> None:
    """Switch caching on or off for a single area.

    The choice is recorded in `_enabled_areas` under the exact `area` name.
    Entries already cached for that area are left untouched.
    """
    _enabled_areas.update({area: enabled})


def has(area: str, key: Any) -> bool:
    """Report whether `key` is cached in `area`.

    A hit also refreshes the entry's timestamp. Returns False without
    consulting the cache when caching is disabled for `area`.
    """
    if _is_enabled(area):
        with _lock:
            found = _cache.get(area, {}).get(key)
            if found is not None:
                # A membership check counts as an access.
                _touch_entry(found)
                return True
    return False


def add(area: str, key: Any, data: Any) -> None:
    """Store `data` under `key` in `area`, replacing any existing entry.

    The area's bucket is created on first use. Nothing is stored when
    caching is disabled for `area`.
    """
    if _is_enabled(area):
        with _lock:
            bucket = _cache.setdefault(area, {})
            bucket[key] = _create_entry(data)


def get(area: str, key: Any, default: Any = None) -> Any:
    """Return the value cached under `key` in `area`, or `default` on a miss.

    A hit also refreshes the entry's timestamp. `default` is returned
    without consulting the cache when caching is disabled for `area`.
    """
    if _is_enabled(area):
        with _lock:
            hit = _cache.get(area, {}).get(key)
            if hit is not None:
                _touch_entry(hit)
                return hit.value
    return default


def remove(area: str, key: Any) -> None:
    """Delete the entry stored under `key` in `area`, if there is one.

    Unknown areas and keys are ignored. Nothing is removed when caching is
    disabled for `area`.
    """
    if _is_enabled(area):
        with _lock:
            bucket = _cache.get(area)
            if bucket is not None:
                bucket.pop(key, None)


def clear(area: str) -> None:
    """Drop every cached entry belonging to `area`.

    When `area` contains any of the glob metacharacters ``*``, ``?`` or
    ``[`` it is treated as an `fnmatch` pattern and every matching area is
    dropped. The area whose name is exactly `area` is always dropped as
    well, so an area whose own name contains one of those characters (for
    example ``"tools[v2]"``, which as a pattern does not match itself) can
    still be cleared by passing its name.

    Unlike `add`, `get` and `remove`, this does not check whether caching
    is enabled. Areas that do not exist are ignored.
    """
    with _lock:
        if any(ch in area for ch in "*?["):
            # Snapshot the matches first: keys cannot be deleted from
            # `_cache` while it is being iterated.
            for name in [k for k in _cache if fnmatch.fnmatch(k, area)]:
                del _cache[name]
        # Always remove the literal name too; a no-op if it is absent or
        # was already removed as a pattern match.
        _cache.pop(area, None)


def trim_cache(area: str, seconds: float = 300) -> None:
    """Evict entries in `area` that have not been touched for `seconds`.

    `area` is resolved through `_get_matching_areas`, so it may be a glob
    pattern. An entry is evicted when its timestamp is older than
    ``time.time() - seconds``. An area left empty by the eviction is removed
    from the cache; an area that was already empty is skipped as-is.
    """
    oldest_allowed = time.time() - seconds
    with _lock:
        for name in _get_matching_areas(area):
            bucket = _cache.get(name)
            if not bucket:
                continue

            # Collect first, delete afterwards: the bucket cannot change
            # size while it is being iterated.
            stale = [key for key, entry in bucket.items() if entry.timestamp < oldest_allowed]
            for key in stale:
                del bucket[key]

            if not bucket:
                del _cache[name]


def clear_all() -> None:
    """Empty the whole cache, across every area.

    The `_cache` dict is cleared in place rather than replaced, and the
    enabled/disabled switches are not consulted.
    """
    with _lock:
        _cache.clear()


def _is_enabled(area: str) -> bool:
    """Tell whether caching is currently active for `area`.

    The global switch wins: when it is off, every area is disabled.
    Otherwise the per-area switch decides, and an area with no recorded
    switch counts as enabled.
    """
    return _enabled_areas.get(area, True) if _enabled_global else False


def _create_entry(value: Any) -> CacheEntry:
    """Wrap `value` in a `CacheEntry` stamped with the current `time.time()`."""
    now = time.time()
    return CacheEntry(value, now)


def _touch_entry(entry: CacheEntry) -> None:
    """Mark `entry` as just used by resetting its timestamp to `time.time()`."""
    now = time.time()
    entry.timestamp = now


def _get_matching_areas(area: str) -> list[str]:
    """Resolve `area` to the list of area names it refers to.

    A name without any of the glob metacharacters ``*``, ``?`` or ``[`` is
    returned as a single-element list, whether or not it exists in the cache.
    Otherwise `area` is treated as an `fnmatch` pattern and every cached
    area matching it is returned. If an area named exactly `area` exists it
    is included too, so an area whose own name contains a metacharacter (for
    example ``"tools[v2]"``, which as a pattern does not match itself) stays
    reachable by name.

    The caller is expected to hold `_lock`, since `_cache` is iterated here.
    """
    if not any(ch in area for ch in "*?["):
        return [area]

    matches = [name for name in _cache if fnmatch.fnmatch(name, area)]
    # Add the literal name unless the pattern already matched it.
    if area in _cache and area not in matches:
        matches.append(area)
    return matches


def determine_cache_key(agent, *additional):
    """Build a cache key tuple of the form ``(profile, project, *additional)``.

    `profile` is read from ``agent.config.profile`` and `project` from
    ``agent.context.get_data("project")``. Either one falls back to the
    string ``"none"`` when falsy, and both do when `agent` itself is falsy.
    """
    if not agent:
        return ("none", "none", *additional)

    profile = agent.config.profile or "none"
    project = agent.context.get_data("project") or "none"
    return (profile, project, *additional)