Spaces:
Runtime error
Runtime error
| from .active_models import active_models, idsc_models | |
| from .forecast.Prophet import ProphetWrapper | |
| from .idsc.IDSC import IDSC | |
| import pandas as pd | |
| import math | |
| import numpy as np | |
| from sklearn.metrics import mean_squared_error, mean_absolute_error | |
| from .functions.mase import MASE | |
| from .functions.order_qty_rmse import order_qty_rmse | |
| from .functions.itmtt_scores import interm_scores | |
| # List of models to verify user input | |
class DemandForecasting():
    '''
    Forecast demand for a single SKU time series.

    One call to `forecast` covers both "model selection" and "actual
    forecasting". Instead of exposing separate end points, the behaviour is
    driven by which arguments are supplied:

    * `model` selects one model, a list of models, or every active model
      for the data characteristic ('all').
    * `run_test` additionally back-tests every model on a hold-out slice and
      ranks the results.
    * `characteristic` skips the profiling step when the caller already
      knows whether the data is continuous or intermittent.
    '''

    def __init__(self) -> None:
        # Client used for profiling and for the IDSC-backed models.
        self.idsc = IDSC()

    def forecast(
            self,
            ts,
            n_predict: int,
            model: 'str | list',
            freq=None,
            run_test: bool = False,
            characteristic=None,
            m=None):
        '''
        Run the requested model(s) on `ts` and return their forecasts.

        ts: time series data accepted by pd.DataFrame, holding a 'datetime'
            and a 'y' column, e.g. the parsed form of
            pd.DataFrame().to_json():
            {
                "datetime":
                    {"0":"2018-05-06","1":"2018-05-13"},
                "y":
                    {"0":2,"1":12}}
        n_predict: number of future values to predict. When <= 0 no forecast
            is produced and run_test is forced to True.
        model: a model name, a list of model names, or 'all' to run every
            active model registered for the data characteristic.
        freq: optional pandas frequency. Only used as a fallback when pandas
            cannot infer the frequency from the 'datetime' column.
        run_test: when True, every model is also fitted on the first ~80% of
            the data and scored against the remaining ~20%; results are then
            ranked (lowest RMSE first for continuous data, highest average
            intermittent score first otherwise).
        characteristic: optional, 'continuous' or anything else
            (treated as intermittent). When omitted, the data is profiled
            through the IDSC client first.
        m: seasonal period value, forwarded to holt_winters_i when given.

        Returns a dict with keys 'characteristic', 'predictability' (None
        when no profiling was needed) and 'forecast' (list of per-model
        result dicts, see __use_model).

        Raises ValueError when the frequency cannot be determined, when a
        requested model is unknown or unsuitable for the characteristic,
        when the provided characteristic contradicts the profiling result,
        or when run_test is requested on a series too short to split.
        '''
        # Reset per-call state so a skipped profiling step never leaves
        # these attributes undefined (or stale from a previous call).
        self.idsc_profile = None
        self.idsc_profile_train = None
        self.predictability = None
        self.characteristic = characteristic
        self.ts_df = pd.DataFrame(ts)
        self.ts_df['datetime'] = pd.to_datetime(self.ts_df['datetime'])
        self.freq = freq
        self.n_predict = n_predict
        self.run_test = run_test
        if self.n_predict <= 0:
            print('n_predict is 0, force run_test to be true')
            self.run_test = True

        # Resolve the frequency (inferred from the data, else user value).
        self.__get_frequency()
        self.m = m

        # Timestamps of the n_predict periods following the last observation.
        self.forecast_horizon = self.__build_forecast_horizon(n_predict)

        # 80/20 split; the test part is only consumed when run_test is True.
        self.__split_train_test()

        # Both profiling and the IDSC models need the IDSC-shaped payloads.
        self.__prep_idsc_ts()

        # ============== #
        # IDSC profiling #
        # ============== #
        self.idsc_characteristic = None
        if self.characteristic is None:
            print('characteristic not provided, running profiling')
            self.__profiling()
            print('profiling completed, data characteristic is ',
                  self.characteristic)

        # ------------- #
        # Assign models #
        # ------------- #
        self.model = self.__resolve_models(model)

        # Some IDSC models take the profile as input, so make sure it exists
        # whenever at least one requested model is an IDSC model.
        idsc_names = self.__idsc_model_names()
        self.has_idsc_model = any(name in idsc_names for name in self.model)
        print('Has idsc model, ', self.has_idsc_model)
        if self.has_idsc_model and self.idsc_profile is None:
            self.__profiling()

        self.__check_model()

        # =================== #
        # Perform forecasting #
        # =================== #
        # Every model method appends one result dict to self.fcst_res.
        self.fcst_res = []
        # Todo: to track model time spending here
        for model_name in self.model:
            print(f'callindg model: {model_name}')
            # Each validated model name maps to a method of this class.
            getattr(self, model_name)()

        # ========================== #
        # Rank the model by response #
        # ========================== #
        if self.run_test:
            if self.characteristic == 'continuous':
                # Smallest RMSE first.
                self.fcst_res.sort(key=lambda x: x['RMSE'])
            else:
                # Highest average intermittent score first.
                self.fcst_res.sort(
                    key=lambda x: x['avg_interm_scores'], reverse=True)

        self.res = {'characteristic': self.characteristic,
                    'predictability': self.predictability,
                    'forecast': self.fcst_res}
        return self.res

    def __build_forecast_horizon(self, n_predict):
        '''
        Return the timestamps of the n_predict periods after the last
        observation (empty when n_predict <= 0).
        '''
        n_steps = max(n_predict, 0)
        last_seen = self.ts_df['datetime'].iloc[-1]
        # date_range includes its start when the start lies on the frequency
        # grid, so ask for one extra period and keep only future timestamps.
        candidates = pd.date_range(
            last_seen, periods=n_steps + 1, freq=self.freq)
        return candidates[candidates > last_seen][:n_steps]

    def __split_train_test(self):
        '''
        Split ts_df into ~80% training rows and ~20% trailing test rows.

        Sets n_test, ts_train, test_truth and test_horizon.
        '''
        n_rows = self.ts_df.shape[0]
        self.n_test = round(n_rows * 0.2)
        # An explicit split index stays correct when n_test is 0, where a
        # negative-slice would yield an empty training set.
        split_at = n_rows - self.n_test
        self.ts_train = self.ts_df.iloc[:split_at]
        hold_out = self.ts_df.iloc[split_at:]
        self.test_truth = hold_out['y'].tolist()
        self.test_horizon = hold_out['datetime'].tolist()
        if self.run_test and self.n_test == 0:
            raise ValueError(
                'Not enough data points to hold out a test set, '
                'please provide a longer timeseries or disable run_test.')

    def __resolve_models(self, model):
        '''
        Translate the user supplied `model` argument into the collection of
        model names to run.
        '''
        if not isinstance(model, str):
            return model
        if model != 'all':
            # A single model name was provided.
            return [model]
        if self.characteristic == 'continuous':
            return active_models['continuous']
        return active_models['intermittent']

    @staticmethod
    def __idsc_model_names():
        '''
        Return the set of IDSC model names declared in idsc_models.

        NOTE(review): the layout of idsc_models is not visible from this
        file; flat lists, nested lists and dicts of lists are all accepted.
        '''
        if isinstance(idsc_models, dict):
            entries = idsc_models.values()
        else:
            entries = idsc_models
        names = set()
        for entry in entries:
            if isinstance(entry, str):
                names.add(entry)
            else:
                names.update(entry)
        return names

    def __get_frequency(self):
        '''
        Resolve self.freq: the frequency inferred from the datetime column
        wins, the user supplied value is the fallback.

        Raises ValueError when neither is available.
        '''
        try:
            inferred = pd.infer_freq(self.ts_df['datetime'])
        except ValueError:
            # pandas refuses to infer from fewer than 3 dates; fall back to
            # the user supplied frequency instead of failing outright.
            inferred = None
        if inferred is not None:
            self.freq = inferred
        # Always make sure the frequency is not None
        if self.freq is None:
            raise ValueError(
                'Unable inference freq from datetime column, please make timeseries interval consistent or provide customized frequency.')

    def __check_model(self):
        '''
        Validate self.model against the active models.

        Raises ValueError for unknown model names and for models that do not
        match the data characteristic.
        '''
        continuous = active_models['continuous']
        intermittent = active_models['intermittent']
        requested = set(self.model)

        unknown_models = requested - set(continuous + intermittent)
        if unknown_models:
            raise ValueError(
                f'Unknown model : {unknown_models}, please use active models: {active_models}')

        if self.characteristic == 'continuous':
            unsuitable_models = requested - set(continuous)
            if unsuitable_models:
                raise ValueError(
                    f'Unsuitable model for continuous data: {unsuitable_models}. please use continuous models: {continuous}')
        else:
            unsuitable_models = requested - set(intermittent)
            if unsuitable_models:
                raise ValueError(
                    f'Unsuitable model for intermittent data: {unsuitable_models}. please use intermittent models: {intermittent}')

    @staticmethod
    def __to_idsc_json(frame):
        '''
        Convert a datetime/y frame into the IDSC payload: columns renamed to
        date/target, dates formatted as YYYY-MM-DD, serialised to JSON.
        '''
        idsc_frame = frame.rename(
            columns={'datetime': 'date', 'y': 'target'})
        idsc_frame['date'] = idsc_frame['date'].dt.strftime('%Y-%m-%d')
        return idsc_frame.to_json()

    def __prep_idsc_ts(self):
        '''
        Build the JSON strings passed to the IDSC client for the full series
        (idsc_ts) and for the training slice (idsc_ts_train).
        '''
        print('[__prep_idsc_ts]')
        self.idsc_ts = self.__to_idsc_json(self.ts_df)
        self.idsc_ts_train = self.__to_idsc_json(self.ts_train)

    def __profiling(self):
        '''
        Profile the series through the IDSC client and store the resulting
        characteristic and predictability.

        Raises ValueError when a user provided characteristic differs from
        the profiled one. When run_test is set, the training slice is
        profiled as well (idsc_profile_train), otherwise it is None.
        '''
        self.idsc_profile = self.idsc.profiling(self.idsc_ts)
        characteristic = self.idsc_profile['classification_res'][
            'time_series_class']['overall_characteristic']

        print('predictability temporarily using order_quantity predictability')
        predictability = self.idsc_profile['predictability_res'][
            'predictability_result']['order_quantity'][-1]['predictability']
        # The value may be a label (str) or a number; only numbers are rounded.
        if not isinstance(predictability, str):
            predictability = round(predictability, 2)

        if self.characteristic is not None and self.characteristic != characteristic:
            raise ValueError(
                f"Provided characteristics - {self.characteristic} is different from data's characteristics - {characteristic}. Please use the correct data characteristics.")

        self.characteristic = characteristic
        self.predictability = predictability

        if self.run_test:
            self.idsc_profile_train = self.idsc.profiling(
                self.idsc_ts_train)
        else:
            self.idsc_profile_train = None

    # =========== #
    # Core method #
    # =========== #
    def __use_model(self, model, model_name, get_value, args=None, test_args=None):
        '''
        Run one model, optionally back-test it, and append the formatted
        result to self.fcst_res.

        The model is treated as a black box: any callable taking
        (ts, n_predict, **kwargs) works, as long as get_value can extract
        the predicted values from its response.

        model: the callable to invoke
        model_name: name stored under 'model' in the result
        get_value: callable extracting the iterable of predicted values from
            the model response
        args: optional keyword arguments for the forecasting call
        test_args: optional keyword arguments for the back-test call;
            defaults to args when omitted

        Result keys: 'model'; 'forecast' and 'raw' when n_predict > 0;
        'test', 'RMSE', 'order_quantity_RMSE', 'inter_order_RMSE',
        'interm_scores', 'avg_interm_scores' and 'test_raw' when run_test.
        '''
        ts = self.ts_df
        train = self.ts_train
        res = {'model': model_name}

        # IDSC models take the JSON payloads instead of the DataFrames.
        if model_name in self.__idsc_model_names():
            print('has_idsc_model')
            ts = self.idsc_ts
            train = self.idsc_ts_train

        if self.n_predict > 0:
            pred = model(ts, self.n_predict, **(args or {}))
            # Materialise views/iterables (e.g. dict.values()) into a list.
            pred_val = list(get_value(pred))
            res['forecast'] = {
                # The response may be shorter than n_predict, so trim the
                # timestamps to the number of values actually returned.
                'datetime': self.forecast_horizon[:len(pred_val)],
                'y': pred_val}
            res['raw'] = pred

        # Run the test set and evaluate model performance
        if self.run_test:
            # When train and test take the same arguments the caller only
            # needs to provide `args`.
            if test_args is None:
                test_args = args
            test = model(train, self.n_test, **(test_args or {}))
            test_val = list(get_value(test))

            # Score on the overlap only, in case the model returns fewer or
            # more values than the hold-out size.
            n_scored = min(len(test_val), len(self.test_truth))
            test_val = test_val[:n_scored]
            test_truth = self.test_truth[:n_scored]

            res['test'] = pd.DataFrame(
                {
                    'truth': test_truth,
                    'test': test_val
                },
                index=self.test_horizon[:n_scored])
            res['RMSE'] = math.sqrt(
                mean_squared_error(test_truth, test_val))
            res['order_quantity_RMSE'] = order_qty_rmse(
                test_truth, test_val)
            # NOTE(review): despite the key name no square root is taken
            # here (unlike 'RMSE' above) - confirm whether that is intended.
            res['inter_order_RMSE'] = mean_squared_error(
                [0 if i == 0 else 1 for i in test_truth],
                [0 if i == 0 else 1 for i in test_val])
            res['interm_scores'] = interm_scores(test_truth, test_val)
            # Average intermittent score, used to rank the responses.
            res['avg_interm_scores'] = np.mean(res['interm_scores'])
            res['test_raw'] = test

        self.fcst_res.append(res)

    # ---------- #
    # All Models #
    # ---------- #
    def prophet_i(self):
        '''Run the IDSC prophet model, passing the matching profile to each call.'''
        self.__use_model(
            self.idsc.prophet,
            'prophet_i',
            lambda x: x['prediction_result']['predicted_value'].values(),
            args={'profile': self.idsc_profile},
            test_args={'profile': self.idsc_profile_train}
        )

    def prophet(self):
        '''Run the local ProphetWrapper model with the resolved frequency.'''
        wrapper = ProphetWrapper()
        self.__use_model(
            wrapper.forecast,
            'prophet',
            lambda x: x['yhat'].to_list(),
            args={'freq': self.freq})

    def ceif(self):
        '''Run the IDSC ceif model.'''
        self.__use_model(
            self.idsc.ceif,
            'ceif',
            lambda x: x['prediction_result']['predicted_value'])

    def fft_i(self):
        '''Run the IDSC fft model.'''
        self.__use_model(
            self.idsc.fft,
            'fft_i',
            lambda x: x['prediction_result']['predicted_value'])

    def holt_winters_i(self):
        '''Run the IDSC holt_winters model, forwarding m as seasonal_cycle when set.'''
        # args=None makes __use_model call the model without extra kwargs.
        args = {'seasonal_cycle': self.m} if self.m is not None else None
        self.__use_model(
            self.idsc.holt_winters,
            'holt_winters_i',
            lambda x: x['prediction_result']['predicted_value'],
            args=args)

    def auto_arima_i(self):
        '''Run the IDSC auto_arima model.'''
        self.__use_model(
            self.idsc.auto_arima,
            'auto_arima_i',
            lambda x: x['prediction_result']['predicted_value'])