Spaces:
Runtime error
Runtime error
| """ | |
| KV Store - In-memory key-value store with TTL for wayyDB. | |
| Provides Redis-like KV semantics for future multi-process scaling. | |
| Background eviction runs every 60 seconds. | |
| """ | |
| import asyncio | |
| import logging | |
| import time | |
| from fnmatch import fnmatch | |
| from typing import Any, Dict, List, Optional | |
| logger = logging.getLogger(__name__) | |
| class KVEntry: | |
| """A stored value with optional TTL.""" | |
| __slots__ = ("value", "expires_at", "created_at") | |
| def __init__(self, value: Any, ttl: Optional[float] = None): | |
| now = time.time() | |
| self.value = value | |
| self.expires_at = now + ttl if ttl else float("inf") | |
| self.created_at = now | |
| def is_expired(self) -> bool: | |
| return time.time() > self.expires_at | |
| def ttl_remaining(self) -> Optional[float]: | |
| if self.expires_at == float("inf"): | |
| return None | |
| remaining = self.expires_at - time.time() | |
| return max(0, remaining) | |
| class KVStore: | |
| """ | |
| In-memory KV store with TTL and background eviction. | |
| Thread-safe for single-process async use (GIL + event loop). | |
| """ | |
| def __init__(self) -> None: | |
| self._data: Dict[str, KVEntry] = {} | |
| self._eviction_task: Optional[asyncio.Task] = None | |
| self._sets: int = 0 | |
| self._gets: int = 0 | |
| self._deletes: int = 0 | |
| self._evictions: int = 0 | |
| async def start(self) -> None: | |
| """Start the background eviction task.""" | |
| if self._eviction_task is None: | |
| self._eviction_task = asyncio.create_task(self._eviction_loop()) | |
| logger.info("KVStore eviction task started") | |
| async def stop(self) -> None: | |
| """Stop the background eviction task.""" | |
| if self._eviction_task: | |
| self._eviction_task.cancel() | |
| try: | |
| await self._eviction_task | |
| except asyncio.CancelledError: | |
| pass | |
| self._eviction_task = None | |
| def set(self, key: str, value: Any, ttl: Optional[float] = None) -> None: | |
| """Set a key with optional TTL (seconds).""" | |
| self._data[key] = KVEntry(value, ttl) | |
| self._sets += 1 | |
| def get(self, key: str) -> Optional[Any]: | |
| """Get a value by key. Returns None if missing or expired.""" | |
| self._gets += 1 | |
| entry = self._data.get(key) | |
| if entry is None: | |
| return None | |
| if entry.is_expired: | |
| del self._data[key] | |
| self._evictions += 1 | |
| return None | |
| return entry.value | |
| def delete(self, key: str) -> bool: | |
| """Delete a key. Returns True if existed.""" | |
| existed = key in self._data | |
| if existed: | |
| del self._data[key] | |
| self._deletes += 1 | |
| return existed | |
| def keys(self, pattern: Optional[str] = None) -> List[str]: | |
| """List keys, optionally filtered by glob pattern.""" | |
| now = time.time() | |
| result = [] | |
| for k, v in self._data.items(): | |
| if v.expires_at > now: | |
| if pattern is None or fnmatch(k, pattern): | |
| result.append(k) | |
| return result | |
| def stats(self) -> Dict[str, Any]: | |
| """Get store statistics.""" | |
| now = time.time() | |
| active = sum(1 for v in self._data.values() if v.expires_at > now) | |
| return { | |
| "total_keys": len(self._data), | |
| "active_keys": active, | |
| "sets": self._sets, | |
| "gets": self._gets, | |
| "deletes": self._deletes, | |
| "evictions": self._evictions, | |
| } | |
| async def _eviction_loop(self) -> None: | |
| """Background loop to evict expired entries every 60s.""" | |
| while True: | |
| try: | |
| await asyncio.sleep(60) | |
| count = self._evict_expired() | |
| if count > 0: | |
| logger.debug(f"KVStore evicted {count} expired entries") | |
| except asyncio.CancelledError: | |
| break | |
| except Exception as e: | |
| logger.error(f"KVStore eviction error: {e}") | |
| def _evict_expired(self) -> int: | |
| """Evict all expired entries. Returns count evicted.""" | |
| now = time.time() | |
| expired = [k for k, v in self._data.items() if now > v.expires_at] | |
| for k in expired: | |
| del self._data[k] | |
| self._evictions += len(expired) | |
| return len(expired) | |
| # Global singleton | |
| _kv_store: Optional[KVStore] = None | |
| def get_kv_store() -> KVStore: | |
| """Get the global KV store instance.""" | |
| global _kv_store | |
| if _kv_store is None: | |
| _kv_store = KVStore() | |
| return _kv_store | |