Spaces:
Running
Running
File size: 2,343 Bytes
b32fbe0 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 | """
Knowledge Universe β Cache Key Generator
Deterministic, collision-resistant key generation for all cache namespaces.
"""
import hashlib
import json
from typing import Any, Dict, List, Optional
class CacheKeyGenerator:
"""
Generates structured, deterministic Redis keys.
Namespaces:
req:{hash} β discovery request result
source:{platform}:{id} β individual source metadata
usage:{customer_id}:{YYYY-MM} β monthly usage counter
key:{api_key_hash} β API key β customer metadata
signup:{email_hash} β signup dedup guard
All keys are β€ 128 chars, URL-safe.
"""
@staticmethod
def request_key(
topic: str,
difficulty: int,
formats: List[str],
language: str = "en",
max_results: int = 10,
) -> str:
"""Stable key for a discovery request."""
payload = {
"topic": topic.lower().strip(),
"difficulty": difficulty,
"formats": sorted(formats),
"language": language,
"max_results": max_results,
}
digest = hashlib.sha256(
json.dumps(payload, sort_keys=True).encode()
).hexdigest()[:32]
return f"req:{digest}"
@staticmethod
def source_key(platform: str, source_id: str) -> str:
"""Key for an individual source."""
safe_id = source_id.replace(":", "_").replace("/", "_")[:64]
return f"source:{platform}:{safe_id}"
@staticmethod
def api_key_meta(api_key: str) -> str:
"""Key storing customer metadata for an API key."""
key_hash = hashlib.sha256(api_key.encode()).hexdigest()[:32]
return f"ku:key:{key_hash}"
@staticmethod
def usage_key(customer_id: str, year_month: str) -> str:
"""Monthly usage counter key. year_month format: '2026-03'."""
return f"ku:usage:{customer_id}:{year_month}"
@staticmethod
def signup_key(email: str) -> str:
"""Dedup guard for signups."""
email_hash = hashlib.sha256(email.lower().encode()).hexdigest()[:16]
return f"ku:signup:{email_hash}"
@staticmethod
def decay_key(source_id: str) -> str:
"""Cached decay score for a source."""
safe = source_id.replace(":", "_")[:48]
return f"ku:decay:{safe}" |