WaveLSFromer / data_collect.py
ducheng678
Initial WaveLSFromer project
093b0a5
Raw
History Blame Contribute Delete
9.58 kB
#!/usr/bin/env python
# coding: utf-8
# In[1]:
import pandas as pd
from time import sleep
import datetime
import os
from utils.ipynb_helpers import read_data, write_df, convert_tz, add_tz
from dotenv import load_dotenv
import traceback
# Create a .env file and add your keys
load_dotenv()
# Directory where raw data downloaded from the data providers is saved
DATA_RAW = "data/raw"

equities = [
    "XOM",
    "CVX",
    "COP",
    "BP",
    "PBR",
    "WTI",
    "TTE",
    "EQNR",
    "EOG",
    "ENB",
    "SLB",
]
more_equities = []
crude_oil = ["CL=F", "BZ=F"]  # wti, brent
random = ["TSLA", "AAPL"]
materials_equities = [
    "BHP",
    "LIN",
    "RIO",
    "VALE",
    "APD",
    "FCX",
    "SHW",
    "SCCO",
    "CTVA",
    "ECL",
    "NUE",
    "NTR",
]

# https://en.wikipedia.org/wiki/List_of_countries_by_oil_production
# https://www.weforum.org/agenda/2016/05/which-economies-are-most-reliant-on-oil/
# OPEC: Iran, Iraq, Kuwait, Saudi Arabia, Venezuela
# fx_opec = [_, "C:USDIQD", "C:USDKWD", "C:USDSAR", "C:USDVEF"]
# OPEC+: Algeria, Angola, Congo, Equatorial Guinea, Gabon, Libya, Nigeria, United Arab Emirates
# fx_opec_pp = ["C:USDDZD",_, "C:USDCDF", "C:USDGNF", _, "C:USDLYD", "C:USDNGN", "C:USDAED"]
# Large: US, Russia, China, Canada, Norway
# Other important: Qatar, Kazakhstan
# fx_other= ["C:USDQAR", "C:USDKZT"]
fx = ["C:USDSAR", "C:USDAED"]

# Symbols requested by the fetch cells below (same list object as `equities`)
tickers = equities  # + crude_oil
# ##### Get Data From Data Provider
# In[2]:
# Y Finance
import yfinance as yf
def use_yfinance(
    tickers, out_file, timeframe="day", start="2000-01-01", end="2023-01-01"
):
    """Download daily bars for ``tickers`` through yfinance.

    Args:
        tickers: Sequence of ticker symbols forwarded to ``yf.download``.
        out_file: Destination handed to ``write_df``; ``None`` skips writing.
        timeframe: Must be ``"day"``; any other value fails the assertion.
        start: Start date forwarded to ``yf.download``.
        end: End date forwarded to ``yf.download``.

    Returns:
        The downloaded frame with its index renamed to ``"date"`` and the
        second column level lower-cased.
    """
    assert timeframe == "day", "Use day timeframe for day"

    frame = yf.download(
        tickers, start=start, end=end, group_by="ticker", threads=False
    )

    # A single-ticker download is wrapped under a top-level key named after
    # that ticker before the level-1 column rename below.
    if len(tickers) == 1:
        only_ticker = tickers[0]
        frame = pd.concat([frame], axis=1, keys=[only_ticker])

    frame.index.rename("date", inplace=True)
    frame.rename(columns=str.lower, level=1, inplace=True)

    index_is_naive = frame.index.to_series().dt.tz is None
    if index_is_naive:
        print("Adding time")
        frame = add_tz(frame, time_zone="UTC")

    if out_file is None:
        return frame
    write_df(frame, out_file)
    return frame
# In[22]:
# Alpha Vantage
def csv_str_to_df(decoded_content, ticker):
    """Convert an Alpha Vantage intraday response body into a DataFrame.

    Two payload shapes are accepted:

    * JSON (body starts with ``{``): the bars are read from the first
      top-level key beginning with ``"Time Series"``. This assumes the layout
      documented by Alpha Vantage, i.e. a mapping of timestamp to a mapping
      with keys such as ``"1. open"`` ... ``"5. volume"``.
    * CSV: the first line is treated as the header and skipped; every other
      non-empty line is split on commas into
      ``date, open, high, low, close, volume``.

    Values are kept exactly as received (strings); only the index is parsed
    into datetimes.

    Args:
        decoded_content: Response body as text.
        ticker: Symbol used as the top-level column key of the result.

    Returns:
        DataFrame indexed by ``date`` whose columns are nested under
        ``ticker``.

    Raises:
        ValueError: If a JSON body contains no ``"Time Series ..."`` entry.
    """
    import json

    columns = ["date", "open", "high", "low", "close", "volume"]
    body = decoded_content.strip()

    if body.startswith("{"):
        payload = json.loads(body)
        series_key = next(
            (key for key in payload if key.startswith("Time Series")), None
        )
        if series_key is None:
            raise ValueError(
                f"No time series in Alpha Vantage response for {ticker}: "
                f"{body[:200]}"
            )
        rows = []
        for stamp, bar in payload[series_key].items():
            # Field names arrive as "1. open", "2. high", ...; strip the
            # numeric prefix so they can be looked up by column name.
            fields = {
                name.split(". ", 1)[-1]: value for name, value in bar.items()
            }
            rows.append([stamp] + [fields[col] for col in columns[1:]])
    else:
        # First line is the CSV header row.
        rows = [line.split(",") for line in body.splitlines()[1:] if line]

    data = pd.DataFrame(rows, columns=columns).set_index("date")
    data.index = pd.to_datetime(data.index)

    # Add timezone -- we assume it is sent in as unlabeled US/Eastern time
    if data.index.to_series().dt.tz is None:
        print("CONVERTING TIME")
        data = add_tz(data, time_zone="US/Eastern")
    data = convert_tz(data, time_zone="UTC")

    return pd.concat([data], axis=1, keys=[ticker])
def alpha_vantage_get_ticker_data(
    ticker, time="1min", year=1, month=1, latest_month="2025-10"
):
    """Fetch one month of intraday data for ``ticker`` from Alpha Vantage.

    ``year`` and ``month`` are 1-based offsets counted backwards from
    ``latest_month``: ``year=1, month=1`` requests ``latest_month`` itself,
    each extra ``month`` goes one calendar month further back and each extra
    ``year`` goes twelve months further back. The arithmetic wraps across
    year boundaries, so every ``month`` in 1..12 maps to a valid ``YYYY-MM``.

    The API key is read from the ``ALPHA_VANTAGE_API_KEY`` environment
    variable (e.g. provided through the ``.env`` file).

    Args:
        ticker: Symbol to request.
        time: Bar interval forwarded as the ``interval`` query parameter.
        year: 1-based year offset (see above).
        month: 1-based month offset (see above).
        latest_month: ``"YYYY-MM"`` month that ``year=1, month=1`` refers to.

    Returns:
        The frame built by ``csv_str_to_df``, or ``None`` when the response
        is too short to contain data.

    Raises:
        RuntimeError: If ``ALPHA_VANTAGE_API_KEY`` is not set.
    """
    import requests

    api_key = os.environ.get("ALPHA_VANTAGE_API_KEY")
    if not api_key:
        raise RuntimeError(
            "ALPHA_VANTAGE_API_KEY is not set; add it to your .env file"
        )

    # Work in "months since year 0" so stepping back wraps over year ends.
    anchor_year, anchor_month = (int(part) for part in latest_month.split("-"))
    months_back = (year - 1) * 12 + (month - 1)
    target_year, target_month0 = divmod(
        anchor_year * 12 + (anchor_month - 1) - months_back, 12
    )

    url = "https://www.alphavantage.co/query"
    # Passed as params so symbols containing ':' or '=' are URL-encoded.
    params = {
        "function": "TIME_SERIES_INTRADAY",
        "symbol": ticker,
        "interval": time,
        "month": f"{target_year}-{target_month0 + 1:02d}",
        "outputsize": "full",
        "apikey": api_key,
    }

    with requests.Session() as session:
        while True:
            download = session.get(url, params=params)
            decoded_content = download.content.decode("utf-8")
            print(
                f"ticker: {ticker}, y{year} m{month}; response length: {len(decoded_content)}"
            )

            if len(decoded_content) == 236:
                # API too many requests: wait a minute, then retry
                sleep(60)
                continue

            if len(decoded_content) <= 243:
                # Token doesn't exist or something
                print(f"Error getting {ticker}, y{year}, m{month}. We are skipping")
                print(decoded_content)
                return None

            return csv_str_to_df(decoded_content, ticker)
def use_alpha_vantage(tickers, out_file, time="1min"):
    """Fetch 24 monthly intraday slices per ticker from Alpha Vantage.

    For every ticker, ``alpha_vantage_get_ticker_data`` is called for
    ``year`` in 1..2 and ``month`` in 1..12. Slices that come back as
    ``None`` are ignored; a ticker with no usable slice is reported and left
    out of the result.

    Args:
        tickers: Iterable of ticker symbols.
        out_file: Destination handed to ``write_df``; ``None`` skips writing.
        time: Bar interval forwarded to ``alpha_vantage_get_ticker_data``.

    Returns:
        All per-ticker frames joined column-wise on a sorted ``date`` index.
    """
    dfs = []
    for ticker in tickers:
        t_dfs = []
        for year in range(1, 3):
            for month in range(1, 13):
                df_temp = alpha_vantage_get_ticker_data(
                    ticker, time=time, year=year, month=month
                )
                if df_temp is not None:
                    t_dfs.append(df_temp)

        if t_dfs:
            dfs.append(pd.concat(t_dfs, axis=0))
        else:
            print(f"Skipped {ticker}.")

    df = pd.concat(dfs, axis=1, sort=True)
    df.index.rename("date", inplace=True)
    # Same convention as the other fetchers: no path means do not write.
    if out_file is not None:
        write_df(df, out_file)
    return df
# In[23]:
# Alpaca
def use_alpaca(tickers, out_file, timeframe="1Minute", start="2017-01-01"):
    """Download bars for ``tickers`` through the Alpaca REST API.

    Credentials and the base URL are read from the ``APCA_API_KEY_ID``,
    ``APCA_API_SECRET_KEY`` and ``APCA_API_BASE_URL`` environment variables.
    The account status is printed before any bars are requested.

    Args:
        tickers: Iterable of ticker symbols.
        out_file: Destination handed to ``write_df``; ``None`` skips writing.
        timeframe: Timeframe forwarded to ``get_bars``.
        start: Start date forwarded to ``get_bars``.

    Returns:
        Per-ticker frames joined column-wise on a sorted ``date`` index.
    """
    env = os.environ
    base_url = env.get("APCA_API_BASE_URL")
    key_id = env.get("APCA_API_KEY_ID")
    secret_key = env.get("APCA_API_SECRET_KEY")

    import alpaca_trade_api as tradeapi

    alpaca = tradeapi.REST(
        key_id=key_id,
        secret_key=secret_key,
        base_url=base_url,
    )
    print(alpaca.get_account().status)

    def fetch(symbol):
        # Download one symbol and nest its columns under the symbol name.
        print("Getting", symbol)
        bars = alpaca.get_bars(symbol, timeframe, start).df
        print("Recieved", symbol)
        bars.index.name = "date"
        return pd.concat([bars], axis=1, keys=[symbol])

    per_ticker = []
    for symbol in tickers:
        per_ticker.append(fetch(symbol))

    combined = pd.concat(per_ticker, axis=1, sort=True)
    combined.index.rename("date", inplace=True)
    if out_file is None:
        return combined
    write_df(combined, out_file)
    return combined
# In[24]:
# Polygon
def try_until_suc(request_func, *args, **kwargs):
    """Call ``request_func`` until it returns without raising.

    Any ``Exception`` is printed together with its traceback, then the call
    is repeated after a 5 second pause. There is no retry limit.

    Args:
        request_func: Callable to invoke.
        *args: Positional arguments forwarded to ``request_func``.
        **kwargs: Keyword arguments forwarded to ``request_func``.

    Returns:
        Whatever the first successful call returns.
    """
    while True:
        try:
            return request_func(*args, **kwargs)
        except Exception as err:
            print("Error Message:", err)
            # Get full traceback as a string
            print(f"Traceback Details: {traceback.format_exc()}")
            print("retry sending request...")
            sleep(5)
def use_polygon(tickers, out_file, multiplier=1, timespan="minute", start="2000-01-01"):
    """Download aggregate bars for ``tickers`` from Polygon, page by page.

    Each request asks for up to 50000 bars from the current page start up to
    "now". The next page starts at the last timestamp received, so
    consecutive pages overlap by one bar; that bar is dropped from the
    accumulated frame before the pages are merged. Paging stops when a page
    holds a single bar (only the overlap is left) or no bars at all.

    The API key is read from the ``POLYGON_API_KEY`` environment variable
    (e.g. provided through the ``.env`` file).

    Args:
        tickers: Iterable of ticker symbols.
        out_file: Destination handed to ``write_df``; ``None`` skips writing.
        multiplier: Timespan multiplier forwarded to ``get_aggs``.
        timespan: Timespan unit forwarded to ``get_aggs``.
        start: Start of the first page for every ticker.

    Returns:
        Per-ticker frames (open, high, low, close, volume, vwap where
        present) joined column-wise on a sorted ``date`` index.
    """
    from polygon import RESTClient

    client = RESTClient(os.environ.get("POLYGON_API_KEY"))

    dfs = []
    # Timezone-aware so the bound is unambiguous when converted to a timestamp.
    end = datetime.datetime.now(datetime.timezone.utc)

    for ticker in tickers:
        page_start = start
        df_agg = None
        page = 0
        print("Getting", ticker)

        while True:
            page += 1
            aggs = try_until_suc(
                client.get_aggs,
                ticker,
                multiplier,
                timespan,
                page_start,
                end,
                adjusted=True,
                sort="asc",
                limit=50000,
            )
            df = pd.DataFrame(aggs)

            if df.empty:
                # Nothing (more) to fetch; an empty frame has no "timestamp".
                print(page, 0)
                sleep(12)  # Attempt to be nice
                break

            df.index = pd.DatetimeIndex(
                pd.to_datetime(df["timestamp"], unit="ms", utc=True)
            )
            df.index.name = "date"
            df = df.filter(["open", "high", "low", "close", "volume", "vwap"], axis=1)

            response_len = len(df.index)
            page_start = df.last_valid_index()
            print(page, response_len)

            if df_agg is None:
                df_agg = df
            else:
                # The new page starts with the bar the previous one ended on.
                df_agg = df_agg.drop(index=df_agg.index[-1])
                df_agg = pd.merge(df_agg.reset_index(), df.reset_index(), how="outer")
                df_agg = df_agg.set_index("date")

            sleep(12)  # Attempt to be nice

            if response_len == 1:
                break

        if df_agg is None:
            print(f"Skipped {ticker}.")
            continue

        dfs.append(pd.concat([df_agg], axis=1, keys=[ticker]))
        print("Recieved", ticker)

    df = pd.concat(dfs, axis=1, sort=True)
    df.index.rename("date", inplace=True)
    if out_file is not None:
        write_df(df, out_file)
    return df
# In[6]:
# Yahoo Finance
# df = use_yfinance(
# random, os.path.join(DATA_RAW, "aapl_day_full.csv"), start="1970-01-01",
# )
# In[25]:
# Alpha Vantage
# df = use_alpha_vantage(tickers, os.path.join(DATA_RAW, "realdata.csv"), time="1h")
# In[ ]:
# # Alpaca
# df = use_alpaca(
# tickers + random, os.path.join(DATA_RAW, "realdata_alp_1h.csv"), timeframe="1Hour"
# )
# # In[ ]:
# Polygon
# Fetch hourly bars for every symbol in `tickers` and save them as realdata.csv
df = use_polygon(
    tickers=tickers,
    out_file=os.path.join(DATA_RAW, "realdata.csv"),
    multiplier=1,
    timespan="hour",
    start="2000-01-01",
)

# In[ ]:

df.head()

# ## Extras
# ##### Read Data From All-Data CSV (Multi Index Columns)

# In[ ]:

df_all = read_data(os.path.join(DATA_RAW, "realdata.csv"))
# df = read_data("tsla_aapl.csv")
print(df_all.head())
print(df.head())
print(df_all.columns)
print(df.columns)

# ##### Concatenate two datasets

# In[ ]:

# Flip `run` to True to append the freshly fetched columns to the stored CSV.
run = False
if run:
    if not df.columns.equals(df_all.columns):
        df_new = write_df(
            pd.concat([df_all, df], axis=1),
            os.path.join(DATA_RAW, "realdata.csv"),
        )

# ### Remove rows with a lot of NANs
# This is important when using FX data

# In[ ]:

# Keep only rows that have at least 50 non-NaN values
df_f = df.copy().dropna(axis=0, thresh=50)  # 80
write_df(df_f, os.path.join(DATA_RAW, "realdata_pol_1h.csv"))

# In[ ]:

df.tail(80)