Noo88ear's picture
πŸš€ Initial deployment of Multi-Agent Job Application Assistant
7498f2c
from __future__ import annotations
import time
from threading import RLock
from typing import Any, Dict, Tuple, Optional
class TTLCache:
def __init__(self, ttl_seconds: int = 3600, max_items: int = 512) -> None:
self.ttl_seconds = ttl_seconds
self.max_items = max_items
self._data: Dict[str, Tuple[float, Any]] = {}
self._lock = RLock()
def _evict_if_needed(self) -> None:
if len(self._data) <= self.max_items:
return
# Evict oldest by expiry time
items = sorted(self._data.items(), key=lambda kv: kv[1][0])
for k, _ in items[: max(1, len(items) - self.max_items)]:
self._data.pop(k, None)
def get(self, key: str) -> Optional[Any]:
now = time.time()
with self._lock:
item = self._data.get(key)
if not item:
return None
expires, value = item
if expires < now:
# expired
self._data.pop(key, None)
return None
return value
def set(self, key: str, value: Any) -> None:
with self._lock:
expires = time.time() + self.ttl_seconds
self._data[key] = (expires, value)
self._evict_if_needed()
def clear(self) -> None:
with self._lock:
self._data.clear()