# Origin: commit 38c016b by vimalk78 — "Add complete Python backend with AI-powered crossword generation"
"""
Word Cache Manager - Replaces static word file dependencies with intelligent caching.
Caches vector-discovered words with quality clues for fast retrieval.
"""
import asyncio
import json
import logging
import os
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)


class WordCacheManager:
    """
    Manages cached word data to replace static word file dependencies.

    Features:
    - Caches vector-discovered words with quality clues
    - Supports cache expiration and refresh
    - Fallback for when vector search fails
    - Progressive cache building from successful searches
    """

    def __init__(self, cache_dir: Optional[str] = None):
        """Initialize cache directory (with fallbacks) and in-memory stores.

        Args:
            cache_dir: Directory for persistent cache files. When None, an
                environment-appropriate default is chosen (containers and
                HuggingFace Spaces use /tmp; local development uses ./cache).
                Falls back to the system temp dir, then to in-memory-only
                mode (cache_dir becomes None) if no directory can be created.
        """
        if cache_dir is None:
            # Containers / HF Spaces usually restrict writes outside /tmp.
            if os.path.exists("/.dockerenv") or os.getenv("SPACE_ID"):
                cache_dir = os.getenv("WORD_CACHE_DIR", "/tmp/crossword_cache")
            else:
                cache_dir = os.getenv("WORD_CACHE_DIR", "cache")
        self.cache_dir: Optional[Path] = Path(cache_dir)

        # Try to create the cache directory, degrading gracefully on failure.
        try:
            self.cache_dir.mkdir(parents=True, exist_ok=True)
            logger.info(f"📁 Cache directory created: {self.cache_dir}")
        except (PermissionError, OSError):
            # Fallback to the system temp directory before giving up on persistence.
            try:
                import tempfile
                temp_cache = Path(tempfile.gettempdir()) / "crossword_cache"
                # parents=True guards against an unusual missing temp root.
                temp_cache.mkdir(parents=True, exist_ok=True)
                self.cache_dir = temp_cache
                logger.warning(f"⚠️ Permission denied for '{cache_dir}', using temp: {self.cache_dir}")
            except Exception as temp_error:
                # Last resort: use in-memory only
                logger.error(f"❌ Failed to create temp cache directory: {temp_error}")
                logger.warning("⚠️ Using in-memory cache only (no persistence)")
                self.cache_dir = None
        except Exception as e:
            # Last resort: use in-memory only
            logger.error(f"❌ Failed to create cache directory: {e}")
            logger.warning("⚠️ Using in-memory cache only (no persistence)")
            self.cache_dir = None

        # Cache configuration (overridable via environment variables).
        self.cache_expiry_hours = int(os.getenv("CACHE_EXPIRY_HOURS", "24"))
        self.max_cached_words_per_topic = int(os.getenv("MAX_CACHED_WORDS", "100"))
        self.cache_version = "1.0"

        # In-memory mirror of the on-disk cache for fast lookups.
        self.memory_cache: Dict[str, List[Dict[str, Any]]] = {}
        self.cache_metadata: Dict[str, Dict[str, Any]] = {}
        logger.info(f"📦 WordCacheManager initialized with cache_dir: {self.cache_dir}")

    @staticmethod
    def _utc_now() -> datetime:
        """Return the current time as a timezone-aware UTC datetime.

        datetime.utcnow() is deprecated (since Python 3.12) and yields naive
        datetimes that cannot be safely compared with aware ones.
        """
        return datetime.now(timezone.utc)

    async def initialize(self):
        """Initialize cache manager by loading existing cache files."""
        try:
            logger.info("🔧 Loading existing cache files...")
            # Skip file loading if no cache directory (in-memory only).
            if self.cache_dir is None:
                logger.info("📝 In-memory cache mode - no file loading")
                return
            loaded_count = 0
            for cache_file in self.cache_dir.glob("*.json"):
                if cache_file.stem.endswith("_meta"):
                    continue  # Skip metadata files
                try:
                    cache_key = cache_file.stem
                    with open(cache_file, 'r', encoding='utf-8') as f:
                        cached_data = json.load(f)
                    if self._validate_cache_data(cached_data):
                        self.memory_cache[cache_key] = cached_data["words"]
                        self.cache_metadata[cache_key] = cached_data["metadata"]
                        loaded_count += 1
                        logger.info(f"📥 Loaded cache: {cache_key} ({len(cached_data['words'])} words)")
                    else:
                        logger.warning(f"⚠️ Invalid cache file: {cache_file}")
                except Exception as e:
                    # One corrupt file must not abort the whole load.
                    logger.error(f"❌ Failed to load cache file {cache_file}: {e}")
            logger.info(f"✅ Cache manager initialized with {loaded_count} cached topics")
        except Exception as e:
            logger.error(f"❌ Failed to initialize cache manager: {e}")

    def _validate_cache_data(self, data: Dict[str, Any]) -> bool:
        """Validate cache data structure.

        Returns True when all required top-level and metadata keys are
        present and "words" is a list whose first entry carries the
        expected fields. An empty word list is considered valid.
        """
        required_keys = ("words", "metadata", "version")
        if not all(key in data for key in required_keys):
            return False
        # Check metadata structure
        metadata = data["metadata"]
        required_meta_keys = ("created_at", "topic", "difficulty", "word_count")
        if not all(key in metadata for key in required_meta_keys):
            return False
        words = data["words"]
        if not isinstance(words, list):
            # Fix: previously a non-list "words" value passed validation.
            return False
        if not words:
            return True  # Empty cache is valid
        # Spot-check the first word's structure.
        sample_word = words[0]
        required_word_keys = ("word", "clue", "similarity", "source")
        return all(key in sample_word for key in required_word_keys)

    async def get_cached_words(
        self,
        topic: str,
        difficulty: str = "medium",
        max_words: int = 15
    ) -> List[Dict[str, Any]]:
        """
        Get cached words for a topic and difficulty.

        Returns up to max_words cached entries if a fresh cache exists;
        otherwise an empty list. Expired caches are evicted on access.
        """
        cache_key = self._get_cache_key(topic, difficulty)
        # Check memory cache first.
        if cache_key in self.memory_cache:
            if self._is_cache_fresh(cache_key):
                cached_words = self.memory_cache[cache_key]
                logger.info(f"📦 Using cached words for {cache_key}: {len(cached_words)} words")
                return cached_words[:max_words]
            logger.info(f"⏰ Cache expired for {cache_key}")
            await self._remove_expired_cache(cache_key)
        logger.info(f"📭 No fresh cache available for {cache_key}")
        return []

    async def cache_words(
        self,
        topic: str,
        difficulty: str,
        words: List[Dict[str, Any]],
        source: str = "vector_search"
    ) -> bool:
        """
        Cache words for future use.

        Args:
            topic: Topic name
            difficulty: Difficulty level
            words: List of word objects with clues
            source: Source of the words (e.g., "vector_search")

        Returns:
            True on success, False if caching failed (error is logged).
        """
        try:
            cache_key = self._get_cache_key(topic, difficulty)
            now_iso = self._utc_now().isoformat()
            # Annotate each word with caching metadata; cap the stored count.
            enhanced_words = [
                {**word, "cached_at": now_iso, "cache_source": source}
                for word in words[:self.max_cached_words_per_topic]
            ]
            cache_data = {
                "version": self.cache_version,
                "words": enhanced_words,
                "metadata": {
                    "topic": topic,
                    "difficulty": difficulty,
                    "word_count": len(enhanced_words),
                    "created_at": now_iso,
                    "source": source,
                    "expiry_hours": self.cache_expiry_hours
                }
            }
            # Persist to disk only when a cache directory is available.
            if self.cache_dir is not None:
                cache_file = self.cache_dir / f"{cache_key}.json"
                with open(cache_file, 'w', encoding='utf-8') as f:
                    json.dump(cache_data, f, indent=2)
            # Update memory cache.
            self.memory_cache[cache_key] = enhanced_words
            self.cache_metadata[cache_key] = cache_data["metadata"]
            logger.info(f"💾 Cached {len(enhanced_words)} words for {cache_key}")
            return True
        except Exception as e:
            logger.error(f"❌ Failed to cache words for {topic}/{difficulty}: {e}")
            return False

    def _get_cache_key(self, topic: str, difficulty: str) -> str:
        """Generate cache key from topic and difficulty (case-insensitive)."""
        return f"{topic.lower()}_{difficulty.lower()}"

    def _is_cache_fresh(self, cache_key: str) -> bool:
        """Check if cache is still fresh (not expired).

        Unparseable timestamps are treated as expired instead of raising;
        legacy naive timestamps (written by older versions via utcnow)
        are interpreted as UTC.
        """
        if cache_key not in self.cache_metadata:
            return False
        metadata = self.cache_metadata[cache_key]
        try:
            created_at = datetime.fromisoformat(metadata["created_at"])
        except (KeyError, TypeError, ValueError):
            return False
        if created_at.tzinfo is None:
            created_at = created_at.replace(tzinfo=timezone.utc)
        expiry_hours = metadata.get("expiry_hours", self.cache_expiry_hours)
        return self._utc_now() < created_at + timedelta(hours=expiry_hours)

    async def _remove_expired_cache(self, cache_key: str):
        """Remove expired cache from memory and disk."""
        try:
            # Remove from memory (no-op if already absent).
            self.memory_cache.pop(cache_key, None)
            self.cache_metadata.pop(cache_key, None)
            # Remove from disk (if cache directory available).
            if self.cache_dir is not None:
                cache_file = self.cache_dir / f"{cache_key}.json"
                if cache_file.exists():
                    cache_file.unlink()
            logger.info(f"🗑️ Removed expired cache: {cache_key}")
        except Exception as e:
            logger.error(f"❌ Failed to remove expired cache {cache_key}: {e}")

    async def warm_cache_from_static(self, static_words: Dict[str, List[Dict[str, Any]]]):
        """
        Warm cache with high-quality static words as bootstrap data.

        Converts the existing static words to cache format and stores a
        length-filtered copy per difficulty level.
        """
        try:
            logger.info("🔥 Warming cache with bootstrap data from static words...")
            cached_count = 0
            for topic, words in static_words.items():
                if not words:
                    continue
                # Convert static words to cache format. (Renamed from
                # "cache_words" to avoid shadowing the method of that name.)
                converted_words = [
                    {
                        "word": word_obj["word"].upper(),
                        "clue": word_obj.get("clue", f"Related to {topic.lower()}"),
                        "similarity": 0.9,  # Mark as high quality
                        "source": "bootstrap_static",
                        "quality_score": 100  # High quality bootstrap data
                    }
                    for word_obj in words
                ]
                # Cache for each difficulty, filtered by word length.
                for difficulty in ("easy", "medium", "hard"):
                    filtered_words = self._filter_words_by_difficulty(converted_words, difficulty)
                    if filtered_words:
                        if await self.cache_words(topic, difficulty, filtered_words, "bootstrap"):
                            cached_count += 1
            logger.info(f"🔥 Cache warming completed: {cached_count} topic/difficulty combinations cached")
        except Exception as e:
            logger.error(f"❌ Failed to warm cache: {e}")

    def _filter_words_by_difficulty(self, words: List[Dict[str, Any]], difficulty: str) -> List[Dict[str, Any]]:
        """Filter words by difficulty level using word-length heuristics."""
        difficulty_map = {
            "easy": {"min_len": 3, "max_len": 8},
            "medium": {"min_len": 4, "max_len": 10},
            "hard": {"min_len": 5, "max_len": 15}
        }
        # Unknown difficulty strings fall back to "medium" limits.
        criteria = difficulty_map.get(difficulty, difficulty_map["medium"])
        return [
            word_obj for word_obj in words
            if criteria["min_len"] <= len(word_obj["word"]) <= criteria["max_len"]
        ]

    def get_cache_stats(self) -> Dict[str, Any]:
        """Get cache statistics for monitoring."""
        total_words = sum(len(words) for words in self.memory_cache.values())
        # Count fresh vs expired caches.
        fresh_caches = sum(1 for key in self.memory_cache if self._is_cache_fresh(key))
        total_caches = len(self.memory_cache)
        return {
            "total_cached_topics": total_caches,
            "fresh_caches": fresh_caches,
            "expired_caches": total_caches - fresh_caches,
            "total_cached_words": total_words,
            "cache_directory": str(self.cache_dir),
            "cache_expiry_hours": self.cache_expiry_hours
        }

    async def cleanup_expired_caches(self):
        """Clean up all expired caches (memory and disk)."""
        expired_keys = [
            key for key in self.memory_cache
            if not self._is_cache_fresh(key)
        ]
        for key in expired_keys:
            await self._remove_expired_cache(key)
        logger.info(f"🧹 Cleaned up {len(expired_keys)} expired caches")