# utils/azure_blob.py
# Azure Blob Storage helper utilities for the simple_visual project.
import os
from io import BytesIO
import pandas as pd
from azure.storage.blob import (
BlobServiceClient,
__version__,
)
import json
from azure.storage.blob import BlobServiceClient
class AzureBlob:
    """Convenience wrapper around ``BlobServiceClient`` for container/blob
    management and for fetching the latest data file of a given format.

    Blob names appear to follow ``{client}/{view}/{table}_{date}.{ext}``;
    "latest" is determined by sorting blob names lexicographically, which
    matches date order only when the date token is zero-padded (e.g.
    ``YYYYMMDD``) — TODO confirm against the producers' naming convention.
    """

    def __init__(self, str_connection):
        """Build a service client from an Azure Storage connection string.

        :param str_connection: full Azure Storage connection string.
        """
        self.str_connection = str_connection
        self.blob_service_client = BlobServiceClient.from_connection_string(str_connection)

    def get_container_client(self, str_container):
        """Return a ContainerClient for *str_container* (container need not exist)."""
        return self.blob_service_client.get_container_client(str_container)

    def get_blob_client(self, str_container, str_blob):
        """Return a BlobClient for *str_blob* inside *str_container*."""
        return self.blob_service_client.get_blob_client(container=str_container, blob=str_blob)

    def create_container(self, str_container):
        """Create the container if it does not already exist."""
        container_client = self.get_container_client(str_container)
        if not container_client.exists():
            container_client.create_container()
            print('Created container:', str_container)

    def get_blob_list(self, str_container):
        """Return an iterator over all blobs in the container."""
        return self.get_container_client(str_container).list_blobs()

    def delete_blob(self, str_container, str_blob):
        """Delete a single blob."""
        self.get_blob_client(str_container, str_blob).delete_blob()

    def copy_blob(self, str_container, str_new_container, str_blob):
        """Server-side copy of *str_blob* from one container to another.

        NOTE(review): ``start_copy_from_url`` can return ``copy_status ==
        "pending"`` for asynchronous copies, in which case the failure
        message below is printed even though the copy may still succeed —
        confirm whether polling is needed here.
        """
        original_blob_client = self.get_blob_client(str_container, str_blob)
        new_blob_client = self.get_blob_client(str_new_container, str_blob)
        copy_operation = new_blob_client.start_copy_from_url(original_blob_client.url)
        if copy_operation['copy_status'] == "success":
            print(f"Copied {str_blob} to {str_new_container}.")
        else:
            print(f"Failed to copy {str_blob}.")

    def upload_file(self, str_container, str_blob, str_filename, str_filepath):
        """Upload a local file unless the target blob already exists.

        :param str_filename: display name used only in log messages.
        :param str_filepath: path of the local file to upload.
        """
        blob_client = self.get_blob_client(str_container, str_blob)
        if blob_client.exists():
            print(f'\nBlob already exists:\n\t{str_filename}')
        else:
            print("\nUploading to Azure Storage as blob:\n\t" + str_filename)
            with open(file=str_filepath, mode="rb") as data:
                blob_client.upload_blob(data)

    def upload_from_memory(self, str_container, str_blob, data):
        """Upload in-memory bytes/stream unless the target blob already exists."""
        blob_client = self.get_blob_client(str_container, str_blob)
        if blob_client.exists():
            print(f'\nBlob already exists:\n\t{str_blob}')
        else:
            print("\nUploading to Azure Storage as blob:\n\t" + str_blob)
            blob_client.upload_blob(data)

    def overwrite_blob(self, str_container, str_blob, data):
        """Upload in-memory data, replacing any existing blob."""
        self.get_blob_client(str_container, str_blob).upload_blob(data, overwrite=True)

    def overwrite_file(self, str_container, str_blob, str_filename, str_filepath):
        """Upload a local file, replacing any existing blob.

        ``str_filename`` is unused; kept for signature compatibility with
        ``upload_file``.
        """
        blob_client = self.get_blob_client(str_container, str_blob)
        with open(file=str_filepath, mode="rb") as data:
            blob_client.upload_blob(data, overwrite=True)

    # --- internal helpers -------------------------------------------------

    @staticmethod
    def _latest_blob(blobs):
        """Return the blob whose name sorts last (lexicographically greatest).

        Raises IndexError when *blobs* is empty, preserving the historical
        ``sorted(...)[0]`` behavior callers may rely on.
        """
        sorted_blobs = sorted(blobs, key=lambda b: b.name, reverse=True)
        return sorted_blobs[0]

    @staticmethod
    def _read_into_memory(container_client, blob_name):
        """Download *blob_name* fully into a rewound BytesIO buffer."""
        blob_contents = BytesIO()
        container_client.get_blob_client(blob_name).download_blob().readinto(blob_contents)
        blob_contents.seek(0)
        return blob_contents

    def _filtered_parquet_blobs(self, container_client, str_client, str_view, str_table):
        """List '.parquet' blobs under the table prefix, excluding sibling
        tables whose names share the prefix (e.g. 'so_' vs 'so_week')."""
        base = f"{str_client}/{str_view}"
        list_blob = container_client.list_blobs(name_starts_with=f"{base}/{str_table}")
        # Tables whose names prefix other tables need explicit exclusions.
        exclusions = {
            'so_': (f"{base}/so_week", f"{base}/so_daily"),
            'stock_move_': (f"{base}/stock_move_line",),
            'product_': (f"{base}/product_detail",),
        }.get(str_table, ())
        # str.startswith(()) is always False, so no-exclusion tables keep everything.
        return [
            blob for blob in list_blob
            if blob.name.endswith('.parquet') and not blob.name.startswith(exclusions)
        ]

    def _get_latest_by_ext(self, str_container, str_client, str_view, str_table, str_ext, str_label):
        """Download the latest blob with extension *str_ext* under the table
        prefix into memory; *str_label* is the format name used in the log."""
        container_client = self.get_container_client(str_container)
        list_blob = container_client.list_blobs(name_starts_with=f"{str_client}/{str_view}/{str_table}")
        matching = [blob for blob in list_blob if blob.name.endswith(str_ext)]
        latest_blob = self._latest_blob(matching)
        blob_contents = self._read_into_memory(container_client, latest_blob.name)
        print(f"Found {str_label} file:\t{latest_blob.name}")
        return blob_contents

    # --- "latest file" accessors ------------------------------------------

    def get_latest_parquet(self, str_container, str_client, str_view, str_table):
        """Download the latest parquet file for a table into a BytesIO.

        Raises IndexError when no parquet blob matches the prefix.
        """
        print("Start running get_latest_parquet")
        container_client = self.get_container_client(str_container)
        list_parquet_blob = self._filtered_parquet_blobs(container_client, str_client, str_view, str_table)
        latest_blob = self._latest_blob(list_parquet_blob)
        blob_contents = self._read_into_memory(container_client, latest_blob.name)
        print(f"Found parquet file:\t{latest_blob.name}")
        return blob_contents

    def download_blob(self, str_container, str_blob, str_filepath):
        """Download a blob to a local file path."""
        blob_client = self.get_blob_client(str_container, str_blob)
        with open(str_filepath, "wb") as my_blob:
            blob_client.download_blob().readinto(my_blob)
        print(f"Downloaded blob:\t{str_blob}")

    def get_latest_parquet_date(self, str_container, str_client, str_view, str_table):
        """Return the date token of the latest parquet blob for a table.

        The date is the last '_'-separated token with the extension stripped
        (``.../so_20240101.parquet`` -> ``20240101``). Exclusion rules now
        match ``get_latest_parquet``: previously only 'so_' was
        special-cased here, so 'stock_move_' could pick a
        'stock_move_line' file's date.
        """
        print("Start running get_latest_parquet")
        container_client = self.get_container_client(str_container)
        list_parquet_blob = self._filtered_parquet_blobs(container_client, str_client, str_view, str_table)
        latest_blob = self._latest_blob(list_parquet_blob)
        latest_parquet_date = latest_blob.name.split('_')[-1].split('.')[0]
        print(f"The latest parquet date is\t{latest_parquet_date}")
        return latest_parquet_date

    def get_latest_json_date(self, str_container, str_client, str_view, str_table):
        """Return the date token of the latest '.json' blob for a table.

        (A duplicated sort-and-pick copy-paste was removed; behavior is
        unchanged.)
        """
        container_client = self.get_container_client(str_container)
        list_blob = container_client.list_blobs(name_starts_with=f"{str_client}/{str_view}/{str_table}")
        list_json_blob = [blob for blob in list_blob if blob.name.endswith('.json')]
        latest_blob = self._latest_blob(list_json_blob)
        latest_json_date = latest_blob.name.split('_')[-1].split('.')[0]
        print(f"The latest json date is\t{latest_json_date}")
        return latest_json_date

    def get_latest_json(self, str_container, str_client, str_view, str_table):
        """Download the latest '.json' blob for a table into a BytesIO."""
        return self._get_latest_by_ext(str_container, str_client, str_view, str_table, '.json', 'json')

    def get_latest_csv(self, str_container, str_client, str_view, str_table):
        """Download the latest '.csv' blob for a table into a BytesIO."""
        return self._get_latest_by_ext(str_container, str_client, str_view, str_table, '.csv', 'csv')

    def get_latest_sql(self, str_container, str_client, str_view, str_table):
        """Download the latest blob matching ``{table}.sql`` into a BytesIO.

        Unlike the other accessors the '.sql' suffix is part of the listing
        prefix, so no extension filtering is applied afterwards.
        """
        container_client = self.get_container_client(str_container)
        list_blob = container_client.list_blobs(name_starts_with=f"{str_client}/{str_view}/{str_table}.sql")
        latest_blob = self._latest_blob(list(list_blob))
        blob_contents = self._read_into_memory(container_client, latest_blob.name)
        print(f"Found sql file:\t{latest_blob.name}")
        return blob_contents

    def get_analytic(self, str_container, str_view, str_table):
        """Download the latest analytic '.json' blob under ``{view}/{table}``.

        'membership_tag' excludes the sibling
        'membership_tag_config_change_log' table that shares its prefix.
        """
        container_client = self.get_container_client(str_container)
        list_blob = container_client.list_blobs(name_starts_with=f"{str_view}/{str_table}")
        if str_table == 'membership_tag':
            list_json_blob = [
                blob for blob in list_blob
                if blob.name.endswith('.json')
                and not blob.name.startswith(f"{str_view}/membership_tag_config_change_log")
            ]
        else:
            list_json_blob = [blob for blob in list_blob if blob.name.endswith('.json')]
        latest_blob = self._latest_blob(list_json_blob)
        blob_contents = self._read_into_memory(container_client, latest_blob.name)
        print(f"Found parquet file:\t{latest_blob.name}")
        return blob_contents

    def get_last_modified(self, str_container, str_client, str_view, str_table, str_format):
        """Return the ``last_modified`` timestamp of the latest blob with
        extension *str_format* under the table prefix."""
        container_client = self.get_container_client(str_container)
        list_blob = container_client.list_blobs(name_starts_with=f"{str_client}/{str_view}/{str_table}")
        list_required_blob = [blob for blob in list_blob if blob.name.endswith(f".{str_format}")]
        latest_blob = self._latest_blob(list_required_blob)
        last_modified = latest_blob.last_modified
        print(f"Last modified date:\t{last_modified}")
        return last_modified

    def get_file_list(self, str_container, str_client, str_view, str_table, str_format):
        """Return all blobs with extension *str_format* under the table prefix."""
        container_client = self.get_container_client(str_container)
        list_blob = container_client.list_blobs(name_starts_with=f"{str_client}/{str_view}/{str_table}")
        return [blob for blob in list_blob if blob.name.endswith(f".{str_format}")]