""" Stock Price Prediction Model - Optimized for HuggingFace Spaces Uses PyTorch LSTM + VADER sentiment analysis No TensorFlow, no transformers, lightweight and fast """ import numpy as np import pandas as pd import torch import torch.nn as nn from sklearn.preprocessing import MinMaxScaler import nltk from nltk.sentiment.vader import SentimentIntensityAnalyzer from datetime import datetime, timedelta import yfinance as yf import os from dotenv import load_dotenv import joblib import requests import json from pathlib import Path # Load environment variables load_dotenv() # Download VADER lexicon try: nltk.data.find('vader_lexicon') except LookupError: nltk.download("vader_lexicon", quiet=True) # Initialize sentiment analyzer sia = SentimentIntensityAnalyzer() # Get API keys from environment FINNHUB_API_KEY = os.getenv("FINNHUB_API_KEY", "") # ============================================================================ # PYTORCH LSTM MODEL # ============================================================================ class StockLSTM(nn.Module): """Lightweight PyTorch LSTM for stock price prediction""" def __init__(self, input_size=5, hidden_size=32, num_layers=1, dropout=0.2): super(StockLSTM, self).__init__() self.lstm = nn.LSTM( input_size=input_size, hidden_size=hidden_size, num_layers=num_layers, dropout=dropout if num_layers > 1 else 0, batch_first=True ) self.fc = nn.Linear(hidden_size, 1) def forward(self, x): """Forward pass""" lstm_out, _ = self.lstm(x) last_hidden = lstm_out[:, -1, :] output = self.fc(last_hidden) return output # ============================================================================ # PREDICTION CACHE (24-hour expiry) # ============================================================================ class PredictionCache: """Simple in-memory cache for predictions""" def __init__(self, cache_file="prediction_cache.pkl"): self.cache_file = cache_file self.cache = {} self.load_cache() def get(self, symbol, days_ahead): """Get cached prediction if valid (24 hours)""" key = f"{symbol}_{days_ahead}" if key in self.cache: timestamp, result = self.cache[key] # Check if cache is still valid (24 hours) if (datetime.now() - timestamp).seconds < 86400: return result return None def set(self, symbol, days_ahead, result): """Cache prediction""" key = f"{symbol}_{days_ahead}" self.cache[key] = (datetime.now(), result) self.save_cache() def save_cache(self): """Save cache to disk""" try: joblib.dump(self.cache, self.cache_file) except Exception as e: print(f"Warning: Could not save cache: {str(e)}") def load_cache(self): """Load cache from disk""" try: if Path(self.cache_file).exists(): self.cache = joblib.load(self.cache_file) except Exception as e: print(f"Warning: Could not load cache: {str(e)}") self.cache = {} # Initialize global cache cache = PredictionCache() # Model cache for loading pre-trained models model_cache = {} # ============================================================================ # DATA FETCHING & PREPROCESSING # ============================================================================ def fetch_stock_data(symbol, days=100): """ Fetch stock data using yfinance (free, fast, no API key) Args: symbol: Stock ticker (e.g., 'AAPL') days: Number of days of historical data Returns: DataFrame with OHLCV data """ try: end_date = datetime.now() start_date = end_date - timedelta(days=days) # Download data df = yf.download( symbol, start=start_date, end=end_date, auto_adjust=False, progress=False ) # Fix MultiIndex columns from newer yfinance if isinstance(df.columns, pd.MultiIndex): df.columns = df.columns.get_level_values(0) print("Downloaded columns:", df.columns.tolist()) if df.empty: raise ValueError(f"No data returned for {symbol}") if 'Close' not in df.columns: raise ValueError( f"'Close' column missing. Available columns: {df.columns.tolist()}" ) if len(df) < 60: raise ValueError( f"Insufficient data for {symbol}. Need at least 60 days." ) return df[['Close', 'Open', 'High', 'Low', 'Volume']].astype(np.float32) except Exception as e: raise ValueError(f"Error fetching data for {symbol}: {str(e)}") def add_technical_indicators(df): """ Add technical indicators for prediction Optimized: Only 5 most important features Features: - Close: Stock closing price - RSI: Relative Strength Index (momentum) - MACD: Moving Average Convergence Divergence (trend) - Volatility: Price volatility indicator - SMA20: 20-day Simple Moving Average (trend) """ df = df.copy() try: # RSI (Relative Strength Index) delta = df['Close'].diff() gain = delta.where(delta > 0, 0).rolling(window=14).mean() loss = -delta.where(delta < 0, 0).rolling(window=14).mean() rs = gain / loss df['RSI'] = 100 - (100 / (1 + rs)) # MACD (Moving Average Convergence Divergence) ema12 = df['Close'].ewm(span=12).mean() ema26 = df['Close'].ewm(span=26).mean() df['MACD'] = ema12 - ema26 # Volatility (standard deviation of returns) df['Volatility'] = df['Close'].rolling(window=10).std() / df['Close'] * 100 # SMA20 (20-day Simple Moving Average) df['SMA20'] = df['Close'].rolling(window=20).mean() # ROC (Rate of Change) df['ROC'] = df['Close'].pct_change(periods=5) * 100 # Remove NaN rows df = df.dropna() # Return only required features features = ['Close', 'RSI', 'MACD', 'Volatility', 'SMA20', 'ROC'] return df[features].astype(np.float32) except Exception as e: print(f"Error calculating technical indicators: {str(e)}") return df[['Close']].astype(np.float32) def preprocess_data(data): """ Normalize data using MinMaxScaler Args: data: DataFrame with features Returns: scaled_data: Numpy array with normalized values [0, 1] close_scaler: Scaler for inverse transform """ scalers = {} scaled_data = np.zeros((len(data), data.shape[1]), dtype=np.float32) for i, col in enumerate(data.columns): scaler = MinMaxScaler(feature_range=(0, 1)) scaled_data[:, i] = scaler.fit_transform( data.iloc[:, i].values.reshape(-1, 1) ).flatten() scalers[col] = scaler return scaled_data, scalers['Close'] def create_sequences(data, time_step=30): """ Create sequences for LSTM training Args: data: Scaled data array time_step: Look-back period (30 days) Returns: X: Input sequences y: Target values (next day price) """ X, y = [], [] for i in range(len(data) - time_step - 1): X.append(data[i:(i + time_step), :]) y.append(data[i + time_step, 0]) # Close price is first column return np.array(X, dtype=np.float32), np.array(y, dtype=np.float32) # ============================================================================ # MODEL TRAINING # ============================================================================ def train_lstm_model(X_train, y_train, epochs=50, device='cpu'): """ Train PyTorch LSTM model Args: X_train: Training sequences y_train: Training targets epochs: Number of training epochs device: 'cpu' or 'cuda' Returns: Trained model """ n_features = X_train.shape[2] model = StockLSTM( input_size=n_features, hidden_size=32, num_layers=1, dropout=0.2 ).to(device) criterion = nn.MSELoss() optimizer = torch.optim.Adam(model.parameters(), lr=0.001) # Convert to tensors X_tensor = torch.FloatTensor(X_train).to(device) y_tensor = torch.FloatTensor(y_train).unsqueeze(1).to(device) model.train() batch_size = 32 for epoch in range(epochs): total_loss = 0 # Mini-batch training for i in range(0, len(X_tensor), batch_size): batch_X = X_tensor[i:i+batch_size] batch_y = y_tensor[i:i+batch_size] optimizer.zero_grad() predictions = model(batch_X) loss = criterion(predictions, batch_y) loss.backward() optimizer.step() total_loss += loss.item() if (epoch + 1) % 10 == 0: avg_loss = total_loss / (len(X_tensor) / batch_size) print(f"Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.6f}") return model # ============================================================================ # PREDICTION # ============================================================================ def predict_prices(model, scaled_data, scaler, time_step=30, days_ahead=5, device='cpu'): """ Predict future stock prices using trained LSTM Args: model: Trained LSTM model scaled_data: Normalized data scaler: MinMaxScaler for inverse transform time_step: Look-back period days_ahead: Number of days to predict device: 'cpu' or 'cuda' Returns: Array of predicted prices (unscaled) """ model.eval() # Start with last sequence current_seq = scaled_data[-time_step:, :].copy() predictions = [] with torch.no_grad(): for _ in range(days_ahead): # Prepare input X_test = torch.FloatTensor(current_seq).unsqueeze(0).to(device) # Predict next value next_pred = model(X_test).cpu().numpy()[0, 0] predictions.append(next_pred) # Update sequence: shift left, add new prediction current_seq = np.vstack([ current_seq[1:, :], np.concatenate([[next_pred], current_seq[-1, 1:]]) ]) # Inverse transform to get actual prices predictions_unscaled = scaler.inverse_transform( np.array(predictions).reshape(-1, 1) ) return predictions_unscaled.flatten() # ============================================================================ # SENTIMENT ANALYSIS (VADER) # ============================================================================ def fetch_finnhub_news(symbol): """ Fetch latest news headlines for sentiment analysis Args: symbol: Stock ticker Returns: List of news articles (max 10) """ if not FINNHUB_API_KEY: print("Warning: FINNHUB_API_KEY not set") return [] try: headers = {'X-Finnhub-Token': FINNHUB_API_KEY} url = f'https://finnhub.io/api/v1/news?symbol={symbol}&limit=10' response = requests.get(url, headers=headers, timeout=5) if response.status_code == 200: return response.json() return [] except Exception as e: print(f"Error fetching news: {str(e)}") return [] def analyze_sentiment(headlines): """ Analyze sentiment using VADER (Valence Aware Dictionary and sEntiment Reasoner) Much faster and lighter than transformers Args: headlines: List of news articles Returns: Dictionary with sentiment counts """ sentiment_counts = { 'positive': 0, 'negative': 0, 'neutral': 0 } if not headlines: return sentiment_counts for article in headlines[:10]: # Limit to top 10 # Combine headline and summary for analysis text = (article.get('headline', '') + " " + article.get('summary', '')).strip() if not text: continue # Get sentiment scores scores = sia.polarity_scores(text) compound = scores['compound'] # Classify based on compound score if compound > 0.05: sentiment_counts['positive'] += 1 elif compound < -0.05: sentiment_counts['negative'] += 1 else: sentiment_counts['neutral'] += 1 return sentiment_counts # ============================================================================ # MAIN PREDICTION FUNCTION # ============================================================================ def predict_stock(symbol, days_ahead=5, use_cache=True): """ Complete prediction pipeline Args: symbol: Stock ticker (e.g., 'AAPL') days_ahead: Number of days to predict (1-30) use_cache: Use cached results if available Returns: Dictionary with predictions and sentiment analysis """ symbol = symbol.upper().strip() # Validate input if not symbol or len(symbol) > 5 or not symbol.isalpha(): raise ValueError("Invalid stock symbol") if days_ahead < 1 or days_ahead > 30: raise ValueError("Days ahead must be between 1 and 30") # Check cache if use_cache: cached_result = cache.get(symbol, days_ahead) if cached_result: print(f"✓ Using cached prediction for {symbol}") return cached_result # Determine device device = 'cuda' if torch.cuda.is_available() else 'cpu' print(f"Predicting {symbol} using device: {device}") try: # Step 1: Fetch data print(f"📥 Fetching data for {symbol}...") data = fetch_stock_data(symbol, days=100) last_price = float(data['Close'].iloc[-1]) last_date = data.index[-1] # Step 2: Add technical indicators print("📊 Computing technical indicators...") data_with_indicators = add_technical_indicators(data) # Step 3: Preprocess print("🔄 Preprocessing data...") scaled_data, scaler = preprocess_data(data_with_indicators) X, y = create_sequences(scaled_data, time_step=30) if len(X) == 0: raise ValueError("Insufficient data for sequences") # Step 4: Load or train model model_key = f"lstm_{symbol}" if model_key in model_cache: model = model_cache[model_key] print("✓ Loaded cached model") else: print("🧠 Training LSTM model...") model = train_lstm_model(X, y, epochs=50, device=device) model_cache[model_key] = model # Step 5: Predict print(f"🔮 Predicting next {days_ahead} days...") predictions = predict_prices( model, scaled_data, scaler, time_step=30, days_ahead=days_ahead, device=device ) # Step 6: Sentiment analysis print("💭 Analyzing sentiment...") headlines = fetch_finnhub_news(symbol) sentiment = analyze_sentiment(headlines) # Calculate sentiment score total_sentiment = sum(sentiment.values()) if total_sentiment > 0: sentiment_score = (sentiment['positive'] - sentiment['negative']) / total_sentiment else: sentiment_score = 0.0 # Step 7: Format results result = { 'symbol': symbol, 'last_price': float(last_price), 'last_date': last_date.strftime('%Y-%m-%d'), 'predictions': [], 'sentiment': sentiment, 'sentiment_score': float(sentiment_score), 'timestamp': datetime.now().isoformat() } # Add predictions with dates current_date = last_date for i, pred_price in enumerate(predictions): # Skip weekends current_date += timedelta(days=1) while current_date.weekday() > 4: # 5=Saturday, 6=Sunday current_date += timedelta(days=1) change_pct = ((float(pred_price) - last_price) / last_price) * 100 result['predictions'].append({ 'date': current_date.strftime('%Y-%m-%d'), 'price': float(pred_price), 'change_pct': float(change_pct), 'day': i + 1 }) # Cache the result if use_cache: cache.set(symbol, days_ahead, result) print("✅ Prediction complete!") return result except Exception as e: raise Exception(f"Prediction failed for {symbol}: {str(e)}") # ============================================================================ # UTILITY FUNCTIONS # ============================================================================ def get_supported_symbols(): """Get list of commonly used stock symbols""" return [ 'AAPL', 'GOOGL', 'MSFT', 'AMZN', 'TSLA', 'NVDA', 'META', 'NFLX', 'INTC', 'AMD', 'SBUX', 'PYPL', 'IBM', 'CSCO', 'TWTR' ] def validate_prediction_result(result): """Validate prediction result structure""" required_keys = [ 'symbol', 'last_price', 'last_date', 'predictions', 'sentiment', 'sentiment_score' ] for key in required_keys: if key not in result: raise ValueError(f"Missing key in result: {key}") if not isinstance(result['predictions'], list): raise ValueError("Predictions must be a list") if len(result['predictions']) == 0: raise ValueError("No predictions generated") return True