selfevolveagent / evoagentx /tools /storage_handler.py
iLOVE2D's picture
Upload 2846 files
5374a2d verified
import os
import shutil
from typing import Dict, Any, List
from pathlib import Path
from datetime import datetime
from abc import abstractmethod
from .storage_base import StorageBase
from ..core.logging import logger
class FileStorageHandler(StorageBase):
"""
Reference implementation showing all available _raw_xxx methods.
This class serves as a template for developers creating new storage handlers.
Concrete handlers only need to implement the _raw_xxx methods they need.
"""
def __init__(self, base_path: str = ".", **kwargs):
"""
Initialize the storage handler.
Args:
base_path (str): Base directory for storage operations (default: current directory)
**kwargs: Additional keyword arguments for parent class initialization
"""
super().__init__(base_path=base_path, **kwargs)
# ____________________ How to use it ____________________ #
def create(self, file_path: str, content: Any, **kwargs) -> Dict[str, Any]:
return super().save(file_path, content, **kwargs)
def read(self, file_path: str, **kwargs) -> Dict[str, Any]:
return super().read(file_path, **kwargs)
def list(self, path: str = None, max_depth: int = 3, include_hidden: bool = False) -> Dict[str, Any]:
return super().list(path, max_depth, include_hidden)
def delete(self, file_path: str, **kwargs) -> Dict[str, Any]:
return super().delete(file_path, **kwargs)
def move(self, source: str, destination: str, **kwargs) -> Dict[str, Any]:
return super().move(source, destination, **kwargs)
def copy(self, source: str, destination: str, **kwargs) -> Dict[str, Any]:
return super().copy(source, destination, **kwargs)
def create_directory(self, path: str, **kwargs) -> Dict[str, Any]:
return super().create_directory(path, **kwargs)
# ____________________ Required Methods ____________________ #
@abstractmethod
def _initialize_storage(self):
"""Initialize storage - must be implemented by subclasses"""
pass
@abstractmethod
def _read_raw(self, path: str, **kwargs) -> bytes:
"""Read raw file content - must be implemented by subclasses"""
pass
@abstractmethod
def _write_raw(self, path: str, content: bytes, **kwargs) -> bool:
"""Write raw file content - must be implemented by subclasses"""
pass
@abstractmethod
def _delete_raw(self, path: str) -> bool:
"""Delete file or directory - must be implemented by subclasses"""
pass
@abstractmethod
def _exists_raw(self, path: str) -> bool:
"""Check if path exists - must be implemented by subclasses"""
pass
@abstractmethod
def _create_directory_raw(self, path: str) -> bool:
"""Create directory - must be implemented by subclasses"""
pass
@abstractmethod
def _list_raw(self, path: str = None, **kwargs) -> List[Dict[str, Any]]:
"""List files and directories - must be implemented by subclasses"""
pass
# ____________________ Extra Mapping ____________________ #
def create_file(self, file_path: str, content: Any, **kwargs) -> Dict[str, Any]:
return self.save(file_path, content, **kwargs)
def read_file(self, file_path: str, **kwargs) -> Dict[str, Any]:
return self.read(file_path, **kwargs)
def list_files(self, path: str = None, max_depth: int = 3, include_hidden: bool = False) -> Dict[str, Any]:
return self.list(path, max_depth, include_hidden)
def delete_file(self, file_path: str, **kwargs) -> Dict[str, Any]:
return self.delete(file_path, **kwargs)
def move_file(self, source: str, destination: str, **kwargs) -> Dict[str, Any]:
return self.move(source, destination, **kwargs)
def copy_file(self, source: str, destination: str, **kwargs) -> Dict[str, Any]:
return self.copy(source, destination, **kwargs)
class LocalStorageHandler(FileStorageHandler):
"""
Local filesystem storage implementation.
Provides all file operations for local storage with default working directory.
"""
def __init__(self, base_path: str = ".", **kwargs):
"""
Initialize local storage handler.
Args:
base_path (str): Base directory for storage operations (default: current directory)
**kwargs: Additional keyword arguments for parent class initialization
"""
super().__init__(base_path=base_path, **kwargs)
def _initialize_storage(self):
"""Initialize local storage - ensure base directory exists"""
try:
# Ensure the base directory exists
Path(self.base_path).mkdir(parents=True, exist_ok=True)
logger.info(f"Local storage initialized with base path: {self.base_path}")
except Exception as e:
logger.error(f"Error initializing local storage: {str(e)}")
raise
def _read_raw(self, path: str, **kwargs) -> bytes:
"""Read raw file content from local filesystem"""
try:
with open(path, 'rb') as f:
return f.read()
except Exception as e:
logger.error(f"Error reading file {path}: {str(e)}")
raise
def _write_raw(self, path: str, content: bytes, **kwargs) -> bool:
"""Write raw file content to local filesystem"""
try:
# Ensure directory exists
Path(path).parent.mkdir(parents=True, exist_ok=True)
with open(path, 'wb') as f:
f.write(content)
return True
except Exception as e:
logger.error(f"Error writing file {path}: {str(e)}")
return False
def _delete_raw(self, path: str) -> bool:
"""Delete file or directory from local filesystem"""
try:
path_obj = Path(path)
if path_obj.is_file():
path_obj.unlink()
elif path_obj.is_dir():
shutil.rmtree(path_obj)
else:
return False
return True
except Exception as e:
logger.error(f"Error deleting {path}: {str(e)}")
return False
def _list_raw(self, path: str = None, max_depth: int = 3, include_hidden: bool = False) -> List[Dict[str, Any]]:
"""List files and directories in local filesystem"""
try:
if path is None:
path = str(self.base_path)
path_obj = Path(path)
if not path_obj.exists() or not path_obj.is_dir():
return []
items = []
def scan_directory(current_path: Path, current_depth: int):
if current_depth > max_depth:
return
try:
for item in current_path.iterdir():
# Skip hidden files if not included
if not include_hidden and item.name.startswith('.'):
continue
try:
stat = item.stat()
item_info = {
"name": item.name,
"path": str(item),
"type": "directory" if item.is_dir() else "file",
"size_bytes": stat.st_size if item.is_file() else 0,
"size_mb": round(stat.st_size / (1024 * 1024), 2) if item.is_file() else 0,
"modified_time": datetime.fromtimestamp(stat.st_mtime).isoformat(),
"extension": item.suffix.lower() if item.is_file() else "",
"is_hidden": item.name.startswith('.')
}
items.append(item_info)
# Recursively scan subdirectories
if item.is_dir() and current_depth < max_depth:
scan_directory(item, current_depth + 1)
except (PermissionError, OSError):
# Skip files we can't access
continue
except (PermissionError, OSError) as e:
logger.warning(f"Error scanning directory {current_path}: {str(e)}")
scan_directory(path_obj, 0)
return items
except Exception as e:
logger.error(f"Error listing directory {path}: {str(e)}")
return []
def _exists_raw(self, path: str) -> bool:
"""Check if path exists in local filesystem"""
return Path(path).exists()
def _create_directory_raw(self, path: str) -> bool:
"""Create directory in local filesystem"""
try:
Path(path).mkdir(parents=True, exist_ok=True)
return True
except Exception as e:
logger.error(f"Error creating directory {path}: {str(e)}")
return False
class SupabaseStorageHandler(FileStorageHandler):
"""
Supabase remote storage implementation.
Provides file operations via Supabase Storage API with environment-based configuration.
"""
def __init__(self, bucket_name: str = None, base_path: str = "/", **kwargs):
"""
Initialize Supabase storage handler.
Args:
bucket_name: Supabase storage bucket name (default: from environment or "default")
base_path: Base path for storage operations (default: "/")
**kwargs: Additional keyword arguments for parent class initialization
"""
# Call parent constructor first
super().__init__(base_path=base_path, **kwargs)
# Get bucket name from environment or use default
self.bucket_name = bucket_name or os.getenv("SUPABASE_BUCKET_STORAGE") or "default"
self.supabase_url = os.getenv("SUPABASE_URL_STORAGE")
self.supabase_key = os.getenv("SUPABASE_KEY_STORAGE")
if not self.supabase_url or not self.supabase_key:
raise ValueError(
"Supabase configuration not found in environment variables. "
"Please set SUPABASE_URL/SUPABASE_KEY environment variables."
)
# Initialize Supabase client
try:
from supabase import create_client, Client
logger.info(f"Creating Supabase client with URL: {self.supabase_url[:30]}...")
self.supabase: Client = create_client(self.supabase_url, self.supabase_key)
logger.info(f"Successfully initialized Supabase client for bucket: {bucket_name}")
except ImportError:
raise ImportError(
"Supabase Python client not installed. "
"Please install it with: pip install supabase"
)
except Exception as e:
logger.error(f"Failed to initialize Supabase client: {str(e)}")
raise Exception(f"Failed to initialize Supabase client: {str(e)}")
# Initialize storage after all attributes are set
self._initialize_storage()
def _initialize_storage(self):
"""Initialize remote storage - verify bucket exists and is accessible"""
# Check if required attributes are available
if not hasattr(self, 'bucket_name') or not hasattr(self, 'supabase'):
# If attributes aren't set yet, skip initialization
# This will be called again after attributes are set in __init__
return
try:
# Test bucket access by listing files (empty list is fine)
logger.info(f"Testing bucket access for: {self.bucket_name}")
self.supabase.storage.from_(self.bucket_name).list()
logger.info(f"Successfully connected to Supabase bucket: {self.bucket_name}")
except Exception as e:
logger.warning(f"Could not verify bucket access: {str(e)}")
# Don't raise error as bucket might be empty or have different permissions
def translate_in(self, file_path: str) -> str:
"""Resolve file path for remote storage"""
# Use the translate_in method to combine base_path with file_path
# For Supabase, we need to handle the special case where base_path is "/"
if self.base_path == "/":
# If base_path is "/", just clean the file_path
return file_path.lstrip('/')
else:
# Use the standard translate_in method
return super().translate_in(file_path)
def _read_raw(self, path: str, **kwargs) -> bytes:
"""Read raw file content from Supabase Storage"""
try:
# Remove leading slash if present
file_path = path.lstrip('/')
# Download file from Supabase Storage
response = self.supabase.storage.from_(self.bucket_name).download(file_path)
if isinstance(response, bytes):
return response
else:
# If response is not bytes, try to convert
return bytes(response) if response else b""
except Exception as e:
logger.error(f"Error reading file {path} from Supabase: {str(e)}")
raise
def _write_raw(self, path: str, content: bytes, **kwargs) -> bool:
"""Write raw file content to Supabase Storage with smart insert/update logic"""
try:
# Remove leading slash if present
file_path = path.lstrip('/')
# Check if file already exists
file_exists = self._exists_raw(file_path)
if file_exists:
# File exists, use update method
logger.info(f"File {file_path} exists, using update method")
response = self.supabase.storage.from_(self.bucket_name).update(
path=file_path,
file=content,
file_options={
"content-type": kwargs.get("content_type", "application/octet-stream"),
"upsert": "true" # Ensure update works even if there are issues
}
)
else:
# File doesn't exist, use upload method
logger.info(f"File {file_path} doesn't exist, using upload method")
response = self.supabase.storage.from_(self.bucket_name).upload(
path=file_path,
file=content,
file_options={"content-type": kwargs.get("content_type", "application/octet-stream")}
)
# Check if operation was successful
if response and (not isinstance(response, dict) or response.get("error") is None):
operation = "updated" if file_exists else "uploaded"
logger.info(f"Successfully {operation} file to Supabase: {file_path}")
return True
else:
logger.error(f"Operation failed: {response}")
return False
except Exception as e:
logger.error(f"Error writing file {path} to Supabase: {str(e)}")
return False
def _delete_raw(self, path: str) -> bool:
"""Delete file from Supabase Storage"""
try:
# Remove leading slash if present
file_path = path.lstrip('/')
# Delete file from Supabase Storage
response = self.supabase.storage.from_(self.bucket_name).remove([file_path])
# Check if deletion was successful
# Supabase remove() returns an empty list [] when successful
if response is not None:
if isinstance(response, list):
# Empty list means successful deletion
logger.info(f"Successfully deleted file from Supabase: {file_path}")
return True
elif isinstance(response, dict) and response.get("error") is None:
# Some responses might be dict format
logger.info(f"Successfully deleted file from Supabase: {file_path}")
return True
else:
logger.error(f"Deletion failed: {response}")
return False
else:
logger.error(f"Deletion failed: {response}")
return False
except Exception as e:
logger.error(f"Error deleting {path} from Supabase: {str(e)}")
return False
def _list_raw(self, path: str = None, max_depth: int = 3, include_hidden: bool = False) -> List[Dict[str, Any]]:
"""List files in Supabase Storage"""
try:
# Remove leading slash if present
list_path = (path or self.base_path).lstrip('/')
# List files from Supabase Storage
response = self.supabase.storage.from_(self.bucket_name).list(list_path)
items = []
if response and isinstance(response, list):
for item in response:
# Skip hidden files if not included
if not include_hidden and item.get('name', '').startswith('.'):
continue
# Calculate full path
full_path = f"{list_path}/{item['name']}" if list_path else item['name']
items.append({
"name": item.get('name', ''),
"path": full_path,
"type": "directory" if item.get('metadata', {}).get('mimetype') == 'application/x-directory' else "file",
"size_bytes": item.get('metadata', {}).get('size', 0),
"size_mb": round(item.get('metadata', {}).get('size', 0) / (1024 * 1024), 2),
"modified_time": item.get('updated_at', ''),
"extension": Path(item.get('name', '')).suffix.lower(),
"is_hidden": item.get('name', '').startswith('.'),
"mime_type": item.get('metadata', {}).get('mimetype', '')
})
return items
except Exception as e:
logger.error(f"Error listing directory {path} from Supabase: {str(e)}")
return []
def _exists_raw(self, path: str) -> bool:
"""Check if path exists in Supabase Storage"""
try:
# Remove leading slash if present
file_path = path.lstrip('/')
# Get the parent directory and filename
parent_dir = os.path.dirname(file_path)
file_name = os.path.basename(file_path)
# If no parent directory, check root
if not parent_dir:
parent_dir = ""
try:
# List files in the parent directory
response = self.supabase.storage.from_(self.bucket_name).list(parent_dir)
if response and isinstance(response, list):
# Check if our filename exists in the directory
for item in response:
if item.get('name') == file_name:
return True
return False
except Exception as e:
logger.warning(f"Error listing directory {parent_dir}: {str(e)}")
return False
except Exception as e:
logger.warning(f"Error checking if file {path} exists: {str(e)}")
return False
def _create_directory_raw(self, path: str) -> bool:
"""Create directory in Supabase Storage"""
try:
# Remove leading slash if present
dir_path = path.lstrip('/')
# Create a placeholder file to establish the directory
placeholder_content = b"# Directory placeholder"
placeholder_path = f"{dir_path}/.placeholder"
response = self.supabase.storage.from_(self.bucket_name).upload(
path=placeholder_path,
file=placeholder_content,
file_options={"content-type": "text/plain"}
)
# Check if upload was successful
if response and not isinstance(response, dict) or response.get("error") is None:
return True
else:
logger.error(f"Directory creation failed: {response}")
return False
except Exception as e:
logger.error(f"Error creating directory {path} in Supabase: {str(e)}")
return False