cryogenic22's picture
Create core/storage/base.py
d99a7d6 verified
from abc import ABC, abstractmethod
from typing import Dict, List, Any, Optional, Union
from datetime import datetime
import uuid
from enum import Enum
from pydantic import BaseModel
class StorageType(Enum):
CHAT = "chat"
DOCUMENT = "document"
VECTOR = "vector"
AUDIT = "audit"
USER = "user"
APP_STATE = "app_state"
ANALYTICS = "analytics"
class StorageRecord(BaseModel):
"""Base storage record"""
id: str
type: StorageType
data: Dict[str, Any]
metadata: Dict[str, Any]
created_at: datetime
updated_at: datetime
tenant_id: Optional[str]
user_id: Optional[str]
tags: List[str] = []
class BaseStorage(ABC):
"""Abstract base storage interface"""
@abstractmethod
async def store(
self,
type: StorageType,
data: Dict[str, Any],
metadata: Optional[Dict[str, Any]] = None,
tenant_id: Optional[str] = None,
user_id: Optional[str] = None,
tags: Optional[List[str]] = None
) -> str:
"""Store data and return record ID"""
pass
@abstractmethod
async def retrieve(
self,
record_id: str,
tenant_id: Optional[str] = None
) -> Optional[StorageRecord]:
"""Retrieve record by ID"""
pass
@abstractmethod
async def query(
self,
type: StorageType,
query: Dict[str, Any],
tenant_id: Optional[str] = None
) -> List[StorageRecord]:
"""Query records"""
pass
@abstractmethod
async def update(
self,
record_id: str,
data: Dict[str, Any],
tenant_id: Optional[str] = None
) -> bool:
"""Update record"""
pass
@abstractmethod
async def delete(
self,
record_id: str,
tenant_id: Optional[str] = None
) -> bool:
"""Delete record"""
pass
class PostgresStorage(BaseStorage):
"""PostgreSQL storage implementation"""
def __init__(self, connection_string: str):
self.connection_string = connection_string
# Initialize connection pool
async def store(self, type: StorageType, data: Dict[str, Any], **kwargs) -> str:
record_id = str(uuid.uuid4())
# Implementation using asyncpg
return record_id
class MongoStorage(BaseStorage):
"""MongoDB storage implementation"""
def __init__(self, connection_string: str):
self.connection_string = connection_string
# Initialize MongoDB client
async def store(self, type: StorageType, data: Dict[str, Any], **kwargs) -> str:
record_id = str(uuid.uuid4())
# Implementation using motor
return record_id
class StorageManager:
"""Manages different storage types"""
def __init__(self, config: Dict[str, Any]):
self.config = config
self.storages: Dict[StorageType, BaseStorage] = {}
self._initialize_storages()
def _initialize_storages(self):
"""Initialize storage backends based on configuration"""
for type in StorageType:
storage_config = self.config.get(type.value, {})
provider = storage_config.get('provider', 'postgres')
if provider == 'postgres':
self.storages[type] = PostgresStorage(
storage_config['connection_string']
)
elif provider == 'mongodb':
self.storages[type] = MongoStorage(
storage_config['connection_string']
)
async def store(
self,
type: StorageType,
data: Dict[str, Any],
**kwargs
) -> str:
"""Store data in appropriate storage"""
storage = self.storages.get(type)
if not storage:
raise ValueError(f"No storage configured for type: {type}")
return await storage.store(type, data, **kwargs)
async def retrieve(
self,
type: StorageType,
record_id: str,
**kwargs
) -> Optional[StorageRecord]:
"""Retrieve data from appropriate storage"""
storage = self.storages.get(type)
if not storage:
raise ValueError(f"No storage configured for type: {type}")
return await storage.retrieve(record_id, **kwargs)
# Example usage:
storage_config = {
'chat': {
'provider': 'mongodb',
'connection_string': 'mongodb://localhost:27017'
},
'document': {
'provider': 'postgres',
'connection_string': 'postgresql://user:pass@localhost/db'
}
}
storage_manager = StorageManager(storage_config)