Spaces:
Running
Running
File size: 5,365 Bytes
d03866e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 |
"""
This module is adapted from [lag-llama] by [ashok-arjun & kashif]
Original source: [https://github.com/time-series-foundation-models/lag-llama]
"""
from itertools import islice
from matplotlib import pyplot as plt
import matplotlib.dates as mdates
import torch
from gluonts.evaluation import make_evaluation_predictions
from gluonts.dataset.pandas import PandasDataset
import pandas as pd
import numpy as np
from ..utils.torch_utility import get_gpu
from lag_llama.gluon.estimator import LagLlamaEstimator
class Lag_Llama():
    """Anomaly scorer that uses a pretrained Lag-Llama forecaster.

    Every channel of the input is cut into sliding windows, a forecast is
    produced for each window, and the squared error between the forecast
    mean and the values following the window is used as the anomaly score.
    Per-channel scores are averaged and stored, padded to the input length,
    in ``decision_scores_``.

    Args:
        win_size: Length of each sliding window; also passed to the
            estimator as ``context_length``.
        prediction_length: Number of steps forecast per window.
        input_c: Number of channels (columns) of ``data`` read by ``fit``.
        use_rope_scaling: When true, linear RoPE scaling arguments are
            passed to the estimator; otherwise ``None`` is passed.
        batch_size: Batch size passed to the estimator.
        num_samples: Number of samples requested from
            ``make_evaluation_predictions``.
        ckpt_path: Path of the Lag-Llama checkpoint file.
    """

    def __init__(self,
                 win_size=96,
                 prediction_length=1,
                 input_c=1,
                 use_rope_scaling=False,
                 batch_size=64,
                 num_samples=100,
                 ckpt_path='lag-llama.ckpt'):
        self.model_name = 'Lag_Llama'
        self.context_length = win_size
        self.prediction_length = prediction_length
        self.input_c = input_c
        self.ckpt_path = ckpt_path
        self.use_rope_scaling = use_rope_scaling
        self.batch_size = batch_size
        self.num_samples = num_samples
        self.score_list = []

        self.cuda = True
        self.device = get_gpu(self.cuda)

    def fit(self, data):
        """Score ``data`` and store the result in ``self.decision_scores_``.

        Args:
            data: Array indexed as ``data[:, channel]`` (samples along the
                first axis). A 1-D array is treated as a single channel.

        Raises:
            ValueError: If ``data`` has fewer than
                ``context_length + prediction_length`` samples, i.e. not
                even one sliding window can be formed.
        """
        data = np.asarray(data)
        if data.ndim == 1:
            data = data.reshape(-1, 1)

        min_len = self.context_length + self.prediction_length
        if len(data) < min_len:
            raise ValueError(
                f"data has {len(data)} samples but at least {min_len} "
                "(win_size + prediction_length) are required"
            )

        # Start from a clean slate so repeated fit() calls do not mix in
        # scores from earlier data.
        self.score_list = []

        # The predictor does not depend on the channel, so build it once
        # instead of reloading the checkpoint for every channel.
        predictor = self._build_predictor()

        for channel in range(self.input_c):
            data_channel = data[:, channel].reshape(-1, 1)
            self.score_list.append(self._score_channel(data_channel, predictor))

        # Average the per-window scores across channels.
        scores_merge = np.mean(np.array(self.score_list), axis=0)

        # The first (context_length + prediction_length - 1) samples have no
        # score of their own; fill them with the first available score.
        offset = self.context_length + self.prediction_length - 1
        padded_decision_scores = np.zeros(len(data))
        padded_decision_scores[:offset] = scores_merge[0]
        padded_decision_scores[offset:] = scores_merge

        self.decision_scores_ = padded_decision_scores

    def _build_predictor(self):
        """Load the checkpoint and return a Lag-Llama predictor."""
        ckpt = torch.load(self.ckpt_path, map_location=self.device)
        estimator_args = ckpt["hyper_parameters"]["model_kwargs"]

        rope_scaling_arguments = {
            "type": "linear",
            "factor": max(
                1.0,
                (self.context_length + self.prediction_length)
                / estimator_args["context_length"],
            ),
        }

        estimator = LagLlamaEstimator(
            ckpt_path=self.ckpt_path,
            prediction_length=self.prediction_length,
            context_length=self.context_length,

            # Architecture hyper-parameters are taken from the checkpoint.
            input_size=estimator_args["input_size"],
            n_layer=estimator_args["n_layer"],
            n_embd_per_head=estimator_args["n_embd_per_head"],
            n_head=estimator_args["n_head"],
            scaling=estimator_args["scaling"],
            time_feat=estimator_args["time_feat"],
            rope_scaling=rope_scaling_arguments if self.use_rope_scaling else None,

            batch_size=self.batch_size,
            num_parallel_samples=100,
            device=self.device,
        )

        lightning_module = estimator.create_lightning_module()
        transformation = estimator.create_transformation()
        return estimator.create_predictor(transformation, lightning_module)

    def _score_channel(self, data_channel, predictor):
        """Return one squared-error score per sliding window of a channel."""
        data_win, data_target = self.create_dataset(
            data_channel,
            slidingWindow=self.context_length,
            predict_time_steps=self.prediction_length,
        )
        n_windows = len(data_win)

        # One column per window, one row per time step inside the window.
        windows = data_win.T
        # Lag-Llama expects float32; leave object/string data untouched.
        if windows.dtype.kind not in "OSU":
            windows = windows.astype('float32')

        # Dummy hourly timestamps: only the ordering of the samples matters.
        date_rng = pd.date_range(start='2021-01-01', periods=windows.shape[0], freq='H')
        df_wide = pd.DataFrame(windows, index=date_rng)
        ds = PandasDataset(dict(df_wide))

        # NOTE(review): gluonts documents make_evaluation_predictions as
        # holding out the trailing prediction_length points of each series
        # and forecasting those. If so, the forecasts line up with the end
        # of each window rather than with data_target - confirm alignment.
        forecast_it, _ = make_evaluation_predictions(
            dataset=ds,
            predictor=predictor,
            num_samples=self.num_samples
        )
        predictions = np.array([pred.mean for pred in forecast_it])

        # MSE over the forecast horizon is the anomaly score of a window.
        # Reshaping (instead of squeeze) keeps one row per window even when
        # there is a single window or prediction_length > 1.
        squared_error = (
            data_target.reshape(n_windows, -1) - predictions.reshape(n_windows, -1)
        ) ** 2
        return squared_error.mean(axis=1)

    def decision_function(self, X):
        """
        Not used, present for API consistency by convention.
        """
        pass

    def create_dataset(self, X, slidingWindow, predict_time_steps=1):
        """Split ``X`` into sliding windows and the values that follow them.

        Args:
            X: Sequence of samples; each slice is flattened with ``ravel``.
            slidingWindow: Number of leading values kept as the window.
            predict_time_steps: Number of values after the window kept as
                the target.

        Returns:
            Tuple ``(Xs, ys)`` of arrays with one row per window start
            position; both are empty when ``X`` is too short.
        """
        span = slidingWindow + predict_time_steps
        segments = [X[i:i + span].ravel() for i in range(len(X) - span + 1)]
        Xs = [seg[:slidingWindow] for seg in segments]
        ys = [seg[slidingWindow:] for seg in segments]
        return np.array(Xs), np.array(ys)