# character-grading / aws_utils.py
# Author: Manish Gupta
# First commit (ac621a7)
import os
import json
from io import BytesIO
from typing import Union
from urllib.parse import urlparse
import boto3
from botocore.client import Config
from botocore.exceptions import NoCredentialsError
AWS_REGION = os.getenv("AWS_REGION")
# Initialize the S3 client
S3_CLIENT = boto3.client(
"s3", region_name=AWS_REGION, config=Config(signature_version="s3v4")
)
def save_to_s3(
bucket_name: str,
folder_name: str,
content: Union[str, dict, BytesIO],
file_name: str,
) -> str:
"""
Save a file to an S3 bucket, determining the content type based on the input type.
Args:
bucket_name (str): The name of the S3 bucket.
folder_name (str): The folder path in the S3 bucket.
content (Union[str, dict, BytesIO]): The content to save, can be a string, dictionary, or BytesIO.
file_name (str): The file name under which the content should be saved.
Returns:
str: The S3 URL of the uploaded file, or an error message if credentials are not available.
"""
# Ensure the folder name ends with a '/'
# if not folder_name.endswith('/'):
# folder_name += '/'
# Determine file name and content type based on the input
if isinstance(content, str):
file_content = content
content_type = "text/plain"
elif isinstance(content, dict):
file_content = json.dumps(content)
content_type = "application/json"
elif isinstance(content, BytesIO):
file_content = content
content_type = "image/jpeg"
else:
print(
"Invalid content type. Content must be a string, dictionary, or BytesIO."
)
raise ValueError("Content must be either a string, dictionary, or BytesIO.")
# Ensure the folder name ends with a '/'
s3_file_path = f"{folder_name.rstrip('/')}/{file_name}"
try:
# Upload the file to S3
S3_CLIENT.put_object(
Bucket=bucket_name,
Key=s3_file_path,
Body=file_content,
ContentType=content_type,
)
s3_url = f"s3://{bucket_name}/{s3_file_path}"
print(f"File successfully uploaded to {s3_url}")
return s3_url
except NoCredentialsError:
print("AWS credentials not available.")
return "Error: AWS credentials not available."
def fetch_from_s3(source: Union[str, dict], region_name: str = "ap-south-1") -> bytes:
"""
Fetch a file's content from S3 given a source URL or dictionary with bucket and key.
Args:
source (Union[str, dict]): The source S3 URL or a dictionary with 'bucket_name' and 'file_key'.
region_name (str): The AWS region name for the S3 client (default is 'ap-south-1').
Returns:
bytes: The content of the file fetched from S3.
"""
print(f"Fetching file from S3. Source: {source}")
s3_client = boto3.client("s3", region_name=region_name)
# Parse the source depending on its type
if isinstance(source, str):
parsed_url = urlparse(source)
bucket_name = parsed_url.netloc.split(".")[0]
file_path = parsed_url.path.lstrip("/")
elif isinstance(source, dict):
bucket_name = source.get("bucket_name")
file_path = source.get("file_key")
if not bucket_name or not file_path:
print("Dictionary input must contain 'bucket_name' and 'file_key'.")
raise ValueError(
"Dictionary input must contain 'bucket_name' and 'file_key'."
)
else:
print("Source must be a string URL or a dictionary.")
raise ValueError("Source must be a string URL or a dictionary.")
print(f"Attempting to download from bucket: {bucket_name}, path: {file_path}")
try:
response = s3_client.get_object(Bucket=bucket_name, Key=file_path)
file_content = response["Body"].read()
print(f"File fetched successfully from {bucket_name}/{file_path}")
return file_content
except Exception as e:
print(f"Failed to fetch file from S3: {e}")
raise
def list_s3_objects(bucket_name: str, folder_path: str = "") -> list:
"""
Lists a content of the given a directory URL.
Args:
bucket_name (str): The name of the S3 bucket.
folder_name (str): The folder path in the S3 bucket.
Returns:
list: The list of files found inside the given directory URL.
"""
response = S3_CLIENT.list_objects_v2(Bucket=bucket_name, Prefix=folder_path)
# Check if the bucket contains objects
objects = []
if "Contents" in response:
for obj in response["Contents"]:
objects.append(obj["Key"])
return objects