Spaces:
Sleeping
Sleeping
| import os | |
| import pandas as pd | |
| import numpy as np | |
| from sklearn.preprocessing import MinMaxScaler | |
| from tensorflow.keras.models import load_model | |
#─── Data loading ──────────────────────────────────────────────────────────────
def load_and_merge_data():
    """Load daily stock prices plus Reddit/Twitter sentiment, merged on Date.

    Sentiment CSVs are averaged per calendar day; days with no posts get a
    neutral sentiment of 0 after the left-merge onto the stock frame.
    """
    def _daily_sentiment(path, out_col):
        # Collapse all posts on the same day into one mean sentiment score.
        frame = pd.read_csv(path)
        frame['Date'] = pd.to_datetime(frame['Date'])
        daily = frame.groupby('Date')['sentiment'].mean().reset_index()
        return daily.rename(columns={'sentiment': out_col})

    # Stock prices: coerce Close to numeric and drop unparseable rows.
    stock = pd.read_csv('scraped_combined_stock_data.csv')
    stock['Date'] = pd.to_datetime(stock['Date'])
    stock['Close'] = pd.to_numeric(stock['Close'], errors='coerce')
    stock = stock.dropna(subset=['Close'])

    reddit = _daily_sentiment('cleaned_reddit_data.csv', 'reddit_sentiment')
    twitter = _daily_sentiment('cleaned_twitter_data.csv', 'twitter_sentiment')

    # Strip any timezone info so all merge keys are naive timestamps.
    for frame in (stock, reddit, twitter):
        frame['Date'] = frame['Date'].dt.tz_localize(None)

    merged = stock.merge(reddit, on='Date', how='left')
    merged = merged.merge(twitter, on='Date', how='left')
    merged['reddit_sentiment'] = merged['reddit_sentiment'].fillna(0)
    merged['twitter_sentiment'] = merged['twitter_sentiment'].fillna(0)
    return merged
#─── Sequence creation ─────────────────────────────────────────────────────────
def create_sequences_multifeature(data: np.ndarray, time_steps: int = 60):
    """Slice `data` into overlapping windows for sequence-model training.

    Returns (X, y) where X[i] = data[i : i + time_steps] and y[i] is the
    value in column 0 (Close price) immediately after that window.
    """
    n_windows = len(data) - time_steps
    windows = [data[start:start + time_steps] for start in range(n_windows)]
    targets = [data[start + time_steps, 0] for start in range(n_windows)]
    return np.array(windows), np.array(targets)
#─── Preprocessing for a single ticker ─────────────────────────────────────────
def prepare_input(df, ticker: str, time_steps: int = 60):
    """
    Build the model input window for one ticker.

    Returns:
        last_sequence: np.array shaped (1, time_steps, 3)
        scaler: MinMaxScaler fitted on that ticker's data
        error: None or error message
    """
    ticker_rows = df[df['Stock_Type'] == ticker]
    if len(ticker_rows) < time_steps:
        return None, None, f"Not enough data for {ticker}: {len(ticker_rows)} rows"

    raw = ticker_rows[['Close', 'reddit_sentiment', 'twitter_sentiment']].values
    scaler = MinMaxScaler((0, 1))
    normalized = scaler.fit_transform(raw)

    # Keep only the most recent `time_steps` rows and add a batch axis.
    window = normalized[-time_steps:]
    last_sequence = window.reshape(1, time_steps, window.shape[1])
    return last_sequence, scaler, None
#─── Model loading + prediction ────────────────────────────────────────────────
def get_model_path(ticker: str, with_sentiment: bool = True):
    """Return the expected .h5 filename for a ticker's saved LSTM model.

    Args:
        ticker: Stock symbol embedded in the saved model's filename.
        with_sentiment: If True, point at the sentiment-augmented model.

    Returns:
        'lstm_<ticker>_model_with_sentiment.h5' when with_sentiment is True,
        otherwise 'lstm_<ticker>_model.h5'.
    """
    # Fix: the original computed an `ext` local that was never used and then
    # repeated the same conditional inline in the f-string. Name the suffix
    # once; adjust it here if your saved files are named differently.
    suffix = 'model_with_sentiment' if with_sentiment else 'model'
    return f"lstm_{ticker}_{suffix}.h5"
def predict_price(ticker: str, df, time_steps: int = 60):
    """
    Loads the model for `ticker`, preprocesses the last `time_steps` of data,
    runs a single-step forecast, and returns the denormalized price.
    """
    model_path = get_model_path(ticker, with_sentiment=True)
    if not os.path.exists(model_path):
        raise FileNotFoundError(f"Model file '{model_path}' not found.")

    sequence, scaler, error = prepare_input(df, ticker, time_steps)
    if error:
        raise ValueError(error)

    network = load_model(model_path)
    scaled_forecast = network.predict(sequence)  # shape (1,1)

    # The scaler was fitted on 3 columns (Close + two sentiments), so pad the
    # lone Close prediction with zeros before inverting the scaling, then take
    # the Close column of the restored row.
    full_row = np.concatenate([scaled_forecast, np.zeros((1, 2))], axis=1)
    return float(scaler.inverse_transform(full_row)[0, 0])