# bakhshaliyev's picture
# added all the files
# 556d303 verified
# Chang Wei Tan, Angus Dempster, Christoph Bergmeir, Geoffrey I Webb
#
# MultiRocket: Multiple pooling operators and transformations for fast and effective time series classification
# https://arxiv.org/abs/2102.00457
import cmath
import os
import numpy as np
from numba import njit
# =======================================================================================================
# Simple functions that worked in Numba
# =======================================================================================================
@njit(fastmath=True, cache=True)
def downsample(x, n):
    """Keep every ``n``-th sample of ``x``, starting with ``x[0]``.

    Parameters
    ----------
    x : 1-D sequence/array supporting ``len`` and integer indexing.
    n : step between retained samples.

    Returns
    -------
    A float64 array of length ``ceil(len(x) / n)`` holding
    ``x[0], x[n], x[2 * n], ...``.
    """
    out_len = int(np.ceil(len(x) / n))
    out = np.zeros(out_len, dtype=np.float64)

    # Walk the source with a running index instead of recomputing i * n.
    src = 0
    for dst in range(out_len):
        out[dst] = x[src]
        src += n
    return out
@njit(fastmath=True, cache=True)
def histc(X, bins):
    """Count how many values of ``X`` fall into each bin given by edges ``bins``.

    Adapted from
    https://stackoverflow.com/questions/32765333/how-do-i-replicate-this-matlab-function-in-numpy

    Parameters
    ----------
    X : 1-D array of values to bin.
    bins : 1-D array of bin edges, passed straight to ``np.digitize``.

    Returns
    -------
    counts : int32 array with the same shape as ``bins``; ``counts[k]`` is the
        number of values whose ``np.digitize`` index is ``k + 1``.  Values at
        or above the last edge are counted in the last bin.
    map_to_bins : the raw ``np.digitize`` indices for every value of ``X``
        (0 means "below the first edge").
    """
    map_to_bins = np.digitize(X, bins)
    counts = np.zeros(bins.shape, dtype=np.int32)
    for bin_index in map_to_bins:
        # Index 0 means the value lies below the first edge.  Without this
        # guard ``counts[0 - 1]`` wraps around and wrongly increments the
        # last bin (or reads out of bounds when ``bins`` is empty).
        if bin_index > 0:
            counts[bin_index - 1] += 1
    return counts, map_to_bins
@njit(fastmath=True, cache=True)
def numba_dft(x=None, sign=-1):
    """Direct O(N^2) discrete Fourier transform of a 1-D array.

    When the input length is not a power of two, the input is zero-padded to
    ``2 ** (ceil(log2(N)) + 1)`` samples before transforming.

    Parameters
    ----------
    x : 1-D array to transform.
    sign : sign of the complex exponent; ``-1`` gives the forward transform,
        ``1`` gives the inverse transform (result divided by the length).

    Returns
    -------
    complex128 array with one coefficient per (possibly padded) sample.
    """
    size = x.shape[0]

    # Zero-pad lengths that are not an exact power of two.
    if np.log2(size) % 1 > 0:
        padded_size = int(2 ** (np.ceil(np.log2(np.abs(size))) + 1))
        padded = np.zeros(padded_size, dtype=x.dtype)
        padded[:size] = x
        x = padded
        size = padded_size

    spectrum = np.zeros(size, dtype=np.complex128)
    for freq in range(size):
        acc = 0j
        for t in range(size):
            acc += x[t] * cmath.exp(sign * 2j * cmath.pi * freq * t * (1 / size))
        spectrum[freq] = acc

    if sign == 1:
        # Inverse transform carries the 1/N normalisation.
        spectrum = spectrum / size
    return spectrum
@njit(fastmath=True, cache=True)
def numba_fft_v(x, sign=-1):
    """Vectorised radix-2 FFT of a 1-D array.

    When the input length is not a power of two, the input is zero-padded to
    ``2 ** (ceil(log2(N)) + 1)`` samples before transforming.

    Parameters
    ----------
    x : 1-D array to transform.
    sign : sign of the complex exponent; ``-1`` gives the forward transform,
        ``1`` gives the inverse transform (result divided by the length).

    Returns
    -------
    Flat complex128 array with one coefficient per (possibly padded) sample.
    """
    size = x.shape[0]

    # Zero-pad lengths that are not an exact power of two.
    if np.log2(size) % 1 > 0:
        padded_size = int(2 ** (np.ceil(np.log2(np.abs(size))) + 1))
        padded = np.zeros(padded_size, dtype=x.dtype)
        padded[:size] = x
        x = padded
        size = padded_size

    x = np.asarray(x, dtype=np.complex128)

    # Seed stage: small DFTs of size `base`, done as one matrix product.
    base = min(size, 2)
    idx = np.arange(base, dtype=np.float64)
    idx_col = idx.reshape(-1, 1)
    seed = np.exp(sign * 2j * np.pi * idx * idx_col / base)
    spectrum = np.dot(seed, x.reshape((base, -1)))

    # Butterfly stages: each pass doubles the rows and halves the columns.
    while spectrum.shape[0] < size:
        rows = spectrum.shape[0]
        half = int(spectrum.shape[1] / 2)
        even = spectrum[:, :half]
        odd = spectrum[:, half:]
        twiddle = np.exp(sign * 1j * np.pi * np.arange(rows) / rows).reshape(-1, 1)
        rotated = twiddle * odd
        spectrum = np.vstack((even + rotated,
                              even - rotated))

    if sign == 1:
        # Inverse transform carries the 1/N normalisation.
        return spectrum.ravel() / size
    return spectrum.ravel()
@njit(fastmath=True, cache=True)
def autocorr(y, fft):
    """Autocorrelation derived from a precomputed spectrum.

    Multiplies ``fft`` by its complex conjugate, inverse-transforms the
    product with ``numba_fft_v(..., sign=1)``, keeps the real part, scales it
    so the first element is 1, and truncates it to ``len(y)`` entries.

    Parameters
    ----------
    y : the series; only its length is used here.
    fft : complex spectrum (presumably the transform of ``y`` -- confirm at
        the call site).

    Returns
    -------
    Real array of at most ``len(y)`` autocorrelation values.
    """
    n_lags = len(y)
    power = fft * np.conjugate(fft)
    raw = numba_fft_v(power, sign=1).real
    normalised = raw / raw[0]
    return normalised[:n_lags]
@njit(fastmath=True, cache=True)
def numba_std(values, mean):
    """Sample standard deviation (divisor ``len(values) - 1``) about ``mean``.

    Parameters
    ----------
    values : 1-D array of observations.
    mean : centre to measure deviations from; supplied by the caller.

    Returns
    -------
    ``0`` when there is exactly one observation, otherwise
    ``sqrt(sum((v - mean) ** 2) / (len(values) - 1))``.
    """
    count = len(values)
    if count == 1:
        return 0

    total = 0
    for idx in range(count):
        delta = values[idx] - mean
        total += delta * delta
    return np.sqrt(total / (count - 1))
@njit(fastmath=True, cache=True)
def numba_min(a, b):
    """Return ``a`` when ``a < b`` holds, otherwise ``b``."""
    return a if a < b else b
@njit(fastmath=True, cache=True)
def numba_max(a, b):
    """Return ``b`` when ``a < b`` holds, otherwise ``a``."""
    return b if a < b else a
@njit(fastmath=True, cache=True)
def numba_linear_regression(x, y, n, lag):
    """Least-squares line fit over ``n`` samples starting at index ``lag``.

    Parameters
    ----------
    x, y : 1-D arrays; samples ``lag .. lag + n - 1`` of each are used.
    n : number of samples in the fit.
    lag : index of the first sample to use.

    Returns
    -------
    float64 array ``[slope, intercept]``.  Both entries stay 0 when the
    normal-equation determinant ``n * sum(x^2) - sum(x)^2`` is exactly 0.
    """
    coeffs = np.zeros(2, dtype=np.float64)

    s_x = 0
    s_xx = 0
    s_xy = 0
    s_y = 0
    for idx in range(lag, lag + n):
        xi = x[idx]
        yi = y[idx]
        s_x += xi
        s_xx += xi * xi
        s_xy += xi * yi
        s_y += yi

    det = n * s_xx - s_x * s_x
    if det == 0:
        # Degenerate fit (e.g. all x equal): leave the zero coefficients.
        return coeffs

    coeffs[0] = (n * s_xy - s_x * s_y) / det
    coeffs[1] = (s_y * s_xx - s_x * s_xy) / det
    return coeffs
def create_directory(directory_path):
    """Create ``directory_path`` (including parents) if it does not exist yet.

    Parameters
    ----------
    directory_path : path of the directory to create.

    Returns
    -------
    ``directory_path`` when this call created the directory; ``None`` when
    the path already existed or the directory could not be created.
    """
    if os.path.exists(directory_path):
        return None

    try:
        os.makedirs(directory_path)
    except OSError:
        # Another process or machine may have created the path in the
        # meantime, or creation may have failed (e.g. permissions).  Only
        # OS-level failures are absorbed; KeyboardInterrupt, SystemExit and
        # programming errors are no longer swallowed.
        return None
    return directory_path