stocksense / app.py
Peaaa's picture
Upload 2 files
9f90741 verified
# ── Python 3.13 compatibility patch ─────────────────────────────────────────
import sys
try:
import audioop
except ModuleNotFoundError:
try:
import pyaudioop as audioop
sys.modules['audioop'] = audioop
except ModuleNotFoundError:
import types
sys.modules['audioop'] = types.ModuleType('audioop')
import gradio as gr
import yfinance as yf
import numpy as np
import pandas as pd
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from sklearn.preprocessing import MinMaxScaler
from sklearn.metrics import mean_absolute_error, mean_squared_error
from datetime import timedelta
import warnings, math, gc
warnings.filterwarnings("ignore")
import torch
import torch.nn as nn
# ── Lazy-load FinBERT only on first sentiment call ───────────────────────────
_SENTIMENT_PIPE = None
def get_sentiment_pipe():
global _SENTIMENT_PIPE
if _SENTIMENT_PIPE is None:
from transformers import pipeline as hf_pipeline
_SENTIMENT_PIPE = hf_pipeline(
"sentiment-analysis",
model="ProsusAI/finbert",
truncation=True,
max_length=256,
device=-1, # CPU only
batch_size=4,
)
return _SENTIMENT_PIPE
# ── Constants ────────────────────────────────────────────────────────────────
STOCKS = {
"🍎 Apple (AAPL)": "AAPL",
"🔍 Google (GOOGL)": "GOOGL",
"🪟 Microsoft (MSFT)": "MSFT",
"🚗 Tesla (TSLA)": "TSLA",
"📦 Amazon (AMZN)": "AMZN",
"📱 Meta (META)": "META",
"🟢 NVIDIA (NVDA)": "NVDA",
"🛢️ Reliance (RELIANCE.NS)": "RELIANCE.NS",
"💻 TCS (TCS.NS)": "TCS.NS",
"🏦 HDFC Bank (HDFCBANK.NS)":"HDFCBANK.NS",
}
CURRENCY = {
"AAPL":"$","GOOGL":"$","MSFT":"$","TSLA":"$","AMZN":"$","META":"$","NVDA":"$",
"RELIANCE.NS":"₹","TCS.NS":"₹","HDFCBANK.NS":"₹",
}
# ── Technical Indicators ─────────────────────────────────────────────────────
def compute_rsi(s, p=14):
d = s.diff(); g = d.clip(lower=0).rolling(p).mean()
l = (-d.clip(upper=0)).rolling(p).mean()
return 100 - 100/(1 + g/(l+1e-10))
def compute_macd(s, fast=12, slow=26, sig=9):
ef = s.ewm(span=fast,adjust=False).mean()
es = s.ewm(span=slow,adjust=False).mean()
m = ef - es; sg = m.ewm(span=sig,adjust=False).mean()
return m, sg, m-sg
def compute_bollinger(s, p=20, w=2):
sma=s.rolling(p).mean(); sd=s.rolling(p).std()
return sma+w*sd, sma, sma-w*sd
# ── PyTorch LSTM ─────────────────────────────────────────────────────────────
class StockLSTM(nn.Module):
def __init__(self, seq_len):
super().__init__()
self.conv = nn.Sequential(
nn.Conv1d(1, 32, kernel_size=3, padding=1),
nn.ReLU(),
nn.MaxPool1d(2),
)
lstm_in = seq_len // 2
self.lstm1 = nn.LSTM(32, 64, batch_first=True, bidirectional=True)
self.drop1 = nn.Dropout(0.25)
self.lstm2 = nn.LSTM(128, 32, batch_first=True, bidirectional=True)
self.drop2 = nn.Dropout(0.25)
self.fc = nn.Sequential(nn.Linear(64, 32), nn.ReLU(), nn.Linear(32, 1))
def forward(self, x): # x: (B, SEQ, 1)
x = x.permute(0,2,1) # (B, 1, SEQ) for Conv1d
x = self.conv(x) # (B, 32, SEQ//2)
x = x.permute(0,2,1) # (B, SEQ//2, 32)
x, _ = self.lstm1(x)
x = self.drop1(x)
x, _ = self.lstm2(x)
x = self.drop2(x[:, -1, :]) # last timestep
return self.fc(x)
def train_model(X_tr, y_tr, X_te, y_te, seq_len, epochs):
device = torch.device("cpu")
model = StockLSTM(seq_len).to(device)
opt = torch.optim.Adam(model.parameters(), lr=1e-3)
sched = torch.optim.lr_scheduler.ReduceLROnPlateau(opt, patience=3, factor=0.5)
loss_fn= nn.HuberLoss()
Xtr = torch.tensor(X_tr, dtype=torch.float32)
ytr = torch.tensor(y_tr, dtype=torch.float32)
Xte = torch.tensor(X_te, dtype=torch.float32)
yte = torch.tensor(y_te, dtype=torch.float32)
ds = torch.utils.data.TensorDataset(Xtr, ytr)
loader = torch.utils.data.DataLoader(ds, batch_size=32, shuffle=True)
best_val, best_state, patience_cnt = float('inf'), None, 0
for ep in range(int(epochs)):
model.train()
for xb, yb in loader:
opt.zero_grad()
loss_fn(model(xb).squeeze(), yb.squeeze()).backward()
nn.utils.clip_grad_norm_(model.parameters(), 1.0)
opt.step()
model.eval()
with torch.no_grad():
val_loss = loss_fn(model(Xte).squeeze(), yte.squeeze()).item()
sched.step(val_loss)
if val_loss < best_val:
best_val = val_loss
best_state = {k:v.clone() for k,v in model.state_dict().items()}
patience_cnt = 0
else:
patience_cnt += 1
if patience_cnt >= 6:
break
if best_state:
model.load_state_dict(best_state)
return model
def predict(model, X):
model.eval()
with torch.no_grad():
t = torch.tensor(X, dtype=torch.float32)
return model(t).squeeze().numpy()
# ── Sentiment ────────────────────────────────────────────────────────────────
def get_sentiment(ticker):
try:
pipe = get_sentiment_pipe()
news = yf.Ticker(ticker).news[:8] or []
heads = [n.get("title","") for n in news if n.get("title")]
if not heads: return None, "_No recent headlines found._"
res = pipe(heads)
pos = sum(1 for r in res if r['label']=='positive')
neg = sum(1 for r in res if r['label']=='negative')
neu = sum(1 for r in res if r['label']=='neutral')
score = (pos-neg)/len(res)
label = "🟢 Bullish" if score>0.2 else ("🔴 Bearish" if score<-0.2 else "🟡 Neutral")
rows = []
for h,r in zip(heads,res):
e = {"positive":"🟢","negative":"🔴","neutral":"🟡"}.get(r['label'],"⚪")
rows.append(f"{e} **{r['score']*100:.1f}%** — {h[:85]}")
return score, (
f"### Market Sentiment: {label}\n\n"
f"FinBERT analyzed **{len(res)} headlines** &nbsp;|&nbsp; "
f"🟢 {pos} &nbsp;🔴 {neg} &nbsp;🟡 {neu}\n\n" + "\n\n".join(rows)
)
except Exception as e:
return None, f"_Sentiment error: {e}_"
# ── Main ─────────────────────────────────────────────────────────────────────
def run_dashboard(stock_label, period, forecast_days, epochs, progress=gr.Progress()):
ticker = STOCKS.get(stock_label, stock_label)
curr = CURRENCY.get(ticker, "$")
SEQ = 60
progress(0.05, desc="Fetching live data...")
df = yf.download(ticker, period=period, progress=False)
if df.empty or len(df) < SEQ+20:
return None, None, "❌ Not enough data. Try a longer period."
df.dropna(inplace=True)
close = df['Close'].squeeze()
progress(0.15, desc="Computing indicators...")
rsi = compute_rsi(close)
macd, sig, hist = compute_macd(close)
bb_up,bb_mid,bb_low = compute_bollinger(close)
progress(0.25, desc="Building sequences...")
prices = close.values.reshape(-1,1).astype(float)
scaler = MinMaxScaler()
scaled = scaler.fit_transform(prices)
X, y = [], []
for i in range(SEQ, len(scaled)):
X.append(scaled[i-SEQ:i]); y.append(scaled[i,0])
X, y = np.array(X, dtype=np.float32), np.array(y, dtype=np.float32)
split = int(len(X)*0.85)
X_tr,X_te = X[:split], X[split:]
y_tr,y_te = y[:split], y[split:]
progress(0.35, desc="Training LSTM (PyTorch)...")
model = train_model(X_tr, y_tr, X_te, y_te, SEQ, epochs)
progress(0.70, desc="Generating predictions...")
pred_s = predict(model, X_te).reshape(-1,1)
pred = scaler.inverse_transform(pred_s).flatten()
actual = scaler.inverse_transform(y_te.reshape(-1,1)).flatten()
mae = mean_absolute_error(actual, pred)
rmse = math.sqrt(mean_squared_error(actual, pred))
mape = float(np.mean(np.abs((actual-pred)/(actual+1e-10))))*100
test_dates = close.index[SEQ+split:]
progress(0.78, desc="Forecasting...")
cur_seq = scaled[-SEQ:].copy()
forecast = []
for _ in range(int(forecast_days)):
inp = cur_seq.reshape(1, SEQ, 1).astype(np.float32)
out = float(predict(model, inp))
forecast.append(out)
cur_seq = np.append(cur_seq[1:], [[out]], axis=0)
forecast_prices = scaler.inverse_transform(
np.array(forecast, dtype=np.float32).reshape(-1,1)).flatten()
# Monte Carlo CI
mc = []
for _ in range(30):
sq = scaled[-SEQ:].copy(); run=[]
for _ in range(int(forecast_days)):
inp = sq.reshape(1,SEQ,1).astype(np.float32)
o = float(predict(model, inp)) + np.random.normal(0, 0.006)
run.append(o); sq = np.append(sq[1:],[[o]],axis=0)
mc.append(scaler.inverse_transform(
np.array(run,dtype=np.float32).reshape(-1,1)).flatten())
mc = np.array(mc)
ci_upper = np.percentile(mc, 90, axis=0)
ci_lower = np.percentile(mc, 10, axis=0)
last_date = close.index[-1]
forecast_dates = pd.bdate_range(start=last_date+timedelta(days=1),
periods=int(forecast_days))
# ── Chart 1 ──────────────────────────────────────────────────────────────
progress(0.86, desc="Rendering charts...")
n = min(200, len(close))
xh = close.index[-n:]
fig1 = make_subplots(rows=3, cols=1, shared_xaxes=True,
row_heights=[0.55,0.25,0.20], vertical_spacing=0.04,
subplot_titles=("Price · Bollinger · LSTM",
"MACD","RSI"))
# BB fill
fig1.add_trace(go.Scatter(
x=list(xh)+list(xh[::-1]),
y=list(bb_up[-n:])+list(bb_low[-n:][::-1]),
fill='toself', fillcolor='rgba(99,179,237,0.07)',
line=dict(color='rgba(0,0,0,0)'), showlegend=False, name='BB Band'
), row=1,col=1)
fig1.add_trace(go.Scatter(x=xh, y=close[-n:], name='Close',
line=dict(color='#63b3ed',width=1.5)), row=1,col=1)
fig1.add_trace(go.Scatter(x=xh, y=bb_up[-n:], name='BB Upper',
line=dict(color='#4299e1',width=1,dash='dot')), row=1,col=1)
fig1.add_trace(go.Scatter(x=xh, y=bb_low[-n:], name='BB Lower',
line=dict(color='#4299e1',width=1,dash='dot')), row=1,col=1)
fig1.add_trace(go.Scatter(x=test_dates, y=actual, name='Actual',
line=dict(color='#68d391',width=2)), row=1,col=1)
fig1.add_trace(go.Scatter(x=test_dates, y=pred, name='LSTM Pred',
line=dict(color='#f6ad55',width=2,dash='dash')), row=1,col=1)
fig1.add_trace(go.Scatter(x=xh, y=macd[-n:], name='MACD',
line=dict(color='#b794f4',width=1.5)), row=2,col=1)
fig1.add_trace(go.Scatter(x=xh, y=sig[-n:], name='Signal',
line=dict(color='#fc8181',width=1.5)), row=2,col=1)
fig1.add_trace(go.Bar(x=xh, y=hist[-n:], name='Hist',
marker_color='rgba(160,174,192,0.35)'), row=2,col=1)
fig1.add_trace(go.Scatter(x=xh, y=rsi[-n:], name='RSI',
line=dict(color='#f6e05e',width=1.5)), row=3,col=1)
fig1.add_hline(y=70, line_dash='dot', line_color='rgba(252,129,129,0.5)', row=3,col=1)
fig1.add_hline(y=30, line_dash='dot', line_color='rgba(104,211,145,0.5)', row=3,col=1)
fig1.update_layout(template='plotly_dark', paper_bgcolor='#0d1117',
plot_bgcolor='#0d1117', height=620,
margin=dict(l=10,r=10,t=40,b=10),
font=dict(family='monospace',size=11,color='#a0aec0'),
legend=dict(orientation='h',y=-0.02,font=dict(size=10)),
hovermode='x unified')
fig1.update_xaxes(gridcolor='#1a2332', zeroline=False)
fig1.update_yaxes(gridcolor='#1a2332', zeroline=False)
# ── Chart 2 ──────────────────────────────────────────────────────────────
fig2 = go.Figure()
fig2.add_trace(go.Scatter(x=close.index[-60:], y=close[-60:],
name='Recent', line=dict(color='#63b3ed',width=2)))
fig2.add_trace(go.Scatter(
x=list(forecast_dates)+list(forecast_dates[::-1]),
y=list(ci_upper)+list(ci_lower[::-1]),
fill='toself', fillcolor='rgba(252,129,129,0.12)',
line=dict(color='rgba(0,0,0,0)'), name='80% CI'))
fig2.add_trace(go.Scatter(x=forecast_dates, y=forecast_prices,
name=f'{int(forecast_days)}d Forecast', mode='lines+markers',
line=dict(color='#fc8181',width=2.5),
marker=dict(size=7,symbol='circle-open',line=dict(width=2))))
fig2.add_annotation(x=forecast_dates[-1], y=float(forecast_prices[-1]),
text=f"<b>{curr}{forecast_prices[-1]:.2f}</b>",
showarrow=True, arrowhead=2, arrowcolor='#fc8181',
bgcolor='#1a2332', bordercolor='#fc8181',
font=dict(color='#fc8181',size=12))
fig2.add_vline(x=str(last_date.date()), line_dash='dash',
line_color='rgba(160,174,192,0.4)',
annotation_text='Today', annotation_font_color='#718096')
fig2.update_layout(template='plotly_dark', paper_bgcolor='#0d1117',
plot_bgcolor='#0d1117', height=420,
margin=dict(l=10,r=10,t=30,b=10),
font=dict(family='monospace',size=11,color='#a0aec0'),
legend=dict(orientation='h',y=-0.08), hovermode='x unified',
title=dict(text=f'{ticker}{int(forecast_days)}-Day Forecast · 80% CI',
font=dict(size=13,color='#e2e8f0')))
fig2.update_xaxes(gridcolor='#1a2332')
fig2.update_yaxes(gridcolor='#1a2332')
# ── Sentiment ─────────────────────────────────────────────────────────────
progress(0.93, desc="FinBERT sentiment...")
_, sent_text = get_sentiment(ticker)
# ── Metrics ───────────────────────────────────────────────────────────────
direction = "📈 UP" if forecast_prices[-1] > float(close.iloc[-1]) else "📉 DOWN"
pct = ((forecast_prices[-1]-float(close.iloc[-1]))/float(close.iloc[-1]))*100
metrics_md = f"""
## 📊 Model Performance
| Metric | Value |
|--------|-------|
| **MAE** | {curr}{mae:.2f} |
| **RMSE** | {curr}{rmse:.2f} |
| **MAPE** | {mape:.2f}% |
| **Train samples** | {len(X_tr):,} |
| **Test samples** | {len(X_te):,} |
## 🔮 Forecast
| | Value |
|--|--|
| **Current Price** | {curr}{float(close.iloc[-1]):.2f} |
| **{int(forecast_days)}-Day Target** | {curr}{forecast_prices[-1]:.2f} |
| **Expected Move** | {direction} {abs(pct):.2f}% |
| **Upper Bound (90%)** | {curr}{ci_upper[-1]:.2f} |
| **Lower Bound (10%)** | {curr}{ci_lower[-1]:.2f} |
---
{sent_text}
---
> ⚠️ Academic ML project. Not financial advice.
"""
gc.collect()
progress(1.0, desc="Done!")
return fig1, fig2, metrics_md
# ── CSS ───────────────────────────────────────────────────────────────────────
CSS = """
@import url('https://fonts.googleapis.com/css2?family=JetBrains+Mono:wght@400;600&family=Syne:wght@700;800&display=swap');
body, .gradio-container { background:#0d1117 !important; font-family:'JetBrains Mono',monospace !important; color:#e2e8f0 !important; }
h1 { font-family:'Syne',sans-serif !important; font-weight:800 !important; }
footer { display:none !important; }
"""
# ── UI ────────────────────────────────────────────────────────────────────────
with gr.Blocks(css=CSS, title="StockSense · ML Dashboard") as demo:
gr.HTML("""
<div style="text-align:center;padding:28px 0 8px 0;">
<div style="font-size:11px;letter-spacing:.25em;color:#48bb78;font-family:'JetBrains Mono',monospace;margin-bottom:8px;">
PRABUDH RASTOGI · MANIPAL UNIVERSITY JAIPUR
</div>
<h1 style="font-size:2.6rem;font-weight:800;color:#f0fff4;font-family:'Syne',sans-serif;margin:0;line-height:1.1;">STOCKSENSE</h1>
<div style="font-size:12px;color:#4b5563;letter-spacing:.15em;margin-top:6px;">
PyTorch LSTM · FinBERT · Technical Analysis · Monte Carlo CI
</div>
<div style="width:60px;height:2px;background:linear-gradient(90deg,#48bb78,#63b3ed);margin:14px auto 0 auto;border-radius:2px;"></div>
</div>
""")
with gr.Row():
with gr.Column(scale=1):
stock_dd = gr.Dropdown(choices=list(STOCKS.keys()), value="🍎 Apple (AAPL)", label="STOCK")
period_dd = gr.Dropdown(choices=["6mo","1y","2y","3y","5y"], value="2y", label="PERIOD")
forecast_sl = gr.Slider(3, 14, value=7, step=1, label="FORECAST DAYS")
epochs_sl = gr.Slider(10, 40, value=20, step=5, label="EPOCHS")
run_btn = gr.Button("▶ RUN ANALYSIS", variant="primary", size="lg")
gr.HTML("""
<div style="margin-top:16px;padding:12px;background:#0d1117;border:1px solid #1f2937;border-radius:6px;font-size:10px;color:#4b5563;line-height:1.9;">
<div style="color:#48bb78;margin-bottom:4px;font-weight:600;">ARCHITECTURE</div>
Conv1D(32) → MaxPool<br>BiLSTM(64) → Dropout<br>BiLSTM(32) → Dropout<br>Dense(32) → Dense(1)<br>Loss: Huber · Adam + ReduceLR
<div style="color:#48bb78;margin-top:10px;font-weight:600;">NLP</div>
ProsusAI/finbert · lazy-loaded
</div>
""")
with gr.Column(scale=3):
chart1 = gr.Plot(show_label=False)
chart2 = gr.Plot(show_label=False)
metrics = gr.Markdown("*Select a stock and click **RUN ANALYSIS**.*")
run_btn.click(fn=run_dashboard,
inputs=[stock_dd, period_dd, forecast_sl, epochs_sl],
outputs=[chart1, chart2, metrics])
gr.HTML("""
<div style="text-align:center;padding:16px 0 8px 0;font-size:10px;color:#374151;letter-spacing:.1em;">
PYTORCH · HUGGINGFACE · PLOTLY · GRADIO · YFINANCE &nbsp;·&nbsp; NOT FINANCIAL ADVICE
</div>
""")
demo.launch()