petrified commited on
Commit
00ed4ec
·
verified ·
1 Parent(s): a86480f

Upload azure_blob.py

Browse files
Files changed (1) hide show
  1. utils/azure_blob.py +233 -0
utils/azure_blob.py ADDED
@@ -0,0 +1,233 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from dotenv import load_dotenv
2
+ import os
3
+ from io import BytesIO
4
+ import pandas as pd
5
+ from azure.storage.blob import (
6
+ BlobServiceClient,
7
+ __version__,
8
+ )
9
+ import json
10
+ from azure.storage.blob import BlobServiceClient
11
+
12
+ class AzureBlob:
13
+ def __init__(self, str_connection):
14
+ self.str_connection = str_connection
15
+ self.blob_service_client = BlobServiceClient.from_connection_string(str_connection)
16
+
17
+ def get_container_client(self, str_container):
18
+ container_client = self.blob_service_client.get_container_client(str_container)
19
+ return container_client
20
+
21
+ def get_blob_client(self, str_container, str_blob):
22
+ blob_client = self.blob_service_client.get_blob_client(container=str_container, blob=str_blob)
23
+ return blob_client
24
+
25
+ def create_container(self, str_container):
26
+ container_client = self.get_container_client(str_container)
27
+ if not container_client.exists():
28
+ container_client.create_container()
29
+ print('Created container:', str_container)
30
+
31
+ def get_blob_list(self, str_container):
32
+ container_client = self.get_container_client(str_container)
33
+ return container_client.list_blobs()
34
+
35
+ def delete_blob(self, str_container, str_blob):
36
+ blob_client = self.get_blob_client(str_container, str_blob)
37
+ blob_client.delete_blob()
38
+
39
+ def copy_blob(self, str_container, str_new_container, str_blob):
40
+ original_blob_client = self.get_blob_client(str_container, str_blob)
41
+ new_blob_client = self.get_blob_client(str_new_container, str_blob)
42
+
43
+ copy_operation = new_blob_client.start_copy_from_url(original_blob_client.url)
44
+ if copy_operation['copy_status'] == "success":
45
+ print(f"Copied {str_blob} to {str_new_container}.")
46
+ else:
47
+ print(f"Failed to copy {str_blob}.")
48
+
49
+ def upload_file(self, str_container, str_blob, str_filename, str_filepath):
50
+ blob_client = self.get_blob_client(str_container, str_blob)
51
+
52
+ if blob_client.exists():
53
+ print(f'\nBlob already exists:\n\t{str_filename}')
54
+ pass
55
+ else:
56
+ print("\nUploading to Azure Storage as blob:\n\t" + str_filename)
57
+ with open(file=str_filepath, mode="rb") as data:
58
+ blob_client.upload_blob(data)
59
+
60
+ def upload_from_memory(self, str_container, str_blob, data):
61
+ blob_client = self.get_blob_client(str_container, str_blob)
62
+
63
+ if blob_client.exists():
64
+ print(f'\nBlob already exists:\n\t{str_blob}')
65
+ pass
66
+ else:
67
+ print("\nUploading to Azure Storage as blob:\n\t" + str_blob)
68
+ blob_client.upload_blob(data)
69
+
70
+ def overwrite_blob(self, str_container, str_blob, data):
71
+ blob_client = self.get_blob_client(str_container, str_blob)
72
+ blob_client.upload_blob(data, overwrite=True)
73
+
74
+ def overwrite_file(self, str_container, str_blob, str_filename, str_filepath):
75
+ blob_client = self.get_blob_client(str_container, str_blob)
76
+ with open(file=str_filepath, mode="rb") as data:
77
+ blob_client.upload_blob(data, overwrite=True)
78
+
79
+ def get_latest_parquet(self, str_container, str_client, str_view, str_table):
80
+ print("Start running get_latest_parquet")
81
+ container_client = self.get_container_client(str_container)
82
+
83
+ # List the blobs in the container
84
+ list_blob = container_client.list_blobs(name_starts_with=f"{str_client}/{str_view}/{str_table}")
85
+ if str_table == 'so_':
86
+ list_parquet_blob = [blob for blob in list_blob if blob.name.endswith('.parquet') and blob.name.startswith(f"{str_client}/{str_view}/so_") and not blob.name.startswith(f"{str_client}/{str_view}/so_week") and not blob.name.startswith(f"{str_client}/{str_view}/so_daily")]
87
+ elif str_table == 'stock_move_':
88
+ list_parquet_blob = [blob for blob in list_blob if blob.name.endswith('.parquet') and blob.name.startswith(f"{str_client}/{str_view}/stock_move_") and not blob.name.startswith(f"{str_client}/{str_view}/stock_move_line")]
89
+ elif str_table == 'product_':
90
+ list_parquet_blob = [blob for blob in list_blob if blob.name.endswith('.parquet') and blob.name.startswith(f"{str_client}/{str_view}/product_") and not blob.name.startswith(f"{str_client}/{str_view}/product_detail")]
91
+ else:
92
+ list_parquet_blob = [blob for blob in list_blob if blob.name.endswith('.parquet')]
93
+
94
+ # Sort and get the latest blob
95
+ sorted_blobs = sorted(list_parquet_blob, key=lambda b: b.name, reverse=True)
96
+ latest_blob = sorted_blobs[0]
97
+
98
+ # Download the blob contents into memory
99
+ blob_contents = BytesIO()
100
+ blob_client = container_client.get_blob_client(latest_blob.name)
101
+ blob_client.download_blob().readinto(blob_contents)
102
+ blob_contents.seek(0)
103
+
104
+ print(f"Found parquet file:\t{latest_blob.name}")
105
+ return blob_contents
106
+
107
+ def download_blob(self, str_container, str_blob, str_filepath):
108
+ blob_client = self.get_blob_client(str_container, str_blob)
109
+ with open(str_filepath, "wb") as my_blob:
110
+ blob_client.download_blob().readinto(my_blob)
111
+ print(f"Downloaded blob:\t{str_blob}")
112
+
113
+ def get_latest_parquet_date(self, str_container, str_client, str_view, str_table):
114
+ print("Start running get_latest_parquet")
115
+ container_client = self.get_container_client(str_container)
116
+
117
+ # List the blobs in the container
118
+ list_blob = container_client.list_blobs(name_starts_with=f"{str_client}/{str_view}/{str_table}")
119
+ if str_table == 'so_':
120
+ list_parquet_blob = [blob for blob in list_blob if blob.name.endswith('.parquet') and blob.name.startswith(f"{str_client}/{str_view}/so_") and not blob.name.startswith(f"{str_client}/{str_view}/so_week") and not blob.name.startswith(f"{str_client}/{str_view}/so_daily")]
121
+ else:
122
+ list_parquet_blob = [blob for blob in list_blob if blob.name.endswith('.parquet')]
123
+
124
+ # Sort and get the latest blob
125
+ sorted_blobs = sorted(list_parquet_blob, key=lambda b: b.name, reverse=True)
126
+ latest_blob = sorted_blobs[0]
127
+ latest_parquet_date = latest_blob.name.split('_')[-1].split('.')[0]
128
+ print(f"The latest parquet date is\t{latest_parquet_date}")
129
+ return latest_parquet_date
130
+
131
+ def get_latest_json_date(self, str_container, str_client, str_view, str_table):
132
+ container_client = self.get_container_client(str_container)
133
+ list_blob = container_client.list_blobs(name_starts_with=f"{str_client}/{str_view}/{str_table}")
134
+ list_parquet_blob = [blob for blob in list_blob if blob.name.endswith('.json')]
135
+ # Sort and get the latest blob
136
+ sorted_blobs = sorted(list_parquet_blob, key=lambda b: b.name, reverse=True)
137
+ latest_blob = sorted_blobs[0]
138
+
139
+ sorted_blobs = sorted(list_parquet_blob, key=lambda b: b.name, reverse=True)
140
+ latest_blob = sorted_blobs[0]
141
+ latest_json_date = latest_blob.name.split('_')[-1].split('.')[0]
142
+ print(f"The latest json date is\t{latest_json_date}")
143
+ return latest_json_date
144
+
145
+ def get_latest_json(self, str_container, str_client, str_view, str_table):
146
+ container_client = self.get_container_client(str_container)
147
+ list_blob = container_client.list_blobs(name_starts_with=f"{str_client}/{str_view}/{str_table}")
148
+ list_parquet_blob = [blob for blob in list_blob if blob.name.endswith('.json')]
149
+ # Sort and get the latest blob
150
+ sorted_blobs = sorted(list_parquet_blob, key=lambda b: b.name, reverse=True)
151
+ latest_blob = sorted_blobs[0]
152
+
153
+ # Download the blob contents into memory
154
+ blob_contents = BytesIO()
155
+ blob_client = container_client.get_blob_client(latest_blob.name)
156
+ blob_client.download_blob().readinto(blob_contents)
157
+ blob_contents.seek(0)
158
+
159
+ print(f"Found json file:\t{latest_blob.name}")
160
+ return blob_contents
161
+
162
+ def get_latest_csv(self, str_container, str_client, str_view, str_table):
163
+ container_client = self.get_container_client(str_container)
164
+ list_blob = container_client.list_blobs(name_starts_with=f"{str_client}/{str_view}/{str_table}")
165
+ list_parquet_blob = [blob for blob in list_blob if blob.name.endswith('.csv')]
166
+ # Sort and get the latest blob
167
+ sorted_blobs = sorted(list_parquet_blob, key=lambda b: b.name, reverse=True)
168
+ latest_blob = sorted_blobs[0]
169
+
170
+ # Download the blob contents into memory
171
+ blob_contents = BytesIO()
172
+ blob_client = container_client.get_blob_client(latest_blob.name)
173
+ blob_client.download_blob().readinto(blob_contents)
174
+ blob_contents.seek(0)
175
+
176
+ print(f"Found csv file:\t{latest_blob.name}")
177
+ return blob_contents
178
+
179
+ def get_latest_sql(self, str_container, str_client, str_view, str_table):
180
+ container_client = self.get_container_client(str_container)
181
+ list_blob = container_client.list_blobs(name_starts_with=f"{str_client}/{str_view}/{str_table}.sql")
182
+ list_parquet_blob = [blob for blob in list_blob]
183
+ # Sort and get the latest blob
184
+ sorted_blobs = sorted(list_parquet_blob, key=lambda b: b.name, reverse=True)
185
+ latest_blob = sorted_blobs[0]
186
+
187
+ # Download the blob contents into memory
188
+ blob_contents = BytesIO()
189
+ blob_client = container_client.get_blob_client(latest_blob.name)
190
+ blob_client.download_blob().readinto(blob_contents)
191
+ blob_contents.seek(0)
192
+
193
+ print(f"Found sql file:\t{latest_blob.name}")
194
+ return blob_contents
195
+
196
+ def get_analytic(self, str_container, str_view, str_table):
197
+ container_client = self.get_container_client(str_container)
198
+ list_blob = container_client.list_blobs(name_starts_with=f"{str_view}/{str_table}")
199
+
200
+ if str_table == 'membership_tag':
201
+ list_parquet_blob = [blob for blob in list_blob if blob.name.endswith('.json') and blob.name.startswith(f"{str_view}/membership_tag") and not blob.name.startswith(f"{str_view}/membership_tag_config_change_log")]
202
+ else:
203
+ list_parquet_blob = [blob for blob in list_blob if blob.name.endswith('.json')]
204
+
205
+ # Sort and get the latest blob
206
+ sorted_blobs = sorted(list_parquet_blob, key=lambda b: b.name, reverse=True)
207
+ latest_blob = sorted_blobs[0]
208
+
209
+ # Download the blob contents into memory
210
+ blob_contents = BytesIO()
211
+ blob_client = container_client.get_blob_client(latest_blob.name)
212
+ blob_client.download_blob().readinto(blob_contents)
213
+ blob_contents.seek(0)
214
+
215
+ print(f"Found parquet file:\t{latest_blob.name}")
216
+ return blob_contents
217
+
218
+ def get_last_modified(self, str_container, str_client, str_view, str_table, str_format):
219
+ container_client = self.get_container_client(str_container)
220
+ list_blob = container_client.list_blobs(name_starts_with=f"{str_client}/{str_view}/{str_table}")
221
+ list_required_blob = [blob for blob in list_blob if blob.name.endswith(f".{str_format}")]
222
+ # Sort and get the latest blob
223
+ sorted_blobs = sorted(list_required_blob, key=lambda b: b.name, reverse=True)
224
+ latest_blob = sorted_blobs[0]
225
+ last_modified = latest_blob.last_modified
226
+ print(f"Last modified date:\t{last_modified}")
227
+ return last_modified
228
+
229
+ def get_file_list(self, str_container, str_client, str_view, str_table, str_format):
230
+ container_client = self.get_container_client(str_container)
231
+ list_blob = container_client.list_blobs(name_starts_with=f"{str_client}/{str_view}/{str_table}")
232
+ list_required_blob = [blob for blob in list_blob if blob.name.endswith(f".{str_format}")]
233
+ return list_required_blob