iBrokeTheCode commited on
Commit
f0142a5
·
1 Parent(s): b10cac9

chore: Implement transform functions

Browse files
Files changed (1) hide show
  1. src/transform.py +267 -0
src/transform.py CHANGED
@@ -0,0 +1,267 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from collections import namedtuple
2
+ from enum import Enum
3
+ from typing import Callable
4
+
5
+ from pandas import DataFrame, merge, read_sql, to_datetime
6
+ from sqlalchemy import Engine, TextClause, text
7
+
8
+ from src.config import QUERIES_ROOT_PATH
9
+
10
+ QueryResult = namedtuple("QueryResult", ["query", "result"])
11
+
12
+
13
+ class QueryEnum(Enum):
14
+ """Enumerates all the queries"""
15
+
16
+ DELIVERY_DATE_DIFFERENCE = "delivery_date_difference"
17
+ GLOBAL_AMOUNT_ORDER_STATUS = "global_amount_order_status"
18
+ REVENUE_BY_MONTH_YEAR = "revenue_by_month_year"
19
+ REVENUE_PER_STATE = "revenue_per_state"
20
+ TOP_10_LEAST_REVENUE_CATEGORIES = "top_10_least_revenue_categories"
21
+ TOP_10_REVENUE_CATEGORIES = "top_10_revenue_categories"
22
+ REAL_VS_ESTIMATED_DELIVERED_TIME = "real_vs_estimated_delivered_time"
23
+ ORDERS_PER_DAY_AND_HOLIDAYS_2017 = "orders_per_day_and_holidays_2017"
24
+ GET_FREIGHT_VALUE_WEIGHT_RELATIONSHIP = "get_freight_value_weight_relationship"
25
+
26
+
27
+ def read_query(query_name: str) -> TextClause:
28
+ """
29
+ Reads the query from the file and returns it as a string
30
+
31
+ Args:
32
+ query_name (str): The name of the query
33
+
34
+ Returns:
35
+ TextClause: The query
36
+ """
37
+ with open("{}/{}.sql".format(QUERIES_ROOT_PATH, query_name), "r") as file:
38
+ sql_file = file.read()
39
+ sql = text(sql_file)
40
+ return sql
41
+
42
+
43
+ def query_delivery_date_difference(database: Engine) -> QueryResult:
44
+ """
45
+ Get the query for the delivery date difference
46
+
47
+ Args:
48
+ database (Engine): The database to get the data from
49
+
50
+ Returns:
51
+ QueryResult: The query and the result
52
+ """
53
+ query_name = QueryEnum.DELIVERY_DATE_DIFFERENCE.value
54
+ query = read_query(QueryEnum.DELIVERY_DATE_DIFFERENCE.value)
55
+
56
+ return QueryResult(query=query_name, result=read_sql(query, database))
57
+
58
+
59
+ def query_global_amount_order_status(database: Engine) -> QueryResult:
60
+ """
61
+ Get the query for the global amount of order status
62
+
63
+ Args:
64
+ database (Engine): The database to get the data from
65
+
66
+ Returns:
67
+ QueryResult: The query and the result
68
+ """
69
+ query_name = QueryEnum.GLOBAL_AMOUNT_ORDER_STATUS.value
70
+ query = read_query(QueryEnum.GLOBAL_AMOUNT_ORDER_STATUS.value)
71
+
72
+ return QueryResult(query=query_name, result=read_sql(query, database))
73
+
74
+
75
+ def query_revenue_by_month_year(database: Engine) -> QueryResult:
76
+ """
77
+ Get the query for the revenue by month and year
78
+
79
+ Args:
80
+ database (Engine): The database to get the data from
81
+
82
+ Returns:
83
+ QueryResult: The query and the result
84
+ """
85
+ query_name = QueryEnum.REVENUE_BY_MONTH_YEAR.value
86
+ query = read_query(QueryEnum.REVENUE_BY_MONTH_YEAR.value)
87
+
88
+ return QueryResult(query=query_name, result=read_sql(query, database))
89
+
90
+
91
+ def query_revenue_per_state(database: Engine) -> QueryResult:
92
+ """
93
+ Get the query for the revenue per state
94
+
95
+ Args:
96
+ database (Engine): The database to get the data from
97
+
98
+ Returns:
99
+ QueryResult: The query and the result
100
+ """
101
+ query_name = QueryEnum.REVENUE_PER_STATE.value
102
+ query = read_query(QueryEnum.REVENUE_PER_STATE.value)
103
+
104
+ return QueryResult(query=query_name, result=read_sql(query, database))
105
+
106
+
107
+ def query_top_10_least_revenue_categories(database: Engine) -> QueryResult:
108
+ """
109
+ Get the query for the top 10 least revenue categories
110
+
111
+ Args:
112
+ database (Engine): The database to get the data from
113
+
114
+ Returns:
115
+ QueryResult: The query and the result
116
+ """
117
+ query_name = QueryEnum.TOP_10_LEAST_REVENUE_CATEGORIES.value
118
+ query = read_query(QueryEnum.TOP_10_LEAST_REVENUE_CATEGORIES.value)
119
+
120
+ return QueryResult(query=query_name, result=read_sql(query, database))
121
+
122
+
123
+ def query_top_10_revenue_categories(database: Engine) -> QueryResult:
124
+ """
125
+ Get the query for the top 10 revenue categories
126
+
127
+ Args:
128
+ database (Engine): The database to get the data from
129
+
130
+ Returns:
131
+ QueryResult: The query and the result
132
+ """
133
+ query_name = QueryEnum.TOP_10_REVENUE_CATEGORIES.value
134
+ query = read_query(QueryEnum.TOP_10_REVENUE_CATEGORIES.value)
135
+
136
+ return QueryResult(query=query_name, result=read_sql(query, database))
137
+
138
+
139
+ def query_real_vs_estimated_delivered_time(database: Engine) -> QueryResult:
140
+ """
141
+ Get the query for the real vs estimated delivered time
142
+
143
+ Args:
144
+ database (Engine): The database to get the data from
145
+
146
+ Returns:
147
+ QueryResult: The query and the result
148
+ """
149
+ query_name = QueryEnum.REAL_VS_ESTIMATED_DELIVERED_TIME.value
150
+ query = read_query(QueryEnum.REAL_VS_ESTIMATED_DELIVERED_TIME.value)
151
+
152
+ return QueryResult(query=query_name, result=read_sql(query, database))
153
+
154
+
155
+ def query_freight_value_weight_relationship(database: Engine) -> QueryResult:
156
+ """
157
+ Get the query for the freight value weight relationship
158
+
159
+ Args:
160
+ database (Engine): The database to get the data from
161
+
162
+ Returns:
163
+ QueryResult: The query and the result
164
+ """
165
+
166
+ query_name = QueryEnum.GET_FREIGHT_VALUE_WEIGHT_RELATIONSHIP.value
167
+
168
+ # Get data from database
169
+ orders = read_sql("SELECT * FROM olist_orders", database)
170
+ items = read_sql("SELECT * FROM olist_order_items", database)
171
+ products = read_sql("SELECT * FROM olist_products", database)
172
+
173
+ # Merge data
174
+ items_products = merge(items, products, on="product_id")
175
+ data = merge(items_products, orders, on="order_id")
176
+
177
+ # Filter delivered orders
178
+ delivered = data[data["order_status"] == "delivered"]
179
+
180
+ # Get the sum of freight_value and product_weight_g for each order_id
181
+ aggregations = delivered.groupby("order_id", as_index=False)[
182
+ ["freight_value", "product_weight_g"]
183
+ ].sum()
184
+
185
+ return QueryResult(query=query_name, result=aggregations)
186
+
187
+
188
+ def query_orders_per_day_and_holidays_2017(database: Engine) -> QueryResult:
189
+ query_name = QueryEnum.ORDERS_PER_DAY_AND_HOLIDAYS_2017.value
190
+
191
+ # Get data from database
192
+ holidays = read_sql("SELECT * FROM public_holidays", database)
193
+ orders = read_sql("SELECT * FROM olist_orders", database)
194
+
195
+ # Convert the date column to datetime
196
+ orders["order_purchase_timestamp"] = to_datetime(orders["order_purchase_timestamp"])
197
+
198
+ # Filter orders for 2017
199
+ filtered_dates = orders[orders["order_purchase_timestamp"].dt.year == 2017]
200
+
201
+ # Count orders per day
202
+ order_purchase_amount_per_date = (
203
+ filtered_dates.groupby(filtered_dates["order_purchase_timestamp"].dt.date)
204
+ .size()
205
+ .reset_index(name="order_count")
206
+ )
207
+
208
+ # Convert date column to datetime for comparison
209
+ holidays["date"] = to_datetime(holidays["date"]).dt.date
210
+
211
+ # Add milliseconds timestamp
212
+ order_purchase_amount_per_date["date"] = to_datetime(
213
+ order_purchase_amount_per_date["order_purchase_timestamp"]
214
+ )
215
+ order_purchase_amount_per_date["date"] = (
216
+ order_purchase_amount_per_date["date"].astype("int64") // 10**6
217
+ )
218
+
219
+ # Check if each date is a holiday
220
+ order_purchase_amount_per_date["holiday"] = order_purchase_amount_per_date[
221
+ "order_purchase_timestamp"
222
+ ].isin(holidays["date"])
223
+
224
+ # Create a dataframe with the result
225
+ result_df = order_purchase_amount_per_date[["order_count", "date", "holiday"]]
226
+
227
+ return QueryResult(query=query_name, result=result_df)
228
+
229
+
230
+ def get_all_queries() -> list[Callable[[Engine], QueryResult]]:
231
+ """
232
+ Get all the queries
233
+
234
+ Returns:
235
+ list[Callable[[Engine], QueryResult]]: The queries
236
+ """
237
+ return [
238
+ query_delivery_date_difference,
239
+ query_global_amount_order_status,
240
+ query_revenue_by_month_year,
241
+ query_revenue_per_state,
242
+ query_top_10_least_revenue_categories,
243
+ query_top_10_revenue_categories,
244
+ query_real_vs_estimated_delivered_time,
245
+ query_orders_per_day_and_holidays_2017,
246
+ query_freight_value_weight_relationship,
247
+ ]
248
+
249
+
250
+ def run_queries(database: Engine) -> dict[str, DataFrame]:
251
+ """
252
+ Transform data based on queries. For each query, the query is executed and the results is stored in the dataframe
253
+
254
+ Args:
255
+ database (Engine): The database to get the data from
256
+
257
+ Returns:
258
+ dict[str, DataFrame]: A dictionary with keys as the query filenames and values the result of the query as a dataframe
259
+ """
260
+
261
+ query_results = {}
262
+
263
+ for query in get_all_queries():
264
+ query_result = query(database)
265
+ query_results[query_result.query] = query_result.result
266
+
267
+ return query_results