| import os |
| import re |
| import json |
| import time |
| import traceback |
| from pathlib import Path |
| from typing import Dict, Any, List, Tuple |
|
|
| import pandas as pd |
| import gradio as gr |
| import papermill as pm |
| import plotly.graph_objects as go |
|
|
| |
| try: |
| from huggingface_hub import InferenceClient |
| except Exception: |
| InferenceClient = None |
|
|
| |
| |
| |
|
|
# Resolve every path relative to this file so the app works regardless of the
# current working directory (important on hosted Spaces).
BASE_DIR = Path(__file__).resolve().parent

# Notebook filenames for the two pipeline stages (overridable via env vars).
NB1 = os.environ.get("NB1", "datacreation.ipynb").strip()
NB2 = os.environ.get("NB2", "pythonanalysis.ipynb").strip()

# Output layout: executed notebook copies land in runs/, generated figures
# and tables under artifacts/py/.
RUNS_DIR = BASE_DIR / "runs"
ART_DIR = BASE_DIR / "artifacts"
PY_FIG_DIR = ART_DIR / "py" / "figures"
PY_TAB_DIR = ART_DIR / "py" / "tables"

# Tuning knobs, all environment-overridable.
PAPERMILL_TIMEOUT = int(os.environ.get("PAPERMILL_TIMEOUT", "1800"))  # seconds (papermill execution_timeout)
MAX_PREVIEW_ROWS = int(os.environ.get("MAX_FILE_PREVIEW_ROWS", "50"))  # rows shown in table previews
MAX_LOG_CHARS = int(os.environ.get("MAX_LOG_CHARS", "8000"))  # tail size for log display

# Backends for the "AI" dashboard tab: an optional n8n webhook and/or a
# Hugging Face Inference endpoint.
HF_API_KEY = os.environ.get("HF_API_KEY", "").strip()
MODEL_NAME = os.environ.get("MODEL_NAME", "deepseek-ai/DeepSeek-R1").strip()
HF_PROVIDER = os.environ.get("HF_PROVIDER", "novita").strip()
N8N_WEBHOOK_URL = os.environ.get("N8N_WEBHOOK_URL", "").strip()

# The direct-LLM path is usable only when a key is configured AND
# huggingface_hub imported successfully above.
LLM_ENABLED = bool(HF_API_KEY) and InferenceClient is not None
llm_client = (
    InferenceClient(provider=HF_PROVIDER, api_key=HF_API_KEY)
    if LLM_ENABLED
    else None
)
|
|
| |
| |
| |
|
|
def ensure_dirs():
    """Create the run/artifact directory tree if any part is missing."""
    for directory in (RUNS_DIR, ART_DIR, PY_FIG_DIR, PY_TAB_DIR):
        directory.mkdir(parents=True, exist_ok=True)
|
|
def stamp():
    """Local-time timestamp (YYYYMMDD-HHMMSS) used to name notebook runs."""
    now = time.localtime()
    return time.strftime("%Y%m%d-%H%M%S", now)
|
|
def tail(text: str, n: int = MAX_LOG_CHARS) -> str:
    """Return the last *n* characters of *text*; None becomes an empty string."""
    snippet = text or ""
    return snippet[-n:]
|
|
| def _ls(dir_path: Path, exts: Tuple[str, ...]) -> List[str]: |
| if not dir_path.is_dir(): |
| return [] |
| return sorted(p.name for p in dir_path.iterdir() if p.is_file() and p.suffix.lower() in exts) |
|
|
def _read_csv(path: Path) -> pd.DataFrame:
    """Preview-read the CSV at *path*, capped at MAX_PREVIEW_ROWS rows."""
    frame = pd.read_csv(path, nrows=MAX_PREVIEW_ROWS)
    return frame
|
|
| def _read_json(path: Path): |
| with path.open(encoding="utf-8") as f: |
| return json.load(f) |
|
|
def artifacts_index() -> Dict[str, Any]:
    """Catalogue the figure and table artifacts currently on disk."""
    figures = _ls(PY_FIG_DIR, (".png", ".jpg", ".jpeg"))
    tables = _ls(PY_TAB_DIR, (".csv", ".json"))
    return {"python": {"figures": figures, "tables": tables}}
|
|
| |
| |
| |
|
|
def run_notebook(nb_name: str) -> str:
    """Execute *nb_name* via papermill, saving the executed copy in RUNS_DIR.

    Returns a short status string; a missing input notebook is reported
    rather than raised (papermill failures still propagate to the caller).
    """
    ensure_dirs()
    source = BASE_DIR / nb_name
    if not source.exists():
        return f"ERROR: {nb_name} not found."
    destination = RUNS_DIR / f"run_{stamp()}_{nb_name}"
    pm.execute_notebook(
        input_path=str(source),
        output_path=str(destination),
        cwd=str(BASE_DIR),
        log_output=True,
        progress_bar=False,
        request_save_on_cell_execute=True,
        execution_timeout=PAPERMILL_TIMEOUT,
    )
    return f"Executed {nb_name}"
|
|
|
|
def run_datacreation() -> str:
    """Run the data-creation notebook and list the CSVs it left in BASE_DIR."""
    try:
        log = run_notebook(NB1)
        names = sorted(f.name for f in BASE_DIR.glob("*.csv"))
        listing = "\n".join(f" - {name}" for name in names)
        return f"OK {log}\n\nCSVs now in /app:\n" + listing
    except Exception as e:
        return f"FAILED {e}\n\n{traceback.format_exc()[-2000:]}"
|
|
|
|
def run_pythonanalysis() -> str:
    """Run the analysis notebook and summarize the artifacts it produced."""
    try:
        log = run_notebook(NB2)
        idx = artifacts_index()
        fig_list = ", ".join(idx["python"]["figures"])
        tab_list = ", ".join(idx["python"]["tables"])
        return (
            f"OK {log}\n\n"
            f"Figures: {fig_list or '(none)'}\n"
            f"Tables: {tab_list or '(none)'}"
        )
    except Exception as e:
        return f"FAILED {e}\n\n{traceback.format_exc()[-2000:]}"
|
|
|
|
def run_full_pipeline() -> str:
    """Run both notebooks in sequence and return a combined, sectioned log."""
    divider = "=" * 50
    sections = [
        divider,
        "STEP 1/2: Data Creation (web scraping + synthetic data)",
        divider,
        run_datacreation(),
        "",
        divider,
        "STEP 2/2: Python Analysis (sentiment, ARIMA, dashboard)",
        divider,
        run_pythonanalysis(),
    ]
    return "\n".join(sections)
|
|
|
|
| |
| |
| |
|
|
def _load_all_figures() -> List[str]:
    """Return the sorted PNG figure paths (as strings) for the Gallery."""
    return [str(path) for path in sorted(PY_FIG_DIR.glob("*.png"))]
|
|
|
|
def _load_table_safe(path: Path) -> pd.DataFrame:
    """Load a table artifact (.csv or .json) as a DataFrame for preview.

    Any failure is converted into a one-row error DataFrame so the UI always
    has something to render.
    """
    try:
        # Lowercase the suffix so files listed by _ls() — which matches
        # suffixes case-insensitively — take the correct branch (e.g. a
        # "X.JSON" file previously fell through to the CSV reader).
        if path.suffix.lower() == ".json":
            obj = _read_json(path)
            if isinstance(obj, dict):
                # A single JSON object becomes a one-row frame.
                return pd.DataFrame([obj])
            return pd.DataFrame(obj)
        return _read_csv(path)
    except Exception as e:
        return pd.DataFrame([{"error": str(e)}])
|
|
|
|
def refresh_gallery():
    """Rebuild the figure gallery, the table dropdown, and the default preview."""
    figure_paths = _load_all_figures()

    # Curated ordering: overview-oriented tables come first in the dropdown.
    preferred_tables = [
        "df_dashboard.csv",
        "ticker_summary.csv",
        "rule_based_investment_signals.csv",
        "real_dataset_with_vader.csv",
        "merged_real_synthetic_dataset.csv",
        "random_forest_feature_importance.csv",
        "lstm_predictions.csv",
    ]

    available = [name for name in preferred_tables if (PY_TAB_DIR / name).exists()]
    selected = available[0] if available else None

    if selected:
        preview_df = _load_table_safe(PY_TAB_DIR / selected)
    else:
        preview_df = pd.DataFrame()

    return (
        figure_paths or [],
        gr.update(choices=available, value=selected),
        preview_df,
    )
|
|
|
|
def on_table_select(choice: str):
    """Load the table picked in the dropdown, or return a hint/error frame."""
    if not choice:
        return pd.DataFrame([{"hint": "Select a table above."}])
    target = PY_TAB_DIR / choice
    if target.exists():
        return _load_table_safe(target)
    return pd.DataFrame([{"error": f"File not found: {choice}"}])
|
|
|
|
| |
| |
| |
|
|
def load_kpis() -> Dict[str, Any]:
    """Return the pipeline's KPI dict, or {} when no readable kpis.json exists."""
    candidates = (PY_TAB_DIR / "kpis.json", PY_FIG_DIR / "kpis.json")
    for candidate in candidates:
        if not candidate.exists():
            continue
        try:
            return _read_json(candidate)
        except Exception:
            # Unreadable/corrupt file: keep looking, fall through to {}.
            continue
    return {}
|
|
|
|
| |
| |
| |
|
|
# System prompt for the direct-LLM path of the "AI" dashboard tab.  The double
# braces {{...}} survive .format() as literal braces for the JSON example.
# NOTE(review): this prompt still describes a "book-sales analytics app" and
# references artifacts (sales_trends, top_titles_by_units_sold.csv) that the
# rest of this stock-sentiment dashboard never produces — looks like a
# leftover from an earlier project; confirm and reword before relying on it.
DASHBOARD_SYSTEM = """You are an AI dashboard assistant for a book-sales analytics app.
The user asks questions or requests about their data. You have access to pre-computed
artifacts from a Python analysis pipeline.

AVAILABLE ARTIFACTS (only reference ones that exist):
{artifacts_json}

KPI SUMMARY: {kpis_json}

YOUR JOB:
1. Answer the user's question conversationally using the KPIs and your knowledge of the artifacts.
2. At the END of your response, output a JSON block (fenced with ```json ... ```) that tells
the dashboard which artifact to display. The JSON must have this shape:
{{"show": "figure"|"table"|"none", "scope": "python", "filename": "..."}}

- Use "show": "figure" to display a chart image.
- Use "show": "table" to display a CSV/JSON table.
- Use "show": "none" if no artifact is relevant.

RULES:
- If the user asks about sales trends or forecasting by title, show sales_trends or arima figures.
- If the user asks about sentiment, show sentiment figure or sentiment_counts table.
- If the user asks about forecast accuracy or ARIMA, show arima figures.
- If the user asks about top sellers, show top_titles_by_units_sold.csv.
- If the user asks a general data question, pick the most relevant artifact.
- Keep your answer concise (2-4 sentences), then the JSON block.
"""
|
|
| JSON_BLOCK_RE = re.compile(r"```json\s*(\{.*?\})\s*```", re.DOTALL) |
| FALLBACK_JSON_RE = re.compile(r"\{[^{}]*\"show\"[^{}]*\}", re.DOTALL) |
|
|
|
|
| def _parse_display_directive(text: str) -> Dict[str, str]: |
| m = JSON_BLOCK_RE.search(text) |
| if m: |
| try: |
| return json.loads(m.group(1)) |
| except json.JSONDecodeError: |
| pass |
| m = FALLBACK_JSON_RE.search(text) |
| if m: |
| try: |
| return json.loads(m.group(0)) |
| except json.JSONDecodeError: |
| pass |
| return {"show": "none"} |
|
|
|
|
def _clean_response(text: str) -> str:
    """Remove the fenced ```json directive block from the user-visible reply."""
    without_directive = JSON_BLOCK_RE.sub("", text)
    return without_directive.strip()
|
|
def _build_n8n_context() -> Dict[str, Any]:
    """Assemble the compact dashboard context payload sent to the n8n webhook."""

    def preview_csv(filename: str, n: int = 5) -> List[Dict[str, Any]]:
        # Head-preview of a saved table; missing or unreadable files become [].
        target = PY_TAB_DIR / filename
        if not target.exists():
            return []
        try:
            head = pd.read_csv(target).head(n)
        except Exception:
            return []
        return head.to_dict(orient="records")

    return {
        "kpis": load_kpis(),
        "ticker_summary_preview": preview_csv("ticker_summary.csv", 5),
        "real_sentiment_summary_preview": preview_csv("real_sentiment_summary.csv", 5),
        "synthetic_sentiment_summary_preview": preview_csv("synthetic_sentiment_summary.csv", 5),
        "rf_feature_importance_preview": preview_csv("random_forest_feature_importance.csv", 5),
    }
|
|
def _n8n_call(msg: str) -> Tuple[str, Dict]:
    """Send *msg* plus dashboard context to the n8n webhook and parse the reply.

    Returns (answer, directive) where directive is e.g.
    {"show": "figure", "chart": "sales"} or {"show": "none"}.  On any failure
    (timeout, unparseable payload, empty answer) returns ("", None) so the
    caller can fall back to keyword matching.
    """
    # Imported lazily: only needed when a webhook URL is configured.
    import requests as req

    def _extract_answer_chart_from_text(text: str):
        # Parse the plain-text "ANSWER: ... CHART: <name>" convention that the
        # n8n workflow's raw model output may use.
        text = (text or "").strip()
        if not text:
            return "", "none"

        answer = ""
        chart = "none"

        answer_match = re.search(r"ANSWER:\s*(.*?)\s*CHART:", text, flags=re.IGNORECASE | re.DOTALL)
        chart_match = re.search(r"CHART:\s*(sales|sentiment|top_sellers|returns|none)", text, flags=re.IGNORECASE)

        if answer_match:
            answer = answer_match.group(1).strip()
        else:
            # No "ANSWER:" prefix — use everything except the CHART tag.
            answer = re.sub(
                r"CHART:\s*(sales|sentiment|top_sellers|returns|none)",
                "",
                text,
                flags=re.IGNORECASE,
            ).strip()

        if chart_match:
            chart = chart_match.group(1).strip().lower()

        return answer, chart

    def _normalize_payload(data):
        """
        Handle common n8n webhook output shapes:
        - {"answer": "...", "chart": "..."}
        - [{"answer": "...", "chart": "..."}]
        - {"json": {"answer": "...", "chart": "..."}}
        - {"body": {"answer": "...", "chart": "..."}}
        - stringified JSON
        """
        # Unwrap a single-element list (n8n often returns item arrays).
        if isinstance(data, list):
            if not data:
                return {}
            data = data[0]

        # Unwrap n8n's {"json": {...}} item wrapper.
        if isinstance(data, dict) and "json" in data and isinstance(data["json"], dict):
            data = data["json"]

        # Unwrap an HTTP-style {"body": ...} wrapper (dict or JSON string).
        if isinstance(data, dict) and "body" in data:
            body = data["body"]
            if isinstance(body, dict):
                data = body
            elif isinstance(body, str):
                try:
                    data = json.loads(body)
                except Exception:
                    pass

        # A bare string may itself be JSON; if not, hand it back as-is for
        # text-level ANSWER/CHART parsing by the caller.
        if isinstance(data, str):
            try:
                data = json.loads(data)
            except Exception:
                return data

        # The decoded JSON can be wrapped again (list or {"json": ...}), so
        # repeat one round of unwrapping after the string decode.
        if isinstance(data, list):
            if not data:
                return {}
            data = data[0]

        if isinstance(data, dict) and "json" in data and isinstance(data["json"], dict):
            data = data["json"]

        return data

    try:
        payload = {
            "question": msg,
            "dashboard_context": _build_n8n_context(),
        }

        resp = req.post(N8N_WEBHOOK_URL, json=payload, timeout=30)
        raw = (resp.text or "").strip()
        print(f"[n8n] status={resp.status_code} raw={raw[:1000]}")

        parsed = None

        # Preferred: requests' own JSON decoding.
        try:
            parsed = resp.json()
        except Exception:
            parsed = None

        # Fallback: decode the raw body text ourselves.
        if parsed is None and raw:
            try:
                parsed = json.loads(raw)
            except Exception:
                parsed = None

        parsed = _normalize_payload(parsed)

        valid_charts = {"sales", "sentiment", "top_sellers", "returns", "none"}

        # Case 1: well-formed {"answer": ..., "chart": ...} payload.
        if isinstance(parsed, dict):
            answer = str(parsed.get("answer", "") or "").strip()
            chart = str(parsed.get("chart", "none") or "none").strip().lower()

            if chart not in valid_charts:
                chart = "none"

            if answer:
                directive = {"show": "figure", "chart": chart} if chart != "none" else {"show": "none"}
                return answer, directive

            # Case 2: dict carrying only raw model text — parse its tags.
            raw_model_output = str(parsed.get("raw_model_output", "") or "").strip()
            if raw_model_output:
                answer2, chart2 = _extract_answer_chart_from_text(raw_model_output)
                if chart2 not in valid_charts:
                    chart2 = "none"
                if answer2:
                    directive = {"show": "figure", "chart": chart2} if chart2 != "none" else {"show": "none"}
                    return answer2, directive

        # Case 3: payload normalized to a plain (non-JSON) string.
        if isinstance(parsed, str) and parsed.strip():
            answer, chart = _extract_answer_chart_from_text(parsed)
            if chart not in valid_charts:
                chart = "none"
            if answer:
                directive = {"show": "figure", "chart": chart} if chart != "none" else {"show": "none"}
                return answer, directive

        # Case 4: last resort — parse the raw HTTP body text directly.
        if raw:
            answer, chart = _extract_answer_chart_from_text(raw)
            if chart not in valid_charts:
                chart = "none"
            if answer:
                directive = {"show": "figure", "chart": chart} if chart != "none" else {"show": "none"}
                return answer, directive

        return "", None

    except req.exceptions.Timeout:
        return "", None
    except Exception as e:
        print(f"[n8n] exception: {e}")
        return "", None
|
|
|
|
def ai_chat(user_msg: str, history: list):
    """Chat function for the AI Dashboard tab.

    Routing priority: n8n webhook (if configured) -> direct LLM (if enabled)
    -> keyword fallback.  Returns (new_history, cleared_input, chart, table)
    matching the tab's four output components.
    """
    # Ignore empty submissions: keep history, clear nothing, show nothing.
    if not user_msg or not user_msg.strip():
        return history, "", None, None

    idx = artifacts_index()
    kpis = load_kpis()

    # --- choose an answering backend -----------------------------------
    if N8N_WEBHOOK_URL:
        reply, directive = _n8n_call(user_msg)
        print(f"[AI DASHBOARD] n8n reply={reply!r} directive={directive}")
        if not reply.strip():
            # Webhook failed or returned nothing usable — degrade gracefully.
            reply, directive = _keyword_fallback(user_msg, idx, kpis)
            print(f"[AI DASHBOARD] using fallback reply={reply!r} directive={directive}")
    elif not LLM_ENABLED:
        reply, directive = _keyword_fallback(user_msg, idx, kpis)
    else:
        system = DASHBOARD_SYSTEM.format(
            artifacts_json=json.dumps(idx, indent=2),
            kpis_json=json.dumps(kpis, indent=2) if kpis else "(no KPIs yet, run the pipeline first)",
        )
        # Cap context at the last 6 history entries to bound prompt size.
        # NOTE(review): assumes history entries are {"role", "content"} dicts;
        # confirm the Chatbot component emits that format in this Gradio version.
        msgs = [{"role": "system", "content": system}]
        for entry in (history or [])[-6:]:
            msgs.append(entry)
        msgs.append({"role": "user", "content": user_msg})

        try:
            r = llm_client.chat_completion(
                model=MODEL_NAME,
                messages=msgs,
                temperature=0.3,
                max_tokens=600,
                stream=False,
            )
            # Response shape differs across huggingface_hub versions
            # (plain dict vs. attribute-style object).
            raw = (
                r["choices"][0]["message"]["content"]
                if isinstance(r, dict)
                else r.choices[0].message.content
            )
            directive = _parse_display_directive(raw)
            reply = _clean_response(raw)
        except Exception as e:
            # Surface the LLM error but still answer via keyword rules.
            reply = f"LLM error: {e}. Falling back to keyword matching."
            reply_fb, directive = _keyword_fallback(user_msg, idx, kpis)
            reply += "\n\n" + reply_fb

    # --- render the directive into chart/table outputs ------------------
    chart_out = None
    tab_out = None
    show = directive.get("show", "none")
    fname = directive.get("filename", "")
    chart_name = directive.get("chart", "")

    # Named interactive charts take priority over filename heuristics.
    chart_builders = {
        "sales": build_sales_chart,
        "sentiment": build_sentiment_chart,
        "top_sellers": build_top_sellers_chart,
        "returns": build_return_distribution_chart,
    }

    if chart_name and chart_name in chart_builders:
        chart_out = chart_builders[chart_name]()
    elif show == "figure" and fname:
        # Map a static-figure filename onto the closest interactive chart.
        if "sales_trend" in fname:
            chart_out = build_sales_chart()
        elif "sentiment" in fname:
            chart_out = build_sentiment_chart()
        elif "arima" in fname or "forecast" in fname:
            chart_out = build_sales_chart()
        else:
            chart_out = _empty_chart(f"No interactive chart for {fname}")

    if show == "table" and fname:
        fp = PY_TAB_DIR / fname
        if fp.exists():
            tab_out = _load_table_safe(fp)
        else:
            # Keep the conversation going; just note the missing file.
            reply += f"\n\n*(Could not find table: {fname})*"

    # Append this exchange using openai-style message dicts.
    new_history = (history or []) + [
        {"role": "user", "content": user_msg},
        {"role": "assistant", "content": reply},
    ]

    return new_history, "", chart_out, tab_out
|
|
|
|
def _keyword_fallback(msg: str, idx: Dict, kpis: Dict) -> Tuple[str, Dict]:
    """Rule-based answerer used when neither n8n nor the LLM is available.

    Matches keywords in *msg* against known chart/table topics and returns
    (reply_text, display_directive) in the same shape the LLM path produces.
    """
    msg_lower = msg.lower()

    # Nothing to show until the pipeline has produced artifacts.
    if not idx["python"]["figures"] and not idx["python"]["tables"]:
        return (
            "No artifacts are available yet. Please run the pipeline first so the finance analysis outputs can be loaded.",
            {"show": "none"},
        )

    # KPI values woven into the canned replies; "?" when absent.
    n_tickers = kpis.get("n_tickers", "?")
    n_days = kpis.get("n_days_real", "?")
    buy_signals = kpis.get("buy_signals", "?")
    hold_signals = kpis.get("hold_signals", "?")
    sell_signals = kpis.get("sell_signals", "?")
    rf_accuracy = kpis.get("random_forest_accuracy", None)
    corr_compound = kpis.get("corr_compound_vs_return", None)
    corr_headline = kpis.get("corr_headline_vs_return", None)
    agreement = kpis.get("agreement_synth_vs_vader", None)

    def fmt(v, digits=3):
        # Fixed-decimal formatting; None passes through so callers can skip
        # the corresponding sentence entirely.
        if v is None:
            return None
        try:
            return f"{float(v):.{digits}f}"
        except Exception:
            return str(v)

    rf_text = fmt(rf_accuracy)
    corr_compound_text = fmt(corr_compound)
    corr_headline_text = fmt(corr_headline)
    agreement_text = fmt(agreement)

    # Topic: normalized price trends.
    if any(w in msg_lower for w in ["price", "stock trend", "price trend", "normalized", "stock price"]):
        reply = (
            f"The normalized stock price chart compares relative short-term price movement across {n_tickers} tickers over {n_days} trading days. "
            f"It is built from the real dataset fields `ticker`, `date`, and `close`, so it is most useful for comparing movement patterns rather than absolute price levels."
        )
        return reply, {"show": "figure", "chart": "sales"}

    # Topic: VADER sentiment.
    if any(w in msg_lower for w in ["sentiment", "vader", "compound", "headline tone"]):
        extra = ""
        if agreement_text is not None:
            extra = f" The agreement between synthetic sentiment labels and VADER sentiment is {agreement_text}, so the comparison should be interpreted cautiously."
        reply = (
            "The sentiment chart is grounded in `real_dataset_with_vader.csv`, which contains VADER compound scores derived from aggregated financial headlines. "
            "It helps show whether headline tone shifts over time and whether those shifts appear to line up at all with short-term market movement."
            + extra
        )
        return reply, {"show": "figure", "chart": "sentiment"}

    # Topic: buy/hold/sell signals (rendered by the "top_sellers" chart).
    if any(w in msg_lower for w in ["signal", "buy", "hold", "sell", "recommendation", "ticker"]):
        reply = (
            f"The investment signal chart summarizes the rule-based outputs across tickers, with {buy_signals} buy signals, {hold_signals} hold signals, and {sell_signals} sell signals in the current sample. "
            "This suggests the strategy is behaving conservatively, with most observations falling into hold rather than strong directional recommendations."
        )
        return reply, {"show": "figure", "chart": "top_sellers"}

    # Topic: real vs synthetic return distributions.
    if any(w in msg_lower for w in ["synthetic", "real vs synthetic", "return distribution", "returns", "compare returns"]):
        extra = ""
        if corr_compound_text is not None:
            extra = f" The observed correlation between VADER compound and next-day return is {corr_compound_text}, which supports a cautious interpretation if it is close to zero."
        reply = (
            "The return comparison chart is the best view for checking whether the synthetic return distribution behaves similarly to the observed next-day return distribution. "
            "It helps assess realism in the simulated data without implying that either distribution alone provides strong predictive signal."
            + extra
        )
        return reply, {"show": "figure", "chart": "returns"}

    # Topic: modelling / feature importance (shows a table, not a chart).
    if any(w in msg_lower for w in ["model", "random forest", "accuracy", "prediction", "predictive"]):
        extra = ""
        if rf_text is not None:
            extra += f" The current Random Forest accuracy is {rf_text}."
        if corr_headline_text is not None:
            extra += f" The correlation between headline count and next-day return is {corr_headline_text}."
        if corr_compound_text is not None:
            extra += f" The correlation between VADER compound and next-day return is {corr_compound_text}."
        reply = (
            "The modelling results should be interpreted conservatively because the project tests whether headline-derived sentiment carries usable short-term signal in a noisy market setting."
            + extra
        )
        return reply, {"show": "table", "scope": "python", "filename": "random_forest_feature_importance.csv"}

    # Topic: general overview.
    if any(w in msg_lower for w in ["dashboard", "overview", "summary", "kpi"]):
        reply = (
            f"This dashboard covers {n_tickers} tickers across {n_days} trading days and combines price movement, headline sentiment, synthetic comparisons, and investment signals. "
            "The most useful overview files are `df_dashboard.csv`, `ticker_summary.csv`, and `real_dataset_with_vader.csv`."
        )
        return reply, {"show": "table", "scope": "python", "filename": "df_dashboard.csv"}

    # No keyword matched: describe capabilities, display nothing.
    return (
        "I can answer questions about stock price trends, VADER sentiment, investment signals, real versus synthetic returns, and model interpretation using the dashboard artifacts as the source of truth.",
        {"show": "none"},
    )
|
|
| |
| |
| |
|
|
def render_kpi_cards() -> str:
    """Render the KPI summary as an HTML grid of cards.

    Returns a placeholder card when kpis.json has not been generated yet.
    Only KPIs present in the file are shown; missing keys are skipped.
    """
    kpis = load_kpis()
    if not kpis:
        # Empty state shown before the first pipeline run.
        return (
            '<div style="background:rgba(255,255,255,.65);backdrop-filter:blur(16px);'
            'border-radius:20px;padding:28px;text-align:center;'
            'border:1.5px solid rgba(255,255,255,.7);'
            'box-shadow:0 8px 32px rgba(124,92,191,.08);">'
            '<div style="font-size:36px;margin-bottom:10px;">📊</div>'
            '<div style="color:#a48de8;font-size:14px;'
            'font-weight:800;margin-bottom:6px;">No data yet</div>'
            '<div style="color:#9d8fc4;font-size:12px;">'
            'Run the pipeline to populate these cards.</div>'
            '</div>'
        )

    def card(icon, label, value, colour):
        # One glassmorphism-style card; *colour* tints the top border.
        return f"""
        <div style="background:rgba(255,255,255,.72);backdrop-filter:blur(16px);
                    border-radius:20px;padding:18px 14px 16px;text-align:center;
                    border:1.5px solid rgba(255,255,255,.8);
                    box-shadow:0 4px 16px rgba(124,92,191,.08);
                    border-top:3px solid {colour};">
            <div style="font-size:26px;margin-bottom:7px;line-height:1;">{icon}</div>
            <div style="color:#9d8fc4;font-size:9.5px;text-transform:uppercase;
                        letter-spacing:1.8px;margin-bottom:7px;font-weight:800;">{label}</div>
            <div style="color:#2d1f4e;font-size:16px;font-weight:800;">{value}</div>
        </div>"""

    # (kpi-key, icon, display label, accent colour) — order defines the grid.
    kpi_config = [
        ("n_tickers", "📈", "N Tickers", "#a48de8"),
        ("n_rows_real", "📄", "N Rows Real", "#7aa6f8"),
        ("n_rows_synth", "🧪", "N Rows Synth", "#6ee7c7"),
        ("n_days_real", "📅", "N Days Real", "#3dcba8"),
        ("avg_headline_count", "📰", "Avg Headline Count", "#8fa8f8"),
        ("avg_next_day_return_real", "💹", "Avg Next Day Return", "#c45ea8"),
        ("avg_synthetic_return", "🧬", "Avg Synthetic Return", "#e8a230"),
        ("agreement_synth_vs_vader", "🤝", "Agreement Synth vs VADER", "#7c5cbf"),
        ("corr_headline_vs_return", "🔗", "Corr Headline vs Return", "#5e8fef"),
        ("corr_compound_vs_return", "🧠", "Corr Compound vs Return", "#e8537a"),
        ("buy_signals", "🟢", "Buy Signals", "#2ec4a0"),
        ("hold_signals", "🟡", "Hold Signals", "#e8a230"),
        ("sell_signals", "🔴", "Sell Signals", "#e8537a"),
        ("random_forest_accuracy", "🌲", "Random Forest Accuracy", "#6aaa3a"),
        ("lstm_rmse", "🤖", "LSTM RMSE", "#5e8fef"),
    ]

    html = (
        '<div style="display:grid;grid-template-columns:repeat(auto-fit,minmax(140px,1fr));'
        'gap:12px;margin-bottom:24px;">'
    )
    for key, icon, label, colour in kpi_config:
        val = kpis.get(key)
        if val is None:
            continue
        # Light value formatting: round floats, thousands-separate big ints.
        if isinstance(val, float):
            val = round(val, 4)
        elif isinstance(val, int) and val > 100:
            val = f"{val:,}"

        html += card(icon, label, str(val), colour)
    html += "</div>"
    return html
|
|
|
|
| |
| |
| |
|
|
| CHART_PALETTE = ["#7c5cbf", "#2ec4a0", "#e8537a", "#e8a230", "#5e8fef", |
| "#c45ea8", "#3dbacc", "#a0522d", "#6aaa3a", "#d46060"] |
|
|
| def _styled_layout(**kwargs) -> dict: |
| defaults = dict( |
| template="plotly_white", |
| paper_bgcolor="rgba(255,255,255,0.95)", |
| plot_bgcolor="rgba(255,255,255,0.98)", |
| font=dict(family="system-ui, sans-serif", color="#2d1f4e", size=12), |
| margin=dict(l=60, r=20, t=70, b=70), |
| legend=dict( |
| orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1, |
| bgcolor="rgba(255,255,255,0.92)", |
| bordercolor="rgba(124,92,191,0.35)", borderwidth=1, |
| ), |
| title=dict(font=dict(size=15, color="#4b2d8a")), |
| ) |
| defaults.update(kwargs) |
| return defaults |
|
|
|
|
def _empty_chart(title: str) -> go.Figure:
    """Placeholder figure shown when a chart's backing artifacts are absent."""
    hint = dict(
        text="Run the pipeline to generate data",
        x=0.5, y=0.5, xref="paper", yref="paper", showarrow=False,
        font=dict(size=14, color="rgba(124,92,191,0.5)"),
    )
    placeholder = go.Figure()
    placeholder.update_layout(
        title=title,
        height=420,
        template="plotly_white",
        paper_bgcolor="rgba(255,255,255,0.95)",
        annotations=[hint],
    )
    return placeholder
|
|
|
|
def build_sales_chart() -> go.Figure:
    """Interactive line chart of normalized closing prices (base = 100) for up
    to five tickers, sourced from real_dataset_with_vader.csv."""
    path = PY_TAB_DIR / "real_dataset_with_vader.csv"
    if not path.exists():
        return _empty_chart("Normalized Stock Price Trends — run the pipeline first")

    df = pd.read_csv(path)
    required = {"ticker", "date", "close"}
    if not required.issubset(df.columns):
        return _empty_chart("Missing required columns in real_dataset_with_vader.csv")

    # Coerce bad dates to NaT and drop unusable rows.
    df["date"] = pd.to_datetime(df["date"], errors="coerce")
    df = df.dropna(subset=["date", "close"]).copy()

    # Keep the chart readable: only the first five tickers alphabetically.
    sample_tickers = sorted(df["ticker"].dropna().unique())[:5]
    df = df[df["ticker"].isin(sample_tickers)].copy()

    # Rebase each ticker's series to 100 at its first observation.
    # NOTE(review): a first close of 0 would yield inf values — confirm the
    # upstream data can never contain zero closes.
    df = df.sort_values(["ticker", "date"])
    df["normalized_price"] = df.groupby("ticker")["close"].transform(lambda s: s / s.iloc[0] * 100)

    fig = go.Figure()
    for i, ticker in enumerate(sample_tickers):
        temp = df[df["ticker"] == ticker]
        fig.add_trace(go.Scatter(
            x=temp["date"],
            y=temp["normalized_price"],
            name=ticker,
            mode="lines+markers",
            line=dict(color=CHART_PALETTE[i % len(CHART_PALETTE)], width=2),
            marker=dict(size=5),
            hovertemplate=f"<b>{ticker}</b><br>%{{x|%Y-%m-%d}}<br>Normalized: %{{y:.2f}}<extra></extra>",
        ))

    fig.update_layout(**_styled_layout(
        height=450,
        hovermode="x unified",
        title=dict(text="Normalized Stock Price Trends (Base = 100)")
    ))
    fig.update_xaxes(gridcolor="rgba(124,92,191,0.15)", showgrid=True)
    fig.update_yaxes(gridcolor="rgba(124,92,191,0.15)", showgrid=True, title="Normalized Price")
    return fig
|
|
|
|
def build_sentiment_chart() -> go.Figure:
    """Line chart of daily VADER compound scores for up to five tickers."""
    source = PY_TAB_DIR / "real_dataset_with_vader.csv"
    if not source.exists():
        return _empty_chart("VADER Sentiment Over Time — run the pipeline first")

    frame = pd.read_csv(source)
    if not {"ticker", "date", "compound"}.issubset(frame.columns):
        return _empty_chart("Missing required columns in real_dataset_with_vader.csv")

    # Coerce bad dates to NaT, then drop rows without a date or score.
    frame["date"] = pd.to_datetime(frame["date"], errors="coerce")
    frame = frame.dropna(subset=["date", "compound"]).copy()

    # Limit to the first five tickers alphabetically for readability.
    tickers = sorted(frame["ticker"].dropna().unique())[:5]
    frame = frame[frame["ticker"].isin(tickers)].copy().sort_values(["ticker", "date"])

    fig = go.Figure()
    for position, symbol in enumerate(tickers):
        subset = frame[frame["ticker"] == symbol]
        fig.add_trace(go.Scatter(
            x=subset["date"],
            y=subset["compound"],
            name=symbol,
            mode="lines+markers",
            line=dict(color=CHART_PALETTE[position % len(CHART_PALETTE)], width=2),
            marker=dict(size=5),
            hovertemplate=f"<b>{symbol}</b><br>%{{x|%Y-%m-%d}}<br>Compound: %{{y:.3f}}<extra></extra>",
        ))

    fig.update_layout(**_styled_layout(
        height=450,
        hovermode="x unified",
        title=dict(text="VADER Sentiment Over Time"),
    ))
    fig.update_xaxes(gridcolor="rgba(124,92,191,0.15)", showgrid=True)
    fig.update_yaxes(gridcolor="rgba(124,92,191,0.15)", showgrid=True, title="VADER Compound Score")
    return fig
|
|
|
|
def build_top_sellers_chart() -> go.Figure:
    """Stacked bar chart of buy/hold/sell signal counts per ticker."""
    source = PY_TAB_DIR / "rule_based_investment_signals.csv"
    if not source.exists():
        return _empty_chart("Investment Signal Distribution by Ticker — run the pipeline first")

    frame = pd.read_csv(source)
    if not {"ticker", "investment_signal"}.issubset(frame.columns):
        return _empty_chart("Missing required columns in rule_based_investment_signals.csv")

    counts = (
        frame.groupby(["ticker", "investment_signal"])
        .size()
        .unstack(fill_value=0)
        .reset_index()
    )

    # Guarantee all three signal columns exist even when a class is absent.
    for signal in ("buy", "hold", "sell"):
        if signal not in counts.columns:
            counts[signal] = 0

    palette = {"buy": "#2ec4a0", "hold": "#e8a230", "sell": "#e8537a"}

    fig = go.Figure()
    for signal in ("buy", "hold", "sell"):
        fig.add_trace(go.Bar(
            x=counts["ticker"],
            y=counts[signal],
            name=signal.title(),
            marker_color=palette[signal],
            hovertemplate=f"<b>{signal.title()}</b><br>Ticker: %{{x}}<br>Count: %{{y}}<extra></extra>",
        ))

    fig.update_layout(**_styled_layout(
        height=450,
        barmode="stack",
        title=dict(text="Investment Signal Distribution by Ticker"),
    ))
    fig.update_xaxes(title="Ticker")
    fig.update_yaxes(title="Count", gridcolor="rgba(124,92,191,0.15)", showgrid=True)
    return fig
|
|
|
|
def refresh_dashboard():
    """Recompute the KPI cards and all four interactive charts."""
    kpi_html = render_kpi_cards()
    return (
        kpi_html,
        build_sales_chart(),
        build_sentiment_chart(),
        build_top_sellers_chart(),
        build_return_distribution_chart(),
    )
|
|
def build_return_distribution_chart() -> go.Figure:
    """Overlaid histograms comparing real next-day returns against the
    synthetic return distribution."""
    real_path = PY_TAB_DIR / "real_dataset_with_vader.csv"
    synth_path = PY_TAB_DIR / "synthetic_dataset_analysis_ready.csv"

    # Both datasets are required for the comparison.
    if not real_path.exists() or not synth_path.exists():
        return _empty_chart("Return Distribution: Real vs Synthetic — run the pipeline first")

    df_real = pd.read_csv(real_path)
    df_synth = pd.read_csv(synth_path)

    if "next_day_return" not in df_real.columns or "avg_synthetic_return" not in df_synth.columns:
        return _empty_chart("Missing return columns in saved datasets")

    real_vals = df_real["next_day_return"].dropna()
    synth_vals = df_synth["avg_synthetic_return"].dropna()

    # Semi-transparent overlaid histograms so both shapes stay visible.
    fig = go.Figure()
    fig.add_trace(go.Histogram(
        x=real_vals,
        name="Real",
        opacity=0.6,
        marker_color="#5e8fef",
        nbinsx=30,
    ))
    fig.add_trace(go.Histogram(
        x=synth_vals,
        name="Synthetic",
        opacity=0.6,
        marker_color="#e8a230",
        nbinsx=30,
    ))

    fig.update_layout(**_styled_layout(
        height=450,
        barmode="overlay",
        title=dict(text="Return Distribution: Real vs Synthetic")
    ))
    fig.update_xaxes(title="Return")
    fig.update_yaxes(title="Frequency", gridcolor="rgba(124,92,191,0.15)", showgrid=True)
    return fig
|
|
| |
| |
| |
|
|
# Make sure the run/artifact directories exist before the UI is built.
ensure_dirs()
|
|
def load_css() -> str:
    """Combine the optional on-disk style.css with hard-coded overrides.

    The overrides force readable dark-on-light table cells and style the
    static figure gallery; they are appended after the base CSS so they win
    the cascade.
    """
    css_path = BASE_DIR / "style.css"
    # Missing style.css is fine — overrides alone still apply.
    base_css = css_path.read_text(encoding="utf-8") if css_path.exists() else ""

    overrides = """
/* =========================
   DATA TABLE: force dark headers and cells
   ========================= */
#table_preview table,
#table_preview .table-wrap,
#table_preview [role="grid"] {
  color: #1a1a1a !important;
  background: #ffffff !important;
}

#table_preview thead th,
#table_preview th,
#table_preview [role="columnheader"] {
  color: #111111 !important;
  -webkit-text-fill-color: #111111 !important;
  opacity: 1 !important;
  background: #ece8f5 !important;
  font-weight: 700 !important;
  text-shadow: none !important;
}

#table_preview tbody td,
#table_preview td,
#table_preview [role="gridcell"] {
  color: #1f1f1f !important;
  -webkit-text-fill-color: #1f1f1f !important;
  opacity: 1 !important;
  background: #ffffff !important;
  text-shadow: none !important;
}

#table_preview * {
  text-shadow: none !important;
}

/* =========================
   GALLERY: just container styling
   ========================= */
#static_gallery {
  background: rgba(255,255,255,0.92) !important;
  border-radius: 12px;
}

#static_gallery img {
  background: #f7f7fb !important;
  border: 1px solid rgba(124,92,191,0.18) !important;
  border-radius: 10px !important;
  padding: 6px !important;
}

#static_gallery figcaption,
#static_gallery .caption-label,
#static_gallery [class*="caption"] {
  color: #2d1f4e !important;
  background: rgba(255,255,255,0.9) !important;
  font-weight: 600 !important;
}
"""
    return base_css + "\n" + overrides
| |
# ---- Gradio UI ------------------------------------------------------------
with gr.Blocks(title="Stock Sentiment & Market Impact Dashboard") as demo:

    gr.Markdown(
        "# Stock Sentiment Analyser\n"
        "*Analyzing how financial headlines relate to short-term stock returns*",
        elem_id="escp_title",
    )

    # Tab 1: run the papermill notebooks (individually or both in order).
    with gr.Tab("Pipeline Runner"):
        gr.Markdown()

        with gr.Row():
            with gr.Column(scale=1):
                btn_nb1 = gr.Button("Step 1: Data Creation", variant="secondary")
            with gr.Column(scale=1):
                btn_nb2 = gr.Button("Step 2: Python Analysis", variant="secondary")

        with gr.Row():
            btn_all = gr.Button("Run Full Pipeline (Both Steps)", variant="primary")

        # Shared log box: each button overwrites it with its own output.
        run_log = gr.Textbox(
            label="Execution Log",
            lines=18,
            max_lines=30,
            interactive=False,
        )

        btn_nb1.click(run_datacreation, outputs=[run_log])
        btn_nb2.click(run_pythonanalysis, outputs=[run_log])
        btn_all.click(run_full_pipeline, outputs=[run_log])
|
|
| |
| |
| |
    # Tab 2: KPI cards, interactive plotly charts, static figures, and tables.
    with gr.Tab("Dashboard"):
        # Passing the function itself lets Gradio compute the value lazily.
        kpi_html = gr.HTML(value=render_kpi_cards)

        refresh_btn = gr.Button("Refresh Dashboard", variant="primary")

        gr.Markdown("#### Interactive Charts")
        chart_sales = gr.Plot(label="Normalized Stock Price Trends")
        chart_sentiment = gr.Plot(label="VADER Sentiment Over Time")
        chart_top = gr.Plot(label="Investment Signal Distribution by Ticker")
        chart_returns = gr.Plot(label="Return Distribution: Real vs Synthetic")

        gr.Markdown("#### Static Figures (from notebooks)")
        gallery = gr.Gallery(
            label="Generated Figures",
            columns=2,
            height=480,
            object_fit="contain",
            elem_id="static_gallery",
        )

        gr.Markdown("#### Data Tables")
        # Choices start empty and are populated by the refresh handler.
        table_dropdown = gr.Dropdown(
            label="Select a table to view",
            choices=[],
            interactive=True,
        )
        table_display = gr.Dataframe(
            label="Table Preview",
            interactive=False,
            elem_id="table_preview",
        )

        def _on_refresh():
            # One handler refreshing every dashboard component at once.
            kpi, c1, c2, c3, c4 = refresh_dashboard()
            figs, dd, df = refresh_gallery()
            return kpi, c1, c2, c3, c4, figs, dd, df

        refresh_btn.click(
            _on_refresh,
            outputs=[kpi_html, chart_sales, chart_sentiment, chart_top, chart_returns,
                     gallery, table_dropdown, table_display],
        )

        table_dropdown.change(
            on_table_select,
            inputs=[table_dropdown],
            outputs=[table_display],
        )
|
|
| |
| |
| |
    # Tab 3: conversational interface routed to n8n / LLM / keyword rules.
    with gr.Tab('"AI" Dashboard'):
        # Status line describing which answering backend is active.
        _ai_status = (
            "Connected to your **n8n workflow**." if N8N_WEBHOOK_URL
            else "**LLM active.**" if LLM_ENABLED
            else "Using **keyword matching**. Upgrade options: "
            "set `N8N_WEBHOOK_URL` to connect your n8n workflow, "
            "or set `HF_API_KEY` for direct LLM access."
        )
        gr.Markdown(
            "### Ask questions, get interactive visualisations\n\n"
            f"Type a question and the system will pick the right interactive chart or table. {_ai_status}"
        )

        with gr.Row(equal_height=True):
            with gr.Column(scale=1):
                # NOTE(review): ai_chat builds openai-style {"role","content"}
                # history dicts; confirm this Chatbot is configured for that
                # format (e.g. type="messages" on newer Gradio versions).
                chatbot = gr.Chatbot(
                    label="Conversation",
                    height=380,
                )
                user_input = gr.Textbox(
                    label="Ask about your data",
                    placeholder="e.g. Show me stock price trends / What are the buy recommendations? / Sentiment analysis",
                    lines=1,
                )
                gr.Examples(
                    examples=[
                        "Show me the normalized stock price trends",
                        "What does VADER sentiment look like over time?",
                        "Which tickers generate buy or hold signals?",
                        "Show the real vs synthetic return distribution",
                        "Give me a dashboard overview",
                    ],
                    inputs=user_input,
                )

            with gr.Column(scale=1):
                ai_figure = gr.Plot(
                    label="Interactive Chart",
                )
                ai_table = gr.Dataframe(
                    label="Data Table",
                    interactive=False,
                )

        # Submitting clears the textbox and updates chat, chart, and table.
        user_input.submit(
            ai_chat,
            inputs=[user_input, chatbot],
            outputs=[chatbot, user_input, ai_figure, ai_table],
        )


# Launch with combined CSS; allowed_paths lets the Gallery serve figure
# files from under BASE_DIR.
demo.launch(css=load_css(), allowed_paths=[str(BASE_DIR)])
|
|