# NOTE: the three lines below are residue from the hosting page header
# ("Jaron67's picture" / "Update app.py" / "0028d6d verified") — kept as a
# comment so the module stays importable.
import os
import re
import json
import time
import traceback
from pathlib import Path
from typing import Dict, Any, List, Tuple
import pandas as pd
import gradio as gr
import papermill as pm
import plotly.graph_objects as go
# Optional LLM (HuggingFace Inference API)
try:
from huggingface_hub import InferenceClient
except Exception:
InferenceClient = None
# =========================================================
# CONFIG
# =========================================================
BASE_DIR = Path(__file__).resolve().parent
# Notebook filenames are overridable via env vars so deployments can swap pipelines.
NB1 = os.environ.get("NB1", "datacreation.ipynb").strip()
NB2 = os.environ.get("NB2", "pythonanalysis.ipynb").strip()
RUNS_DIR = BASE_DIR / "runs"        # executed-notebook copies are saved here
ART_DIR = BASE_DIR / "artifacts"
PY_FIG_DIR = ART_DIR / "py" / "figures"  # PNG/JPG charts written by the analysis notebook
PY_TAB_DIR = ART_DIR / "py" / "tables"   # CSV/JSON tables written by the analysis notebook
PAPERMILL_TIMEOUT = int(os.environ.get("PAPERMILL_TIMEOUT", "1800"))  # seconds (papermill execution timeout)
MAX_PREVIEW_ROWS = int(os.environ.get("MAX_FILE_PREVIEW_ROWS", "50"))  # cap for table previews
MAX_LOG_CHARS = int(os.environ.get("MAX_LOG_CHARS", "8000"))           # cap for log tails
HF_API_KEY = os.environ.get("HF_API_KEY", "").strip()
MODEL_NAME = os.environ.get("MODEL_NAME", "deepseek-ai/DeepSeek-R1").strip()
HF_PROVIDER = os.environ.get("HF_PROVIDER", "novita").strip()
N8N_WEBHOOK_URL = os.environ.get("N8N_WEBHOOK_URL", "").strip()
# The hosted LLM is optional: it needs both an API key and a successful
# huggingface_hub import (see the guarded import above).
LLM_ENABLED = bool(HF_API_KEY) and InferenceClient is not None
llm_client = (
    InferenceClient(provider=HF_PROVIDER, api_key=HF_API_KEY)
    if LLM_ENABLED
    else None
)
# =========================================================
# HELPERS
# =========================================================
def ensure_dirs():
    """Create every run/artifact directory the app writes to (idempotent)."""
    for directory in (RUNS_DIR, ART_DIR, PY_FIG_DIR, PY_TAB_DIR):
        directory.mkdir(parents=True, exist_ok=True)
def stamp():
    """Return a filesystem-safe local timestamp like '20240131-235959'."""
    return time.strftime("%Y%m%d-%H%M%S", time.localtime())
def tail(text: str, n: int = MAX_LOG_CHARS) -> str:
    """Return at most the last *n* characters of *text* ('' when text is falsy)."""
    if not text:
        return ""
    return text[-n:]
def _ls(dir_path: Path, exts: Tuple[str, ...]) -> List[str]:
if not dir_path.is_dir():
return []
return sorted(p.name for p in dir_path.iterdir() if p.is_file() and p.suffix.lower() in exts)
def _read_csv(path: Path) -> pd.DataFrame:
    """Load a preview of *path*, capped at MAX_PREVIEW_ROWS rows to keep the UI light."""
    return pd.read_csv(path, nrows=MAX_PREVIEW_ROWS)
def _read_json(path: Path):
with path.open(encoding="utf-8") as f:
return json.load(f)
def artifacts_index() -> Dict[str, Any]:
    """Describe which python-pipeline artifacts currently exist on disk."""
    figures = _ls(PY_FIG_DIR, (".png", ".jpg", ".jpeg"))
    tables = _ls(PY_TAB_DIR, (".csv", ".json"))
    return {"python": {"figures": figures, "tables": tables}}
# =========================================================
# PIPELINE RUNNERS
# =========================================================
def run_notebook(nb_name: str) -> str:
    """Execute *nb_name* with papermill and save the executed copy under runs/.

    Returns a short status string; a missing notebook is reported rather
    than raised so callers can show it in the UI log.
    """
    ensure_dirs()
    source = BASE_DIR / nb_name
    if not source.exists():
        return f"ERROR: {nb_name} not found."
    destination = RUNS_DIR / f"run_{stamp()}_{nb_name}"
    pm.execute_notebook(
        input_path=str(source),
        output_path=str(destination),
        cwd=str(BASE_DIR),
        log_output=True,
        progress_bar=False,
        request_save_on_cell_execute=True,
        execution_timeout=PAPERMILL_TIMEOUT,
    )
    return f"Executed {nb_name}"
def run_datacreation() -> str:
    """Run the data-creation notebook (NB1) and list resulting CSVs in BASE_DIR."""
    try:
        status = run_notebook(NB1)
        names = sorted(p.name for p in BASE_DIR.glob("*.csv"))
        listing = "\n".join(f" - {name}" for name in names)
        return f"OK {status}\n\nCSVs now in /app:\n" + listing
    except Exception as exc:
        # Surface failures in the UI log instead of crashing the app.
        return f"FAILED {exc}\n\n{traceback.format_exc()[-2000:]}"
def run_pythonanalysis() -> str:
    """Run the analysis notebook (NB2) and report which artifacts it produced."""
    try:
        status = run_notebook(NB2)
        produced = artifacts_index()["python"]
        figure_list = ", ".join(produced["figures"]) or "(none)"
        table_list = ", ".join(produced["tables"]) or "(none)"
        return (
            f"OK {status}\n\n"
            f"Figures: {figure_list}\n"
            f"Tables: {table_list}"
        )
    except Exception as exc:
        # Surface failures in the UI log instead of crashing the app.
        return f"FAILED {exc}\n\n{traceback.format_exc()[-2000:]}"
def run_full_pipeline() -> str:
    """Run both pipeline stages back-to-back and return a combined log."""
    divider = "=" * 50
    # List literals evaluate left-to-right, so step 1 runs before step 2.
    sections = [
        divider,
        "STEP 1/2: Data Creation (web scraping + synthetic data)",
        divider,
        run_datacreation(),
        "",
        divider,
        "STEP 2/2: Python Analysis (sentiment, ARIMA, dashboard)",
        divider,
        run_pythonanalysis(),
    ]
    return "\n".join(sections)
# =========================================================
# GALLERY LOADERS
# =========================================================
def _load_all_figures() -> List[str]:
    """Return sorted filepaths of all PNG figures for the Gallery widget."""
    paths = sorted(PY_FIG_DIR.glob("*.png"))
    return [str(path) for path in paths]
def _load_table_safe(path: Path) -> pd.DataFrame:
try:
if path.suffix == ".json":
obj = _read_json(path)
if isinstance(obj, dict):
return pd.DataFrame([obj])
return pd.DataFrame(obj)
return _read_csv(path)
except Exception as e:
return pd.DataFrame([{"error": str(e)}])
def refresh_gallery():
    """Reload figure paths, table dropdown choices, and the default table preview."""
    figures = _load_all_figures()
    # Preferred ordering for the dropdown; only files that exist are offered.
    preferred_tables = [
        "df_dashboard.csv",
        "ticker_summary.csv",
        "rule_based_investment_signals.csv",
        "real_dataset_with_vader.csv",
        "merged_real_synthetic_dataset.csv",
        "random_forest_feature_importance.csv",
        "lstm_predictions.csv",
    ]
    available = [name for name in preferred_tables if (PY_TAB_DIR / name).exists()]
    if available:
        selected = available[0]
        preview = _load_table_safe(PY_TAB_DIR / selected)
    else:
        selected = None
        preview = pd.DataFrame()
    return (
        figures or [],
        gr.update(choices=available, value=selected),
        preview,
    )
def on_table_select(choice: str):
    """Return a preview DataFrame for the table name picked in the dropdown."""
    if not choice:
        return pd.DataFrame([{"hint": "Select a table above."}])
    target = PY_TAB_DIR / choice
    if not target.exists():
        return pd.DataFrame([{"error": f"File not found: {choice}"}])
    return _load_table_safe(target)
# =========================================================
# KPI LOADER
# =========================================================
def load_kpis() -> Dict[str, Any]:
    """Return the pipeline KPI dict, or {} if no readable kpis.json exists.

    Checks the tables directory first, then figures (legacy location).
    """
    for candidate in (PY_TAB_DIR / "kpis.json", PY_FIG_DIR / "kpis.json"):
        if not candidate.exists():
            continue
        try:
            return _read_json(candidate)
        except Exception:
            # Corrupt file: fall through to the next candidate.
            pass
    return {}
# =========================================================
# AI DASHBOARD -- LLM picks what to display
# =========================================================
# System prompt template for the direct-LLM path; {artifacts_json} and
# {kpis_json} are filled per request in ai_chat(). The doubled braces keep the
# literal JSON example intact through str.format().
# NOTE(review): the prompt says "book-sales analytics" and mentions sales/ARIMA
# artifacts while the rest of the app is stock/sentiment themed — confirm the
# intended wording with the author before changing this runtime string.
DASHBOARD_SYSTEM = """You are an AI dashboard assistant for a book-sales analytics app.
The user asks questions or requests about their data. You have access to pre-computed
artifacts from a Python analysis pipeline.
AVAILABLE ARTIFACTS (only reference ones that exist):
{artifacts_json}
KPI SUMMARY: {kpis_json}
YOUR JOB:
1. Answer the user's question conversationally using the KPIs and your knowledge of the artifacts.
2. At the END of your response, output a JSON block (fenced with ```json ... ```) that tells
the dashboard which artifact to display. The JSON must have this shape:
{{"show": "figure"|"table"|"none", "scope": "python", "filename": "..."}}
- Use "show": "figure" to display a chart image.
- Use "show": "table" to display a CSV/JSON table.
- Use "show": "none" if no artifact is relevant.
RULES:
- If the user asks about sales trends or forecasting by title, show sales_trends or arima figures.
- If the user asks about sentiment, show sentiment figure or sentiment_counts table.
- If the user asks about forecast accuracy or ARIMA, show arima figures.
- If the user asks about top sellers, show top_titles_by_units_sold.csv.
- If the user asks a general data question, pick the most relevant artifact.
- Keep your answer concise (2-4 sentences), then the JSON block.
"""
# Matches the fenced ```json block requested by the prompt above.
JSON_BLOCK_RE = re.compile(r"```json\s*(\{.*?\})\s*```", re.DOTALL)
# Fallback: any bare, single-level JSON object containing a "show" key.
FALLBACK_JSON_RE = re.compile(r"\{[^{}]*\"show\"[^{}]*\}", re.DOTALL)
def _parse_display_directive(text: str) -> Dict[str, str]:
    """Extract the display directive JSON from an LLM reply.

    Tries the fenced ```json block first, then any bare {..."show"...}
    object; returns {"show": "none"} when neither parses.
    """
    # (pattern, capture-group) pairs, tried in priority order.
    for pattern, group in ((JSON_BLOCK_RE, 1), (FALLBACK_JSON_RE, 0)):
        match = pattern.search(text)
        if match is None:
            continue
        try:
            return json.loads(match.group(group))
        except json.JSONDecodeError:
            continue
    return {"show": "none"}
def _clean_response(text: str) -> str:
    """Strip the JSON directive block from the reply shown to the user."""
    without_directive = JSON_BLOCK_RE.sub("", text)
    return without_directive.strip()
def _build_n8n_context() -> Dict[str, Any]:
    """Assemble the compact dashboard context posted to the n8n webhook."""

    def preview_csv(filename: str, n: int = 5) -> List[Dict[str, Any]]:
        # Best effort: a missing or unreadable file becomes an empty preview.
        target = PY_TAB_DIR / filename
        if not target.exists():
            return []
        try:
            return pd.read_csv(target).head(n).to_dict(orient="records")
        except Exception:
            return []

    return {
        "kpis": load_kpis(),
        "ticker_summary_preview": preview_csv("ticker_summary.csv", 5),
        "real_sentiment_summary_preview": preview_csv("real_sentiment_summary.csv", 5),
        "synthetic_sentiment_summary_preview": preview_csv("synthetic_sentiment_summary.csv", 5),
        "rf_feature_importance_preview": preview_csv("random_forest_feature_importance.csv", 5),
    }
def _n8n_call(msg: str) -> Tuple[str, Dict]:
    """Send *msg* (plus dashboard context) to the n8n webhook and parse the reply.

    Returns (answer, directive) where directive is e.g.
    {"show": "figure", "chart": "sales"} or {"show": "none"}.
    On timeout, error, or an empty usable answer, returns ("", None) so the
    caller can fall back to the keyword matcher.
    """
    # Local import: requests is only needed when N8N_WEBHOOK_URL is configured.
    import requests as req

    def _extract_answer_chart_from_text(text: str):
        # Parse the "ANSWER: ... CHART: <name>" convention the workflow may emit.
        text = (text or "").strip()
        if not text:
            return "", "none"
        answer = ""
        chart = "none"
        answer_match = re.search(r"ANSWER:\s*(.*?)\s*CHART:", text, flags=re.IGNORECASE | re.DOTALL)
        chart_match = re.search(r"CHART:\s*(sales|sentiment|top_sellers|returns|none)", text, flags=re.IGNORECASE)
        if answer_match:
            answer = answer_match.group(1).strip()
        else:
            # If there is no explicit ANSWER:, use the text itself minus any CHART line
            answer = re.sub(
                r"CHART:\s*(sales|sentiment|top_sellers|returns|none)",
                "",
                text,
                flags=re.IGNORECASE,
            ).strip()
        if chart_match:
            chart = chart_match.group(1).strip().lower()
        return answer, chart

    def _normalize_payload(data):
        """
        Handle common n8n webhook output shapes:
        - {"answer": "...", "chart": "..."}
        - [{"answer": "...", "chart": "..."}]
        - {"json": {"answer": "...", "chart": "..."}}
        - {"body": {"answer": "...", "chart": "..."}}
        - stringified JSON
        """
        if isinstance(data, list):
            if not data:
                return {}
            data = data[0]
        if isinstance(data, dict) and "json" in data and isinstance(data["json"], dict):
            data = data["json"]
        if isinstance(data, dict) and "body" in data:
            body = data["body"]
            if isinstance(body, dict):
                data = body
            elif isinstance(body, str):
                try:
                    data = json.loads(body)
                except Exception:
                    pass
        if isinstance(data, str):
            try:
                data = json.loads(data)
            except Exception:
                # keep as raw string
                return data
        # A decoded string may itself have been a wrapped list/dict: unwrap once more.
        if isinstance(data, list):
            if not data:
                return {}
            data = data[0]
        if isinstance(data, dict) and "json" in data and isinstance(data["json"], dict):
            data = data["json"]
        return data

    try:
        payload = {
            "question": msg,
            "dashboard_context": _build_n8n_context(),
        }
        resp = req.post(N8N_WEBHOOK_URL, json=payload, timeout=30)
        raw = (resp.text or "").strip()
        print(f"[n8n] status={resp.status_code} raw={raw[:1000]}")
        parsed = None
        # 1) Try normal JSON parsing first
        try:
            parsed = resp.json()
        except Exception:
            parsed = None
        # 2) If that failed, try raw text as JSON
        if parsed is None and raw:
            try:
                parsed = json.loads(raw)
            except Exception:
                parsed = None
        parsed = _normalize_payload(parsed)
        valid_charts = {"sales", "sentiment", "top_sellers", "returns", "none"}
        # Case A: proper dict payload
        if isinstance(parsed, dict):
            answer = str(parsed.get("answer", "") or "").strip()
            chart = str(parsed.get("chart", "none") or "none").strip().lower()
            if chart not in valid_charts:
                chart = "none"
            if answer:
                directive = {"show": "figure", "chart": chart} if chart != "none" else {"show": "none"}
                return answer, directive
            # maybe raw_model_output contains the actual LLM output
            raw_model_output = str(parsed.get("raw_model_output", "") or "").strip()
            if raw_model_output:
                answer2, chart2 = _extract_answer_chart_from_text(raw_model_output)
                if chart2 not in valid_charts:
                    chart2 = "none"
                if answer2:
                    directive = {"show": "figure", "chart": chart2} if chart2 != "none" else {"show": "none"}
                    return answer2, directive
        # Case B: parsed is a string like "ANSWER: ... CHART: sentiment"
        if isinstance(parsed, str) and parsed.strip():
            answer, chart = _extract_answer_chart_from_text(parsed)
            if chart not in valid_charts:
                chart = "none"
            if answer:
                directive = {"show": "figure", "chart": chart} if chart != "none" else {"show": "none"}
                return answer, directive
        # Case C: raw response text contains ANSWER/CHART format
        if raw:
            answer, chart = _extract_answer_chart_from_text(raw)
            if chart not in valid_charts:
                chart = "none"
            if answer:
                directive = {"show": "figure", "chart": chart} if chart != "none" else {"show": "none"}
                return answer, directive
        # Nothing usable came back — tell the caller to fall back.
        return "", None
    except req.exceptions.Timeout:
        return "", None
    except Exception as e:
        print(f"[n8n] exception: {e}")
        return "", None
def ai_chat(user_msg: str, history: list):
    """Chat handler for the AI Dashboard tab.

    Routes the question to (in priority order) the n8n webhook, the hosted
    LLM, or the keyword matcher, then resolves the resulting display
    directive into an interactive Plotly chart and/or a table preview.

    Returns (new_history, cleared_textbox_value, chart_or_None, table_or_None).
    """
    if not user_msg or not user_msg.strip():
        # Empty submission: leave everything unchanged.
        return history, "", None, None
    idx = artifacts_index()
    kpis = load_kpis()
    # Priority: n8n webhook > HF LLM > keyword fallback
    if N8N_WEBHOOK_URL:
        reply, directive = _n8n_call(user_msg)
        print(f"[AI DASHBOARD] n8n reply={reply!r} directive={directive}")
        if not reply.strip():
            # n8n produced nothing usable — degrade to keyword matching.
            reply, directive = _keyword_fallback(user_msg, idx, kpis)
            print(f"[AI DASHBOARD] using fallback reply={reply!r} directive={directive}")
    elif not LLM_ENABLED:
        reply, directive = _keyword_fallback(user_msg, idx, kpis)
    else:
        system = DASHBOARD_SYSTEM.format(
            artifacts_json=json.dumps(idx, indent=2),
            kpis_json=json.dumps(kpis, indent=2) if kpis else "(no KPIs yet, run the pipeline first)",
        )
        msgs = [{"role": "system", "content": system}]
        # Keep the prompt small: replay only the last 6 chat entries.
        for entry in (history or [])[-6:]:
            msgs.append(entry)
        msgs.append({"role": "user", "content": user_msg})
        try:
            r = llm_client.chat_completion(
                model=MODEL_NAME,
                messages=msgs,
                temperature=0.3,
                max_tokens=600,
                stream=False,
            )
            # huggingface_hub may return a dict or an object depending on version.
            raw = (
                r["choices"][0]["message"]["content"]
                if isinstance(r, dict)
                else r.choices[0].message.content
            )
            directive = _parse_display_directive(raw)
            reply = _clean_response(raw)
        except Exception as e:
            reply = f"LLM error: {e}. Falling back to keyword matching."
            reply_fb, directive = _keyword_fallback(user_msg, idx, kpis)
            reply += "\n\n" + reply_fb
    # Resolve artifacts — build interactive Plotly charts when possible
    chart_out = None
    tab_out = None
    show = directive.get("show", "none")
    fname = directive.get("filename", "")
    chart_name = directive.get("chart", "")
    # Interactive chart builders keyed by name
    chart_builders = {
        "sales": build_sales_chart,
        "sentiment": build_sentiment_chart,
        "top_sellers": build_top_sellers_chart,
        "returns": build_return_distribution_chart,
    }
    if chart_name and chart_name in chart_builders:
        chart_out = chart_builders[chart_name]()
    elif show == "figure" and fname:
        # Fallback: try to match filename to a chart builder
        if "sales_trend" in fname:
            chart_out = build_sales_chart()
        elif "sentiment" in fname:
            chart_out = build_sentiment_chart()
        elif "arima" in fname or "forecast" in fname:
            chart_out = build_sales_chart()  # closest interactive equivalent
        else:
            chart_out = _empty_chart(f"No interactive chart for {fname}")
    # A directive may request a table independently of the chart branch above.
    if show == "table" and fname:
        fp = PY_TAB_DIR / fname
        if fp.exists():
            tab_out = _load_table_safe(fp)
        else:
            reply += f"\n\n*(Could not find table: {fname})*"
    # History is kept in openai-style {"role", "content"} dicts.
    new_history = (history or []) + [
        {"role": "user", "content": user_msg},
        {"role": "assistant", "content": reply},
    ]
    return new_history, "", chart_out, tab_out
def _keyword_fallback(msg: str, idx: Dict, kpis: Dict) -> Tuple[str, Dict]:
    """Keyword-matched answer used when neither n8n nor the LLM is available.

    Scans *msg* for topic keywords and returns (reply_text, display_directive)
    grounded in the KPI values loaded from kpis.json.
    """
    msg_lower = msg.lower()
    # With no artifacts at all there is nothing to ground an answer in.
    if not idx["python"]["figures"] and not idx["python"]["tables"]:
        return (
            "No artifacts are available yet. Please run the pipeline first so the finance analysis outputs can be loaded.",
            {"show": "none"},
        )
    # Pull out the KPIs referenced by the canned answers; "?" / None mark missing values.
    n_tickers = kpis.get("n_tickers", "?")
    n_days = kpis.get("n_days_real", "?")
    buy_signals = kpis.get("buy_signals", "?")
    hold_signals = kpis.get("hold_signals", "?")
    sell_signals = kpis.get("sell_signals", "?")
    rf_accuracy = kpis.get("random_forest_accuracy", None)
    corr_compound = kpis.get("corr_compound_vs_return", None)
    corr_headline = kpis.get("corr_headline_vs_return", None)
    agreement = kpis.get("agreement_synth_vs_vader", None)

    def fmt(v, digits=3):
        # Format numerics to fixed precision; pass through None and non-numerics.
        if v is None:
            return None
        try:
            return f"{float(v):.{digits}f}"
        except Exception:
            return str(v)

    rf_text = fmt(rf_accuracy)
    corr_compound_text = fmt(corr_compound)
    corr_headline_text = fmt(corr_headline)
    agreement_text = fmt(agreement)
    # Topic branches are checked in priority order; first keyword hit wins.
    if any(w in msg_lower for w in ["price", "stock trend", "price trend", "normalized", "stock price"]):
        reply = (
            f"The normalized stock price chart compares relative short-term price movement across {n_tickers} tickers over {n_days} trading days. "
            f"It is built from the real dataset fields `ticker`, `date`, and `close`, so it is most useful for comparing movement patterns rather than absolute price levels."
        )
        return reply, {"show": "figure", "chart": "sales"}
    if any(w in msg_lower for w in ["sentiment", "vader", "compound", "headline tone"]):
        extra = ""
        if agreement_text is not None:
            extra = f" The agreement between synthetic sentiment labels and VADER sentiment is {agreement_text}, so the comparison should be interpreted cautiously."
        reply = (
            "The sentiment chart is grounded in `real_dataset_with_vader.csv`, which contains VADER compound scores derived from aggregated financial headlines. "
            "It helps show whether headline tone shifts over time and whether those shifts appear to line up at all with short-term market movement."
            + extra
        )
        return reply, {"show": "figure", "chart": "sentiment"}
    if any(w in msg_lower for w in ["signal", "buy", "hold", "sell", "recommendation", "ticker"]):
        reply = (
            f"The investment signal chart summarizes the rule-based outputs across tickers, with {buy_signals} buy signals, {hold_signals} hold signals, and {sell_signals} sell signals in the current sample. "
            "This suggests the strategy is behaving conservatively, with most observations falling into hold rather than strong directional recommendations."
        )
        return reply, {"show": "figure", "chart": "top_sellers"}
    if any(w in msg_lower for w in ["synthetic", "real vs synthetic", "return distribution", "returns", "compare returns"]):
        extra = ""
        if corr_compound_text is not None:
            extra = f" The observed correlation between VADER compound and next-day return is {corr_compound_text}, which supports a cautious interpretation if it is close to zero."
        reply = (
            "The return comparison chart is the best view for checking whether the synthetic return distribution behaves similarly to the observed next-day return distribution. "
            "It helps assess realism in the simulated data without implying that either distribution alone provides strong predictive signal."
            + extra
        )
        return reply, {"show": "figure", "chart": "returns"}
    if any(w in msg_lower for w in ["model", "random forest", "accuracy", "prediction", "predictive"]):
        extra = ""
        if rf_text is not None:
            extra += f" The current Random Forest accuracy is {rf_text}."
        if corr_headline_text is not None:
            extra += f" The correlation between headline count and next-day return is {corr_headline_text}."
        if corr_compound_text is not None:
            extra += f" The correlation between VADER compound and next-day return is {corr_compound_text}."
        reply = (
            "The modelling results should be interpreted conservatively because the project tests whether headline-derived sentiment carries usable short-term signal in a noisy market setting."
            + extra
        )
        return reply, {"show": "table", "scope": "python", "filename": "random_forest_feature_importance.csv"}
    if any(w in msg_lower for w in ["dashboard", "overview", "summary", "kpi"]):
        reply = (
            f"This dashboard covers {n_tickers} tickers across {n_days} trading days and combines price movement, headline sentiment, synthetic comparisons, and investment signals. "
            "The most useful overview files are `df_dashboard.csv`, `ticker_summary.csv`, and `real_dataset_with_vader.csv`."
        )
        return reply, {"show": "table", "scope": "python", "filename": "df_dashboard.csv"}
    # No keyword matched: describe what the assistant can answer.
    return (
        "I can answer questions about stock price trends, VADER sentiment, investment signals, real versus synthetic returns, and model interpretation using the dashboard artifacts as the source of truth.",
        {"show": "none"},
    )
# =========================================================
# KPI CARDS (BubbleBusters style)
# =========================================================
def render_kpi_cards() -> str:
    """Render the KPI grid as an HTML string for the gr.HTML component.

    Shows a placeholder card when kpis.json is missing; otherwise one card
    per KPI key that is present (missing keys are silently skipped).
    """
    kpis = load_kpis()
    if not kpis:
        # Placeholder shown before the pipeline has produced kpis.json.
        return (
            '<div style="background:rgba(255,255,255,.65);backdrop-filter:blur(16px);'
            'border-radius:20px;padding:28px;text-align:center;'
            'border:1.5px solid rgba(255,255,255,.7);'
            'box-shadow:0 8px 32px rgba(124,92,191,.08);">'
            '<div style="font-size:36px;margin-bottom:10px;">📊</div>'
            '<div style="color:#a48de8;font-size:14px;'
            'font-weight:800;margin-bottom:6px;">No data yet</div>'
            '<div style="color:#9d8fc4;font-size:12px;">'
            'Run the pipeline to populate these cards.</div>'
            '</div>'
        )

    def card(icon, label, value, colour):
        # One glassmorphism-style KPI card; *colour* tints the top border.
        return f"""
        <div style="background:rgba(255,255,255,.72);backdrop-filter:blur(16px);
        border-radius:20px;padding:18px 14px 16px;text-align:center;
        border:1.5px solid rgba(255,255,255,.8);
        box-shadow:0 4px 16px rgba(124,92,191,.08);
        border-top:3px solid {colour};">
        <div style="font-size:26px;margin-bottom:7px;line-height:1;">{icon}</div>
        <div style="color:#9d8fc4;font-size:9.5px;text-transform:uppercase;
        letter-spacing:1.8px;margin-bottom:7px;font-weight:800;">{label}</div>
        <div style="color:#2d1f4e;font-size:16px;font-weight:800;">{value}</div>
        </div>"""

    # (kpi key, icon, display label, accent colour) for every card we can show.
    kpi_config = [
        ("n_tickers", "📈", "N Tickers", "#a48de8"),
        ("n_rows_real", "📄", "N Rows Real", "#7aa6f8"),
        ("n_rows_synth", "🧪", "N Rows Synth", "#6ee7c7"),
        ("n_days_real", "📅", "N Days Real", "#3dcba8"),
        ("avg_headline_count", "📰", "Avg Headline Count", "#8fa8f8"),
        ("avg_next_day_return_real", "💹", "Avg Next Day Return", "#c45ea8"),
        ("avg_synthetic_return", "🧬", "Avg Synthetic Return", "#e8a230"),
        ("agreement_synth_vs_vader", "🤝", "Agreement Synth vs VADER", "#7c5cbf"),
        ("corr_headline_vs_return", "🔗", "Corr Headline vs Return", "#5e8fef"),
        ("corr_compound_vs_return", "🧠", "Corr Compound vs Return", "#e8537a"),
        ("buy_signals", "🟢", "Buy Signals", "#2ec4a0"),
        ("hold_signals", "🟡", "Hold Signals", "#e8a230"),
        ("sell_signals", "🔴", "Sell Signals", "#e8537a"),
        ("random_forest_accuracy", "🌲", "Random Forest Accuracy", "#6aaa3a"),
        ("lstm_rmse", "🤖", "LSTM RMSE", "#5e8fef"),
    ]
    html = (
        '<div style="display:grid;grid-template-columns:repeat(auto-fit,minmax(140px,1fr));'
        'gap:12px;margin-bottom:24px;">'
    )
    for key, icon, label, colour in kpi_config:
        val = kpis.get(key)
        if val is None:
            continue
        # Tidy the value for display: round floats, thousands-separate big ints.
        if isinstance(val, float):
            val = round(val, 4)
        elif isinstance(val, int) and val > 100:
            val = f"{val:,}"
        html += card(icon, label, str(val), colour)
    html += "</div>"
    return html
# =========================================================
# INTERACTIVE PLOTLY CHARTS (BubbleBusters style)
# =========================================================
# Trace colours cycled through by the Plotly chart builders (index mod len).
CHART_PALETTE = ["#7c5cbf", "#2ec4a0", "#e8537a", "#e8a230", "#5e8fef",
                 "#c45ea8", "#3dbacc", "#a0522d", "#6aaa3a", "#d46060"]
def _styled_layout(**kwargs) -> dict:
defaults = dict(
template="plotly_white",
paper_bgcolor="rgba(255,255,255,0.95)",
plot_bgcolor="rgba(255,255,255,0.98)",
font=dict(family="system-ui, sans-serif", color="#2d1f4e", size=12),
margin=dict(l=60, r=20, t=70, b=70),
legend=dict(
orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1,
bgcolor="rgba(255,255,255,0.92)",
bordercolor="rgba(124,92,191,0.35)", borderwidth=1,
),
title=dict(font=dict(size=15, color="#4b2d8a")),
)
defaults.update(kwargs)
return defaults
def _empty_chart(title: str) -> go.Figure:
    """Placeholder figure shown before the pipeline has produced any data."""
    placeholder = go.Figure()
    hint = dict(
        text="Run the pipeline to generate data",
        x=0.5, y=0.5, xref="paper", yref="paper", showarrow=False,
        font=dict(size=14, color="rgba(124,92,191,0.5)"),
    )
    placeholder.update_layout(
        title=title, height=420, template="plotly_white",
        paper_bgcolor="rgba(255,255,255,0.95)",
        annotations=[hint],
    )
    return placeholder
def build_sales_chart() -> go.Figure:
    """Interactive line chart of normalized closing prices per ticker.

    Reads real_dataset_with_vader.csv, keeps the (alphabetically) first five
    tickers, and rebases each series to 100 at its first observation so
    relative movement is comparable across price levels.
    """
    path = PY_TAB_DIR / "real_dataset_with_vader.csv"
    if not path.exists():
        return _empty_chart("Normalized Stock Price Trends — run the pipeline first")
    df = pd.read_csv(path)
    required = {"ticker", "date", "close"}
    if not required.issubset(df.columns):
        return _empty_chart("Missing required columns in real_dataset_with_vader.csv")
    df["date"] = pd.to_datetime(df["date"], errors="coerce")
    df = df.dropna(subset=["date", "close"]).copy()
    # Limit to five tickers so the legend and hover stay readable.
    sample_tickers = sorted(df["ticker"].dropna().unique())[:5]
    df = df[df["ticker"].isin(sample_tickers)].copy()
    df = df.sort_values(["ticker", "date"])
    # Rebase each ticker to 100 at its first (earliest) close.
    df["normalized_price"] = df.groupby("ticker")["close"].transform(lambda s: s / s.iloc[0] * 100)
    fig = go.Figure()
    for i, ticker in enumerate(sample_tickers):
        temp = df[df["ticker"] == ticker]
        fig.add_trace(go.Scatter(
            x=temp["date"],
            y=temp["normalized_price"],
            name=ticker,
            mode="lines+markers",
            line=dict(color=CHART_PALETTE[i % len(CHART_PALETTE)], width=2),
            marker=dict(size=5),
            hovertemplate=f"<b>{ticker}</b><br>%{{x|%Y-%m-%d}}<br>Normalized: %{{y:.2f}}<extra></extra>",
        ))
    fig.update_layout(**_styled_layout(
        height=450,
        hovermode="x unified",
        title=dict(text="Normalized Stock Price Trends (Base = 100)")
    ))
    fig.update_xaxes(gridcolor="rgba(124,92,191,0.15)", showgrid=True)
    fig.update_yaxes(gridcolor="rgba(124,92,191,0.15)", showgrid=True, title="Normalized Price")
    return fig
def build_sentiment_chart() -> go.Figure:
    """Interactive line chart of daily VADER compound scores per ticker.

    Reads real_dataset_with_vader.csv and plots the (alphabetically) first
    five tickers so the legend stays readable.
    """
    source = PY_TAB_DIR / "real_dataset_with_vader.csv"
    if not source.exists():
        return _empty_chart("VADER Sentiment Over Time — run the pipeline first")
    frame = pd.read_csv(source)
    if not {"ticker", "date", "compound"}.issubset(frame.columns):
        return _empty_chart("Missing required columns in real_dataset_with_vader.csv")
    frame["date"] = pd.to_datetime(frame["date"], errors="coerce")
    frame = frame.dropna(subset=["date", "compound"]).copy()
    shown = sorted(frame["ticker"].dropna().unique())[:5]
    frame = frame[frame["ticker"].isin(shown)].copy().sort_values(["ticker", "date"])
    fig = go.Figure()
    for idx, symbol in enumerate(shown):
        rows = frame[frame["ticker"] == symbol]
        fig.add_trace(go.Scatter(
            x=rows["date"],
            y=rows["compound"],
            name=symbol,
            mode="lines+markers",
            line=dict(color=CHART_PALETTE[idx % len(CHART_PALETTE)], width=2),
            marker=dict(size=5),
            hovertemplate=f"<b>{symbol}</b><br>%{{x|%Y-%m-%d}}<br>Compound: %{{y:.3f}}<extra></extra>",
        ))
    fig.update_layout(**_styled_layout(
        height=450,
        hovermode="x unified",
        title=dict(text="VADER Sentiment Over Time"),
    ))
    fig.update_xaxes(gridcolor="rgba(124,92,191,0.15)", showgrid=True)
    fig.update_yaxes(gridcolor="rgba(124,92,191,0.15)", showgrid=True, title="VADER Compound Score")
    return fig
def build_top_sellers_chart() -> go.Figure:
    """Stacked bar chart of buy/hold/sell signal counts per ticker."""
    source = PY_TAB_DIR / "rule_based_investment_signals.csv"
    if not source.exists():
        return _empty_chart("Investment Signal Distribution by Ticker — run the pipeline first")
    frame = pd.read_csv(source)
    if not {"ticker", "investment_signal"}.issubset(frame.columns):
        return _empty_chart("Missing required columns in rule_based_investment_signals.csv")
    counts = (
        frame.groupby(["ticker", "investment_signal"])
        .size()
        .unstack(fill_value=0)
        .reset_index()
    )
    # Guarantee all three signal columns exist even when a class never occurs.
    for signal in ("buy", "hold", "sell"):
        if signal not in counts.columns:
            counts[signal] = 0
    palette = {"buy": "#2ec4a0", "hold": "#e8a230", "sell": "#e8537a"}
    fig = go.Figure()
    for signal in ("buy", "hold", "sell"):
        fig.add_trace(go.Bar(
            x=counts["ticker"],
            y=counts[signal],
            name=signal.title(),
            marker_color=palette[signal],
            hovertemplate=f"<b>{signal.title()}</b><br>Ticker: %{{x}}<br>Count: %{{y}}<extra></extra>",
        ))
    fig.update_layout(**_styled_layout(
        height=450,
        barmode="stack",
        title=dict(text="Investment Signal Distribution by Ticker"),
    ))
    fig.update_xaxes(title="Ticker")
    fig.update_yaxes(title="Count", gridcolor="rgba(124,92,191,0.15)", showgrid=True)
    return fig
def refresh_dashboard():
    """Rebuild the KPI cards and all four interactive charts for the Dashboard tab."""
    kpi_cards = render_kpi_cards()
    charts = (
        build_sales_chart(),
        build_sentiment_chart(),
        build_top_sellers_chart(),
        build_return_distribution_chart(),
    )
    return (kpi_cards,) + charts
def build_return_distribution_chart() -> go.Figure:
    """Overlaid histograms comparing real next-day returns with synthetic returns.

    Requires both real_dataset_with_vader.csv (column `next_day_return`) and
    synthetic_dataset_analysis_ready.csv (column `avg_synthetic_return`).
    """
    real_path = PY_TAB_DIR / "real_dataset_with_vader.csv"
    synth_path = PY_TAB_DIR / "synthetic_dataset_analysis_ready.csv"
    if not real_path.exists() or not synth_path.exists():
        return _empty_chart("Return Distribution: Real vs Synthetic — run the pipeline first")
    df_real = pd.read_csv(real_path)
    df_synth = pd.read_csv(synth_path)
    if "next_day_return" not in df_real.columns or "avg_synthetic_return" not in df_synth.columns:
        return _empty_chart("Missing return columns in saved datasets")
    real_vals = df_real["next_day_return"].dropna()
    synth_vals = df_synth["avg_synthetic_return"].dropna()
    fig = go.Figure()
    # Semi-transparent overlaid histograms so both distributions stay visible.
    fig.add_trace(go.Histogram(
        x=real_vals,
        name="Real",
        opacity=0.6,
        marker_color="#5e8fef",
        nbinsx=30,
    ))
    fig.add_trace(go.Histogram(
        x=synth_vals,
        name="Synthetic",
        opacity=0.6,
        marker_color="#e8a230",
        nbinsx=30,
    ))
    fig.update_layout(**_styled_layout(
        height=450,
        barmode="overlay",
        title=dict(text="Return Distribution: Real vs Synthetic")
    ))
    fig.update_xaxes(title="Return")
    fig.update_yaxes(title="Frequency", gridcolor="rgba(124,92,191,0.15)", showgrid=True)
    return fig
# =========================================================
# UI
# =========================================================
ensure_dirs()
def load_css() -> str:
    """Return the app stylesheet: optional style.css plus hard-coded overrides.

    The overrides force readable dark text in the table preview and frame the
    static-figure gallery; they are appended after style.css so they win ties.
    """
    css_path = BASE_DIR / "style.css"
    # style.css is optional — missing file just means overrides only.
    base_css = css_path.read_text(encoding="utf-8") if css_path.exists() else ""
    overrides = """
/* =========================
DATA TABLE: force dark headers and cells
========================= */
#table_preview table,
#table_preview .table-wrap,
#table_preview [role="grid"] {
color: #1a1a1a !important;
background: #ffffff !important;
}
#table_preview thead th,
#table_preview th,
#table_preview [role="columnheader"] {
color: #111111 !important;
-webkit-text-fill-color: #111111 !important;
opacity: 1 !important;
background: #ece8f5 !important;
font-weight: 700 !important;
text-shadow: none !important;
}
#table_preview tbody td,
#table_preview td,
#table_preview [role="gridcell"] {
color: #1f1f1f !important;
-webkit-text-fill-color: #1f1f1f !important;
opacity: 1 !important;
background: #ffffff !important;
text-shadow: none !important;
}
#table_preview * {
text-shadow: none !important;
}
/* =========================
GALLERY: just container styling
========================= */
#static_gallery {
background: rgba(255,255,255,0.92) !important;
border-radius: 12px;
}
#static_gallery img {
background: #f7f7fb !important;
border: 1px solid rgba(124,92,191,0.18) !important;
border-radius: 10px !important;
padding: 6px !important;
}
#static_gallery figcaption,
#static_gallery .caption-label,
#static_gallery [class*="caption"] {
color: #2d1f4e !important;
background: rgba(255,255,255,0.9) !important;
font-weight: 600 !important;
}
"""
    return base_css + "\n" + overrides
# Top-level UI definition: three tabs (pipeline runner, dashboard, AI chat).
with gr.Blocks(title="Stock Sentiment & Market Impact Dashboard") as demo:
    gr.Markdown(
        "# Stock Sentiment Analyser\n"
        "*Analyzing how financial headlines relate to short-term stock returns*",
        elem_id="escp_title",
    )
    # ===========================================================
    # TAB 1 -- Pipeline Runner
    # ===========================================================
    with gr.Tab("Pipeline Runner"):
        gr.Markdown()
        with gr.Row():
            with gr.Column(scale=1):
                btn_nb1 = gr.Button("Step 1: Data Creation", variant="secondary")
            with gr.Column(scale=1):
                btn_nb2 = gr.Button("Step 2: Python Analysis", variant="secondary")
        with gr.Row():
            btn_all = gr.Button("Run Full Pipeline (Both Steps)", variant="primary")
        run_log = gr.Textbox(
            label="Execution Log",
            lines=18,
            max_lines=30,
            interactive=False,
        )
        btn_nb1.click(run_datacreation, outputs=[run_log])
        btn_nb2.click(run_pythonanalysis, outputs=[run_log])
        btn_all.click(run_full_pipeline, outputs=[run_log])
    # ===========================================================
    # TAB 2 -- Dashboard (KPIs + Interactive Charts + Gallery)
    # ===========================================================
    with gr.Tab("Dashboard"):
        # Passing the callable lets Gradio re-render the cards on page load.
        kpi_html = gr.HTML(value=render_kpi_cards)
        refresh_btn = gr.Button("Refresh Dashboard", variant="primary")
        gr.Markdown("#### Interactive Charts")
        chart_sales = gr.Plot(label="Normalized Stock Price Trends")
        chart_sentiment = gr.Plot(label="VADER Sentiment Over Time")
        chart_top = gr.Plot(label="Investment Signal Distribution by Ticker")
        chart_returns = gr.Plot(label="Return Distribution: Real vs Synthetic")
        gr.Markdown("#### Static Figures (from notebooks)")
        gallery = gr.Gallery(
            label="Generated Figures",
            columns=2,
            height=480,
            object_fit="contain",
            elem_id="static_gallery",
        )
        gr.Markdown("#### Data Tables")
        table_dropdown = gr.Dropdown(
            label="Select a table to view",
            choices=[],
            interactive=True,
        )
        table_display = gr.Dataframe(
            label="Table Preview",
            interactive=False,
            elem_id="table_preview",
        )

        def _on_refresh():
            # Fan the two refresh helpers out into the eight Dashboard outputs.
            kpi, c1, c2, c3, c4 = refresh_dashboard()
            figs, dd, df = refresh_gallery()
            return kpi, c1, c2, c3, c4, figs, dd, df

        refresh_btn.click(
            _on_refresh,
            outputs=[kpi_html, chart_sales, chart_sentiment, chart_top, chart_returns,
                     gallery, table_dropdown, table_display],
        )
        table_dropdown.change(
            on_table_select,
            inputs=[table_dropdown],
            outputs=[table_display],
        )
    # ===========================================================
    # TAB 3 -- AI Dashboard
    # ===========================================================
    with gr.Tab('"AI" Dashboard'):
        # Status line mirrors ai_chat's routing priority: n8n > LLM > keywords.
        _ai_status = (
            "Connected to your **n8n workflow**." if N8N_WEBHOOK_URL
            else "**LLM active.**" if LLM_ENABLED
            else "Using **keyword matching**. Upgrade options: "
            "set `N8N_WEBHOOK_URL` to connect your n8n workflow, "
            "or set `HF_API_KEY` for direct LLM access."
        )
        gr.Markdown(
            "### Ask questions, get interactive visualisations\n\n"
            f"Type a question and the system will pick the right interactive chart or table. {_ai_status}"
        )
        with gr.Row(equal_height=True):
            with gr.Column(scale=1):
                # NOTE(review): ai_chat stores history as {"role","content"} dicts;
                # on Gradio 4+ that format requires gr.Chatbot(type="messages") —
                # confirm against the pinned gradio version.
                chatbot = gr.Chatbot(
                    label="Conversation",
                    height=380,
                )
                user_input = gr.Textbox(
                    label="Ask about your data",
                    placeholder="e.g. Show me stock price trends / What are the buy recommendations? / Sentiment analysis",
                    lines=1,
                )
                gr.Examples(
                    examples=[
                        "Show me the normalized stock price trends",
                        "What does VADER sentiment look like over time?",
                        "Which tickers generate buy or hold signals?",
                        "Show the real vs synthetic return distribution",
                        "Give me a dashboard overview",
                    ],
                    inputs=user_input,
                )
            with gr.Column(scale=1):
                ai_figure = gr.Plot(
                    label="Interactive Chart",
                )
                ai_table = gr.Dataframe(
                    label="Data Table",
                    interactive=False,
                )
        user_input.submit(
            ai_chat,
            inputs=[user_input, chatbot],
            outputs=[chatbot, user_input, ai_figure, ai_table],
        )
demo.launch(css=load_css(), allowed_paths=[str(BASE_DIR)])