Spaces:
Sleeping
Sleeping
| import os | |
| import boto3 | |
| import json | |
| import pickle | |
class S3Client:
    """Thin convenience wrapper around boto3 for persona file storage in S3.

    Object keys are laid out as:
        '<persona_builder_request_id>/<folder_name>/<file_name>'
    Credentials and the default bucket/region are read from the environment
    at construction time (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY,
    AWS_BUCKET_NAME, AWS_REGION).

    NOTE(review): uploads/deletes target AWS_BUCKET_NAME while the
    existence-check/download defaults are the hardcoded bucket
    'rad-persona' — presumably these are the same bucket in every
    deployment; confirm, since the defaults are kept for backward
    compatibility.
    """

    def __init__(self, personaBuilderRequestId, folder_name):
        """Build the key prefix and the boto3 client/session.

        personaBuilderRequestId: integer id from the builderRequests table.
        folder_name: directory path for the relevant module, e.g. 'builder'
            or nested as 'builder/v1/other_directories'.
        """
        self.persona_builder_request_id = personaBuilderRequestId
        self.folder_name = folder_name
        # Key prefix shared by every operation on this instance.
        self.path_string = str(self.persona_builder_request_id) + '/' + folder_name
        self.AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID')
        self.AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY')
        self.AWS_BUCKET_NAME = os.getenv('AWS_BUCKET_NAME')
        self.AWS_REGION = os.getenv('AWS_REGION')
        self.s3_client = boto3.client(
            service_name='s3',
            region_name=self.AWS_REGION,
            aws_access_key_id=self.AWS_ACCESS_KEY_ID,
            aws_secret_access_key=self.AWS_SECRET_ACCESS_KEY
        )
        # Separate Session kept for the resource-style download API.
        self.s3_session = boto3.Session(
            region_name=self.AWS_REGION,
            aws_access_key_id=self.AWS_ACCESS_KEY_ID,
            aws_secret_access_key=self.AWS_SECRET_ACCESS_KEY
        )

    def upload_file(self, file_name, persona_data):
        """Upload raw data (str/bytes) to '<prefix>/<file_name>' in AWS_BUCKET_NAME."""
        path_to_file = self.path_string + '/' + file_name
        self.s3_client.put_object(Bucket=self.AWS_BUCKET_NAME, Key=path_to_file, Body=persona_data)

    def upload_file_json(self, file_name, persona_data):
        """Serialize persona_data to JSON and upload it, ensuring a '.json' suffix."""
        # FIX: was `'.json' not in file_name` — substring containment, which
        # skipped the suffix for names like 'my.jsonish' and never normalized
        # names like 'report.json.tmp'. endswith checks the actual extension.
        if not file_name.endswith('.json'):
            file_name = file_name + '.json'
        persona_json = json.dumps(persona_data)
        self.upload_file(file_name, persona_json)

    def upload_file_pickle(self, file_name, persona_data):
        """Pickle persona_data and upload it, ensuring a '.pkl' suffix."""
        # FIX: same substring-vs-suffix bug as upload_file_json.
        if not file_name.endswith('.pkl'):
            file_name = file_name + '.pkl'
        persona_pickle = pickle.dumps(persona_data)
        self.upload_file(file_name, persona_pickle)

    def check_file_exists(self, file_name, bucket_name='rad-persona'):
        """Return True if '<prefix>/<file_name>' exists in bucket_name, else False.

        Any failure (missing key, bad credentials, network error) is treated
        as "not found" — callers use this as a cache probe and regenerate
        the data on False.
        """
        try:
            path_to_file = self.path_string + '/' + file_name
            print("Refer path ==>", path_to_file)
            # head_object raises if the key is absent; success means it exists.
            self.s3_client.head_object(Bucket=bucket_name, Key=path_to_file)
            print("Path found in S3 buckets, using cache to load data...")
            return True
        except Exception as e:
            # Broad catch is deliberate best-effort: a 404 and a transient
            # error both fall back to regenerating the data.
            print(e)
            print("Path not found in S3 buckets, running the required scripts to generate data...")
            return False

    def download_file(self, file_name, bucket_name='rad-persona'):
        """Download '<prefix>/<file_name>' from bucket_name.

        Returns (True, bytes) on success, (False, '') if the object is absent.
        """
        # FIX: forward bucket_name to the existence check; previously the
        # check always ran against its own default bucket, so passing a
        # non-default bucket_name here probed the wrong bucket.
        file_exists_flag = self.check_file_exists(file_name, bucket_name)
        if file_exists_flag:
            resource = self.s3_session.resource('s3')
            path_to_file = self.path_string + '/' + file_name
            obj = resource.Object(bucket_name, path_to_file)
            file_content = obj.get()['Body'].read()
            return file_exists_flag, file_content
        else:
            return file_exists_flag, ''

    def download_file_json(self, file_name):
        """Download and JSON-decode a file. Returns (found_flag, parsed_or_'')."""
        flag, file_data = self.download_file(file_name)
        if not flag:
            return flag, file_data
        file_content = json.loads(file_data.decode('utf-8'))
        return flag, file_content

    def download_file_pickle(self, file_name):
        """Download and unpickle a file. Returns (found_flag, object_or_'').

        SECURITY: pickle.loads executes arbitrary code from the payload —
        safe only because this bucket is written exclusively by this service;
        never point it at untrusted data.
        """
        flag, file_data = self.download_file(file_name)
        if not flag:
            return flag, file_data
        file_content = pickle.loads(file_data)
        return flag, file_content

    def delete_file(self, file_name):
        """Delete '<prefix>/<file_name>' from AWS_BUCKET_NAME if it exists.

        NOTE(review): the existence check probes the default 'rad-persona'
        bucket while the delete targets AWS_BUCKET_NAME — harmless only if
        they are the same bucket; confirm.
        """
        file_exists_flag = self.check_file_exists(file_name)
        if file_exists_flag:
            path_to_file = self.path_string + '/' + file_name
            self.s3_client.delete_object(Bucket=self.AWS_BUCKET_NAME, Key=path_to_file)

    def list_of_folders_files(self):
        """List all object keys under this persona's id prefix.

        Returns a list of {"Key": <key>} dicts; empty list when nothing matches.
        """
        response = self.s3_client.list_objects_v2(
            Bucket=self.AWS_BUCKET_NAME, Prefix=str(self.persona_builder_request_id)
        )
        files = response.get("Contents")
        # FIX: list_objects_v2 omits "Contents" entirely when the prefix
        # matches nothing; the original iterated None and raised TypeError.
        if not files:
            return []
        return [{"Key": file["Key"]} for file in files]
if __name__ == "__main__":
    # Manual smoke test: exercise the client against a known builder
    # request id (628) and the static 'gpt_personas' directory.
    client = S3Client(628, 'gpt_personas')

    # Probe whether the cached result file is already present.
    result_exists = client.check_file_exists('result.json')

    # Fetch a pickled step file (flag reports whether it was found).
    step_flag, step_payload = client.download_file_pickle('steps/1.pkl')

    # Round-trip an empty payload through the pickle upload path.
    payload = {}
    client.upload_file_pickle('result.json', payload)

    print(result_exists)