import os
import json
from io import BytesIO
from typing import Union
from urllib.parse import urlparse

import boto3
from botocore.client import Config
from botocore.exceptions import NoCredentialsError

AWS_REGION = os.getenv("AWS_REGION")

# Module-level S3 client shared by save_to_s3 and list_s3_objects.
# fetch_from_s3 builds its own client so it can target another region.
S3_CLIENT = boto3.client(
    "s3", region_name=AWS_REGION, config=Config(signature_version="s3v4")
)


def save_to_s3(
    bucket_name: str,
    folder_name: str,
    content: Union[str, dict, BytesIO],
    file_name: str,
    content_type: Union[str, None] = None,
) -> str:
    """
    Save a file to an S3 bucket, inferring the content type from the input type.

    Args:
        bucket_name (str): The name of the S3 bucket.
        folder_name (str): The folder path in the S3 bucket. Trailing slashes
            are normalized, so "reports" and "reports/" are equivalent.
        content (Union[str, dict, BytesIO]): The content to save. Strings are
            stored as text/plain, dicts are JSON-serialized as
            application/json, and BytesIO buffers default to image/jpeg.
        file_name (str): The file name under which the content should be saved.
        content_type (str, optional): Explicit MIME type to use instead of the
            inferred one. Defaults to None (infer from ``content``'s type).

    Returns:
        str: The s3:// URL of the uploaded file, or an error message if AWS
        credentials are not available.

    Raises:
        ValueError: If ``content`` is not a string, dictionary, or BytesIO.
    """
    # Determine the body and the inferred content type from the input type.
    if isinstance(content, str):
        file_content = content
        inferred_type = "text/plain"
    elif isinstance(content, dict):
        file_content = json.dumps(content)
        inferred_type = "application/json"
    elif isinstance(content, BytesIO):
        file_content = content
        # NOTE(review): historical default — callers uploading non-JPEG binary
        # data should pass an explicit content_type to override this.
        inferred_type = "image/jpeg"
    else:
        print(
            "Invalid content type. Content must be a string, dictionary, or BytesIO."
        )
        raise ValueError("Content must be either a string, dictionary, or BytesIO.")

    # Build the object key; rstrip avoids a double slash when the caller
    # already passed a trailing '/'.
    s3_file_path = f"{folder_name.rstrip('/')}/{file_name}"

    try:
        # Upload the file to S3.
        S3_CLIENT.put_object(
            Bucket=bucket_name,
            Key=s3_file_path,
            Body=file_content,
            ContentType=content_type or inferred_type,
        )
        s3_url = f"s3://{bucket_name}/{s3_file_path}"
        print(f"File successfully uploaded to {s3_url}")
        return s3_url
    except NoCredentialsError:
        print("AWS credentials not available.")
        return "Error: AWS credentials not available."
def fetch_from_s3(source: Union[str, dict], region_name: str = "ap-south-1") -> bytes:
    """
    Fetch a file's content from S3 given a source URL or a dict with bucket and key.

    Args:
        source (Union[str, dict]): Either an S3 URL (``s3://bucket/key`` or a
            virtual-hosted https URL such as
            ``https://bucket.s3.region.amazonaws.com/key``) or a dictionary
            with 'bucket_name' and 'file_key'.
        region_name (str): The AWS region name for the S3 client
            (default is 'ap-south-1').

    Returns:
        bytes: The content of the file fetched from S3.

    Raises:
        ValueError: If ``source`` is neither a string URL nor a dict with the
            required keys.
    """
    print(f"Fetching file from S3. Source: {source}")
    s3_client = boto3.client("s3", region_name=region_name)

    # Parse the source depending on its type.
    if isinstance(source, str):
        parsed_url = urlparse(source)
        if parsed_url.scheme == "s3":
            # s3://bucket/key — the netloc IS the bucket name. Splitting on
            # '.' here would truncate bucket names that contain dots.
            bucket_name = parsed_url.netloc
        else:
            # Virtual-hosted https URL: the bucket is the first host label
            # (bucket.s3.<region>.amazonaws.com).
            bucket_name = parsed_url.netloc.split(".")[0]
        file_path = parsed_url.path.lstrip("/")
    elif isinstance(source, dict):
        bucket_name = source.get("bucket_name")
        file_path = source.get("file_key")
        if not bucket_name or not file_path:
            print("Dictionary input must contain 'bucket_name' and 'file_key'.")
            raise ValueError(
                "Dictionary input must contain 'bucket_name' and 'file_key'."
            )
    else:
        print("Source must be a string URL or a dictionary.")
        raise ValueError("Source must be a string URL or a dictionary.")

    print(f"Attempting to download from bucket: {bucket_name}, path: {file_path}")
    try:
        response = s3_client.get_object(Bucket=bucket_name, Key=file_path)
        file_content = response["Body"].read()
        print(f"File fetched successfully from {bucket_name}/{file_path}")
        return file_content
    except Exception as e:
        print(f"Failed to fetch file from S3: {e}")
        raise


def list_s3_objects(bucket_name: str, folder_path: str = "") -> list:
    """
    List the object keys under a prefix in an S3 bucket.

    Args:
        bucket_name (str): The name of the S3 bucket.
        folder_path (str): The key prefix (folder path) to list under.
            Defaults to "" (the whole bucket).

    Returns:
        list: The keys of all objects found under the given prefix.
    """
    # Paginate so the listing is complete even when the prefix holds more
    # than the 1000-key cap of a single list_objects_v2 response.
    paginator = S3_CLIENT.get_paginator("list_objects_v2")
    objects = []
    for page in paginator.paginate(Bucket=bucket_name, Prefix=folder_path):
        # 'Contents' is absent on empty pages; default to an empty list.
        for obj in page.get("Contents", []):
            objects.append(obj["Key"])
    return objects