Spaces:
Runtime error
Runtime error
File size: 3,396 Bytes
43de500 7359595 43de500 e8f5f18 43de500 e8f5f18 43de500 7359595 43de500 f0716b5 43de500 f0716b5 7359595 e8f5f18 7359595 ef80389 7359595 ef80389 43de500 e8f5f18 7359595 43de500 7359595 43de500 7359595 43de500 7359595 e8f5f18 7359595 43de500 e8f5f18 43de500 e8f5f18 43de500 | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 | import boto3
import os
import tempfile
from utility import aws_access_key_id, aws_secret_access_key,terminal_print
s3 = boto3.client(
's3',
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key
)
# post or update a file to s3
@terminal_print
def upload_file(path, bucket, object_name=None):
"""Upload a file to an S3 bucket
:param path: path to file for upload
:param bucket: Bucket to upload to
:param object_name: S3 object name.
If not specified then file_name is used
:return: True if file was uploaded, else False
"""
file_name = os.path.basename(path)
if object_name is None:
object_name = file_name
try:
res = s3.upload_file(path, bucket, object_name)
except Exception as e:
print(e)
return False
return res
@terminal_print
def upload_fileobj(file_obj, bucket, object_name=None):
'''
Upload a file object to an S3 bucket
:param file_obj: File object to upload
:param bucket: Bucket to upload to
:param object_name: S3 object name.
If not specified then file_name is used
:return: True if file was uploaded, else False
'''
if object_name is None:
object_name = file_obj.name
try:
res = s3.upload_fileobj(file_obj, bucket, object_name)
except Exception as e:
print(e)
return e
return res
# get a file from s3
@terminal_print
def download_file(bucket, object_name, file_name=None):
"""Download a file from an S3 bucket
:param bucket: Bucket to download from
:param file_name: File to download
:param object_name: S3 object name.
If not specified then file_name is used
:return: True if file was downloaded, else False
"""
if file_name is None:
file_name = object_name
try:
s3.download_file(bucket, object_name, file_name)
except Exception as e:
print(e)
return False
return True
# download a file object from s3
@terminal_print
def download_fileobj(bucket, object_name, temp_obj=None):
'''
Download a file object from an S3 bucket
:param file_name: File to download
:param bucket: Bucket to download from
:param object_name: S3 object name.
If not specified then file_name is used
:return: True if file was downloaded, else False
'''
if temp_obj is None:
temp_obj = tempfile.TemporaryFile()
try:
s3.download_fileobj(bucket, object_name, temp_obj)
except Exception as e:
print(e)
return False
return temp_obj
# delete a file from s3
@terminal_print
def delete_file(bucket, object_name):
"""Delete a file from an S3 bucket
:param bucket: Bucket to delete from
:param object_name: S3 object name.
:return: True if file was deleted, else False
"""
try:
response = s3.delete_object(Bucket=bucket, Key=object_name)
except Exception as e:
print(e)
return False
return response
# list all files in a bucket
@terminal_print
def list_files(bucket):
"""List files in a bucket
:param bucket: Bucket to list from
:return: List of files in bucket
"""
try:
response = s3.list_objects_v2(Bucket=bucket)
except Exception as e:
print(e)
return False
return response['Contents'] |