Spaces:
Running
Running
| """ | |
| Knowledge Universe — Cache Strategy | |
| Defines TTL and eviction policy per data type. | |
| """ | |
| from dataclasses import dataclass | |
| from enum import Enum | |
| from typing import Optional | |
class CacheTier(str, Enum):
    """Access-frequency tier that a cache policy is assigned to.

    Members are also ``str`` instances, so they compare equal to their
    plain string values.
    """

    # Frequently accessed, short TTL
    HOT = "hot"
    # Normal requests
    WARM = "warm"
    # Rarely accessed, long TTL
    COLD = "cold"
@dataclass
class CachePolicy:
    """Cache settings applied to one key namespace.

    The ``@dataclass`` decorator generates the ``__init__`` that the rest of
    the module relies on, e.g. ``CachePolicy(CacheTier.HOT, ttl_secs=14_400)``.
    Without it the class only carries bare annotations and cannot be
    constructed with arguments.

    Attributes:
        tier: Tier this policy belongs to.
        ttl_secs: Time-to-live in seconds.
        max_size: Maximum size in bytes; ``None`` means unlimited.
    """

    tier: CacheTier
    ttl_secs: int
    max_size: Optional[int] = None  # max bytes, None = unlimited
# Cache policy for each key namespace (namespace prefix -> policy).
CACHE_POLICIES = {
    # 4h — request results
    "req": CachePolicy(tier=CacheTier.HOT, ttl_secs=14_400),
    # 12h — source metadata
    "source": CachePolicy(tier=CacheTier.WARM, ttl_secs=43_200),
    # 24h — index/catalog
    "index": CachePolicy(tier=CacheTier.COLD, ttl_secs=86_400),
    # 6h — decay scores
    "decay": CachePolicy(tier=CacheTier.WARM, ttl_secs=21_600),
    # 0 = no expiry (API keys)
    "ku:key": CachePolicy(tier=CacheTier.HOT, ttl_secs=0),
    # 31d — monthly counters
    "ku:usage": CachePolicy(tier=CacheTier.HOT, ttl_secs=2_678_400),
}
def get_policy(key: str) -> CachePolicy:
    """Look up the cache policy whose namespace prefix ``key`` starts with.

    Prefixes are tried in the iteration order of ``CACHE_POLICIES`` and the
    first match wins. A key that matches no prefix gets a newly constructed
    warm-tier policy with a 4-hour TTL.
    """
    matched = next(
        (
            candidate
            for namespace, candidate in CACHE_POLICIES.items()
            if key.startswith(namespace)
        ),
        None,
    )
    if matched is not None:
        return matched
    # Unknown namespace: fall back to warm tier, 4h TTL.
    return CachePolicy(CacheTier.WARM, ttl_secs=14_400)
def get_ttl(key: str) -> int:
    """Shortcut returning just the TTL, in seconds, of the policy for ``key``."""
    policy = get_policy(key)
    return policy.ttl_secs