"""Pluggable storage layer: a record model, a backend interface, and a manager.

``StorageManager`` maps each ``StorageType`` to a backend instance chosen by
configuration and forwards ``store`` / ``retrieve`` calls to it.
"""

from abc import ABC, abstractmethod
from datetime import datetime
from enum import Enum
from typing import Dict, List, Any, Optional, Union
import uuid

from pydantic import BaseModel


class StorageType(Enum):
    """Categories of data; each value doubles as its configuration key."""

    CHAT = "chat"
    DOCUMENT = "document"
    VECTOR = "vector"
    AUDIT = "audit"
    USER = "user"
    APP_STATE = "app_state"
    ANALYTICS = "analytics"


class StorageRecord(BaseModel):
    """Base storage record"""

    id: str
    type: StorageType
    data: Dict[str, Any]
    metadata: Dict[str, Any]
    created_at: datetime
    updated_at: datetime
    tenant_id: Optional[str]
    user_id: Optional[str]
    # A list literal is safe here: pydantic copies field defaults per
    # instance, unlike a mutable default on a plain function or class.
    tags: List[str] = []


class BaseStorage(ABC):
    """Abstract base storage interface"""

    @abstractmethod
    async def store(
        self,
        type: StorageType,
        data: Dict[str, Any],
        metadata: Optional[Dict[str, Any]] = None,
        tenant_id: Optional[str] = None,
        user_id: Optional[str] = None,
        tags: Optional[List[str]] = None
    ) -> str:
        """Store data and return record ID"""

    @abstractmethod
    async def retrieve(
        self,
        record_id: str,
        tenant_id: Optional[str] = None
    ) -> Optional[StorageRecord]:
        """Retrieve record by ID"""

    @abstractmethod
    async def query(
        self,
        type: StorageType,
        query: Dict[str, Any],
        tenant_id: Optional[str] = None
    ) -> List[StorageRecord]:
        """Query records"""

    @abstractmethod
    async def update(
        self,
        record_id: str,
        data: Dict[str, Any],
        tenant_id: Optional[str] = None
    ) -> bool:
        """Update record"""

    @abstractmethod
    async def delete(
        self,
        record_id: str,
        tenant_id: Optional[str] = None
    ) -> bool:
        """Delete record"""


class PostgresStorage(BaseStorage):
    """PostgreSQL storage implementation

    Only ``store`` has a (placeholder) body. The remaining operations are
    defined so the class satisfies ``BaseStorage`` and can be instantiated,
    but they raise ``NotImplementedError`` until a real backend is written.
    """

    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        # Initialize connection pool

    async def store(self, type: StorageType, data: Dict[str, Any], **kwargs) -> str:
        """Return a freshly generated UUID4 string as the record ID.

        Placeholder: nothing is persisted yet.
        """
        record_id = str(uuid.uuid4())
        # Implementation using asyncpg
        return record_id

    async def retrieve(
        self,
        record_id: str,
        tenant_id: Optional[str] = None
    ) -> Optional[StorageRecord]:
        """Not implemented yet; always raises ``NotImplementedError``."""
        raise NotImplementedError("PostgresStorage.retrieve is not implemented")

    async def query(
        self,
        type: StorageType,
        query: Dict[str, Any],
        tenant_id: Optional[str] = None
    ) -> List[StorageRecord]:
        """Not implemented yet; always raises ``NotImplementedError``."""
        raise NotImplementedError("PostgresStorage.query is not implemented")

    async def update(
        self,
        record_id: str,
        data: Dict[str, Any],
        tenant_id: Optional[str] = None
    ) -> bool:
        """Not implemented yet; always raises ``NotImplementedError``."""
        raise NotImplementedError("PostgresStorage.update is not implemented")

    async def delete(
        self,
        record_id: str,
        tenant_id: Optional[str] = None
    ) -> bool:
        """Not implemented yet; always raises ``NotImplementedError``."""
        raise NotImplementedError("PostgresStorage.delete is not implemented")


class MongoStorage(BaseStorage):
    """MongoDB storage implementation

    Only ``store`` has a (placeholder) body. The remaining operations are
    defined so the class satisfies ``BaseStorage`` and can be instantiated,
    but they raise ``NotImplementedError`` until a real backend is written.
    """

    def __init__(self, connection_string: str):
        self.connection_string = connection_string
        # Initialize MongoDB client

    async def store(self, type: StorageType, data: Dict[str, Any], **kwargs) -> str:
        """Return a freshly generated UUID4 string as the record ID.

        Placeholder: nothing is persisted yet.
        """
        record_id = str(uuid.uuid4())
        # Implementation using motor
        return record_id

    async def retrieve(
        self,
        record_id: str,
        tenant_id: Optional[str] = None
    ) -> Optional[StorageRecord]:
        """Not implemented yet; always raises ``NotImplementedError``."""
        raise NotImplementedError("MongoStorage.retrieve is not implemented")

    async def query(
        self,
        type: StorageType,
        query: Dict[str, Any],
        tenant_id: Optional[str] = None
    ) -> List[StorageRecord]:
        """Not implemented yet; always raises ``NotImplementedError``."""
        raise NotImplementedError("MongoStorage.query is not implemented")

    async def update(
        self,
        record_id: str,
        data: Dict[str, Any],
        tenant_id: Optional[str] = None
    ) -> bool:
        """Not implemented yet; always raises ``NotImplementedError``."""
        raise NotImplementedError("MongoStorage.update is not implemented")

    async def delete(
        self,
        record_id: str,
        tenant_id: Optional[str] = None
    ) -> bool:
        """Not implemented yet; always raises ``NotImplementedError``."""
        raise NotImplementedError("MongoStorage.delete is not implemented")


class StorageManager:
    """Manages different storage types"""

    def __init__(self, config: Dict[str, Any]):
        """Keep *config* and build one backend per configured storage type.

        Args:
            config: Maps a ``StorageType`` value (e.g. ``'chat'``) to a dict
                with ``'connection_string'`` and an optional ``'provider'``
                (``'postgres'`` by default, or ``'mongodb'``).

        Raises:
            KeyError: A configured section has no ``'connection_string'``.
        """
        self.config = config
        self.storages: Dict[StorageType, BaseStorage] = {}
        self._initialize_storages()

    def _initialize_storages(self) -> None:
        """Initialize storage backends based on configuration"""
        for storage_type in StorageType:
            storage_config = self.config.get(storage_type.value)
            if not storage_config:
                # No section for this type: leave it unregistered so that
                # store()/retrieve() report "No storage configured" instead
                # of failing here on a missing connection string.
                continue

            provider = storage_config.get('provider', 'postgres')
            if provider == 'postgres':
                self.storages[storage_type] = PostgresStorage(
                    storage_config['connection_string']
                )
            elif provider == 'mongodb':
                self.storages[storage_type] = MongoStorage(
                    storage_config['connection_string']
                )
            # Any other provider name registers nothing for this type.

    def _get_storage(self, type: StorageType) -> BaseStorage:
        """Return the backend registered for *type* or raise ``ValueError``."""
        storage = self.storages.get(type)
        if storage is None:
            raise ValueError(f"No storage configured for type: {type}")
        return storage

    async def store(
        self,
        type: StorageType,
        data: Dict[str, Any],
        **kwargs
    ) -> str:
        """Store data in appropriate storage

        Extra keyword arguments are forwarded to the backend's ``store``.

        Raises:
            ValueError: No backend is registered for *type*.
        """
        storage = self._get_storage(type)
        return await storage.store(type, data, **kwargs)

    async def retrieve(
        self,
        type: StorageType,
        record_id: str,
        **kwargs
    ) -> Optional[StorageRecord]:
        """Retrieve data from appropriate storage

        Extra keyword arguments are forwarded to the backend's ``retrieve``.

        Raises:
            ValueError: No backend is registered for *type*.
        """
        storage = self._get_storage(type)
        return await storage.retrieve(record_id, **kwargs)


# Example usage:
storage_config = {
    'chat': {
        'provider': 'mongodb',
        'connection_string': 'mongodb://localhost:27017'
    },
    'document': {
        'provider': 'postgres',
        'connection_string': 'postgresql://user:pass@localhost/db'
    }
}

storage_manager = StorageManager(storage_config)