Spaces:
Sleeping
Sleeping
File size: 2,896 Bytes
867ae6b 807713e 867ae6b 807713e 867ae6b 807713e 867ae6b 807713e 867ae6b | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 | """
Project Requirements Cache Service
Caches project completion requirements (photo_requirements, activation_requirements)
to avoid repeated database queries. Uses cachetools for in-memory caching.
"""
from cachetools import TTLCache
from threading import RLock
from typing import Dict, Any, Optional
from uuid import UUID
from datetime import datetime
import logging
logger = logging.getLogger(__name__)
# Thread-safe requirements cache with 24-hour TTL
# Stores up to 500 project requirements in memory
requirements_cache = TTLCache(maxsize=500, ttl=86400) # 24 hours
requirements_cache_lock = RLock()
class ProjectRequirementsCache:
"""Cache for project completion requirements"""
@staticmethod
def get(project_id: UUID) -> Optional[Dict[str, Any]]:
"""
Get cached project requirements
Returns:
Dict with photo_requirements and field_requirements, or None if not cached
"""
try:
with requirements_cache_lock:
key = f"requirements:{str(project_id)}"
cached_data = requirements_cache.get(key)
if cached_data:
logger.debug(f"Requirements cache HIT: {project_id}")
return cached_data
except Exception as e:
logger.error(f"Error retrieving from requirements cache: {e}")
return None
@staticmethod
def set(project_id: UUID, photo_requirements: list, field_requirements: list):
"""
Cache project requirements
Args:
project_id: Project ID
photo_requirements: List of photo requirement dicts
field_requirements: Combined list of all field requirements (activation + inventory)
"""
try:
with requirements_cache_lock:
key = f"requirements:{str(project_id)}"
cache_data = {
"photo_requirements": photo_requirements,
"field_requirements": field_requirements,
"cached_at": datetime.utcnow().isoformat()
}
requirements_cache[key] = cache_data
logger.debug(f"Requirements cache SET: {project_id}")
except Exception as e:
logger.error(f"Error setting requirements cache: {e}")
@staticmethod
def invalidate(project_id: UUID):
"""
Invalidate cached project requirements
Call this when project requirements are updated
"""
try:
with requirements_cache_lock:
key = f"requirements:{str(project_id)}"
requirements_cache.pop(key, None)
logger.info(f"Invalidated requirements cache for project {project_id}")
except Exception as e:
logger.error(f"Error invalidating requirements cache: {e}")
|