import json
import os
from io import BytesIO

import pandas as pd
from azure.storage.blob import BlobServiceClient, __version__


class AzureBlob:
    """Convenience wrapper around an Azure ``BlobServiceClient``.

    Blob names handled by the ``get_latest_*`` helpers are laid out as
    ``<client>/<view>/<table>...<extension>``. "Latest" always means the blob
    whose *name* sorts last, not the one modified most recently.
    """

    # Table prefixes that are also the beginning of other tables' names.
    # When one of these is requested, blobs of the listed sibling tables
    # must be skipped or they would win the name-based "latest" ordering.
    _PARQUET_EXCLUDED_TABLES = {
        'so_': ('so_week', 'so_daily'),
        'stock_move_': ('stock_move_line',),
        'product_': ('product_detail',),
    }

    def __init__(self, str_connection):
        """Create the service client from a storage connection string."""
        self.str_connection = str_connection
        self.blob_service_client = BlobServiceClient.from_connection_string(str_connection)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _find_latest_blob(container_client, str_prefix, str_suffix='', excluded_prefixes=()):
        """Return the blob under ``str_prefix`` whose name sorts last.

        Args:
            container_client: Client of the container to search.
            str_prefix: Passed to ``list_blobs(name_starts_with=...)``.
            str_suffix: Required name ending; ``''`` accepts every blob.
            excluded_prefixes: Name prefixes whose blobs are ignored.

        Raises:
            IndexError: If no blob matches. The exception type is the one the
                previous ``sorted(...)[0]`` implementation raised.
        """
        # str.startswith(()) is False, so an empty tuple excludes nothing.
        excluded = tuple(excluded_prefixes)
        candidates = [
            blob
            for blob in container_client.list_blobs(name_starts_with=str_prefix)
            if blob.name.endswith(str_suffix) and not blob.name.startswith(excluded)
        ]
        if not candidates:
            raise IndexError(
                f"No blob found with prefix '{str_prefix}' and suffix '{str_suffix}'"
            )
        return max(candidates, key=lambda blob: blob.name)

    def _find_latest_parquet_blob(self, container_client, str_client, str_view, str_table):
        """Return the latest ``.parquet`` blob of a table, skipping sibling tables."""
        str_folder = f"{str_client}/{str_view}/"
        excluded = tuple(
            str_folder + str_sibling
            for str_sibling in self._PARQUET_EXCLUDED_TABLES.get(str_table, ())
        )
        # The listing prefix already guarantees the name starts with
        # "<client>/<view>/<table>", so only the exclusions are checked here.
        return self._find_latest_blob(
            container_client, str_folder + str_table, '.parquet', excluded
        )

    @staticmethod
    def _download_to_memory(container_client, str_blob_name):
        """Download a blob into a ``BytesIO`` rewound to position 0."""
        blob_contents = BytesIO()
        blob_client = container_client.get_blob_client(str_blob_name)
        blob_client.download_blob().readinto(blob_contents)
        blob_contents.seek(0)
        return blob_contents

    @staticmethod
    def _date_from_blob_name(str_blob_name):
        """Return the text between the last ``_`` and the following ``.``."""
        return str_blob_name.split('_')[-1].split('.')[0]

    # ------------------------------------------------------------------
    # Clients and containers
    # ------------------------------------------------------------------
    def get_container_client(self, str_container):
        """Return a container client for ``str_container``."""
        return self.blob_service_client.get_container_client(str_container)

    def get_blob_client(self, str_container, str_blob):
        """Return a blob client for ``str_blob`` inside ``str_container``."""
        return self.blob_service_client.get_blob_client(container=str_container, blob=str_blob)

    def create_container(self, str_container):
        """Create ``str_container`` unless it already exists."""
        container_client = self.get_container_client(str_container)
        if not container_client.exists():
            container_client.create_container()
            print('Created container:', str_container)

    def get_blob_list(self, str_container):
        """Return the iterable produced by ``list_blobs()`` for the container."""
        return self.get_container_client(str_container).list_blobs()

    # ------------------------------------------------------------------
    # Delete / copy
    # ------------------------------------------------------------------
    def delete_blob(self, str_container, str_blob):
        """Delete ``str_blob`` from ``str_container``."""
        self.get_blob_client(str_container, str_blob).delete_blob()

    def copy_blob(self, str_container, str_new_container, str_blob):
        """Start a server-side copy of ``str_blob`` into ``str_new_container``.

        The copy keeps the same blob name. The outcome reported by the service
        when the copy is started is printed; nothing is returned.
        """
        original_blob_client = self.get_blob_client(str_container, str_blob)
        new_blob_client = self.get_blob_client(str_new_container, str_blob)
        copy_operation = new_blob_client.start_copy_from_url(original_blob_client.url)
        copy_status = copy_operation['copy_status']
        if copy_status == "success":
            print(f"Copied {str_blob} to {str_new_container}.")
        elif copy_status == "pending":
            # An asynchronous copy is reported as pending, which is not a failure.
            print(f"Copy of {str_blob} to {str_new_container} is pending.")
        else:
            print(f"Failed to copy {str_blob}.")

    # ------------------------------------------------------------------
    # Uploads
    # ------------------------------------------------------------------
    def upload_file(self, str_container, str_blob, str_filename, str_filepath):
        """Upload the file at ``str_filepath`` unless the blob already exists.

        ``str_filename`` is only used in the printed messages.
        """
        blob_client = self.get_blob_client(str_container, str_blob)
        if blob_client.exists():
            print(f'\nBlob already exists:\n\t{str_filename}')
            return
        print("\nUploading to Azure Storage as blob:\n\t" + str_filename)
        with open(file=str_filepath, mode="rb") as data:
            blob_client.upload_blob(data)

    def upload_from_memory(self, str_container, str_blob, data):
        """Upload in-memory ``data`` unless the blob already exists."""
        blob_client = self.get_blob_client(str_container, str_blob)
        if blob_client.exists():
            print(f'\nBlob already exists:\n\t{str_blob}')
            return
        print("\nUploading to Azure Storage as blob:\n\t" + str_blob)
        blob_client.upload_blob(data)

    def overwrite_blob(self, str_container, str_blob, data):
        """Upload in-memory ``data``, replacing any existing blob."""
        blob_client = self.get_blob_client(str_container, str_blob)
        blob_client.upload_blob(data, overwrite=True)

    def overwrite_file(self, str_container, str_blob, str_filename, str_filepath):
        """Upload the file at ``str_filepath``, replacing any existing blob.

        ``str_filename`` is accepted for signature parity with ``upload_file``
        and is not used.
        """
        blob_client = self.get_blob_client(str_container, str_blob)
        with open(file=str_filepath, mode="rb") as data:
            blob_client.upload_blob(data, overwrite=True)

    # ------------------------------------------------------------------
    # Downloads
    # ------------------------------------------------------------------
    def download_blob(self, str_container, str_blob, str_filepath):
        """Download ``str_blob`` into the local file ``str_filepath``."""
        blob_client = self.get_blob_client(str_container, str_blob)
        with open(str_filepath, "wb") as my_blob:
            blob_client.download_blob().readinto(my_blob)
        print(f"Downloaded blob:\t{str_blob}")

    def get_latest_parquet(self, str_container, str_client, str_view, str_table):
        """Return the latest ``.parquet`` blob of a table as a ``BytesIO``.

        Raises:
            IndexError: If no matching blob exists.
        """
        print("Start running get_latest_parquet")
        container_client = self.get_container_client(str_container)
        latest_blob = self._find_latest_parquet_blob(
            container_client, str_client, str_view, str_table
        )
        blob_contents = self._download_to_memory(container_client, latest_blob.name)
        print(f"Found parquet file:\t{latest_blob.name}")
        return blob_contents

    def get_latest_parquet_date(self, str_container, str_client, str_view, str_table):
        """Return the date token of the latest ``.parquet`` blob of a table.

        Uses the same blob selection as ``get_latest_parquet`` so the date
        always belongs to the file that method would download.

        Raises:
            IndexError: If no matching blob exists.
        """
        print("Start running get_latest_parquet_date")
        container_client = self.get_container_client(str_container)
        latest_blob = self._find_latest_parquet_blob(
            container_client, str_client, str_view, str_table
        )
        latest_parquet_date = self._date_from_blob_name(latest_blob.name)
        print(f"The latest parquet date is\t{latest_parquet_date}")
        return latest_parquet_date

    def get_latest_json_date(self, str_container, str_client, str_view, str_table):
        """Return the date token of the latest ``.json`` blob of a table.

        Raises:
            IndexError: If no matching blob exists.
        """
        container_client = self.get_container_client(str_container)
        latest_blob = self._find_latest_blob(
            container_client, f"{str_client}/{str_view}/{str_table}", '.json'
        )
        latest_json_date = self._date_from_blob_name(latest_blob.name)
        print(f"The latest json date is\t{latest_json_date}")
        return latest_json_date

    def get_latest_json(self, str_container, str_client, str_view, str_table):
        """Return the latest ``.json`` blob of a table as a ``BytesIO``.

        Raises:
            IndexError: If no matching blob exists.
        """
        container_client = self.get_container_client(str_container)
        latest_blob = self._find_latest_blob(
            container_client, f"{str_client}/{str_view}/{str_table}", '.json'
        )
        blob_contents = self._download_to_memory(container_client, latest_blob.name)
        print(f"Found json file:\t{latest_blob.name}")
        return blob_contents

    def get_latest_csv(self, str_container, str_client, str_view, str_table):
        """Return the latest ``.csv`` blob of a table as a ``BytesIO``.

        Raises:
            IndexError: If no matching blob exists.
        """
        container_client = self.get_container_client(str_container)
        latest_blob = self._find_latest_blob(
            container_client, f"{str_client}/{str_view}/{str_table}", '.csv'
        )
        blob_contents = self._download_to_memory(container_client, latest_blob.name)
        print(f"Found csv file:\t{latest_blob.name}")
        return blob_contents

    def get_latest_sql(self, str_container, str_client, str_view, str_table):
        """Return the latest blob named ``<table>.sql...`` as a ``BytesIO``.

        The ``.sql`` extension is part of the listing prefix, so no suffix
        filter is applied.

        Raises:
            IndexError: If no matching blob exists.
        """
        container_client = self.get_container_client(str_container)
        latest_blob = self._find_latest_blob(
            container_client, f"{str_client}/{str_view}/{str_table}.sql"
        )
        blob_contents = self._download_to_memory(container_client, latest_blob.name)
        print(f"Found sql file:\t{latest_blob.name}")
        return blob_contents

    def get_analytic(self, str_container, str_view, str_table):
        """Return the latest ``.json`` blob under ``<view>/<table>`` as a ``BytesIO``.

        For ``membership_tag`` the ``membership_tag_config_change_log`` blobs,
        which share the prefix, are skipped.

        Raises:
            IndexError: If no matching blob exists.
        """
        container_client = self.get_container_client(str_container)
        excluded = ()
        if str_table == 'membership_tag':
            excluded = (f"{str_view}/membership_tag_config_change_log",)
        latest_blob = self._find_latest_blob(
            container_client, f"{str_view}/{str_table}", '.json', excluded
        )
        blob_contents = self._download_to_memory(container_client, latest_blob.name)
        print(f"Found json file:\t{latest_blob.name}")
        return blob_contents

    # ------------------------------------------------------------------
    # Metadata
    # ------------------------------------------------------------------
    def get_last_modified(self, str_container, str_client, str_view, str_table, str_format):
        """Return ``last_modified`` of the blob whose name sorts last.

        Only blobs ending in ``.<str_format>`` are considered. The blob is
        chosen by name, not by modification time.

        Raises:
            IndexError: If no matching blob exists.
        """
        container_client = self.get_container_client(str_container)
        latest_blob = self._find_latest_blob(
            container_client, f"{str_client}/{str_view}/{str_table}", f".{str_format}"
        )
        last_modified = latest_blob.last_modified
        print(f"Last modified date:\t{last_modified}")
        return last_modified

    def get_file_list(self, str_container, str_client, str_view, str_table, str_format):
        """Return a list of the table's blobs whose name ends in ``.<str_format>``."""
        container_client = self.get_container_client(str_container)
        str_suffix = f".{str_format}"
        return [
            blob
            for blob in container_client.list_blobs(
                name_starts_with=f"{str_client}/{str_view}/{str_table}"
            )
            if blob.name.endswith(str_suffix)
        ]