OriTTS / S3_bucket.py
ajajali09's picture
fix in a bug
1558975
import os
from typing import List
import boto3
from dotenv import load_dotenv
from urllib.parse import urlencode
import fsspec
import parameters
# Load variables from a local .env file (if present) into the environment.
load_dotenv()

# Flag from the project's `parameters` module: truthy means "use the explicit
# AWS_ACCESS_KEY / AWS_SECRET_KEY environment variables"; falsy means "let
# boto3 resolve credentials through its default chain".
aws_config = parameters.aws_config
print(aws_config)

# Always bind the credential names. They are read unconditionally further down
# in this module (AWS.__init__ passes them to fsspec), so leaving them
# undefined when `aws_config` is falsy would raise NameError. `None` makes
# boto3 fall back to its default credential resolution, which is what the
# credential-less calls did before.
access_key = os.getenv("AWS_ACCESS_KEY") if aws_config else None
secret_key = os.getenv("AWS_SECRET_KEY") if aws_config else None
bucket_name = os.getenv("AWS_BUCKET_NAME")

# Ask S3 which region hosts the bucket so the presigning client below signs
# for the right region.
_reported_location = boto3.client(
    "s3", aws_access_key_id=access_key, aws_secret_access_key=secret_key
).get_bucket_location(Bucket=bucket_name)["LocationConstraint"]

# GetBucketLocation reports a null LocationConstraint for buckets in
# us-east-1 and the legacy value "EU" for eu-west-1; neither is usable as a
# region name, so normalise them here.
if not _reported_location:
    location = "us-east-1"
elif _reported_location == "EU":
    location = "eu-west-1"
else:
    location = _reported_location

session = boto3.Session(
    aws_access_key_id=access_key, aws_secret_access_key=secret_key
)

# Create an S3 client
s3 = session.client("s3")

# Second client pinned to the bucket's region and SigV4; it is the one used
# for presigned URLs and managed transfers.
s3_client = boto3.client(
    "s3",
    region_name=location,
    aws_access_key_id=access_key,
    aws_secret_access_key=secret_key,
    config=boto3.session.Config(signature_version="s3v4"),
)
def _is_custom(k):
    """Return True when parameter name *k* begins with ``x-`` (any case)."""
    return k.lower()[:2] == "x-"
def _client_param_handler(*, params, context, **_kw):
    """Split custom ``x-`` parameters away from the real client parameters.

    The custom entries are stashed in ``context["custom_params"]`` so a later
    event handler can pick them up; the remaining entries are returned as the
    parameters to validate, because validation would fail on the custom ones.
    """
    custom, regular = {}, {}
    for name, value in params.items():
        bucket = custom if _is_custom(name) else regular
        bucket[name] = value
    context["custom_params"] = custom
    return regular
def _request_param_injector(*, request, **_kw):
    """Append the stored custom parameters to the request URL's query string.

    Reads ``request.context["custom_params"]``; when it is empty the URL is
    left untouched. Otherwise the parameters are URL-encoded and joined with
    ``&`` if the URL already has a query string, or ``?`` if it does not.
    """
    extras = request.context["custom_params"]
    if not extras:
        return
    separator = "&" if "?" in request.url else "?"
    request.url = request.url + separator + urlencode(extras)
# Hook the two handlers into GetObject on the presigning client: the first
# strips custom "x-" parameters before validation, the second re-attaches
# them to the URL just before it is signed.
for _event_name, _event_handler in (
    ("provide-client-params.s3.GetObject", _client_param_handler),
    ("before-sign.s3.GetObject", _request_param_injector),
):
    s3_client.meta.events.register(_event_name, _event_handler)
class AWS:
    """Helper bundling the module's S3 clients for the configured bucket.

    Attributes:
        bucket_name: Bucket every method operates on (module ``bucket_name``).
        s3: Session-based S3 client used for plain object operations.
        s3_client: Region-pinned SigV4 client used for presigned URLs and
            managed file transfers.
        fs: fsspec ``s3`` filesystem built with the module's credentials.
    """

    def __init__(self) -> None:
        self.bucket_name = bucket_name
        self.s3 = s3
        self.s3_client = s3_client
        self.fs = fsspec.filesystem(
            "s3",
            key=access_key,
            secret=secret_key,
            client_kwargs={"region_name": location},
            anon=False,
        )

    def upload_fsspec(self, local_path: str, s3_path: str):
        """Copy the local file at *local_path* to ``s3://<bucket>/<s3_path>``.

        Raises:
            FileNotFoundError: If *local_path* does not exist. The remote
                object is not opened in that case.
        """
        import shutil

        s3_path = f"s3://{self.bucket_name}/{s3_path}"
        # Open the local file first: if it is missing we must fail before a
        # remote object is opened for writing (closing that handle could
        # otherwise leave an empty object behind). copyfileobj streams in
        # chunks instead of loading the whole file into memory.
        with open(local_path, "rb") as local_file, self.fs.open(
            s3_path, "wb"
        ) as remote_file:
            shutil.copyfileobj(local_file, remote_file)

    def create_presigned_url(self, key: str) -> str:
        """Return a presigned GET URL for *key* with ``ExpiresIn=180``.

        Extra ``x-`` prefixed entries in ``Params`` would be turned into
        query-string arguments by the GetObject event handlers registered on
        ``s3_client``; none are sent at the moment.
        """
        return self.s3_client.generate_presigned_url(
            ClientMethod="get_object",
            Params={"Bucket": self.bucket_name, "Key": key},
            ExpiresIn=180,
        )

    def sync_local_to_s3(self, object: bytes, s3_key: str) -> str:
        """Store *object* under *s3_key* and return a presigned URL for it."""
        self.s3.put_object(Key=s3_key, Bucket=self.bucket_name, Body=object)
        return self.create_presigned_url(key=s3_key)

    def check_if_exists(self, object_key) -> bool:
        """Return True when ``head_object`` succeeds for *object_key*.

        Any exception raised by the call yields False, so False does not
        distinguish a missing object from other failures.
        """
        try:
            self.s3.head_object(Bucket=self.bucket_name, Key=object_key)
        except Exception:
            return False
        return True

    def s3_upload(self, object: str, s3_key: str) -> str:
        """Store *object* under *s3_key* and return the string "Success"."""
        self.s3.put_object(Key=s3_key, Bucket=self.bucket_name, Body=object)
        return "Success"

    def s3_upload_zip(self, file_name: str, object_name=None):
        """Upload the local file *file_name*; return True on success.

        Args:
            file_name: Path of the local file to upload.
            object_name: Destination key; defaults to *file_name*.

        Returns:
            True when the upload call completes, False when the local file
            is not found.
        """
        if object_name is None:
            object_name = file_name
        try:
            # Use the instance's client (not the module global) so the method
            # honours whatever client this instance carries.
            self.s3_client.upload_file(file_name, self.bucket_name, object_name)
            return True
        except FileNotFoundError:
            print("The file was not found.")
            return False

    def s3_download(self, s3_key: str):
        """Return the full body of the object stored under *s3_key*."""
        response = self.s3.get_object(Bucket=self.bucket_name, Key=s3_key)
        return response["Body"].read()

    def s3_dowload_zip(self, download_path: str, object_name=None):
        """Download the object *object_name* to the local *download_path*.

        *object_name* keeps its ``None`` default for backward compatibility
        but is passed straight to ``download_file`` as the key, so callers
        are expected to supply it.
        """
        self.s3_client.download_file(self.bucket_name, object_name, download_path)

    def s3_delete_object(self, s3_key: str) -> bool:
        """Delete *s3_key*; return True on success, False if the call raised."""
        try:
            # Delete the object from the bucket
            self.s3.delete_object(Bucket=self.bucket_name, Key=s3_key)
            return True
        except Exception as e:
            print(f"Error deleting object '{s3_key}' from '{self.bucket_name}': {e}")
            return False

    def list_s3_objects(self, prefix: str) -> List[str]:
        """Return (and print) every object key that starts with *prefix*.

        Returns an empty list when nothing matches, and follows continuation
        tokens so truncated listings are returned in full.
        """
        keys: List[str] = []
        request = {"Bucket": self.bucket_name, "Prefix": prefix}
        while True:
            response = self.s3.list_objects_v2(**request)
            # "Contents" is absent from the response when no key matches.
            for entry in response.get("Contents", []):
                print(entry["Key"])
                keys.append(entry["Key"])
            if not response.get("IsTruncated"):
                break
            request["ContinuationToken"] = response["NextContinuationToken"]
        return keys

    def rename_s3_object(self, old_key: str, new_key: str) -> None:
        """Rename an object by copying it to *new_key* and deleting *old_key*."""
        # Copy the object to a new key
        self.s3.copy_object(
            Bucket=self.bucket_name,
            CopySource={"Bucket": self.bucket_name, "Key": old_key},
            Key=new_key,
        )
        # Delete the old object
        self.s3.delete_object(Bucket=self.bucket_name, Key=old_key)
        print(f"s3 object renamed {old_key} --> {new_key}")
        return None

    def s3_upload_wav(self, obj, s3_key):
        """Upload the file-like *obj* to *s3_key* with ContentType audio/wav."""
        self.s3_client.upload_fileobj(
            obj, self.bucket_name, s3_key, ExtraArgs={"ContentType": "audio/wav"}
        )
        return None