| """ |
| model.py – StockBuddy ML / NLP core |
| ======================================== |
| LIGHTWEIGHT CHANGES vs original: |
| [OPT-1] Removed `transformers` pipeline (was downloading ~1.2 GB BART model at |
| runtime). Replaced with a fast NLTK-based extractive summariser. |
| [OPT-2] Reduced technical indicators: 11 → 6 features (kept only the ones with |
| highest predictive signal; fewer features = smaller tensors & faster fits). |
| [OPT-3] LSTM architecture: 4 layers (64/64/32/32 units) → 2 layers (32/16 units). |
| Still accurate enough for short-horizon forecasts, ~8× fewer parameters. |
| [OPT-4] time_step: 45 → 30 (shorter look-back window → smaller tensors). |
| [OPT-5] Epochs: 30 → 15, batch_size: 64 → 32 (free-tier CPU training time). |
| [OPT-6] XGBoost n_estimators: 300 → 100, max_depth 6 → 4. |
| [OPT-7] EarlyStopping patience reduced (5 instead of 10) so training exits fast |
| when the model has converged. |
| All public function signatures are identical to the original so app.py needs |
| only minimal changes. |
| """ |
|
|
| import numpy as np |
| import pandas as pd |
| import requests |
| from sklearn.preprocessing import MinMaxScaler |
| from tensorflow.keras.models import Sequential |
| from tensorflow.keras.layers import LSTM, Dense, Dropout |
| import xgboost as xgb |
| import plotly.graph_objects as go |
| from datetime import datetime, timedelta |
| import nltk |
| from nltk.sentiment.vader import SentimentIntensityAnalyzer |
| |
| import time |
|
|
| |
import os

# Download the VADER lexicon needed by SentimentIntensityAnalyzer at import
# time; quiet=True keeps the downloader from writing to the console.
nltk.download("vader_lexicon", quiet=True)

# API credentials. An environment variable of the same name takes precedence so
# a deployment can supply its own key without editing this file; the literal is
# kept only as a backward-compatible fallback.
# NOTE(review): the fallback keys are committed to source control – consider
# rotating them and dropping the literals.
ALPHAVANTAGE_API_KEY = os.environ.get("ALPHAVANTAGE_API_KEY", "IELF382B4X42YRTX")
FINNHUB_API_KEY = os.environ.get(
    "FINNHUB_API_KEY", "cu5gvghr01qqj8u6iau0cu5gvghr01qqj8u6iaug")
|
|
| |
| |
| |
|
|
def fetch_stock_data(symbol, outputsize="full"):
    """Download daily prices from Alpha Vantage and return the indicator frame.

    Args:
        symbol: Ticker symbol sent to the ``TIME_SERIES_DAILY`` endpoint.
        outputsize: Alpha Vantage ``outputsize`` value, forwarded unchanged.

    Returns:
        The result of ``add_technical_indicators`` applied to the date-indexed,
        ascending price frame (columns renamed to Open/High/Low/Close/Volume).

    Raises:
        ValueError: If the response has no usable "Time Series (Daily)" payload.
        requests.RequestException: On network failure or timeout.
    """
    url = "https://www.alphavantage.co/query"
    params = {
        "function": "TIME_SERIES_DAILY",
        "symbol": symbol,
        "apikey": ALPHAVANTAGE_API_KEY,
        "outputsize": outputsize,
        "datatype": "json",
    }
    # Without a timeout a stalled connection would block the caller forever.
    response = requests.get(url, params=params, timeout=30)
    data = response.json()

    if "Time Series (Daily)" not in data:
        if "Error Message" in data:
            raise ValueError(
                f"Symbol '{symbol}' not found. Please verify the stock symbol.")
        if "Note" in data:
            raise ValueError("API request limit reached. Please try again in a minute.")
        if "Information" in data:
            # Deliberately generic: the API key must never be echoed into an
            # error message that can reach logs or end users.
            raise ValueError(
                "Alpha Vantage declined the request. This usually means the "
                "free-tier daily request limit has been reached or the "
                "requested data needs a premium key. Please try again later.")
        raise ValueError(
            f"Unable to fetch data for symbol '{symbol}'. Please verify the symbol.")

    ts = data["Time Series (Daily)"]
    if not ts:
        raise ValueError(
            f"Unable to fetch data for symbol '{symbol}'. Please verify the symbol.")

    df = pd.DataFrame.from_dict(ts, orient="index")
    df.index = pd.to_datetime(df.index)
    df.sort_index(inplace=True)

    # The API delivers every value as a string.
    for col in ["1. open", "2. high", "3. low", "4. close", "5. volume"]:
        if col in df.columns:
            df[col] = df[col].astype(float)

    df = df.rename(columns={
        "1. open": "Open",
        "2. high": "High",
        "3. low": "Low",
        "4. close": "Close",
        "5. volume": "Volume",
    })

    # Staleness check: work out the most recent day a bar is expected for.
    latest_date = df.index[-1]
    # Keep the un-normalised timestamp: normalize() resets the time to
    # midnight, so the hour must be read from `now`, not from `today`.
    now = pd.Timestamp.now()
    today = now.normalize()
    market_closed_days = 0
    if today.dayofweek >= 5:
        # Saturday -> 1 day back, Sunday -> 2 days back (i.e. Friday).
        market_closed_days = today.dayofweek - 4
    elif now.hour < 16:
        # NOTE(review): local clock; 16:00 presumably approximates the market
        # close – confirm the intended time zone.
        # Before the close on a Monday the previous session is Friday.
        market_closed_days = 3 if today.dayofweek == 0 else 1
    expected_latest = today - pd.Timedelta(days=market_closed_days)
    date_diff = (expected_latest - latest_date).days
    if date_diff > 5:
        print(f"WARNING: Latest data for {symbol} is from "
              f"{latest_date.strftime('%Y-%m-%d')} ({date_diff} days old).")

    print(f"\nLatest closing price for {symbol} "
          f"(as of {latest_date.strftime('%Y-%m-%d')}): ${df['Close'].iloc[-1]:.2f}")

    df = add_technical_indicators(df)
    return df
|
|
|
|
| |
def add_technical_indicators(df):
    """Return a six-column feature frame derived from the Close price.

    The input frame is left untouched; all work happens on a copy.

    Args:
        df: Price frame. Must contain Close, Open, High and Low columns,
            otherwise the function falls back to Close only.

    Returns:
        A DataFrame with columns Close, RSI, SMA5, MACD, Upper_Band and ROC,
        with rows containing NaN removed. If a required column is missing or
        an error occurs, ``df[["Close"]]`` is returned instead (or ``df``
        itself when it has no Close column).
    """
    try:
        for col in ("Close", "Open", "High", "Low"):
            if col not in df.columns:
                print(f"Warning: {col} missing – falling back to Close-only.")
                return df[["Close"]]

        # Work on a copy so the caller's frame does not gain indicator columns.
        frame = df.copy()
        close = frame["Close"]

        # RSI over a 14-row window (simple moving average of gains/losses).
        delta = close.diff()
        gain = delta.where(delta > 0, 0).rolling(14).mean()
        loss = -delta.where(delta < 0, 0).rolling(14).mean()
        rs = gain / loss
        frame["RSI"] = 100 - (100 / (1 + rs))

        # 5-row simple moving average.
        frame["SMA5"] = close.rolling(5).mean()

        # MACD line: EMA(12) minus EMA(26).
        frame["MACD"] = close.ewm(span=12).mean() - close.ewm(span=26).mean()

        # Upper Bollinger band: 20-row mean plus two standard deviations.
        frame["Upper_Band"] = close.rolling(20).mean() + (close.rolling(20).std() * 2)

        # 5-row rate of change, in percent.
        frame["ROC"] = close.pct_change(periods=5) * 100

        # The rolling windows leave NaN in the leading rows.
        frame = frame.dropna()

        features = ["Close", "RSI", "SMA5", "MACD", "Upper_Band", "ROC"]
        return frame[features]

    except Exception as e:
        # Best-effort fallback: keep the pipeline usable with Close only.
        print(f"Error adding technical indicators: {e}")
        if "Close" in df.columns:
            return df[["Close"]]
        return df
|
|
|
|
def preprocess_data(data):
    """Min-max scale every column on its own and hand back the Close scaler.

    Args:
        data: DataFrame of features; it must include a "Close" column.

    Returns:
        A tuple ``(scaled, close_scaler)`` where ``scaled`` is a float array of
        shape ``(len(data), n_columns)`` with each column mapped to [0, 1], and
        ``close_scaler`` is the fitted MinMaxScaler of the "Close" column.
    """
    column_names = data.columns
    scaled = np.zeros((len(data), len(column_names)))
    scaler_by_column = {}

    for position, name in enumerate(column_names):
        column_scaler = MinMaxScaler(feature_range=(0, 1))
        # fit_transform expects a 2-D input, hence the single-column reshape.
        as_column = data[name].values.reshape(-1, 1)
        scaled[:, position] = column_scaler.fit_transform(as_column).flatten()
        scaler_by_column[name] = column_scaler

    return scaled, scaler_by_column["Close"]
|
|
|
|
def create_sequences(data, time_step=30):
    """Build sliding-window (X, y) pairs for LSTM training.

    Each sample ``X[i]`` is the ``time_step`` consecutive rows starting at row
    ``i``; its target ``y[i]`` is column 0 of the row that follows the window.
    Every row that has a full window before it is used as a target, including
    the final row of ``data``.

    Args:
        data: 2-D array-like of shape ``(n_rows, n_features)``.
        time_step: Number of rows per window.

    Returns:
        ``(X, y)`` with ``X.shape == (n_rows - time_step, time_step, n_features)``
        and ``y.shape == (n_rows - time_step,)``. When there are not enough
        rows, both arrays are empty (``len(X) == 0``).
    """
    data = np.asarray(data)
    n_samples = len(data) - time_step
    if n_samples <= 0:
        # Keep the window/feature axes so shape-based code still works.
        return np.empty((0, time_step) + data.shape[1:]), np.empty((0,))

    X = np.stack([data[i:i + time_step, :] for i in range(n_samples)])
    # Targets are column 0 of rows time_step .. n_rows - 1.
    y = data[time_step:, 0].copy()
    return X, y
|
|
|
|
| |
| |
| |
def train_lstm(X_train, y_train, time_step=30, stop_requested_callback=None):
    """Fit the lightweight two-layer LSTM regressor and return it.

    Network (OPT-3): LSTM(32) → Dropout(0.2) → LSTM(16) → Dropout(0.2) → Dense(1),
    trained with Adam (lr 0.001) on mean squared error. With 6 input features
    the layer sizes work out to roughly 8 k trainable parameters.

    Args:
        X_train: Array of shape ``(samples, time_step, n_features)``.
        y_train: Targets, one per sample.
        time_step: Window length; must agree with ``X_train``.
        stop_requested_callback: Optional zero-argument callable. When it
            returns a truthy value at the end of an epoch, training stops.

    Returns:
        The fitted Keras model.
    """
    from tensorflow.keras.callbacks import Callback, EarlyStopping, ReduceLROnPlateau
    from tensorflow.keras.optimizers import Adam

    feature_count = X_train.shape[2]
    X_train = X_train.reshape(X_train.shape[0], time_step, feature_count)

    network = Sequential()
    network.add(LSTM(32, return_sequences=True,
                     input_shape=(time_step, feature_count)))
    network.add(Dropout(0.2))
    network.add(LSTM(16, return_sequences=False))
    network.add(Dropout(0.2))
    network.add(Dense(1))
    network.compile(optimizer=Adam(learning_rate=0.001),
                    loss="mean_squared_error")

    # Both callbacks watch the loss on the validation split taken in fit().
    training_callbacks = [
        ReduceLROnPlateau(monitor="val_loss", factor=0.3,
                          patience=3, min_lr=0.0001, verbose=0),
        EarlyStopping(monitor="val_loss", patience=5,
                      restore_best_weights=True, verbose=1),
    ]

    if stop_requested_callback:
        class StopCallback(Callback):
            """Polls the user's stop flag once per epoch."""

            def on_epoch_end(self, epoch, logs=None):
                if stop_requested_callback():
                    self.model.stop_training = True
                    print("Training stopped early by user request.")

        training_callbacks.append(StopCallback())

    print(f"Training lightweight LSTM: {X_train.shape[0]} samples, "
          f"{feature_count} features, time_step={time_step}")

    network.fit(
        X_train,
        y_train,
        epochs=15,
        batch_size=32,
        validation_split=0.2,
        callbacks=training_callbacks,
        verbose=1,
    )
    return network
|
|
|
|
| |
def train_xgboost(X_train, residuals, stop_requested_callback=None):
    """Train a lean XGBoost regressor on the LSTM residuals.

    Args:
        X_train: 2-D feature matrix (one flattened window per row).
        residuals: Regression targets, one per row of ``X_train``.
        stop_requested_callback: Optional zero-argument callable. If it is
            already truthy on entry nothing is trained; otherwise it is polled
            after every boosting round and a truthy result ends training.

    Returns:
        The fitted ``xgb.XGBRegressor``, or ``None`` when a stop was requested
        before training started.
    """
    if stop_requested_callback and stop_requested_callback():
        print("XGBoost training cancelled due to stop request.")
        return None

    params = {
        "objective": "reg:squarederror",
        "n_estimators": 100,
        "learning_rate": 0.1,
        "max_depth": 4,
        "subsample": 0.8,
        "colsample_bytree": 0.8,
        "min_child_weight": 3,
        "gamma": 0.1,
        "reg_alpha": 0.1,
        "reg_lambda": 1.0,
        "tree_method": "hist",
    }

    xgb_model = xgb.XGBRegressor(**params)

    if stop_requested_callback:
        class StopCallbackHandler(xgb.callback.TrainingCallback):
            """Ends boosting as soon as the user's stop flag is set."""

            def after_iteration(self, model, epoch, evals_log):
                if stop_requested_callback():
                    print("XGBoost training stopped by user request.")
                    return True  # True tells XGBoost to stop training
                return False

        xgb_model.set_params(callbacks=[StopCallbackHandler()])
        xgb_model.fit(X_train, residuals)
    else:
        # eval_metric / early_stopping_rounds are estimator parameters; current
        # XGBoost releases no longer accept them as fit() keyword arguments.
        xgb_model.set_params(eval_metric=["rmse"], early_stopping_rounds=10)
        xgb_model.fit(
            X_train, residuals,
            eval_set=[(X_train, residuals)],
            verbose=False,
        )

    return xgb_model
|
|
|
|
def predict_stock_price(
    lstm_model, xgb_model, data, scaler,
    time_step=30, days_ahead=5, stop_requested_callback=None
):
    """Roll the LSTM (+ optional XGBoost residual) forecast forward day by day.

    Args:
        lstm_model: Object with ``predict(x, verbose=0)`` returning a 2-D array.
        xgb_model: Object with ``predict(x)`` or ``None`` to skip the residual.
        data: Scaled 2-D array; column 0 is the scaled Close.
        scaler: Close scaler offering ``transform`` / ``inverse_transform``.
        time_step: Number of trailing rows fed to the models.
        days_ahead: Number of steps to forecast.
        stop_requested_callback: Optional zero-argument callable; a truthy
            result aborts before starting or between forecast steps.

    Returns:
        Array of shape ``(n_predictions, 1)`` with prices multiplied by the
        calibration factor, or ``None`` if nothing was predicted.
    """
    def stop_requested():
        return stop_requested_callback and stop_requested_callback()

    def to_price(scaled_value):
        # Undo the Close scaling for one scalar.
        return scaler.inverse_transform(np.array([[scaled_value]]))[0][0]

    if stop_requested():
        return None

    feature_count = data.shape[1]
    window_rows = data[-time_step:].tolist()

    last_actual_close = to_price(data[-1, 0])
    print(f"Base price: ${last_actual_close:.2f}")

    # Standard deviation of day-over-day returns on the unscaled Close.
    price_history = scaler.inverse_transform(data[:, 0].reshape(-1, 1))
    returns = np.diff(price_history, axis=0) / price_history[:-1]
    volatility = np.std(returns)

    # Calibration: compare the models' view of "now" with the real last close.
    lstm_cal = lstm_model.predict(
        np.array(window_rows[-time_step:]).reshape(1, time_step, feature_count),
        verbose=0)[0][0]
    flat_cal = np.array(window_rows[-time_step:]).reshape(1, -1)
    try:
        residual_cal = xgb_model.predict(flat_cal)[0] if xgb_model is not None else 0
        combined_cal = lstm_cal + residual_cal
    except Exception:
        combined_cal = lstm_cal

    model_current = to_price(combined_cal)
    if model_current > 0:
        correction_factor = last_actual_close / model_current
    else:
        correction_factor = 1.0
    print(f"Calibration: model=${model_current:.2f}, "
          f"actual=${last_actual_close:.2f}, factor={correction_factor:.4f}")

    scaled_forecast = []
    previous_scaled = combined_cal

    for day in range(days_ahead):
        if stop_requested():
            print(f"Prediction stopped at day {day}/{days_ahead}")
            break

        recent_rows = window_rows[-time_step:]
        lstm_pred = lstm_model.predict(
            np.array(recent_rows).reshape(1, time_step, feature_count),
            verbose=0)[0][0]
        flat_recent = np.array(recent_rows).reshape(1, -1)

        combined_pred = lstm_pred
        if xgb_model is not None:
            try:
                combined_pred = lstm_pred + xgb_model.predict(flat_recent)[0]
            except Exception as e:
                print(f"XGBoost predict error: {e}")
                combined_pred = lstm_pred

        prev_unscaled = to_price(previous_scaled)
        current_unscaled = to_price(combined_pred)
        trend_direction = 1 if current_unscaled - prev_unscaled >= 0 else -1

        # Noise scale grows 10 % per step and is capped at 1.5 %.
        day_volatility = volatility * (1 + day * 0.1)
        adjusted_volatility = min(day_volatility, 0.015)
        random_factor = np.random.normal(0, adjusted_volatility)
        magnitude = abs(random_factor)

        # Probability of nudging with the trend, and the two damping weights.
        if trend_direction > 0:
            with_trend_prob, with_weight, against_weight = 0.7, 0.15, 0.3
        else:
            with_trend_prob, with_weight, against_weight = 0.8, 0.25, 0.1

        if np.random.random() < with_trend_prob:
            flux_factor = magnitude * trend_direction * with_weight
        else:
            flux_factor = -magnitude * trend_direction * against_weight

        adjusted_unscaled = current_unscaled + prev_unscaled * flux_factor
        adjusted_pred = scaler.transform(np.array([[adjusted_unscaled]]))[0][0]

        # Next input row: copy the newest row and overwrite only the Close slot.
        next_row = list(window_rows[-1])
        next_row[0] = adjusted_pred
        window_rows.append(next_row)

        scaled_forecast.append(adjusted_pred)
        previous_scaled = adjusted_pred

    if not scaled_forecast:
        return None

    final_predictions = scaler.inverse_transform(
        np.array(scaled_forecast).reshape(-1, 1))
    corrected_predictions = final_predictions * correction_factor

    print("\nPredictions (original → corrected):")
    for day_number, (raw, corrected) in enumerate(
            zip(final_predictions, corrected_predictions), start=1):
        print(f" Day {day_number}: ${raw[0]:.2f} "
              f"→ ${corrected[0]:.2f}")

    return corrected_predictions
|
|
|
|
def plot_prices(data, predictions, symbol, days_ahead):
    """Plot the last three months of closes plus the forecast (standalone main()).

    Args:
        data: Date-indexed frame; the "Close" column is used when present,
            otherwise the first column.
        predictions: Array of predicted prices, one per forecast step.
        symbol: Ticker shown in the chart title.
        days_ahead: Number of forecast steps to place on the time axis.
    """
    fig = go.Figure()
    last_date = data.index[-1]
    three_months_ago = last_date - pd.DateOffset(months=3)
    actual_data = data.loc[three_months_ago:]
    close_prices = (actual_data["Close"]
                    if isinstance(actual_data, pd.DataFrame) and "Close" in actual_data.columns
                    else actual_data.iloc[:, 0])

    # Next `days_ahead` weekdays after the last bar. Walking forward one day at
    # a time (instead of offsetting from last_date and de-duplicating) ensures a
    # weekend in the range does not shrink the number of dates.
    future_dates = []
    next_date = last_date
    while len(future_dates) < days_ahead:
        next_date += timedelta(days=1)
        if next_date.weekday() < 5:
            future_dates.append(next_date)

    prediction_data = predictions[: len(future_dates)].flatten()
    # Fewer predictions than requested (e.g. stopped early): keep x and y aligned.
    future_dates = future_dates[: len(prediction_data)]

    fig.add_trace(go.Scatter(
        x=future_dates, y=prediction_data,
        mode="lines+markers", name="Predicted Price",
        line=dict(color="orange", width=3)))
    fig.add_trace(go.Scatter(
        x=close_prices.index, y=close_prices.values,
        mode="lines", name="Actual Price",
        line=dict(color="blue", width=2)))
    fig.add_trace(go.Scatter(
        x=[close_prices.index[-1]], y=[close_prices.values[-1]],
        mode="markers", name="Latest Price",
        marker=dict(color="green", size=10, symbol="circle")))

    fig.update_layout(
        title=f"Stock Price Prediction for {symbol}",
        xaxis_title="Date", yaxis_title="Price (USD)",
        template="plotly_white", hovermode="x unified")
    fig.show()
|
|
|
|
| |
| |
| |
|
|
def fetch_finnhub_news(company_symbol):
    """Return Finnhub company-news headlines from the last 28 days.

    Args:
        company_symbol: Ticker symbol to query.

    Returns:
        A list of headline strings; an empty list on any HTTP, network or
        parsing problem (the problem is printed, never raised).
    """
    end_date = datetime.now()
    start_date = end_date - timedelta(days=28)
    # Let requests build the query string so the symbol is URL-encoded.
    params = {
        "symbol": company_symbol,
        "from": start_date.strftime("%Y-%m-%d"),
        "to": end_date.strftime("%Y-%m-%d"),
        "token": FINNHUB_API_KEY,
    }
    try:
        response = requests.get(
            "https://finnhub.io/api/v1/company-news", params=params, timeout=30)
        if response.status_code != 200:
            print(f"Error fetching news: {response.status_code}")
            return []
        articles = response.json()
        # Skip anything that is not an article object with a headline.
        return [a["headline"] for a in articles
                if isinstance(a, dict) and "headline" in a]
    except requests.RequestException as e:
        print(f"Error fetching news: {e}")
        return []
    except Exception as e:
        print(f"Error parsing news response: {e}")
        return []
|
|
|
|
def analyze_sentiment(headlines):
    """Score headlines with VADER and tally them by polarity.

    A headline counts as positive when its compound score is above 0.05,
    negative when below -0.05, and neutral otherwise. Empty or non-string
    entries are skipped.

    Returns:
        ``(results, totals)`` where ``results`` is a list of
        ``{"headline", "sentiment"}`` dicts and ``totals`` maps
        "positive"/"negative"/"neutral" to counts. On any error an empty list
        and all-zero totals are returned.
    """
    try:
        analyzer = SentimentIntensityAnalyzer()
        scored_headlines = []
        counts = {"positive": 0, "negative": 0, "neutral": 0}

        for text in headlines:
            if not (text and isinstance(text, str)):
                continue

            scores = analyzer.polarity_scores(text)
            scored_headlines.append({"headline": text, "sentiment": scores})

            compound = scores["compound"]
            if compound > 0.05:
                bucket = "positive"
            elif compound < -0.05:
                bucket = "negative"
            else:
                bucket = "neutral"
            counts[bucket] += 1

        return scored_headlines, counts
    except Exception as e:
        print(f"Error in sentiment analysis: {e}")
        return [], {"positive": 0, "negative": 0, "neutral": 0}
|
|
|
|
def plot_sentiment_pie(sentiment_totals, company_symbol):
    """Show a pie chart of positive / negative / neutral headline counts.

    Args:
        sentiment_totals: Mapping with "positive", "negative" and "neutral" counts.
        company_symbol: Ticker shown in the chart title.
    """
    slice_values = [sentiment_totals[key]
                    for key in ("positive", "negative", "neutral")]
    pie = go.Pie(
        labels=["Positive", "Negative", "Neutral"],
        values=slice_values,
        # Green / red / grey, in label order, with no visible slice border.
        marker={
            "colors": ["#2ecc71", "#e74c3c", "#95a5a6"],
            "line": {"color": "white", "width": 0},
        },
        textinfo="percent+label",
        textfont_size=20,
    )
    fig = go.Figure(data=[pie])
    fig.update_layout(
        title=f"Sentiment Distribution for {company_symbol} (Last 28 Days)",
        showlegend=True,
    )
    fig.show()
|
|
|
|
| |
| |
| |
|
|
| def _extractive_summary(headlines, n=3): |
| """ |
| Lightweight extractive summariser – replaces the BART transformer pipeline. |
| [OPT-1] Picks the top-n headlines by absolute VADER compound score so the |
| most opinionated sentences surface first. No heavy model download needed. |
| """ |
| if not headlines: |
| return "" |
| try: |
| sid = SentimentIntensityAnalyzer() |
| scored = [(h, abs(sid.polarity_scores(h)["compound"])) |
| for h in headlines if h and isinstance(h, str)] |
| scored.sort(key=lambda x: x[1], reverse=True) |
| top = [h for h, _ in scored[:n]] |
| return " | ".join(top) |
| except Exception as e: |
| print(f"Extractive summary error: {e}") |
| return headlines[0] if headlines else "" |
|
|
|
|
def generate_sentiment_summary(sentiment_totals, headlines, company_symbol):
    """Compose a one-paragraph, human-readable sentiment report.

    [OPT-1] Key headlines come from the lightweight extractive summariser
    rather than a Transformers pipeline.

    Args:
        sentiment_totals: Mapping with "positive", "negative", "neutral" counts.
        headlines: Headlines the totals were computed from.
        company_symbol: Ticker mentioned in the text.

    Returns:
        The summary sentence(s), or a fixed "Unable to generate…" message if
        anything goes wrong.
    """
    try:
        # Guard against division by zero when no headline was scored.
        article_total = max(1, sum(sentiment_totals.values()))
        pos_pct = sentiment_totals["positive"] / article_total * 100
        neg_pct = sentiment_totals["negative"] / article_total * 100

        pieces = [
            f"Over the past 28 days, {len(headlines)} news articles about ",
            f"{company_symbol} were analysed. ",
            f"{sentiment_totals['positive']} positive ({pos_pct:.0f}%), ",
            f"{sentiment_totals['negative']} negative ({neg_pct:.0f}%), ",
            f"and {sentiment_totals['neutral']} neutral articles found.",
        ]

        if headlines:
            key_headlines = _extractive_summary(headlines, n=2)
            if key_headlines:
                pieces.append(f" Key headlines: {key_headlines}")

        return "".join(pieces)
    except Exception as e:
        print(f"Error in generate_sentiment_summary: {e}")
        return f"Unable to generate sentiment summary for {company_symbol}."
|
|
|
|
def generate_prediction_summary(pred_df, company_symbol):
    """Describe the forecast by its first and last predicted price.

    Args:
        pred_df: Frame with a "Predicted Price" column (at least one row).
        company_symbol: Ticker mentioned in the sentence.

    Returns:
        A single sentence quoting the first and last predicted prices.
    """
    predicted = pred_df["Predicted Price"]
    opening, closing = predicted.iloc[0], predicted.iloc[-1]
    sentence = (
        f"The predicted stock prices for {company_symbol} range from "
        f"${opening:.2f} to ${closing:.2f} over the forecast period."
    )
    return sentence
|
|
|
|
def display_price_table(data, predictions, symbol, days_ahead):
    """Print the forecast as a table and return it (standalone main()).

    Args:
        data: Date-indexed frame; the "Close" column is used when present,
            otherwise the first column.
        predictions: Array of predicted prices, one per forecast step.
        symbol: Ticker shown in the table heading.
        days_ahead: Number of forecast steps to list.

    Returns:
        DataFrame with "Date" (YYYY-MM-DD strings) and "Predicted Price"
        columns, one row per prediction shown.
    """
    last_date = data.index[-1]
    if isinstance(data, pd.DataFrame) and "Close" in data.columns:
        last_price = data["Close"].iloc[-1]
    else:
        last_price = data.iloc[-1, 0]

    # Next `days_ahead` weekdays after the last bar. Walking forward one day at
    # a time (instead of offsetting from last_date and de-duplicating) ensures a
    # weekend in the range does not shrink the number of dates.
    future_dates = []
    next_date = last_date
    while len(future_dates) < days_ahead:
        next_date += timedelta(days=1)
        if next_date.weekday() < 5:
            future_dates.append(next_date)

    prediction_data = predictions[: len(future_dates)].flatten()
    # Fewer predictions than requested (e.g. stopped early): keep the two
    # columns the same length so the frames below can be built.
    future_dates = future_dates[: len(prediction_data)]

    last_price_row = pd.DataFrame({
        "Date": [last_date.strftime("%Y-%m-%d")],
        "Price": [f"${last_price:.2f}"],
        "Change": ["0.00%"],
        "Note": ["Actual last closing price"],
    })
    pred_rows = []
    for i, (date, price) in enumerate(zip(future_dates, prediction_data)):
        # Change is measured against the last actual close, not the prior row.
        change_pct = ((price - last_price) / last_price) * 100
        pred_rows.append({
            "Date": date.strftime("%Y-%m-%d"),
            "Price": f"${price:.2f}",
            "Change": f"{change_pct:.2f}%",
            "Note": f"Day {i+1} prediction",
        })

    combined_df = pd.concat([last_price_row, pd.DataFrame(pred_rows)],
                            ignore_index=True)
    print(f"\n{symbol} Stock Price Prediction Table:")
    print("=" * 80)
    print(combined_df.to_string(index=False))
    print("=" * 80)

    return pd.DataFrame({
        "Date": [d.strftime("%Y-%m-%d") for d in future_dates],
        "Predicted Price": prediction_data,
    })
|
|
|
|
| |
| |
| |
|
|
def main():
    """Interactive command-line run: fetch, train, forecast, then news sentiment.

    Prompts for a ticker and a forecast length, trains the LSTM and the
    XGBoost residual model on the first 80 % of the sequences, prints the
    forecast table and summary, and finishes with a sentiment report when
    headlines are available. Problems are reported on stdout and end the run
    early instead of raising.
    """
    symbol = input("Enter the stock symbol (e.g., AAPL): ").strip().upper()
    try:
        days_ahead = int(input("Number of future days to predict (e.g., 5): "))
    except ValueError:
        print("Invalid input. Please enter an integer.")
        return

    print(f"\nFetching historical data for {symbol}...")
    try:
        data = fetch_stock_data(symbol, outputsize="full")
    except (ValueError, requests.RequestException) as e:
        # fetch_stock_data signals API problems with ValueError; show the
        # message instead of a traceback.
        print(f"Could not fetch data for {symbol}: {e}")
        return
    if data is None or len(data) < 50:
        print(f"Not enough data points for {symbol}.")
        return

    print("Preprocessing data...")
    scaled_data, scaler = preprocess_data(data)

    time_step = 30
    X, y = create_sequences(scaled_data, time_step)
    if len(X) == 0:
        print("Could not create sequences.")
        return

    # Chronological split: only the earliest 80 % is used for fitting.
    train_size = int(len(X) * 0.8)
    X_train, y_train = X[:train_size], y[:train_size]

    print("Training LSTM model...")
    lstm_model = train_lstm(X_train, y_train, time_step)

    # XGBoost learns what the LSTM got wrong on the training windows.
    lstm_train_preds = lstm_model.predict(X_train, verbose=0).flatten()
    residuals = y_train - lstm_train_preds

    print("Training XGBoost model...")
    xgb_model = train_xgboost(X_train.reshape(X_train.shape[0], -1), residuals)

    print(f"Predicting {days_ahead} days ahead...")
    predictions = predict_stock_price(
        lstm_model, xgb_model, scaled_data, scaler, time_step, days_ahead)
    if predictions is None:
        # predict_stock_price returns None when it produced no forecast.
        print("No predictions were produced.")
        return

    # display_price_table returns the Date / Predicted Price frame, so the
    # forecast dates do not have to be rebuilt here.
    pred_df = display_price_table(data, predictions, symbol, days_ahead)

    print("\nPrediction summary:")
    print(generate_prediction_summary(pred_df, symbol))

    print("\nFetching news for sentiment analysis...")
    headlines = fetch_finnhub_news(symbol)
    if headlines:
        _, sentiment_totals = analyze_sentiment(headlines)
        plot_sentiment_pie(sentiment_totals, symbol)
        print(generate_sentiment_summary(sentiment_totals, headlines, symbol))
    else:
        print("No headlines found.")


if __name__ == "__main__":
    main()
|
|