| import os |
| from typing import List |
| import boto3 |
| from dotenv import load_dotenv |
| from urllib.parse import urlencode |
| import fsspec |
| import parameters |
|
|
| load_dotenv() |
|
|
| aws_config = parameters.aws_config |
| print(aws_config) |
|
|
| if aws_config: |
| access_key = os.getenv("AWS_ACCESS_KEY") |
| secret_key = os.getenv("AWS_SECRET_KEY") |
|
|
| bucket_name = os.getenv("AWS_BUCKET_NAME") |
|
|
| if aws_config: |
| location = boto3.client( |
| "s3", aws_access_key_id=access_key, aws_secret_access_key=secret_key |
| ).get_bucket_location(Bucket=bucket_name)["LocationConstraint"] |
| session = boto3.Session( |
| aws_access_key_id=access_key, aws_secret_access_key=secret_key |
| ) |
| else: |
| location = boto3.client("s3").get_bucket_location(Bucket=bucket_name)[ |
| "LocationConstraint" |
| ] |
| session = boto3.Session() |
|
|
| |
| s3 = session.client("s3") |
|
|
| if aws_config: |
| s3_client = boto3.client( |
| "s3", |
| region_name=location, |
| aws_access_key_id=access_key, |
| aws_secret_access_key=secret_key, |
| config=boto3.session.Config(signature_version="s3v4"), |
| ) |
| else: |
| s3_client = boto3.client( |
| "s3", |
| region_name=location, |
| config=boto3.session.Config(signature_version="s3v4"), |
| ) |
|
|
|
|
| def _is_custom(k): |
| return k.lower().startswith("x-") |
|
|
|
|
| def _client_param_handler(*, params, context, **_kw): |
| |
| context["custom_params"] = {k: v for k, v in params.items() if _is_custom(k)} |
| |
| |
| return {k: v for k, v in params.items() if not _is_custom(k)} |
|
|
|
|
| def _request_param_injector(*, request, **_kw): |
| if request.context["custom_params"]: |
| request.url += "&" if "?" in request.url else "?" |
| request.url += urlencode(request.context["custom_params"]) |
|
|
|
|
| s3_client.meta.events.register( |
| "provide-client-params.s3.GetObject", _client_param_handler |
| ) |
| s3_client.meta.events.register("before-sign.s3.GetObject", _request_param_injector) |
|
|
|
|
| class AWS: |
| def __init__(self) -> None: |
| self.bucket_name = bucket_name |
| self.s3 = s3 |
| self.s3_client = s3_client |
| self.fs = fsspec.filesystem( |
| "s3", |
| key=access_key, |
| secret=secret_key, |
| client_kwargs={"region_name": location}, |
| anon=False, |
| ) |
|
|
| def upload_fsspec(self, local_path: str, s3_path: str): |
| s3_path = f"s3://{self.bucket_name}/{s3_path}" |
| with self.fs.open(s3_path, "wb") as f: |
| with open(local_path, "rb") as local_file: |
| f.write(local_file.read()) |
|
|
| def create_presigned_url(self, key: str) -> str: |
| url = self.s3_client.generate_presigned_url( |
| ClientMethod="get_object", |
| |
| Params={"Bucket": self.bucket_name, "Key": key}, |
| ExpiresIn=180, |
| ) |
| return url |
|
|
| def sync_local_to_s3(self, object: bytes, s3_key: str) -> str: |
| self.s3.put_object(Key=s3_key, Bucket=self.bucket_name, Body=object) |
| url = self.create_presigned_url(key=s3_key) |
| return url |
|
|
| def check_if_exists(self, object_key) -> bool: |
| try: |
| self.s3.head_object(Bucket=self.bucket_name, Key=object_key) |
| |
| |
| |
| response = True |
| except Exception: |
| |
| |
| |
| response = False |
|
|
| return response |
|
|
| def s3_upload(self, object: str, s3_key: str) -> str: |
| self.s3.put_object(Key=s3_key, Bucket=self.bucket_name, Body=object) |
| return "Success" |
|
|
| def s3_upload_zip(self, file_name: str, object_name=None): |
| if object_name is None: |
| object_name = file_name |
| try: |
| s3_client.upload_file(file_name, self.bucket_name, object_name) |
| return True |
| except FileNotFoundError: |
| print("The file was not found.") |
| return False |
|
|
| def s3_download(self, s3_key: str): |
| response = self.s3.get_object(Bucket=self.bucket_name, Key=s3_key) |
| data = response["Body"].read() |
| return data |
|
|
| def s3_dowload_zip(self, download_path: str, object_name=None): |
| s3_client.download_file(self.bucket_name, object_name, download_path) |
|
|
| def s3_delete_object(self, s3_key: str) -> bool: |
| try: |
| |
| self.s3.delete_object(Bucket=self.bucket_name, Key=s3_key) |
|
|
| |
| return True |
| except Exception as e: |
| print(f"Error deleting object '{s3_key}' from '{self.bucket_name}': {e}") |
| return False |
|
|
| def list_s3_objects(self, prefix: str) -> List[str]: |
| response = s3.list_objects_v2(Bucket=self.bucket_name, Prefix=prefix) |
|
|
| objects = response["Contents"] |
| list_obj = [] |
| for obj in objects: |
| print(obj["Key"]) |
| list_obj.append(obj["Key"]) |
| return list_obj |
|
|
| def rename_s3_object(self, old_key: str, new_key: str) -> None: |
| |
| self.s3.copy_object( |
| Bucket=self.bucket_name, |
| CopySource={"Bucket": self.bucket_name, "Key": old_key}, |
| Key=new_key, |
| ) |
|
|
| |
| self.s3.delete_object(Bucket=self.bucket_name, Key=old_key) |
|
|
| print(f"s3 object renamed {old_key} --> {new_key}") |
|
|
| return None |
|
|
| def s3_upload_wav(self, obj, s3_key): |
| self.s3_client.upload_fileobj( |
| obj, self.bucket_name, s3_key, ExtraArgs={"ContentType": "audio/wav"} |
| ) |
| return None |
|
|