Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| from io import BytesIO | |
| from typing import Union | |
| from urllib.parse import urlparse | |
| import boto3 | |
| from botocore.client import Config | |
| from botocore.exceptions import NoCredentialsError | |
| AWS_REGION = os.getenv("AWS_REGION") | |
| # Initialize the S3 client | |
| S3_CLIENT = boto3.client( | |
| "s3", region_name=AWS_REGION, config=Config(signature_version="s3v4") | |
| ) | |
| def save_to_s3( | |
| bucket_name: str, | |
| folder_name: str, | |
| content: Union[str, dict, BytesIO], | |
| file_name: str, | |
| ) -> str: | |
| """ | |
| Save a file to an S3 bucket, determining the content type based on the input type. | |
| Args: | |
| bucket_name (str): The name of the S3 bucket. | |
| folder_name (str): The folder path in the S3 bucket. | |
| content (Union[str, dict, BytesIO]): The content to save, can be a string, dictionary, or BytesIO. | |
| file_name (str): The file name under which the content should be saved. | |
| Returns: | |
| str: The S3 URL of the uploaded file, or an error message if credentials are not available. | |
| """ | |
| # Ensure the folder name ends with a '/' | |
| # if not folder_name.endswith('/'): | |
| # folder_name += '/' | |
| # Determine file name and content type based on the input | |
| if isinstance(content, str): | |
| file_content = content | |
| content_type = "text/plain" | |
| elif isinstance(content, dict): | |
| file_content = json.dumps(content) | |
| content_type = "application/json" | |
| elif isinstance(content, BytesIO): | |
| file_content = content | |
| content_type = "image/jpeg" | |
| else: | |
| print( | |
| "Invalid content type. Content must be a string, dictionary, or BytesIO." | |
| ) | |
| raise ValueError("Content must be either a string, dictionary, or BytesIO.") | |
| # Ensure the folder name ends with a '/' | |
| s3_file_path = f"{folder_name.rstrip('/')}/{file_name}" | |
| try: | |
| # Upload the file to S3 | |
| S3_CLIENT.put_object( | |
| Bucket=bucket_name, | |
| Key=s3_file_path, | |
| Body=file_content, | |
| ContentType=content_type, | |
| ) | |
| s3_url = f"s3://{bucket_name}/{s3_file_path}" | |
| print(f"File successfully uploaded to {s3_url}") | |
| return s3_url | |
| except NoCredentialsError: | |
| print("AWS credentials not available.") | |
| return "Error: AWS credentials not available." | |
| def fetch_from_s3(source: Union[str, dict], region_name: str = "ap-south-1") -> bytes: | |
| """ | |
| Fetch a file's content from S3 given a source URL or dictionary with bucket and key. | |
| Args: | |
| source (Union[str, dict]): The source S3 URL or a dictionary with 'bucket_name' and 'file_key'. | |
| region_name (str): The AWS region name for the S3 client (default is 'ap-south-1'). | |
| Returns: | |
| bytes: The content of the file fetched from S3. | |
| """ | |
| print(f"Fetching file from S3. Source: {source}") | |
| s3_client = boto3.client("s3", region_name=region_name) | |
| # Parse the source depending on its type | |
| if isinstance(source, str): | |
| parsed_url = urlparse(source) | |
| bucket_name = parsed_url.netloc.split(".")[0] | |
| file_path = parsed_url.path.lstrip("/") | |
| elif isinstance(source, dict): | |
| bucket_name = source.get("bucket_name") | |
| file_path = source.get("file_key") | |
| if not bucket_name or not file_path: | |
| print("Dictionary input must contain 'bucket_name' and 'file_key'.") | |
| raise ValueError( | |
| "Dictionary input must contain 'bucket_name' and 'file_key'." | |
| ) | |
| else: | |
| print("Source must be a string URL or a dictionary.") | |
| raise ValueError("Source must be a string URL or a dictionary.") | |
| print(f"Attempting to download from bucket: {bucket_name}, path: {file_path}") | |
| try: | |
| response = s3_client.get_object(Bucket=bucket_name, Key=file_path) | |
| file_content = response["Body"].read() | |
| print(f"File fetched successfully from {bucket_name}/{file_path}") | |
| return file_content | |
| except Exception as e: | |
| print(f"Failed to fetch file from S3: {e}") | |
| raise | |
| def list_s3_objects(bucket_name: str, folder_path: str = "") -> list: | |
| """ | |
| Lists a content of the given a directory URL. | |
| Args: | |
| bucket_name (str): The name of the S3 bucket. | |
| folder_name (str): The folder path in the S3 bucket. | |
| Returns: | |
| list: The list of files found inside the given directory URL. | |
| """ | |
| response = S3_CLIENT.list_objects_v2(Bucket=bucket_name, Prefix=folder_path) | |
| # Check if the bucket contains objects | |
| objects = [] | |
| if "Contents" in response: | |
| for obj in response["Contents"]: | |
| objects.append(obj["Key"]) | |
| return objects | |