Spaces:
Running
Running
File size: 4,089 Bytes
b0b150b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 |
from functools import lru_cache
from datetime import datetime, timedelta
from typing import Any, Optional, Dict
import threading
class InMemoryCache:
"""
Simple in-memory cache with TTL support.
Replaces Redis for development environments.
"""
def __init__(self, default_ttl: int = 3600):
self._cache: Dict[str, dict] = {}
self._default_ttl = default_ttl
self._lock = threading.RLock()
def get(self, key: str) -> Optional[Any]:
"""Get a value from cache."""
with self._lock:
if key not in self._cache:
return None
entry = self._cache[key]
# Check if expired
if entry['expires_at'] and datetime.utcnow() > entry['expires_at']:
del self._cache[key]
return None
return entry['value']
def set(self, key: str, value: Any, ttl: int = None) -> None:
"""Set a value in cache with optional TTL."""
with self._lock:
ttl = ttl if ttl is not None else self._default_ttl
expires_at = datetime.utcnow() + timedelta(seconds=ttl) if ttl > 0 else None
self._cache[key] = {
'value': value,
'expires_at': expires_at,
'created_at': datetime.utcnow()
}
def delete(self, key: str) -> bool:
"""Delete a key from cache."""
with self._lock:
if key in self._cache:
del self._cache[key]
return True
return False
def clear(self) -> None:
"""Clear all cache entries."""
with self._lock:
self._cache.clear()
def exists(self, key: str) -> bool:
"""Check if key exists and is not expired."""
return self.get(key) is not None
def get_stats(self) -> dict:
"""Get cache statistics."""
with self._lock:
now = datetime.utcnow()
active = sum(1 for e in self._cache.values()
if not e['expires_at'] or e['expires_at'] > now)
return {
'total_keys': len(self._cache),
'active_keys': active,
'expired_keys': len(self._cache) - active
}
def cleanup(self) -> int:
"""Remove expired entries and return count removed."""
with self._lock:
now = datetime.utcnow()
expired_keys = [
k for k, v in self._cache.items()
if v['expires_at'] and v['expires_at'] < now
]
for key in expired_keys:
del self._cache[key]
return len(expired_keys)
# Singleton instance
cache = InMemoryCache(default_ttl=3600) # 1 hour default
# Helper functions for common caching patterns
def cache_agent_artifacts(agent_id: int, artifacts: dict, ttl: int = 3600):
"""Cache agent artifacts (knowledge graph, etc.)"""
cache.set(f"agent:{agent_id}:artifacts", artifacts, ttl)
def get_cached_agent_artifacts(agent_id: int) -> Optional[dict]:
"""Get cached agent artifacts."""
return cache.get(f"agent:{agent_id}:artifacts")
def invalidate_agent_cache(agent_id: int):
"""Invalidate all cache entries for an agent."""
cache.delete(f"agent:{agent_id}:artifacts")
cache.delete(f"agent:{agent_id}:engine")
def cache_user_agents(user_id: int, agents: list, ttl: int = 60):
"""Cache user's agent list for quick dashboard loading."""
cache.set(f"user:{user_id}:agents", agents, ttl)
def get_cached_user_agents(user_id: int) -> Optional[list]:
"""Get cached user agents list."""
return cache.get(f"user:{user_id}:agents")
# LRU Cache for expensive computations
@lru_cache(maxsize=100)
def cached_domain_analysis(prompt_hash: str) -> dict:
"""
LRU cache for domain analysis results.
Use hash of prompt as key to avoid storing full prompts.
"""
# This is a placeholder - actual analysis happens in prompt_analyzer
return {}
|