Spaces:
Paused
Paused
| import base64 | |
| import os.path | |
| import traceback | |
| import uuid | |
| from pathlib import Path | |
| from typing import Optional | |
| import aioboto3 | |
| import aiofiles | |
| from metagpt.config2 import S3Config | |
| from metagpt.const import BASE64_FORMAT | |
| from metagpt.logs import logger | |
class S3:
    """Async helper for interacting with Amazon S3 (or S3-compatible) object storage.

    Credentials and the endpoint come from the supplied ``S3Config`` and are
    reused for every client created from the shared ``aioboto3`` session.
    """

    def __init__(self, config: S3Config):
        self.session = aioboto3.Session()
        self.config = config
        # Keyword arguments passed to session.client(...) for every operation.
        # endpoint_url allows S3-compatible services (e.g. MinIO) besides AWS.
        self.auth_config = {
            "service_name": "s3",
            "aws_access_key_id": config.access_key,
            "aws_secret_access_key": config.secret_key,
            "endpoint_url": config.endpoint,
        }

    async def upload_file(
        self,
        bucket: str,
        local_path: str,
        object_name: str,
    ) -> None:
        """Upload a file from the local path to the specified path of the storage bucket specified in s3.

        Args:
            bucket: The name of the S3 storage bucket.
            local_path: The local file path, including the file name.
            object_name: The complete path of the uploaded file to be stored in S3, including the file name.

        Raises:
            Exception: If an error occurs during the upload process, an exception is raised.
        """
        try:
            async with self.session.client(**self.auth_config) as client:
                # Read the whole file into memory; fine for the modest file
                # sizes this helper targets (no multipart upload here).
                async with aiofiles.open(local_path, mode="rb") as reader:
                    body = await reader.read()
                    await client.put_object(Body=body, Bucket=bucket, Key=object_name)
                    logger.info(f"Successfully uploaded the file to path {object_name} in bucket {bucket} of s3.")
        except Exception as e:
            logger.error(f"Failed to upload the file to path {object_name} in bucket {bucket} of s3: {e}")
            raise  # bare raise preserves the original traceback

    async def get_object_url(
        self,
        bucket: str,
        object_name: str,
    ) -> str:
        """Get the URL for a downloadable or preview file stored in the specified S3 bucket.

        Args:
            bucket: The name of the S3 storage bucket.
            object_name: The complete path of the file stored in S3, including the file name.

        Returns:
            The URL for the downloadable or preview file.

        Raises:
            Exception: If an error occurs while retrieving the URL, an exception is raised.
        """
        try:
            async with self.session.client(**self.auth_config) as client:
                file = await client.get_object(Bucket=bucket, Key=object_name)
                # NOTE(review): this exposes the streaming body's underlying
                # request URL; for time-limited links a presigned URL would be
                # the usual approach — confirm against callers' expectations.
                return str(file["Body"].url)
        except Exception as e:
            logger.error(f"Failed to get the url for a downloadable or preview file: {e}")
            raise

    async def get_object(
        self,
        bucket: str,
        object_name: str,
    ) -> bytes:
        """Get the binary data of a file stored in the specified S3 bucket.

        Args:
            bucket: The name of the S3 storage bucket.
            object_name: The complete path of the file stored in S3, including the file name.

        Returns:
            The binary data of the requested file.

        Raises:
            Exception: If an error occurs while retrieving the file data, an exception is raised.
        """
        try:
            async with self.session.client(**self.auth_config) as client:
                s3_object = await client.get_object(Bucket=bucket, Key=object_name)
                return await s3_object["Body"].read()
        except Exception as e:
            logger.error(f"Failed to get the binary data of the file: {e}")
            raise

    async def download_file(
        self, bucket: str, object_name: str, local_path: str, chunk_size: Optional[int] = 128 * 1024
    ) -> None:
        """Download an S3 object to a local file.

        Args:
            bucket: The name of the S3 storage bucket.
            object_name: The complete path of the file stored in S3, including the file name.
            local_path: The local file path where the S3 object will be downloaded.
            chunk_size: The size of data chunks to read and write at a time. Default is 128 KB.

        Raises:
            Exception: If an error occurs during the download process, an exception is raised.
        """
        try:
            async with self.session.client(**self.auth_config) as client:
                s3_object = await client.get_object(Bucket=bucket, Key=object_name)
                stream = s3_object["Body"]
                # Stream chunk-by-chunk so large objects never sit fully in memory.
                async with aiofiles.open(local_path, mode="wb") as writer:
                    while chunk := await stream.read(chunk_size):
                        await writer.write(chunk)
        except Exception as e:
            logger.error(f"Failed to download the file from S3: {e}")
            raise

    async def cache(self, data: str, file_ext: str, format: str = "") -> Optional[str]:
        """Save data to remote S3 and return its URL (best-effort).

        The payload is staged in a temporary file next to this module, uploaded
        under a random (uuid4) key, and the temp file is removed afterwards.

        Args:
            data: The content to store, as text (plain or base64-encoded).
            file_ext: File extension (including the leading dot) appended to the key.
            format: When equal to ``BASE64_FORMAT``, ``data`` is base64-decoded
                before upload; otherwise it is UTF-8 encoded.

        Returns:
            The object URL on success, or ``None`` on any failure — errors are
            logged rather than raised (best-effort contract).
        """
        object_name = uuid.uuid4().hex + file_ext
        pathname = Path(__file__).parent / object_name
        try:
            # Decode/encode into a separate local so the `data` parameter keeps
            # its original (str) value and type.
            payload = base64.b64decode(data) if format == BASE64_FORMAT else data.encode(encoding="utf-8")
            async with aiofiles.open(str(pathname), mode="wb") as file:
                await file.write(payload)
            bucket = self.config.bucket
            # S3 object keys always use forward slashes. Do NOT pass the key
            # through os.path.normpath: on Windows that rewrites "/" to "\",
            # producing a broken key (on POSIX it was a no-op anyway).
            object_pathname = f"{bucket or 'system'}/{object_name}"
            await self.upload_file(bucket=bucket, local_path=str(pathname), object_name=object_pathname)
            return await self.get_object_url(bucket=bucket, object_name=object_pathname)
        except Exception as e:
            logger.exception(f"{e}, stack:{traceback.format_exc()}")
            return None
        finally:
            # Single cleanup point for every path (success, failure, early error).
            pathname.unlink(missing_ok=True)