|
|
from abc import ABC, abstractmethod |
|
|
from typing import Dict, List, Any, Optional, Union |
|
|
from datetime import datetime |
|
|
import uuid |
|
|
from enum import Enum |
|
|
from pydantic import BaseModel |
|
|
|
|
|
class StorageType(Enum):
    """Categories of data handled by the storage layer.

    Each member's string value doubles as the key under which
    ``StorageManager`` looks up that category's backend configuration.
    """

    # Declaration order is preserved: iteration over the enum follows it.
    CHAT = "chat"
    DOCUMENT = "document"
    VECTOR = "vector"
    AUDIT = "audit"
    USER = "user"
    APP_STATE = "app_state"
    ANALYTICS = "analytics"
|
|
|
|
|
class StorageRecord(BaseModel):
    """Base storage record.

    Pydantic model for a single stored item: its identifier, the
    ``StorageType`` it belongs to, the payload and metadata dictionaries,
    creation/update timestamps, and optional tenant/user identifiers and tags.
    """

    id: str
    type: StorageType
    data: Dict[str, Any]
    metadata: Dict[str, Any]
    created_at: datetime
    updated_at: datetime
    # Explicit ``None`` defaults: without them these fields are *required*
    # under pydantic v2 (and only implicitly optional under v1). This also
    # mirrors the ``None`` defaults of the same parameters on
    # ``BaseStorage.store``.
    tenant_id: Optional[str] = None
    user_id: Optional[str] = None
    # A list literal is acceptable here: pydantic copies field defaults per
    # instance, so the list is not shared between records.
    tags: List[str] = []
|
|
|
|
|
class BaseStorage(ABC):
    """Abstract base storage interface.

    Declares the five asynchronous operations (store, retrieve, query,
    update, delete) that a concrete backend has to implement before it can
    be instantiated.
    """

    @abstractmethod
    async def store(
        self,
        type: StorageType,
        data: Dict[str, Any],
        metadata: Optional[Dict[str, Any]] = None,
        tenant_id: Optional[str] = None,
        user_id: Optional[str] = None,
        tags: Optional[List[str]] = None
    ) -> str:
        """Store data and return the new record's ID.

        Args:
            type: Storage category the record belongs to.
            data: Payload to store.
            metadata: Optional metadata dictionary.
            tenant_id: Optional tenant identifier.
            user_id: Optional user identifier.
            tags: Optional list of tags.

        Returns:
            The ID of the stored record.
        """
        ...

    @abstractmethod
    async def retrieve(
        self,
        record_id: str,
        tenant_id: Optional[str] = None
    ) -> Optional[StorageRecord]:
        """Retrieve a record by ID.

        Args:
            record_id: ID of the record to fetch.
            tenant_id: Optional tenant identifier; how it restricts the
                lookup is left to the implementation.

        Returns:
            A ``StorageRecord``, or ``None`` (the annotated return type is
            optional).
        """
        ...

    @abstractmethod
    async def query(
        self,
        type: StorageType,
        query: Dict[str, Any],
        tenant_id: Optional[str] = None
    ) -> List[StorageRecord]:
        """Query records.

        Args:
            type: Storage category to query.
            query: Query description; its schema is implementation-defined.
            tenant_id: Optional tenant identifier.

        Returns:
            A list of ``StorageRecord`` objects.
        """
        ...

    @abstractmethod
    async def update(
        self,
        record_id: str,
        data: Dict[str, Any],
        tenant_id: Optional[str] = None
    ) -> bool:
        """Update a record.

        Args:
            record_id: ID of the record to update.
            data: Data to apply to the record.
            tenant_id: Optional tenant identifier.

        Returns:
            A boolean result; presumably ``True`` on success (not defined
            by this interface).
        """
        ...

    @abstractmethod
    async def delete(
        self,
        record_id: str,
        tenant_id: Optional[str] = None
    ) -> bool:
        """Delete a record.

        Args:
            record_id: ID of the record to delete.
            tenant_id: Optional tenant identifier.

        Returns:
            A boolean result; presumably ``True`` on success (not defined
            by this interface).
        """
        ...
|
|
|
|
|
class PostgresStorage(BaseStorage):
    """PostgreSQL storage implementation.

    In the code visible here only ``store`` has behavior, and it merely
    allocates a record ID; nothing is written to a database. The other
    ``BaseStorage`` operations are provided as explicit
    ``NotImplementedError`` stubs: an ABC subclass that leaves abstract
    methods undefined cannot be instantiated (``TypeError``), so without
    them ``PostgresStorage(...)`` itself would fail.
    """

    def __init__(self, connection_string: str):
        """Remember the connection string; no connection is opened here."""
        self.connection_string = connection_string

    async def store(self, type: StorageType, data: Dict[str, Any], **kwargs) -> str:
        """Generate a new record ID and return it.

        Args:
            type: Storage category (currently unused).
            data: Payload (currently unused).
            **kwargs: Accepted for interface compatibility and ignored.

        Returns:
            A freshly generated UUID4 string.
        """
        record_id = str(uuid.uuid4())
        return record_id

    async def retrieve(
        self,
        record_id: str,
        tenant_id: Optional[str] = None
    ) -> Optional[StorageRecord]:
        """Retrieve record by ID (not implemented yet).

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("PostgresStorage.retrieve is not implemented")

    async def query(
        self,
        type: StorageType,
        query: Dict[str, Any],
        tenant_id: Optional[str] = None
    ) -> List[StorageRecord]:
        """Query records (not implemented yet).

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("PostgresStorage.query is not implemented")

    async def update(
        self,
        record_id: str,
        data: Dict[str, Any],
        tenant_id: Optional[str] = None
    ) -> bool:
        """Update record (not implemented yet).

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("PostgresStorage.update is not implemented")

    async def delete(
        self,
        record_id: str,
        tenant_id: Optional[str] = None
    ) -> bool:
        """Delete record (not implemented yet).

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("PostgresStorage.delete is not implemented")
|
|
|
|
|
class MongoStorage(BaseStorage):
    """MongoDB storage implementation.

    In the code visible here only ``store`` has behavior, and it merely
    allocates a record ID; nothing is written to a database. The other
    ``BaseStorage`` operations are provided as explicit
    ``NotImplementedError`` stubs: an ABC subclass that leaves abstract
    methods undefined cannot be instantiated (``TypeError``), so without
    them ``MongoStorage(...)`` itself would fail.
    """

    def __init__(self, connection_string: str):
        """Remember the connection string; no connection is opened here."""
        self.connection_string = connection_string

    async def store(self, type: StorageType, data: Dict[str, Any], **kwargs) -> str:
        """Generate a new record ID and return it.

        Args:
            type: Storage category (currently unused).
            data: Payload (currently unused).
            **kwargs: Accepted for interface compatibility and ignored.

        Returns:
            A freshly generated UUID4 string.
        """
        record_id = str(uuid.uuid4())
        return record_id

    async def retrieve(
        self,
        record_id: str,
        tenant_id: Optional[str] = None
    ) -> Optional[StorageRecord]:
        """Retrieve record by ID (not implemented yet).

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("MongoStorage.retrieve is not implemented")

    async def query(
        self,
        type: StorageType,
        query: Dict[str, Any],
        tenant_id: Optional[str] = None
    ) -> List[StorageRecord]:
        """Query records (not implemented yet).

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("MongoStorage.query is not implemented")

    async def update(
        self,
        record_id: str,
        data: Dict[str, Any],
        tenant_id: Optional[str] = None
    ) -> bool:
        """Update record (not implemented yet).

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("MongoStorage.update is not implemented")

    async def delete(
        self,
        record_id: str,
        tenant_id: Optional[str] = None
    ) -> bool:
        """Delete record (not implemented yet).

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError("MongoStorage.delete is not implemented")
|
|
|
|
|
class StorageManager:
    """Manages different storage types.

    Builds one backend per configured ``StorageType`` and forwards
    ``store`` / ``retrieve`` calls to the backend registered for the
    requested type.
    """

    def __init__(self, config: Dict[str, Any]):
        """Create the manager and instantiate the configured backends.

        Args:
            config: Mapping from a ``StorageType`` value (e.g. ``"chat"``)
                to a dict with an optional ``'provider'`` key
                (``'postgres'`` when omitted, or ``'mongodb'``) and a
                ``'connection_string'`` key.

        Raises:
            KeyError: If a configured entry has no ``'connection_string'``.
        """
        self.config = config
        self.storages: Dict[StorageType, BaseStorage] = {}
        self._initialize_storages()

    def _initialize_storages(self):
        """Initialize storage backends based on configuration.

        Storage types without a configuration entry are skipped rather
        than defaulted to postgres: defaulting would index
        ``'connection_string'`` on an empty dict and raise ``KeyError``
        for every partial configuration. ``store`` and ``retrieve`` report
        such types with a "No storage configured" error when used.
        """
        backends = {
            'postgres': PostgresStorage,
            'mongodb': MongoStorage,
        }
        for storage_type in StorageType:
            storage_config = self.config.get(storage_type.value)
            if not storage_config:
                continue

            provider = storage_config.get('provider', 'postgres')
            backend_cls = backends.get(provider)
            if backend_cls is None:
                # Unrecognized providers are left unregistered.
                continue

            self.storages[storage_type] = backend_cls(
                storage_config['connection_string']
            )

    def _get_storage(self, type: StorageType) -> BaseStorage:
        """Return the backend registered for ``type`` or raise ValueError."""
        storage = self.storages.get(type)
        if storage is None:
            raise ValueError(f"No storage configured for type: {type}")
        return storage

    async def store(
        self,
        type: StorageType,
        data: Dict[str, Any],
        **kwargs
    ) -> str:
        """Store data in appropriate storage.

        Args:
            type: Storage category selecting the backend.
            data: Payload passed through to the backend.
            **kwargs: Forwarded unchanged to the backend's ``store``.

        Returns:
            The record ID returned by the backend.

        Raises:
            ValueError: If no backend is registered for ``type``.
        """
        storage = self._get_storage(type)
        return await storage.store(type, data, **kwargs)

    async def retrieve(
        self,
        type: StorageType,
        record_id: str,
        **kwargs
    ) -> Optional[StorageRecord]:
        """Retrieve data from appropriate storage.

        Args:
            type: Storage category selecting the backend.
            record_id: ID of the record to fetch.
            **kwargs: Forwarded unchanged to the backend's ``retrieve``.

        Returns:
            Whatever the backend's ``retrieve`` returns.

        Raises:
            ValueError: If no backend is registered for ``type``.
        """
        storage = self._get_storage(type)
        return await storage.retrieve(record_id, **kwargs)
|
|
|
|
|
|
|
|
# Backend configuration keyed by StorageType value. Only "chat" and
# "document" have entries here.
# NOTE(review): the connection strings (including credentials) are
# hard-coded; consider loading them from the environment instead.
storage_config = {
    "chat": {
        "provider": "mongodb",
        "connection_string": "mongodb://localhost:27017",
    },
    "document": {
        "provider": "postgres",
        "connection_string": "postgresql://user:pass@localhost/db",
    },
}

# Module-level manager, constructed at import time from the config above.
storage_manager = StorageManager(storage_config)