Spaces:
Running
Running
fix: Python 3.9 compatibility - Optional type hints and agent registration newlines (3 fixes: literal
35156f5 | import os | |
| import json | |
| import time | |
| import hashlib | |
| from typing import Optional | |
| CACHE_TTL_SECONDS = int(os.getenv("AGENT_CACHE_TTL", "300")) | |
| CACHE_MAX_SIZE = int(os.getenv("AGENT_CACHE_MAX_SIZE", "10000")) | |
| class ResponseCache: | |
| def __init__(self): | |
| self._local_cache = {} | |
| self._local_order = [] | |
| def _make_key(self, prefix: str, *parts) -> str: | |
| raw = f"{prefix}:{'|'.join(str(p) for p in parts)}" | |
| return hashlib.md5(raw.encode()).hexdigest() | |
| async def get(self, prefix: str, *parts) -> Optional[str]: | |
| key = self._make_key(prefix, *parts) | |
| return self._local_cache.get(key) | |
| async def set(self, prefix: str, value: str, *parts, ttl: int = None) -> None: | |
| key = self._make_key(prefix, *parts) | |
| if key in self._local_cache: | |
| self._local_order.remove(key) | |
| elif len(self._local_cache) >= CACHE_MAX_SIZE: | |
| if self._local_order: | |
| oldest = self._local_order.pop(0) | |
| self._local_cache.pop(oldest, None) | |
| self._local_cache[key] = value | |
| self._local_order.append(key) | |
| async def invalidate(self, prefix: str, *parts) -> None: | |
| key = self._make_key(prefix, *parts) | |
| self._local_cache.pop(key, None) | |
| async def clear_prefix(self, prefix: str) -> None: | |
| keys_to_del = [k for k in self._local_cache if k.startswith(f"{prefix}:")] | |
| for k in keys_to_del: | |
| self._local_cache.pop(k, None) | |
| self._local_order = [k for k in self._local_order if k not in keys_to_del] | |
| _global_cache = None | |
| def get_cache() -> ResponseCache: | |
| global _global_cache | |
| if _global_cache is None: | |
| _global_cache = ResponseCache() | |
| return _global_cache | |