AnnNaserNabil's picture
Create ARIMA.py
e845ced verified
raw
history blame
3.89 kB
import pandas as pd
import numpy as np
from pmdarima.arima import auto_arima
from sklearn.metrics import mean_absolute_error, mean_squared_error
import matplotlib.pyplot as plt
# --- Step 1: Load stock data ---
stock_prices = pd.read_csv("/work/GOOGL.csv", parse_dates=["Date"], index_col="Date")["Close"]
# --- Step 2: Compute Log Returns ---
log_returns = np.log(stock_prices / stock_prices.shift(1)).dropna()
# --- Step 3: Sliding Window Evaluation ---
def evaluate_window(log_returns, stock_prices, window_size, test_size=0.2):
train_size = int(len(log_returns) * (1 - test_size))
train, test = log_returns[:train_size], log_returns[train_size:]
predictions = []
price_predictions = []
last_train_price = stock_prices.iloc[train_size - 1]
price_predictions.append(last_train_price)
for t in range(len(test)):
# Define rolling window
start_idx = train_size + t - window_size
if start_idx < 0:
window_data = log_returns[:train_size + t]
else:
window_data = log_returns[start_idx:train_size + t]
# Fit ARIMA
model = auto_arima(
window_data.values,
seasonal=False,
stepwise=True,
suppress_warnings=True,
error_action="ignore"
)
# Forecast 1-step log return
forecast = model.predict(n_periods=1)[0]
predictions.append(forecast)
# Convert to price
price_predictions.append(price_predictions[-1] * np.exp(forecast))
# Drop the initial seed (last_train_price)
price_predictions = price_predictions[1:]
# --- Ensure same length ---
predictions = np.array(predictions)
test = test[:len(predictions)]
actual_prices = stock_prices.iloc[train_size:train_size + len(price_predictions)]
# --- Metrics in log-return space ---
mae_log = mean_absolute_error(test, predictions)
rmse_log = np.sqrt(mean_squared_error(test, predictions))
# --- Metrics in price space ---
mae_price = mean_absolute_error(actual_prices, price_predictions)
rmse_price = np.sqrt(mean_squared_error(actual_prices, price_predictions))
# --- Direction Accuracy ---
direction_accuracy = np.mean(
np.sign(np.diff(actual_prices.values)) == np.sign(np.diff(price_predictions))
)
return {
"MAE_Log": mae_log,
"RMSE_Log": rmse_log,
"MAE_Price": mae_price,
"RMSE_Price": rmse_price,
"Direction_Accuracy": direction_accuracy,
"Price_Predictions": price_predictions, # Store predictions
"Actual_Prices": actual_prices # Store actual prices
}
# --- Step 4: Test multiple window sizes ---
window_sizes = [30, 60, 90, 120, 180, 200, 250]
results = {}
for w in window_sizes:
print(f"Evaluating window size: {w}")
metrics = evaluate_window(log_returns, stock_prices, w)
results[w] = metrics
results_df = pd.DataFrame({k: {kk: vv for kk, vv in v.items() if kk not in ['Price_Predictions', 'Actual_Prices']} for k, v in results.items()}).T.sort_values("RMSE_Price")
print("\nSliding Window Evaluation Results:")
print(results_df)
best_rmse_window = results_df.index[0]
print(f"\n✅ Best window for RMSE (Price): {best_rmse_window} days")
# --- Step 5: Plot the forecast for the best window ---
best_metrics = results[best_rmse_window]
actual_prices = best_metrics['Actual_Prices']
price_predictions = best_metrics['Price_Predictions']
# Create the plot
plt.figure(figsize=(12, 6))
plt.plot(actual_prices.index, actual_prices, label='Actual Prices', color='blue')
plt.plot(actual_prices.index[:len(price_predictions)], price_predictions, label='Predicted Prices', color='orange', linestyle='--')
plt.title(f'ARIMA Forecast vs Actual Prices (Window Size: {best_rmse_window} days)')
plt.xlabel('Date')
plt.ylabel('Stock Price (USD)')
plt.legend()
plt.grid(True)
plt.show()