|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import os |
|
|
from pathlib import Path, PurePath |
|
|
from typing import Optional, Tuple |
|
|
from warnings import warn |
|
|
|
|
|
from camel.loaders import File, create_file_from_raw_bytes |
|
|
from camel.storages.object_storages.base import BaseObjectStorage |
|
|
|
|
|
|
|
|
class AmazonS3Storage(BaseObjectStorage): |
|
|
r"""A class to connect with AWS S3 object storage to put and get objects |
|
|
from one S3 bucket. The class will first try to use the credentials passed |
|
|
as arguments, if not provided, it will look for the environment variables |
|
|
`AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`. If none of these are |
|
|
provided, it will try to use the local credentials (will be created if |
|
|
logged in with AWS CLI). |
|
|
|
|
|
Args: |
|
|
bucket_name (str): The name of the S3 bucket. |
|
|
create_if_not_exists (bool, optional): Whether to create the bucket if |
|
|
it does not exist. Defaults to True. |
|
|
access_key_id (Optional[str], optional): The AWS access key ID. |
|
|
Defaults to None. |
|
|
secret_access_key (Optional[str], optional): The AWS secret access key. |
|
|
Defaults to None. |
|
|
anonymous (bool, optional): Whether to use anonymous access. Defaults |
|
|
to False. |
|
|
|
|
|
References: |
|
|
https://aws.amazon.com/pm/serv-s3/ |
|
|
|
|
|
https://aws.amazon.com/cli/ |
|
|
""" |
|
|
|
|
|
def __init__( |
|
|
self, |
|
|
bucket_name: str, |
|
|
create_if_not_exists: bool = True, |
|
|
access_key_id: Optional[str] = None, |
|
|
secret_access_key: Optional[str] = None, |
|
|
anonymous: bool = False, |
|
|
) -> None: |
|
|
self._bucket_name = bucket_name |
|
|
self._create_if_not_exists = create_if_not_exists |
|
|
|
|
|
aws_key_id = access_key_id or os.getenv("AWS_ACCESS_KEY_ID") |
|
|
aws_secret_key = secret_access_key or os.getenv( |
|
|
"AWS_SECRET_ACCESS_KEY" |
|
|
) |
|
|
if not all([aws_key_id, aws_secret_key]) and not anonymous: |
|
|
warn( |
|
|
"AWS access key not configured. Local credentials will be " |
|
|
"used." |
|
|
) |
|
|
|
|
|
aws_key_id = None |
|
|
aws_secret_key = None |
|
|
|
|
|
import botocore.session |
|
|
from botocore import UNSIGNED |
|
|
from botocore.config import Config |
|
|
|
|
|
session = botocore.session.get_session() |
|
|
|
|
|
if not anonymous: |
|
|
self._client = session.create_client( |
|
|
"s3", |
|
|
aws_access_key_id=aws_key_id, |
|
|
aws_secret_access_key=aws_secret_key, |
|
|
) |
|
|
else: |
|
|
self._client = session.create_client( |
|
|
"s3", config=Config(signature_version=UNSIGNED) |
|
|
) |
|
|
|
|
|
self._prepare_and_check() |
|
|
|
|
|
def _prepare_and_check(self) -> None: |
|
|
r"""Check privileges and existence of the bucket.""" |
|
|
from botocore.exceptions import ClientError, NoCredentialsError |
|
|
|
|
|
try: |
|
|
self._client.head_bucket(Bucket=self._bucket_name) |
|
|
except ClientError as e: |
|
|
error_code = e.response['Error']['Code'] |
|
|
if error_code == '403': |
|
|
raise PermissionError( |
|
|
f"Failed to access bucket {self._bucket_name}: " |
|
|
f"No permission." |
|
|
) |
|
|
elif error_code == '404': |
|
|
if self._create_if_not_exists: |
|
|
self._client.create_bucket(Bucket=self._bucket_name) |
|
|
warn( |
|
|
f"Bucket {self._bucket_name} not found. Automatically " |
|
|
f"created." |
|
|
) |
|
|
else: |
|
|
raise FileNotFoundError( |
|
|
f"Failed to access bucket {self._bucket_name}: Not " |
|
|
f"found." |
|
|
) |
|
|
else: |
|
|
raise e |
|
|
except NoCredentialsError as e: |
|
|
raise PermissionError("No AWS credentials found.") from e |
|
|
|
|
|
@staticmethod |
|
|
def canonicalize_path(file_path: PurePath) -> Tuple[str, str]: |
|
|
r"""Canonicalize file path for Amazon S3. |
|
|
|
|
|
Args: |
|
|
file_path (PurePath): The path to be canonicalized. |
|
|
|
|
|
Returns: |
|
|
Tuple[str, str]: The canonicalized file key and file name. |
|
|
""" |
|
|
return file_path.as_posix(), file_path.name |
|
|
|
|
|
def _put_file(self, file_key: str, file: File) -> None: |
|
|
r"""Put a file to the Amazon S3 bucket. |
|
|
|
|
|
Args: |
|
|
file_key (str): The path to the object in the bucket. |
|
|
file (File): The file to be uploaded. |
|
|
""" |
|
|
self._client.put_object( |
|
|
Bucket=self._bucket_name, Key=file_key, Body=file.raw_bytes |
|
|
) |
|
|
|
|
|
def _get_file(self, file_key: str, filename: str) -> File: |
|
|
r"""Get a file from the Amazon S3 bucket. |
|
|
|
|
|
Args: |
|
|
file_key (str): The path to the object in the bucket. |
|
|
filename (str): The name of the file. |
|
|
|
|
|
Returns: |
|
|
File: The object from the S3 bucket. |
|
|
""" |
|
|
response = self._client.get_object( |
|
|
Bucket=self._bucket_name, Key=file_key |
|
|
) |
|
|
raw_bytes = response["Body"].read() |
|
|
return create_file_from_raw_bytes(raw_bytes, filename) |
|
|
|
|
|
def _upload_file( |
|
|
self, local_file_path: Path, remote_file_key: str |
|
|
) -> None: |
|
|
r"""Upload a local file to the Amazon S3 bucket. |
|
|
|
|
|
Args: |
|
|
local_file_path (Path): The path to the local file to be uploaded. |
|
|
remote_file_key (str): The path to the object in the bucket. |
|
|
""" |
|
|
with open(local_file_path, "rb") as f: |
|
|
self._client.put_object( |
|
|
Bucket=self._bucket_name, Key=remote_file_key, Body=f |
|
|
) |
|
|
|
|
|
def _download_file( |
|
|
self, |
|
|
local_file_path: Path, |
|
|
remote_file_key: str, |
|
|
) -> None: |
|
|
r"""Download a file from the Amazon S3 bucket to the local system. |
|
|
|
|
|
Args: |
|
|
local_file_path (Path): The path to the local file to be saved. |
|
|
remote_file_key (str): The key of the object in the bucket. |
|
|
""" |
|
|
file = self._client.get_object( |
|
|
Bucket=self._bucket_name, |
|
|
Key=remote_file_key, |
|
|
) |
|
|
with open(local_file_path, "wb") as f: |
|
|
f.write(file["Body"].read()) |
|
|
|
|
|
def _object_exists(self, file_key: str) -> bool: |
|
|
r""" |
|
|
Check if the object exists in the Amazon S3 bucket. |
|
|
|
|
|
Args: |
|
|
file_key: The key of the object in the bucket. |
|
|
|
|
|
Returns: |
|
|
bool: Whether the object exists in the bucket. |
|
|
""" |
|
|
try: |
|
|
self._client.head_object(Bucket=self._bucket_name, Key=file_key) |
|
|
return True |
|
|
except self._client.exceptions.ClientError: |
|
|
return False |
|
|
|