# Spaces: Sleeping / Sleeping  (page-status residue captured with this file; not part of the program)
import json
import logging
import os
import re
import time
import traceback
from pathlib import Path
from typing import Dict, Any, List, Optional, Tuple

import gradio as gr
import pandas as pd
import papermill as pm

# Optional LLM (HuggingFace Inference API)
try:
    from huggingface_hub import InferenceClient
except Exception:
    InferenceClient = None
# =========================================================
# CONFIG
# =========================================================
def _env_text(name: str, default: str) -> str:
    """Read an environment variable as text, trimmed of surrounding whitespace."""
    return os.environ.get(name, default).strip()


def _env_int(name: str, default: str) -> int:
    """Read an environment variable and convert it to an integer."""
    return int(os.environ.get(name, default))


# Everything is resolved relative to the directory that holds this file.
BASE_DIR = Path(__file__).resolve().parent

# Notebook file names for the three pipeline steps (overridable via env).
NB1 = _env_text("NB1", "datacreation.ipynb")
NB2 = _env_text("NB2", "pythonanalysis.ipynb")
NB3 = _env_text("NB3", "ranalysis.ipynb")

# Output locations: executed notebook copies and the generated artifacts.
RUNS_DIR = BASE_DIR / "runs"
ART_DIR = BASE_DIR / "artifacts"
PY_FIG_DIR = ART_DIR / "py" / "figures"
PY_TAB_DIR = ART_DIR / "py" / "tables"
R_FIG_DIR = ART_DIR / "r" / "figures"
R_TAB_DIR = ART_DIR / "r" / "tables"

# Numeric limits.
PAPERMILL_TIMEOUT = _env_int("PAPERMILL_TIMEOUT", "1800")
MAX_PREVIEW_ROWS = _env_int("MAX_FILE_PREVIEW_ROWS", "50")
MAX_LOG_CHARS = _env_int("MAX_LOG_CHARS", "8000")

# LLM settings; the client is only built when a key is set and the optional
# huggingface_hub import succeeded.
HF_API_KEY = _env_text("HF_API_KEY", "")
MODEL_NAME = _env_text("MODEL_NAME", "deepseek-ai/DeepSeek-R1")
HF_PROVIDER = _env_text("HF_PROVIDER", "novita")
LLM_ENABLED = bool(HF_API_KEY) and InferenceClient is not None
llm_client = None
if LLM_ENABLED:
    llm_client = InferenceClient(provider=HF_PROVIDER, api_key=HF_API_KEY)
# =========================================================
# HELPERS
# =========================================================
def ensure_dirs():
    """Create the run and artifact output directories if they are missing."""
    required = (RUNS_DIR, ART_DIR, PY_FIG_DIR, PY_TAB_DIR, R_FIG_DIR, R_TAB_DIR)
    for directory in required:
        directory.mkdir(parents=True, exist_ok=True)
def stamp():
    """Return the current local time as a compact ``YYYYMMDD-HHMMSS`` string."""
    now = time.localtime()
    return time.strftime("%Y%m%d-%H%M%S", now)
def tail(text: str, n: int = MAX_LOG_CHARS) -> str:
    """Return at most the last ``n`` characters of ``text``.

    Args:
        text: Text to truncate; ``None`` or ``""`` yields ``""``.
        n: Maximum number of trailing characters to keep.

    Returns:
        The trailing slice, or ``""`` when ``n`` is zero or negative.
    """
    # Guard n <= 0 explicitly: text[-0:] is the WHOLE string, and a negative n
    # would slice from the front instead of the back.
    if not text or n <= 0:
        return ""
    return text[-n:]
def _ls(dir_path: Path, exts: Tuple[str, ...]) -> List[str]:
    """List the names of files directly inside ``dir_path`` with a suffix in ``exts``.

    Suffixes are compared in lower case; the result is sorted. A path that is
    not a directory yields an empty list.
    """
    names: List[str] = []
    if dir_path.is_dir():
        for entry in dir_path.iterdir():
            if entry.is_file() and entry.suffix.lower() in exts:
                names.append(entry.name)
        names.sort()
    return names
def _read_csv(path: Path) -> pd.DataFrame:
    """Load a CSV preview: only the first ``MAX_PREVIEW_ROWS`` rows are read."""
    preview = pd.read_csv(path, nrows=MAX_PREVIEW_ROWS)
    return preview
def _read_json(path: Path):
    """Parse the UTF-8 JSON file at ``path`` and return the decoded object."""
    with path.open(encoding="utf-8") as handle:
        payload = json.load(handle)
    return payload
def artifacts_index() -> Dict[str, Any]:
    """Return the figure and table file names currently on disk, keyed by scope.

    The result has the shape
    ``{"python": {"figures": [...], "tables": [...]}, "r": {...}}``.
    """
    figure_exts = (".png", ".jpg", ".jpeg")
    table_exts = (".csv", ".json")
    layout = {
        "python": (PY_FIG_DIR, PY_TAB_DIR),
        "r": (R_FIG_DIR, R_TAB_DIR),
    }
    return {
        scope: {
            "figures": _ls(fig_dir, figure_exts),
            "tables": _ls(tab_dir, table_exts),
        }
        for scope, (fig_dir, tab_dir) in layout.items()
    }
# =========================================================
# PIPELINE RUNNERS
# =========================================================
def run_notebook(nb_name: str) -> str:
    """Execute one notebook with papermill and keep the executed copy in RUNS_DIR.

    Args:
        nb_name: Notebook path relative to BASE_DIR.

    Returns:
        A short confirmation message naming the executed notebook.

    Raises:
        FileNotFoundError: If ``nb_name`` is not an existing file under
            BASE_DIR. Raising (rather than returning an "ERROR" string) lets
            the ``run_*`` wrappers report the step as FAILED instead of
            prefixing the error text with "OK".
        Exception: Whatever papermill raises while executing the notebook.
    """
    ensure_dirs()
    nb_in = BASE_DIR / nb_name
    # is_file() also rejects an empty name, which would resolve to BASE_DIR.
    if not nb_in.is_file():
        raise FileNotFoundError(f"{nb_name} not found.")
    # Use only the file name so a notebook given as "sub/dir/x.ipynb" does not
    # point the output at a non-existent sub-directory of RUNS_DIR.
    nb_out = RUNS_DIR / f"run_{stamp()}_{nb_in.name}"
    pm.execute_notebook(
        input_path=str(nb_in),
        output_path=str(nb_out),
        cwd=str(BASE_DIR),
        log_output=True,
        progress_bar=False,
        request_save_on_cell_execute=True,
        execution_timeout=PAPERMILL_TIMEOUT,
    )
    return f"Executed {nb_name}"
def run_datacreation() -> str:
    """Run the data-creation notebook and report the CSV files found in BASE_DIR.

    Never raises: any failure is returned as a "FAILED ..." message with the
    last 2000 characters of the traceback.
    """
    try:
        log = run_notebook(NB1)
        csv_names = sorted(entry.name for entry in BASE_DIR.glob("*.csv"))
        listing = "\n".join(f" - {name}" for name in csv_names)
        return f"OK {log}\n\nCSVs now in /app:\n" + listing
    except Exception as exc:
        trace_tail = traceback.format_exc()[-2000:]
        return f"FAILED {exc}\n\n{trace_tail}"
def run_pythonanalysis() -> str:
    """Run the Python-analysis notebook and list the Python figures and tables.

    Never raises: any failure is returned as a "FAILED ..." message with the
    last 2000 characters of the traceback.
    """
    try:
        log = run_notebook(NB2)
        produced = artifacts_index()["python"]
        figure_list = ", ".join(produced["figures"]) or "(none)"
        table_list = ", ".join(produced["tables"]) or "(none)"
        return f"OK {log}\n\nFigures: {figure_list}\nTables: {table_list}"
    except Exception as exc:
        trace_tail = traceback.format_exc()[-2000:]
        return f"FAILED {exc}\n\n{trace_tail}"
def run_r() -> str:
    """Run the R-analysis notebook and list the R figures and tables.

    Never raises: any failure is returned as a "FAILED ..." message with the
    last 2000 characters of the traceback.
    """
    try:
        log = run_notebook(NB3)
        produced = artifacts_index()["r"]
        figure_list = ", ".join(produced["figures"]) or "(none)"
        table_list = ", ".join(produced["tables"]) or "(none)"
        return f"OK {log}\n\nFigures: {figure_list}\nTables: {table_list}"
    except Exception as exc:
        trace_tail = traceback.format_exc()[-2000:]
        return f"FAILED {exc}\n\n{trace_tail}"
def run_full_pipeline() -> str:
    """Run the three pipeline steps in order and return their combined log.

    Each step is rendered as a banner (rule, title, rule) followed by the
    step's own report; steps are separated by one blank line. Later steps run
    even when an earlier one reports a failure.
    """
    steps = (
        ("STEP 1/3: Data Creation (web scraping + synthetic data)", run_datacreation),
        ("STEP 2/3: Python Analysis (sentiment, ARIMA, dashboard)", run_pythonanalysis),
        ("STEP 3/3: R Analysis (ETS/ARIMA forecasting)", run_r),
    )
    rule = "=" * 50
    sections = []
    for title, runner in steps:
        sections.append("\n".join((rule, title, rule, runner())))
    return "\n\n".join(sections)
# =========================================================
# GALLERY LOADERS
# =========================================================
def _load_all_figures() -> List[Tuple[str, str]]:
    """Return (filepath, caption) pairs for the Gallery: Python figures, then R.

    Accepts the same image types that ``artifacts_index()`` lists
    (.png, .jpg, .jpeg; case-insensitive), so every figure reported by the
    index can also appear in the gallery. Within each group the files are
    sorted by path. A missing figure directory contributes nothing.
    """
    image_exts = (".png", ".jpg", ".jpeg")
    items: List[Tuple[str, str]] = []
    for label, fig_dir in (("Python", PY_FIG_DIR), ("R", R_FIG_DIR)):
        if not fig_dir.is_dir():
            continue
        for p in sorted(fig_dir.iterdir()):
            if p.is_file() and p.suffix.lower() in image_exts:
                # Caption: "sales_trends" -> "Sales Trends".
                caption = p.stem.replace("_", " ").title()
                items.append((str(p), f"{label} | {caption}"))
    return items
def _load_table_safe(path: Path) -> pd.DataFrame:
    """Load a table artifact for preview without ever raising.

    Args:
        path: A ``.json`` file (suffix matched case-insensitively, as the
            artifact listing does) or anything else, which is read as CSV.

    Returns:
        A DataFrame of at most MAX_PREVIEW_ROWS rows. A JSON object becomes a
        single-row frame. On any failure, a one-row frame with an ``error``
        column holding the exception text.
    """
    # Broad catch is deliberate: this feeds a UI widget, so a bad file should
    # show up as an error row rather than break the callback.
    try:
        if path.suffix.lower() == ".json":
            obj = _read_json(path)
            if isinstance(obj, dict):
                return pd.DataFrame([obj])
            # Apply the same preview cap that CSV files get.
            return pd.DataFrame(obj).head(MAX_PREVIEW_ROWS)
        return _read_csv(path)
    except Exception as e:
        return pd.DataFrame([{"error": str(e)}])
def refresh_gallery():
    """Rebuild the Gallery tab contents when the user clicks Refresh.

    Returns a 3-tuple: the figure list, a dropdown update carrying the
    "scope/name" table choices (first one preselected), and a preview of that
    first table (an empty frame when there are no tables).
    """
    figures = _load_all_figures()
    index = artifacts_index()

    table_choices = [
        f"{scope}/{name}"
        for scope in ("python", "r")
        for name in index[scope]["tables"]
    ]

    selected = table_choices[0] if table_choices else None
    if selected is None:
        preview = pd.DataFrame()
    else:
        scope, _, filename = selected.partition("/")
        table_root = PY_TAB_DIR if scope == "python" else R_TAB_DIR
        preview = _load_table_safe(table_root / filename)

    dropdown_update = gr.update(choices=table_choices, value=selected)
    return figures or [], dropdown_update, preview
def on_table_select(choice: str):
    """Load the table named by a "scope/name" dropdown choice.

    Returns a one-row hint or error frame when the choice is empty, malformed,
    of unknown scope, or points at a missing file.
    """
    if not choice or "/" not in choice:
        return pd.DataFrame([{"hint": "Select a table above."}])

    scope, name = choice.split("/", 1)
    if scope == "python":
        table_root = PY_TAB_DIR
    elif scope == "r":
        table_root = R_TAB_DIR
    else:
        return pd.DataFrame([{"error": f"Unknown scope: {scope}"}])

    path = table_root / name
    if path.exists():
        return _load_table_safe(path)
    return pd.DataFrame([{"error": f"File not found: {path}"}])
# =========================================================
# KPI LOADER
# =========================================================
def load_kpis() -> Dict[str, Any]:
    """Return the KPI dictionary from the first readable ``kpis.json``.

    Looks in the Python tables directory first, then the Python figures
    directory. Unreadable or malformed files are logged and skipped rather
    than silently ignored, and a payload that is not a JSON object is rejected
    so callers can rely on ``dict`` methods.

    Returns:
        The parsed KPI mapping, or ``{}`` when no usable file exists.
    """
    log = logging.getLogger(__name__)
    for candidate in (PY_TAB_DIR / "kpis.json", PY_FIG_DIR / "kpis.json"):
        if not candidate.is_file():
            continue
        try:
            data = _read_json(candidate)
        except (OSError, ValueError) as exc:
            # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
            log.warning("Could not read KPI file %s: %s", candidate, exc)
            continue
        if isinstance(data, dict):
            return data
        log.warning(
            "Ignoring KPI file %s: expected a JSON object, got %s",
            candidate,
            type(data).__name__,
        )
    return {}
# =========================================================
# AI DASHBOARD (Tab 3) -- LLM picks what to display
# =========================================================
# System prompt template for the LLM-backed dashboard. It is filled in with
# str.format(), so {artifacts_json} and {kpis_json} are placeholders and the
# doubled braces in the JSON example are escapes that render as literal braces.
# The model is asked to end its reply with a fenced JSON directive naming the
# artifact to display.
DASHBOARD_SYSTEM = """You are an AI dashboard assistant for a book-sales analytics app.
The user asks questions or requests about their data. You have access to pre-computed
artifacts from Python and R analysis pipelines.
AVAILABLE ARTIFACTS (only reference ones that exist):
{artifacts_json}
KPI SUMMARY: {kpis_json}
YOUR JOB:
1. Answer the user's question conversationally using the KPIs and your knowledge of the artifacts.
2. At the END of your response, output a JSON block (fenced with ```json ... ```) that tells
the dashboard which artifact to display. The JSON must have this shape:
{{"show": "figure"|"table"|"none", "scope": "python"|"r", "filename": "..."}}
- Use "show": "figure" to display a chart image.
- Use "show": "table" to display a CSV/JSON table.
- Use "show": "none" if no artifact is relevant.
RULES:
- If the user asks about sales trends or forecasting by title, show sales_trends or arima figures.
- If the user asks about sentiment, show sentiment figure or sentiment_counts table.
- If the user asks about R regression, the R notebook focuses on forecasting, show accuracy_table.csv.
- If the user asks about forecast accuracy or model comparison, show accuracy_table.csv or forecast_compare.png.
- If the user asks about top sellers, show top_titles_by_units_sold.csv.
- If the user asks a general data question, pick the most relevant artifact.
- Keep your answer concise (2-4 sentences), then the JSON block.
"""
# Fenced directive, the form the system prompt asks for: ```json { ... } ```
JSON_BLOCK_RE = re.compile(r"```json\s*(\{.*?\})\s*```", re.DOTALL)
# Un-fenced fallback: a flat {...} object that mentions a "show" key.
FALLBACK_JSON_RE = re.compile(r"\{[^{}]*\"show\"[^{}]*\}", re.DOTALL)


def _normalise_directive(parsed: Dict[str, Any]) -> Dict[str, str]:
    """Keep only the string-valued directive keys, trimmed; lower-case show/scope."""
    directive: Dict[str, str] = {}
    for key in ("show", "scope", "filename"):
        value = parsed.get(key)
        if not isinstance(value, str):
            continue
        value = value.strip()
        directive[key] = value if key == "filename" else value.lower()
    directive.setdefault("show", "none")
    return directive


def _parse_display_directive(text: str) -> Dict[str, str]:
    """Extract the display directive from an LLM reply.

    Tries the fenced ```json block first, then a bare ``{... "show" ...}``
    object. The result contains only string values for the keys ``show``,
    ``scope`` and ``filename`` (``show``/``scope`` lower-cased), so callers
    never receive numbers, nulls or nested values for them.

    Returns:
        The directive, or ``{"show": "none"}`` when nothing parseable is found.
    """
    for pattern, group in ((JSON_BLOCK_RE, 1), (FALLBACK_JSON_RE, 0)):
        m = pattern.search(text)
        if not m:
            continue
        try:
            parsed = json.loads(m.group(group))
        except json.JSONDecodeError:
            continue
        if isinstance(parsed, dict):
            return _normalise_directive(parsed)
    return {"show": "none"}


def _strip_bare_directive(match: "re.Match") -> str:
    """Drop a bare {...} match only when it is valid JSON; otherwise keep it."""
    try:
        is_directive = isinstance(json.loads(match.group(0)), dict)
    except json.JSONDecodeError:
        is_directive = False
    return "" if is_directive else match.group(0)


def _clean_response(text: str) -> str:
    """Strip the JSON directive from the text shown to the user.

    Removes the fenced block and also an un-fenced directive object, so the
    raw JSON is hidden in both forms that ``_parse_display_directive`` accepts.
    Brace text that is not valid JSON is left untouched.
    """
    cleaned = JSON_BLOCK_RE.sub("", text)
    cleaned = FALLBACK_JSON_RE.sub(_strip_bare_directive, cleaned)
    return cleaned.strip()
def _history_for_llm(history: list, limit: int = 6) -> List[Dict[str, str]]:
    """Reduce the last ``limit`` chat entries to plain role/content messages.

    NOTE(review): depending on the Gradio version, history entries may carry
    extra keys or a list of content parts instead of a string -- confirm
    against the installed version. Only user/assistant text is forwarded.
    """
    messages: List[Dict[str, str]] = []
    for entry in (history or [])[-limit:]:
        if not isinstance(entry, dict):
            continue
        role = entry.get("role")
        content = entry.get("content")
        if isinstance(content, (list, tuple)):
            # Keep only the textual parts of a multi-part message.
            texts = [
                part["text"]
                for part in content
                if isinstance(part, dict) and isinstance(part.get("text"), str)
            ]
            content = "\n".join(texts)
        if role in ("user", "assistant") and isinstance(content, str) and content:
            messages.append({"role": role, "content": content})
    return messages


def ai_chat(user_msg: str, history: list):
    """Chat function for the AI Dashboard tab.

    Answers with the LLM when it is enabled, otherwise (or when the LLM call
    fails) with the keyword matcher, then resolves the returned display
    directive to a figure path or table preview.

    Args:
        user_msg: The text the user submitted.
        history: Prior conversation as a list of role/content messages.

    Returns:
        ``(new_history, "", figure_path_or_None, table_frame_or_None)``; the
        empty string clears the input box. A blank message returns the history
        unchanged.
    """
    if not user_msg or not user_msg.strip():
        return history, "", None, None

    idx = artifacts_index()
    kpis = load_kpis()

    if not LLM_ENABLED:
        reply, directive = _keyword_fallback(user_msg, idx, kpis)
    else:
        system = DASHBOARD_SYSTEM.format(
            artifacts_json=json.dumps(idx, indent=2),
            kpis_json=json.dumps(kpis, indent=2) if kpis else "(no KPIs yet, run the pipeline first)",
        )
        msgs = [{"role": "system", "content": system}]
        msgs.extend(_history_for_llm(history))
        msgs.append({"role": "user", "content": user_msg})
        try:
            r = llm_client.chat_completion(
                model=MODEL_NAME,
                messages=msgs,
                temperature=0.3,
                max_tokens=600,
                stream=False,
            )
            raw = (
                r["choices"][0]["message"]["content"]
                if isinstance(r, dict)
                else r.choices[0].message.content
            )
            directive = _parse_display_directive(raw)
            reply = _clean_response(raw)
        except Exception as e:
            # Any LLM failure degrades to the keyword matcher instead of
            # breaking the chat.
            reply = f"LLM error: {e}. Falling back to keyword matching."
            reply_fb, directive = _keyword_fallback(user_msg, idx, kpis)
            reply += "\n\n" + reply_fb

    # Resolve artifact paths
    fig_out = None
    tab_out = None
    show = directive.get("show", "none")
    scope = directive.get("scope", "")
    fname = directive.get("filename", "")
    wants_artifact = (
        show in ("figure", "table")
        and isinstance(scope, str)
        and isinstance(fname, str)
        and scope
        and fname
    )
    if wants_artifact:
        kind = "figures" if show == "figure" else "tables"
        roots = {
            "figures": {"python": PY_FIG_DIR, "r": R_FIG_DIR},
            "tables": {"python": PY_TAB_DIR, "r": R_TAB_DIR},
        }
        base = roots[kind].get(scope)
        # The filename comes from model output, so only accept names that the
        # artifact index actually listed. Those are bare file names, which
        # rules out "../"-style paths escaping the artifact directory.
        known = (idx.get(scope) or {}).get(kind) or []
        if base is not None and fname in known:
            if show == "figure":
                fig_out = str(base / fname)
            else:
                tab_out = _load_table_safe(base / fname)
        else:
            reply += f"\n\n*(Could not find {show}: {scope}/{fname})*"

    new_history = (history or []) + [
        {"role": "user", "content": user_msg},
        {"role": "assistant", "content": reply},
    ]
    return new_history, "", fig_out, tab_out
def _keyword_fallback(msg: str, idx: Dict, kpis: Dict) -> Tuple[str, Dict]:
    """Simple keyword matcher used when the LLM is unavailable.

    Args:
        msg: The user's message.
        idx: Artifact index shaped like ``{"python": {"figures": [...],
            "tables": [...]}, "r": {...}}``.
        kpis: KPI mapping (may be empty).

    Returns:
        ``(reply_text, directive)`` where the directive has a ``show`` key and,
        unless it is ``"none"``, ``scope`` and ``filename`` keys.

    Short keywords ("lm", "ets", "top", "rank", "r analysis", ...) are matched
    on word boundaries so that words such as "film", "datasets", "stop" or
    "your analysis" do not trigger them. Longer stems stay substring matches so
    "trends", "comparison" or "prediction" still hit.
    """
    msg_lower = (msg or "").lower()

    def has(*fragments: str) -> bool:
        """True when any fragment occurs anywhere in the message."""
        return any(fragment in msg_lower for fragment in fragments)

    def has_word(*words: str) -> bool:
        """True when any of the words occurs as a whole word."""
        return any(re.search(rf"\b{re.escape(word)}\b", msg_lower) for word in words)

    def starts_word(*prefixes: str) -> bool:
        """True when any prefix occurs at the start of a word."""
        return any(re.search(rf"\b{re.escape(prefix)}", msg_lower) for prefix in prefixes)

    def listed(scope: str, kind: str) -> list:
        """Names the index lists for scope/kind; tolerant of missing keys."""
        return (idx.get(scope) or {}).get(kind) or []

    if not any(listed(s, "figures") or listed(s, "tables") for s in ("python", "r")):
        return (
            "No artifacts found yet. Please run the pipeline first (Tab 1), "
            "then come back here to explore the results.",
            {"show": "none"},
        )

    kpi_text = ""
    if kpis and isinstance(kpis, dict):
        total = kpis.get("total_units_sold", 0)
        # KPI values come from a JSON file; do not let a non-numeric total
        # crash the chat.
        try:
            total_text = f"{float(total):,.0f}"
        except (TypeError, ValueError):
            total_text = "?"
        kpi_text = (
            f"Quick summary: **{kpis.get('n_titles', '?')}** book titles across "
            f"**{kpis.get('n_months', '?')}** months, with **{total_text}** total units sold."
        )

    r_compare_available = "forecast_compare.png" in listed("r", "figures")

    if has("trend", "sales trend", "monthly sale"):
        return (
            f"Here are the sales trends for sampled titles. {kpi_text}",
            {"show": "figure", "scope": "python", "filename": "sales_trends_sampled_titles.png"},
        )
    if has("sentiment", "review", "positive", "negative"):
        return (
            f"Here is the sentiment distribution across sampled book titles. {kpi_text}",
            {"show": "figure", "scope": "python", "filename": "sentiment_distribution_sampled_titles.png"},
        )
    if has("arima", "forecast", "predict"):
        wants_comparison = has("compar", "accuracy") or has_word("ets")
        if wants_comparison and r_compare_available:
            return (
                "Here is the ARIMA+Fourier vs ETS forecast comparison from the R analysis.",
                {"show": "figure", "scope": "r", "filename": "forecast_compare.png"},
            )
        return (
            f"Here are the ARIMA forecasts for sampled titles from the Python analysis. {kpi_text}",
            {"show": "figure", "scope": "python", "filename": "arima_forecasts_sampled_titles.png"},
        )
    if has("regression", "coefficient", "price effect", "rating effect") or has_word("lm"):
        return (
            "The R notebook focuses on forecasting rather than regression. "
            "Here is the forecast accuracy comparison instead.",
            {"show": "table", "scope": "r", "filename": "accuracy_table.csv"},
        )
    # "sell the most" / "most sold" cover phrasings such as the UI example
    # "Which titles sell the most?".
    if (
        has_word("top")
        or starts_word("rank")
        or has("best sell", "bestsell", "popular", "sell the most", "most sold")
    ):
        return (
            f"Here are the top-selling titles by units sold. {kpi_text}",
            {"show": "table", "scope": "python", "filename": "top_titles_by_units_sold.csv"},
        )
    if has("accuracy", "benchmark", "rmse", "mape"):
        return (
            "Here is the forecast accuracy comparison (ARIMA+Fourier vs ETS) from the R analysis.",
            {"show": "table", "scope": "r", "filename": "accuracy_table.csv"},
        )
    if starts_word("r analysis", "r output", "r result") and r_compare_available:
        return (
            "Here is the main R output: forecast model comparison plot.",
            {"show": "figure", "scope": "r", "filename": "forecast_compare.png"},
        )
    if has("dashboard", "overview", "summary", "kpi"):
        return (
            f"Dashboard overview: {kpi_text}\n\nAsk me about sales trends, sentiment, forecasts, "
            "forecast accuracy, or top sellers to see specific visualizations.",
            {"show": "table", "scope": "python", "filename": "df_dashboard.csv"},
        )
    # Default
    return (
        f"I can show you various analyses. {kpi_text}\n\n"
        "Try asking about: **sales trends**, **sentiment**, **ARIMA forecasts**, "
        "**forecast accuracy**, **top sellers**, or **dashboard overview**.",
        {"show": "none"},
    )
# =========================================================
# UI
# =========================================================
# Make sure the output directories exist before the interface is built.
ensure_dirs()


def load_css() -> str:
    """Return the text of ``style.css`` next to this file, or "" when it is absent."""
    css_path = BASE_DIR / "style.css"
    if not css_path.exists():
        return ""
    return css_path.read_text(encoding="utf-8")
# Build the three-tab interface. Widget nesting below is reconstructed from the
# order of the statements (the captured source had lost its indentation).
with gr.Blocks(title="RX12 Workshop App") as demo:
    gr.Markdown(
        "# RX12 - Intro to Python and R - Workshop App\n"
        "*The app to integrate the three notebooks in to get a functioning blueprint of the group project's final product*",
        elem_id="escp_title",
    )
    # ===========================================================
    # TAB 1 -- Pipeline Runner
    # ===========================================================
    with gr.Tab("Pipeline Runner"):
        # NOTE(review): this and the other argument-less gr.Markdown() calls
        # in this tab carry no text -- presumably placeholders; confirm.
        gr.Markdown(
        )
        with gr.Row():
            with gr.Column(scale=1):
                btn_nb1 = gr.Button(
                    "Step 1: Data Creation",
                    variant="secondary",
                )
                gr.Markdown(
                )
            with gr.Column(scale=1):
                btn_nb2 = gr.Button(
                    "Step 2a: Python Analysis",
                    variant="secondary",
                )
                gr.Markdown(
                )
            with gr.Column(scale=1):
                btn_r = gr.Button(
                    "Step 2b: R Analysis",
                    variant="secondary",
                )
                gr.Markdown(
                )
        with gr.Row():
            btn_all = gr.Button(
                "Run All 3 Steps",
                variant="primary",
            )
        run_log = gr.Textbox(
            label="Execution Log",
            lines=18,
            max_lines=30,
            interactive=False,
        )
        # Each button writes its runner's returned report into the log box.
        btn_nb1.click(run_datacreation, outputs=[run_log])
        btn_nb2.click(run_pythonanalysis, outputs=[run_log])
        btn_r.click(run_r, outputs=[run_log])
        btn_all.click(run_full_pipeline, outputs=[run_log])
    # ===========================================================
    # TAB 2 -- Results Gallery
    # ===========================================================
    with gr.Tab("Results Gallery"):
        gr.Markdown(
            "### All generated artifacts\n\n"
            "After running the pipeline, click **Refresh** to load all figures and tables. "
            "Figures are shown in the gallery; select a table from the dropdown to inspect it."
        )
        refresh_btn = gr.Button("Refresh Gallery", variant="primary")
        gr.Markdown("#### Figures")
        gallery = gr.Gallery(
            label="All Figures (Python + R)",
            columns=2,
            height=480,
            object_fit="contain",
        )
        gr.Markdown("#### Tables")
        table_dropdown = gr.Dropdown(
            label="Select a table to view",
            choices=[],
            interactive=True,
        )
        table_display = gr.Dataframe(
            label="Table Preview",
            interactive=False,
        )
        # refresh_gallery returns (figures, dropdown update, first-table
        # preview), matching the three outputs in order.
        refresh_btn.click(
            refresh_gallery,
            outputs=[gallery, table_dropdown, table_display],
        )
        # Changing the dropdown loads the chosen table into the preview.
        table_dropdown.change(
            on_table_select,
            inputs=[table_dropdown],
            outputs=[table_display],
        )
    # ===========================================================
    # TAB 3 -- AI Dashboard
    # ===========================================================
    with gr.Tab('"AI" Dashboard'):
        # The intro text states whether the LLM or keyword matching is in use.
        gr.Markdown(
            "### Ask questions, get visualisations\n\n"
            "Describe what you want to see and the AI will pick the right chart or table. "
            + (
                "*LLM is active.*"
                if LLM_ENABLED
                else "*No API key detected \u2014 using keyword matching. "
                "Set `HF_API_KEY` in Space secrets for full LLM support.*"
            )
        )
        with gr.Row(equal_height=True):
            with gr.Column(scale=1):
                chatbot = gr.Chatbot(
                    label="Conversation",
                    height=380,
                )
                user_input = gr.Textbox(
                    label="Ask about your data",
                    placeholder="e.g. Show me sales trends / What drives revenue? / Compare forecast models",
                    lines=1,
                )
                gr.Examples(
                    examples=[
                        "Show me the sales trends",
                        "What does the sentiment look like?",
                        "Which titles sell the most?",
                        "Show the forecast accuracy comparison",
                        "Compare the ARIMA and ETS forecasts",
                        "Give me a dashboard overview",
                    ],
                    inputs=user_input,
                )
            with gr.Column(scale=1):
                ai_figure = gr.Image(
                    label="Visualisation",
                    height=350,
                )
                ai_table = gr.Dataframe(
                    label="Data Table",
                    interactive=False,
                )
        # ai_chat returns (history, "", figure, table): the empty string
        # clears the input box after each submission.
        user_input.submit(
            ai_chat,
            inputs=[user_input, chatbot],
            outputs=[chatbot, user_input, ai_figure, ai_table],
        )
# Start the app with the optional stylesheet; allowed_paths is set to BASE_DIR.
demo.launch(css=load_css(), allowed_paths=[str(BASE_DIR)])