Spaces:
Runtime error
Runtime error
import math

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import statsmodels.api as sm
from scipy import stats
from sklearn.preprocessing import MinMaxScaler
from statsmodels.graphics.tsaplots import plot_pacf
from statsmodels.tsa.seasonal import MSTL, seasonal_decompose
from statsmodels.tsa.tsatools import freq_to_period
class Ts_Analytics():
    '''
    Exploratory analysis helper for a single time-series dataframe.

    `analyse` stores a datetime-indexed copy of the input on `self.ts_df`.
    The other methods transform that frame, fit an OLS regression of the
    `y` column on every other column, and build diagnostic plots.
    '''

    def __init__(self):
        # True while `ts_df` holds log2-transformed values.
        self.log_transformed = False
        # Scaler used to rescale the data for the second (beta) regression.
        self.scaler = MinMaxScaler()
        # Frequency string of the index. May be assigned by the caller before
        # `analyse` as a fallback for when it cannot be inferred.
        self.freq = None

    def analyse(
            self,
            ts_df: pd.DataFrame,
            auto_correlations=None):
        '''
        Store a datetime-indexed copy of `ts_df` and derive its period.

        ts_df: timeseries dataframe. Either it has a `datetime` column
            (parsed with `pd.to_datetime` and moved to the index) or its
            index is already a `DatetimeIndex`.
        auto_correlations: optional dictionary mapping a column name to the
            number of lag columns to create for it. It is copied, so later
            `set_ar` calls do not modify the caller's dictionary.

        Raises KeyError when no datetime information is available, and
        ValueError when the sampling frequency cannot be determined.
        '''
        self.ts_df = ts_df.copy()
        if 'datetime' in self.ts_df.columns:
            self.ts_df['datetime'] = pd.to_datetime(self.ts_df['datetime'])
            self.ts_df.set_index('datetime', inplace=True)
        elif not isinstance(self.ts_df.index, pd.DatetimeIndex):
            raise KeyError(
                "ts_df needs a 'datetime' column or a DatetimeIndex")
        # A fresh dict per call: a shared `{}` default would leak lag
        # settings between instances through `set_ar`.
        self.ar = {} if auto_correlations is None else dict(auto_correlations)
        self.__infer_frequency()
        # Annual maps to 1, quarterly maps to 4, monthly to 12, weekly to 52.
        # Using statsmodel's freq_to_period function
        self.period = freq_to_period(self.freq)

    def set_ar(self, col, ar):
        '''
        Set the auto correlation (number of lag columns) for column `col`.
        '''
        self.ar[col] = ar

    def set_period(self, period):
        '''
        Override the seasonal period; a list selects MSTL decomposition in
        `plot_target_seasonality`.
        '''
        self.period = period

    def create_target_lag_columns(self):
        '''
        Add `{col}_t-k` columns holding the value of `col` k rows earlier,
        for k = 1..n and every `col: n` entry in `self.ar`.

        Gaps are forward-filled first. Rows at the start of the frame that
        have no complete history are then dropped rather than filled.
        Assumes the index is sorted in ascending time order.
        '''
        print('create_target_lag_columns')
        # Fill gaps before lagging so the lag columns are built from the same
        # values the base column ends up with.
        self.ts_df.ffill(inplace=True)
        lag_cols = []
        for col, n in self.ar.items():
            print('create_lag_dfs', col, n)
            for lag in range(1, n + 1):
                name = f'{col}_t-{lag}'
                # A positive shift moves past values forward: row t receives
                # the value from row t-lag. A negative shift would leak the
                # future into the features.
                self.ts_df[name] = self.ts_df[col].shift(lag)
                lag_cols.append(name)
        print('drop all null values')
        if lag_cols:
            self.ts_df.dropna(subset=lag_cols, inplace=True)

    def log_transform(self):
        '''
        Replace `ts_df` with its element-wise base-2 logarithm.
        '''
        self.log_transformed = True
        self.ts_df = np.log2(self.ts_df)

    def exp_transform(self):
        '''
        Undo `log_transform` by raising 2 to every element of `ts_df`.
        '''
        self.log_transformed = False
        # exp2 is the inverse of log2; np.exp would not restore the data.
        self.ts_df = np.exp2(self.ts_df)

    def train_multiple_regression(self):
        '''
        Fit OLS of `y` on all other columns of `ts_df`, twice.

        The first fit uses the raw data and is stored on
        `self.multiple_regression`, with a readable formula on
        `self.multiple_regression_formula`. The second fit uses data rescaled
        by `self.scaler`; its squared, rounded coefficients are stored on
        `self.multiple_regression_beta`.

        Returns the statsmodels summary of the raw-data fit.
        '''
        print('train_multiple_regression')
        x_cols = self.ts_df.columns.tolist()
        x_cols.remove('y')
        y = self.ts_df['y']
        X = sm.add_constant(self.ts_df[x_cols])

        # ------------------------------------------------------------------ #
        # Train an additional model on rescaled data, to get the Beta value  #
        # ------------------------------------------------------------------ #
        std_ts_df = pd.DataFrame(
            self.scaler.fit_transform(self.ts_df),
            columns=self.ts_df.columns)
        std_X = sm.add_constant(std_ts_df[x_cols])
        std_y = std_ts_df['y']

        self.multiple_regression = sm.OLS(y, X).fit()
        coef = self.multiple_regression.params
        # `.iloc` because `params` is label-indexed; integer `[]` access on
        # such a Series is deprecated in pandas.
        terms = " + ".join(
            f"{c} * {round(n, 3)}" for c, n in zip(x_cols, coef.iloc[1:]))
        self.multiple_regression_formula = f'{coef.iloc[0]} + {terms}'

        self.std_multiple_regression = sm.OLS(std_y, std_X).fit()
        beta = self.std_multiple_regression.params
        beta_label = 'Beta (influence on "y")'
        self.multiple_regression_beta = pd.DataFrame(
            np.array(beta.iloc[1:]) ** 2, index=x_cols, columns=[beta_label])
        self.multiple_regression_beta[beta_label] = (
            self.multiple_regression_beta[beta_label].round(3))
        return self.multiple_regression.summary()

    # ===== #
    # Plots #
    # ===== #
    def plot_correlation(self):
        '''
        Return a figure with a heatmap of the numeric columns' correlations.
        '''
        corr = self.ts_df.corr(numeric_only=True)
        # Generate a mask for the upper triangle
        mask = np.triu(np.ones_like(corr, dtype=bool))
        fig, ax = plt.subplots(figsize=(8, 8))
        sns.heatmap(
            corr,
            mask=mask,
            square=True,
            annot=True,
            cmap='coolwarm',
            linewidths=.5,
            cbar_kws={"shrink": .5},
            ax=ax)
        return fig

    def plot_target_pacf(self):
        '''
        Return a figure with the partial autocorrelation plot of `y`.
        '''
        fig, ax = plt.subplots(figsize=(12, 4))
        plot_pacf(self.ts_df['y'], ax=ax)
        fig.tight_layout()
        return fig

    def plot_distributions(self):
        '''
        Return a figure with one histogram (plus KDE) per column of `ts_df`,
        laid out on a grid at most five plots wide.
        '''
        n_cols = self.ts_df.shape[1]
        plot_col = min(math.ceil(math.sqrt(n_cols)), 5)
        plot_row = math.ceil(n_cols / plot_col)
        # squeeze=False keeps `axs` two-dimensional, so the [row, col]
        # lookup also works when the grid has a single row or column.
        fig, axs = plt.subplots(plot_row, plot_col, squeeze=False)
        for idx, col in enumerate(self.ts_df.columns):
            axs_x, axs_y = divmod(idx, plot_col)
            sns.histplot(self.ts_df[col], ax=axs[axs_x, axs_y], kde=True)
        fig.tight_layout()
        return fig

    def plot_target_seasonality(self):
        '''
        Decompose `y` into seasonal components.

        Uses MSTL when `self.period` is a list and `seasonal_decompose`
        otherwise. Returns the decomposition result object, not a figure.
        '''
        if isinstance(self.period, list):
            seasonal = MSTL(
                self.ts_df['y'], periods=self.period).fit()
        else:
            seasonal = seasonal_decompose(self.ts_df['y'], period=self.period)
        return seasonal

    def plot_beta(self):
        '''
        Return a bar chart figure of `self.multiple_regression_beta`.

        `train_multiple_regression` must be called first, since it is what
        creates `self.multiple_regression_beta`.
        '''
        fig, ax = plt.subplots(figsize=(6, 4))
        beta_plot = sns.barplot(
            self.multiple_regression_beta['Beta (influence on "y")'],
            gap=2,
            ax=ax)
        beta_plot.set_xticklabels(beta_plot.get_xticklabels(), rotation=45)
        ax.bar_label(ax.containers[-1], fmt='%.2f', label_type='center')
        return fig

    def __infer_frequency(self):
        '''
        Set `self.freq` from the index of `ts_df`.

        If inference fails, a previously assigned `self.freq` is kept.
        Raises ValueError when no frequency is available at all.
        '''
        # Attempt to get the frequency from the provided datetime column
        freq = pd.infer_freq(self.ts_df.index)
        if freq is not None:
            self.freq = freq
        # Always make sure the frequency is not None. getattr covers
        # instances on which `freq` was never assigned.
        if getattr(self, 'freq', None) is None:
            raise ValueError(
                'Unable inference freq from datetime column, please make timeseries interval consistent or provide customized frequency.')