| """ | |
| model.py – StockBuddy ML / NLP core | |
| ======================================== | |
| LIGHTWEIGHT CHANGES vs original: | |
| [OPT-1] Removed `transformers` pipeline (was downloading ~1.2 GB BART model at | |
| runtime). Replaced with a fast NLTK-based extractive summariser. | |
| [OPT-2] Reduced technical indicators: 11 → 6 features (kept only the ones with | |
| highest predictive signal; fewer features = smaller tensors & faster fits). | |
| [OPT-3] LSTM architecture: 4 layers (64/64/32/32 units) → 2 layers (32/16 units). | |
| Still accurate enough for short-horizon forecasts, ~8× fewer parameters. | |
| [OPT-4] time_step: 45 → 30 (shorter look-back window → smaller tensors). | |
| [OPT-5] Epochs: 30 → 15, batch_size: 64 → 32 (free-tier CPU training time). | |
| [OPT-6] XGBoost n_estimators: 300 → 100, max_depth 6 → 4. | |
| [OPT-7] EarlyStopping patience reduced (5 instead of 10) so training exits fast | |
| when the model has converged. | |
| All public function signatures are identical to the original so app.py needs | |
| only minimal changes. | |
| """ | |

import numpy as np
import pandas as pd
import requests
from sklearn.preprocessing import MinMaxScaler
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout
import xgboost as xgb
import plotly.graph_objects as go
from datetime import datetime, timedelta
import nltk
from nltk.sentiment.vader import SentimentIntensityAnalyzer
# [OPT-1] No longer importing transformers – see generate_sentiment_summary below
import time
import os

# Download VADER lexicon once (tiny file, safe on free tier)
nltk.download("vader_lexicon", quiet=True)

# =============================================================================
# API keys (read from environment variables)
# =============================================================================
ALPHAVANTAGE_API_KEY = os.environ.get("ALPHAVANTAGE_API_KEY")
FINNHUB_API_KEY = os.environ.get("FINNHUB_API_KEY")

# =============================================================================
# STOCK PRICE PREDICTION FUNCTIONS
# =============================================================================
def fetch_stock_data(symbol, outputsize="full"):
    url = "https://www.alphavantage.co/query"
    params = {
        "function": "TIME_SERIES_DAILY",
        "symbol": symbol,
        "apikey": ALPHAVANTAGE_API_KEY,
        "outputsize": outputsize,
        "datatype": "json",
    }
    response = requests.get(url, params=params, timeout=30)  # avoid hanging on a dead connection
    data = response.json()
    if "Time Series (Daily)" not in data:
        if "Error Message" in data:
            raise ValueError(
                f"Symbol '{symbol}' not found. Please verify the stock symbol.")
        elif "Note" in data:
            raise ValueError("API request limit reached. Please try again in a minute.")
        else:
            raise ValueError(
                f"Unable to fetch data for symbol '{symbol}'. Please verify the symbol.")
    ts = data["Time Series (Daily)"]
    df = pd.DataFrame.from_dict(ts, orient="index")
    df.index = pd.to_datetime(df.index)
    df.sort_index(inplace=True)
    for col in ["1. open", "2. high", "3. low", "4. close", "5. volume"]:
        if col in df.columns:
            df[col] = df[col].astype(float)
    df = df.rename(columns={
        "1. open": "Open",
        "2. high": "High",
        "3. low": "Low",
        "4. close": "Close",
        "5. volume": "Volume",
    })
    latest_date = df.index[-1]
    now = pd.Timestamp.now()
    today = now.normalize()
    market_closed_days = 0
    if today.dayofweek >= 5:
        market_closed_days = today.dayofweek - 4
    elif now.hour < 16:
        # normalize() zeroes the time component, so the hour check must use
        # the un-normalized timestamp (market has not closed yet today).
        market_closed_days = 1
    expected_latest = today - pd.Timedelta(days=market_closed_days)
    date_diff = (expected_latest - latest_date).days
    if date_diff > 5:
        print(f"WARNING: Latest data for {symbol} is from "
              f"{latest_date.strftime('%Y-%m-%d')} ({date_diff} days old).")
    print(f"\nLatest closing price for {symbol} "
          f"(as of {latest_date.strftime('%Y-%m-%d')}): ${df['Close'].iloc[-1]:.2f}")
    # Add lightweight technical indicators
    df = add_technical_indicators(df)
    return df
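
# Example (illustrative): fetch_stock_data("AAPL") yields a date-indexed
# DataFrame whose columns are the compact feature set returned by
# add_technical_indicators below.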

# [OPT-2] Reduced feature set: 11 → 6 (Close, RSI, SMA5, MACD, Upper_Band, ROC)
def add_technical_indicators(df):
    """Add a compact set of technical indicators (6 features vs 11 original)."""
    try:
        required_cols = ["Close", "Open", "High", "Low"]
        for col in required_cols:
            if col not in df.columns:
                print(f"Warning: {col} missing – falling back to Close-only.")
                return df[["Close"]]
        # RSI (14-period)
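        # RSI = 100 - 100 / (1 + RS), with RS = average gain / average loss
        # over the window. Note this uses a simple rolling mean rather than
        # Wilder's smoothing, so values differ slightly from classic RSI.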
        delta = df["Close"].diff()
        gain = delta.where(delta > 0, 0).rolling(14).mean()
        loss = -delta.where(delta < 0, 0).rolling(14).mean()
        rs = gain / loss
        df["RSI"] = 100 - (100 / (1 + rs))
        # Short moving average
        df["SMA5"] = df["Close"].rolling(5).mean()
        # MACD line only (signal line dropped to save a feature)
        ema12 = df["Close"].ewm(span=12).mean()
        ema26 = df["Close"].ewm(span=26).mean()
        df["MACD"] = ema12 - ema26
        # Upper Bollinger Band as a proxy for volatility
        ma20 = df["Close"].rolling(20).mean()
        df["Upper_Band"] = ma20 + (df["Close"].rolling(20).std() * 2)
        # Rate-of-change (5-period)
        df["ROC"] = df["Close"].pct_change(periods=5) * 100
        df = df.dropna()
        # [OPT-2] Only 6 features returned
        features = ["Close", "RSI", "SMA5", "MACD", "Upper_Band", "ROC"]
        return df[features]
    except Exception as e:
        print(f"Error adding technical indicators: {e}")
        if "Close" in df.columns:
            return df[["Close"]]
        return df

def preprocess_data(data):
    """Scale each feature independently; return scaled array + Close scaler."""
    features = data.columns
    scalers = {}
    scaled_data = np.zeros((len(data), len(features)))
    for i, feature in enumerate(features):
        scalers[feature] = MinMaxScaler(feature_range=(0, 1))
        scaled_data[:, i] = (
            scalers[feature]
            .fit_transform(data[feature].values.reshape(-1, 1))
            .flatten()
        )
    master_scaler = scalers["Close"]
    return scaled_data, master_scaler
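
# Example (illustrative): for a 500-row, 6-column frame this returns a
# (500, 6) float array scaled to [0, 1] plus the scaler fitted on Close,
# which predict_stock_price later uses to invert forecasts back to dollars.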

def create_sequences(data, time_step=30):
    """Create (X, y) sequences for LSTM training."""
    X, y = [], []
    for i in range(len(data) - time_step - 1):
        X.append(data[i : i + time_step, :])  # all features
        y.append(data[i + time_step, 0])      # Close price only
    return np.array(X), np.array(y)
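
# Example (illustrative): 500 scaled rows with 6 features and time_step=30
# produce X of shape (469, 30, 6) and y of shape (469,).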

# [OPT-3] Slimmed LSTM: 2 layers (32 / 16 units) instead of 4 layers (64/64/32/32)
# [OPT-4] time_step default lowered to 30
# [OPT-5] epochs 30 → 15, batch_size 64 → 32, EarlyStopping patience 10 → 5
def train_lstm(X_train, y_train, time_step=30, stop_requested_callback=None):
    """
    Train a lightweight LSTM model.

    Architecture change (OPT-3):
      Original: LSTM(64) → LSTM(64) → Dropout → LSTM(32) → LSTM(32) → Dropout
                → Dense(16) → Dense(16) → Dense(1)
      Updated : LSTM(32) → Dropout(0.2) → LSTM(16) → Dropout(0.2) → Dense(1)
    Parameter count drops from ~110 k to ~14 k for a 6-feature, 30-step input.
    """
    from tensorflow.keras.optimizers import Adam
    from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping, Callback

    n_features = X_train.shape[2]
    X_train = X_train.reshape(X_train.shape[0], time_step, n_features)
    # [OPT-3] Lightweight architecture
    model = Sequential([
        LSTM(32, return_sequences=True,
             input_shape=(time_step, n_features)),
        Dropout(0.2),
        LSTM(16, return_sequences=False),
        Dropout(0.2),
        Dense(1),
    ])

    class StopCallback(Callback):
        def on_epoch_end(self, epoch, logs=None):
            if stop_requested_callback and stop_requested_callback():
                self.model.stop_training = True
                print("Training stopped early by user request.")

    optimizer = Adam(learning_rate=0.001)
    model.compile(optimizer=optimizer, loss="mean_squared_error")
    # [OPT-7] Patience 10 → 5 for faster early exit on free-tier CPU
    reduce_lr = ReduceLROnPlateau(monitor="val_loss", factor=0.3,
                                  patience=3, min_lr=0.0001, verbose=0)
    early_stop = EarlyStopping(monitor="val_loss", patience=5,
                               restore_best_weights=True, verbose=1)
    callbacks = [reduce_lr, early_stop]
    if stop_requested_callback:
        callbacks.append(StopCallback())
    print(f"Training lightweight LSTM: {X_train.shape[0]} samples, "
          f"{n_features} features, time_step={time_step}")
    # [OPT-5] epochs 30 → 15, batch_size 64 → 32
    model.fit(
        X_train, y_train,
        epochs=15,
        batch_size=32,
        validation_split=0.2,
        callbacks=callbacks,
        verbose=1,
    )
    return model
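
# Example (illustrative): given X_train of shape (n_samples, 30, 6) from
# create_sequences, train_lstm returns a fitted Keras model whose
# model.predict(X) emits Close values still in the scaled [0, 1] space.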

# [OPT-6] XGBoost: n_estimators 300 → 100, max_depth 6 → 4
def train_xgboost(X_train, residuals, stop_requested_callback=None):
    """Train a leaner XGBoost model on LSTM residuals."""
    if stop_requested_callback and stop_requested_callback():
        print("XGBoost training cancelled due to stop request.")
        return None
    # [OPT-6] Reduced complexity for free-tier memory / speed
    params = {
        "objective": "reg:squarederror",
        "n_estimators": 100,   # was 300
        "learning_rate": 0.1,
        "max_depth": 4,        # was 6
        "subsample": 0.8,
        "colsample_bytree": 0.8,
        "min_child_weight": 3,
        "gamma": 0.1,
        "reg_alpha": 0.1,
        "reg_lambda": 1.0,
        "tree_method": "hist",
    }
    if stop_requested_callback:
        class StopCallbackHandler(xgb.callback.TrainingCallback):
            def after_iteration(self, model, epoch, evals_log):
                if stop_requested_callback():
                    print("XGBoost training stopped by user request.")
                    return True  # returning True stops training
                return False

        xgb_model = xgb.XGBRegressor(**params)
        xgb_model.set_params(callbacks=[StopCallbackHandler()])
        xgb_model.fit(X_train, residuals)
    else:
        xgb_model = xgb.XGBRegressor(**params)
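        # NOTE: passing eval_metric / early_stopping_rounds to fit() assumes
        # XGBoost 1.x; XGBoost 2.0 removed these fit() kwargs in favour of
        # constructor arguments on XGBRegressor.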
        xgb_model.fit(
            X_train, residuals,
            eval_metric=["rmse"],
            early_stopping_rounds=10,  # was 20 [OPT-6]
            verbose=False,
            eval_set=[(X_train, residuals)],
        )
    return xgb_model

def predict_stock_price(
    lstm_model, xgb_model, data, scaler,
    time_step=30, days_ahead=5, stop_requested_callback=None
):
    """Make predictions using both LSTM and XGBoost with price anchoring."""
    if stop_requested_callback and stop_requested_callback():
        return None
    n_features = data.shape[1]
    temp_input = data[-time_step:].tolist()
    last_actual_close = scaler.inverse_transform(
        np.array([[data[-1, 0]]]))[0][0]
    print(f"Base price: ${last_actual_close:.2f}")
    original_prices = scaler.inverse_transform(data[:, 0].reshape(-1, 1))
    daily_returns = np.diff(original_prices, axis=0) / original_prices[:-1]
    volatility = np.std(daily_returns)
    # Calibrate model against actual last price
    lstm_input = np.array(temp_input[-time_step:]).reshape(1, time_step, n_features)
    lstm_pred_cal = lstm_model.predict(lstm_input, verbose=0)[0][0]
    xgb_input_cal = np.array(temp_input[-time_step:]).reshape(1, -1)
    try:
        combined_cal = lstm_pred_cal + (xgb_model.predict(xgb_input_cal)[0]
                                        if xgb_model is not None else 0)
    except Exception:
        combined_cal = lstm_pred_cal
    model_current = scaler.inverse_transform(
        np.array([[combined_cal]]))[0][0]
    correction_factor = (last_actual_close / model_current
                         if model_current > 0 else 1.0)
    print(f"Calibration: model=${model_current:.2f}, "
          f"actual=${last_actual_close:.2f}, factor={correction_factor:.4f}")
    predictions = []
    prev_day_pred = combined_cal
    for day in range(days_ahead):
        if stop_requested_callback and stop_requested_callback():
            print(f"Prediction stopped at day {day}/{days_ahead}")
            break
        lstm_input = np.array(temp_input[-time_step:]).reshape(1, time_step, n_features)
        lstm_pred = lstm_model.predict(lstm_input, verbose=0)[0][0]
        xgb_input = np.array(temp_input[-time_step:]).reshape(1, -1)
        try:
            combined_pred = (lstm_pred + xgb_model.predict(xgb_input)[0]
                             if xgb_model is not None else lstm_pred)
        except Exception as e:
            print(f"XGBoost predict error: {e}")
            combined_pred = lstm_pred
        prev_unscaled = scaler.inverse_transform(
            np.array([[prev_day_pred]]))[0][0]
        current_unscaled = scaler.inverse_transform(
            np.array([[combined_pred]]))[0][0]
        price_change = current_unscaled - prev_unscaled
        trend_direction = 1 if price_change >= 0 else -1
        day_volatility = volatility * (1 + day * 0.1)
        adjusted_volatility = min(day_volatility, 0.015)
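        # Inject small asymmetric noise: with-trend moves are more likely
        # (70% in an up-trend, 80% in a down-trend) than counter-trend moves,
        # with the noise std capped at 1.5% per day.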
        random_factor = np.random.normal(0, adjusted_volatility)
        if trend_direction > 0:
            flux_factor = (abs(random_factor) * trend_direction * 0.15
                           if np.random.random() < 0.7
                           else -abs(random_factor) * trend_direction * 0.3)
        else:
            flux_factor = (abs(random_factor) * trend_direction * 0.25
                           if np.random.random() < 0.8
                           else -abs(random_factor) * trend_direction * 0.1)
        flux_amount = prev_unscaled * flux_factor
        adjusted_unscaled = current_unscaled + flux_amount
        adjusted_pred = scaler.transform(
            np.array([[adjusted_unscaled]]))[0][0]
        next_row = temp_input[-1].copy()
        next_row[0] = adjusted_pred
        prev_day_pred = adjusted_pred
        predictions.append(adjusted_pred)
        temp_input.append(next_row)
    if not predictions:
        return None
    final_predictions = scaler.inverse_transform(
        np.array(predictions).reshape(-1, 1))
    corrected_predictions = final_predictions * correction_factor
    print("\nPredictions (original → corrected):")
    for i in range(len(final_predictions)):
        print(f"  Day {i+1}: ${final_predictions[i][0]:.2f} "
              f"→ ${corrected_predictions[i][0]:.2f}")
    return corrected_predictions

def plot_prices(data, predictions, symbol, days_ahead):
    """Plot actual + predicted prices (used in standalone main())."""
    fig = go.Figure()
    three_months_ago = data.index[-1] - pd.DateOffset(months=3)
    actual_data = data.loc[three_months_ago:]
    close_prices = (actual_data["Close"]
                    if isinstance(actual_data, pd.DataFrame) and "Close" in actual_data.columns
                    else actual_data.iloc[:, 0])
    future_dates = []
    last_date = data.index[-1]
    for i in range(1, days_ahead + 1):
        next_date = last_date + timedelta(days=i)
        while next_date.weekday() > 4:
            next_date += timedelta(days=1)
        future_dates.append(next_date)
    future_dates = list(dict.fromkeys(future_dates))
    prediction_data = predictions[: len(future_dates)].flatten()
    fig.add_trace(go.Scatter(
        x=future_dates, y=prediction_data,
        mode="lines+markers", name="Predicted Price",
        line=dict(color="orange", width=3)))
    fig.add_trace(go.Scatter(
        x=close_prices.index, y=close_prices.values,
        mode="lines", name="Actual Price",
        line=dict(color="blue", width=2)))
    fig.add_trace(go.Scatter(
        x=[close_prices.index[-1]], y=[close_prices.values[-1]],
        mode="markers", name="Latest Price",
        marker=dict(color="green", size=10, symbol="circle")))
    fig.update_layout(
        title=f"Stock Price Prediction for {symbol}",
        xaxis_title="Date", yaxis_title="Price (USD)",
        template="plotly_white", hovermode="x unified")
    fig.show()

# =============================================================================
# NEWS SENTIMENT ANALYSIS FUNCTIONS
# =============================================================================
def fetch_finnhub_news(company_symbol):
    end_date = datetime.now()
    start_date = end_date - timedelta(days=28)
    url = (f"https://finnhub.io/api/v1/company-news"
           f"?symbol={company_symbol}"
           f"&from={start_date.strftime('%Y-%m-%d')}"
           f"&to={end_date.strftime('%Y-%m-%d')}"
           f"&token={FINNHUB_API_KEY}")
    try:
        response = requests.get(url, timeout=30)  # avoid hanging on a dead connection
        if response.status_code == 200:
            articles = response.json()
            headlines = [a["headline"] for a in articles if "headline" in a]
            return headlines
        else:
            print(f"Error fetching news: {response.status_code}")
            return []
    except Exception as e:
        print(f"Error parsing news response: {e}")
        return []
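
# Example (illustrative): the Finnhub company-news endpoint returns a JSON
# list of article objects; only the "headline" field is consumed here, e.g.
#   [{"headline": "...", "summary": "...", "datetime": 1700000000, ...}]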

def analyze_sentiment(headlines):
    try:
        sid = SentimentIntensityAnalyzer()
        sentiment_results = []
        sentiment_totals = {"positive": 0, "negative": 0, "neutral": 0}
        for headline in headlines:
            if not headline or not isinstance(headline, str):
                continue
            sentiment = sid.polarity_scores(headline)
            sentiment_results.append({"headline": headline, "sentiment": sentiment})
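            # VADER's compound score lies in [-1, 1]; ±0.05 is the cutoff
            # conventionally recommended by the VADER authors.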
            if sentiment["compound"] > 0.05:
                sentiment_totals["positive"] += 1
            elif sentiment["compound"] < -0.05:
                sentiment_totals["negative"] += 1
            else:
                sentiment_totals["neutral"] += 1
        return sentiment_results, sentiment_totals
    except Exception as e:
        print(f"Error in sentiment analysis: {e}")
        return [], {"positive": 0, "negative": 0, "neutral": 0}

def plot_sentiment_pie(sentiment_totals, company_symbol):
    fig = go.Figure(data=[go.Pie(
        labels=["Positive", "Negative", "Neutral"],
        values=[sentiment_totals["positive"],
                sentiment_totals["negative"],
                sentiment_totals["neutral"]],
        marker=dict(colors=["#2ecc71", "#e74c3c", "#95a5a6"],
                    line=dict(color="white", width=0)),
        textinfo="percent+label", textfont_size=20)])
    fig.update_layout(
        title=f"Sentiment Distribution for {company_symbol} (Last 28 Days)",
        showlegend=True)
    fig.show()

# =============================================================================
# AI SUMMARY FUNCTIONS  [OPT-1] Transformers removed
# =============================================================================
def _extractive_summary(headlines, n=3):
    """
    Lightweight extractive summariser – replaces the BART transformer pipeline.
    [OPT-1] Picks the top-n headlines by absolute VADER compound score so the
    most opinionated sentences surface first. No heavy model download needed.
    """
    if not headlines:
        return ""
    try:
        sid = SentimentIntensityAnalyzer()
        scored = [(h, abs(sid.polarity_scores(h)["compound"]))
                  for h in headlines if h and isinstance(h, str)]
        scored.sort(key=lambda x: x[1], reverse=True)
        top = [h for h, _ in scored[:n]]
        return " | ".join(top)
    except Exception as e:
        print(f"Extractive summary error: {e}")
        return headlines[0] if headlines else ""

def generate_sentiment_summary(sentiment_totals, headlines, company_symbol):
    """
    Generate a human-readable sentiment summary.
    [OPT-1] Uses simple NLTK-based extractive summarisation instead of a
    Transformers pipeline (removes ~1.2 GB BART model download).
    """
    try:
        total = max(1, sum(sentiment_totals.values()))
        pos_pct = sentiment_totals["positive"] / total * 100
        neg_pct = sentiment_totals["negative"] / total * 100
        summary = (
            f"Over the past 28 days, {len(headlines)} news articles about "
            f"{company_symbol} were analysed. "
            f"{sentiment_totals['positive']} positive ({pos_pct:.0f}%), "
            f"{sentiment_totals['negative']} negative ({neg_pct:.0f}%), "
            f"and {sentiment_totals['neutral']} neutral articles found."
        )
        if headlines:
            key_headlines = _extractive_summary(headlines, n=2)
            if key_headlines:
                summary += f" Key headlines: {key_headlines}"
        return summary
    except Exception as e:
        print(f"Error in generate_sentiment_summary: {e}")
        return f"Unable to generate sentiment summary for {company_symbol}."

def generate_prediction_summary(pred_df, company_symbol):
    first_price = pred_df["Predicted Price"].iloc[0]
    last_price = pred_df["Predicted Price"].iloc[-1]
    return (
        f"The predicted price for {company_symbol} moves from "
        f"${first_price:.2f} on the first forecast day to ${last_price:.2f} "
        f"on the last."
    )

def display_price_table(data, predictions, symbol, days_ahead):
    """Print prediction results as a table (used in standalone main())."""
    if isinstance(data, pd.DataFrame) and "Close" in data.columns:
        last_price = data["Close"].iloc[-1]
        last_date = data.index[-1]
    else:
        last_price = data.iloc[-1, 0]
        last_date = data.index[-1]
    future_dates = []
    for i in range(1, days_ahead + 1):
        next_date = last_date + timedelta(days=i)
        while next_date.weekday() > 4:
            next_date += timedelta(days=1)
        future_dates.append(next_date)
    future_dates = list(dict.fromkeys(future_dates))
    prediction_data = predictions[: len(future_dates)].flatten()
    last_price_row = pd.DataFrame({
        "Date": [last_date.strftime("%Y-%m-%d")],
        "Price": [f"${last_price:.2f}"],
        "Change": ["0.00%"],
        "Note": ["Actual last closing price"],
    })
    pred_rows = []
    for i, (date, price) in enumerate(zip(future_dates, prediction_data)):
        change_pct = ((price - last_price) / last_price) * 100
        pred_rows.append({
            "Date": date.strftime("%Y-%m-%d"),
            "Price": f"${price:.2f}",
            "Change": f"{change_pct:.2f}%",
            "Note": f"Day {i+1} prediction",
        })
    combined_df = pd.concat([last_price_row, pd.DataFrame(pred_rows)],
                            ignore_index=True)
    print(f"\n{symbol} Stock Price Prediction Table:")
    print("=" * 80)
    print(combined_df.to_string(index=False))
    print("=" * 80)
    return pd.DataFrame({
        "Date": [d.strftime("%Y-%m-%d") for d in future_dates],
        "Predicted Price": prediction_data,
    })

# =============================================================================
# STANDALONE MAIN
# =============================================================================
def main():
    symbol = input("Enter the stock symbol (e.g., AAPL): ").upper()
    try:
        days_ahead = int(input("Number of future days to predict (e.g., 5): "))
    except ValueError:
        print("Invalid input. Please enter an integer.")
        return
    print(f"\nFetching historical data for {symbol}...")
    try:
        data = fetch_stock_data(symbol, outputsize="full")
    except ValueError as e:
        # fetch_stock_data raises ValueError on bad symbols / rate limits
        print(e)
        return
    if data is None or len(data) < 50:
        print(f"Not enough data points for {symbol}.")
        return
    print("Preprocessing data...")
    scaled_data, scaler = preprocess_data(data)
    # [OPT-4] time_step 45 → 30 in standalone mode too
    time_step = 30
    X, y = create_sequences(scaled_data, time_step)
    if len(X) == 0:
        print("Could not create sequences.")
        return
    train_size = int(len(X) * 0.8)
    X_train, y_train = X[:train_size], y[:train_size]
    print("Training LSTM model...")
    lstm_model = train_lstm(X_train, y_train, time_step)
    lstm_train_preds = lstm_model.predict(X_train, verbose=0).flatten()
    residuals = y_train - lstm_train_preds
    print("Training XGBoost model...")
    xgb_model = train_xgboost(X_train.reshape(X_train.shape[0], -1), residuals)
    print(f"Predicting {days_ahead} days ahead...")
    predictions = predict_stock_price(
        lstm_model, xgb_model, scaled_data, scaler, time_step, days_ahead)
    if predictions is None:
        print("Prediction was cancelled or produced no results.")
        return
    display_price_table(data, predictions, symbol, days_ahead)
    future_dates = []
    last_date = data.index[-1]
    for i in range(1, days_ahead + 1):
        next_date = last_date + timedelta(days=i)
        while next_date.weekday() > 4:
            next_date += timedelta(days=1)
        future_dates.append(next_date)
    future_dates = list(dict.fromkeys(future_dates))
    pred_df = pd.DataFrame({
        "Date": [d.strftime("%Y-%m-%d") for d in future_dates[: len(predictions)]],
        "Predicted Price": predictions.flatten()[: len(future_dates)],
    })
    print("\nPrediction summary:")
    print(generate_prediction_summary(pred_df, symbol))
    print("\nFetching news for sentiment analysis...")
    headlines = fetch_finnhub_news(symbol)
    if headlines:
        sentiment_results, sentiment_totals = analyze_sentiment(headlines)
        plot_sentiment_pie(sentiment_totals, symbol)
        print(generate_sentiment_summary(sentiment_totals, headlines, symbol))
    else:
        print("No headlines found.")


if __name__ == "__main__":
    main()