Spaces:
Sleeping
Sleeping
| from collections import namedtuple | |
| from enum import Enum | |
| from typing import Callable | |
| from pandas import DataFrame, merge, read_sql, to_datetime | |
| from sqlalchemy import Engine, TextClause, text | |
| from src.config import QUERIES_ROOT_PATH | |
| QueryResult = namedtuple("QueryResult", ["query", "result"]) | |
| class QueryEnum(Enum): | |
| """Enumerates all the queries""" | |
| DELIVERY_DATE_DIFFERENCE = "delivery_date_difference" | |
| GLOBAL_AMOUNT_ORDER_STATUS = "global_amount_order_status" | |
| REVENUE_BY_MONTH_YEAR = "revenue_by_month_year" | |
| REVENUE_PER_STATE = "revenue_per_state" | |
| TOP_10_LEAST_REVENUE_CATEGORIES = "top_10_least_revenue_categories" | |
| TOP_10_REVENUE_CATEGORIES = "top_10_revenue_categories" | |
| REAL_VS_ESTIMATED_DELIVERED_TIME = "real_vs_estimated_delivered_time" | |
| ORDERS_PER_DAY_AND_HOLIDAYS_2017 = "orders_per_day_and_holidays_2017" | |
| GET_FREIGHT_VALUE_WEIGHT_RELATIONSHIP = "get_freight_value_weight_relationship" | |
| def read_query(query_name: str) -> TextClause: | |
| """ | |
| Reads the query from the file and returns it as a string | |
| Args: | |
| query_name (str): The name of the query | |
| Returns: | |
| TextClause: The query | |
| """ | |
| with open("{}/{}.sql".format(QUERIES_ROOT_PATH, query_name), "r") as file: | |
| sql_file = file.read() | |
| sql = text(sql_file) | |
| return sql | |
| def query_delivery_date_difference(database: Engine) -> QueryResult: | |
| """ | |
| Get the query for the delivery date difference | |
| Args: | |
| database (Engine): The database to get the data from | |
| Returns: | |
| QueryResult: The query and the result | |
| """ | |
| query_name = QueryEnum.DELIVERY_DATE_DIFFERENCE.value | |
| query = read_query(QueryEnum.DELIVERY_DATE_DIFFERENCE.value) | |
| return QueryResult(query=query_name, result=read_sql(query, database)) | |
| def query_global_amount_order_status(database: Engine) -> QueryResult: | |
| """ | |
| Get the query for the global amount of order status | |
| Args: | |
| database (Engine): The database to get the data from | |
| Returns: | |
| QueryResult: The query and the result | |
| """ | |
| query_name = QueryEnum.GLOBAL_AMOUNT_ORDER_STATUS.value | |
| query = read_query(QueryEnum.GLOBAL_AMOUNT_ORDER_STATUS.value) | |
| return QueryResult(query=query_name, result=read_sql(query, database)) | |
| def query_revenue_by_month_year(database: Engine) -> QueryResult: | |
| """ | |
| Get the query for the revenue by month and year | |
| Args: | |
| database (Engine): The database to get the data from | |
| Returns: | |
| QueryResult: The query and the result | |
| """ | |
| query_name = QueryEnum.REVENUE_BY_MONTH_YEAR.value | |
| query = read_query(QueryEnum.REVENUE_BY_MONTH_YEAR.value) | |
| return QueryResult(query=query_name, result=read_sql(query, database)) | |
| def query_revenue_per_state(database: Engine) -> QueryResult: | |
| """ | |
| Get the query for the revenue per state | |
| Args: | |
| database (Engine): The database to get the data from | |
| Returns: | |
| QueryResult: The query and the result | |
| """ | |
| query_name = QueryEnum.REVENUE_PER_STATE.value | |
| query = read_query(QueryEnum.REVENUE_PER_STATE.value) | |
| return QueryResult(query=query_name, result=read_sql(query, database)) | |
| def query_top_10_least_revenue_categories(database: Engine) -> QueryResult: | |
| """ | |
| Get the query for the top 10 least revenue categories | |
| Args: | |
| database (Engine): The database to get the data from | |
| Returns: | |
| QueryResult: The query and the result | |
| """ | |
| query_name = QueryEnum.TOP_10_LEAST_REVENUE_CATEGORIES.value | |
| query = read_query(QueryEnum.TOP_10_LEAST_REVENUE_CATEGORIES.value) | |
| return QueryResult(query=query_name, result=read_sql(query, database)) | |
| def query_top_10_revenue_categories(database: Engine) -> QueryResult: | |
| """ | |
| Get the query for the top 10 revenue categories | |
| Args: | |
| database (Engine): The database to get the data from | |
| Returns: | |
| QueryResult: The query and the result | |
| """ | |
| query_name = QueryEnum.TOP_10_REVENUE_CATEGORIES.value | |
| query = read_query(QueryEnum.TOP_10_REVENUE_CATEGORIES.value) | |
| return QueryResult(query=query_name, result=read_sql(query, database)) | |
| def query_real_vs_estimated_delivered_time(database: Engine) -> QueryResult: | |
| """ | |
| Get the query for the real vs estimated delivered time | |
| Args: | |
| database (Engine): The database to get the data from | |
| Returns: | |
| QueryResult: The query and the result | |
| """ | |
| query_name = QueryEnum.REAL_VS_ESTIMATED_DELIVERED_TIME.value | |
| query = read_query(QueryEnum.REAL_VS_ESTIMATED_DELIVERED_TIME.value) | |
| return QueryResult(query=query_name, result=read_sql(query, database)) | |
| def query_freight_value_weight_relationship(database: Engine) -> QueryResult: | |
| """ | |
| Get the query for the freight value weight relationship | |
| Args: | |
| database (Engine): The database to get the data from | |
| Returns: | |
| QueryResult: The query and the result | |
| """ | |
| query_name = QueryEnum.GET_FREIGHT_VALUE_WEIGHT_RELATIONSHIP.value | |
| # Get data from database | |
| orders = read_sql("SELECT * FROM olist_orders", database) | |
| items = read_sql("SELECT * FROM olist_order_items", database) | |
| products = read_sql("SELECT * FROM olist_products", database) | |
| # Merge data | |
| items_products = merge(items, products, on="product_id") | |
| data = merge(items_products, orders, on="order_id") | |
| # Filter delivered orders | |
| delivered = data[data["order_status"] == "delivered"] | |
| # Get the sum of freight_value and product_weight_g for each order_id | |
| aggregations = delivered.groupby("order_id", as_index=False)[ | |
| ["freight_value", "product_weight_g"] | |
| ].sum() | |
| return QueryResult(query=query_name, result=aggregations) | |
| def query_orders_per_day_and_holidays_2017(database: Engine) -> QueryResult: | |
| query_name = QueryEnum.ORDERS_PER_DAY_AND_HOLIDAYS_2017.value | |
| # Get data from database | |
| holidays = read_sql("SELECT * FROM public_holidays", database) | |
| orders = read_sql("SELECT * FROM olist_orders", database) | |
| # Convert the date column to datetime | |
| orders["order_purchase_timestamp"] = to_datetime(orders["order_purchase_timestamp"]) | |
| # Filter orders for 2017 | |
| filtered_dates = orders[orders["order_purchase_timestamp"].dt.year == 2017] | |
| # Count orders per day | |
| order_purchase_amount_per_date = ( | |
| filtered_dates.groupby(filtered_dates["order_purchase_timestamp"].dt.date) | |
| .size() | |
| .reset_index(name="order_count") | |
| ) | |
| # Convert date column to datetime for comparison | |
| holidays["date"] = to_datetime(holidays["date"]).dt.date | |
| # Add milliseconds timestamp | |
| order_purchase_amount_per_date["date"] = to_datetime( | |
| order_purchase_amount_per_date["order_purchase_timestamp"] | |
| ) | |
| order_purchase_amount_per_date["date"] = ( | |
| order_purchase_amount_per_date["date"].astype("int64") // 10**6 | |
| ) | |
| # Check if each date is a holiday | |
| order_purchase_amount_per_date["holiday"] = order_purchase_amount_per_date[ | |
| "order_purchase_timestamp" | |
| ].isin(holidays["date"]) | |
| # Create a dataframe with the result | |
| result_df = order_purchase_amount_per_date[["order_count", "date", "holiday"]] | |
| return QueryResult(query=query_name, result=result_df) | |
| def get_all_queries() -> list[Callable[[Engine], QueryResult]]: | |
| """ | |
| Get all the queries | |
| Returns: | |
| list[Callable[[Engine], QueryResult]]: The queries | |
| """ | |
| return [ | |
| query_delivery_date_difference, | |
| query_global_amount_order_status, | |
| query_revenue_by_month_year, | |
| query_revenue_per_state, | |
| query_top_10_least_revenue_categories, | |
| query_top_10_revenue_categories, | |
| query_real_vs_estimated_delivered_time, | |
| query_orders_per_day_and_holidays_2017, | |
| query_freight_value_weight_relationship, | |
| ] | |
| def run_queries(database: Engine) -> dict[str, DataFrame]: | |
| """ | |
| Transform data based on queries. For each query, the query is executed and the results is stored in the dataframe | |
| Args: | |
| database (Engine): The database to get the data from | |
| Returns: | |
| dict[str, DataFrame]: A dictionary with keys as the query filenames and values the result of the query as a dataframe | |
| """ | |
| query_results = {} | |
| for query in get_all_queries(): | |
| query_result = query(database) | |
| query_results[query_result.query] = query_result.result | |
| return query_results | |