| | import boto3 |
| | from botocore.exceptions import ClientError, NoCredentialsError |
| | from loguru import logger |
| |
|
| | from .service import StorageService |
| |
|
| |
|
class S3StorageService(StorageService):
    """Storage service that keeps files as objects in an AWS S3 bucket.

    Every file is stored under the key ``<folder>/<file_name>`` in the bucket
    named by ``self.bucket``.

    NOTE(review): the methods are declared ``async`` but call the synchronous
    boto3 client directly, so each call runs to completion before yielding --
    confirm this is acceptable for the callers' event loop.
    """

    def __init__(self, session_service, settings_service) -> None:
        """Create the S3 client and mark the service as ready.

        :param session_service: Forwarded to ``StorageService.__init__``.
        :param settings_service: Forwarded to ``StorageService.__init__``.
        """
        super().__init__(session_service, settings_service)
        self.bucket = "langflow"
        # Created with no explicit arguments, i.e. boto3's own configuration.
        self.s3_client = boto3.client("s3")
        self.set_ready()

    @staticmethod
    def _object_key(folder: str, file_name: str) -> str:
        """Return the S3 object key used for ``file_name`` inside ``folder``."""
        return f"{folder}/{file_name}"

    async def save_file(self, folder: str, file_name: str, data) -> None:
        """Save a file to the S3 bucket.

        :param folder: The folder in the bucket to save the file.
        :param file_name: The name of the file to be saved.
        :param data: The content of the file, passed to S3 as the object body.
        :raises NoCredentialsError: If no AWS credentials are available.
        :raises ClientError: If S3 rejects the request.
        """
        key = self._object_key(folder, file_name)
        try:
            self.s3_client.put_object(Bucket=self.bucket, Key=key, Body=data)
        except NoCredentialsError:
            logger.exception("Credentials not available for AWS S3.")
            raise
        except ClientError:
            logger.exception(f"Error saving file {file_name} in folder {folder}")
            raise
        logger.info(f"File {file_name} saved successfully in folder {folder}.")

    async def get_file(self, folder: str, file_name: str):
        """Retrieve a file from the S3 bucket.

        :param folder: The folder in the bucket where the file is stored.
        :param file_name: The name of the file to be retrieved.
        :return: The full content read from the object's body.
        :raises NoCredentialsError: If no AWS credentials are available.
        :raises ClientError: If S3 rejects the request (e.g. missing key).
        """
        key = self._object_key(folder, file_name)
        try:
            response = self.s3_client.get_object(Bucket=self.bucket, Key=key)
            body = response["Body"]
            try:
                content = body.read()
            finally:
                # Release the underlying HTTP connection even if read() fails.
                body.close()
        except NoCredentialsError:
            logger.exception("Credentials not available for AWS S3.")
            raise
        except ClientError:
            logger.exception(f"Error retrieving file {file_name} from folder {folder}")
            raise
        # Only report success once the body has actually been read.
        logger.info(f"File {file_name} retrieved successfully from folder {folder}.")
        return content

    async def list_files(self, folder: str):
        """List the files stored directly in a folder of the S3 bucket.

        Objects in nested sub-folders are not included.

        :param folder: The folder in the bucket to list files from.
        :return: A list of file names, relative to ``folder``.
        :raises NoCredentialsError: If no AWS credentials are available.
        :raises ClientError: If S3 rejects the request.
        """
        # Use the same "<folder>/" prefix that save_file writes under, so that
        # sibling folders sharing a name prefix (e.g. "abc" vs "abcd") are not
        # mixed in and the separator is not mistaken for a sub-folder.
        prefix = self._object_key(folder, "")
        files = []
        try:
            # A single list_objects_v2 call is capped at 1000 keys; paginate
            # so large folders are listed completely.
            paginator = self.s3_client.get_paginator("list_objects_v2")
            # Delimiter="/" makes S3 return only direct children in "Contents"
            # and roll deeper keys up into "CommonPrefixes".
            pages = paginator.paginate(Bucket=self.bucket, Prefix=prefix, Delimiter="/")
            for page in pages:
                for item in page.get("Contents", []):
                    name = item["Key"][len(prefix):]
                    # Skip the zero-length "folder marker" object, if present.
                    if name:
                        files.append(name)
        except NoCredentialsError:
            logger.exception("Credentials not available for AWS S3.")
            raise
        except ClientError:
            logger.exception(f"Error listing files in folder {folder}")
            raise

        logger.info(f"{len(files)} files listed in folder {folder}.")
        return files

    async def delete_file(self, folder: str, file_name: str) -> None:
        """Delete a file from the S3 bucket.

        :param folder: The folder in the bucket where the file is stored.
        :param file_name: The name of the file to be deleted.
        :raises NoCredentialsError: If no AWS credentials are available.
        :raises ClientError: If S3 rejects the request.
        """
        key = self._object_key(folder, file_name)
        try:
            self.s3_client.delete_object(Bucket=self.bucket, Key=key)
        except NoCredentialsError:
            logger.exception("Credentials not available for AWS S3.")
            raise
        except ClientError:
            logger.exception(f"Error deleting file {file_name} from folder {folder}")
            raise
        logger.info(f"File {file_name} deleted successfully from folder {folder}.")

    async def teardown(self) -> None:
        """Perform any cleanup operations when the service is being torn down.

        Currently a no-op.
        """
| | |
| |
|