| | """ |
| | Document and Page Caching |
| | |
| | File-based caching for rendered pages and processing results. |
| | Supports LRU eviction and configurable storage backends. |
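
Example (illustrative; "doc.pdf" is a placeholder path):

    cache = get_document_cache()
    cache.set("doc.pdf", "metadata", {"pages": 12})
    meta = cache.get("doc.pdf", "metadata")  # None on a cache miss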
| | """ |
| |
|
| | import hashlib |
| | import json |
| | import logging |
| | import os |
| | import pickle |
| | import shutil |
| | import time |
| | from dataclasses import dataclass, field |
| | from pathlib import Path |
| | from typing import Any, Callable, Dict, List, Optional, Tuple, TypeVar, Union |
| |
|
| | import numpy as np |
| | from PIL import Image |
| |
|
| | logger = logging.getLogger(__name__) |
| |
|
| | T = TypeVar("T") |


@dataclass
class CacheConfig:
    """Configuration for the document cache."""

    cache_dir: Path = field(default_factory=lambda: Path.home() / ".cache" / "sparknet" / "documents")
    max_size_gb: float = 10.0
    ttl_hours: float = 168.0  # one week
    enabled: bool = True
    compression: bool = True

    def __post_init__(self):
        # Accept plain strings for cache_dir.
        self.cache_dir = Path(self.cache_dir)


@dataclass
class CacheEntry:
    """Metadata for a cached item."""

    key: str
    path: Path
    size_bytes: int
    created_at: float
    last_accessed: float
    ttl_hours: float
    metadata: Dict[str, Any] = field(default_factory=dict)

    @property
    def is_expired(self) -> bool:
        """Check if the entry has outlived its TTL."""
        age_hours = (time.time() - self.created_at) / 3600
        return age_hours > self.ttl_hours


class DocumentCache:
    """
    File-based cache for document processing results.

    Features:
    - LRU eviction when the cache exceeds max size
    - TTL-based expiration
    - Separate namespaces for different data types
    - Compressed storage option
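
    Example (sketch; "scan.pdf" is a placeholder path, which need not
    exist on disk, though mtime-based invalidation only applies if it does):

        cache = DocumentCache(CacheConfig(max_size_gb=2.0))
        cache.set("scan.pdf", "ocr", {"text": "..."}, page=1)
        hit = cache.get("scan.pdf", "ocr", page=1)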
| | """ |
| |
|
| | NAMESPACES = ["pages", "ocr", "layout", "chunks", "metadata"] |
| |
|
| | def __init__(self, config: Optional[CacheConfig] = None): |
| | self.config = config or CacheConfig() |
| | self._index: Dict[str, CacheEntry] = {} |
| | self._index_path: Optional[Path] = None |
| |
|
| | if self.config.enabled: |
| | self._init_cache_dir() |
| | self._load_index() |

    def _init_cache_dir(self) -> None:
        """Initialize the cache directory structure."""
        self.config.cache_dir.mkdir(parents=True, exist_ok=True)

        for namespace in self.NAMESPACES:
            (self.config.cache_dir / namespace).mkdir(exist_ok=True)

        self._index_path = self.config.cache_dir / "index.json"

    def _load_index(self) -> None:
        """Load the cache index from disk."""
        if self._index_path and self._index_path.exists():
            try:
                with open(self._index_path, "r") as f:
                    data = json.load(f)

                for key, entry_data in data.items():
                    self._index[key] = CacheEntry(
                        key=entry_data["key"],
                        path=Path(entry_data["path"]),
                        size_bytes=entry_data["size_bytes"],
                        created_at=entry_data["created_at"],
                        last_accessed=entry_data["last_accessed"],
                        ttl_hours=entry_data.get("ttl_hours", self.config.ttl_hours),
                        metadata=entry_data.get("metadata", {}),
                    )
            except Exception as e:
                logger.warning(f"Failed to load cache index: {e}")
                self._index = {}

    def _save_index(self) -> None:
        """Save the cache index to disk."""
        if not self._index_path:
            return

        try:
            data = {
                key: {
                    "key": entry.key,
                    "path": str(entry.path),
                    "size_bytes": entry.size_bytes,
                    "created_at": entry.created_at,
                    "last_accessed": entry.last_accessed,
                    "ttl_hours": entry.ttl_hours,
                    "metadata": entry.metadata,
                }
                for key, entry in self._index.items()
            }

            with open(self._index_path, "w") as f:
                json.dump(data, f)
        except Exception as e:
            logger.warning(f"Failed to save cache index: {e}")

    def _generate_key(
        self,
        doc_path: Union[str, Path],
        namespace: str,
        *args,
        **kwargs
    ) -> str:
        """Generate a unique cache key from the document path and parameters."""
        doc_path = Path(doc_path)

        # Include the file's mtime so the key changes when the document does.
        try:
            mtime = doc_path.stat().st_mtime
        except OSError:
            mtime = 0

        key_parts = [
            str(doc_path.absolute()),
            str(mtime),
            namespace,
            *[str(a) for a in args],
            *[f"{k}={v}" for k, v in sorted(kwargs.items())],
        ]

        key_str = "|".join(key_parts)
        return hashlib.sha256(key_str.encode()).hexdigest()

    def _get_cache_path(self, key: str, namespace: str, ext: str = ".pkl") -> Path:
        """Get the file path for a cache entry."""
        return self.config.cache_dir / namespace / f"{key}{ext}"

    def _get_total_size(self) -> int:
        """Get the total cache size in bytes."""
        return sum(entry.size_bytes for entry in self._index.values())

    def _evict_if_needed(self, required_bytes: int = 0) -> None:
        """Evict entries if the cache would exceed its max size."""
        max_bytes = self.config.max_size_gb * 1024 * 1024 * 1024
        current_size = self._get_total_size()

        if current_size + required_bytes <= max_bytes:
            return

        # Evict in LRU order: least recently accessed first.
        entries = sorted(
            self._index.values(),
            key=lambda e: e.last_accessed
        )

        for entry in entries:
            if current_size + required_bytes <= max_bytes:
                break

            self._delete_entry(entry.key)
            current_size -= entry.size_bytes

    def _delete_entry(self, key: str) -> None:
        """Delete a cache entry and its backing file."""
        if key not in self._index:
            return

        entry = self._index[key]
        try:
            if entry.path.exists():
                entry.path.unlink()
        except Exception as e:
            logger.warning(f"Failed to delete cache file: {e}")

        del self._index[key]

    def _cleanup_expired(self) -> int:
        """Remove expired entries. Returns the number removed."""
        expired_keys = [
            key for key, entry in self._index.items()
            if entry.is_expired
        ]

        for key in expired_keys:
            self._delete_entry(key)

        if expired_keys:
            self._save_index()

        return len(expired_keys)

    # --- Public API -------------------------------------------------------

    def get_page_image(
        self,
        doc_path: Union[str, Path],
        page_number: int,
        dpi: int = 200
    ) -> Optional[np.ndarray]:
        """Get a cached page image, or None on a miss."""
        if not self.config.enabled:
            return None

        key = self._generate_key(doc_path, "pages", page_number, dpi=dpi)

        if key not in self._index:
            return None

        entry = self._index[key]
        if entry.is_expired:
            self._delete_entry(key)
            return None

        try:
            # Load the cached PNG back into an array.
            img = Image.open(entry.path)
            arr = np.array(img)

            # Touch the entry so LRU eviction sees it as recently used.
            entry.last_accessed = time.time()
            self._save_index()

            return arr
        except Exception as e:
            logger.warning(f"Failed to load cached page: {e}")
            self._delete_entry(key)
            return None

    def set_page_image(
        self,
        doc_path: Union[str, Path],
        page_number: int,
        image: np.ndarray,
        dpi: int = 200
    ) -> bool:
        """Cache a page image. Returns True on success."""
        if not self.config.enabled:
            return False

        key = self._generate_key(doc_path, "pages", page_number, dpi=dpi)
        cache_path = self._get_cache_path(key, "pages", ".png")

        try:
            img = Image.fromarray(image)

            # Heuristic: assume roughly 10x PNG compression of the raw
            # array when making room.
            estimated_size = image.nbytes // 10

            self._evict_if_needed(estimated_size)

            img.save(cache_path, format="PNG", optimize=self.config.compression)

            # Store the absolute document path so invalidate_document()
            # can find this entry later.
            entry = CacheEntry(
                key=key,
                path=cache_path,
                size_bytes=cache_path.stat().st_size,
                created_at=time.time(),
                last_accessed=time.time(),
                ttl_hours=self.config.ttl_hours,
                metadata={
                    "doc_path": str(Path(doc_path).absolute()),
                    "page": page_number,
                    "dpi": dpi,
                },
            )
            self._index[key] = entry
            self._save_index()

            return True
        except Exception as e:
            logger.warning(f"Failed to cache page image: {e}")
            return False

    def get(
        self,
        doc_path: Union[str, Path],
        namespace: str,
        *args,
        **kwargs
    ) -> Optional[Any]:
        """Get a cached object, or None on a miss."""
        if not self.config.enabled:
            return None

        key = self._generate_key(doc_path, namespace, *args, **kwargs)

        if key not in self._index:
            return None

        entry = self._index[key]
        if entry.is_expired:
            self._delete_entry(key)
            return None

        try:
            with open(entry.path, "rb") as f:
                data = pickle.load(f)

            # Touch the entry so LRU eviction sees it as recently used.
            entry.last_accessed = time.time()
            self._save_index()

            return data
        except Exception as e:
            logger.warning(f"Failed to load cached object: {e}")
            self._delete_entry(key)
            return None

    def set(
        self,
        doc_path: Union[str, Path],
        namespace: str,
        value: Any,
        *args,
        **kwargs
    ) -> bool:
        """Cache an object. Returns True on success."""
        if not self.config.enabled:
            return False

        key = self._generate_key(doc_path, namespace, *args, **kwargs)
        cache_path = self._get_cache_path(key, namespace, ".pkl")

        try:
            # Serialize first so the exact size is known before eviction.
            data = pickle.dumps(value)
            self._evict_if_needed(len(data))

            with open(cache_path, "wb") as f:
                f.write(data)

            # Record the document path so invalidate_document() can match.
            entry = CacheEntry(
                key=key,
                path=cache_path,
                size_bytes=len(data),
                created_at=time.time(),
                last_accessed=time.time(),
                ttl_hours=self.config.ttl_hours,
                metadata={"doc_path": str(Path(doc_path).absolute())},
            )
            self._index[key] = entry
            self._save_index()

            return True
        except Exception as e:
            logger.warning(f"Failed to cache object: {e}")
            return False

    def invalidate_document(self, doc_path: Union[str, Path]) -> int:
        """Invalidate all cache entries for a document. Returns the count removed."""
        doc_str = str(Path(doc_path).absolute())

        # Entries record their source document in metadata["doc_path"].
        keys_to_remove = [
            key for key, entry in self._index.items()
            if entry.metadata.get("doc_path") == doc_str
        ]

        for key in keys_to_remove:
            self._delete_entry(key)

        if keys_to_remove:
            self._save_index()

        return len(keys_to_remove)

    def clear(self) -> None:
        """Clear the entire cache."""
        if self.config.cache_dir.exists():
            shutil.rmtree(self.config.cache_dir)

        self._index = {}
        self._init_cache_dir()

    def get_stats(self) -> Dict[str, Any]:
        """Get cache statistics."""
        total_size = self._get_total_size()
        max_bytes = self.config.max_size_gb * 1024 * 1024 * 1024
        return {
            "enabled": self.config.enabled,
            "total_entries": len(self._index),
            "total_size_bytes": total_size,
            "total_size_mb": total_size / (1024 * 1024),
            "max_size_gb": self.config.max_size_gb,
            "utilization_percent": (total_size / max_bytes) * 100,
            "cache_dir": str(self.config.cache_dir),
            "namespaces": {
                # Match on the entry's parent directory rather than a
                # substring, so overlapping names cannot miscount.
                ns: sum(1 for e in self._index.values() if e.path.parent.name == ns)
                for ns in self.NAMESPACES
            },
        }


# Global cache instance, created lazily by get_document_cache().
_global_cache: Optional[DocumentCache] = None


def get_document_cache(config: Optional[CacheConfig] = None) -> DocumentCache:
    """Get or create the global document cache."""
    global _global_cache

    # Passing an explicit config replaces the existing global instance.
    if _global_cache is None or config is not None:
        _global_cache = DocumentCache(config)

    return _global_cache


def cached_page(
    cache: Optional[DocumentCache] = None,
    dpi: int = 200
) -> Callable:
    """
    Decorator for caching page rendering results.

    Usage:
        @cached_page(cache, dpi=200)
        def render_page(doc_path, page_number):
            # ... rendering logic
            return image_array
    """
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(doc_path: Union[str, Path], page_number: int, *args, **kwargs):
            _cache = cache or get_document_cache()

            # Return the cached render if one exists.
            cached = _cache.get_page_image(doc_path, page_number, dpi)
            if cached is not None:
                return cached

            # Otherwise render, then cache the result.
            result = func(doc_path, page_number, *args, **kwargs)

            if result is not None:
                _cache.set_page_image(doc_path, page_number, result, dpi)

            return result

        return wrapper
    return decorator
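

if __name__ == "__main__":
    # Minimal smoke test (illustrative sketch, not part of the library API):
    # cache an object against this source file in a throwaway directory,
    # read it back, and print the index stats.
    import tempfile

    with tempfile.TemporaryDirectory() as tmp:
        demo_cache = DocumentCache(CacheConfig(cache_dir=Path(tmp) / "cache"))
        demo_cache.set(__file__, "metadata", {"pages": 3}, version=1)
        print(demo_cache.get(__file__, "metadata", version=1))  # {'pages': 3}
        print(demo_cache.get_stats()["total_entries"])          # 1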