"""Walk-forward ARIMA evaluation of closing prices over several window sizes.

The script loads daily closing prices, converts them to log returns, scores a
rolling one-step-ahead ARIMA forecast for a range of look-back windows, and
plots the reconstructed price path for the window with the lowest price RMSE.
"""

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from pmdarima.arima import auto_arima
from sklearn.metrics import mean_absolute_error, mean_squared_error

# --- Step 1: Load stock data ---
stock_prices = pd.read_csv(
    "/work/GOOGL.csv", parse_dates=["Date"], index_col="Date"
)["Close"]

# --- Step 2: Compute Log Returns ---
# The return labelled with date d is log(close[d] / close[previous row]); the
# first row has no predecessor, so its NaN is dropped.
log_returns = np.log(stock_prices / stock_prices.shift(1)).dropna()


# --- Step 3: Sliding Window Evaluation ---
def evaluate_window(log_returns, stock_prices, window_size, test_size=0.2):
    """Score a rolling one-step-ahead ARIMA forecast for one look-back window.

    The last ``test_size`` fraction of ``log_returns`` is held out.  For every
    held-out return, ``auto_arima`` is refit on the (at most) ``window_size``
    returns immediately preceding it and asked for a single-step forecast.

    Args:
        log_returns: Series of log returns.  Each return must carry the index
            label of the price it leads *to*, and every label must exist in
            ``stock_prices`` (this is what ``shift(1)`` + ``dropna()`` yields).
        stock_prices: Series of prices that ``log_returns`` was derived from.
        window_size: Maximum number of trailing returns used for each fit.
        test_size: Fraction of ``log_returns`` reserved for evaluation.

    Returns:
        A dict with the scalar metrics ``MAE_Log``, ``RMSE_Log``,
        ``MAE_Price``, ``RMSE_Price`` and ``Direction_Accuracy``, plus
        ``Price_Predictions`` (list of predicted prices) and ``Actual_Prices``
        (Series of the prices they are compared against).

    Raises:
        ValueError: If ``window_size`` is smaller than 1, or if the split
            leaves the training or the test portion empty.
    """
    if window_size < 1:
        raise ValueError("window_size must be at least 1")

    n_returns = len(log_returns)
    train_size = int(n_returns * (1 - test_size))
    if not 0 < train_size < n_returns:
        raise ValueError(
            "test_size must leave at least one training and one test return"
        )

    train = log_returns.iloc[:train_size]
    test = log_returns.iloc[train_size:]

    # Align prices with returns by label rather than by position: the series
    # of returns is one row shorter than the series of prices, so equal
    # positions refer to different days.  The price on the date of the last
    # training return is the base that the first test return is applied to,
    # and the prices on the test-return dates are what the forecasts target.
    last_train_price = stock_prices.loc[train.index[-1]]
    actual_prices = stock_prices.loc[test.index]

    forecasts = []
    for window_end in range(train_size, n_returns):
        # Early in the test span the window may reach past the start of the
        # data; clamp it instead of shrinking it to nothing.
        window_start = max(0, window_end - window_size)
        window_data = log_returns.iloc[window_start:window_end]

        model = auto_arima(
            window_data.values,
            seasonal=False,
            stepwise=True,
            suppress_warnings=True,
            error_action="ignore",
        )
        forecasts.append(model.predict(n_periods=1)[0])

    predictions = np.asarray(forecasts)

    # The price path compounds the successive return forecasts from the last
    # training price; it is not re-anchored on the actual price at each step.
    price_predictions = list(last_train_price * np.exp(np.cumsum(predictions)))

    # --- Metrics in log-return space ---
    mae_log = mean_absolute_error(test, predictions)
    rmse_log = np.sqrt(mean_squared_error(test, predictions))

    # --- Metrics in price space ---
    mae_price = mean_absolute_error(actual_prices, price_predictions)
    rmse_price = np.sqrt(mean_squared_error(actual_prices, price_predictions))

    # --- Direction Accuracy ---
    # Share of day-to-day moves whose sign matches between the two paths.
    direction_accuracy = np.mean(
        np.sign(np.diff(actual_prices.values))
        == np.sign(np.diff(price_predictions))
    )

    return {
        "MAE_Log": mae_log,
        "RMSE_Log": rmse_log,
        "MAE_Price": mae_price,
        "RMSE_Price": rmse_price,
        "Direction_Accuracy": direction_accuracy,
        "Price_Predictions": price_predictions,  # Store predictions
        "Actual_Prices": actual_prices,  # Store actual prices
    }


# --- Step 4: Test multiple window sizes ---
window_sizes = [30, 60, 90, 120, 180, 200, 250]
results = {}

for w in window_sizes:
    print(f"Evaluating window size: {w}")
    metrics = evaluate_window(log_returns, stock_prices, w)
    results[w] = metrics

# Only the scalar metrics belong in the comparison table; the stored price
# paths are kept in `results` for plotting.
results_df = pd.DataFrame(
    {
        k: {
            kk: vv
            for kk, vv in v.items()
            if kk not in ['Price_Predictions', 'Actual_Prices']
        }
        for k, v in results.items()
    }
).T.sort_values("RMSE_Price")

print("\nSliding Window Evaluation Results:")
print(results_df)

best_rmse_window = results_df.index[0]
print(f"\n✅ Best window for RMSE (Price): {best_rmse_window} days")

# --- Step 5: Plot the forecast for the best window ---
best_metrics = results[best_rmse_window]
actual_prices = best_metrics['Actual_Prices']
price_predictions = best_metrics['Price_Predictions']

# Create the plot
plt.figure(figsize=(12, 6))
plt.plot(
    actual_prices.index, actual_prices, label='Actual Prices', color='blue'
)
plt.plot(
    actual_prices.index[:len(price_predictions)],
    price_predictions,
    label='Predicted Prices',
    color='orange',
    linestyle='--',
)
plt.title(
    f'ARIMA Forecast vs Actual Prices (Window Size: {best_rmse_window} days)'
)
plt.xlabel('Date')
plt.ylabel('Stock Price (USD)')
plt.legend()
plt.grid(True)
plt.show()