# SESSION_5 / app.py
# llauravalente's picture
# Update app.py
# 49431a7 verified
import os
import re
import json
import time
import traceback
import sys
import subprocess
from pathlib import Path
from typing import Dict, Any, List, Tuple
import pandas as pd
import gradio as gr
import papermill as pm
# The HuggingFace client is optional: when the package cannot be imported the
# app keeps working and the chat tab uses keyword matching instead.
try:
    from huggingface_hub import InferenceClient
except Exception:
    InferenceClient = None

# =========================================================
# CONFIG + PERSONALIZATION DEFAULTS
# =========================================================
# Everything is resolved relative to the folder that holds this file.
BASE_DIR = Path(__file__).resolve().parent

# Notebook file names (overridable through environment variables).
NB1 = os.getenv("NB1", "datacreation.ipynb").strip()
NB2 = os.getenv("NB2", "pythonanalysis.ipynb").strip()
NB3 = os.getenv("NB3", "ranalysis.ipynb").strip()

# Output locations: executed notebooks and generated figures/tables.
RUNS_DIR = BASE_DIR.joinpath("runs")
ART_DIR = BASE_DIR.joinpath("artifacts")
PY_FIG_DIR = ART_DIR.joinpath("py", "figures")
PY_TAB_DIR = ART_DIR.joinpath("py", "tables")
R_FIG_DIR = ART_DIR.joinpath("r", "figures")
R_TAB_DIR = ART_DIR.joinpath("r", "tables")

# Numeric limits (seconds, rows, characters).
PAPERMILL_TIMEOUT = int(os.getenv("PAPERMILL_TIMEOUT", "1800"))
MAX_PREVIEW_ROWS = int(os.getenv("MAX_FILE_PREVIEW_ROWS", "50"))
MAX_LOG_CHARS = int(os.getenv("MAX_LOG_CHARS", "8000"))

# LLM settings.
HF_API_KEY = os.getenv("HF_API_KEY", "").strip()
MODEL_NAME = os.getenv("MODEL_NAME", "deepseek-ai/DeepSeek-R1").strip()
HF_PROVIDER = os.getenv("HF_PROVIDER", "novita").strip()

# PERSONALIZATION: default user name and accent color (can be set in Space variables)
DEFAULT_USER_NAME = os.getenv("USER_NAME", "Friend").strip()
DEFAULT_ACCENT = os.getenv("ACCENT_COLOR", "#3b0b6f").strip()

# The LLM is used only when an API key is configured AND the client imported.
LLM_ENABLED = bool(HF_API_KEY) and InferenceClient is not None
if LLM_ENABLED:
    llm_client = InferenceClient(provider=HF_PROVIDER, api_key=HF_API_KEY)
else:
    llm_client = None
# =========================================================
# KERNEL SETUP (for papermill to run notebooks)
# =========================================================
def ensure_python_kernelspec() -> str:
    """
    Ensure a python kernelspec is available for papermill.
    Relies on ipykernel being installed (add to requirements.txt).

    Returns:
        The kernelspec name to use: ``"python3"`` when present, otherwise the
        first kernelspec whose name contains ``"python"``.

    Raises:
        RuntimeError: if ipykernel cannot be imported, or if no Python kernel
            is found even after attempting to install one.
        subprocess.CalledProcessError: if the ``ipykernel install`` command fails.
    """
    from jupyter_client.kernelspec import KernelSpecManager

    def _pick_python_kernel(available):
        # Prefer python3 if present, otherwise any kernel name containing 'python'
        if "python3" in available:
            return "python3"
        for name in available:
            if "python" in name.lower():
                return name
        return None

    ksm = KernelSpecManager()
    specs = ksm.find_kernel_specs()  # dict: name -> path
    kernel = _pick_python_kernel(specs)

    # Install when no *Python* kernel is registered, not only when the registry
    # is completely empty: other kernels may exist without a Python one.
    if kernel is None:
        try:
            import ipykernel  # noqa: F401
        except Exception as e:
            raise RuntimeError(
                "ipykernel is not installed. Add 'ipykernel' to requirements.txt and rebuild the Space.\n"
                f"Original error: {e}"
            ) from e
        subprocess.check_call(
            [
                sys.executable, "-m", "ipykernel", "install",
                "--user",
                "--name", "python3",
                "--display-name", "Python 3 (Space)",
            ]
        )
        specs = ksm.find_kernel_specs()
        kernel = _pick_python_kernel(specs)

    if kernel is None:
        raise RuntimeError(f"No usable Python kernel found. Available kernels: {list(specs.keys())}")
    return kernel
# Resolve the kernel once at import time. A failure is recorded instead of
# raised so the UI can still start and show the error message.
try:
    PY_KERNEL = ensure_python_kernelspec()
except Exception as kernel_error:
    PY_KERNEL = None
    KERNEL_INIT_ERROR = str(kernel_error)
else:
    KERNEL_INIT_ERROR = ""
# =========================================================
# HELPERS
# =========================================================
def ensure_dirs():
    """Create the runs and artifact folders (with parents) if they are missing."""
    required = (RUNS_DIR, ART_DIR, PY_FIG_DIR, PY_TAB_DIR, R_FIG_DIR, R_TAB_DIR)
    for folder in required:
        folder.mkdir(parents=True, exist_ok=True)
def stamp():
    """Return the current local time formatted as ``YYYYMMDD-HHMMSS``."""
    now = time.localtime()
    return time.strftime("%Y%m%d-%H%M%S", now)
def tail(text: str, n: int = MAX_LOG_CHARS) -> str:
    """Return at most the last *n* characters of *text*.

    ``None`` or empty *text* yields ``""``. A non-positive *n* also yields
    ``""``: slicing with ``[-0:]`` would otherwise return the whole string.
    """
    if n <= 0:
        return ""
    return (text or "")[-n:]
def _ls(dir_path: Path, exts: Tuple[str, ...]) -> List[str]:
    """List the names of regular files in *dir_path* whose suffix is in *exts*.

    The suffix comparison is case-insensitive on the file side (the suffix is
    lower-cased before the lookup). Names are returned sorted; a missing or
    non-directory *dir_path* gives an empty list.
    """
    names: List[str] = []
    if dir_path.is_dir():
        for entry in dir_path.iterdir():
            if entry.is_file() and entry.suffix.lower() in exts:
                names.append(entry.name)
        names.sort()
    return names
def _read_csv(path: Path) -> pd.DataFrame:
    """Read a CSV preview: at most ``MAX_PREVIEW_ROWS`` rows of *path*."""
    preview = pd.read_csv(path, nrows=MAX_PREVIEW_ROWS)
    return preview
def _read_json(path: Path):
    """Parse the UTF-8 encoded JSON file at *path* and return the decoded object."""
    raw_text = path.read_text(encoding="utf-8")
    return json.loads(raw_text)
def artifacts_index() -> Dict[str, Any]:
    """Return the artifact file names on disk, grouped by scope and kind.

    The result has the shape
    ``{"python": {"figures": [...], "tables": [...]}, "r": {...}}``.
    """
    figure_exts = (".png", ".jpg", ".jpeg")
    table_exts = (".csv", ".json")
    layout = (
        ("python", PY_FIG_DIR, PY_TAB_DIR),
        ("r", R_FIG_DIR, R_TAB_DIR),
    )
    return {
        scope: {
            "figures": _ls(fig_dir, figure_exts),
            "tables": _ls(tab_dir, table_exts),
        }
        for scope, fig_dir, tab_dir in layout
    }
def _debug_repo_listing() -> str:
    """Describe BASE_DIR (entries and notebooks) for "notebook not found" errors.

    Never raises: a listing failure is reported inside the returned text.
    """
    try:
        entries = sorted(child.name for child in BASE_DIR.iterdir())
        notebooks = sorted(nb.name for nb in BASE_DIR.glob("*.ipynb"))
        report_lines = [
            f"BASE_DIR: {BASE_DIR}\n",
            f"Top-level entries: {entries}\n",
            f".ipynb files in BASE_DIR: {notebooks}\n",
        ]
        return "".join(report_lines)
    except Exception as e:
        return f"(Could not list BASE_DIR: {e})\n"
# =========================================================
# PIPELINE RUNNERS
# =========================================================
def run_notebook(nb_name: str, kernel_name: str) -> str:
    """Execute a notebook from BASE_DIR with papermill.

    Args:
        nb_name: Notebook path relative to BASE_DIR.
        kernel_name: Jupyter kernelspec name passed to papermill.

    Returns:
        ``"Executed <nb_name>"`` on success, or a message starting with
        ``"ERROR:"`` when the notebook file is missing (callers check this
        prefix). Exceptions raised by papermill propagate to the caller.
    """
    ensure_dirs()
    nb_in = BASE_DIR / nb_name
    # is_file() rather than exists(): an empty name resolves to BASE_DIR itself,
    # and a directory must not be handed to papermill.
    if not nb_in.is_file():
        return (
            f"ERROR: {nb_name} not found.\n\n"
            + _debug_repo_listing()
            + f"Expected full path: {nb_in}\n"
        )
    # Use only the file name so a notebook given as "sub/dir/nb.ipynb" does not
    # make the output path point into a non-existent folder under RUNS_DIR.
    nb_out = RUNS_DIR / f"run_{stamp()}_{nb_in.name}"
    pm.execute_notebook(
        input_path=str(nb_in),
        output_path=str(nb_out),
        cwd=str(BASE_DIR),
        log_output=True,
        progress_bar=False,
        request_save_on_cell_execute=True,
        execution_timeout=PAPERMILL_TIMEOUT,
        kernel_name=kernel_name,
    )
    return f"Executed {nb_name}"
def run_datacreation(user_name: str = DEFAULT_USER_NAME) -> str:
    """Run the data-creation notebook (NB1) and report the CSV files in BASE_DIR.

    Always returns a log string; any exception is converted into a
    ``FAILED ...`` message with the tail of the traceback.
    """
    try:
        if not PY_KERNEL:
            no_kernel_msg = (
                f"FAILED Kernel not available.\n\n{KERNEL_INIT_ERROR}\n\n"
                "FIX:\n1) Add `ipykernel` to requirements.txt\n2) Let the Space rebuild\n"
            )
            return no_kernel_msg

        outcome = run_notebook(NB1, kernel_name=PY_KERNEL)
        if outcome.startswith("ERROR:"):
            return outcome

        csv_names = sorted(item.name for item in BASE_DIR.glob("*.csv"))
        bullet_lines = [f" - {csv_name}" for csv_name in csv_names]
        return f"OK {outcome}\n\nHi {user_name}! CSVs now in /app:\n" + "\n".join(bullet_lines)
    except Exception as e:
        trace_tail = traceback.format_exc()[-2000:]
        return f"FAILED {e}\n\n{trace_tail}"
def run_pythonanalysis(user_name: str = DEFAULT_USER_NAME) -> str:
    """Run the Python analysis notebook (NB2) and list the Python artifacts.

    Always returns a log string; any exception is converted into a
    ``FAILED ...`` message with the tail of the traceback.
    """
    try:
        if not PY_KERNEL:
            no_kernel_msg = (
                f"FAILED Kernel not available.\n\n{KERNEL_INIT_ERROR}\n\n"
                "FIX:\n1) Add `ipykernel` to requirements.txt\n2) Let the Space rebuild\n"
            )
            return no_kernel_msg

        outcome = run_notebook(NB2, kernel_name=PY_KERNEL)
        if outcome.startswith("ERROR:"):
            return outcome

        python_artifacts = artifacts_index()["python"]
        figure_list = ", ".join(python_artifacts["figures"]) or "(none)"
        table_list = ", ".join(python_artifacts["tables"]) or "(none)"
        return (
            f"OK {outcome}\n\n"
            f"Hello {user_name}! Figures: {figure_list}\n"
            f"Tables: {table_list}"
        )
    except Exception as e:
        trace_tail = traceback.format_exc()[-2000:]
        return f"FAILED {e}\n\n{trace_tail}"
def run_r(user_name: str = DEFAULT_USER_NAME) -> str:
    """Return instructions explaining that the R step is not executed here.

    No notebook is run; the button stays in the UI and this text tells the
    user how to enable R or where to copy locally produced outputs.
    """
    # Keep the button, but make it explicit (and non-broken).
    message_parts = [
        f"Hi {user_name}! R execution is not enabled in this Space runtime unless you provide R + IRkernel.\n\n",
        "If you want Step 2b to run inside the Space:\n",
        "1) Use a Docker Space with R + IRkernel installed\n",
        "2) Execute ranalysis.ipynb with papermill using the 'ir' kernelspec\n\n",
        "Otherwise, run ranalysis.ipynb locally and copy outputs into:\n",
        " - artifacts/r/figures (PNG)\n",
        " - artifacts/r/tables (CSV/JSON)\n",
    ]
    return "".join(message_parts)
def run_full_pipeline(user_name: str = DEFAULT_USER_NAME) -> str:
    """Run the three steps in order and return their logs as one text.

    Each step is preceded by a banner (rule, title, rule); consecutive steps
    are separated by an empty line.
    """
    rule = "=" * 50
    steps = (
        ("STEP 1/3: Data Creation (web scraping + synthetic data)", run_datacreation),
        ("STEP 2/3: Python Analysis (sentiment, ARIMA, dashboard)", run_pythonanalysis),
        ("STEP 3/3: R Analysis (ETS/ARIMA forecasting)", run_r),
    )
    lines = []
    for position, (title, runner) in enumerate(steps):
        if position > 0:
            lines.append("")
        lines.extend((rule, title, rule))
        lines.append(runner(user_name=user_name))
    return "\n".join(lines)
# =========================================================
# GALLERY + KPIS + AI
# =========================================================
def _load_all_figures() -> List[Tuple[str, str]]:
    """Collect gallery items as ``(file path, caption)`` for Python and R figures.

    Accepts the same image suffixes that ``artifacts_index`` reports
    (.png/.jpg/.jpeg, case-insensitive), so every indexed figure also appears
    in the gallery. Python figures come first, each group sorted by path.
    The caption is ``"<Scope> | <Title Cased Stem>"``.
    """
    image_exts = (".png", ".jpg", ".jpeg")
    items: List[Tuple[str, str]] = []
    for label, fig_dir in (("Python", PY_FIG_DIR), ("R", R_FIG_DIR)):
        if not fig_dir.is_dir():
            continue
        for p in sorted(fig_dir.iterdir()):
            if p.is_file() and p.suffix.lower() in image_exts:
                items.append((str(p), f"{label} | {p.stem.replace('_', ' ').title()}"))
    return items
def _load_table_safe(path: Path) -> pd.DataFrame:
    """Load a table artifact as a DataFrame without ever raising.

    JSON files are detected by suffix, case-insensitively (matching how the
    artifact index lists them): a JSON object becomes a one-row frame, anything
    else is passed to the DataFrame constructor. Other files are read as a CSV
    preview. Any failure yields a one-row frame with an ``error`` column.
    """
    try:
        if path.suffix.lower() == ".json":
            obj = _read_json(path)
            if isinstance(obj, dict):
                return pd.DataFrame([obj])
            return pd.DataFrame(obj)
        return _read_csv(path)
    except Exception as e:
        return pd.DataFrame([{"error": str(e)}])
def refresh_gallery():
    """Rebuild the Results Gallery contents.

    Returns a 3-tuple: the gallery items, a dropdown update listing every
    table as ``"<scope>/<file>"`` (first entry selected), and the preview of
    that first table (an empty frame when there are no tables).
    """
    gallery_items = _load_all_figures()
    index = artifacts_index()
    options = [
        f"{scope}/{table_name}"
        for scope in ("python", "r")
        for table_name in index[scope]["tables"]
    ]
    selected = options[0] if options else None

    if selected is None:
        preview = pd.DataFrame()
    else:
        scope, _, filename = selected.partition("/")
        folder = PY_TAB_DIR if scope == "python" else R_TAB_DIR
        preview = _load_table_safe(folder / filename)

    dropdown_update = gr.update(choices=options, value=selected)
    return gallery_items or [], dropdown_update, preview
def on_table_select(choice: str):
    """Load the table picked in the dropdown (``"<scope>/<file>"``).

    Returns a one-row hint/error frame when nothing is selected, the scope is
    unknown, or the file does not exist.
    """
    if not choice or "/" not in choice:
        return pd.DataFrame([{"hint": "Select a table above."}])

    scope, _, name = choice.partition("/")
    if scope == "python":
        folder = PY_TAB_DIR
    elif scope == "r":
        folder = R_TAB_DIR
    else:
        return pd.DataFrame([{"error": f"Unknown scope: {scope}"}])

    path = folder / name
    if path.exists():
        return _load_table_safe(path)
    return pd.DataFrame([{"error": f"File not found: {path}"}])
def load_kpis() -> Dict[str, Any]:
    """Load the KPI summary from the first usable ``kpis.json``.

    The tables folder is tried before the figures folder. A candidate is
    skipped when it is missing, unreadable, or does not contain a JSON object:
    callers use ``.get`` on the result, so a list or scalar must not be
    returned. Returns ``{}`` when no candidate is usable.
    """
    for candidate in (PY_TAB_DIR / "kpis.json", PY_FIG_DIR / "kpis.json"):
        if not candidate.exists():
            continue
        try:
            data = _read_json(candidate)
        except Exception:
            # Best effort: a broken KPI file must not break the dashboard,
            # so fall through to the next candidate.
            continue
        if isinstance(data, dict):
            return data
    return {}
# System prompt template for the LLM-backed dashboard chat.
# It is filled with str.format(), which substitutes the {artifacts_json} and
# {kpis_json} placeholders; the doubled braces around the JSON shape are
# str.format escapes and come out as single literal braces.
DASHBOARD_SYSTEM = """You are an AI dashboard assistant for a book-sales analytics app.
The user asks questions or requests about their data. You have access to pre-computed
artifacts from Python and R analysis pipelines.
AVAILABLE ARTIFACTS (only reference ones that exist):
{artifacts_json}
KPI SUMMARY: {kpis_json}
YOUR JOB:
1. Answer the user's question conversationally using the KPIs and your knowledge of the artifacts.
2. At the END of your response, output a JSON block (fenced with ```json ... ```) that tells
the dashboard which artifact to display. The JSON must have this shape:
{{"show": "figure"|"table"|"none", "scope": "python"|"r", "filename": "..."}}
RULES:
- If the user asks about sales trends or forecasting by title, show sales_trends or arima figures.
- If the user asks about sentiment, show sentiment figure or sentiment_counts table.
- If the user asks about R regression, the R notebook focuses on forecasting, show accuracy_table.csv.
- If the user asks about forecast accuracy or model comparison, show accuracy_table.csv or forecast_compare.png.
- If the user asks about top sellers, show top_titles_by_units_sold.csv.
- If the user asks a general data question, pick the most relevant artifact.
- Keep your answer concise (2-4 sentences), then the JSON block.
"""
# Directive inside a ```json fenced block (preferred form requested by the prompt).
JSON_BLOCK_RE = re.compile(r"```json\s*(\{.*?\})\s*```", re.DOTALL)
# Fallback: a flat, unfenced JSON object that mentions the "show" key.
FALLBACK_JSON_RE = re.compile(r"\{[^{}]*\"show\"[^{}]*\}", re.DOTALL)


def _parse_display_directive(text: str) -> Dict[str, str]:
    """Extract the display directive from an LLM reply.

    The fenced ```json block is tried first, then an unfenced object containing
    ``"show"``. Only string-valued entries are kept, so callers can safely use
    the values as dictionary keys and path components. A missing ``"show"``
    key defaults to ``"none"``.

    Returns:
        The directive dict, or ``{"show": "none"}`` when nothing parseable
        is found.
    """
    for pattern, group in ((JSON_BLOCK_RE, 1), (FALLBACK_JSON_RE, 0)):
        m = pattern.search(text)
        if not m:
            continue
        try:
            parsed = json.loads(m.group(group))
        except json.JSONDecodeError:
            continue
        if isinstance(parsed, dict):
            directive = {k: v for k, v in parsed.items() if isinstance(v, str)}
            directive.setdefault("show", "none")
            return directive
    return {"show": "none"}


def _clean_response(text: str) -> str:
    """Remove the display directive from the reply shown to the user.

    Strips fenced ```json blocks and also unfenced directive objects, so a
    directive that was only recognised by the fallback pattern does not leak
    into the chat as raw JSON.
    """
    without_fenced = JSON_BLOCK_RE.sub("", text)
    return FALLBACK_JSON_RE.sub("", without_fenced).strip()
def _keyword_fallback(msg: str, idx: Dict, kpis: Dict) -> Tuple[str, Dict]:
    """Answer a chat message by keyword matching (used when no LLM is available).

    Args:
        msg: The user's message.
        idx: Artifact index shaped like the result of ``artifacts_index``.
        kpis: KPI dictionary (may be empty).

    Returns:
        ``(reply_text, directive)`` where ``directive`` tells the dashboard
        which figure or table to display (``{"show": "none"}`` for nothing).
    """
    msg_lower = msg.lower()
    has_artifacts = any(
        idx.get(s, {}).get("figures") or idx.get(s, {}).get("tables")
        for s in ("python", "r")
    )
    if not has_artifacts:
        return (
            "No artifacts found yet. Please run the pipeline first (Tab 1), then come back here to explore the results.",
            {"show": "none"},
        )
    kpi_text = ""
    if kpis:
        total = kpis.get("total_units_sold", 0)
        # The KPI file is external data: the value may be null or a string,
        # which the numeric format spec rejects. Degrade to "?" instead of
        # crashing the chat handler.
        try:
            total_text = f"{total:,.0f}"
        except (TypeError, ValueError):
            try:
                total_text = f"{float(total):,.0f}"
            except (TypeError, ValueError):
                total_text = "?"
        kpi_text = (
            f"Quick summary: **{kpis.get('n_titles', '?')}** book titles across "
            f"**{kpis.get('n_months', '?')}** months, with **{total_text}** total units sold."
        )
    if any(w in msg_lower for w in ["trend", "sales trend", "monthly sale"]):
        return (
            f"Here are the sales trends for sampled titles. {kpi_text}",
            {"show": "figure", "scope": "python", "filename": "sales_trends_sampled_titles.png"},
        )
    if any(w in msg_lower for w in ["sentiment", "review", "positive", "negative"]):
        return (
            f"Here is the sentiment distribution across sampled book titles. {kpi_text}",
            {"show": "figure", "scope": "python", "filename": "sentiment_distribution_sampled_titles.png"},
        )
    if any(w in msg_lower for w in ["arima", "forecast", "predict"]):
        # A comparison request prefers the R comparison plot when it exists;
        # otherwise the Python ARIMA figure is used.
        if "compar" in msg_lower or "ets" in msg_lower or "accuracy" in msg_lower:
            if "forecast_compare.png" in idx.get("r", {}).get("figures", []):
                return (
                    "Here is the ARIMA+Fourier vs ETS forecast comparison from the R analysis.",
                    {"show": "figure", "scope": "r", "filename": "forecast_compare.png"},
                )
        return (
            f"Here are the ARIMA forecasts for sampled titles from the Python analysis. {kpi_text}",
            {"show": "figure", "scope": "python", "filename": "arima_forecasts_sampled_titles.png"},
        )
    if any(w in msg_lower for w in ["regression", "lm", "coefficient", "price effect", "rating effect"]):
        return (
            "The R notebook focuses on forecasting rather than regression. Here is the forecast accuracy comparison instead.",
            {"show": "table", "scope": "r", "filename": "accuracy_table.csv"},
        )
    if any(w in msg_lower for w in ["top", "best sell", "popular", "rank"]):
        return (
            f"Here are the top-selling titles by units sold. {kpi_text}",
            {"show": "table", "scope": "python", "filename": "top_titles_by_units_sold.csv"},
        )
    if any(w in msg_lower for w in ["accuracy", "benchmark", "rmse", "mape"]):
        return (
            "Here is the forecast accuracy comparison (ARIMA+Fourier vs ETS) from the R analysis.",
            {"show": "table", "scope": "r", "filename": "accuracy_table.csv"},
        )
    if any(w in msg_lower for w in ["r analysis", "r output", "r result"]):
        # Falls through to the later branches when the R plot is not available.
        if "forecast_compare.png" in idx.get("r", {}).get("figures", []):
            return (
                "Here is the main R output: forecast model comparison plot.",
                {"show": "figure", "scope": "r", "filename": "forecast_compare.png"},
            )
    if any(w in msg_lower for w in ["dashboard", "overview", "summary", "kpi"]):
        return (
            f"Dashboard overview: {kpi_text}\n\nAsk me about sales trends, sentiment, forecasts, "
            "forecast accuracy, or top sellers to see specific visualizations.",
            {"show": "table", "scope": "python", "filename": "df_dashboard.csv"},
        )
    return (
        f"I can show you various analyses. {kpi_text}\n\n"
        "Try asking about: **sales trends**, **sentiment**, **ARIMA forecasts**, "
        "**forecast accuracy**, **top sellers**, or **dashboard overview**.",
        {"show": "none"},
    )
def _artifact_path(base, fname):
    """Return ``base / fname`` only if *fname* is a bare name of an existing file."""
    if base is None or not isinstance(fname, str) or not fname:
        return None
    # Reject anything with directory components ("../x", "/etc/x", "a/b") so a
    # model-produced filename cannot point outside the artifact folder.
    if Path(fname).name != fname:
        return None
    candidate = base / fname
    return candidate if candidate.is_file() else None


def ai_chat(user_msg: str, history: list):
    """Handle one chat turn of the AI dashboard.

    Uses the LLM when enabled (falling back to keyword matching on any LLM
    error), then resolves the returned display directive to a figure path or
    a table preview.

    Args:
        user_msg: The text the user submitted.
        history: Previous chat messages as ``{"role", "content"}`` dicts
            (may be ``None``).

    Returns:
        ``(new_history, "", figure_path_or_None, table_or_None)``; the empty
        string clears the input box. A blank message returns *history*
        unchanged with no outputs.
    """
    if not user_msg or not user_msg.strip():
        return history, "", None, None
    idx = artifacts_index()
    kpis = load_kpis()
    if not LLM_ENABLED:
        reply, directive = _keyword_fallback(user_msg, idx, kpis)
    else:
        system = DASHBOARD_SYSTEM.format(
            artifacts_json=json.dumps(idx, indent=2),
            kpis_json=json.dumps(kpis, indent=2) if kpis else "(no KPIs yet, run the pipeline first)",
        )
        msgs = [{"role": "system", "content": system}]
        # Forward only the last six messages, reduced to role/content.
        # NOTE(review): the UI may attach extra keys to history entries that a
        # chat-completion endpoint could reject -- confirm against the Gradio
        # version in use.
        for entry in (history or [])[-6:]:
            if isinstance(entry, dict) and "role" in entry and "content" in entry:
                msgs.append({"role": entry["role"], "content": entry["content"]})
        msgs.append({"role": "user", "content": user_msg})
        try:
            r = llm_client.chat_completion(
                model=MODEL_NAME,
                messages=msgs,
                temperature=0.3,
                max_tokens=600,
                stream=False,
            )
            raw = (
                r["choices"][0]["message"]["content"]
                if isinstance(r, dict)
                else r.choices[0].message.content
            )
            directive = _parse_display_directive(raw)
            reply = _clean_response(raw)
        except Exception as e:
            reply = f"LLM error: {e}. Falling back to keyword matching."
            reply_fb, directive = _keyword_fallback(user_msg, idx, kpis)
            reply += "\n\n" + reply_fb

    # The directive may come from model output: treat it as untrusted and keep
    # only string values before using them as dict keys / path components.
    if not isinstance(directive, dict):
        directive = {"show": "none"}
    show = directive.get("show", "none")
    scope = directive.get("scope", "")
    fname = directive.get("filename", "")
    scope = scope if isinstance(scope, str) else ""
    fname = fname if isinstance(fname, str) else ""

    fig_out = None
    tab_out = None
    if show == "figure" and scope and fname:
        fig_path = _artifact_path({"python": PY_FIG_DIR, "r": R_FIG_DIR}.get(scope), fname)
        if fig_path is not None:
            fig_out = str(fig_path)
        else:
            reply += f"\n\n*(Could not find figure: {scope}/{fname})*"
    if show == "table" and scope and fname:
        tab_path = _artifact_path({"python": PY_TAB_DIR, "r": R_TAB_DIR}.get(scope), fname)
        if tab_path is not None:
            tab_out = _load_table_safe(tab_path)
        else:
            reply += f"\n\n*(Could not find table: {scope}/{fname})*"
    new_history = (history or []) + [
        {"role": "user", "content": user_msg},
        {"role": "assistant", "content": reply},
    ]
    return new_history, "", fig_out, tab_out
# =========================================================
# UI + Personalization wiring (NAME ONLY)
# =========================================================
# Make sure the runs/ and artifacts/ folders exist before the UI is built.
ensure_dirs()


def load_css() -> str:
    """Return the app CSS: ``style.css`` (if present) plus accent-color rules.

    The accent color comes from ``DEFAULT_ACCENT`` and falls back to
    ``#3b0b6f`` when that value is empty.
    """
    stylesheet = BASE_DIR / "style.css"
    if stylesheet.exists():
        base_css = stylesheet.read_text(encoding="utf-8")
    else:
        base_css = ""
    accent = DEFAULT_ACCENT or "#3b0b6f"
    accent_css = (
        "\n"
        ":root {\n"
        f"--accent-color: {accent};\n"
        "}\n"
        "#escp_title h1, #escp_title h2 { color: var(--accent-color); }\n"
        ".gradio-container .primary { background: var(--accent-color) !important; }\n"
    )
    return base_css + "\n" + accent_css
def render_greeting(name: str) -> str:
    """Build the HTML greeting banner for *name*.

    A blank or ``None`` name falls back to ``"Friend"``. The name is typed by
    the user and inserted into raw HTML, so it is HTML-escaped first to keep
    markup or scripts in the display name from being rendered.
    """
    from html import escape

    name = (name or "").strip() or "Friend"
    safe_name = escape(name)
    return (
        "<div style='display:flex;align-items:center;gap:12px'>"
        "<div style='width:64px;height:64px;border-radius:12px;background:#eee;display:flex;"
        "align-items:center;justify-content:center;font-weight:700;color:#666'>👋</div>"
        "<div>"
        f"<h2 style='margin:0'>Welcome, <span id='rx-user-name' style='color:var(--accent-color)'>{safe_name}</span>! 🎉</h2>"
        "<div style='font-size:14px;color:#777'>This app links three notebooks into a reproducible pipeline.</div>"
        "</div></div>"
    )
# Build the whole UI. Components are laid out in the order they are created,
# so the statement order below is significant.
with gr.Blocks(title="RX12 Workshop App") as demo:
    # Greeting banner; re-rendered when the user applies a display name.
    greeting_html = gr.HTML(
        value=render_greeting(DEFAULT_USER_NAME),
        elem_id="user_greeting",
    )
    gr.Markdown(
        "# RX12 - Intro to Python and R - Workshop App\n"
        "*The app to integrate the three notebooks in to get a functioning blueprint of the group project's final product*",
        elem_id="escp_title",
    )
    # Personalization controls (NAME ONLY)
    with gr.Row():
        with gr.Column(scale=1):
            name_input = gr.Textbox(label="Display name (personalize)", value=DEFAULT_USER_NAME, lines=1)
            set_name_btn = gr.Button("Apply display name")
    # Holds the applied display name; passed to the pipeline runners below.
    user_state = gr.State(DEFAULT_USER_NAME)

    def _set_name(name: str):
        # Returns (greeting HTML, stored name); blank input falls back to "Friend".
        name = (name or "").strip() or "Friend"
        return render_greeting(name), name

    set_name_btn.click(fn=_set_name, inputs=[name_input], outputs=[greeting_html, user_state])
    # ===========================================================
    # TAB 1 -- Pipeline Runner
    # ===========================================================
    with gr.Tab("Pipeline Runner"):
        # Show whether the notebook kernel was resolved at import time.
        if PY_KERNEL:
            gr.Markdown(f"✅ **Notebook execution kernel available:** `{PY_KERNEL}`")
        else:
            gr.Markdown(
                "⚠️ **Notebook execution kernel NOT available.**\n\n"
                "Add `ipykernel` to `requirements.txt` and rebuild the Space.\n\n"
                f"Error:\n\n`{KERNEL_INIT_ERROR}`"
            )
        with gr.Row():
            with gr.Column(scale=1):
                btn_nb1 = gr.Button("Step 1: Data Creation", variant="secondary")
            with gr.Column(scale=1):
                btn_nb2 = gr.Button("Step 2a: Python Analysis", variant="secondary")
            with gr.Column(scale=1):
                btn_r = gr.Button("Step 2b: R Analysis", variant="secondary")
        with gr.Row():
            btn_all = gr.Button("Run All 3 Steps", variant="primary")
        run_log = gr.Textbox(
            label="Execution Log",
            lines=18,
            max_lines=30,
            interactive=False,
        )
        # Every button writes its result text into the shared execution log.
        btn_nb1.click(lambda name: run_datacreation(user_name=name), inputs=[user_state], outputs=[run_log])
        btn_nb2.click(lambda name: run_pythonanalysis(user_name=name), inputs=[user_state], outputs=[run_log])
        btn_r.click(lambda name: run_r(user_name=name), inputs=[user_state], outputs=[run_log])
        btn_all.click(lambda name: run_full_pipeline(user_name=name), inputs=[user_state], outputs=[run_log])
    # ===========================================================
    # TAB 2 -- Results Gallery
    # ===========================================================
    with gr.Tab("Results Gallery"):
        gr.Markdown(
            "### All generated artifacts\n\n"
            "After running the pipeline, click **Refresh** to load all figures and tables. "
            "Figures are shown in the gallery; select a table from the dropdown to inspect it."
        )
        refresh_btn = gr.Button("Refresh Gallery", variant="primary")
        gr.Markdown("#### Figures")
        gallery = gr.Gallery(
            label="All Figures (Python + R)",
            columns=2,
            height=480,
            object_fit="contain",
        )
        gr.Markdown("#### Tables")
        table_dropdown = gr.Dropdown(
            label="Select a table to view",
            choices=[],
            interactive=True,
        )
        table_display = gr.Dataframe(
            label="Table Preview",
            interactive=False,
        )
        # Refresh fills the gallery, the dropdown choices and the first table preview.
        refresh_btn.click(
            refresh_gallery,
            outputs=[gallery, table_dropdown, table_display],
        )
        table_dropdown.change(
            on_table_select,
            inputs=[table_dropdown],
            outputs=[table_display],
        )
    # ===========================================================
    # TAB 3 -- AI Dashboard
    # ===========================================================
    with gr.Tab('"AI" Dashboard'):
        gr.Markdown(
            "### Ask questions, get visualisations\n\n"
            "Describe what you want to see and the AI will pick the right chart or table. "
            + (
                "*LLM is active.*"
                if LLM_ENABLED
                else "*No API key detected — using keyword matching. "
                "Set `HF_API_KEY` in Space secrets for full LLM support.*"
            )
        )
        with gr.Row(equal_height=True):
            # Left column: conversation and input; right column: selected artifact.
            with gr.Column(scale=1):
                chatbot = gr.Chatbot(
                    label="Conversation",
                    height=380,
                )
                user_input = gr.Textbox(
                    label="Ask about your data",
                    placeholder="e.g. Show me sales trends / What drives revenue? / Compare forecast models",
                    lines=1,
                )
                gr.Examples(
                    examples=[
                        "Show me the sales trends",
                        "What does the sentiment look like?",
                        "Which titles sell the most?",
                        "Show the forecast accuracy comparison",
                        "Compare the ARIMA and ETS forecasts",
                        "Give me a dashboard overview",
                    ],
                    inputs=user_input,
                )
            with gr.Column(scale=1):
                ai_figure = gr.Image(
                    label="Visualisation",
                    height=350,
                )
                ai_table = gr.Dataframe(
                    label="Data Table",
                    interactive=False,
                )
        # ai_chat returns (history, cleared input, figure, table) in this output order.
        user_input.submit(
            ai_chat,
            inputs=[user_input, chatbot],
            outputs=[chatbot, user_input, ai_figure, ai_table],
        )
# Start the server with the generated CSS; BASE_DIR is passed as an allowed path.
demo.launch(css=load_css(), allowed_paths=[str(BASE_DIR)])