|
|
import os |
|
|
import shutil |
|
|
from typing import Dict, Any, List |
|
|
from pathlib import Path |
|
|
from datetime import datetime |
|
|
from abc import abstractmethod |
|
|
|
|
|
from .storage_base import StorageBase |
|
|
from ..core.logging import logger |
|
|
|
|
|
|
|
|
class FileStorageHandler(StorageBase):
    """
    Abstract template for storage handlers built on top of ``StorageBase``.

    The public operations (``create``, ``read``, ``list``, ``delete``,
    ``move``, ``copy``, ``create_directory``) forward to the parent class,
    and the ``*_file`` methods are thin aliases for them. The ``_xxx_raw``
    hooks and ``_initialize_storage`` are declared with ``@abstractmethod``
    and are what a concrete backend provides.
    """

    def __init__(self, base_path: str = ".", **kwargs):
        """
        Set up the handler by forwarding all arguments to the parent class.

        Args:
            base_path (str): Base directory for storage operations (default: current directory)
            **kwargs: Additional keyword arguments for parent class initialization
        """
        super().__init__(base_path=base_path, **kwargs)

    # ------------------------------------------------------------------
    # Public operations: each one forwards to the parent class.
    # ------------------------------------------------------------------

    def create(self, file_path: str, content: Any, **kwargs) -> Dict[str, Any]:
        """Create a file by delegating to the parent class's ``save``."""
        outcome = super().save(file_path, content, **kwargs)
        return outcome

    def read(self, file_path: str, **kwargs) -> Dict[str, Any]:
        """Read a file by delegating to the parent class's ``read``."""
        outcome = super().read(file_path, **kwargs)
        return outcome

    def list(self, path: str = None, max_depth: int = 3, include_hidden: bool = False) -> Dict[str, Any]:
        """List entries via the parent class's ``list`` (arguments are passed positionally)."""
        outcome = super().list(path, max_depth, include_hidden)
        return outcome

    def delete(self, file_path: str, **kwargs) -> Dict[str, Any]:
        """Delete a path by delegating to the parent class's ``delete``."""
        outcome = super().delete(file_path, **kwargs)
        return outcome

    def move(self, source: str, destination: str, **kwargs) -> Dict[str, Any]:
        """Move ``source`` to ``destination`` via the parent class's ``move``."""
        outcome = super().move(source, destination, **kwargs)
        return outcome

    def copy(self, source: str, destination: str, **kwargs) -> Dict[str, Any]:
        """Copy ``source`` to ``destination`` via the parent class's ``copy``."""
        outcome = super().copy(source, destination, **kwargs)
        return outcome

    def create_directory(self, path: str, **kwargs) -> Dict[str, Any]:
        """Create a directory via the parent class's ``create_directory``."""
        outcome = super().create_directory(path, **kwargs)
        return outcome

    # ------------------------------------------------------------------
    # Backend hooks: concrete handlers supply these.
    # ------------------------------------------------------------------

    @abstractmethod
    def _initialize_storage(self):
        """Initialize storage - must be implemented by subclasses"""

    @abstractmethod
    def _read_raw(self, path: str, **kwargs) -> bytes:
        """Read raw file content - must be implemented by subclasses"""

    @abstractmethod
    def _write_raw(self, path: str, content: bytes, **kwargs) -> bool:
        """Write raw file content - must be implemented by subclasses"""

    @abstractmethod
    def _delete_raw(self, path: str) -> bool:
        """Delete file or directory - must be implemented by subclasses"""

    @abstractmethod
    def _exists_raw(self, path: str) -> bool:
        """Check if path exists - must be implemented by subclasses"""

    @abstractmethod
    def _create_directory_raw(self, path: str) -> bool:
        """Create directory - must be implemented by subclasses"""

    @abstractmethod
    def _list_raw(self, path: str = None, **kwargs) -> List[Dict[str, Any]]:
        """List files and directories - must be implemented by subclasses"""

    # ------------------------------------------------------------------
    # ``*_file`` aliases: dispatch through ``self`` so overrides are honoured.
    # ------------------------------------------------------------------

    def create_file(self, file_path: str, content: Any, **kwargs) -> Dict[str, Any]:
        """Alias that calls ``self.save``."""
        outcome = self.save(file_path, content, **kwargs)
        return outcome

    def read_file(self, file_path: str, **kwargs) -> Dict[str, Any]:
        """Alias that calls ``self.read``."""
        outcome = self.read(file_path, **kwargs)
        return outcome

    def list_files(self, path: str = None, max_depth: int = 3, include_hidden: bool = False) -> Dict[str, Any]:
        """Alias that calls ``self.list`` with the same positional arguments."""
        outcome = self.list(path, max_depth, include_hidden)
        return outcome

    def delete_file(self, file_path: str, **kwargs) -> Dict[str, Any]:
        """Alias that calls ``self.delete``."""
        outcome = self.delete(file_path, **kwargs)
        return outcome

    def move_file(self, source: str, destination: str, **kwargs) -> Dict[str, Any]:
        """Alias that calls ``self.move``."""
        outcome = self.move(source, destination, **kwargs)
        return outcome

    def copy_file(self, source: str, destination: str, **kwargs) -> Dict[str, Any]:
        """Alias that calls ``self.copy``."""
        outcome = self.copy(source, destination, **kwargs)
        return outcome
|
|
|
|
|
|
|
|
class LocalStorageHandler(FileStorageHandler):
    """
    Local filesystem storage implementation.

    Implements the ``_xxx_raw`` hooks with ``pathlib`` and ``shutil``.
    Write, delete and create-directory hooks report failure by returning
    ``False`` (after logging); read and initialization re-raise.
    """

    def __init__(self, base_path: str = ".", **kwargs):
        """
        Initialize local storage handler.

        Args:
            base_path (str): Base directory for storage operations (default: current directory)
            **kwargs: Additional keyword arguments for parent class initialization
        """
        super().__init__(base_path=base_path, **kwargs)

    def _initialize_storage(self):
        """Ensure the base directory exists, creating parents as needed.

        Raises:
            Exception: whatever ``Path.mkdir`` raised, re-raised after logging.
        """
        try:
            Path(self.base_path).mkdir(parents=True, exist_ok=True)
            logger.info(f"Local storage initialized with base path: {self.base_path}")
        except Exception as e:
            logger.error(f"Error initializing local storage: {str(e)}")
            raise

    def _read_raw(self, path: str, **kwargs) -> bytes:
        """Return the full binary content of ``path``.

        Raises:
            Exception: any error from opening/reading, re-raised after logging.
        """
        try:
            with open(path, 'rb') as f:
                return f.read()
        except Exception as e:
            logger.error(f"Error reading file {path}: {str(e)}")
            raise

    def _write_raw(self, path: str, content: bytes, **kwargs) -> bool:
        """Write ``content`` to ``path`` in binary mode, creating parent directories.

        Returns:
            True on success; False if anything failed (the error is logged).
        """
        try:
            Path(path).parent.mkdir(parents=True, exist_ok=True)
            with open(path, 'wb') as f:
                f.write(content)
            return True
        except Exception as e:
            logger.error(f"Error writing file {path}: {str(e)}")
            return False

    def _delete_raw(self, path: str) -> bool:
        """Delete a file, symlink, or directory tree at ``path``.

        Returns:
            True if something was removed; False if ``path`` does not exist
            or the removal failed (the error is logged).
        """
        try:
            path_obj = Path(path)
            # Symlinks are checked first: is_file()/is_dir() follow links, so a
            # dangling link matches neither, and shutil.rmtree() refuses to
            # operate on a symlink to a directory. Unlinking removes only the
            # link itself, never the target.
            if path_obj.is_symlink() or path_obj.is_file():
                path_obj.unlink()
            elif path_obj.is_dir():
                shutil.rmtree(path_obj)
            else:
                return False
            return True
        except Exception as e:
            logger.error(f"Error deleting {path}: {str(e)}")
            return False

    def _list_raw(self, path: str = None, max_depth: int = 3, include_hidden: bool = False) -> List[Dict[str, Any]]:
        """Recursively list files and directories under ``path``.

        Args:
            path: Directory to scan; defaults to ``self.base_path`` when None.
            max_depth: Deepest directory level to descend into; the starting
                directory is level 0.
            include_hidden: When False, entries whose name starts with ``.``
                are skipped (and hidden directories are not descended into).

        Returns:
            A flat list of dicts with keys ``name``, ``path``, ``type``,
            ``size_bytes``, ``size_mb``, ``modified_time``, ``extension`` and
            ``is_hidden``. Empty if ``path`` is not an existing directory or
            an unexpected error occurred.
        """
        try:
            if path is None:
                path = str(self.base_path)

            path_obj = Path(path)
            if not path_obj.exists() or not path_obj.is_dir():
                return []

            items = []

            def scan_directory(current_path: Path, current_depth: int):
                if current_depth > max_depth:
                    return

                try:
                    for item in current_path.iterdir():
                        if not include_hidden and item.name.startswith('.'):
                            continue

                        try:
                            stat = item.stat()
                            # Query the entry type once instead of once per field.
                            is_dir = item.is_dir()
                            is_file = item.is_file()

                            items.append({
                                "name": item.name,
                                "path": str(item),
                                "type": "directory" if is_dir else "file",
                                "size_bytes": stat.st_size if is_file else 0,
                                "size_mb": round(stat.st_size / (1024 * 1024), 2) if is_file else 0,
                                "modified_time": datetime.fromtimestamp(stat.st_mtime).isoformat(),
                                "extension": item.suffix.lower() if is_file else "",
                                "is_hidden": item.name.startswith('.'),
                            })

                            if is_dir and current_depth < max_depth:
                                scan_directory(item, current_depth + 1)
                        except OSError:
                            # Best effort: an unreadable entry (PermissionError
                            # is an OSError) is skipped, not fatal.
                            continue
                except OSError as e:
                    logger.warning(f"Error scanning directory {current_path}: {str(e)}")

            scan_directory(path_obj, 0)
            return items

        except Exception as e:
            logger.error(f"Error listing directory {path}: {str(e)}")
            return []

    def _exists_raw(self, path: str) -> bool:
        """Return whether ``path`` exists on the local filesystem."""
        return Path(path).exists()

    def _create_directory_raw(self, path: str) -> bool:
        """Create ``path`` (and parents); an existing directory is not an error.

        Returns:
            True on success; False if creation failed (the error is logged).
        """
        try:
            Path(path).mkdir(parents=True, exist_ok=True)
            return True
        except Exception as e:
            logger.error(f"Error creating directory {path}: {str(e)}")
            return False
|
|
|
|
|
|
|
|
|
|
|
class SupabaseStorageHandler(FileStorageHandler):
    """
    Supabase remote storage implementation.

    Implements the ``_xxx_raw`` hooks through the Supabase Storage client.
    The project URL and key are read from the ``SUPABASE_URL_STORAGE`` and
    ``SUPABASE_KEY_STORAGE`` environment variables; the bucket falls back to
    ``SUPABASE_BUCKET_STORAGE`` and then ``"default"``.
    """

    def __init__(self, bucket_name: str = None, base_path: str = "/", **kwargs):
        """
        Initialize Supabase storage handler.

        Args:
            bucket_name: Supabase storage bucket name (default: from environment or "default")
            base_path: Base path for storage operations (default: "/")
            **kwargs: Additional keyword arguments for parent class initialization

        Raises:
            ValueError: if the URL or key environment variable is unset/empty.
            ImportError: if the ``supabase`` package is not installed.
            Exception: if the client could not be created.
        """
        super().__init__(base_path=base_path, **kwargs)

        self.bucket_name = bucket_name or os.getenv("SUPABASE_BUCKET_STORAGE") or "default"
        self.supabase_url = os.getenv("SUPABASE_URL_STORAGE")
        self.supabase_key = os.getenv("SUPABASE_KEY_STORAGE")

        if not self.supabase_url or not self.supabase_key:
            # Name the variables that are actually read above.
            raise ValueError(
                "Supabase configuration not found in environment variables. "
                "Please set SUPABASE_URL_STORAGE/SUPABASE_KEY_STORAGE environment variables."
            )

        try:
            # Imported lazily so the module loads without the optional dependency.
            from supabase import create_client, Client
            logger.info(f"Creating Supabase client with URL: {self.supabase_url[:30]}...")
            self.supabase: Client = create_client(self.supabase_url, self.supabase_key)
            # Log the resolved bucket, not the raw argument (which may be None).
            logger.info(f"Successfully initialized Supabase client for bucket: {self.bucket_name}")
        except ImportError as e:
            raise ImportError(
                "Supabase Python client not installed. "
                "Please install it with: pip install supabase"
            ) from e
        except Exception as e:
            logger.error(f"Failed to initialize Supabase client: {str(e)}")
            raise Exception(f"Failed to initialize Supabase client: {str(e)}") from e

        self._initialize_storage()

    def _initialize_storage(self):
        """Verify the bucket is reachable; failures are logged, never raised."""
        # Skip when called before __init__ has set up the client
        # (presumably from the parent initializer - confirm in StorageBase).
        if not hasattr(self, 'bucket_name') or not hasattr(self, 'supabase'):
            return

        try:
            logger.info(f"Testing bucket access for: {self.bucket_name}")
            self.supabase.storage.from_(self.bucket_name).list()
            logger.info(f"Successfully connected to Supabase bucket: {self.bucket_name}")
        except Exception as e:
            logger.warning(f"Could not verify bucket access: {str(e)}")

    def translate_in(self, file_path: str) -> str:
        """Resolve file path for remote storage.

        With the root base path ("/") the path is used as-is minus leading
        slashes; otherwise resolution is left to the parent class.
        """
        if self.base_path == "/":
            return file_path.lstrip('/')
        return super().translate_in(file_path)

    def _read_raw(self, path: str, **kwargs) -> bytes:
        """Download ``path`` from the bucket and return its bytes.

        Raises:
            Exception: any download error, re-raised after logging.
        """
        try:
            file_path = path.lstrip('/')
            response = self.supabase.storage.from_(self.bucket_name).download(file_path)

            if isinstance(response, bytes):
                return response
            # Coerce other buffer-like results; a falsy response becomes empty bytes.
            return bytes(response) if response else b""

        except Exception as e:
            logger.error(f"Error reading file {path} from Supabase: {str(e)}")
            raise

    def _write_raw(self, path: str, content: bytes, **kwargs) -> bool:
        """Write ``content`` to ``path``, updating if it exists, uploading otherwise.

        Args:
            path: Object path inside the bucket (leading slashes are dropped).
            content: Raw bytes to store.
            **kwargs: ``content_type`` sets the stored content type
                (default ``application/octet-stream``).

        Returns:
            True on success; False on a failed response or any exception.
        """
        try:
            file_path = path.lstrip('/')
            content_type = kwargs.get("content_type", "application/octet-stream")
            bucket = self.supabase.storage.from_(self.bucket_name)

            file_exists = self._exists_raw(file_path)

            if file_exists:
                logger.info(f"File {file_path} exists, using update method")
                response = bucket.update(
                    path=file_path,
                    file=content,
                    file_options={
                        "content-type": content_type,
                        "upsert": "true"
                    }
                )
            else:
                logger.info(f"File {file_path} doesn't exist, using upload method")
                response = bucket.upload(
                    path=file_path,
                    file=content,
                    file_options={"content-type": content_type}
                )

            if response and (not isinstance(response, dict) or response.get("error") is None):
                operation = "updated" if file_exists else "uploaded"
                logger.info(f"Successfully {operation} file to Supabase: {file_path}")
                return True

            logger.error(f"Operation failed: {response}")
            return False

        except Exception as e:
            logger.error(f"Error writing file {path} to Supabase: {str(e)}")
            return False

    def _delete_raw(self, path: str) -> bool:
        """Remove ``path`` from the bucket.

        Returns:
            True when the client returns a list, or a dict without an
            ``error``; False for any other response or on exception.
        """
        try:
            file_path = path.lstrip('/')
            response = self.supabase.storage.from_(self.bucket_name).remove([file_path])

            succeeded = isinstance(response, list) or (
                isinstance(response, dict) and response.get("error") is None
            )
            if succeeded:
                logger.info(f"Successfully deleted file from Supabase: {file_path}")
                return True

            logger.error(f"Deletion failed: {response}")
            return False

        except Exception as e:
            logger.error(f"Error deleting {path} from Supabase: {str(e)}")
            return False

    def _list_raw(self, path: str = None, max_depth: int = 3, include_hidden: bool = False) -> List[Dict[str, Any]]:
        """List the entries directly under ``path`` in the bucket.

        Args:
            path: Prefix to list; defaults to ``self.base_path``.
            max_depth: Accepted for signature parity; this implementation
                performs a single, non-recursive listing and does not use it.
            include_hidden: When False, names starting with ``.`` are skipped.

        Returns:
            A list of dicts with keys ``name``, ``path``, ``type``,
            ``size_bytes``, ``size_mb``, ``modified_time``, ``extension``,
            ``is_hidden`` and ``mime_type``. Empty on error.
        """
        try:
            # str() guards against base_path being stored as a non-str path object.
            list_path = str(path or self.base_path).lstrip('/')
            response = self.supabase.storage.from_(self.bucket_name).list(list_path)

            items = []
            if response and isinstance(response, list):
                # Avoid a double slash when the prefix already ends with one.
                prefix = list_path.rstrip('/')

                for item in response:
                    name = item.get('name', '')
                    if not include_hidden and name.startswith('.'):
                        continue

                    # "metadata" may be present with a null value; `or {}` keeps
                    # the lookups below from failing on None.
                    raw_metadata = item.get('metadata')
                    metadata = raw_metadata or {}
                    mime_type = metadata.get('mimetype', '')
                    size = metadata.get('size') or 0

                    # Supabase appears to report folders with null metadata -
                    # TODO confirm against the Storage API.
                    has_null_metadata = 'metadata' in item and raw_metadata is None
                    is_directory = mime_type == 'application/x-directory' or has_null_metadata

                    items.append({
                        "name": name,
                        "path": f"{prefix}/{name}" if prefix else name,
                        "type": "directory" if is_directory else "file",
                        "size_bytes": size,
                        "size_mb": round(size / (1024 * 1024), 2),
                        "modified_time": item.get('updated_at', ''),
                        "extension": Path(name).suffix.lower(),
                        "is_hidden": name.startswith('.'),
                        "mime_type": mime_type
                    })

            return items

        except Exception as e:
            logger.error(f"Error listing directory {path} from Supabase: {str(e)}")
            return []

    def _exists_raw(self, path: str) -> bool:
        """Check whether ``path`` exists by listing its parent and matching the name.

        Returns:
            True if an entry with the same name is listed under the parent;
            False otherwise, including when the listing fails.
        """
        # NOTE(review): relies on a single list() call; if the API pages its
        # results, entries beyond the first page would be reported as missing.
        try:
            file_path = path.lstrip('/')
            # A top-level object has an empty parent, i.e. the bucket root.
            parent_dir, file_name = os.path.split(file_path)

            try:
                response = self.supabase.storage.from_(self.bucket_name).list(parent_dir)
                if response and isinstance(response, list):
                    return any(item.get('name') == file_name for item in response)
                return False
            except Exception as e:
                logger.warning(f"Error listing directory {parent_dir}: {str(e)}")
                return False

        except Exception as e:
            logger.warning(f"Error checking if file {path} exists: {str(e)}")
            return False

    def _create_directory_raw(self, path: str) -> bool:
        """Create a "directory" by uploading a ``.placeholder`` object under it.

        Returns:
            True if the upload produced a successful response; False on a
            failed response or any exception (the error is logged).
        """
        # NOTE(review): the upload does not request upsert, so an existing
        # placeholder presumably makes it fail and this returns False - confirm.
        try:
            dir_path = path.strip('/')
            placeholder_content = b"# Directory placeholder"
            # Root ("" after stripping) must not produce a leading slash.
            placeholder_path = f"{dir_path}/.placeholder" if dir_path else ".placeholder"

            response = self.supabase.storage.from_(self.bucket_name).upload(
                path=placeholder_path,
                file=placeholder_content,
                file_options={"content-type": "text/plain"}
            )

            # Parenthesised to match _write_raw: without the parentheses a falsy
            # response reached .get() and an empty dict counted as success.
            if response and (not isinstance(response, dict) or response.get("error") is None):
                return True

            logger.error(f"Directory creation failed: {response}")
            return False

        except Exception as e:
            logger.error(f"Error creating directory {path} in Supabase: {str(e)}")
            return False
|
|
|
|
|
|