File size: 11,796 Bytes
00ed4ec
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
import os
from io import BytesIO
import pandas as pd
from azure.storage.blob import (
    BlobServiceClient,
    __version__,
)
import json
from azure.storage.blob import BlobServiceClient

class AzureBlob:
    """Convenience wrapper around an Azure ``BlobServiceClient``.

    The ``get_latest_*`` helpers list blobs under the name prefix
    ``<str_client>/<str_view>/<str_table>`` and pick the blob whose *name*
    sorts last. "Latest" therefore refers to name ordering, not to the
    blob's modification time.
    """

    # Table prefixes that are also the leading part of other tables' names.
    # Maps the requested table prefix to the sibling prefixes that must be
    # skipped so that e.g. 'so_' does not pick up 'so_week...' blobs.
    _PARQUET_SIBLING_TABLES = {
        'so_': ('so_week', 'so_daily'),
        'stock_move_': ('stock_move_line',),
        'product_': ('product_detail',),
    }

    def __init__(self, str_connection):
        """Create the service client from a storage connection string."""
        self.str_connection = str_connection
        self.blob_service_client = BlobServiceClient.from_connection_string(str_connection)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _select_blobs(container_client, str_prefix, str_suffix='', excluded_prefixes=()):
        """Return the blobs under ``str_prefix`` whose names end with ``str_suffix``.

        Blobs whose names start with any entry of ``excluded_prefixes`` (a
        tuple) are skipped. The prefix match itself is delegated to
        ``list_blobs(name_starts_with=...)``.
        """
        return [
            blob
            for blob in container_client.list_blobs(name_starts_with=str_prefix)
            if blob.name.endswith(str_suffix)
            # startswith(()) is always False, so an empty tuple excludes nothing.
            and not blob.name.startswith(excluded_prefixes)
        ]

    @staticmethod
    def _latest_blob(list_candidates, str_description):
        """Return the candidate whose name sorts last.

        Raises:
            IndexError: if ``list_candidates`` is empty. The exception type
                matches what indexing an empty list raises, but the message
                names what was being looked for.
        """
        latest_blob = max(list_candidates, key=lambda b: b.name, default=None)
        if latest_blob is None:
            raise IndexError(f"No blob found for {str_description}")
        return latest_blob

    @staticmethod
    def _read_blob_to_memory(container_client, str_blob_name):
        """Download a blob into a ``BytesIO`` positioned at offset 0."""
        blob_contents = BytesIO()
        blob_client = container_client.get_blob_client(str_blob_name)
        blob_client.download_blob().readinto(blob_contents)
        blob_contents.seek(0)
        return blob_contents

    @staticmethod
    def _date_from_blob_name(str_blob_name):
        """Return the text between the last '_' and the first '.' after it."""
        return str_blob_name.rsplit('_', 1)[-1].split('.', 1)[0]

    def _parquet_candidates(self, container_client, str_client, str_view, str_table):
        """List the '.parquet' blobs of one table, skipping sibling tables."""
        str_folder = f"{str_client}/{str_view}/"
        excluded_prefixes = tuple(
            str_folder + str_sibling
            for str_sibling in self._PARQUET_SIBLING_TABLES.get(str_table, ())
        )
        return self._select_blobs(
            container_client, str_folder + str_table, '.parquet', excluded_prefixes
        )

    def _latest_by_suffix(self, str_container, container_client, str_prefix, str_suffix):
        """Return the last-sorting blob under ``str_prefix`` ending in ``str_suffix``."""
        list_candidates = self._select_blobs(container_client, str_prefix, str_suffix)
        return self._latest_blob(
            list_candidates,
            f"'{str_prefix}*{str_suffix}' in container '{str_container}'",
        )

    # ------------------------------------------------------------------
    # Client accessors
    # ------------------------------------------------------------------
    def get_container_client(self, str_container):
        """Return the container client for ``str_container``."""
        return self.blob_service_client.get_container_client(str_container)

    def get_blob_client(self, str_container, str_blob):
        """Return the blob client for ``str_blob`` inside ``str_container``."""
        return self.blob_service_client.get_blob_client(container=str_container, blob=str_blob)

    # ------------------------------------------------------------------
    # Container / blob management
    # ------------------------------------------------------------------
    def create_container(self, str_container):
        """Create ``str_container`` unless it already exists."""
        container_client = self.get_container_client(str_container)
        if not container_client.exists():
            container_client.create_container()
            print('Created container:', str_container)

    def get_blob_list(self, str_container):
        """Return the result of listing every blob in ``str_container``."""
        return self.get_container_client(str_container).list_blobs()

    def delete_blob(self, str_container, str_blob):
        """Delete ``str_blob`` from ``str_container``."""
        self.get_blob_client(str_container, str_blob).delete_blob()

    def copy_blob(self, str_container, str_new_container, str_blob):
        """Start copying ``str_blob`` from ``str_container`` to ``str_new_container``.

        The blob keeps its name in the destination container. The outcome
        reported by the service is printed; nothing is returned.
        """
        original_blob_client = self.get_blob_client(str_container, str_blob)
        new_blob_client = self.get_blob_client(str_new_container, str_blob)

        copy_operation = new_blob_client.start_copy_from_url(original_blob_client.url)
        str_status = copy_operation['copy_status']
        if str_status == "success":
            print(f"Copied {str_blob} to {str_new_container}.")
        elif str_status == "pending":
            # An asynchronous copy has been accepted but not finished yet;
            # reporting it as a failure would be misleading.
            print(f"Copy of {str_blob} to {str_new_container} is pending.")
        else:
            print(f"Failed to copy {str_blob}.")

    # ------------------------------------------------------------------
    # Uploads
    # ------------------------------------------------------------------
    def upload_file(self, str_container, str_blob, str_filename, str_filepath):
        """Upload the local file ``str_filepath`` as ``str_blob`` if absent.

        ``str_filename`` is only used in the printed messages. An existing
        blob is left untouched.
        """
        blob_client = self.get_blob_client(str_container, str_blob)

        if blob_client.exists():
            print(f'\nBlob already exists:\n\t{str_filename}')
            return

        print("\nUploading to Azure Storage as blob:\n\t" + str_filename)
        with open(file=str_filepath, mode="rb") as data:
            blob_client.upload_blob(data)

    def upload_from_memory(self, str_container, str_blob, data):
        """Upload in-memory ``data`` as ``str_blob`` if the blob is absent."""
        blob_client = self.get_blob_client(str_container, str_blob)

        if blob_client.exists():
            print(f'\nBlob already exists:\n\t{str_blob}')
            return

        print("\nUploading to Azure Storage as blob:\n\t" + str_blob)
        blob_client.upload_blob(data)

    def overwrite_blob(self, str_container, str_blob, data):
        """Upload in-memory ``data`` as ``str_blob``, replacing any existing blob."""
        blob_client = self.get_blob_client(str_container, str_blob)
        blob_client.upload_blob(data, overwrite=True)

    def overwrite_file(self, str_container, str_blob, str_filename, str_filepath):
        """Upload the local file ``str_filepath`` as ``str_blob``, replacing any existing blob.

        ``str_filename`` is accepted for signature parity with
        ``upload_file`` and is not used.
        """
        blob_client = self.get_blob_client(str_container, str_blob)
        with open(file=str_filepath, mode="rb") as data:
            blob_client.upload_blob(data, overwrite=True)

    # ------------------------------------------------------------------
    # Downloads
    # ------------------------------------------------------------------
    def download_blob(self, str_container, str_blob, str_filepath):
        """Download ``str_blob`` to the local path ``str_filepath``."""
        blob_client = self.get_blob_client(str_container, str_blob)
        with open(str_filepath, "wb") as my_blob:
            blob_client.download_blob().readinto(my_blob)
        print(f"Downloaded blob:\t{str_blob}")

    def get_latest_parquet(self, str_container, str_client, str_view, str_table):
        """Download the last-sorting '.parquet' blob of a table into memory.

        For the table prefixes listed in ``_PARQUET_SIBLING_TABLES`` the
        blobs of the sibling tables are ignored.

        Returns:
            A ``BytesIO`` holding the blob contents, positioned at offset 0.

        Raises:
            IndexError: if no matching blob exists.
        """
        print("Start running get_latest_parquet")
        container_client = self.get_container_client(str_container)
        latest_blob = self._latest_blob(
            self._parquet_candidates(container_client, str_client, str_view, str_table),
            f"'{str_client}/{str_view}/{str_table}*.parquet' in container '{str_container}'",
        )
        blob_contents = self._read_blob_to_memory(container_client, latest_blob.name)

        print(f"Found parquet file:\t{latest_blob.name}")
        return blob_contents

    def get_latest_parquet_date(self, str_container, str_client, str_view, str_table):
        """Return the date token of the last-sorting '.parquet' blob of a table.

        Uses the same candidate selection as ``get_latest_parquet`` so both
        methods always refer to the same blob. The token is the text between
        the last '_' of the blob name and the first '.' after it.

        Raises:
            IndexError: if no matching blob exists.
        """
        print("Start running get_latest_parquet_date")
        container_client = self.get_container_client(str_container)
        latest_blob = self._latest_blob(
            self._parquet_candidates(container_client, str_client, str_view, str_table),
            f"'{str_client}/{str_view}/{str_table}*.parquet' in container '{str_container}'",
        )
        latest_parquet_date = self._date_from_blob_name(latest_blob.name)
        print(f"The latest parquet date is\t{latest_parquet_date}")
        return latest_parquet_date

    def get_latest_json_date(self, str_container, str_client, str_view, str_table):
        """Return the date token of the last-sorting '.json' blob of a table.

        The token is the text between the last '_' of the blob name and the
        first '.' after it.

        Raises:
            IndexError: if no matching blob exists.
        """
        container_client = self.get_container_client(str_container)
        latest_blob = self._latest_by_suffix(
            str_container, container_client, f"{str_client}/{str_view}/{str_table}", '.json'
        )
        latest_json_date = self._date_from_blob_name(latest_blob.name)
        print(f"The latest json date is\t{latest_json_date}")
        return latest_json_date

    def get_latest_json(self, str_container, str_client, str_view, str_table):
        """Download the last-sorting '.json' blob of a table into memory.

        Returns:
            A ``BytesIO`` holding the blob contents, positioned at offset 0.

        Raises:
            IndexError: if no matching blob exists.
        """
        container_client = self.get_container_client(str_container)
        latest_blob = self._latest_by_suffix(
            str_container, container_client, f"{str_client}/{str_view}/{str_table}", '.json'
        )
        blob_contents = self._read_blob_to_memory(container_client, latest_blob.name)

        print(f"Found json file:\t{latest_blob.name}")
        return blob_contents

    def get_latest_csv(self, str_container, str_client, str_view, str_table):
        """Download the last-sorting '.csv' blob of a table into memory.

        Returns:
            A ``BytesIO`` holding the blob contents, positioned at offset 0.

        Raises:
            IndexError: if no matching blob exists.
        """
        container_client = self.get_container_client(str_container)
        latest_blob = self._latest_by_suffix(
            str_container, container_client, f"{str_client}/{str_view}/{str_table}", '.csv'
        )
        blob_contents = self._read_blob_to_memory(container_client, latest_blob.name)

        print(f"Found csv file:\t{latest_blob.name}")
        return blob_contents

    def get_latest_sql(self, str_container, str_client, str_view, str_table):
        """Download the last-sorting blob whose name starts with ``<table>.sql``.

        Unlike the other helpers, the extension is part of the listing
        prefix and no suffix filter is applied.

        Returns:
            A ``BytesIO`` holding the blob contents, positioned at offset 0.

        Raises:
            IndexError: if no matching blob exists.
        """
        container_client = self.get_container_client(str_container)
        latest_blob = self._latest_by_suffix(
            str_container, container_client, f"{str_client}/{str_view}/{str_table}.sql", ''
        )
        blob_contents = self._read_blob_to_memory(container_client, latest_blob.name)

        print(f"Found sql file:\t{latest_blob.name}")
        return blob_contents

    def get_analytic(self, str_container, str_view, str_table):
        """Download the last-sorting '.json' blob under ``<str_view>/<str_table>``.

        When ``str_table`` is 'membership_tag', blobs whose names start with
        ``<str_view>/membership_tag_config_change_log`` are ignored.

        Returns:
            A ``BytesIO`` holding the blob contents, positioned at offset 0.

        Raises:
            IndexError: if no matching blob exists.
        """
        container_client = self.get_container_client(str_container)
        str_prefix = f"{str_view}/{str_table}"

        if str_table == 'membership_tag':
            excluded_prefixes = (f"{str_view}/membership_tag_config_change_log",)
        else:
            excluded_prefixes = ()

        latest_blob = self._latest_blob(
            self._select_blobs(container_client, str_prefix, '.json', excluded_prefixes),
            f"'{str_prefix}*.json' in container '{str_container}'",
        )
        blob_contents = self._read_blob_to_memory(container_client, latest_blob.name)

        # The selected blob is a JSON document, so report it as such.
        print(f"Found json file:\t{latest_blob.name}")
        return blob_contents

    # ------------------------------------------------------------------
    # Metadata
    # ------------------------------------------------------------------
    def get_last_modified(self, str_container, str_client, str_view, str_table, str_format):
        """Return ``last_modified`` of the last-sorting blob with the given extension.

        ``str_format`` is the extension without the leading dot. The blob is
        chosen by name ordering; its ``last_modified`` attribute is returned
        as provided by the listing.

        Raises:
            IndexError: if no matching blob exists.
        """
        container_client = self.get_container_client(str_container)
        latest_blob = self._latest_by_suffix(
            str_container,
            container_client,
            f"{str_client}/{str_view}/{str_table}",
            f".{str_format}",
        )
        last_modified = latest_blob.last_modified
        print(f"Last modified date:\t{last_modified}")
        return last_modified

    def get_file_list(self, str_container, str_client, str_view, str_table, str_format):
        """Return the blobs under ``<client>/<view>/<table>`` ending in ``.<str_format>``.

        The result is a list in listing order and may be empty. No
        sibling-table exclusion is applied here.
        """
        container_client = self.get_container_client(str_container)
        return self._select_blobs(
            container_client, f"{str_client}/{str_view}/{str_table}", f".{str_format}"
        )