AjinkyaPagare's picture
fix: Python 3.9 compatibility - Optional type hints and agent registration newlines (3 fixes: literal
35156f5
import hashlib
import json
import os
import time
from collections import OrderedDict
from typing import Optional
# Cache tuning knobs, read once at import time from the environment.
# A value that is not a valid integer raises ValueError during import.

# Seconds; taken from AGENT_CACHE_TTL, falling back to 300.
CACHE_TTL_SECONDS = int(os.environ.get("AGENT_CACHE_TTL", "300"))

# Entry-count threshold at which ResponseCache.set starts evicting; taken
# from AGENT_CACHE_MAX_SIZE, falling back to 10000.
CACHE_MAX_SIZE = int(os.environ.get("AGENT_CACHE_MAX_SIZE", "10000"))
class ResponseCache:
    """Bounded in-process cache of string responses with per-entry expiry.

    Entries are addressed by a ``prefix`` plus any number of ``parts``. The
    pair is hashed into a fixed-length key, so the parts themselves are not
    retained. When the cache is full, the entry that was *written* least
    recently is evicted; reading an entry does not refresh its position.

    Args:
        max_size: Maximum number of entries to hold. When omitted, the
            module-level ``CACHE_MAX_SIZE`` is consulted on every write.
        default_ttl: Lifetime in seconds for entries stored without an
            explicit ``ttl``. When omitted, the module-level
            ``CACHE_TTL_SECONDS`` is consulted on every write.
    """

    def __init__(
        self,
        max_size: Optional[int] = None,
        default_ttl: Optional[int] = None,
    ) -> None:
        self._max_size = max_size
        self._default_ttl = default_ttl
        # key -> (prefix, value, expires_at), ordered oldest write first.
        # Keeping the prefix next to the value is what lets clear_prefix
        # work: the key is a hash and cannot be matched against a prefix.
        self._local_cache = OrderedDict()

    def _capacity(self) -> int:
        """Return the entry limit currently in force."""
        return CACHE_MAX_SIZE if self._max_size is None else self._max_size

    def _lifetime(self, ttl: Optional[int]) -> float:
        """Return the lifetime in seconds to apply for a requested ``ttl``."""
        if ttl is not None:
            return ttl
        if self._default_ttl is not None:
            return self._default_ttl
        return CACHE_TTL_SECONDS

    def _make_key(self, prefix: str, *parts) -> str:
        """Return the hex digest identifying ``prefix`` and ``parts``.

        MD5 is used only to derive a compact lookup key, not for security.
        """
        raw = f"{prefix}:{'|'.join(str(p) for p in parts)}"
        return hashlib.md5(raw.encode()).hexdigest()

    async def get(self, prefix: str, *parts) -> Optional[str]:
        """Return the cached value, or ``None`` if absent or expired."""
        key = self._make_key(prefix, *parts)
        entry = self._local_cache.get(key)
        if entry is None:
            return None
        _, value, expires_at = entry
        if time.monotonic() >= expires_at:
            # Drop the stale entry now so it stops occupying a slot.
            del self._local_cache[key]
            return None
        return value

    async def set(
        self, prefix: str, value: str, *parts, ttl: Optional[int] = None
    ) -> None:
        """Store ``value`` under ``prefix``/``parts``.

        Args:
            prefix: Namespace of the entry.
            value: The value to cache.
            *parts: Components that, with ``prefix``, identify the entry.
            ttl: Lifetime in seconds; ``None`` selects the default
                lifetime. A lifetime of zero or less stores nothing and
                removes any existing entry for the key.
        """
        key = self._make_key(prefix, *parts)
        lifetime = self._lifetime(ttl)
        # Remove any previous entry first so the rewrite lands at the
        # "newest" end and does not count towards the size limit.
        self._local_cache.pop(key, None)
        if lifetime <= 0:
            return
        capacity = self._capacity()
        while self._local_cache and len(self._local_cache) >= capacity:
            self._local_cache.popitem(last=False)  # evict the oldest write
        self._local_cache[key] = (prefix, value, time.monotonic() + lifetime)

    async def invalidate(self, prefix: str, *parts) -> None:
        """Remove the entry for ``prefix``/``parts`` if it exists."""
        key = self._make_key(prefix, *parts)
        self._local_cache.pop(key, None)

    async def clear_prefix(self, prefix: str) -> None:
        """Remove every entry stored under ``prefix``.

        Entries whose own prefix begins with ``"<prefix>:"`` (a nested
        namespace such as ``"agent:plan"`` under ``"agent"``) are removed
        as well.
        """
        marker = f"{prefix}:"
        doomed = [
            key
            for key, (stored_prefix, _, _) in self._local_cache.items()
            if f"{stored_prefix}:".startswith(marker)
        ]
        for key in doomed:
            del self._local_cache[key]
# Process-wide cache instance; stays None until get_cache() is first called.
_global_cache = None


def get_cache() -> ResponseCache:
    """Return the shared ResponseCache, creating it on the first call."""
    global _global_cache
    if _global_cache is not None:
        return _global_cache
    _global_cache = ResponseCache()
    return _global_cache