Spaces:
Runtime error
Runtime error
File size: 5,309 Bytes
de6e775 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
from typing import List
import rqdatac as ricequant
from meta.data_processors._base import _Base
class Ricequant(_Base):
    """Data processor that fetches price data through the RiceQuant ``rqdatac`` client.

    Construction initialises the ``rqdatac`` session; ``download_data`` then
    queries prices for a list of tickers and persists them via ``save_data``.
    """

    def __init__(
        self,
        data_source: str,
        start_date: str,
        end_date: str,
        time_interval: str,
        **kwargs,
    ):
        """Forward the query settings to ``_Base`` and initialise ``rqdatac``.

        Args:
            data_source: Passed through unchanged to ``_Base.__init__``.
            start_date: Passed through unchanged to ``_Base.__init__``.
            end_date: Passed through unchanged to ``_Base.__init__``.
            time_interval: Passed through unchanged to ``_Base.__init__``.
            **kwargs: Also forwarded to ``_Base.__init__``. The optional
                ``username`` and ``password`` entries are used as the
                ``rqdatac`` credentials.
        """
        super().__init__(data_source, start_date, end_date, time_interval, **kwargs)

        # Use .get() so that *omitting* a credential behaves exactly like
        # passing None; indexing kwargs directly raised KeyError and made the
        # credential-less path unreachable unless both keys were spelled out.
        username = kwargs.get("username")
        password = kwargs.get("password")

        if username is None or password is None:
            # If the license is already set, rqdatac can be initialised
            # without a username and password.
            ricequant.init()
        else:
            # Initialise with the explicit username and password.
            ricequant.init(username, password)

    def download_data(
        self, ticker_list: List[str], save_path: str = "./data/dataset.csv"
    ):
        """Download prices for ``ticker_list`` and save them to ``save_path``.

        The result of ``rqdatac.get_price`` is stored on ``self.dataframe``
        before ``self.save_data(save_path)`` is called.

        Args:
            ticker_list: Tickers passed as the first argument to
                ``rqdatac.get_price``.
            save_path: Destination handed to ``self.save_data``.
        """
        # NOTE(review): time_interval / start_date / end_date and save_data are
        # not defined in this class -- presumably provided by _Base; confirm.
        self.dataframe = ricequant.get_price(
            ticker_list,
            frequency=self.time_interval,
            start_date=self.start_date,
            end_date=self.end_date,
        )

        self.save_data(save_path)

        print(
            f"Download complete! Dataset saved to {save_path}. \nShape of DataFrame: {self.dataframe.shape}"
        )
# def clean_data(self, df) -> pd.DataFrame:
# ''' RiceQuant data is already cleaned, we only need to transform data format here.
# No need for filling NaN data'''
# df = df.copy()
# # raw df uses multi-index (tic,time), reset it to single index (time)
# df = df.reset_index(level=[0,1])
# # rename column order_book_id to tic
# df = df.rename(columns={'order_book_id':'tic', 'datetime':'time'})
# # reserve columns needed
# df = df[['tic','time','open','high','low','close','volume']]
# # check if there is NaN values
# assert not df.isnull().values.any()
# return df
# def add_vix(self, data):
# print('VIX is NOT applicable to China A-shares')
# return data
# def calculate_turbulence(self, data, time_period=252):
# # can add other market assets
# df = data.copy()
# df_price_pivot = df.pivot(index="date", columns="tic", values="close")
# # use returns to calculate turbulence
# df_price_pivot = df_price_pivot.pct_change()
#
# unique_date = df.date.unique()
# # start after a fixed time period
# start = time_period
# turbulence_index = [0] * start
# # turbulence_index = [0]
# count = 0
# for i in range(start, len(unique_date)):
# current_price = df_price_pivot[df_price_pivot.index == unique_date[i]]
# # use one year rolling window to calculate covariance
# hist_price = df_price_pivot[
# (df_price_pivot.index < unique_date[i])
# & (df_price_pivot.index >= unique_date[i - time_period])
# ]
# # Drop tickers that have more missing values than the "oldest" ticker
# filtered_hist_price = hist_price.iloc[hist_price.isna().sum().min():].dropna(axis=1)
#
# cov_temp = filtered_hist_price.cov()
# current_temp = current_price[[x for x in filtered_hist_price]] - np.mean(filtered_hist_price, axis=0)
# temp = current_temp.values.dot(np.linalg.pinv(cov_temp)).dot(
# current_temp.values.T
# )
# if temp > 0:
# count += 1
# if count > 2:
# turbulence_temp = temp[0][0]
# else:
# # avoid large outlier because of the calculation just begins
# turbulence_temp = 0
# else:
# turbulence_temp = 0
# turbulence_index.append(turbulence_temp)
#
# turbulence_index = pd.DataFrame(
# {"date": df_price_pivot.index, "turbulence": turbulence_index}
# )
# return turbulence_index
#
# def add_turbulence(self, data, time_period=252):
# """
# add turbulence index from a precalculated dataframe
# :param data: (df) pandas dataframe
# :return: (df) pandas dataframe
# """
# df = data.copy()
# turbulence_index = self.calculate_turbulence(df, time_period=time_period)
# df = df.merge(turbulence_index, on="date")
# df = df.sort_values(["date", "tic"]).reset_index(drop=True)
# return df
# def df_to_array(self, df, tech_indicator_list, if_vix):
# df = df.copy()
# unique_ticker = df.tic.unique()
# if_first_time = True
# for tic in unique_ticker:
# if if_first_time:
# price_array = df[df.tic==tic][['close']].values
# tech_array = df[df.tic==tic][tech_indicator_list].values
# #risk_array = df[df.tic==tic]['turbulence'].values
# if_first_time = False
# else:
# price_array = np.hstack([price_array, df[df.tic==tic][['close']].values])
# tech_array = np.hstack([tech_array, df[df.tic==tic][tech_indicator_list].values])
# print('Successfully transformed into array')
# return price_array, tech_array, None
|