Spaces:
Sleeping
Sleeping
| """ | |
| MCP Core - State Persistence Layer | |
| This module provides functionality for persisting context state across sessions, | |
| allowing for long-term storage and retrieval of context data. | |
| """ | |
| import json | |
| import os | |
| import sqlite3 | |
| from datetime import datetime | |
| from typing import Dict, List, Optional, Any, Union | |
| from abc import ABC, abstractmethod | |
| from .context import Context | |
| class PersistenceProvider(ABC): | |
| """ | |
| Abstract base class for persistence providers. | |
| Persistence providers handle the storage and retrieval of context data, | |
| allowing contexts to be persisted across sessions. | |
| """ | |
| async def save_context(self, context: Context) -> bool: | |
| """ | |
| Save a context to persistent storage. | |
| Args: | |
| context: The Context object to save. | |
| Returns: | |
| True if the save was successful, False otherwise. | |
| """ | |
| pass | |
| async def load_context(self, context_id: str) -> Optional[Context]: | |
| """ | |
| Load a context from persistent storage. | |
| Args: | |
| context_id: The ID of the context to load. | |
| Returns: | |
| The loaded Context object if found, None otherwise. | |
| """ | |
| pass | |
| async def delete_context(self, context_id: str) -> bool: | |
| """ | |
| Delete a context from persistent storage. | |
| Args: | |
| context_id: The ID of the context to delete. | |
| Returns: | |
| True if the deletion was successful, False otherwise. | |
| """ | |
| pass | |
| async def list_contexts(self, filter_criteria: Optional[Dict[str, Any]] = None) -> List[str]: | |
| """ | |
| List context IDs in persistent storage, optionally filtered. | |
| Args: | |
| filter_criteria: Optional criteria to filter contexts by. | |
| Returns: | |
| List of context IDs matching the filter criteria. | |
| """ | |
| pass | |
| class FileSystemProvider(PersistenceProvider): | |
| """ | |
| File system-based persistence provider. | |
| This provider stores contexts as JSON files in the file system. | |
| """ | |
| def __init__(self, storage_dir: str = "./storage/contexts"): | |
| """ | |
| Initialize a new FileSystemProvider. | |
| Args: | |
| storage_dir: Directory to store context files in. | |
| """ | |
| self.storage_dir = storage_dir | |
| os.makedirs(storage_dir, exist_ok=True) | |
| def _get_context_path(self, context_id: str) -> str: | |
| """ | |
| Get the file path for a context. | |
| Args: | |
| context_id: The ID of the context. | |
| Returns: | |
| The file path for the context. | |
| """ | |
| return os.path.join(self.storage_dir, f"{context_id}.json") | |
| async def save_context(self, context: Context) -> bool: | |
| """ | |
| Save a context to a JSON file. | |
| Args: | |
| context: The Context object to save. | |
| Returns: | |
| True if the save was successful, False otherwise. | |
| """ | |
| try: | |
| with open(self._get_context_path(context.context_id), 'w') as f: | |
| json.dump(context.to_dict(), f, indent=2) | |
| return True | |
| except Exception as e: | |
| print(f"Error saving context: {e}") | |
| return False | |
| async def load_context(self, context_id: str) -> Optional[Context]: | |
| """ | |
| Load a context from a JSON file. | |
| Args: | |
| context_id: The ID of the context to load. | |
| Returns: | |
| The loaded Context object if found, None otherwise. | |
| """ | |
| try: | |
| path = self._get_context_path(context_id) | |
| if not os.path.exists(path): | |
| return None | |
| with open(path, 'r') as f: | |
| data = json.load(f) | |
| return Context.from_dict(data) | |
| except Exception as e: | |
| print(f"Error loading context: {e}") | |
| return None | |
| async def delete_context(self, context_id: str) -> bool: | |
| """ | |
| Delete a context JSON file. | |
| Args: | |
| context_id: The ID of the context to delete. | |
| Returns: | |
| True if the deletion was successful, False otherwise. | |
| """ | |
| try: | |
| path = self._get_context_path(context_id) | |
| if os.path.exists(path): | |
| os.remove(path) | |
| return True | |
| return False | |
| except Exception as e: | |
| print(f"Error deleting context: {e}") | |
| return False | |
| async def list_contexts(self, filter_criteria: Optional[Dict[str, Any]] = None) -> List[str]: | |
| """ | |
| List context IDs in the storage directory, optionally filtered. | |
| Args: | |
| filter_criteria: Optional criteria to filter contexts by. | |
| Returns: | |
| List of context IDs matching the filter criteria. | |
| """ | |
| try: | |
| context_ids = [] | |
| for filename in os.listdir(self.storage_dir): | |
| if filename.endswith('.json'): | |
| context_id = filename[:-5] # Remove .json extension | |
| # Apply filtering if criteria provided | |
| if filter_criteria: | |
| try: | |
| with open(self._get_context_path(context_id), 'r') as f: | |
| data = json.load(f) | |
| # Check if context matches all filter criteria | |
| matches = True | |
| for key, value in filter_criteria.items(): | |
| # Handle nested keys with dot notation | |
| if '.' in key: | |
| parts = key.split('.') | |
| current = data | |
| for part in parts: | |
| if part not in current: | |
| matches = False | |
| break | |
| current = current[part] | |
| if matches and current != value: | |
| matches = False | |
| elif key not in data or data[key] != value: | |
| matches = False | |
| if matches: | |
| context_ids.append(context_id) | |
| except: | |
| # Skip contexts that can't be loaded or don't match | |
| continue | |
| else: | |
| context_ids.append(context_id) | |
| return context_ids | |
| except Exception as e: | |
| print(f"Error listing contexts: {e}") | |
| return [] | |
| class SQLiteProvider(PersistenceProvider): | |
| """ | |
| SQLite-based persistence provider. | |
| This provider stores contexts in an SQLite database. | |
| """ | |
| def __init__(self, db_path: str = "./storage/contexts.db"): | |
| """ | |
| Initialize a new SQLiteProvider. | |
| Args: | |
| db_path: Path to the SQLite database file. | |
| """ | |
| os.makedirs(os.path.dirname(db_path), exist_ok=True) | |
| self.db_path = db_path | |
| self._init_db() | |
| def _init_db(self): | |
| """Initialize the database schema if it doesn't exist.""" | |
| conn = sqlite3.connect(self.db_path) | |
| cursor = conn.cursor() | |
| # Create contexts table | |
| cursor.execute(''' | |
| CREATE TABLE IF NOT EXISTS contexts ( | |
| context_id TEXT PRIMARY KEY, | |
| metadata TEXT, | |
| state TEXT, | |
| history TEXT, | |
| created_at TEXT, | |
| updated_at TEXT | |
| ) | |
| ''') | |
| conn.commit() | |
| conn.close() | |
| async def save_context(self, context: Context) -> bool: | |
| """ | |
| Save a context to the SQLite database. | |
| Args: | |
| context: The Context object to save. | |
| Returns: | |
| True if the save was successful, False otherwise. | |
| """ | |
| try: | |
| conn = sqlite3.connect(self.db_path) | |
| cursor = conn.cursor() | |
| # Convert complex objects to JSON strings | |
| context_data = context.to_dict() | |
| metadata = json.dumps(context_data.get("metadata", {})) | |
| state = json.dumps(context_data.get("state", {})) | |
| history = json.dumps(context_data.get("history", [])) | |
| # Check if context already exists | |
| cursor.execute("SELECT 1 FROM contexts WHERE context_id = ?", (context.context_id,)) | |
| exists = cursor.fetchone() is not None | |
| if exists: | |
| # Update existing context | |
| cursor.execute(''' | |
| UPDATE contexts | |
| SET metadata = ?, state = ?, history = ?, updated_at = ? | |
| WHERE context_id = ? | |
| ''', (metadata, state, history, context.updated_at.isoformat(), context.context_id)) | |
| else: | |
| # Insert new context | |
| cursor.execute(''' | |
| INSERT INTO contexts (context_id, metadata, state, history, created_at, updated_at) | |
| VALUES (?, ?, ?, ?, ?, ?) | |
| ''', ( | |
| context.context_id, | |
| metadata, | |
| state, | |
| history, | |
| context.created_at.isoformat(), | |
| context.updated_at.isoformat() | |
| )) | |
| conn.commit() | |
| conn.close() | |
| return True | |
| except Exception as e: | |
| print(f"Error saving context to SQLite: {e}") | |
| return False | |
| async def load_context(self, context_id: str) -> Optional[Context]: | |
| """ | |
| Load a context from the SQLite database. | |
| Args: | |
| context_id: The ID of the context to load. | |
| Returns: | |
| The loaded Context object if found, None otherwise. | |
| """ | |
| try: | |
| conn = sqlite3.connect(self.db_path) | |
| cursor = conn.cursor() | |
| cursor.execute(''' | |
| SELECT context_id, metadata, state, history, created_at, updated_at | |
| FROM contexts | |
| WHERE context_id = ? | |
| ''', (context_id,)) | |
| row = cursor.fetchone() | |
| conn.close() | |
| if not row: | |
| return None | |
| # Reconstruct context from database row | |
| context_data = { | |
| "context_id": row[0], | |
| "metadata": json.loads(row[1]), | |
| "state": json.loads(row[2]), | |
| "history": json.loads(row[3]), | |
| "created_at": row[4], | |
| "updated_at": row[5] | |
| } | |
| return Context.from_dict(context_data) | |
| except Exception as e: | |
| print(f"Error loading context from SQLite: {e}") | |
| return None | |
| async def delete_context(self, context_id: str) -> bool: | |
| """ | |
| Delete a context from the SQLite database. | |
| Args: | |
| context_id: The ID of the context to delete. | |
| Returns: | |
| True if the deletion was successful, False otherwise. | |
| """ | |
| try: | |
| conn = sqlite3.connect(self.db_path) | |
| cursor = conn.cursor() | |
| cursor.execute("DELETE FROM contexts WHERE context_id = ?", (context_id,)) | |
| deleted = cursor.rowcount > 0 | |
| conn.commit() | |
| conn.close() | |
| return deleted | |
| except Exception as e: | |
| print(f"Error deleting context from SQLite: {e}") | |
| return False | |
| async def list_contexts(self, filter_criteria: Optional[Dict[str, Any]] = None) -> List[str]: | |
| """ | |
| List context IDs in the SQLite database, optionally filtered. | |
| Args: | |
| filter_criteria: Optional criteria to filter contexts by. | |
| Returns: | |
| List of context IDs matching the filter criteria. | |
| """ | |
| try: | |
| conn = sqlite3.connect(self.db_path) | |
| cursor = conn.cursor() | |
| if not filter_criteria: | |
| # Simple case: no filtering | |
| cursor.execute("SELECT context_id FROM contexts") | |
| context_ids = [row[0] for row in cursor.fetchall()] | |
| else: | |
| # Complex case: need to load and filter each context | |
| cursor.execute("SELECT context_id, metadata, state FROM contexts") | |
| rows = cursor.fetchall() | |
| context_ids = [] | |
| for row in rows: | |
| context_id = row[0] | |
| metadata = json.loads(row[1]) | |
| state = json.loads(row[2]) | |
| # Check if context matches all filter criteria | |
| matches = True | |
| for key, value in filter_criteria.items(): | |
| if key.startswith("metadata."): | |
| # Filter on metadata field | |
| field = key[9:] # Remove "metadata." prefix | |
| if field not in metadata or metadata[field] != value: | |
| matches = False | |
| break | |
| elif key.startswith("state."): | |
| # Filter on state field | |
| field = key[6:] # Remove "state." prefix | |
| if field not in state or state[field] != value: | |
| matches = False | |
| break | |
| if matches: | |
| context_ids.append(context_id) | |
| conn.close() | |
| return context_ids | |
| except Exception as e: | |
| print(f"Error listing contexts from SQLite: {e}") | |
| return [] | |
| class PersistenceManager: | |
| """ | |
| Manages context persistence operations. | |
| This class provides a unified interface for working with different | |
| persistence providers. | |
| """ | |
| def __init__(self, provider: PersistenceProvider): | |
| """ | |
| Initialize a new PersistenceManager. | |
| Args: | |
| provider: The persistence provider to use. | |
| """ | |
| self.provider = provider | |
| async def save_context(self, context: Context) -> bool: | |
| """ | |
| Save a context using the configured provider. | |
| Args: | |
| context: The Context object to save. | |
| Returns: | |
| True if the save was successful, False otherwise. | |
| """ | |
| return await self.provider.save_context(context) | |
| async def load_context(self, context_id: str) -> Optional[Context]: | |
| """ | |
| Load a context using the configured provider. | |
| Args: | |
| context_id: The ID of the context to load. | |
| Returns: | |
| The loaded Context object if found, None otherwise. | |
| """ | |
| return await self.provider.load_context(context_id) | |
| async def delete_context(self, context_id: str) -> bool: | |
| """ | |
| Delete a context using the configured provider. | |
| Args: | |
| context_id: The ID of the context to delete. | |
| Returns: | |
| True if the deletion was successful, False otherwise. | |
| """ | |
| return await self.provider.delete_context(context_id) | |
| async def list_contexts(self, filter_criteria: Optional[Dict[str, Any]] = None) -> List[str]: | |
| """ | |
| List context IDs using the configured provider, optionally filtered. | |
| Args: | |
| filter_criteria: Optional criteria to filter contexts by. | |
| Returns: | |
| List of context IDs matching the filter criteria. | |
| """ | |
| return await self.provider.list_contexts(filter_criteria) | |
| def change_provider(self, provider: PersistenceProvider) -> None: | |
| """ | |
| Change the persistence provider. | |
| Args: | |
| provider: The new persistence provider to use. | |
| """ | |
| self.provider = provider | |