Spaces:
Running
Running
"""
Knowledge storage with LRU eviction and expiration management.

Stores knowledge items as JSON files under a configurable storage limit
(200 MB by default) and evicts the least recently used items once that
limit is exceeded.
"""
import json
import logging
import os
import uuid
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)


class KnowledgeStore:
    """File-backed store of knowledge items with a size cap and LRU eviction.

    Every item is a JSON-serialisable dict persisted as ``<id>.json`` inside
    ``<data_dir>/knowledge``. The store stamps each item with ``saved_at`` and
    ``last_accessed`` (naive UTC, ISO-8601). When the combined size of the
    ``*.json`` files exceeds ``max_size_bytes``, the items with the oldest
    ``last_accessed`` value are deleted first.

    No locking is performed; files are replaced atomically, but concurrent
    writers are otherwise uncoordinated.
    """

    def __init__(self, data_dir: str, max_size_mb: int = 200):
        """
        Create the store, making the storage directory if needed.

        Args:
            data_dir: Base directory; items live in its ``knowledge`` subfolder
            max_size_mb: Storage limit in mebibytes
        """
        self.data_dir = Path(data_dir) / "knowledge"
        self.max_size_bytes = max_size_mb * 1024 * 1024
        self.data_dir.mkdir(parents=True, exist_ok=True)

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _utc_now() -> datetime:
        """Return the current UTC time as a naive datetime.

        The tzinfo is dropped on purpose so the ISO strings keep the format
        of timestamps previously written with ``datetime.utcnow()``.
        """
        return datetime.now(timezone.utc).replace(tzinfo=None)

    @staticmethod
    def _parse_timestamp(value: Any) -> datetime:
        """Parse an ISO-8601 string into a naive UTC datetime.

        Raises:
            ValueError: If ``value`` is not a string or is not valid ISO-8601
        """
        if not isinstance(value, str):
            raise ValueError(f"missing or non-string timestamp: {value!r}")
        parsed = datetime.fromisoformat(value)
        if parsed.tzinfo is not None:
            # Normalise aware values so they compare against naive cutoffs.
            parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None)
        return parsed

    def _item_path(self, item_id: Any) -> Path:
        """Map an item ID to its JSON file inside the store directory.

        Raises:
            ValueError: If the ID is empty or contains a path component,
                which would place the file outside the store directory
        """
        name = str(item_id)
        if not name or Path(name).name != name:
            raise ValueError(f"Invalid knowledge item id: {item_id!r}")
        return self.data_dir / f"{name}.json"

    @staticmethod
    def _read_item(file_path: Path) -> Dict[str, Any]:
        """Load one item file.

        Raises:
            OSError: If the file cannot be opened or read
            ValueError: If the content is not valid UTF-8 JSON or is not a
                JSON object
        """
        with open(file_path, "r", encoding="utf-8") as f:
            item = json.load(f)
        if not isinstance(item, dict):
            raise ValueError("expected a JSON object")
        return item

    @staticmethod
    def _write_item(file_path: Path, item: Dict[str, Any]) -> None:
        """Write ``item`` to ``file_path`` atomically.

        The data goes to a uniquely named temp file first and is then moved
        over the target, so a failed or interrupted write never leaves a
        truncated item behind. The temp name does not end in ``.json`` and is
        therefore invisible to the ``*.json`` scans.
        """
        tmp_path = file_path.with_name(
            f".{file_path.name}.{uuid.uuid4().hex}.tmp"
        )
        try:
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(item, f, indent=2)
            os.replace(tmp_path, file_path)
        finally:
            # After a successful replace the temp name no longer exists.
            try:
                tmp_path.unlink()
            except FileNotFoundError:
                pass

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def save_knowledge(self, item: Dict[str, Any]) -> str:
        """
        Save a knowledge item to storage.

        The passed dict is updated in place: an ``id`` is generated when
        absent, and ``saved_at`` / ``last_accessed`` are set to the current
        time. An existing item with the same ID is overwritten.

        Args:
            item: Knowledge item to save
        Returns:
            Item ID
        Raises:
            ValueError: If the item's ID is empty or contains a path component
            TypeError: If the item is not JSON-serialisable
        """
        # Generate ID if not present
        if "id" not in item:
            item["id"] = str(uuid.uuid4())
        file_path = self._item_path(item["id"])

        # Add metadata (one clock reading so both stamps agree)
        now = self._utc_now().isoformat()
        item["saved_at"] = now
        item["last_accessed"] = now

        self._write_item(file_path, item)
        logger.info("Saved knowledge item: %s", item["id"])

        # Enforce the limit after writing so the new item is counted; the
        # item just saved is never chosen for eviction.
        self._enforce_storage_limit(keep=file_path)
        return item["id"]

    def get_knowledge(self, item_id: str) -> Optional[Dict[str, Any]]:
        """
        Retrieve a knowledge item by ID and mark it as just accessed.

        Args:
            item_id: Item ID
        Returns:
            Knowledge item, or None if the ID is invalid, the item does not
            exist, or its file cannot be read
        """
        try:
            file_path = self._item_path(item_id)
        except ValueError:
            return None

        try:
            item = self._read_item(file_path)
        except FileNotFoundError:
            return None
        except (OSError, ValueError) as e:
            logger.error("Failed to read knowledge item %s: %s", file_path, e)
            return None

        # Update last accessed time; a failure here must not hide the item.
        item["last_accessed"] = self._utc_now().isoformat()
        try:
            self._write_item(file_path, item)
        except OSError as e:
            logger.warning(
                "Could not update last_accessed for %s: %s", file_path, e
            )
        return item

    def search_knowledge(self, query: str, limit: int = 10) -> List[Dict[str, Any]]:
        """
        Search knowledge items by query.

        Performs a case-insensitive substring match against each item's
        ``title`` and ``summary``.

        Args:
            query: Search query
            limit: Maximum number of results
        Returns:
            List of matching knowledge items, title matches first, newest
            ``saved_at`` first within each group
        """
        query_lower = query.lower()
        matches = []  # (title_hit, saved_at, item)
        for file_path in self.data_dir.glob("*.json"):
            try:
                item = self._read_item(file_path)
            except (OSError, ValueError) as e:
                logger.error("Failed to read knowledge item %s: %s", file_path, e)
                continue
            # Tolerate missing / null / non-string fields.
            title = str(item.get("title") or "").lower()
            summary = str(item.get("summary") or "").lower()
            title_hit = query_lower in title
            if title_hit or query_lower in summary:
                saved_at = str(item.get("saved_at") or "")
                matches.append((title_hit, saved_at, item))

        # Rank every match before truncating so the best `limit` are returned.
        matches.sort(key=lambda match: (match[0], match[1]), reverse=True)
        return [match[2] for match in matches[:max(limit, 0)]]

    def list_all(self, limit: Optional[int] = None) -> List[Dict[str, Any]]:
        """
        List all knowledge items, newest ``saved_at`` first.

        Args:
            limit: Maximum number of items to return; None means no limit
        Returns:
            List of knowledge items
        """
        items = []
        for file_path in self.data_dir.glob("*.json"):
            try:
                items.append(self._read_item(file_path))
            except (OSError, ValueError) as e:
                logger.error("Failed to read knowledge item %s: %s", file_path, e)

        # Sort by saved_at descending
        items.sort(key=lambda x: str(x.get("saved_at") or ""), reverse=True)
        if limit is None:
            return items
        return items[:max(limit, 0)]

    def delete_expired_knowledge(self, expiration_days: int = 30) -> int:
        """
        Delete expired knowledge items.

        An item is expired when its ``saved_at`` is older than
        ``expiration_days`` days. Items whose file or timestamp cannot be
        read are logged and left in place.

        Args:
            expiration_days: Number of days before expiration
        Returns:
            Number of items deleted
        """
        cutoff = self._utc_now() - timedelta(days=expiration_days)
        deleted_count = 0
        for file_path in self.data_dir.glob("*.json"):
            try:
                item = self._read_item(file_path)
                saved_at = self._parse_timestamp(item.get("saved_at"))
                if saved_at < cutoff:
                    file_path.unlink()
                    deleted_count += 1
                    logger.info(
                        "Deleted expired knowledge item: %s", item.get("id")
                    )
            except (OSError, ValueError) as e:
                logger.error("Failed to check expiration for %s: %s", file_path, e)
        logger.info("Deleted %s expired knowledge items", deleted_count)
        return deleted_count

    def _enforce_storage_limit(self, keep: Optional[Path] = None) -> None:
        """Enforce storage limit using LRU eviction.

        Args:
            keep: Path of an item file that must not be evicted (typically
                the item that was just saved)
        """
        current_size = self._get_storage_size()
        if current_size <= self.max_size_bytes:
            return
        logger.warning(
            "Storage limit exceeded: %.2fMB / %.2fMB",
            current_size / 1024 / 1024,
            self.max_size_bytes / 1024 / 1024,
        )

        # Collect eviction candidates keyed by last_accessed (LRU)
        candidates = []
        for file_path in self.data_dir.glob("*.json"):
            if keep is not None and file_path == keep:
                continue
            try:
                item = self._read_item(file_path)
                last_accessed = str(item.get("last_accessed") or "")
            except OSError as e:
                logger.error("Failed to read %s: %s", file_path, e)
                continue
            except ValueError as e:
                # Unparseable files hold no usable item; evict them first.
                logger.error("Failed to read %s: %s", file_path, e)
                last_accessed = ""
            candidates.append((last_accessed, file_path))
        candidates.sort(key=lambda entry: entry[0])

        # Delete oldest items until under limit
        for _, file_path in candidates:
            if current_size <= self.max_size_bytes:
                break
            try:
                file_size = file_path.stat().st_size
                file_path.unlink()
            except FileNotFoundError:
                # Already removed elsewhere; nothing to reclaim here.
                continue
            except OSError as e:
                logger.error("Failed to evict %s: %s", file_path, e)
                continue
            current_size -= file_size
            logger.info("Evicted knowledge item (LRU): %s", file_path.name)

    def _get_storage_size(self) -> int:
        """Get total size in bytes of all ``*.json`` files in the store."""
        total_size = 0
        for file_path in self.data_dir.glob("*.json"):
            try:
                total_size += file_path.stat().st_size
            except FileNotFoundError:
                # File disappeared between listing and stat; skip it.
                continue
        return total_size

    def get_storage_stats(self) -> Dict[str, Any]:
        """Get storage statistics.

        Returns:
            Dict with ``total_size_mb``, ``max_size_mb``, ``usage_percent``
            (0 when the limit is not positive) and ``item_count``
        """
        total_size = self._get_storage_size()
        item_count = sum(1 for _ in self.data_dir.glob("*.json"))
        return {
            "total_size_mb": total_size / 1024 / 1024,
            "max_size_mb": self.max_size_bytes / 1024 / 1024,
            "usage_percent": (total_size / self.max_size_bytes) * 100 if self.max_size_bytes > 0 else 0,
            "item_count": item_count,
        }