"""
Word Cache Manager - Replaces static word file dependencies with intelligent caching.

Caches vector-discovered words with quality clues for fast retrieval.
"""

import os
import json
import logging
import time
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional
from pathlib import Path
import asyncio

logger = logging.getLogger(__name__)


class WordCacheManager:
    """
    Manages cached word data to replace static word file dependencies.

    Features:
    - Caches vector-discovered words with quality clues
    - Supports cache expiration and refresh
    - Fallback for when vector search fails
    - Progressive cache building from successful searches
    """

    def __init__(self, cache_dir: Optional[str] = None):
        # Resolve the cache directory. Containerized environments (Docker or
        # Hugging Face Spaces) default to a writable path under /tmp.
        if cache_dir is None:
            if os.path.exists("/.dockerenv") or os.getenv("SPACE_ID"):
                cache_dir = os.getenv("WORD_CACHE_DIR", "/tmp/crossword_cache")
            else:
                cache_dir = os.getenv("WORD_CACHE_DIR", "cache")

        self.cache_dir = Path(cache_dir)

        try:
            self.cache_dir.mkdir(parents=True, exist_ok=True)
            logger.info(f"📁 Cache directory created: {self.cache_dir}")
        except (PermissionError, OSError):
            # Configured directory is not writable; fall back to the system
            # temp directory before giving up on persistence entirely.
            try:
                import tempfile
                temp_cache = Path(tempfile.gettempdir()) / "crossword_cache"
                temp_cache.mkdir(exist_ok=True)
                self.cache_dir = temp_cache
                logger.warning(f"⚠️ Permission denied for '{cache_dir}', using temp: {self.cache_dir}")
            except Exception as temp_error:
                logger.error(f"❌ Failed to create temp cache directory: {temp_error}")
                logger.warning("⚠️ Using in-memory cache only (no persistence)")
                self.cache_dir = None
        except Exception as e:
            logger.error(f"❌ Failed to create cache directory: {e}")
            logger.warning("⚠️ Using in-memory cache only (no persistence)")
            self.cache_dir = None

        # Cache configuration (overridable via environment variables).
        self.cache_expiry_hours = int(os.getenv("CACHE_EXPIRY_HOURS", "24"))
        self.max_cached_words_per_topic = int(os.getenv("MAX_CACHED_WORDS", "100"))
        self.cache_version = "1.0"

        # In-memory cache keyed by "<topic>_<difficulty>" (see _get_cache_key).
        self.memory_cache: Dict[str, List[Dict[str, Any]]] = {}
        self.cache_metadata: Dict[str, Dict[str, Any]] = {}

        logger.info(f"📦 WordCacheManager initialized with cache_dir: {self.cache_dir}")

    async def initialize(self):
        """Initialize cache manager by loading existing cache files."""
        try:
            logger.info("🔧 Loading existing cache files...")

            if self.cache_dir is None:
                logger.info("📝 In-memory cache mode - no file loading")
                return

            cache_files = list(self.cache_dir.glob("*.json"))
            loaded_count = 0

            for cache_file in cache_files:
                # Skip any *_meta companion files; only full cache entries are loaded.
                if cache_file.stem.endswith("_meta"):
                    continue

                try:
                    cache_key = cache_file.stem
                    with open(cache_file, 'r') as f:
                        cached_data = json.load(f)

                    if self._validate_cache_data(cached_data):
                        self.memory_cache[cache_key] = cached_data["words"]
                        self.cache_metadata[cache_key] = cached_data["metadata"]
                        loaded_count += 1
                        logger.info(f"📥 Loaded cache: {cache_key} ({len(cached_data['words'])} words)")
                    else:
                        logger.warning(f"⚠️ Invalid cache file: {cache_file}")

                except Exception as e:
                    logger.error(f"❌ Failed to load cache file {cache_file}: {e}")

            logger.info(f"✅ Cache manager initialized with {loaded_count} cached topics")

        except Exception as e:
            logger.error(f"❌ Failed to initialize cache manager: {e}")

    def _validate_cache_data(self, data: Dict[str, Any]) -> bool:
        """Validate cache data structure."""
        required_keys = ["words", "metadata", "version"]
        if not all(key in data for key in required_keys):
            return False

        metadata = data["metadata"]
        required_meta_keys = ["created_at", "topic", "difficulty", "word_count"]
        if not all(key in metadata for key in required_meta_keys):
            return False

        words = data["words"]
        if not isinstance(words, list):
            return False
        if not words:
            # An empty list is structurally valid; there is no entry to sample.
            return True

        # Spot-check the first entry for the expected word fields.
        sample_word = words[0]
        required_word_keys = ["word", "clue", "similarity", "source"]
        return all(key in sample_word for key in required_word_keys)
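
    # Illustrative shape of a persisted cache entry that passes the checks
    # above. Values are placeholders; real files are written by cache_words()
    # below:
    #
    #     {
    #         "version": "1.0",
    #         "words": [
    #             {"word": "NEBULA", "clue": "Interstellar cloud",
    #              "similarity": 0.87, "source": "vector_search"}
    #         ],
    #         "metadata": {
    #             "topic": "space", "difficulty": "medium", "word_count": 1,
    #             "created_at": "2024-01-01T00:00:00",
    #             "source": "vector_search", "expiry_hours": 24
    #         }
    #     }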

    async def get_cached_words(
        self,
        topic: str,
        difficulty: str = "medium",
        max_words: int = 15
    ) -> List[Dict[str, Any]]:
        """
        Get cached words for a topic and difficulty.

        Returns cached words if available and fresh, an empty list otherwise.
        """
        cache_key = self._get_cache_key(topic, difficulty)

        if cache_key in self.memory_cache:
            if self._is_cache_fresh(cache_key):
                cached_words = self.memory_cache[cache_key]
                logger.info(f"📦 Using cached words for {cache_key}: {len(cached_words)} words")
                return cached_words[:max_words]
            else:
                logger.info(f"⏰ Cache expired for {cache_key}")
                await self._remove_expired_cache(cache_key)

        logger.info(f"📭 No fresh cache available for {cache_key}")
        return []

    async def cache_words(
        self,
        topic: str,
        difficulty: str,
        words: List[Dict[str, Any]],
        source: str = "vector_search"
    ) -> bool:
        """
        Cache words for future use.

        Args:
            topic: Topic name
            difficulty: Difficulty level
            words: List of word objects with clues
            source: Source of the words (e.g., "vector_search")

        Returns:
            True if the words were cached successfully, False otherwise.
        """
        try:
            cache_key = self._get_cache_key(topic, difficulty)

            # Annotate each word with caching metadata, respecting the
            # per-topic cap.
            enhanced_words = []
            for word in words[:self.max_cached_words_per_topic]:
                enhanced_word = {
                    **word,
                    "cached_at": datetime.utcnow().isoformat(),
                    "cache_source": source
                }
                enhanced_words.append(enhanced_word)

            cache_data = {
                "version": self.cache_version,
                "words": enhanced_words,
                "metadata": {
                    "topic": topic,
                    "difficulty": difficulty,
                    "word_count": len(enhanced_words),
                    "created_at": datetime.utcnow().isoformat(),
                    "source": source,
                    "expiry_hours": self.cache_expiry_hours
                }
            }

            # Persist to disk when a cache directory is available.
            if self.cache_dir is not None:
                cache_file = self.cache_dir / f"{cache_key}.json"
                with open(cache_file, 'w') as f:
                    json.dump(cache_data, f, indent=2)

            # Always update the in-memory cache.
            self.memory_cache[cache_key] = enhanced_words
            self.cache_metadata[cache_key] = cache_data["metadata"]

            logger.info(f"💾 Cached {len(enhanced_words)} words for {cache_key}")
            return True

        except Exception as e:
            logger.error(f"❌ Failed to cache words for {topic}/{difficulty}: {e}")
            return False

    def _get_cache_key(self, topic: str, difficulty: str) -> str:
        """Generate cache key from topic and difficulty."""
        return f"{topic.lower()}_{difficulty.lower()}"

    def _is_cache_fresh(self, cache_key: str) -> bool:
        """Check if cache is still fresh (not expired)."""
        if cache_key not in self.cache_metadata:
            return False

        metadata = self.cache_metadata[cache_key]
        created_at = datetime.fromisoformat(metadata["created_at"])
        expiry_hours = metadata.get("expiry_hours", self.cache_expiry_hours)

        expiry_time = created_at + timedelta(hours=expiry_hours)
        return datetime.utcnow() < expiry_time

    async def _remove_expired_cache(self, cache_key: str):
        """Remove expired cache from memory and disk."""
        try:
            if cache_key in self.memory_cache:
                del self.memory_cache[cache_key]
            if cache_key in self.cache_metadata:
                del self.cache_metadata[cache_key]

            if self.cache_dir is not None:
                cache_file = self.cache_dir / f"{cache_key}.json"
                if cache_file.exists():
                    cache_file.unlink()

            logger.info(f"🗑️ Removed expired cache: {cache_key}")

        except Exception as e:
            logger.error(f"❌ Failed to remove expired cache {cache_key}: {e}")

    async def warm_cache_from_static(self, static_words: Dict[str, List[Dict[str, Any]]]):
        """
        Warm cache with high-quality static words as bootstrap data.

        This converts the existing static words to cache format.
        """
        try:
            logger.info("🔥 Warming cache with bootstrap data from static words...")

            cached_count = 0
            for topic, words in static_words.items():
                if not words:
                    continue

                bootstrap_words = []
                for word_obj in words:
                    cache_word = {
                        "word": word_obj["word"].upper(),
                        "clue": word_obj.get("clue", f"Related to {topic.lower()}"),
                        "similarity": 0.9,
                        "source": "bootstrap_static",
                        "quality_score": 100
                    }
                    bootstrap_words.append(cache_word)

                for difficulty in ["easy", "medium", "hard"]:
                    filtered_words = self._filter_words_by_difficulty(bootstrap_words, difficulty)

                    if filtered_words:
                        success = await self.cache_words(topic, difficulty, filtered_words, "bootstrap")
                        if success:
                            cached_count += 1

            logger.info(f"🔥 Cache warming completed: {cached_count} topic/difficulty combinations cached")

        except Exception as e:
            logger.error(f"❌ Failed to warm cache: {e}")

    def _filter_words_by_difficulty(self, words: List[Dict[str, Any]], difficulty: str) -> List[Dict[str, Any]]:
        """Filter words by difficulty level."""
        difficulty_map = {
            "easy": {"min_len": 3, "max_len": 8},
            "medium": {"min_len": 4, "max_len": 10},
            "hard": {"min_len": 5, "max_len": 15}
        }

        criteria = difficulty_map.get(difficulty, difficulty_map["medium"])

        filtered = []
        for word_obj in words:
            word_len = len(word_obj["word"])
            if criteria["min_len"] <= word_len <= criteria["max_len"]:
                filtered.append(word_obj)

        return filtered

    def get_cache_stats(self) -> Dict[str, Any]:
        """Get cache statistics for monitoring."""
        total_words = sum(len(words) for words in self.memory_cache.values())

        fresh_caches = sum(1 for key in self.memory_cache.keys() if self._is_cache_fresh(key))
        total_caches = len(self.memory_cache)

        return {
            "total_cached_topics": total_caches,
            "fresh_caches": fresh_caches,
            "expired_caches": total_caches - fresh_caches,
            "total_cached_words": total_words,
            "cache_directory": str(self.cache_dir),
            "cache_expiry_hours": self.cache_expiry_hours
        }

    async def cleanup_expired_caches(self):
        """Clean up all expired caches."""
        expired_keys = [
            key for key in self.memory_cache.keys()
            if not self._is_cache_fresh(key)
        ]

        for key in expired_keys:
            await self._remove_expired_cache(key)

        logger.info(f"🧹 Cleaned up {len(expired_keys)} expired caches")