import streamlit as st
import yfinance as yf
import numpy as np
import pandas as pd
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_absolute_error, mean_squared_error
from datetime import timedelta
import math, gc, warnings
warnings.filterwarnings("ignore")
import torch
import torch.nn as nn
# ── Page config ───────────────────────────────────────────────────────────────
# Browser-tab title/icon and the overall page layout for the dashboard.
_PAGE_SETTINGS = {
    "page_title": "StockSense · ML Dashboard",
    "page_icon": "📈",
    "layout": "wide",
    "initial_sidebar_state": "expanded",
}
st.set_page_config(**_PAGE_SETTINGS)
# ── CSS ───────────────────────────────────────────────────────────────────────
# Custom stylesheet hook: the injected string is currently just a newline.
_CUSTOM_CSS = """
"""
st.markdown(_CUSTOM_CSS, unsafe_allow_html=True)
# ── Constants ─────────────────────────────────────────────────────────────────
# Sidebar label → ticker symbol passed to yfinance.
STOCKS = {
    "🍎 Apple (AAPL)": "AAPL",
    "🔍 Google (GOOGL)": "GOOGL",
    "🪟 Microsoft (MSFT)": "MSFT",
    "🚗 Tesla (TSLA)": "TSLA",
    "📦 Amazon (AMZN)": "AMZN",
    "📱 Meta (META)": "META",
    "🟢 NVIDIA (NVDA)": "NVDA",
    "🛢️ Reliance (RELIANCE.NS)": "RELIANCE.NS",
    "💻 TCS (TCS.NS)": "TCS.NS",
    "🏦 HDFC Bank (HDFCBANK.NS)": "HDFCBANK.NS",
}
# Ticker symbol → currency prefix used when formatting prices: symbols ending
# in ".NS" are shown in rupees, every other listed symbol in dollars.
CURRENCY = {
    symbol: ("₹" if symbol.endswith(".NS") else "$")
    for symbol in STOCKS.values()
}
# ── Technical Indicators ──────────────────────────────────────────────────────
def compute_rsi(s, p=14):
    """Return the Relative Strength Index of ``s`` on a 0-100 scale.

    Gains and losses are the positive and negative parts of the one-step
    difference, each averaged with a plain ``p``-period rolling mean.  The
    first ``p`` entries of the result are NaN because the window is not yet
    full.
    """
    delta = s.diff()
    avg_gain = delta.clip(lower=0).rolling(p).mean()
    avg_loss = (-delta.clip(upper=0)).rolling(p).mean()
    # The small epsilon keeps the ratio finite when the window holds no losses.
    rel_strength = avg_gain / (avg_loss + 1e-10)
    return 100 - 100 / (1 + rel_strength)
def compute_macd(s, fast=12, slow=26, sig=9):
    """Return ``(macd_line, signal_line, histogram)`` for series ``s``.

    The MACD line is the ``fast``-span EMA minus the ``slow``-span EMA, the
    signal line is the ``sig``-span EMA of the MACD line, and the histogram is
    their difference.  All EMAs use ``adjust=False``.
    """
    def ema(series, span):
        return series.ewm(span=span, adjust=False).mean()

    macd_line = ema(s, fast) - ema(s, slow)
    signal_line = ema(macd_line, sig)
    return macd_line, signal_line, macd_line - signal_line
def compute_bollinger(s, p=20, w=2):
    """Return ``(upper, middle, lower)`` Bollinger bands for series ``s``.

    ``middle`` is the ``p``-period rolling mean; the outer bands sit ``w``
    rolling standard deviations above and below it.
    """
    window = s.rolling(p)
    middle = window.mean()
    half_width = w * window.std()
    return middle + half_width, middle, middle - half_width
# ── PyTorch LSTM ──────────────────────────────────────────────────────────────
class StockLSTM(nn.Module):
    """Conv1D front-end followed by two stacked bidirectional LSTMs.

    The forward pass takes a ``(batch, steps, 1)`` tensor and returns one
    value per sample, shaped ``(batch, 1)``.
    """

    def __init__(self, seq_len):
        # ``seq_len`` is accepted for the caller's convenience; no layer
        # below depends on it.
        super().__init__()
        conv_layer = nn.Conv1d(1, 32, kernel_size=3, padding=1)
        self.conv = nn.Sequential(conv_layer, nn.ReLU(), nn.MaxPool1d(2))
        # Each bidirectional LSTM emits 2 * hidden features per step, which
        # is why the next layer's input width is twice the hidden size.
        self.lstm1 = nn.LSTM(32, 64, batch_first=True, bidirectional=True)
        self.drop1 = nn.Dropout(0.25)
        self.lstm2 = nn.LSTM(128, 32, batch_first=True, bidirectional=True)
        self.drop2 = nn.Dropout(0.25)
        self.fc = nn.Sequential(nn.Linear(64, 32), nn.ReLU(), nn.Linear(32, 1))

    def forward(self, x):
        # Conv1d wants channels before time, the LSTMs want time before
        # features, so the last two axes are swapped around the conv stage.
        features = self.conv(x.transpose(1, 2)).transpose(1, 2)
        features = self.drop1(self.lstm1(features)[0])
        features = self.lstm2(features)[0]
        # Only the final time step feeds the regression head.
        final_step = self.drop2(features[:, -1, :])
        return self.fc(final_step)
def train_model(X_tr, y_tr, X_te, y_te, seq_len, epochs, progress_bar):
    """Train a fresh ``StockLSTM`` and return it with its best weights loaded.

    Args:
        X_tr, y_tr: training windows and targets (converted to float32 tensors).
        X_te, y_te: windows/targets scored after every epoch.
        seq_len: forwarded to the ``StockLSTM`` constructor.
        epochs: maximum number of epochs.
        progress_bar: object with a ``progress(percent, text=...)`` method,
            updated after each epoch that does not trigger early stopping.

    Training stops early once the per-epoch loss on ``X_te``/``y_te`` has
    failed to improve for 6 consecutive epochs.

    NOTE(review): the same ``X_te``/``y_te`` arrays drive the LR scheduler,
    early stopping and checkpoint selection, so metrics later reported on
    them are not from untouched data.
    """
    model = StockLSTM(seq_len)
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
    scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
        optimizer, patience=3, factor=0.5)
    criterion = nn.HuberLoss()

    train_x, train_y, val_x, val_y = (
        torch.tensor(arr, dtype=torch.float32)
        for arr in (X_tr, y_tr, X_te, y_te)
    )
    batches = torch.utils.data.DataLoader(
        torch.utils.data.TensorDataset(train_x, train_y),
        batch_size=32, shuffle=True)

    max_stale_epochs = 6
    lowest_val_loss = float('inf')
    best_weights = None
    stale_epochs = 0

    for epoch_idx in range(int(epochs)):
        model.train()
        for batch_x, batch_y in batches:
            optimizer.zero_grad()
            batch_loss = criterion(model(batch_x).squeeze(), batch_y.squeeze())
            batch_loss.backward()
            # Cap the gradient norm before the optimizer step.
            nn.utils.clip_grad_norm_(model.parameters(), 1.0)
            optimizer.step()

        model.eval()
        with torch.no_grad():
            val_loss = criterion(model(val_x).squeeze(), val_y.squeeze()).item()
        scheduler.step(val_loss)

        if val_loss < lowest_val_loss:
            lowest_val_loss = val_loss
            # Clone so later optimizer steps cannot mutate the snapshot.
            best_weights = {
                name: tensor.clone()
                for name, tensor in model.state_dict().items()
            }
            stale_epochs = 0
        else:
            stale_epochs += 1
            if stale_epochs >= max_stale_epochs:
                break

        finished = epoch_idx + 1
        progress_bar.progress(
            int(finished / epochs * 100),
            text=f"Training epoch {finished}/{epochs} · val_loss={val_loss:.5f}")

    if best_weights:
        model.load_state_dict(best_weights)
    return model
def predict(model, X):
    """Run ``model`` on ``X`` in eval mode and return a NumPy array.

    ``X`` is converted to a float32 tensor; size-1 axes are squeezed out of
    the output, so a single-sample input yields a 0-d array.
    """
    model.eval()
    with torch.no_grad():
        batch = torch.tensor(X, dtype=torch.float32)
        output = model(batch)
    return output.squeeze().numpy()
# ── Sentiment (lazy) ──────────────────────────────────────────────────────────
@st.cache_resource(show_spinner=False)
def load_finbert():
    """Build the ProsusAI/finbert sentiment pipeline.

    Wrapped in ``st.cache_resource`` so Streamlit reuses the returned object
    across calls.
    """
    # Imported inside the function so `transformers` is only loaded when the
    # sentiment feature is actually used.
    from transformers import pipeline as build_pipeline

    finbert_options = dict(
        model="ProsusAI/finbert",
        truncation=True,
        max_length=256,
        device=-1,
        batch_size=4,
    )
    return build_pipeline("sentiment-analysis", **finbert_options)
def _headline_title(item):
    """Return the headline text of one news item, or None if it has none."""
    if not isinstance(item, dict):
        return None
    title = item.get("title")
    if not title:
        # Newer yfinance payloads nest the article fields under "content";
        # fall back to that shape. NOTE(review): verify against the
        # installed yfinance version.
        content = item.get("content")
        if isinstance(content, dict):
            title = content.get("title")
    return title or None


def get_sentiment(ticker):
    """Score recent Yahoo Finance headlines for ``ticker`` with FinBERT.

    Returns:
        ``(score, markdown)``.  ``score`` is ``(positive - negative) / total``
        over up to 8 headlines, or ``None`` when no headlines were found or
        anything failed.  ``markdown`` is the text to display: either the
        summary plus one line per headline, or an italic status message.

    Never raises: this is a best-effort panel, so any failure (network,
    model download, unexpected payload) is reported in the returned text.
    """
    try:
        # Normalise a missing feed BEFORE slicing; slicing first would raise
        # TypeError on None and be misreported as "Sentiment unavailable".
        news = (yf.Ticker(ticker).news or [])[:8]
        heads = [t for t in (_headline_title(n) for n in news) if t]
        if not heads:
            return None, "_No recent headlines found._"

        # Load the model only once there is something to classify.
        res = load_finbert()(heads)
        labels = [r['label'] for r in res]
        pos = labels.count('positive')
        neg = labels.count('negative')
        neu = labels.count('neutral')
        score = (pos - neg) / len(res)
        if score > 0.2:
            label = "🟢 Bullish"
        elif score < -0.2:
            label = "🔴 Bearish"
        else:
            label = "🟡 Neutral"

        icons = {"positive": "🟢", "negative": "🔴", "neutral": "🟡"}
        rows = [
            f"{icons.get(r['label'], '⚪')} **{r['score']*100:.1f}%** — {h[:85]}"
            for h, r in zip(heads, res)
        ]
        return score, (
            f"**Sentiment: {label}** · "
            f"FinBERT analyzed {len(res)} headlines · "
            f"🟢 {pos} 🔴 {neg} 🟡 {neu}\n\n" + "\n\n".join(rows)
        )
    except Exception as e:
        return None, f"_Sentiment unavailable: {e}_"
# ── Header ────────────────────────────────────────────────────────────────────
# Page banner: product name and a one-line summary of the techniques used.
st.markdown("""
STOCKSENSE
PyTorch LSTM · FinBERT · Technical Analysis · Monte Carlo CI
""", unsafe_allow_html=True)
# Spacer under the banner.  The original used a single-quoted literal that
# spanned two lines, which is a SyntaxError; a triple-quoted literal keeps the
# same one-newline content and parses.
st.markdown("""
""", unsafe_allow_html=True)
# ── Sidebar ───────────────────────────────────────────────────────────────────
# Static sidebar copy, kept at module level so the literals stay unindented.
_SIDEBAR_MODEL_NOTES = """
MODEL ARCHITECTURE
Conv1D(32) → MaxPool
BiLSTM(64) → Dropout(0.25)
BiLSTM(32) → Dropout(0.25)
Dense(32) → Dense(1)
Loss: Huber · Opt: Adam
LR: ReduceLROnPlateau
NLP SENTIMENT
ProsusAI/FinBERT
Live Yahoo Finance headlines
"""
_SIDEBAR_DISCLAIMER = """
⚠️ Academic ML project
Not financial advice
"""
_PERIOD_CHOICES = ["6mo", "1y", "2y", "3y", "5y"]

# All user inputs live in the sidebar; the values are read by the main flow.
with st.sidebar:
    st.markdown("### ⚙️ Configuration")
    stock_label = st.selectbox("STOCK", list(STOCKS.keys()))
    # Default selection is the third entry ("2y").
    period = st.selectbox("HISTORICAL PERIOD", _PERIOD_CHOICES, index=2)
    forecast_days = st.slider("FORECAST DAYS", min_value=3, max_value=14, value=7)
    epochs = st.slider("TRAINING EPOCHS", min_value=10, max_value=40, value=20, step=5)
    run_btn = st.button("▶ RUN ANALYSIS")
    st.markdown(_SIDEBAR_MODEL_NOTES, unsafe_allow_html=True)
    st.markdown(_SIDEBAR_DISCLAIMER, unsafe_allow_html=True)
# ── Main logic ────────────────────────────────────────────────────────────────
# Number of past closes fed to the model for each prediction.
SEQ = 60
_IDLE_PROMPT = """
📈
SELECT A STOCK AND CLICK RUN ANALYSIS
"""
# Until the button is pressed, show the prompt and halt this script run.
if not run_btn:
    st.markdown(_IDLE_PROMPT, unsafe_allow_html=True)
    st.stop()
ticker = STOCKS[stock_label]
# Price prefix for display; falls back to "$" for tickers without an entry.
curr = CURRENCY.get(ticker, "$")
# ── Fetch data ────────────────────────────────────────────────────────────────
# Minimum history needed: one full look-back window plus 20 further rows.
_MIN_ROWS = SEQ + 20

with st.spinner("Fetching live market data..."):
    try:
        df = yf.download(ticker, period=period, progress=False, auto_adjust=True)
    except Exception as e:
        # Surface the failure in the UI and end this run.
        st.error(f"yfinance error: {e}")
        st.stop()

st.caption(f"DEBUG — rows: {len(df)}, cols: {list(df.columns)}")

if df.empty:
    st.error(f"No data returned for {ticker}. Try again in 30s.")
    st.stop()

# Keep only the first level of a two-level column index so that plain
# names such as "Close" can be used below.
if isinstance(df.columns, pd.MultiIndex):
    df.columns = df.columns.get_level_values(0)

df.dropna(inplace=True)

if len(df) < _MIN_ROWS:
    st.error(f"Only {len(df)} rows — need {_MIN_ROWS}+. Try a longer period.")
    st.stop()

# squeeze() collapses a one-column frame to a Series if that is what came back.
close = df["Close"].squeeze()
# ── Indicators ────────────────────────────────────────────────────────────────
with st.spinner("Computing technical indicators..."):
    # Chart overlays only; none of these series are fed to the model.
    bb_up, bb_mid, bb_low = compute_bollinger(close)
    macd, sig, hist = compute_macd(close)
    rsi = compute_rsi(close)
# ── Prepare sequences ─────────────────────────────────────────────────────────
prices = close.values.reshape(-1, 1).astype(float)
# NOTE(review): the scaler is fitted on the whole series, so the min/max of
# the later hold-out rows also shape the scaling of the training rows.
scaler = MinMaxScaler()
scaled = scaler.fit_transform(prices)
# Sample k is the SEQ scaled closes ending just before row SEQ + k; its target
# is the scaled close at row SEQ + k.
X = np.array(
    [scaled[end - SEQ:end] for end in range(SEQ, len(scaled))],
    dtype=np.float32)
y = scaled[SEQ:, 0].astype(np.float32)
# Chronological 85 / 15 split (no shuffling).
split = int(len(X) * 0.85)
X_tr, y_tr = X[:split], y[:split]
X_te, y_te = X[split:], y[split:]
# ── Train ─────────────────────────────────────────────────────────────────────
st.markdown("**Training LSTM model...**")
pb = st.progress(0, text="Starting training...")
model = train_model(X_tr, y_tr, X_te, y_te, SEQ, epochs, pb)
# Remove the progress bar once training has returned.
pb.empty()
# ── Predictions ───────────────────────────────────────────────────────────────
def _to_prices(scaled_column):
    """Map a column of scaled values back to price units as a flat array."""
    return scaler.inverse_transform(scaled_column).flatten()


with st.spinner("Generating predictions..."):
    pred_s = predict(model, X_te).reshape(-1, 1)
    pred = _to_prices(pred_s)
    actual = _to_prices(y_te.reshape(-1, 1))
    # Error metrics are computed in price units, not scaled units.
    mae = mean_absolute_error(actual, pred)
    rmse = math.sqrt(mean_squared_error(actual, pred))
    # The epsilon guards the division if an actual price is exactly zero.
    abs_pct_err = np.abs((actual - pred) / (actual + 1e-10))
    mape = float(np.mean(abs_pct_err)) * 100
    # Target of sample k sits at row SEQ + k, so the hold-out targets start
    # at row SEQ + split.
    test_dates = close.index[SEQ + split:]
# ── Forecast ──────────────────────────────────────────────────────────────────
def _rollout(steps, noise_std=None):
    """Iteratively forecast ``steps`` values from the last SEQ scaled closes.

    Each prediction is appended to the window and the oldest value dropped,
    so later steps are conditioned on earlier predictions.  When
    ``noise_std`` is given, Gaussian noise with that standard deviation is
    added to every step (in scaled units) before it is fed back.  Returns the
    path converted back to price units.
    """
    window = scaled[-SEQ:].copy()
    path = []
    for _ in range(steps):
        step = float(predict(model, window.reshape(1, SEQ, 1).astype(np.float32)))
        if noise_std is not None:
            step = step + np.random.normal(0, noise_std)
        path.append(step)
        window = np.append(window[1:], [[step]], axis=0)
    column = np.array(path, dtype=np.float32).reshape(-1, 1)
    return scaler.inverse_transform(column).flatten()


with st.spinner("Forecasting future prices..."):
    # Point forecast: noise-free rollout.
    forecast_prices = _rollout(forecast_days)
    # 30 noisy rollouts give the spread used for the shaded band.
    mc = np.array([_rollout(forecast_days, noise_std=0.006) for _ in range(30)])
    # 10th-90th percentile across rollouts, per forecast day.
    ci_lower, ci_upper = np.percentile(mc, [10, 90], axis=0)
    last_date = close.index[-1]
    # Business days only, starting the day after the last observed close.
    forecast_dates = pd.bdate_range(
        start=last_date + timedelta(days=1), periods=forecast_days)
# ── Metric cards ─────────────────────────────────────────────────────────────
def _metric_card_html(title, value, caption, value_color):
    """Return single-line HTML for one summary card (title / value / caption)."""
    return (
        '<div style="text-align:center;padding:0.6rem 0.4rem;'
        'border:1px solid #1a2332;border-radius:8px;">'
        f'<div style="font-size:0.72rem;color:#718096;">{title}</div>'
        f'<div style="font-size:1.45rem;font-weight:700;color:{value_color};">{value}</div>'
        f'<div style="font-size:0.72rem;color:#a0aec0;">{caption}</div>'
        '</div>'
    )


# Compare the last forecast point against the most recent close.
_last_close = float(close.iloc[-1])
direction = "📈 UP" if forecast_prices[-1] > _last_close else "📉 DOWN"
pct = ((forecast_prices[-1] - _last_close) / _last_close) * 100
# Green for a rise, red otherwise; used to tint the forecast-target card.
color = "#48bb78" if pct > 0 else "#fc8181"
c1, c2, c3, c4, c5 = st.columns(5)
for col, label, val, sub in [
    (c1, "CURRENT PRICE", f"{curr}{_last_close:.2f}", ticker),
    (c2, f"{forecast_days}D TARGET", f"{curr}{forecast_prices[-1]:.2f}",
     f"{'↑' if pct>0 else '↓'} {abs(pct):.2f}%"),
    (c3, "MAE", f"{curr}{mae:.2f}", "mean abs error"),
    (c4, "RMSE", f"{curr}{rmse:.2f}", "root mean sq error"),
    (c5, "MAPE", f"{mape:.2f}%", "mean abs pct error"),
]:
    # The original f-string here was empty, so label/val/sub/color were
    # computed but nothing was rendered.  NOTE(review): this markup is a
    # minimal stand-in for the lost card template — restyle as needed.
    accent = color if col is c2 else "#e2e8f0"
    col.markdown(_metric_card_html(label, val, sub, accent), unsafe_allow_html=True)
# Spacer below the cards.  The original single-quoted literal spanned two
# lines (a SyntaxError); the triple-quoted form keeps the one-newline content.
st.markdown("""
""", unsafe_allow_html=True)
# ── Chart 1: Price + Indicators ───────────────────────────────────────────────
def _line_trace(x, y, name, color, width=1.5, dash=None):
    """Build a named line trace; ``dash`` is only set when one is given."""
    line_style = dict(color=color, width=width)
    if dash is not None:
        line_style["dash"] = dash
    return go.Scatter(x=x, y=y, name=name, line=line_style)


# Show at most the last 200 sessions.
n = min(200, len(close))
xh = close.index[-n:]
_recent = slice(-n, None)

fig1 = make_subplots(
    rows=3, cols=1, shared_xaxes=True,
    row_heights=[0.55, 0.25, 0.20], vertical_spacing=0.03,
    subplot_titles=("Price · Bollinger Bands · LSTM Predictions",
                    "MACD", "RSI (14)"))

# Shaded Bollinger envelope: trace the upper band forward, then the lower
# band backward, and fill the closed outline.
_band_outline = go.Scatter(
    x=list(xh) + list(xh[::-1]),
    y=list(bb_up[_recent]) + list(bb_low[_recent][::-1]),
    fill='toself', fillcolor='rgba(99,179,237,0.07)',
    line=dict(color='rgba(0,0,0,0)'), showlegend=False, name='BB Band')

# Traces are added in this exact order, row by row.
_panels = [
    (1, [
        _band_outline,
        _line_trace(xh, close[_recent], 'Close', '#63b3ed'),
        _line_trace(xh, bb_up[_recent], 'BB Upper', '#4299e1', width=1, dash='dot'),
        _line_trace(xh, bb_low[_recent], 'BB Lower', '#4299e1', width=1, dash='dot'),
        _line_trace(test_dates, actual, 'Actual (Test)', '#68d391', width=2),
        _line_trace(test_dates, pred, 'LSTM Predicted', '#f6ad55', width=2, dash='dash'),
    ]),
    (2, [
        _line_trace(xh, macd[_recent], 'MACD', '#b794f4'),
        _line_trace(xh, sig[_recent], 'Signal', '#fc8181'),
        go.Bar(x=xh, y=hist[_recent], name='Hist',
               marker_color='rgba(160,174,192,0.35)'),
    ]),
    (3, [
        _line_trace(xh, rsi[_recent], 'RSI', '#f6e05e'),
    ]),
]
for _row, _traces in _panels:
    for _trace in _traces:
        fig1.add_trace(_trace, row=_row, col=1)

# Reference lines on the RSI panel at 70 and 30.
for _level, _tint in ((70, 'rgba(252,129,129,0.5)'), (30, 'rgba(104,211,145,0.5)')):
    fig1.add_hline(y=_level, line_dash='dot', line_color=_tint, row=3, col=1)

fig1.update_layout(
    template='plotly_dark', paper_bgcolor='#0d1117', plot_bgcolor='#0d1117',
    height=600, margin=dict(l=10, r=10, t=40, b=10),
    font=dict(family='monospace', size=11, color='#a0aec0'),
    legend=dict(orientation='h', y=-0.02, font=dict(size=10)),
    hovermode='x unified')
fig1.update_xaxes(gridcolor='#1a2332', zeroline=False)
fig1.update_yaxes(gridcolor='#1a2332', zeroline=False)
st.plotly_chart(fig1, use_container_width=True)
# ── Chart 2: Forecast ─────────────────────────────────────────────────────────
fig2 = go.Figure()

# Last 60 closes for context.
_history = go.Scatter(
    x=close.index[-60:], y=close[-60:],
    name='Recent History', line=dict(color='#63b3ed', width=2))

# Monte Carlo band: upper bound forward, lower bound backward, filled.
_band = go.Scatter(
    x=list(forecast_dates) + list(forecast_dates[::-1]),
    y=list(ci_upper) + list(ci_lower[::-1]),
    fill='toself', fillcolor='rgba(252,129,129,0.12)',
    line=dict(color='rgba(0,0,0,0)'), name='80% CI')

_point_forecast = go.Scatter(
    x=forecast_dates, y=forecast_prices,
    name=f'{forecast_days}-Day Forecast', mode='lines+markers',
    line=dict(color='#fc8181', width=2.5),
    marker=dict(size=7, symbol='circle-open', line=dict(width=2)))

for _trace in (_history, _band, _point_forecast):
    fig2.add_trace(_trace)

# Label the final forecast point with its price.
_final_price = forecast_prices[-1]
fig2.add_annotation(
    x=forecast_dates[-1], y=float(_final_price),
    text=f"{curr}{_final_price:.2f}",
    showarrow=True, arrowhead=2, arrowcolor='#fc8181',
    bgcolor='#1a2332', bordercolor='#fc8181',
    font=dict(color='#fc8181', size=12))

# NOTE(review): x is passed as epoch milliseconds rather than the Timestamp
# itself — presumably a workaround for annotated vlines on date axes; confirm
# before changing.
_today_ms = last_date.timestamp() * 1000
fig2.add_vline(
    x=_today_ms, line_dash='dash',
    line_color='rgba(160,174,192,0.4)',
    annotation_text='Today', annotation_font_color='#718096')

fig2.update_layout(
    template='plotly_dark', paper_bgcolor='#0d1117', plot_bgcolor='#0d1117',
    height=400, margin=dict(l=10, r=10, t=30, b=10),
    font=dict(family='monospace', size=11, color='#a0aec0'),
    legend=dict(orientation='h', y=-0.1), hovermode='x unified',
    title=dict(
        text=f'{ticker} — {forecast_days}-Day Forecast · 80% Confidence Interval',
        font=dict(size=13, color='#e2e8f0')))
fig2.update_xaxes(gridcolor='#1a2332')
fig2.update_yaxes(gridcolor='#1a2332')
st.plotly_chart(fig2, use_container_width=True)
# ── Sentiment ─────────────────────────────────────────────────────────────────
# Only the markdown half of get_sentiment's (score, markdown) pair is shown.
with st.spinner("Running FinBERT sentiment analysis..."):
    _, sent_text = get_sentiment(ticker)

_sentiment_panel = st.expander("📰 FinBERT News Sentiment", expanded=True)
with _sentiment_panel:
    st.markdown(sent_text)

# Explicit collection pass at the end of the run.
gc.collect()