File size: 4,089 Bytes
b0b150b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123

from functools import lru_cache
from datetime import datetime, timedelta
from typing import Any, Optional, Dict
import threading

class InMemoryCache:
    """
    Simple in-memory cache with TTL support.
    Replaces Redis for development environments.
    """
    
    def __init__(self, default_ttl: int = 3600):
        self._cache: Dict[str, dict] = {}
        self._default_ttl = default_ttl
        self._lock = threading.RLock()
    
    def get(self, key: str) -> Optional[Any]:
        """Get a value from cache."""
        with self._lock:
            if key not in self._cache:
                return None
            
            entry = self._cache[key]
            
            # Check if expired
            if entry['expires_at'] and datetime.utcnow() > entry['expires_at']:
                del self._cache[key]
                return None
            
            return entry['value']
    
    def set(self, key: str, value: Any, ttl: int = None) -> None:
        """Set a value in cache with optional TTL."""
        with self._lock:
            ttl = ttl if ttl is not None else self._default_ttl
            expires_at = datetime.utcnow() + timedelta(seconds=ttl) if ttl > 0 else None
            
            self._cache[key] = {
                'value': value,
                'expires_at': expires_at,
                'created_at': datetime.utcnow()
            }
    
    def delete(self, key: str) -> bool:
        """Delete a key from cache."""
        with self._lock:
            if key in self._cache:
                del self._cache[key]
                return True
            return False
    
    def clear(self) -> None:
        """Clear all cache entries."""
        with self._lock:
            self._cache.clear()
    
    def exists(self, key: str) -> bool:
        """Check if key exists and is not expired."""
        return self.get(key) is not None
    
    def get_stats(self) -> dict:
        """Get cache statistics."""
        with self._lock:
            now = datetime.utcnow()
            active = sum(1 for e in self._cache.values() 
                        if not e['expires_at'] or e['expires_at'] > now)
            return {
                'total_keys': len(self._cache),
                'active_keys': active,
                'expired_keys': len(self._cache) - active
            }
    
    def cleanup(self) -> int:
        """Remove expired entries and return count removed."""
        with self._lock:
            now = datetime.utcnow()
            expired_keys = [
                k for k, v in self._cache.items() 
                if v['expires_at'] and v['expires_at'] < now
            ]
            for key in expired_keys:
                del self._cache[key]
            return len(expired_keys)


# Singleton instance
cache = InMemoryCache(default_ttl=3600)  # 1 hour default


# Helper functions for common caching patterns
def cache_agent_artifacts(agent_id: int, artifacts: dict, ttl: int = 3600):
    """Cache agent artifacts (knowledge graph, etc.)"""
    cache.set(f"agent:{agent_id}:artifacts", artifacts, ttl)

def get_cached_agent_artifacts(agent_id: int) -> Optional[dict]:
    """Get cached agent artifacts."""
    return cache.get(f"agent:{agent_id}:artifacts")

def invalidate_agent_cache(agent_id: int):
    """Invalidate all cache entries for an agent."""
    cache.delete(f"agent:{agent_id}:artifacts")
    cache.delete(f"agent:{agent_id}:engine")

def cache_user_agents(user_id: int, agents: list, ttl: int = 60):
    """Cache user's agent list for quick dashboard loading."""
    cache.set(f"user:{user_id}:agents", agents, ttl)

def get_cached_user_agents(user_id: int) -> Optional[list]:
    """Get cached user agents list."""
    return cache.get(f"user:{user_id}:agents")


# LRU Cache for expensive computations
@lru_cache(maxsize=100)
def cached_domain_analysis(prompt_hash: str) -> dict:
    """
    LRU cache for domain analysis results.
    Use hash of prompt as key to avoid storing full prompts.
    """
    # This is a placeholder - actual analysis happens in prompt_analyzer
    return {}