File size: 4,683 Bytes
ac621a7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
import os
import json
from io import BytesIO
from typing import Union
from urllib.parse import urlparse

import boto3
from botocore.client import Config
from botocore.exceptions import NoCredentialsError

AWS_REGION = os.getenv("AWS_REGION")

# Initialize the S3 client
S3_CLIENT = boto3.client(
    "s3", region_name=AWS_REGION, config=Config(signature_version="s3v4")
)


def save_to_s3(
    bucket_name: str,
    folder_name: str,
    content: Union[str, dict, BytesIO],
    file_name: str,
) -> str:
    """
    Save content to an S3 bucket, inferring the content type from the input type.

    Args:
        bucket_name (str): The name of the S3 bucket.
        folder_name (str): The folder path in the S3 bucket (trailing '/' optional).
        content (Union[str, dict, BytesIO]): The content to save. Strings are
            stored as text/plain, dicts are JSON-serialized as application/json,
            and BytesIO buffers are stored as image/jpeg.
        file_name (str): The file name under which the content should be saved.

    Returns:
        str: The S3 URL (s3://bucket/key) of the uploaded file, or an error
            message string if AWS credentials are not available.

    Raises:
        ValueError: If `content` is not a string, dictionary, or BytesIO.
    """
    # Map the Python type of `content` onto a request body and MIME type.
    if isinstance(content, str):
        file_content: Union[str, BytesIO] = content
        content_type = "text/plain"
    elif isinstance(content, dict):
        file_content = json.dumps(content)
        content_type = "application/json"
    elif isinstance(content, BytesIO):
        file_content = content
        # NOTE(review): assumes every BytesIO payload is a JPEG image — confirm
        # against callers before relying on this Content-Type.
        content_type = "image/jpeg"
    else:
        raise ValueError("Content must be either a string, dictionary, or BytesIO.")

    # Normalize the key so exactly one '/' separates the folder and file name.
    s3_file_path = f"{folder_name.rstrip('/')}/{file_name}"

    try:
        S3_CLIENT.put_object(
            Bucket=bucket_name,
            Key=s3_file_path,
            Body=file_content,
            ContentType=content_type,
        )
        s3_url = f"s3://{bucket_name}/{s3_file_path}"
        print(f"File successfully uploaded to {s3_url}")
        return s3_url

    except NoCredentialsError:
        # Returned as a string (not raised) for backward compatibility with
        # callers that inspect the return value.
        print("AWS credentials not available.")
        return "Error: AWS credentials not available."


def fetch_from_s3(source: Union[str, dict], region_name: str = "ap-south-1") -> bytes:
    """
    Download a file's raw bytes from S3.

    Args:
        source (Union[str, dict]): Either an S3 URL string, or a dict with
            'bucket_name' and 'file_key' entries.
        region_name (str): AWS region for the S3 client (default 'ap-south-1').

    Returns:
        bytes: The content of the fetched object.

    Raises:
        ValueError: If `source` is a dict missing required keys, or is neither
            a string nor a dict.
        Exception: Propagates any boto3/S3 error raised during the download.
    """
    print(f"Fetching file from S3. Source: {source}")
    client = boto3.client("s3", region_name=region_name)

    # Resolve (bucket, key) from whichever source form was given.
    if isinstance(source, str):
        # URL form: bucket is the first dotted label of the host, key is the path.
        parts = urlparse(source)
        bucket, key = parts.netloc.split(".")[0], parts.path.lstrip("/")
    elif isinstance(source, dict):
        bucket = source.get("bucket_name")
        key = source.get("file_key")
        if not (bucket and key):
            print("Dictionary input must contain 'bucket_name' and 'file_key'.")
            raise ValueError(
                "Dictionary input must contain 'bucket_name' and 'file_key'."
            )
    else:
        print("Source must be a string URL or a dictionary.")
        raise ValueError("Source must be a string URL or a dictionary.")

    print(f"Attempting to download from bucket: {bucket}, path: {key}")
    try:
        body = client.get_object(Bucket=bucket, Key=key)["Body"].read()
        print(f"File fetched successfully from {bucket}/{key}")
        return body
    except Exception as exc:
        print(f"Failed to fetch file from S3: {exc}")
        raise


def list_s3_objects(bucket_name: str, folder_path: str = "") -> list:
    """
    List the keys of all objects under a prefix in an S3 bucket.

    Args:
        bucket_name (str): The name of the S3 bucket.
        folder_path (str): The key prefix (folder path) to list under; an
            empty string lists the whole bucket.

    Returns:
        list: All object keys found under the given prefix (empty if none).
    """
    # Use a paginator: a single list_objects_v2 call returns at most 1000
    # keys, so larger folders would otherwise be silently truncated.
    paginator = S3_CLIENT.get_paginator("list_objects_v2")
    objects = []
    for page in paginator.paginate(Bucket=bucket_name, Prefix=folder_path):
        # 'Contents' is absent when a page (or the whole listing) is empty.
        objects.extend(obj["Key"] for obj in page.get("Contents", []))
    return objects