File size: 14,220 Bytes
38c016b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 |
"""
Word Cache Manager - Replaces static word file dependencies with intelligent caching.
Caches vector-discovered words with quality clues for fast retrieval.
"""
import os
import json
import logging
import time
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional
from pathlib import Path
import asyncio
logger = logging.getLogger(__name__)
class WordCacheManager:
    """
    Manages cached word data to replace static word file dependencies.

    Features:
    - Caches vector-discovered words with quality clues
    - Supports cache expiration and refresh
    - Fallback for when vector search fails
    - Progressive cache building from successful searches
    """
def __init__(self, cache_dir: Optional[str] = None):
    """Create the cache manager and resolve a writable cache directory.

    Args:
        cache_dir: Directory for persisted cache files. When None, the
            default is taken from WORD_CACHE_DIR, falling back to a
            /tmp path inside containers/HF Spaces or "cache" locally.

    After construction ``self.cache_dir`` is a ``Path``, or ``None``
    when no writable directory could be created (in-memory only mode).
    """
    # Use appropriate default cache directory for the environment
    if cache_dir is None:
        # Docker containers and HuggingFace Spaces typically restrict
        # writes outside /tmp, so prefer a /tmp-based default there.
        if os.path.exists("/.dockerenv") or os.getenv("SPACE_ID"):
            cache_dir = os.getenv("WORD_CACHE_DIR", "/tmp/crossword_cache")
        else:
            # Use local cache directory for development
            cache_dir = os.getenv("WORD_CACHE_DIR", "cache")
    self.cache_dir = Path(cache_dir)
    # Try to create cache directory with fallback
    try:
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        logger.info(f"π Cache directory created: {self.cache_dir}")
    except (PermissionError, OSError):
        # Fallback to the system temp directory.
        try:
            import tempfile
            temp_cache = Path(tempfile.gettempdir()) / "crossword_cache"
            # FIX: parents=True so a missing intermediate component
            # cannot make the fallback itself fail.
            temp_cache.mkdir(parents=True, exist_ok=True)
            self.cache_dir = temp_cache
            logger.warning(f"β οΈ Permission denied for '{cache_dir}', using temp: {self.cache_dir}")
        except Exception as temp_error:
            # Last resort: disable persistence entirely.
            logger.error(f"β Failed to create temp cache directory: {temp_error}")
            logger.warning("β οΈ Using in-memory cache only (no persistence)")
            self.cache_dir = None
    except Exception as e:
        # Unexpected failure (not a filesystem permission issue):
        # also fall back to in-memory only.
        logger.error(f"β Failed to create cache directory: {e}")
        logger.warning("β οΈ Using in-memory cache only (no persistence)")
        self.cache_dir = None
    # Cache configuration (overridable via environment variables).
    self.cache_expiry_hours = int(os.getenv("CACHE_EXPIRY_HOURS", "24"))
    self.max_cached_words_per_topic = int(os.getenv("MAX_CACHED_WORDS", "100"))
    self.cache_version = "1.0"
    # In-memory cache for fast access: cache_key -> list of word dicts,
    # and cache_key -> metadata dict (created_at, expiry, ...).
    self.memory_cache: Dict[str, List[Dict[str, Any]]] = {}
    self.cache_metadata: Dict[str, Dict[str, Any]] = {}
    logger.info(f"π¦ WordCacheManager initialized with cache_dir: {self.cache_dir}")
async def initialize(self):
    """Load all persisted cache files from disk into the in-memory cache.

    No-op (beyond logging) when ``self.cache_dir`` is None (in-memory
    only mode). Invalid or unreadable files are skipped with a warning
    rather than aborting initialization.
    """
    try:
        logger.info("π§ Loading existing cache files...")
        # Skip file loading if no cache directory (in-memory only)
        if self.cache_dir is None:
            logger.info("π In-memory cache mode - no file loading")
            return
        # Load all cache files into memory
        cache_files = list(self.cache_dir.glob("*.json"))
        loaded_count = 0
        for cache_file in cache_files:
            if cache_file.stem.endswith("_meta"):
                continue  # Skip metadata files
            try:
                cache_key = cache_file.stem
                # FIX: explicit encoding so clue text decodes the same
                # on every platform regardless of locale defaults.
                with open(cache_file, 'r', encoding='utf-8') as f:
                    cached_data = json.load(f)
                # Validate cache structure before trusting its contents.
                if self._validate_cache_data(cached_data):
                    self.memory_cache[cache_key] = cached_data["words"]
                    self.cache_metadata[cache_key] = cached_data["metadata"]
                    loaded_count += 1
                    logger.info(f"π₯ Loaded cache: {cache_key} ({len(cached_data['words'])} words)")
                else:
                    logger.warning(f"β οΈ Invalid cache file: {cache_file}")
            except Exception as e:
                logger.error(f"β Failed to load cache file {cache_file}: {e}")
        # FIX: this statement was split across two lines in the middle of
        # an f-string literal (a syntax error); rejoined into one line.
        logger.info(f"β Cache manager initialized with {loaded_count} cached topics")
    except Exception as e:
        logger.error(f"β Failed to initialize cache manager: {e}")
def _validate_cache_data(self, data: Dict[str, Any]) -> bool:
"""Validate cache data structure."""
required_keys = ["words", "metadata", "version"]
if not all(key in data for key in required_keys):
return False
# Check metadata structure
metadata = data["metadata"]
required_meta_keys = ["created_at", "topic", "difficulty", "word_count"]
if not all(key in metadata for key in required_meta_keys):
return False
# Check words structure
words = data["words"]
if not isinstance(words, list) or not words:
return True # Empty cache is valid
# Validate first word structure
sample_word = words[0]
required_word_keys = ["word", "clue", "similarity", "source"]
return all(key in sample_word for key in required_word_keys)
async def get_cached_words(
    self,
    topic: str,
    difficulty: str = "medium",
    max_words: int = 15
) -> List[Dict[str, Any]]:
    """Return up to *max_words* cached entries for a topic/difficulty.

    A hit is served only while the cache entry is still fresh; expired
    entries are evicted on access. Returns an empty list on a miss.
    """
    cache_key = self._get_cache_key(topic, difficulty)
    # Memory cache is the only lookup path; disk is loaded at startup.
    if cache_key in self.memory_cache:
        if self._is_cache_fresh(cache_key):
            hits = self.memory_cache[cache_key]
            logger.info(f"π¦ Using cached words for {cache_key}: {len(hits)} words")
            return hits[:max_words]
        # Stale entry: evict it before reporting a miss.
        logger.info(f"β° Cache expired for {cache_key}")
        await self._remove_expired_cache(cache_key)
    logger.info(f"π No fresh cache available for {cache_key}")
    return []
async def cache_words(
    self,
    topic: str,
    difficulty: str,
    words: List[Dict[str, Any]],
    source: str = "vector_search"
) -> bool:
    """Store a list of word/clue dicts in memory and (optionally) on disk.

    Args:
        topic: Topic name.
        difficulty: Difficulty level.
        words: Word objects with clues; capped at the configured maximum.
        source: Provenance label (e.g. "vector_search").

    Returns:
        True on success, False when caching failed.
    """
    try:
        cache_key = self._get_cache_key(topic, difficulty)
        # Tag each word with caching provenance; respect the per-topic cap.
        enhanced_words = [
            {
                **word,
                # NOTE(review): naive UTC timestamps are used throughout
                # this class; kept for compatibility with existing files.
                "cached_at": datetime.utcnow().isoformat(),
                "cache_source": source,
            }
            for word in words[:self.max_cached_words_per_topic]
        ]
        payload = {
            "version": self.cache_version,
            "words": enhanced_words,
            "metadata": {
                "topic": topic,
                "difficulty": difficulty,
                "word_count": len(enhanced_words),
                "created_at": datetime.utcnow().isoformat(),
                "source": source,
                "expiry_hours": self.cache_expiry_hours,
            },
        }
        # Persist only when a writable cache directory exists.
        if self.cache_dir is not None:
            target = self.cache_dir / f"{cache_key}.json"
            with open(target, 'w') as f:
                json.dump(payload, f, indent=2)
        # Memory cache is updated regardless of persistence mode.
        self.memory_cache[cache_key] = enhanced_words
        self.cache_metadata[cache_key] = payload["metadata"]
        logger.info(f"πΎ Cached {len(enhanced_words)} words for {cache_key}")
        return True
    except Exception as e:
        logger.error(f"β Failed to cache words for {topic}/{difficulty}: {e}")
        return False
def _get_cache_key(self, topic: str, difficulty: str) -> str:
"""Generate cache key from topic and difficulty."""
return f"{topic.lower()}_{difficulty.lower()}"
def _is_cache_fresh(self, cache_key: str) -> bool:
"""Check if cache is still fresh (not expired)."""
if cache_key not in self.cache_metadata:
return False
metadata = self.cache_metadata[cache_key]
created_at = datetime.fromisoformat(metadata["created_at"])
expiry_hours = metadata.get("expiry_hours", self.cache_expiry_hours)
expiry_time = created_at + timedelta(hours=expiry_hours)
return datetime.utcnow() < expiry_time
async def _remove_expired_cache(self, cache_key: str):
    """Drop an expired entry from memory and delete its file on disk.

    Failures are logged, never raised — eviction is best-effort.
    """
    try:
        # Purge the in-memory copies first (no-ops when absent).
        self.memory_cache.pop(cache_key, None)
        self.cache_metadata.pop(cache_key, None)
        # Then remove the persisted file, when persistence is enabled.
        if self.cache_dir is not None:
            target = self.cache_dir / f"{cache_key}.json"
            if target.exists():
                target.unlink()
        logger.info(f"ποΈ Removed expired cache: {cache_key}")
    except Exception as e:
        logger.error(f"β Failed to remove expired cache {cache_key}: {e}")
async def warm_cache_from_static(self, static_words: Dict[str, List[Dict[str, Any]]]):
    """Seed the cache from static word lists as bootstrap data.

    Each topic's words are converted to cache format and stored once per
    difficulty level (easy/medium/hard), filtered by word length.
    """
    try:
        logger.info("π₯ Warming cache with bootstrap data from static words...")
        cached_count = 0
        for topic, words in static_words.items():
            if not words:
                continue
            # Convert static entries into the cache's word format.
            cache_words = [
                {
                    "word": word_obj["word"].upper(),
                    "clue": word_obj.get("clue", f"Related to {topic.lower()}"),
                    "similarity": 0.9,  # Mark as high quality
                    "source": "bootstrap_static",
                    "quality_score": 100  # High quality bootstrap data
                }
                for word_obj in words
            ]
            # Store one filtered variant per difficulty level.
            for difficulty in ("easy", "medium", "hard"):
                subset = self._filter_words_by_difficulty(cache_words, difficulty)
                if subset and await self.cache_words(topic, difficulty, subset, "bootstrap"):
                    cached_count += 1
        logger.info(f"π₯ Cache warming completed: {cached_count} topic/difficulty combinations cached")
    except Exception as e:
        logger.error(f"β Failed to warm cache: {e}")
def _filter_words_by_difficulty(self, words: List[Dict[str, Any]], difficulty: str) -> List[Dict[str, Any]]:
"""Filter words by difficulty level."""
difficulty_map = {
"easy": {"min_len": 3, "max_len": 8},
"medium": {"min_len": 4, "max_len": 10},
"hard": {"min_len": 5, "max_len": 15}
}
criteria = difficulty_map.get(difficulty, difficulty_map["medium"])
filtered = []
for word_obj in words:
word_len = len(word_obj["word"])
if criteria["min_len"] <= word_len <= criteria["max_len"]:
filtered.append(word_obj)
return filtered
def get_cache_stats(self) -> Dict[str, Any]:
"""Get cache statistics for monitoring."""
total_words = sum(len(words) for words in self.memory_cache.values())
# Count fresh vs expired caches
fresh_caches = sum(1 for key in self.memory_cache.keys() if self._is_cache_fresh(key))
total_caches = len(self.memory_cache)
return {
"total_cached_topics": total_caches,
"fresh_caches": fresh_caches,
"expired_caches": total_caches - fresh_caches,
"total_cached_words": total_words,
"cache_directory": str(self.cache_dir),
"cache_expiry_hours": self.cache_expiry_hours
}
async def cleanup_expired_caches(self):
    """Evict every cache entry whose expiry window has passed."""
    # Collect keys first so eviction never mutates the dict mid-iteration.
    stale = [key for key in self.memory_cache if not self._is_cache_fresh(key)]
    for key in stale:
        await self._remove_expired_cache(key)
    logger.info(f"π§Ή Cleaned up {len(stale)} expired caches")