""" Word Cache Manager - Replaces static word file dependencies with intelligent caching. Caches vector-discovered words with quality clues for fast retrieval. """ import os import json import logging import time from datetime import datetime, timedelta from typing import List, Dict, Any, Optional from pathlib import Path import asyncio logger = logging.getLogger(__name__) class WordCacheManager: """ Manages cached word data to replace static word file dependencies. Features: - Caches vector-discovered words with quality clues - Supports cache expiration and refresh - Fallback for when vector search fails - Progressive cache building from successful searches """ def __init__(self, cache_dir: str = None): # Use appropriate default cache directory for the environment if cache_dir is None: # Check if we're in a Docker container or HuggingFace Spaces if os.path.exists("/.dockerenv") or os.getenv("SPACE_ID"): # Use /tmp for containers/spaces where write permissions are limited cache_dir = os.getenv("WORD_CACHE_DIR", "/tmp/crossword_cache") else: # Use local cache directory for development cache_dir = os.getenv("WORD_CACHE_DIR", "cache") self.cache_dir = Path(cache_dir) # Try to create cache directory with fallback try: self.cache_dir.mkdir(parents=True, exist_ok=True) logger.info(f"๐Ÿ“ Cache directory created: {self.cache_dir}") except (PermissionError, OSError) as e: # Fallback to temp directory try: import tempfile temp_cache = Path(tempfile.gettempdir()) / "crossword_cache" temp_cache.mkdir(exist_ok=True) self.cache_dir = temp_cache logger.warning(f"โš ๏ธ Permission denied for '{cache_dir}', using temp: {self.cache_dir}") except Exception as temp_error: # Last resort: use in-memory only logger.error(f"โŒ Failed to create temp cache directory: {temp_error}") logger.warning("โš ๏ธ Using in-memory cache only (no persistence)") self.cache_dir = None except Exception as e: # Last resort: use in-memory only logger.error(f"โŒ Failed to create cache directory: {e}") logger.warning("โš ๏ธ Using in-memory cache only (no persistence)") self.cache_dir = None # Cache configuration self.cache_expiry_hours = int(os.getenv("CACHE_EXPIRY_HOURS", "24")) self.max_cached_words_per_topic = int(os.getenv("MAX_CACHED_WORDS", "100")) self.cache_version = "1.0" # In-memory cache for fast access self.memory_cache: Dict[str, List[Dict[str, Any]]] = {} self.cache_metadata: Dict[str, Dict[str, Any]] = {} logger.info(f"๐Ÿ“ฆ WordCacheManager initialized with cache_dir: {self.cache_dir}") async def initialize(self): """Initialize cache manager by loading existing cache files.""" try: logger.info("๐Ÿ”ง Loading existing cache files...") # Skip file loading if no cache directory (in-memory only) if self.cache_dir is None: logger.info("๐Ÿ“ In-memory cache mode - no file loading") return # Load all cache files into memory cache_files = list(self.cache_dir.glob("*.json")) loaded_count = 0 for cache_file in cache_files: if cache_file.stem.endswith("_meta"): continue # Skip metadata files try: cache_key = cache_file.stem with open(cache_file, 'r') as f: cached_data = json.load(f) # Validate cache structure if self._validate_cache_data(cached_data): self.memory_cache[cache_key] = cached_data["words"] self.cache_metadata[cache_key] = cached_data["metadata"] loaded_count += 1 logger.info(f"๐Ÿ“ฅ Loaded cache: {cache_key} ({len(cached_data['words'])} words)") else: logger.warning(f"โš ๏ธ Invalid cache file: {cache_file}") except Exception as e: logger.error(f"โŒ Failed to load cache file {cache_file}: {e}") logger.info(f"โœ… Cache manager initialized with {loaded_count} cached topics") except Exception as e: logger.error(f"โŒ Failed to initialize cache manager: {e}") def _validate_cache_data(self, data: Dict[str, Any]) -> bool: """Validate cache data structure.""" required_keys = ["words", "metadata", "version"] if not all(key in data for key in required_keys): return False # Check metadata structure metadata = data["metadata"] required_meta_keys = ["created_at", "topic", "difficulty", "word_count"] if not all(key in metadata for key in required_meta_keys): return False # Check words structure words = data["words"] if not isinstance(words, list) or not words: return True # Empty cache is valid # Validate first word structure sample_word = words[0] required_word_keys = ["word", "clue", "similarity", "source"] return all(key in sample_word for key in required_word_keys) async def get_cached_words( self, topic: str, difficulty: str = "medium", max_words: int = 15 ) -> List[Dict[str, Any]]: """ Get cached words for a topic and difficulty. Returns cached words if available and fresh, empty list otherwise. """ cache_key = self._get_cache_key(topic, difficulty) # Check memory cache first if cache_key in self.memory_cache: # Check if cache is still fresh if self._is_cache_fresh(cache_key): cached_words = self.memory_cache[cache_key] logger.info(f"๐Ÿ“ฆ Using cached words for {cache_key}: {len(cached_words)} words") # Return requested number of words return cached_words[:max_words] else: logger.info(f"โฐ Cache expired for {cache_key}") await self._remove_expired_cache(cache_key) logger.info(f"๐Ÿ“ญ No fresh cache available for {cache_key}") return [] async def cache_words( self, topic: str, difficulty: str, words: List[Dict[str, Any]], source: str = "vector_search" ) -> bool: """ Cache words for future use. Args: topic: Topic name difficulty: Difficulty level words: List of word objects with clues source: Source of the words (e.g., "vector_search") """ try: cache_key = self._get_cache_key(topic, difficulty) # Enhance words with caching metadata enhanced_words = [] for word in words[:self.max_cached_words_per_topic]: enhanced_word = { **word, "cached_at": datetime.utcnow().isoformat(), "cache_source": source } enhanced_words.append(enhanced_word) # Create cache data structure cache_data = { "version": self.cache_version, "words": enhanced_words, "metadata": { "topic": topic, "difficulty": difficulty, "word_count": len(enhanced_words), "created_at": datetime.utcnow().isoformat(), "source": source, "expiry_hours": self.cache_expiry_hours } } # Save to file (if cache directory available) if self.cache_dir is not None: cache_file = self.cache_dir / f"{cache_key}.json" with open(cache_file, 'w') as f: json.dump(cache_data, f, indent=2) # Update memory cache self.memory_cache[cache_key] = enhanced_words self.cache_metadata[cache_key] = cache_data["metadata"] logger.info(f"๐Ÿ’พ Cached {len(enhanced_words)} words for {cache_key}") return True except Exception as e: logger.error(f"โŒ Failed to cache words for {topic}/{difficulty}: {e}") return False def _get_cache_key(self, topic: str, difficulty: str) -> str: """Generate cache key from topic and difficulty.""" return f"{topic.lower()}_{difficulty.lower()}" def _is_cache_fresh(self, cache_key: str) -> bool: """Check if cache is still fresh (not expired).""" if cache_key not in self.cache_metadata: return False metadata = self.cache_metadata[cache_key] created_at = datetime.fromisoformat(metadata["created_at"]) expiry_hours = metadata.get("expiry_hours", self.cache_expiry_hours) expiry_time = created_at + timedelta(hours=expiry_hours) return datetime.utcnow() < expiry_time async def _remove_expired_cache(self, cache_key: str): """Remove expired cache from memory and disk.""" try: # Remove from memory if cache_key in self.memory_cache: del self.memory_cache[cache_key] if cache_key in self.cache_metadata: del self.cache_metadata[cache_key] # Remove from disk (if cache directory available) if self.cache_dir is not None: cache_file = self.cache_dir / f"{cache_key}.json" if cache_file.exists(): cache_file.unlink() logger.info(f"๐Ÿ—‘๏ธ Removed expired cache: {cache_key}") except Exception as e: logger.error(f"โŒ Failed to remove expired cache {cache_key}: {e}") async def warm_cache_from_static(self, static_words: Dict[str, List[Dict[str, Any]]]): """ Warm cache with high-quality static words as bootstrap data. This converts the existing static words to cache format. """ try: logger.info("๐Ÿ”ฅ Warming cache with bootstrap data from static words...") cached_count = 0 for topic, words in static_words.items(): if not words: continue # Convert static words to cache format cache_words = [] for word_obj in words: cache_word = { "word": word_obj["word"].upper(), "clue": word_obj.get("clue", f"Related to {topic.lower()}"), "similarity": 0.9, # Mark as high quality "source": "bootstrap_static", "quality_score": 100 # High quality bootstrap data } cache_words.append(cache_word) # Cache for different difficulties for difficulty in ["easy", "medium", "hard"]: # Filter by difficulty filtered_words = self._filter_words_by_difficulty(cache_words, difficulty) if filtered_words: success = await self.cache_words(topic, difficulty, filtered_words, "bootstrap") if success: cached_count += 1 logger.info(f"๐Ÿ”ฅ Cache warming completed: {cached_count} topic/difficulty combinations cached") except Exception as e: logger.error(f"โŒ Failed to warm cache: {e}") def _filter_words_by_difficulty(self, words: List[Dict[str, Any]], difficulty: str) -> List[Dict[str, Any]]: """Filter words by difficulty level.""" difficulty_map = { "easy": {"min_len": 3, "max_len": 8}, "medium": {"min_len": 4, "max_len": 10}, "hard": {"min_len": 5, "max_len": 15} } criteria = difficulty_map.get(difficulty, difficulty_map["medium"]) filtered = [] for word_obj in words: word_len = len(word_obj["word"]) if criteria["min_len"] <= word_len <= criteria["max_len"]: filtered.append(word_obj) return filtered def get_cache_stats(self) -> Dict[str, Any]: """Get cache statistics for monitoring.""" total_words = sum(len(words) for words in self.memory_cache.values()) # Count fresh vs expired caches fresh_caches = sum(1 for key in self.memory_cache.keys() if self._is_cache_fresh(key)) total_caches = len(self.memory_cache) return { "total_cached_topics": total_caches, "fresh_caches": fresh_caches, "expired_caches": total_caches - fresh_caches, "total_cached_words": total_words, "cache_directory": str(self.cache_dir), "cache_expiry_hours": self.cache_expiry_hours } async def cleanup_expired_caches(self): """Clean up all expired caches.""" expired_keys = [ key for key in self.memory_cache.keys() if not self._is_cache_fresh(key) ] for key in expired_keys: await self._remove_expired_cache(key) logger.info(f"๐Ÿงน Cleaned up {len(expired_keys)} expired caches")