| """ | |
| Stock Price Prediction Model - Optimized for HuggingFace Spaces | |
| Uses PyTorch LSTM + VADER sentiment analysis | |
| No TensorFlow, no transformers, lightweight and fast | |
| """ | |
| import numpy as np | |
| import pandas as pd | |
| import torch | |
| import torch.nn as nn | |
| from sklearn.preprocessing import MinMaxScaler | |
| import nltk | |
| from nltk.sentiment.vader import SentimentIntensityAnalyzer | |
| from datetime import datetime, timedelta | |
| import yfinance as yf | |
| import os | |
| from dotenv import load_dotenv | |
| import joblib | |
| import requests | |
| import json | |
| from pathlib import Path | |
| # Load environment variables | |
| load_dotenv() | |
| # Download VADER lexicon | |
| try: | |
| nltk.data.find('vader_lexicon') | |
| except LookupError: | |
| nltk.download("vader_lexicon", quiet=True) | |
| # Initialize sentiment analyzer | |
| sia = SentimentIntensityAnalyzer() | |
| # Get API keys from environment | |
| FINNHUB_API_KEY = os.getenv("FINNHUB_API_KEY", "") | |
# ============================================================================
# PYTORCH LSTM MODEL
# ============================================================================
class StockLSTM(nn.Module):
    """Lightweight PyTorch LSTM for stock price prediction"""

    def __init__(self, input_size=5, hidden_size=32, num_layers=1, dropout=0.2):
        super(StockLSTM, self).__init__()
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=num_layers,
            dropout=dropout if num_layers > 1 else 0,
            batch_first=True
        )
        self.fc = nn.Linear(hidden_size, 1)

    def forward(self, x):
        """Forward pass: predict the next value from the last hidden state"""
        lstm_out, _ = self.lstm(x)
        last_hidden = lstm_out[:, -1, :]
        output = self.fc(last_hidden)
        return output
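
# A minimal, illustrative shape check for StockLSTM (comment-only so it does
# not run on import; shapes assume batch_first=True as configured above):
#     model = StockLSTM(input_size=6)
#     x = torch.randn(8, 30, 6)    # (batch, time_step, n_features)
#     model(x).shape               # -> torch.Size([8, 1])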
# ============================================================================
# PREDICTION CACHE (24-hour expiry)
# ============================================================================
class PredictionCache:
    """Simple in-memory cache for predictions, persisted to disk with joblib"""

    def __init__(self, cache_file="prediction_cache.pkl"):
        self.cache_file = cache_file
        self.cache = {}
        self.load_cache()

    def get(self, symbol, days_ahead):
        """Get cached prediction if still valid (less than 24 hours old)"""
        key = f"{symbol}_{days_ahead}"
        if key in self.cache:
            timestamp, result = self.cache[key]
            # Use total_seconds(): timedelta.seconds alone is always < 86400
            # (it ignores whole days), so entries would never expire
            if (datetime.now() - timestamp).total_seconds() < 86400:
                return result
        return None

    def set(self, symbol, days_ahead, result):
        """Cache prediction"""
        key = f"{symbol}_{days_ahead}"
        self.cache[key] = (datetime.now(), result)
        self.save_cache()

    def save_cache(self):
        """Save cache to disk"""
        try:
            joblib.dump(self.cache, self.cache_file)
        except Exception as e:
            print(f"Warning: Could not save cache: {str(e)}")

    def load_cache(self):
        """Load cache from disk"""
        try:
            if Path(self.cache_file).exists():
                self.cache = joblib.load(self.cache_file)
        except Exception as e:
            print(f"Warning: Could not load cache: {str(e)}")
            self.cache = {}


# Initialize global cache
cache = PredictionCache()

# Model cache for reusing trained models within a session
model_cache = {}
# ============================================================================
# DATA FETCHING & PREPROCESSING
# ============================================================================
def fetch_stock_data(symbol, days=100):
    """
    Fetch stock data using yfinance (free, fast, no API key)

    Args:
        symbol: Stock ticker (e.g., 'AAPL')
        days: Number of calendar days of historical data

    Returns:
        DataFrame with OHLCV data
    """
    try:
        end_date = datetime.now()
        start_date = end_date - timedelta(days=days)

        # Download data
        df = yf.download(
            symbol,
            start=start_date,
            end=end_date,
            auto_adjust=False,
            progress=False
        )

        # Flatten MultiIndex columns returned by newer yfinance versions
        if isinstance(df.columns, pd.MultiIndex):
            df.columns = df.columns.get_level_values(0)
        print("Downloaded columns:", df.columns.tolist())

        if df.empty:
            raise ValueError(f"No data returned for {symbol}")
        if 'Close' not in df.columns:
            raise ValueError(
                f"'Close' column missing. Available columns: {df.columns.tolist()}"
            )
        if len(df) < 60:
            raise ValueError(
                f"Insufficient data for {symbol}. Need at least 60 days."
            )
        return df[['Close', 'Open', 'High', 'Low', 'Volume']].astype(np.float32)
    except Exception as e:
        raise ValueError(f"Error fetching data for {symbol}: {str(e)}") from e
def add_technical_indicators(df):
    """
    Add technical indicators for prediction
    Optimized: only the six most useful features are kept

    Features:
    - Close: Stock closing price
    - RSI: Relative Strength Index (momentum)
    - MACD: Moving Average Convergence Divergence (trend)
    - Volatility: Price volatility indicator
    - SMA20: 20-day Simple Moving Average (trend)
    - ROC: 5-day Rate of Change (momentum)
    """
    df = df.copy()
    try:
        # RSI (Relative Strength Index), using simple rolling means
        delta = df['Close'].diff()
        gain = delta.where(delta > 0, 0).rolling(window=14).mean()
        loss = -delta.where(delta < 0, 0).rolling(window=14).mean()
        rs = gain / loss
        df['RSI'] = 100 - (100 / (1 + rs))

        # MACD (Moving Average Convergence Divergence)
        ema12 = df['Close'].ewm(span=12).mean()
        ema26 = df['Close'].ewm(span=26).mean()
        df['MACD'] = ema12 - ema26

        # Volatility (rolling standard deviation as a percentage of price)
        df['Volatility'] = df['Close'].rolling(window=10).std() / df['Close'] * 100

        # SMA20 (20-day Simple Moving Average)
        df['SMA20'] = df['Close'].rolling(window=20).mean()

        # ROC (Rate of Change over 5 days)
        df['ROC'] = df['Close'].pct_change(periods=5) * 100

        # Remove NaN rows introduced by the rolling windows
        df = df.dropna()

        # Return only the required features
        features = ['Close', 'RSI', 'MACD', 'Volatility', 'SMA20', 'ROC']
        return df[features].astype(np.float32)
    except Exception as e:
        print(f"Error calculating technical indicators: {str(e)}")
        return df[['Close']].astype(np.float32)
def preprocess_data(data):
    """
    Normalize each feature column to [0, 1] with its own MinMaxScaler

    Args:
        data: DataFrame with features

    Returns:
        scaled_data: Numpy array with normalized values [0, 1]
        close_scaler: Scaler fitted on 'Close', for inverse-transforming predictions
    """
    scalers = {}
    scaled_data = np.zeros((len(data), data.shape[1]), dtype=np.float32)
    for i, col in enumerate(data.columns):
        scaler = MinMaxScaler(feature_range=(0, 1))
        scaled_data[:, i] = scaler.fit_transform(
            data.iloc[:, i].values.reshape(-1, 1)
        ).flatten()
        scalers[col] = scaler
    return scaled_data, scalers['Close']
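
# Illustrative use of the returned close_scaler ('Close' is column 0 of the
# feature array produced above):
#     scaled, close_scaler = preprocess_data(features_df)
#     close_scaler.inverse_transform(scaled[:, :1])   # recovers Close prices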
def create_sequences(data, time_step=30):
    """
    Create sequences for LSTM training

    Args:
        data: Scaled data array
        time_step: Look-back period (30 days)

    Returns:
        X: Input sequences
        y: Target values (next day price)
    """
    X, y = [], []
    # Every window of length time_step gets the following day as its target
    for i in range(len(data) - time_step):
        X.append(data[i:(i + time_step), :])
        y.append(data[i + time_step, 0])  # Close price is the first column
    return np.array(X, dtype=np.float32), np.array(y, dtype=np.float32)
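
# Shape sketch (illustrative): for scaled data of shape (n_rows, n_features)
# and time_step=30, create_sequences returns
#     X with shape (n_rows - 30, 30, n_features)
#     y with shape (n_rows - 30,)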
# ============================================================================
# MODEL TRAINING
# ============================================================================
def train_lstm_model(X_train, y_train, epochs=50, device='cpu'):
    """
    Train PyTorch LSTM model

    Args:
        X_train: Training sequences
        y_train: Training targets
        epochs: Number of training epochs
        device: 'cpu' or 'cuda'

    Returns:
        Trained model
    """
    n_features = X_train.shape[2]
    model = StockLSTM(
        input_size=n_features,
        hidden_size=32,
        num_layers=1,
        dropout=0.2
    ).to(device)
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

    # Convert to tensors
    X_tensor = torch.FloatTensor(X_train).to(device)
    y_tensor = torch.FloatTensor(y_train).unsqueeze(1).to(device)

    model.train()
    batch_size = 32
    # Ceiling division matches the actual number of mini-batches per epoch
    n_batches = max(1, (len(X_tensor) + batch_size - 1) // batch_size)
    for epoch in range(epochs):
        total_loss = 0
        # Mini-batch training
        for i in range(0, len(X_tensor), batch_size):
            batch_X = X_tensor[i:i + batch_size]
            batch_y = y_tensor[i:i + batch_size]
            optimizer.zero_grad()
            predictions = model(batch_X)
            loss = criterion(predictions, batch_y)
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        if (epoch + 1) % 10 == 0:
            avg_loss = total_loss / n_batches
            print(f"Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.6f}")
    return model
# ============================================================================
# PREDICTION
# ============================================================================
def predict_prices(model, scaled_data, scaler, time_step=30, days_ahead=5, device='cpu'):
    """
    Predict future stock prices using trained LSTM

    Args:
        model: Trained LSTM model
        scaled_data: Normalized data
        scaler: MinMaxScaler for inverse transform
        time_step: Look-back period
        days_ahead: Number of days to predict
        device: 'cpu' or 'cuda'

    Returns:
        Array of predicted prices (unscaled)
    """
    model.eval()
    # Start with the last observed sequence
    current_seq = scaled_data[-time_step:, :].copy()
    predictions = []
    with torch.no_grad():
        for _ in range(days_ahead):
            # Prepare input: (1, time_step, n_features)
            X_test = torch.FloatTensor(current_seq).unsqueeze(0).to(device)
            # Predict next value
            next_pred = model(X_test).cpu().numpy()[0, 0]
            predictions.append(next_pred)
            # Update sequence: drop the oldest row and append the prediction
            # as the new Close; the other features are held at their last
            # observed values (a simplification for multi-step forecasting)
            current_seq = np.vstack([
                current_seq[1:, :],
                np.concatenate([[next_pred], current_seq[-1, 1:]])
            ])
    # Inverse transform to get actual prices
    predictions_unscaled = scaler.inverse_transform(
        np.array(predictions).reshape(-1, 1)
    )
    return predictions_unscaled.flatten()
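
# Illustrative call (assumes the outputs of preprocess_data and a model from
# train_lstm_model; returned values are in the original price scale):
#     preds = predict_prices(model, scaled_data, close_scaler,
#                            time_step=30, days_ahead=5)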
# ============================================================================
# SENTIMENT ANALYSIS (VADER)
# ============================================================================
def fetch_finnhub_news(symbol):
    """
    Fetch recent company news for sentiment analysis

    Args:
        symbol: Stock ticker

    Returns:
        List of news articles (trimmed to 10 downstream)
    """
    if not FINNHUB_API_KEY:
        print("Warning: FINNHUB_API_KEY not set")
        return []
    try:
        headers = {'X-Finnhub-Token': FINNHUB_API_KEY}
        # Finnhub's company-news endpoint requires an explicit date range
        to_date = datetime.now().strftime('%Y-%m-%d')
        from_date = (datetime.now() - timedelta(days=7)).strftime('%Y-%m-%d')
        url = (f'https://finnhub.io/api/v1/company-news'
               f'?symbol={symbol}&from={from_date}&to={to_date}')
        response = requests.get(url, headers=headers, timeout=5)
        if response.status_code == 200:
            return response.json()
        return []
    except Exception as e:
        print(f"Error fetching news: {str(e)}")
        return []
def analyze_sentiment(headlines):
    """
    Analyze sentiment using VADER (Valence Aware Dictionary and sEntiment Reasoner)
    Much faster and lighter than transformer models

    Args:
        headlines: List of news articles

    Returns:
        Dictionary with sentiment counts
    """
    sentiment_counts = {
        'positive': 0,
        'negative': 0,
        'neutral': 0
    }
    if not headlines:
        return sentiment_counts

    for article in headlines[:10]:  # Limit to top 10
        # Combine headline and summary for analysis
        text = (article.get('headline', '') + " " +
                article.get('summary', '')).strip()
        if not text:
            continue

        # Get sentiment scores
        scores = sia.polarity_scores(text)
        compound = scores['compound']

        # Classify using the conventional VADER compound thresholds
        if compound > 0.05:
            sentiment_counts['positive'] += 1
        elif compound < -0.05:
            sentiment_counts['negative'] += 1
        else:
            sentiment_counts['neutral'] += 1
    return sentiment_counts
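
# VADER returns four scores per text; 'compound' is the normalized aggregate
# in [-1, 1] used for classification above. Illustrative example:
#     sia.polarity_scores("Shares surge on strong earnings")
#     # -> {'neg': ..., 'neu': ..., 'pos': ..., 'compound': ...}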
# ============================================================================
# MAIN PREDICTION FUNCTION
# ============================================================================
def predict_stock(symbol, days_ahead=5, use_cache=True):
    """
    Complete prediction pipeline

    Args:
        symbol: Stock ticker (e.g., 'AAPL')
        days_ahead: Number of days to predict (1-30)
        use_cache: Use cached results if available

    Returns:
        Dictionary with predictions and sentiment analysis
    """
    symbol = symbol.upper().strip()

    # Validate input
    if not symbol or len(symbol) > 5 or not symbol.isalpha():
        raise ValueError("Invalid stock symbol")
    if days_ahead < 1 or days_ahead > 30:
        raise ValueError("Days ahead must be between 1 and 30")

    # Check cache
    if use_cache:
        cached_result = cache.get(symbol, days_ahead)
        if cached_result:
            print(f"✓ Using cached prediction for {symbol}")
            return cached_result

    # Determine device
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    print(f"Predicting {symbol} using device: {device}")

    try:
        # Step 1: Fetch data
        print(f"📥 Fetching data for {symbol}...")
        data = fetch_stock_data(symbol, days=100)
        last_price = float(data['Close'].iloc[-1])
        last_date = data.index[-1]

        # Step 2: Add technical indicators
        print("📊 Computing technical indicators...")
        data_with_indicators = add_technical_indicators(data)

        # Step 3: Preprocess
        print("🔄 Preprocessing data...")
        scaled_data, scaler = preprocess_data(data_with_indicators)
        X, y = create_sequences(scaled_data, time_step=30)
        if len(X) == 0:
            raise ValueError("Insufficient data for sequences")

        # Step 4: Load or train model
        model_key = f"lstm_{symbol}"
        if model_key in model_cache:
            model = model_cache[model_key]
            print("✓ Loaded cached model")
        else:
            print("🧠 Training LSTM model...")
            model = train_lstm_model(X, y, epochs=50, device=device)
            model_cache[model_key] = model

        # Step 5: Predict
        print(f"🔮 Predicting next {days_ahead} days...")
        predictions = predict_prices(
            model, scaled_data, scaler,
            time_step=30, days_ahead=days_ahead, device=device
        )

        # Step 6: Sentiment analysis
        print("🎭 Analyzing sentiment...")
        headlines = fetch_finnhub_news(symbol)
        sentiment = analyze_sentiment(headlines)

        # Calculate sentiment score in [-1, 1]
        total_sentiment = sum(sentiment.values())
        if total_sentiment > 0:
            sentiment_score = (sentiment['positive'] - sentiment['negative']) / total_sentiment
        else:
            sentiment_score = 0.0

        # Step 7: Format results
        result = {
            'symbol': symbol,
            'last_price': float(last_price),
            'last_date': last_date.strftime('%Y-%m-%d'),
            'predictions': [],
            'sentiment': sentiment,
            'sentiment_score': float(sentiment_score),
            'timestamp': datetime.now().isoformat()
        }

        # Add predictions with dates, skipping weekends
        current_date = last_date
        for i, pred_price in enumerate(predictions):
            current_date += timedelta(days=1)
            while current_date.weekday() > 4:  # 5=Saturday, 6=Sunday
                current_date += timedelta(days=1)
            change_pct = ((float(pred_price) - last_price) / last_price) * 100
            result['predictions'].append({
                'date': current_date.strftime('%Y-%m-%d'),
                'price': float(pred_price),
                'change_pct': float(change_pct),
                'day': i + 1
            })

        # Cache the result
        if use_cache:
            cache.set(symbol, days_ahead, result)
        print("✓ Prediction complete!")
        return result
    except Exception as e:
        raise RuntimeError(f"Prediction failed for {symbol}: {str(e)}") from e
# ============================================================================
# UTILITY FUNCTIONS
# ============================================================================
def get_supported_symbols():
    """Get list of commonly used stock symbols"""
    return [
        'AAPL', 'GOOGL', 'MSFT', 'AMZN', 'TSLA',
        'NVDA', 'META', 'NFLX', 'INTC', 'AMD',
        'SBUX', 'PYPL', 'IBM', 'CSCO', 'ORCL'
    ]


def validate_prediction_result(result):
    """Validate prediction result structure"""
    required_keys = [
        'symbol', 'last_price', 'last_date',
        'predictions', 'sentiment', 'sentiment_score'
    ]
    for key in required_keys:
        if key not in result:
            raise ValueError(f"Missing key in result: {key}")
    if not isinstance(result['predictions'], list):
        raise ValueError("Predictions must be a list")
    if len(result['predictions']) == 0:
        raise ValueError("No predictions generated")
    return True
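

# A minimal smoke test, run only when the module is executed directly. It
# needs network access for yfinance; news sentiment degrades gracefully when
# FINNHUB_API_KEY is unset. The ticker and horizon are arbitrary examples.
if __name__ == "__main__":
    result = predict_stock('AAPL', days_ahead=5)
    validate_prediction_result(result)
    print(f"\n{result['symbol']} last close: {result['last_price']:.2f} "
          f"({result['last_date']})")
    for p in result['predictions']:
        print(f"  {p['date']}: {p['price']:.2f} ({p['change_pct']:+.2f}%)")
    print(f"Sentiment score: {result['sentiment_score']:+.2f}")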