Spaces:
Sleeping
Sleeping
| import os | |
| import re | |
| import json | |
| import time | |
| import traceback | |
| from pathlib import Path | |
| from typing import Dict, Any, List, Tuple, Optional | |
| import pandas as pd | |
| import gradio as gr | |
| import plotly.graph_objects as go | |
| try: | |
| import papermill as pm | |
| except Exception: | |
| pm = None | |
| try: | |
| from huggingface_hub import InferenceClient | |
| except Exception: | |
| InferenceClient = None | |
| # ========================================================= | |
| # CONFIG — ITALY HOSPITALITY MARKET INSIGHT ASSISTANT | |
| # ========================================================= | |
| BASE_DIR = Path(__file__).resolve().parent | |
| NB1 = os.environ.get("NB1", "1_Data_Creation_Italy_Hospitality.ipynb").strip() | |
| NB2 = os.environ.get("NB2", "2a_Python_Analysis_Italy_Hospitality.ipynb").strip() | |
| CLEANED_CSV = os.environ.get("CLEANED_CSV", "italy_hospitality_market_cleaned.csv").strip() | |
| ENRICHED_CSV = os.environ.get("ENRICHED_CSV", "italy_hospitality_market_enriched_synthetic.csv").strip() | |
| RUNS_DIR = BASE_DIR / "runs" | |
| ART_DIR = BASE_DIR / "artifacts" | |
| PY_FIG_DIR = ART_DIR / "py" / "figures" | |
| PY_TAB_DIR = ART_DIR / "py" / "tables" | |
| PAPERMILL_TIMEOUT = int(os.environ.get("PAPERMILL_TIMEOUT", "1800")) | |
| MAX_PREVIEW_ROWS = int(os.environ.get("MAX_FILE_PREVIEW_ROWS", "80")) | |
| MAX_LOG_CHARS = int(os.environ.get("MAX_LOG_CHARS", "8000")) | |
| # Hugging Face Inference API | |
| HF_API_KEY = os.environ.get("HF_API_KEY", "").strip() | |
| MODEL_NAME = os.environ.get("MODEL_NAME", "deepseek-ai/DeepSeek-R1").strip() | |
| HF_PROVIDER = os.environ.get("HF_PROVIDER", "novita").strip() | |
| # Optional n8n automation webhook. Expected JSON response: | |
| # {"answer": "...", "chart": "risk|investment|city|region|overview|none"} | |
| N8N_WEBHOOK_URL = os.environ.get("N8N_WEBHOOK_URL", "").strip() | |
| LLM_ENABLED = bool(HF_API_KEY) and InferenceClient is not None | |
| llm_client = InferenceClient(provider=HF_PROVIDER, api_key=HF_API_KEY) if LLM_ENABLED else None | |
| CHART_PALETTE = ["#7c5cbf", "#2ec4a0", "#e8537a", "#e8a230", "#5e8fef", "#c45ea8"] | |
| # ========================================================= | |
| # HELPERS | |
| # ========================================================= | |
def ensure_dirs():
    """Create the run-output and artifact folders if they do not exist yet."""
    required = (RUNS_DIR, ART_DIR, PY_FIG_DIR, PY_TAB_DIR)
    for folder in required:
        # parents=True builds intermediate folders; exist_ok makes repeat calls harmless.
        folder.mkdir(parents=True, exist_ok=True)
def stamp() -> str:
    """Return the current local time as a compact ``YYYYMMDD-HHMMSS`` string."""
    now = time.localtime()
    return time.strftime("%Y%m%d-%H%M%S", now)
def tail(text: str, n: int = MAX_LOG_CHARS) -> str:
    """Return at most the last *n* characters of *text*.

    ``None`` or empty *text* gives ``""``. A non-positive *n* also gives
    ``""``: a plain ``text[-n:]`` would return the whole string for ``n == 0``
    and drop the *head* of the string for a negative ``n``.
    """
    if not text or n <= 0:
        return ""
    return text[-n:]
def csv_path(prefer_enriched: bool = True) -> Path:
    """Choose which dataset file to read.

    The enriched file wins when it is preferred and present; otherwise the
    cleaned file is used if present. When neither applies, the enriched path
    is returned even though it may not exist, so callers must check it.
    """
    enriched_file = BASE_DIR / ENRICHED_CSV
    cleaned_file = BASE_DIR / CLEANED_CSV
    use_enriched = prefer_enriched and enriched_file.exists()
    if not use_enriched and cleaned_file.exists():
        return cleaned_file
    return enriched_file
def read_market_data(prefer_enriched: bool = True) -> pd.DataFrame:
    """Load the long-format market dataset chosen by ``csv_path``.

    Returns an empty frame with the expected column names when the file is
    missing. Otherwise the ``value`` column is coerced to numeric, with
    unparseable entries becoming NaN.
    """
    source = csv_path(prefer_enriched)
    if source.exists():
        frame = pd.read_csv(source)
        frame["value"] = pd.to_numeric(frame.get("value"), errors="coerce")
        return frame
    expected_columns = ["section", "location", "entity", "metric_name", "value", "unit", "period", "source_page", "note"]
    return pd.DataFrame(columns=expected_columns)
| def _ls(dir_path: Path, exts: Tuple[str, ...]) -> List[str]: | |
| if not dir_path.is_dir(): | |
| return [] | |
| return sorted(p.name for p in dir_path.iterdir() if p.is_file() and p.suffix.lower() in exts) | |
def _read_csv(path: Path) -> pd.DataFrame:
    """Read a CSV preview limited to the first ``MAX_PREVIEW_ROWS`` rows."""
    preview = pd.read_csv(path, nrows=MAX_PREVIEW_ROWS)
    return preview
| def _read_json(path: Path): | |
| with path.open(encoding="utf-8") as f: | |
| return json.load(f) | |
def artifacts_index() -> Dict[str, Any]:
    """Describe which datasets and analysis outputs currently exist on disk.

    Core dataset entries hold the file name when the file is present in
    ``BASE_DIR`` and ``None`` otherwise. Figures and tables are listed by
    file name.
    """

    def present(filename: str) -> Optional[str]:
        # Report the name only when the file actually exists next to the app.
        return filename if (BASE_DIR / filename).exists() else None

    core_files = {
        "cleaned_dataset": present(CLEANED_CSV),
        "enriched_synthetic_dataset": present(ENRICHED_CSV),
    }
    python_outputs = {
        "figures": _ls(PY_FIG_DIR, (".png", ".jpg", ".jpeg")),
        "tables": _ls(PY_TAB_DIR, (".csv", ".json")),
    }
    return {"core_files": core_files, "python": python_outputs}
def pivot_metrics(df: pd.DataFrame, section: Optional[str] = None, entity: Optional[str] = None) -> pd.DataFrame:
    """Reshape the long-format data into one row per (location, entity).

    Rows are optionally filtered by *section* and *entity* first. Each
    distinct ``metric_name`` becomes a column holding the first ``value``
    seen for that pair. An empty selection yields an empty DataFrame.
    """
    subset = df
    if section:
        subset = subset.loc[subset["section"] == section]
    if entity:
        subset = subset.loc[subset["entity"] == entity]
    if subset.empty:
        return pd.DataFrame()
    table = pd.pivot_table(
        subset,
        index=["location", "entity"],
        columns="metric_name",
        values="value",
        aggfunc="first",
    )
    table = table.reset_index()
    # Drop the leftover "metric_name" label on the column axis.
    table.columns.name = None
    return table
| # ========================================================= | |
| # PIPELINE RUNNERS | |
| # ========================================================= | |
def run_notebook(nb_name: str) -> str:
    """Execute a notebook with papermill and keep the executed copy in ``runs/``.

    Args:
        nb_name: Notebook path relative to ``BASE_DIR``.

    Returns:
        A status line. Problems found before execution (papermill not
        installed, notebook missing) come back as a string starting with
        ``"ERROR:"`` rather than as an exception.

    Exceptions raised by papermill during execution are not caught here.
    """
    ensure_dirs()
    if pm is None:
        return "ERROR: papermill is not installed. Add it to requirements.txt."
    nb_in = BASE_DIR / nb_name
    if not nb_in.exists():
        return f"ERROR: {nb_name} not found in {BASE_DIR}."
    # Use only the file name for the output: a notebook configured with a
    # sub-folder (e.g. "notebooks/x.ipynb") would otherwise point the output
    # into a sub-directory of runs/ that does not exist.
    nb_out = RUNS_DIR / f"run_{stamp()}_{Path(nb_name).name}"
    pm.execute_notebook(
        input_path=str(nb_in),
        output_path=str(nb_out),
        cwd=str(BASE_DIR),
        log_output=True,
        progress_bar=False,
        request_save_on_cell_execute=True,
        execution_timeout=PAPERMILL_TIMEOUT,
    )
    return f"Executed {nb_name}. Output saved to {nb_out.name}"
def run_datacreation() -> str:
    """Run the data-creation notebook and return a log for the UI.

    On success the log starts with ``OK`` and lists the CSV files found in
    ``BASE_DIR``. A raised exception, or an ``ERROR:`` status returned by
    ``run_notebook``, gives a log starting with ``FAILED``.
    """
    try:
        log = run_notebook(NB1)
        # run_notebook reports pre-flight problems as a string, not an
        # exception; without this check they would be shown as "OK — ERROR: …".
        if log.startswith("ERROR:"):
            return f"FAILED — {log}"
        csvs = sorted(p.name for p in BASE_DIR.glob("*.csv"))
        return f"OK — {log}\n\nCSVs available:\n" + "\n".join(f" - {c}" for c in csvs)
    except Exception as e:
        # UI boundary: show the error and the end of the traceback in the log box.
        return f"FAILED — {e}\n\n{traceback.format_exc()[-2000:]}"
def run_pythonanalysis() -> str:
    """Run the analysis notebook and return a log for the UI.

    On success the log starts with ``OK`` and lists the figures and tables
    reported by ``artifacts_index``. A raised exception, or an ``ERROR:``
    status returned by ``run_notebook``, gives a log starting with ``FAILED``.
    """
    try:
        log = run_notebook(NB2)
        # run_notebook reports pre-flight problems as a string, not an
        # exception; without this check they would be shown as "OK — ERROR: …".
        if log.startswith("ERROR:"):
            return f"FAILED — {log}"
        idx = artifacts_index()
        figs = idx["python"]["figures"]
        tabs = idx["python"]["tables"]
        return f"OK — {log}\n\nFigures: {', '.join(figs) or '(none)'}\nTables: {', '.join(tabs) or '(none)'}"
    except Exception as e:
        # UI boundary: show the error and the end of the traceback in the log box.
        return f"FAILED — {e}\n\n{traceback.format_exc()[-2000:]}"
def run_full_pipeline() -> str:
    """Run both pipeline steps in order and return their logs under banners.

    Step 2 is run even when step 1 reports a failure.
    """
    rule = "=" * 58
    creation_log = run_datacreation()
    analysis_log = run_pythonanalysis()
    sections = [
        rule,
        "STEP 1/2: Data Creation — real-world PwC hospitality indicators",
        rule,
        creation_log,
        "",
        rule,
        "STEP 2/2: Python Analysis — synthetic scores + dashboard artifacts",
        rule,
        analysis_log,
    ]
    return "\n".join(sections)
| # ========================================================= | |
| # DATA / TABLE LOADERS | |
| # ========================================================= | |
def load_table_safe(path: Path) -> pd.DataFrame:
    """Load a CSV or JSON table for preview without ever raising.

    A ``.json`` file holding an object becomes a one-row frame; any other
    JSON value is passed to ``pd.DataFrame``. Every other suffix is read as
    CSV. Both formats are capped at ``MAX_PREVIEW_ROWS`` rows. Any failure is
    returned as a one-row frame with an ``error`` column.
    """
    try:
        if path.suffix.lower() == ".json":
            obj = _read_json(path)
            frame = pd.DataFrame([obj]) if isinstance(obj, dict) else pd.DataFrame(obj)
            # Apply the same preview cap that _read_csv applies to CSV files.
            return frame.head(MAX_PREVIEW_ROWS)
        return _read_csv(path)
    except Exception as e:
        # Best-effort preview: surface the problem in the table itself.
        return pd.DataFrame([{"error": str(e)}])
def refresh_gallery():
    """Collect figures, table choices and a default table preview.

    Returns:
        A 3-tuple: a list of ``(file path, caption)`` pairs for the gallery,
        a ``gr.update`` carrying the dropdown choices and selection, and the
        preview frame of the first choice (empty when there is none).
    """
    # Use the same extensions artifacts_index() reports, so a .jpg/.jpeg
    # figure listed in the run log also shows up in the gallery.
    figure_names = _ls(PY_FIG_DIR, (".png", ".jpg", ".jpeg"))
    figures = [
        (str(PY_FIG_DIR / name), Path(name).stem.replace("_", " ").title())
        for name in figure_names
    ]
    idx = artifacts_index()
    table_choices = list(idx["python"]["tables"])
    # Always include core datasets in table dropdown
    for core in [CLEANED_CSV, ENRICHED_CSV]:
        if (BASE_DIR / core).exists() and core not in table_choices:
            table_choices.insert(0, core)
    default_df = pd.DataFrame()
    if table_choices:
        chosen = table_choices[0]
        # Core datasets sit in BASE_DIR; everything else is an analysis table.
        path = BASE_DIR / chosen if (BASE_DIR / chosen).exists() else PY_TAB_DIR / chosen
        default_df = load_table_safe(path)
    return figures, gr.update(choices=table_choices, value=table_choices[0] if table_choices else None), default_df
def on_table_select(choice: str):
    """Return the preview for the table picked in the dropdown.

    The name is looked up in ``BASE_DIR`` first, then in the analysis tables
    folder. An empty choice gives a hint row and an unknown file an error row.
    """
    if not choice:
        return pd.DataFrame([{"hint": "Select a table above."}])
    local_copy = BASE_DIR / choice
    target = local_copy if local_copy.exists() else PY_TAB_DIR / choice
    if target.exists():
        return load_table_safe(target)
    return pd.DataFrame([{"error": f"File not found: {choice}"}])
| # ========================================================= | |
| # KPIs | |
| # ========================================================= | |
def load_kpis() -> Dict[str, Any]:
    """Compute headline KPIs from the preferred market dataset.

    Returns an empty dict when no data is available. Score averages are
    ``None`` when a metric has no usable (non-NaN) values, so the UI never
    shows "nan" and the dict stays valid JSON.
    """
    df = read_market_data(prefer_enriched=True)
    if df.empty:
        return {}
    is_synthetic = df["section"].eq("synthetic_features")
    syn = df[is_synthetic]

    def mean_score(metric: str) -> Optional[float]:
        # Ignore rows whose value could not be parsed as a number.
        values = syn.loc[syn["metric_name"].eq(metric), "value"].dropna()
        return round(float(values.mean()), 1) if not values.empty else None

    return {
        "locations": int(df["location"].nunique()),
        "metrics": int(df["metric_name"].nunique()),
        "real_rows": int((~is_synthetic).sum()),
        "synthetic_rows": int(is_synthetic.sum()),
        "avg_risk_score": mean_score("risk_score"),
        "avg_investment_potential": mean_score("investment_potential_score"),
        "avg_market_attractiveness": mean_score("market_attractiveness_score"),
    }
def render_kpi_cards() -> str:
    """Render the KPI summary as an HTML grid of cards.

    When no KPIs are available a single placeholder card is returned instead.
    Missing values are shown as an em dash.
    """
    kpis = load_kpis()
    if not kpis:
        return """
<div style='background:rgba(255,255,255,.72);border-radius:20px;padding:28px;text-align:center;border:1px solid #ddd;'>
<div style='font-size:34px;'>🏨</div>
<b>No hospitality data found yet</b><br>
<span>Run the pipeline or place the CSV files in the app folder.</span>
</div>"""
    # (icon, label, value, accent colour) for each card, in display order.
    card_specs = [
        ("📍", "Locations", kpis.get("locations"), "#a48de8"),
        ("📊", "Metrics", kpis.get("metrics"), "#7aa6f8"),
        ("🧪", "Synthetic Rows", kpis.get("synthetic_rows"), "#6ee7c7"),
        ("⚠️", "Avg Risk Score", kpis.get("avg_risk_score"), "#e8537a"),
        ("💼", "Avg Investment Potential", kpis.get("avg_investment_potential"), "#2ec4a0"),
        ("⭐", "Avg Market Attractiveness", kpis.get("avg_market_attractiveness"), "#e8a230"),
    ]
    pieces = ["<div style='display:grid;grid-template-columns:repeat(auto-fit,minmax(150px,1fr));gap:12px;margin-bottom:24px;'>"]
    for icon, label, raw_value, colour in card_specs:
        value = raw_value if raw_value is not None else "—"
        pieces.append(f"""
<div style='background:rgba(255,255,255,.78);border-radius:20px;padding:18px;text-align:center;
border:1px solid rgba(124,92,191,.18);box-shadow:0 4px 16px rgba(124,92,191,.08);
border-top:3px solid {colour};'>
<div style='font-size:26px;margin-bottom:7px;'>{icon}</div>
<div style='color:#8b78c6;font-size:10px;text-transform:uppercase;letter-spacing:1.4px;font-weight:800;'>{label}</div>
<div style='color:#2d1f4e;font-size:18px;font-weight:800;margin-top:6px;'>{value}</div>
</div>""")
    pieces.append("</div>")
    return "".join(pieces)
| # ========================================================= | |
| # INTERACTIVE CHARTS | |
| # ========================================================= | |
def styled_layout(**kwargs) -> dict:
    """Return the shared Plotly layout settings, customised by *kwargs*.

    Overrides are merged one level deep: when both the default and the
    override for a key are dicts (for example ``title``), the override's
    entries are added on top of the default's instead of replacing it. This
    keeps the default title font when a caller passes only
    ``title=dict(text=...)``. Any other override replaces the default.
    """
    layout = dict(
        template="plotly_white",
        paper_bgcolor="rgba(255,255,255,0.96)",
        plot_bgcolor="rgba(255,255,255,0.98)",
        font=dict(family="system-ui, sans-serif", color="#2d1f4e", size=12),
        margin=dict(l=60, r=20, t=70, b=70),
        title=dict(font=dict(size=16, color="#4b2d8a")),
        legend=dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1),
    )
    for key, override in kwargs.items():
        base = layout.get(key)
        if isinstance(base, dict) and isinstance(override, dict):
            layout[key] = {**base, **override}
        else:
            layout[key] = override
    return layout
def empty_chart(title: str) -> go.Figure:
    """Build a placeholder figure with *title* and a centred "no data" note."""
    placeholder = dict(
        text="No data available yet",
        x=0.5,
        y=0.5,
        xref="paper",
        yref="paper",
        showarrow=False,
    )
    figure = go.Figure()
    figure.update_layout(title=title, height=420, template="plotly_white", annotations=[placeholder])
    return figure
def build_city_performance_chart() -> go.Figure:
    """Grouped bar chart of the city year-over-year metrics that are present.

    Falls back to an empty placeholder chart when there is no city data or
    none of the expected metric columns exist.
    """
    wide = pivot_metrics(read_market_data(True), section="city_performance", entity="city")
    metrics = [m for m in ("occupancy_yoy", "adr_yoy", "revpar_yoy") if m in wide.columns]
    if wide.empty or not metrics:
        return empty_chart("City Performance — Occupancy, ADR, RevPAR")
    # One bar series per metric, coloured in palette order.
    traces = [
        go.Bar(
            x=wide["location"],
            y=wide[metric],
            name=metric.replace("_", " ").upper(),
            marker_color=colour,
        )
        for colour, metric in zip(CHART_PALETTE, metrics)
    ]
    fig = go.Figure(data=traces)
    layout = styled_layout(
        title=dict(text="City Performance YoY — Occupancy, ADR, RevPAR"),
        barmode="group",
        height=430,
    )
    fig.update_layout(**layout)
    fig.update_yaxes(title="YoY change (%)")
    return fig
def build_region_demand_chart() -> go.Figure:
    """Horizontal grouped bars of domestic and international demand by region.

    Falls back to an empty placeholder chart when there is no regional data
    or neither growth column exists.
    """
    wide = pivot_metrics(read_market_data(True), section="regional_demand", entity="region")
    metrics = [m for m in ("domestic_demand_growth", "international_demand_growth") if m in wide.columns]
    if wide.empty or not metrics:
        return empty_chart("Regional Demand Growth")
    traces = [
        go.Bar(
            y=wide["location"],
            x=wide[metric],
            orientation="h",
            name=metric.replace("_", " ").title(),
            marker_color=colour,
        )
        for colour, metric in zip(CHART_PALETTE, metrics)
    ]
    fig = go.Figure(data=traces)
    # Grow the figure with the number of regions so the bars stay readable.
    chart_height = max(420, len(wide) * 42)
    layout = styled_layout(
        title=dict(text="Regional Demand Growth — Domestic vs International"),
        barmode="group",
        height=chart_height,
    )
    fig.update_layout(**layout)
    fig.update_xaxes(title="Growth (%)")
    # Reverse the axis so the first row is drawn at the top.
    fig.update_yaxes(autorange="reversed")
    return fig
def build_synthetic_scores_chart() -> go.Figure:
    """Grouped bar chart comparing the available synthetic scores by location.

    Falls back to an empty placeholder chart when there are no synthetic
    features or none of the score columns exist.
    """
    wide = pivot_metrics(read_market_data(True), section="synthetic_features")
    candidates = ("growth_score", "market_attractiveness_score", "investment_potential_score", "risk_score")
    score_cols = [name for name in candidates if name in wide.columns]
    if wide.empty or not score_cols:
        return empty_chart("Synthetic Scores")
    palette_size = len(CHART_PALETTE)
    traces = [
        go.Bar(
            x=wide["location"],
            y=wide[name],
            name=name.replace("_", " ").title(),
            marker_color=CHART_PALETTE[position % palette_size],
        )
        for position, name in enumerate(score_cols)
    ]
    fig = go.Figure(data=traces)
    layout = styled_layout(
        title=dict(text="Synthetic Market Scores by Location"),
        barmode="group",
        height=470,
    )
    fig.update_layout(**layout)
    fig.update_yaxes(title="Score (0–100)", range=[0, 105])
    return fig
def build_risk_chart() -> go.Figure:
    """Horizontal bar chart of ``risk_score`` per location, sorted ascending.

    Locations with no usable risk score are left out, so the chart never
    shows empty bars labelled "nan". An empty placeholder chart is returned
    when nothing remains to plot.
    """
    df = read_market_data(True)
    wide = pivot_metrics(df, section="synthetic_features")
    if wide.empty or "risk_score" not in wide.columns:
        return empty_chart("Risk Score")
    # Other synthetic metrics can create rows that have no risk score at all.
    wide = wide.dropna(subset=["risk_score"])
    if wide.empty:
        return empty_chart("Risk Score")
    wide = wide.sort_values("risk_score", ascending=True)
    fig = go.Figure(go.Bar(
        y=wide["location"],
        x=wide["risk_score"],
        orientation="h",
        text=[f"{v:.1f}" for v in wide["risk_score"]],
        # Bars are coloured by their own score along a green-amber-red scale.
        marker=dict(color=wide["risk_score"], colorscale=[[0, "#2ec4a0"], [0.5, "#e8a230"], [1, "#e8537a"]]),
    ))
    fig.update_layout(**styled_layout(title=dict(text="Risk Score by Location"), showlegend=False, height=max(420, len(wide) * 35)))
    fig.update_xaxes(title="Risk score (0–100)", range=[0, 105])
    return fig
def build_investment_chart() -> go.Figure:
    """Horizontal bar chart of ``investment_potential_score`` per location.

    Bars are sorted ascending by score. Locations with no usable score are
    left out, so the chart never shows empty bars labelled "nan". An empty
    placeholder chart is returned when nothing remains to plot.
    """
    df = read_market_data(True)
    wide = pivot_metrics(df, section="synthetic_features")
    if wide.empty or "investment_potential_score" not in wide.columns:
        return empty_chart("Investment Potential")
    # Other synthetic metrics can create rows that have no investment score.
    wide = wide.dropna(subset=["investment_potential_score"])
    if wide.empty:
        return empty_chart("Investment Potential")
    wide = wide.sort_values("investment_potential_score", ascending=True)
    fig = go.Figure(go.Bar(
        y=wide["location"],
        x=wide["investment_potential_score"],
        orientation="h",
        text=[f"{v:.1f}" for v in wide["investment_potential_score"]],
        # Bars are coloured by their own score, light to dark purple.
        marker=dict(color=wide["investment_potential_score"], colorscale=[[0, "#c5b4f0"], [1, "#7c5cbf"]]),
    ))
    fig.update_layout(**styled_layout(title=dict(text="Investment Potential Score by Location"), showlegend=False, height=max(420, len(wide) * 35)))
    fig.update_xaxes(title="Investment potential score (0–100)", range=[0, 105])
    return fig
def build_opportunity_table() -> pd.DataFrame:
    """Build the per-location strategy table from the synthetic features.

    Keeps whichever of the expected columns exist and rounds the score
    columns to one decimal. When the investment score is present the rows
    are sorted by entity, then by that score descending with NaN last.
    Returns a one-row hint frame when there are no synthetic features.
    """
    wide = pivot_metrics(read_market_data(True), section="synthetic_features")
    if wide.empty:
        return pd.DataFrame([{"hint": "No synthetic features found yet."}])
    score_columns = ["growth_score", "market_attractiveness_score", "investment_potential_score", "risk_score"]
    wanted = ["location", "entity", *score_columns, "risk_level", "opportunity_category"]
    available = [name for name in wanted if name in wide.columns]
    out = wide[available].copy()
    for name in score_columns:
        if name in out:
            out[name] = out[name].round(1)
    if "investment_potential_score" not in out:
        return out
    return out.sort_values(
        ["entity", "investment_potential_score"],
        ascending=[True, False],
        na_position="last",
    )
def refresh_dashboard():
    """Rebuild every dashboard output in the order the UI wires them.

    Returns a 10-tuple: KPI HTML, the five charts, the opportunity table,
    then the gallery items, dropdown update and table preview.
    """
    gallery_items, dropdown_update, preview = refresh_gallery()
    kpi_cards = render_kpi_cards()
    charts = (
        build_city_performance_chart(),
        build_region_demand_chart(),
        build_synthetic_scores_chart(),
        build_risk_chart(),
        build_investment_chart(),
    )
    opportunities = build_opportunity_table()
    return (kpi_cards, *charts, opportunities, gallery_items, dropdown_update, preview)
| # ========================================================= | |
| # AI DASHBOARD — N8N / HUGGING FACE / KEYWORD FALLBACK | |
| # ========================================================= | |
# System prompt template for the Hugging Face chat model.
# - {artifacts_json} and {kpis_json} are filled in with str.format() by ai_chat.
# - Literal braces in the JSON example are doubled ({{ }}) so that
#   str.format() leaves them in place.
# - The trailing JSON directive requested here is what parse_display_directive
#   extracts from the model's reply.
DASHBOARD_SYSTEM = """You are the AI assistant for an Italy Hospitality Market Insight Assistant.
The app uses a real-world PwC Italy Hospitality Market Snapshot dataset and an enriched synthetic dataset.
The dataset has long-format columns: section, location, entity, metric_name, value, unit, period, source_page, note.
Available artifacts:
{artifacts_json}
KPI summary:
{kpis_json}
Key concepts:
- city performance: occupancy_yoy, adr_yoy, revpar_yoy for Milan, Rome, Florence, Venice.
- regional demand: domestic_demand_growth and international_demand_growth.
- synthetic features: growth_score, market_attractiveness_score, investment_potential_score, risk_score, risk_level, opportunity_category.
Answer briefly and practically. At the END, output a JSON block exactly like:
```json
{{"show": "chart"|"table"|"none", "chart": "city|region|scores|risk|investment|none", "table": "opportunities|raw|none"}}
```
Choose:
- city for occupancy / ADR / RevPAR / city performance questions.
- region for domestic/international regional demand questions.
- scores for comparing synthetic scores.
- risk for risk score or risk level.
- investment for investment potential / attractiveness / opportunity.
- opportunities table for opportunity categories or strategic recommendations.
"""
# A fenced ```json … ``` block; group 1 is the JSON object inside the fence.
JSON_BLOCK_RE = re.compile(r"```json\s*(\{.*?\})\s*```", re.DOTALL)
# An unfenced, non-nested {...} object that mentions a "show" key.
FALLBACK_JSON_RE = re.compile(r"\{[^{}]*\"show\"[^{}]*\}", re.DOTALL)
# Directive used when the reply carries no usable display instruction.
_NO_DISPLAY = {"show": "none", "chart": "none", "table": "none"}


def parse_display_directive(text: str) -> Dict[str, str]:
    """Extract the display directive the model appends to its reply.

    A fenced ``json`` block is tried first, then a bare ``{... "show" ...}``
    object. The result always has the ``show``, ``chart`` and ``table`` keys:
    keys the model left out, or gave a non-string value for, are ``"none"``.
    ``None`` or empty *text*, or a reply with no parseable directive, yields
    the all-``"none"`` directive.
    """
    text = text or ""
    for regex in (JSON_BLOCK_RE, FALLBACK_JSON_RE):
        match = regex.search(text)
        if not match:
            continue
        payload = match.group(1) if regex is JSON_BLOCK_RE else match.group(0)
        try:
            parsed = json.loads(payload)
        except ValueError:
            # Malformed JSON in this form: try the next pattern.
            continue
        if isinstance(parsed, dict):
            directive = dict(_NO_DISPLAY)
            directive.update({k: v for k, v in parsed.items() if isinstance(v, str)})
            return directive
    return dict(_NO_DISPLAY)


def clean_response(text: str) -> str:
    """Strip the display directive from a reply so only prose reaches the chat.

    Both forms accepted by ``parse_display_directive`` are removed: the fenced
    ``json`` block and a bare ``{... "show" ...}`` object.
    """
    cleaned = JSON_BLOCK_RE.sub("", text or "")
    cleaned = FALLBACK_JSON_RE.sub("", cleaned)
    return cleaned.strip()
def n8n_call(msg: str) -> Tuple[str, Optional[Dict[str, str]]]:
    """Send the question to the n8n webhook and translate its reply.

    Returns:
        ``(answer, directive)`` on success. On any failure (missing
        ``requests`` package, network/HTTP error, unexpected payload) returns
        ``(error message, None)`` so the caller can fall back to local logic.
    """
    try:
        # Imported inside the try so a missing package triggers the fallback
        # instead of crashing the chat handler.
        import requests

        resp = requests.post(
            N8N_WEBHOOK_URL,
            json={"question": msg, "project": "italy_hospitality_market"},
            timeout=25,
        )
        resp.raise_for_status()
        data = resp.json()
        # NOTE(review): n8n webhooks can be configured to answer with a list
        # of items; take the first one in that case — confirm against the workflow.
        if isinstance(data, list):
            data = data[0] if data else {}
        if not isinstance(data, dict):
            raise ValueError("unexpected response shape from n8n")
        answer = data.get("answer") or data.get("reply") or "No answer returned by n8n."
        # Treat a missing, null or empty value the same as "none".
        chart = data.get("chart") or "none"
        table = data.get("table") or "none"
        if chart != "none":
            show = "chart"
        elif table != "none":
            show = "table"
        else:
            show = "none"
        return answer, {"show": show, "chart": chart, "table": table}
    except Exception as e:
        return f"n8n error: {e}. Falling back to local logic.", None
def keyword_fallback(msg: str, kpis: Dict[str, Any]) -> Tuple[str, Dict[str, str]]:
    """Answer a question by keyword matching when no AI backend is used.

    The lower-cased message is checked against the keyword groups in order;
    the first group with a substring hit decides the reply and the display
    directive. A short KPI sentence is appended when *kpis* is non-empty.
    """
    text = msg.lower()
    kpi_text = ""
    if kpis:
        kpi_text = f"The dataset covers {kpis.get('locations', '?')} locations and {kpis.get('metrics', '?')} metrics."

    # (keywords, reply lead-in, directive), checked top to bottom.
    rules = (
        (
            ("occupancy", "adr", "revpar", "city", "milan", "rome", "florence", "venice"),
            "Here is the city performance view for occupancy, ADR, and RevPAR.",
            {"show": "chart", "chart": "city", "table": "none"},
        ),
        (
            ("region", "regional", "domestic", "international", "demand", "lazio", "puglia", "sicilia"),
            "Here is the regional demand comparison between domestic and international growth.",
            {"show": "chart", "chart": "region", "table": "none"},
        ),
        (
            ("risk", "risky", "safe", "low risk", "high risk"),
            "Here is the risk score view. Lower scores indicate safer or more stable opportunities.",
            {"show": "chart", "chart": "risk", "table": "opportunities"},
        ),
        (
            ("investment", "potential", "attractive", "attractiveness", "opportunity", "recommend"),
            "Here is the investment potential view, supported by the opportunity-category table.",
            {"show": "chart", "chart": "investment", "table": "opportunities"},
        ),
        (
            ("score", "scores", "growth", "synthetic", "compare"),
            "Here is the synthetic score comparison across locations.",
            {"show": "chart", "chart": "scores", "table": "opportunities"},
        ),
        (
            ("table", "data", "raw", "dataset"),
            "Here is the enriched hospitality dataset table preview.",
            {"show": "table", "chart": "none", "table": "raw"},
        ),
    )
    for keywords, lead, directive in rules:
        if any(word in text for word in keywords):
            return f"{lead} {kpi_text}", dict(directive)

    help_lead = "You can ask about city performance, regional demand, risk, investment potential, synthetic scores, or opportunity categories."
    return f"{help_lead} {kpi_text}", {"show": "none", "chart": "none", "table": "none"}
def resolve_chart(name: str):
    """Build the chart registered under *name*, or return ``None`` if unknown."""
    builders = {
        "city": build_city_performance_chart,
        "region": build_region_demand_chart,
        "scores": build_synthetic_scores_chart,
        "risk": build_risk_chart,
        "investment": build_investment_chart,
    }
    builder = builders.get(name)
    if builder is None:
        return None
    return builder()
def resolve_table(name: str):
    """Return the table registered under *name*, or ``None`` if unknown.

    ``"opportunities"`` gives the strategy table; ``"raw"`` gives the first
    ``MAX_PREVIEW_ROWS`` rows of the market dataset.
    """
    if name == "raw":
        return read_market_data(True).head(MAX_PREVIEW_ROWS)
    return build_opportunity_table() if name == "opportunities" else None
def ai_chat(user_msg: str, history: list):
    """Handle one chat turn and decide which chart/table to show with it.

    Backend priority: the n8n webhook when configured, else the Hugging Face
    model when enabled, else local keyword matching. A failure of either
    remote backend falls back to keyword matching, with the error shown in
    the reply.

    Returns:
        ``(new_history, "", chart, table)``: the updated message list, an
        empty string that clears the input box, and the figure and table to
        display (each may be ``None``).
    """
    if not user_msg or not user_msg.strip():
        return history, "", None, None
    idx = artifacts_index()
    kpis = load_kpis()
    if N8N_WEBHOOK_URL:
        reply, directive = n8n_call(user_msg)
        if directive is None:
            # n8n failed: keep its error text and add the local answer below it.
            reply_fb, directive = keyword_fallback(user_msg, kpis)
            reply = reply + "\n\n" + reply_fb
    elif LLM_ENABLED:
        system = DASHBOARD_SYSTEM.format(
            artifacts_json=json.dumps(idx, indent=2),
            kpis_json=json.dumps(kpis, indent=2) if kpis else "No KPIs available yet.",
        )
        msgs = [{"role": "system", "content": system}]
        # Forward only the last few turns, reduced to role/content. History
        # entries coming back from the UI may carry extra keys or shapes that
        # the inference API is not guaranteed to accept.
        for entry in (history or [])[-6:]:
            if isinstance(entry, dict) and entry.get("role") in ("user", "assistant"):
                msgs.append({"role": entry["role"], "content": entry.get("content")})
        msgs.append({"role": "user", "content": user_msg})
        try:
            r = llm_client.chat_completion(
                model=MODEL_NAME,
                messages=msgs,
                temperature=0.25,
                max_tokens=650,
                stream=False,
            )
            # The client may return either a dict or an object with attributes.
            raw = r["choices"][0]["message"]["content"] if isinstance(r, dict) else r.choices[0].message.content
            directive = parse_display_directive(raw)
            reply = clean_response(raw)
        except Exception as e:
            reply_fb, directive = keyword_fallback(user_msg, kpis)
            reply = f"Hugging Face error: {e}. Falling back to local logic.\n\n{reply_fb}"
    else:
        reply, directive = keyword_fallback(user_msg, kpis)

    # Show a chart when asked for explicitly or when a chart name is given.
    wants_chart = directive.get("show") in ["chart", "figure"] or directive.get("chart") != "none"
    chart_out = resolve_chart(directive.get("chart", "none")) if wants_chart else None
    table_out = resolve_table(directive.get("table", "none")) if directive.get("table") != "none" else None
    new_history = (history or []) + [
        {"role": "user", "content": user_msg},
        {"role": "assistant", "content": reply},
    ]
    return new_history, "", chart_out, table_out
| # ========================================================= | |
| # UI | |
| # ========================================================= | |
# Make sure the output folders exist before the UI is built.
ensure_dirs()


def load_css() -> str:
    """Return the contents of ``style.css`` next to the app, or ``""`` if absent."""
    stylesheet = BASE_DIR / "style.css"
    if not stylesheet.exists():
        return ""
    return stylesheet.read_text(encoding="utf-8")
# NOTE(review): the original indentation of this layout was lost. The nesting
# below (which widgets sit inside each Row/Column) is reconstructed — confirm
# it against the intended layout.
with gr.Blocks(title="Italy Hospitality Market Insight Assistant") as demo:
    gr.Markdown(
        "# Italy Hospitality Market Insight Assistant\n"
        "*A Gradio app for PwC-based hospitality indicators, synthetic market scores, n8n automation and Hugging Face AI Q&A.*",
        elem_id="escp_title",
    )

    # ---- Tab 1: run the notebooks and show their log ----
    with gr.Tab("Pipeline Runner"):
        gr.Markdown("Run the project notebooks. Step 1 creates/cleans the dataset; Step 2 creates analysis outputs and synthetic insights.")
        with gr.Row():
            btn_nb1 = gr.Button("Step 1: Data Creation", variant="secondary")
            btn_nb2 = gr.Button("Step 2: Python Analysis", variant="secondary")
        btn_all = gr.Button("Run Full Pipeline", variant="primary")
        run_log = gr.Textbox(label="Execution Log", lines=18, max_lines=30, interactive=False)
        btn_nb1.click(run_datacreation, outputs=[run_log])
        btn_nb2.click(run_pythonanalysis, outputs=[run_log])
        btn_all.click(run_full_pipeline, outputs=[run_log])

    # ---- Tab 2: KPIs, interactive charts, static figures and tables ----
    with gr.Tab("Dashboard"):
        # Passing the function (not its result) lets Gradio recompute it on page load.
        kpi_html = gr.HTML(value=render_kpi_cards)
        refresh_btn = gr.Button("Refresh Dashboard", variant="primary")
        gr.Markdown("#### Interactive Hospitality Charts")
        with gr.Row():
            chart_city = gr.Plot(label="City Performance")
            chart_region = gr.Plot(label="Regional Demand")
        with gr.Row():
            chart_scores = gr.Plot(label="Synthetic Scores")
            chart_risk = gr.Plot(label="Risk Score")
        chart_investment = gr.Plot(label="Investment Potential")
        gr.Markdown("#### Opportunity Categories")
        opportunity_table = gr.Dataframe(label="Synthetic Strategy Table", interactive=False)
        gr.Markdown("#### Static Figures and Data Tables")
        gallery = gr.Gallery(label="Generated Figures", columns=2, height=430, object_fit="contain")
        table_dropdown = gr.Dropdown(label="Select a table to view", choices=[], interactive=True)
        table_display = gr.Dataframe(label="Table Preview", interactive=False)

        # Order must match the tuple returned by refresh_dashboard().
        dashboard_outputs = [
            kpi_html, chart_city, chart_region, chart_scores, chart_risk,
            chart_investment, opportunity_table, gallery, table_dropdown, table_display,
        ]
        refresh_btn.click(refresh_dashboard, outputs=dashboard_outputs)
        table_dropdown.change(on_table_select, inputs=[table_dropdown], outputs=[table_display])

    # ---- Tab 3: chat with a chart/table side panel ----
    with gr.Tab('"AI" Dashboard'):
        ai_status = (
            "Connected to your **n8n workflow**." if N8N_WEBHOOK_URL else
            "**Hugging Face LLM active.**" if LLM_ENABLED else
            "Using **local keyword matching**. To activate AI, set `HF_API_KEY`; to activate automations, set `N8N_WEBHOOK_URL`."
        )
        gr.Markdown(
            "### Ask questions about the Italy hospitality market\n\n"
            f"{ai_status}\n\n"
            "Examples: *Which city has the strongest RevPAR?*, *Show risk scores*, *Which regions are investment opportunities?*"
        )
        with gr.Row(equal_height=True):
            with gr.Column(scale=1):
                chatbot = gr.Chatbot(label="Conversation", height=400)
                user_input = gr.Textbox(label="Ask about your data", placeholder="e.g. Show me investment potential by location", lines=1)
                gr.Examples(
                    examples=[
                        "Show me city performance for occupancy, ADR and RevPAR",
                        "Which locations have the highest risk?",
                        "Show investment potential by location",
                        "Compare synthetic scores",
                        "What is regional domestic vs international demand?",
                        "Give me strategic recommendations",
                    ],
                    inputs=user_input,
                )
            with gr.Column(scale=1):
                ai_figure = gr.Plot(label="Interactive Chart")
                ai_table = gr.Dataframe(label="Relevant Table", interactive=False)
        user_input.submit(ai_chat, inputs=[user_input, chatbot], outputs=[chatbot, user_input, ai_figure, ai_table])

    # Fill the dashboard when the page opens; previously the charts, gallery
    # and tables stayed empty until "Refresh Dashboard" was clicked.
    demo.load(refresh_dashboard, outputs=dashboard_outputs)

if __name__ == "__main__":
    demo.launch(css=load_css(), allowed_paths=[str(BASE_DIR)])