File size: 4,689 Bytes
d99a7d6 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 | from abc import ABC, abstractmethod
from typing import Dict, List, Any, Optional, Union
from datetime import datetime
import uuid
from enum import Enum
from pydantic import BaseModel
class StorageType(Enum):
CHAT = "chat"
DOCUMENT = "document"
VECTOR = "vector"
AUDIT = "audit"
USER = "user"
APP_STATE = "app_state"
ANALYTICS = "analytics"
class StorageRecord(BaseModel):
"""Base storage record"""
id: str
type: StorageType
data: Dict[str, Any]
metadata: Dict[str, Any]
created_at: datetime
updated_at: datetime
tenant_id: Optional[str]
user_id: Optional[str]
tags: List[str] = []
class BaseStorage(ABC):
"""Abstract base storage interface"""
@abstractmethod
async def store(
self,
type: StorageType,
data: Dict[str, Any],
metadata: Optional[Dict[str, Any]] = None,
tenant_id: Optional[str] = None,
user_id: Optional[str] = None,
tags: Optional[List[str]] = None
) -> str:
"""Store data and return record ID"""
pass
@abstractmethod
async def retrieve(
self,
record_id: str,
tenant_id: Optional[str] = None
) -> Optional[StorageRecord]:
"""Retrieve record by ID"""
pass
@abstractmethod
async def query(
self,
type: StorageType,
query: Dict[str, Any],
tenant_id: Optional[str] = None
) -> List[StorageRecord]:
"""Query records"""
pass
@abstractmethod
async def update(
self,
record_id: str,
data: Dict[str, Any],
tenant_id: Optional[str] = None
) -> bool:
"""Update record"""
pass
@abstractmethod
async def delete(
self,
record_id: str,
tenant_id: Optional[str] = None
) -> bool:
"""Delete record"""
pass
class PostgresStorage(BaseStorage):
"""PostgreSQL storage implementation"""
def __init__(self, connection_string: str):
self.connection_string = connection_string
# Initialize connection pool
async def store(self, type: StorageType, data: Dict[str, Any], **kwargs) -> str:
record_id = str(uuid.uuid4())
# Implementation using asyncpg
return record_id
class MongoStorage(BaseStorage):
"""MongoDB storage implementation"""
def __init__(self, connection_string: str):
self.connection_string = connection_string
# Initialize MongoDB client
async def store(self, type: StorageType, data: Dict[str, Any], **kwargs) -> str:
record_id = str(uuid.uuid4())
# Implementation using motor
return record_id
class StorageManager:
"""Manages different storage types"""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.storages: Dict[StorageType, BaseStorage] = {}
self._initialize_storages()
def _initialize_storages(self):
"""Initialize storage backends based on configuration"""
for type in StorageType:
storage_config = self.config.get(type.value, {})
provider = storage_config.get('provider', 'postgres')
if provider == 'postgres':
self.storages[type] = PostgresStorage(
storage_config['connection_string']
)
elif provider == 'mongodb':
self.storages[type] = MongoStorage(
storage_config['connection_string']
)
async def store(
self,
type: StorageType,
data: Dict[str, Any],
**kwargs
) -> str:
"""Store data in appropriate storage"""
storage = self.storages.get(type)
if not storage:
raise ValueError(f"No storage configured for type: {type}")
return await storage.store(type, data, **kwargs)
async def retrieve(
self,
type: StorageType,
record_id: str,
**kwargs
) -> Optional[StorageRecord]:
"""Retrieve data from appropriate storage"""
storage = self.storages.get(type)
if not storage:
raise ValueError(f"No storage configured for type: {type}")
return await storage.retrieve(record_id, **kwargs)
# Example usage:
storage_config = {
'chat': {
'provider': 'mongodb',
'connection_string': 'mongodb://localhost:27017'
},
'document': {
'provider': 'postgres',
'connection_string': 'postgresql://user:pass@localhost/db'
}
}
storage_manager = StorageManager(storage_config) |