StockPredictor / model.py
danielthatu12's picture
Update model.py
4507b94 verified
"""
Stock Price Prediction Model - Optimized for HuggingFace Spaces
Uses PyTorch LSTM + VADER sentiment analysis
No TensorFlow, no transformers, lightweight and fast
"""
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from sklearn.preprocessing import MinMaxScaler
import nltk
from nltk.sentiment.vader import SentimentIntensityAnalyzer
from datetime import datetime, timedelta
import yfinance as yf
import os
from dotenv import load_dotenv
import joblib
import requests
import json
from pathlib import Path
# Load environment variables
load_dotenv()
# Download VADER lexicon
try:
nltk.data.find('vader_lexicon')
except LookupError:
nltk.download("vader_lexicon", quiet=True)
# Initialize sentiment analyzer
sia = SentimentIntensityAnalyzer()
# Get API keys from environment
FINNHUB_API_KEY = os.getenv("FINNHUB_API_KEY", "")
# ============================================================================
# PYTORCH LSTM MODEL
# ============================================================================
class StockLSTM(nn.Module):
"""Lightweight PyTorch LSTM for stock price prediction"""
def __init__(self, input_size=5, hidden_size=32, num_layers=1, dropout=0.2):
super(StockLSTM, self).__init__()
self.lstm = nn.LSTM(
input_size=input_size,
hidden_size=hidden_size,
num_layers=num_layers,
dropout=dropout if num_layers > 1 else 0,
batch_first=True
)
self.fc = nn.Linear(hidden_size, 1)
def forward(self, x):
"""Forward pass"""
lstm_out, _ = self.lstm(x)
last_hidden = lstm_out[:, -1, :]
output = self.fc(last_hidden)
return output
# ============================================================================
# PREDICTION CACHE (24-hour expiry)
# ============================================================================
class PredictionCache:
"""Simple in-memory cache for predictions"""
def __init__(self, cache_file="prediction_cache.pkl"):
self.cache_file = cache_file
self.cache = {}
self.load_cache()
def get(self, symbol, days_ahead):
"""Get cached prediction if valid (24 hours)"""
key = f"{symbol}_{days_ahead}"
if key in self.cache:
timestamp, result = self.cache[key]
# Check if cache is still valid (24 hours)
if (datetime.now() - timestamp).seconds < 86400:
return result
return None
def set(self, symbol, days_ahead, result):
"""Cache prediction"""
key = f"{symbol}_{days_ahead}"
self.cache[key] = (datetime.now(), result)
self.save_cache()
def save_cache(self):
"""Save cache to disk"""
try:
joblib.dump(self.cache, self.cache_file)
except Exception as e:
print(f"Warning: Could not save cache: {str(e)}")
def load_cache(self):
"""Load cache from disk"""
try:
if Path(self.cache_file).exists():
self.cache = joblib.load(self.cache_file)
except Exception as e:
print(f"Warning: Could not load cache: {str(e)}")
self.cache = {}
# Initialize global cache
cache = PredictionCache()
# Model cache for loading pre-trained models
model_cache = {}
# ============================================================================
# DATA FETCHING & PREPROCESSING
# ============================================================================
def fetch_stock_data(symbol, days=100):
"""
Fetch stock data using yfinance (free, fast, no API key)
Args:
symbol: Stock ticker (e.g., 'AAPL')
days: Number of days of historical data
Returns:
DataFrame with OHLCV data
"""
try:
end_date = datetime.now()
start_date = end_date - timedelta(days=days)
# Download data
df = yf.download(
symbol,
start=start_date,
end=end_date,
auto_adjust=False,
progress=False
)
# Fix MultiIndex columns from newer yfinance
if isinstance(df.columns, pd.MultiIndex):
df.columns = df.columns.get_level_values(0)
print("Downloaded columns:", df.columns.tolist())
if df.empty:
raise ValueError(f"No data returned for {symbol}")
if 'Close' not in df.columns:
raise ValueError(
f"'Close' column missing. Available columns: {df.columns.tolist()}"
)
if len(df) < 60:
raise ValueError(
f"Insufficient data for {symbol}. Need at least 60 days."
)
return df[['Close', 'Open', 'High', 'Low', 'Volume']].astype(np.float32)
except Exception as e:
raise ValueError(f"Error fetching data for {symbol}: {str(e)}")
def add_technical_indicators(df):
"""
Add technical indicators for prediction
Optimized: Only 5 most important features
Features:
- Close: Stock closing price
- RSI: Relative Strength Index (momentum)
- MACD: Moving Average Convergence Divergence (trend)
- Volatility: Price volatility indicator
- SMA20: 20-day Simple Moving Average (trend)
"""
df = df.copy()
try:
# RSI (Relative Strength Index)
delta = df['Close'].diff()
gain = delta.where(delta > 0, 0).rolling(window=14).mean()
loss = -delta.where(delta < 0, 0).rolling(window=14).mean()
rs = gain / loss
df['RSI'] = 100 - (100 / (1 + rs))
# MACD (Moving Average Convergence Divergence)
ema12 = df['Close'].ewm(span=12).mean()
ema26 = df['Close'].ewm(span=26).mean()
df['MACD'] = ema12 - ema26
# Volatility (standard deviation of returns)
df['Volatility'] = df['Close'].rolling(window=10).std() / df['Close'] * 100
# SMA20 (20-day Simple Moving Average)
df['SMA20'] = df['Close'].rolling(window=20).mean()
# ROC (Rate of Change)
df['ROC'] = df['Close'].pct_change(periods=5) * 100
# Remove NaN rows
df = df.dropna()
# Return only required features
features = ['Close', 'RSI', 'MACD', 'Volatility', 'SMA20', 'ROC']
return df[features].astype(np.float32)
except Exception as e:
print(f"Error calculating technical indicators: {str(e)}")
return df[['Close']].astype(np.float32)
def preprocess_data(data):
"""
Normalize data using MinMaxScaler
Args:
data: DataFrame with features
Returns:
scaled_data: Numpy array with normalized values [0, 1]
close_scaler: Scaler for inverse transform
"""
scalers = {}
scaled_data = np.zeros((len(data), data.shape[1]), dtype=np.float32)
for i, col in enumerate(data.columns):
scaler = MinMaxScaler(feature_range=(0, 1))
scaled_data[:, i] = scaler.fit_transform(
data.iloc[:, i].values.reshape(-1, 1)
).flatten()
scalers[col] = scaler
return scaled_data, scalers['Close']
def create_sequences(data, time_step=30):
"""
Create sequences for LSTM training
Args:
data: Scaled data array
time_step: Look-back period (30 days)
Returns:
X: Input sequences
y: Target values (next day price)
"""
X, y = [], []
for i in range(len(data) - time_step - 1):
X.append(data[i:(i + time_step), :])
y.append(data[i + time_step, 0]) # Close price is first column
return np.array(X, dtype=np.float32), np.array(y, dtype=np.float32)
# ============================================================================
# MODEL TRAINING
# ============================================================================
def train_lstm_model(X_train, y_train, epochs=50, device='cpu'):
"""
Train PyTorch LSTM model
Args:
X_train: Training sequences
y_train: Training targets
epochs: Number of training epochs
device: 'cpu' or 'cuda'
Returns:
Trained model
"""
n_features = X_train.shape[2]
model = StockLSTM(
input_size=n_features,
hidden_size=32,
num_layers=1,
dropout=0.2
).to(device)
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
# Convert to tensors
X_tensor = torch.FloatTensor(X_train).to(device)
y_tensor = torch.FloatTensor(y_train).unsqueeze(1).to(device)
model.train()
batch_size = 32
for epoch in range(epochs):
total_loss = 0
# Mini-batch training
for i in range(0, len(X_tensor), batch_size):
batch_X = X_tensor[i:i+batch_size]
batch_y = y_tensor[i:i+batch_size]
optimizer.zero_grad()
predictions = model(batch_X)
loss = criterion(predictions, batch_y)
loss.backward()
optimizer.step()
total_loss += loss.item()
if (epoch + 1) % 10 == 0:
avg_loss = total_loss / (len(X_tensor) / batch_size)
print(f"Epoch {epoch+1}/{epochs}, Loss: {avg_loss:.6f}")
return model
# ============================================================================
# PREDICTION
# ============================================================================
def predict_prices(model, scaled_data, scaler, time_step=30, days_ahead=5, device='cpu'):
"""
Predict future stock prices using trained LSTM
Args:
model: Trained LSTM model
scaled_data: Normalized data
scaler: MinMaxScaler for inverse transform
time_step: Look-back period
days_ahead: Number of days to predict
device: 'cpu' or 'cuda'
Returns:
Array of predicted prices (unscaled)
"""
model.eval()
# Start with last sequence
current_seq = scaled_data[-time_step:, :].copy()
predictions = []
with torch.no_grad():
for _ in range(days_ahead):
# Prepare input
X_test = torch.FloatTensor(current_seq).unsqueeze(0).to(device)
# Predict next value
next_pred = model(X_test).cpu().numpy()[0, 0]
predictions.append(next_pred)
# Update sequence: shift left, add new prediction
current_seq = np.vstack([
current_seq[1:, :],
np.concatenate([[next_pred], current_seq[-1, 1:]])
])
# Inverse transform to get actual prices
predictions_unscaled = scaler.inverse_transform(
np.array(predictions).reshape(-1, 1)
)
return predictions_unscaled.flatten()
# ============================================================================
# SENTIMENT ANALYSIS (VADER)
# ============================================================================
def fetch_finnhub_news(symbol):
"""
Fetch latest news headlines for sentiment analysis
Args:
symbol: Stock ticker
Returns:
List of news articles (max 10)
"""
if not FINNHUB_API_KEY:
print("Warning: FINNHUB_API_KEY not set")
return []
try:
headers = {'X-Finnhub-Token': FINNHUB_API_KEY}
url = f'https://finnhub.io/api/v1/news?symbol={symbol}&limit=10'
response = requests.get(url, headers=headers, timeout=5)
if response.status_code == 200:
return response.json()
return []
except Exception as e:
print(f"Error fetching news: {str(e)}")
return []
def analyze_sentiment(headlines):
"""
Analyze sentiment using VADER (Valence Aware Dictionary and sEntiment Reasoner)
Much faster and lighter than transformers
Args:
headlines: List of news articles
Returns:
Dictionary with sentiment counts
"""
sentiment_counts = {
'positive': 0,
'negative': 0,
'neutral': 0
}
if not headlines:
return sentiment_counts
for article in headlines[:10]: # Limit to top 10
# Combine headline and summary for analysis
text = (article.get('headline', '') + " " +
article.get('summary', '')).strip()
if not text:
continue
# Get sentiment scores
scores = sia.polarity_scores(text)
compound = scores['compound']
# Classify based on compound score
if compound > 0.05:
sentiment_counts['positive'] += 1
elif compound < -0.05:
sentiment_counts['negative'] += 1
else:
sentiment_counts['neutral'] += 1
return sentiment_counts
# ============================================================================
# MAIN PREDICTION FUNCTION
# ============================================================================
def predict_stock(symbol, days_ahead=5, use_cache=True):
"""
Complete prediction pipeline
Args:
symbol: Stock ticker (e.g., 'AAPL')
days_ahead: Number of days to predict (1-30)
use_cache: Use cached results if available
Returns:
Dictionary with predictions and sentiment analysis
"""
symbol = symbol.upper().strip()
# Validate input
if not symbol or len(symbol) > 5 or not symbol.isalpha():
raise ValueError("Invalid stock symbol")
if days_ahead < 1 or days_ahead > 30:
raise ValueError("Days ahead must be between 1 and 30")
# Check cache
if use_cache:
cached_result = cache.get(symbol, days_ahead)
if cached_result:
print(f"โœ“ Using cached prediction for {symbol}")
return cached_result
# Determine device
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f"Predicting {symbol} using device: {device}")
try:
# Step 1: Fetch data
print(f"๐Ÿ“ฅ Fetching data for {symbol}...")
data = fetch_stock_data(symbol, days=100)
last_price = float(data['Close'].iloc[-1])
last_date = data.index[-1]
# Step 2: Add technical indicators
print("๐Ÿ“Š Computing technical indicators...")
data_with_indicators = add_technical_indicators(data)
# Step 3: Preprocess
print("๐Ÿ”„ Preprocessing data...")
scaled_data, scaler = preprocess_data(data_with_indicators)
X, y = create_sequences(scaled_data, time_step=30)
if len(X) == 0:
raise ValueError("Insufficient data for sequences")
# Step 4: Load or train model
model_key = f"lstm_{symbol}"
if model_key in model_cache:
model = model_cache[model_key]
print("โœ“ Loaded cached model")
else:
print("๐Ÿง  Training LSTM model...")
model = train_lstm_model(X, y, epochs=50, device=device)
model_cache[model_key] = model
# Step 5: Predict
print(f"๐Ÿ”ฎ Predicting next {days_ahead} days...")
predictions = predict_prices(
model, scaled_data, scaler,
time_step=30, days_ahead=days_ahead, device=device
)
# Step 6: Sentiment analysis
print("๐Ÿ’ญ Analyzing sentiment...")
headlines = fetch_finnhub_news(symbol)
sentiment = analyze_sentiment(headlines)
# Calculate sentiment score
total_sentiment = sum(sentiment.values())
if total_sentiment > 0:
sentiment_score = (sentiment['positive'] - sentiment['negative']) / total_sentiment
else:
sentiment_score = 0.0
# Step 7: Format results
result = {
'symbol': symbol,
'last_price': float(last_price),
'last_date': last_date.strftime('%Y-%m-%d'),
'predictions': [],
'sentiment': sentiment,
'sentiment_score': float(sentiment_score),
'timestamp': datetime.now().isoformat()
}
# Add predictions with dates
current_date = last_date
for i, pred_price in enumerate(predictions):
# Skip weekends
current_date += timedelta(days=1)
while current_date.weekday() > 4: # 5=Saturday, 6=Sunday
current_date += timedelta(days=1)
change_pct = ((float(pred_price) - last_price) / last_price) * 100
result['predictions'].append({
'date': current_date.strftime('%Y-%m-%d'),
'price': float(pred_price),
'change_pct': float(change_pct),
'day': i + 1
})
# Cache the result
if use_cache:
cache.set(symbol, days_ahead, result)
print("โœ… Prediction complete!")
return result
except Exception as e:
raise Exception(f"Prediction failed for {symbol}: {str(e)}")
# ============================================================================
# UTILITY FUNCTIONS
# ============================================================================
def get_supported_symbols():
"""Get list of commonly used stock symbols"""
return [
'AAPL', 'GOOGL', 'MSFT', 'AMZN', 'TSLA',
'NVDA', 'META', 'NFLX', 'INTC', 'AMD',
'SBUX', 'PYPL', 'IBM', 'CSCO', 'TWTR'
]
def validate_prediction_result(result):
"""Validate prediction result structure"""
required_keys = [
'symbol', 'last_price', 'last_date',
'predictions', 'sentiment', 'sentiment_score'
]
for key in required_keys:
if key not in result:
raise ValueError(f"Missing key in result: {key}")
if not isinstance(result['predictions'], list):
raise ValueError("Predictions must be a list")
if len(result['predictions']) == 0:
raise ValueError("No predictions generated")
return True