# storage.py (intelligent-pid) — pluggable storage backends: a common
# StorageInterface with local-filesystem and Azure Blob implementations.
import os
import shutil
from abc import ABC, abstractmethod
import json
# The Azure SDK is an optional dependency: record whether it imported so
# StorageFactory can fall back to local storage when it is not installed.
try:
    from azure.storage.blob import BlobServiceClient
    AZURE_AVAILABLE = True
except ImportError:
    AZURE_AVAILABLE = False
class StorageInterface(ABC):
    """Abstract interface for file storage backends.

    Paths are plain strings and file content is always ``bytes``.
    Concrete backends implement the abstract methods; ``load_json`` is a
    shared convenience built on top of ``load_file``.
    """

    @abstractmethod
    def save_file(self, file_path: str, content: bytes) -> str:
        """Write ``content`` at ``file_path`` and return the stored path."""

    @abstractmethod
    def load_file(self, file_path: str) -> bytes:
        """Return the raw bytes stored at ``file_path``."""

    @abstractmethod
    def list_files(self, directory: str) -> list[str]:
        """Return the files under ``directory``."""

    @abstractmethod
    def file_exists(self, file_path: str) -> bool:
        """Return True if ``file_path`` exists in this backend."""

    @abstractmethod
    def delete_file(self, file_path: str) -> None:
        """Remove ``file_path`` from this backend."""

    @abstractmethod
    def create_directory(self, directory: str) -> None:
        """Create ``directory`` (a no-op where directories are implicit)."""

    @abstractmethod
    def delete_directory(self, directory: str) -> None:
        """Remove ``directory`` and everything under it."""

    @abstractmethod
    def upload(self, local_path: str, destination_path: str) -> None:
        """Copy an existing local file into this backend at ``destination_path``."""

    @abstractmethod
    def append_file(self, file_path: str, content: bytes) -> None:
        """Append ``content`` to ``file_path``, creating it if needed."""

    @abstractmethod
    def get_modified_time(self, file_path: str) -> float:
        """Return the last-modified time of ``file_path`` as a UNIX timestamp."""

    @abstractmethod
    def directory_exists(self, directory: str) -> bool:
        """Return True if ``directory`` exists in this backend."""

    def load_json(self, file_path):
        """Load and parse JSON file.

        Returns the parsed object, or ``None`` on any failure (best-effort:
        errors are reported, not raised).
        """
        try:
            # Route through load_file so backends can intercept the read
            # (e.g. BlobStorage syncs/downloads before reading). The old
            # implementation opened the local path directly, bypassing the
            # abstraction and reading stale or missing cache files.
            data = json.loads(self.load_file(file_path).decode('utf-8'))
            return data
        except Exception as e:
            print(f"Error loading JSON from {file_path}: {str(e)}")
            return None
class LocalStorage(StorageInterface):
    """Storage backend that reads and writes the local filesystem."""

    def save_file(self, file_path: str, content: bytes) -> str:
        """Write ``content`` to ``file_path``, creating parent dirs; return the path."""
        parent = os.path.dirname(file_path)
        # dirname is '' for a bare filename; os.makedirs('') would raise.
        if parent:
            os.makedirs(parent, exist_ok=True)
        with open(file_path, 'wb') as f:
            f.write(content)
        return file_path

    def load_file(self, file_path: str) -> bytes:
        """Return the raw bytes of ``file_path``."""
        with open(file_path, 'rb') as f:
            return f.read()

    def list_files(self, directory: str) -> list[str]:
        """Return names (not full paths) of regular files directly in ``directory``."""
        return [f for f in os.listdir(directory) if os.path.isfile(os.path.join(directory, f))]

    def file_exists(self, file_path: str) -> bool:
        """Return True if ``file_path`` exists (file or directory)."""
        return os.path.exists(file_path)

    def delete_file(self, file_path: str) -> None:
        """Remove ``file_path``; raises FileNotFoundError if absent."""
        os.remove(file_path)

    def create_directory(self, directory: str) -> None:
        """Create ``directory`` (and parents); no error if it already exists."""
        os.makedirs(directory, exist_ok=True)

    def delete_directory(self, directory: str) -> None:
        """Recursively remove ``directory`` and its contents."""
        shutil.rmtree(directory)

    def upload(self, local_path: str, destination_path: str) -> None:
        """Copy ``local_path`` to ``destination_path``, creating parent dirs."""
        parent = os.path.dirname(destination_path)
        if parent:
            os.makedirs(parent, exist_ok=True)
        shutil.copy(local_path, destination_path)

    def append_file(self, file_path: str, content: bytes) -> None:
        """Append ``content`` to ``file_path``, creating it (and parents) if needed."""
        parent = os.path.dirname(file_path)
        if parent:
            os.makedirs(parent, exist_ok=True)
        with open(file_path, 'ab') as f:
            f.write(content)

    def get_modified_time(self, file_path: str) -> float:
        """Return the file's mtime as a UNIX timestamp."""
        return os.path.getmtime(file_path)

    def directory_exists(self, directory: str) -> bool:
        """Return True only if ``directory`` exists and is a directory."""
        # Previously delegated to file_exists (os.path.exists), which also
        # returned True for regular files.
        return os.path.isdir(directory)
class BlobStorage(StorageInterface):
    """
    Writes to blob storage, using local disk as a cache
    TODO: Allow configuration of temp dir instead of just using the same paths in both local and remote
    """

    def __init__(self, connection_string: str, container_name: str):
        self.blob_service_client = BlobServiceClient.from_connection_string(connection_string)
        self.container_client = self.blob_service_client.get_container_client(container_name)
        # Local mirror of the container, used as a read cache.
        self.local_storage = LocalStorage()

    def download(self, file_path: str) -> bytes:
        """Fetch the blob's full contents directly from the container."""
        blob_client = self.container_client.get_blob_client(file_path)
        return blob_client.download_blob().readall()

    def sync(self, file_path: str) -> None:
        """Ensure the local cached copy of ``file_path`` is at least as new as the blob."""
        if not self.local_storage.file_exists(file_path):
            print(f"DEBUG: missing local version of {file_path} - downloading")
            self.local_storage.save_file(file_path, self.download(file_path))
        else:
            local_timestamp = self.local_storage.get_modified_time(file_path)
            remote_timestamp = self.get_modified_time(file_path)
            if local_timestamp < remote_timestamp:
                # We always write remotely before writing locally, so we expect local_timestamp to be > remote timestamp
                print(f"DEBUG: local version of {file_path} out of date - downloading")
                self.local_storage.save_file(file_path, self.download(file_path))

    def save_file(self, file_path: str, content: bytes) -> str:
        """Upload ``content`` to the blob, then mirror it into the local cache."""
        blob_client = self.container_client.get_blob_client(file_path)
        blob_client.upload_blob(content, overwrite=True)
        # Remote first, then local: sync() relies on this write ordering.
        self.local_storage.save_file(file_path, content)
        return file_path

    def load_file(self, file_path: str) -> bytes:
        """Return the file's bytes, refreshing the local cache from the blob if stale."""
        self.sync(file_path)
        return self.local_storage.load_file(file_path)

    def list_files(self, directory: str) -> list[str]:
        """List blob names under the ``directory`` prefix (remote only, full names)."""
        return [blob.name for blob in self.container_client.list_blobs(name_starts_with=directory)]

    def file_exists(self, file_path: str) -> bool:
        """Existence is decided by the remote blob, not the local cache."""
        blob_client = self.container_client.get_blob_client(file_path)
        return blob_client.exists()

    def delete_file(self, file_path: str) -> None:
        """Delete the blob and any locally cached copy."""
        # The cache may not contain this file (e.g. it was written by another
        # process); a missing cache entry must not block the remote delete.
        if self.local_storage.file_exists(file_path):
            self.local_storage.delete_file(file_path)
        blob_client = self.container_client.get_blob_client(file_path)
        blob_client.delete_blob()

    def create_directory(self, directory: str) -> None:
        # Blob storage doesn't have directories, so only create it locally
        self.local_storage.create_directory(directory)

    def delete_directory(self, directory: str) -> None:
        """Delete the local cache directory and every blob under the prefix."""
        # Guard the local delete: the cache directory may never have been
        # materialized on this machine.
        if self.local_storage.directory_exists(directory):
            self.local_storage.delete_directory(directory)
        blobs_to_delete = self.container_client.list_blobs(name_starts_with=directory)
        for blob in blobs_to_delete:
            self.container_client.delete_blob(blob.name)

    def upload(self, local_path: str, destination_path: str) -> None:
        """Upload an existing local file to the blob, then mirror it into the cache."""
        with open(local_path, "rb") as data:
            blob_client = self.container_client.get_blob_client(destination_path)
            blob_client.upload_blob(data, overwrite=True)
        self.local_storage.upload(local_path, destination_path)

    def append_file(self, file_path: str, content: bytes) -> None:
        """Append ``content`` to an append blob, then to the local cached copy."""
        blob_client = self.container_client.get_blob_client(file_path)
        if not blob_client.exists():
            blob_client.create_append_blob()
        else:
            # Refresh the cache first so the local append lands on current data.
            self.sync(file_path)
        blob_client.append_block(content)
        self.local_storage.append_file(file_path, content)

    def get_modified_time(self, file_path: str) -> float:
        """Return the blob's last-modified time as a UNIX timestamp."""
        blob_client = self.container_client.get_blob_client(file_path)
        properties = blob_client.get_blob_properties()
        # Convert the UTC datetime to a UNIX timestamp
        return properties.last_modified.timestamp()

    def directory_exists(self, directory: str) -> bool:
        """True if at least one blob exists under the ``directory`` prefix."""
        blobs = self.container_client.list_blobs(name_starts_with=directory)
        return next(blobs, None) is not None
class StorageFactory:
    """Builds the storage backend selected by environment variables."""

    @staticmethod
    def get_storage() -> "StorageInterface":
        """Return the configured storage backend.

        STORAGE_TYPE selects the backend: 'local' (default) returns
        LocalStorage; 'azure'/'blob' returns BlobStorage, falling back to
        LocalStorage when the SDK or credentials are missing.

        Raises:
            ValueError: for any other STORAGE_TYPE value.
        """
        # Normalize so values like 'Azure' or ' blob ' behave as expected
        # instead of raising.
        storage_type = os.getenv('STORAGE_TYPE', 'local').strip().lower()
        # Always return LocalStorage if STORAGE_TYPE is 'local'
        if storage_type == 'local':
            return LocalStorage()
        # Handle Azure storage with fallback
        if storage_type in ('azure', 'blob'):
            # If Azure SDK isn't available, fall back to local
            if not AZURE_AVAILABLE:
                print("Warning: Azure Storage SDK not available, falling back to local storage")
                return LocalStorage()
            # Try to get Azure credentials
            connection_string = os.getenv('AZURE_STORAGE_CONNECTION_STRING')
            container_name = os.getenv('AZURE_STORAGE_CONTAINER_NAME')
            # If credentials are missing, fall back to local
            if not connection_string or not container_name:
                print("Warning: Azure credentials not found, falling back to local storage")
                return LocalStorage()
            return BlobStorage(connection_string, container_name)
        raise ValueError(f"Unsupported storage type: {storage_type}")