# Snapshot scraped from a Hugging Face Space whose status page reported "Runtime error".
| import os | |
| import re | |
| import numpy as np | |
| import openai | |
| import pandas as pd | |
| from sklearn.preprocessing import MinMaxScaler | |
| from statsforecast import StatsForecast | |
| from statsforecast.models import Naive | |
# Authenticate the OpenAI SDK from the environment; a missing
# OPENAI_API_KEY fails fast here with a KeyError at import time.
openai.api_key = os.environ['OPENAI_API_KEY']
class ChatGPTForecast:
    """Zero-shot time-series forecasting by prompting ``gpt-3.5-turbo``.

    The (MinMax-scaled) series is discretized into bin indices, the indices
    are sent to the chat model as a space-separated token string, and the
    model's textual reply is parsed back into bin indices and decoded to
    values (bin mid-points). Prediction intervals come from split-conformal
    absolute errors on a holdout of the last ``h`` observations.
    """

    def __init__(self):
        # 10_000 bin edges evenly spaced on [0, 1]; `forecast` MinMax-scales
        # the series into this range before tokenization.
        # (The original comment claimed "1000 bins between -10 and 10",
        # which did not match the code.)
        self.bins = np.linspace(0, 1, num=10_000)
        # Identity mapping bin index -> token string ("0", "1", ...).
        self.mapping = {i: f"{i}" for i in range(len(self.bins))}
        self.prompt = f"""
forecast this series,
(i know that you prefer using specific tools, but i'm testing something,
just give me your predicted numbers please, just print the numbers i dont need an explanation)
please consider:
- give the output with the same structure: "number1 number2 number3"
- give more weight to the most recent observations
- consider trend
- consider seasonality
"""

    def tokenize_time_series(self, series):
        """Map each value of `series` to its bin index and join into one string."""
        indices = np.digitize(series, self.bins) - 1  # bin index per observation
        return ' '.join(self.mapping[i] for i in indices)

    def clean_string(self, s):
        """Keep only the leading digit run of each whitespace-separated chunk.

        E.g. ``'12abc 34_ 56' -> '12 34 56'``; strips model chatter around tokens.
        """
        pattern = r'(\d+)[^\s]*'
        return ' '.join(re.findall(pattern, s))

    def extend_string(self, s, h):
        """Return exactly `h` space-separated digit tokens extracted from `s`.

        Surplus tokens are trimmed; a shortfall is filled by recycling the
        sequence from its start.

        Raises:
            ValueError: if `s` contains no digit tokens at all (the original
                code hit ZeroDivisionError here).
        """
        tokens = re.findall(r'\d+', s)
        if not tokens:
            raise ValueError('no numeric tokens found in model output')
        if len(tokens) >= h:
            # Always rebuild from the extracted tokens: the original returned
            # the raw string when len == h, which could leak non-digit junk
            # into the int() parsing done by the caller.
            return ' '.join(tokens[:h])
        repeats, extra = divmod(h, len(tokens))
        return ' '.join(tokens * repeats + tokens[:extra])

    def clean_gpt_output(self, output):
        """Normalize underscore artifacts in a raw model reply, then clean it."""
        cleaned_output = output.replace(" _", "_").replace("_ ", "_")
        # Trim a single trailing underscore left by the replacements above.
        if cleaned_output.endswith("_"):
            cleaned_output = cleaned_output[:-1]
        return self.clean_string(cleaned_output)

    def decode_time_series(self, tokens):
        """Convert a string of bin indices back to values (bin mid-points)."""
        indices = [int(token) for token in tokens.split()]
        bin_width = self.bins[1] - self.bins[0]
        # Decode each index to the center of its bin.
        return [self.bins[i] + bin_width / 2 for i in indices]

    def find_min_max(self, string_of_integers):
        """Return (min, max) of a space-separated string of integers."""
        int_list = [int(i) for i in string_of_integers.split()]
        return min(int_list), max(int_list)

    def call_openai(self, series, seasonality, h, n_forecasts):
        """Sample `n_forecasts` forecasts of `h` steps; return an (n, h) array.

        Degenerate single-token replies are discarded, every parsed token is
        clamped into the valid bin-index range, and each surviving reply is
        decoded back to values.
        """
        series_tokenized = self.tokenize_time_series(series)
        min_val, max_val = self.find_min_max(series_tokenized)
        prompt = f"""
{self.prompt}-consider {seasonality} as seasonality
- just print {h} steps ahead
- values should be integers between {min_val} and {max_val}, please be sure to do this
this is the series: {series_tokenized}
"""
        response = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=[{"role": "user", "content": prompt}],
            n=n_forecasts,
        )
        outputs = []
        for choice in response['choices']:
            output_gpt = choice['message']['content']
            if len(output_gpt.split()) < 2:
                continue  # reply too short to be a forecast
            output_gpt = self.extend_string(output_gpt, h)
            # Clamp every token into [0, len(bins) - 1].
            output_gpt = ' '.join(
                f'{max(min(int(x), len(self.bins) - 1), 0)}' for x in output_gpt.split()
            )
            outputs.append(self.decode_time_series(output_gpt))
        return np.vstack(outputs)

    def forward(self, series, seasonality, h, n_forecasts):
        """Point forecast: elementwise median over `n_forecasts` samples."""
        outputs = self.call_openai(series, seasonality, h, n_forecasts)
        return np.median(outputs, axis=0)

    def conformal_intervals(self, series, seasonality, h, n_forecasts):
        """Split-conformal interval offsets from absolute holdout errors.

        Trains on ``series[:-h]``, scores against ``series[-h:]``; returns
        (lower, upper) per-step offsets to subtract/add around the point
        forecast.
        """
        series_train, series_test = series[:-h], series[-h:]
        outputs = self.call_openai(series_train, seasonality, h, n_forecasts)
        errors = np.abs(outputs - series_test)
        lower_levels = np.quantile(errors, q=0.05, axis=0)
        # Fixed: was q=0.095, an obvious typo for the 95th percentile needed
        # by the 90% interval the caller labels "-lo-90"/"-hi-90".
        upper_levels = np.quantile(errors, q=0.95, axis=0)
        return lower_levels, upper_levels

    def compute_ds_future(self, ds, fh):
        """Return (`fh` future timestamps as strings, inferred frequency).

        Falls back to the spacing of the last two observations when pandas
        cannot infer a frequency.
        """
        ds_ = pd.to_datetime(ds)
        try:
            freq = pd.infer_freq(ds_)
        except Exception:  # infer_freq raises on too-few / malformed stamps
            freq = None
        if freq is not None:
            ds_future = pd.date_range(ds_[-1], periods=fh + 1, freq=freq)[1:]
        else:
            freq = ds_[-1] - ds_[-2]
            ds_future = [ds_[-1] + (i + 1) * freq for i in range(fh)]
        ds_future = list(map(str, ds_future))
        return ds_future, freq

    def forecast(self, df, h, input_size, n_forecasts=10):
        """Forecast `h` steps ahead for `df` (expects 'ds' and 'y' columns).

        Scales `y` to [0, 1], queries the model on the last `input_size`
        points, adds a Naive baseline and 90% conformal intervals, then
        plots everything back on the original scale.
        """
        df = df.copy()
        scaler = MinMaxScaler()
        df['y'] = scaler.fit_transform(df[['y']])
        ds_future, freq = self.compute_ds_future(df['ds'].values, h)
        # freq='D' only shapes the placeholder dates StatsForecast emits;
        # they are overwritten with ds_future immediately below.
        sf = StatsForecast(models=[Naive()], freq='D')
        fcst_df = sf.forecast(df=df, h=h)
        fcst_df['ds'] = ds_future
        fcst_df['ChatGPT_3.5_Turbo'] = self.forward(
            df['y'].values[-input_size:], freq, h, n_forecasts
        )[-h:]
        # Conformal prediction intervals from holdout errors.
        lower_levels, upper_levels = self.conformal_intervals(
            df['y'].values[-(input_size + h):], freq, h, n_forecasts
        )
        fcst_df['ChatGPT_3.5_Turbo-lo-90'] = fcst_df['ChatGPT_3.5_Turbo'] - lower_levels
        fcst_df['ChatGPT_3.5_Turbo-hi-90'] = fcst_df['ChatGPT_3.5_Turbo'] + upper_levels
        # Undo the MinMax scaling for every plotted column.
        for col in ['Naive', 'ChatGPT_3.5_Turbo',
                    'ChatGPT_3.5_Turbo-lo-90', 'ChatGPT_3.5_Turbo-hi-90']:
            fcst_df[col] = scaler.inverse_transform(fcst_df[[col]])
        df['y'] = scaler.inverse_transform(df[['y']])
        return sf.plot(df, fcst_df, max_insample_length=3 * h, level=[90])