| | from pathlib import Path |
| |
|
| | from aiofile import async_open |
| | from loguru import logger |
| |
|
| | from .service import StorageService |
| |
|
| |
|
| | class LocalStorageService(StorageService): |
| | """A service class for handling local storage operations without aiofiles.""" |
| |
|
| | def __init__(self, session_service, settings_service) -> None: |
| | """Initialize the local storage service with session and settings services.""" |
| | super().__init__(session_service, settings_service) |
| | self.data_dir = Path(settings_service.settings.config_dir) |
| | self.set_ready() |
| |
|
| | def build_full_path(self, flow_id: str, file_name: str) -> str: |
| | """Build the full path of a file in the local storage.""" |
| | return str(self.data_dir / flow_id / file_name) |
| |
|
| | async def save_file(self, flow_id: str, file_name: str, data: bytes) -> None: |
| | """Save a file in the local storage. |
| | |
| | :param flow_id: The identifier for the flow. |
| | :param file_name: The name of the file to be saved. |
| | :param data: The byte content of the file. |
| | :raises FileNotFoundError: If the specified flow does not exist. |
| | :raises IsADirectoryError: If the file name is a directory. |
| | :raises PermissionError: If there is no permission to write the file. |
| | """ |
| | folder_path = self.data_dir / flow_id |
| | folder_path.mkdir(parents=True, exist_ok=True) |
| | file_path = folder_path / file_name |
| |
|
| | try: |
| | async with async_open(file_path, "wb") as f: |
| | await f.write(data) |
| | logger.info(f"File {file_name} saved successfully in flow {flow_id}.") |
| | except Exception: |
| | logger.exception(f"Error saving file {file_name} in flow {flow_id}") |
| | raise |
| |
|
| | async def get_file(self, flow_id: str, file_name: str) -> bytes: |
| | """Retrieve a file from the local storage. |
| | |
| | :param flow_id: The identifier for the flow. |
| | :param file_name: The name of the file to be retrieved. |
| | :return: The byte content of the file. |
| | :raises FileNotFoundError: If the file does not exist. |
| | """ |
| | file_path = self.data_dir / flow_id / file_name |
| | if not file_path.exists(): |
| | logger.warning(f"File {file_name} not found in flow {flow_id}.") |
| | msg = f"File {file_name} not found in flow {flow_id}" |
| | raise FileNotFoundError(msg) |
| |
|
| | async with async_open(file_path, "rb") as f: |
| | content = await f.read() |
| |
|
| | logger.debug(f"File {file_name} retrieved successfully from flow {flow_id}.") |
| | return content |
| |
|
| | async def list_files(self, flow_id: str): |
| | """List all files in a specified flow. |
| | |
| | :param flow_id: The identifier for the flow. |
| | :return: A list of file names. |
| | :raises FileNotFoundError: If the flow directory does not exist. |
| | """ |
| | folder_path = self.data_dir / flow_id |
| | if not folder_path.exists() or not folder_path.is_dir(): |
| | logger.warning(f"Flow {flow_id} directory does not exist.") |
| | msg = f"Flow {flow_id} directory does not exist." |
| | raise FileNotFoundError(msg) |
| |
|
| | files = [file.name for file in folder_path.iterdir() if file.is_file()] |
| | logger.info(f"Listed {len(files)} files in flow {flow_id}.") |
| | return files |
| |
|
| | async def delete_file(self, flow_id: str, file_name: str) -> None: |
| | """Delete a file from the local storage. |
| | |
| | :param flow_id: The identifier for the flow. |
| | :param file_name: The name of the file to be deleted. |
| | """ |
| | file_path = self.data_dir / flow_id / file_name |
| | if file_path.exists(): |
| | file_path.unlink() |
| | logger.info(f"File {file_name} deleted successfully from flow {flow_id}.") |
| | else: |
| | logger.warning(f"Attempted to delete non-existent file {file_name} in flow {flow_id}.") |
| |
|
| | async def teardown(self) -> None: |
| | """Perform any cleanup operations when the service is being torn down.""" |
| | |
| |
|