from fastapi import FastAPI
import pandas as pd
import numpy as np
import json
import yfinance as yf
import datetime as dt
import xgboost as xgb
from sklearn.metrics import mean_squared_error

app = FastAPI()

@app.get("/")
def read_root():
    return {
        "message": "Hello! Append a ticker symbol to the URL to get a LAST TRADING HOUR FORECAST.",
        "format": "https://yaakovy-lasthourforcast.hf.space/ticker/[TICKER]",
        "example": "https://yaakovy-lasthourforcast.hf.space/ticker/msft",
    }
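
# The helper functions below form the forecast pipeline: download hourly OHLC
# bars, fill gaps in the latest session, pivot to one row per trading day,
# engineer calendar features, and fit an XGBoost regressor on the result.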

def get_data(ticker):
    # Download roughly 300 days of hourly bars for the requested ticker
    days_period = 300
    tickerData = yf.Ticker(ticker)
    start_date = dt.datetime.today() - dt.timedelta(days=days_period)
    end_date = dt.datetime.today()
    df_all = tickerData.history(start=start_date, end=end_date, interval="1h")
    # Keep only the OHLC columns
    df_all = df_all.drop(columns=["Dividends", "Stock Splits", "Volume"])
    return df_all
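
# Illustrative shape of the result: an hourly DataFrame indexed by timestamp
# with Open, High, Low, and Close columns. Note that Yahoo Finance limits
# 1-hour history to roughly the last 730 days, so the 300-day window above
# stays comfortably inside that cap.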

def get_last_date_missing_hours(df):
    # Ensure a timezone-naive datetime index so the string-built timestamps
    # below compare cleanly (yfinance returns timezone-aware indexes)
    df.index = pd.to_datetime(df.index).tz_localize(None)
    # Regular US equity trading hours
    trading_start = "09:30:00"
    trading_end = "16:00:00"
    # Find the latest date in the data
    latest_date = df.index.max().date()
    # Generate the full range of expected hourly bars for the latest date
    # ("h" is the non-deprecated spelling of the hourly frequency alias)
    expected_hours = pd.date_range(
        start=f"{latest_date} {trading_start}",
        end=f"{latest_date} {trading_end}",
        freq="h",
    )
    # Timestamps actually present for the latest date
    actual_hours = df[df.index.date == latest_date].index
    # Hours that are expected but missing
    missing_hours = expected_hours.difference(actual_hours)
    # Insert the missing hours as empty (NA) rows
    for hour in missing_hours:
        df.loc[hour] = [pd.NA] * len(df.columns)
    # Restore chronological order after the insertions
    df.sort_index(inplace=True)
    # Forward-fill only the latest day's rows so the inserted NA rows inherit
    # the most recent observed values
    latest_day_mask = df.index.date == latest_date
    df.loc[latest_day_mask] = df[latest_day_mask].ffill()
    return df
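
# Illustrative effect: if the latest session only has bars up to 13:30, rows
# for 14:30 and 15:30 are inserted and forward-filled from the 13:30 bar, so
# downstream code always sees the full 09:30-15:30 hourly grid for that day.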

def prepare_df_for_model(df):
    df.index = pd.to_datetime(df.index)  # Ensure the index is datetime
    # Split the datetime index into date and time components
    df["Date"] = df.index.date
    df["Time"] = df.index.time
    # Keep the hourly bars from 09:30 through 14:30 as features;
    # the 15:30 close becomes the prediction target
    feature_times = [
        pd.to_datetime(t).time()
        for t in ["09:30:00", "10:30:00", "11:30:00", "12:30:00", "13:30:00", "14:30:00"]
    ]
    df_hours = df[df["Time"].isin(feature_times)]
    df_target = df[df["Time"] == pd.to_datetime("15:30:00").time()][["Date", "Close"]]
    # Rename the target close column for clarity
    df_target = df_target.rename(columns={"Close": "Close_target"})
    # Pivot the hourly data to one row per day, one column per (feature, time)
    df_pivot = df_hours.pivot(
        index="Date", columns="Time", values=["Open", "High", "Low", "Close"]
    )
    # Flatten the resulting MultiIndex columns into names like "Open_09:30"
    df_pivot.columns = [
        "{}_{}".format(feature, time.strftime("%H:%M"))
        for feature, time in df_pivot.columns
    ]
    # Join the pivoted features with the target column
    df_final = df_pivot.join(df_target.set_index("Date"))
    # The pivot index comes back as object dtype; convert it back to datetime
    df_final.index = pd.to_datetime(df_final.index)
    # Drop days with incomplete hours or a missing target
    return df_final.dropna()
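
# Illustrative layout of the result, one row per trading day:
#   Open_09:30, High_09:30, Low_09:30, Close_09:30, ..., Close_14:30, Close_target
# where Close_target is the 15:30 close the model will learn to predict.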

def high_low_columns(df_final):
    # Columns holding the hourly 'High' and 'Low' values
    high_columns = [col for col in df_final.columns if "High_" in col]
    low_columns = [col for col in df_final.columns if "Low_" in col]
    # Day-level extremes across the hourly bars
    df_final["MAX_high"] = df_final[high_columns].max(axis=1)
    df_final["MIN_low"] = df_final[low_columns].min(axis=1)
    return df_final
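
# Illustrative example: if the day's hourly highs peak at High_11:30 = 103.5
# and the hourly lows bottom at Low_09:30 = 99.8, then MAX_high = 103.5 and
# MIN_low = 99.8 for that row.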

def calc_percentage_change(df):
    df.index = pd.to_datetime(df.index)
    # Express every column (including the target) as a percentage change
    # relative to the day's 09:30 open
    for column in df.columns:
        if column != "Open_09:30":
            df[column] = (df[column] - df["Open_09:30"]) / df["Open_09:30"] * 100
    return df
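
# Worked example (illustrative numbers): with Open_09:30 = 100.0 and
# Close_14:30 = 102.5, Close_14:30 becomes (102.5 - 100.0) / 100.0 * 100 = 2.5.
# Close_target is rescaled the same way, so the model predicts the 15:30 close
# as a percentage move from the 09:30 open rather than a raw price.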

def create_features(df):
    """
    Create time series features based on the datetime index.
    """
    df = df.copy()
    df["dayofweek"] = df.index.dayofweek
    df["quarter"] = df.index.quarter
    df["month"] = df.index.month
    df["year"] = df.index.year
    df["dayofyear"] = df.index.dayofyear
    df["dayofmonth"] = df.index.day
    # isocalendar().week returns UInt32; cast to a nullable integer dtype
    df["weekofyear"] = df.index.isocalendar().week.astype("Int32")
    return df
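
# Illustrative example: a row indexed 2024-03-15 (a Friday) gains
# dayofweek=4, quarter=1, month=3, year=2024, dayofyear=75, dayofmonth=15,
# weekofyear=11.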

def train_test_split(df):
    df.index = pd.to_datetime(df.index)
    # Hold out the last 30 rows (trading days) as the test set
    num_test = 30
    # Split into features and target
    X = df.drop(columns=["Close_target"])
    y = df["Close_target"]
    # Chronological split: earlier rows train, the most recent rows test
    X_train, y_train = X[:-num_test], y[:-num_test]
    X_test, y_test = X[-num_test:], y[-num_test:]
    return X_train, y_train, X_test, y_test
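
# Note: the rows are split in time order with no shuffling, as is appropriate
# for time series, and the test window ends at the most recent day in the
# data. The last test prediction made in run_xgboost below therefore doubles
# as the forecast for the latest session's 15:30 close.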

def run_xgboost(df):
    X_train, y_train, X_test, y_test = train_test_split(df)
    # Define the model; eval_metric and early_stopping_rounds are constructor
    # arguments here because XGBoost >= 2.0 no longer accepts them in fit()
    model = xgb.XGBRegressor(
        n_estimators=100,
        learning_rate=0.1,
        max_depth=3,
        subsample=0.8,
        colsample_bytree=0.8,
        objective="reg:squarederror",
        eval_metric="rmse",
        early_stopping_rounds=10,
    )
    # Train with evaluation on both the training and test sets
    model.fit(
        X_train,
        y_train,
        eval_set=[(X_train, y_train), (X_test, y_test)],
        verbose=True,
    )
    # Predict the held-out window
    predictions = model.predict(X_test)
    # The last test row corresponds to the most recent day
    latest_prediction = predictions[-1]
    # RMSE over the test window
    rmse = np.sqrt(mean_squared_error(y_test, predictions))
    return {"latest_prediction": float(latest_prediction), "RMSE": float(rmse)}

@app.get("/ticker/{ticker}")
def process_ticker(ticker: str):
    df = get_data(ticker)
    df = get_last_date_missing_hours(df)
    df = prepare_df_for_model(df)
    df = high_low_columns(df)
    df = calc_percentage_change(df)
    df = create_features(df)
    result = run_xgboost(df)
    # Serialize explicitly so any remaining numpy scalars are handled
    return json.dumps(result, cls=NumpyEncoder)
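
# Example request against the deployed Space (URL format from read_root above):
#   GET https://yaakovy-lasthourforcast.hf.space/ticker/msft
# The endpoint returns the result dict serialized as a JSON string.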

class NumpyEncoder(json.JSONEncoder):
    """Custom encoder for numpy data types."""

    def default(self, obj):
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        # The abstract np.integer / np.floating / np.complexfloating bases
        # cover every concrete scalar width and, unlike the np.float_ and
        # np.complex_ aliases, survive the removals in NumPy 2.0
        if isinstance(obj, np.integer):
            return int(obj)
        if isinstance(obj, np.floating):
            return float(obj)
        if isinstance(obj, np.complexfloating):
            return {"real": float(obj.real), "imag": float(obj.imag)}
        return json.JSONEncoder.default(self, obj)
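
# Minimal local entrypoint, a sketch that is not part of the original Space
# (assumes uvicorn is installed; 7860 is the port Hugging Face Spaces expects,
# but any free port works for local testing):
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=7860)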