import os import sys import traceback from pathlib import Path from typing import List, Tuple, Any import duckdb import pandas as pd import numpy as np import matplotlib matplotlib.use("Agg") # headless for Spaces import matplotlib.pyplot as plt import gradio as gr # ========================= # Basic configuration # ========================= APP_TITLE = "ALCO Liquidity & Interest-Rate Risk Dashboard" TABLE_FQN = "my_db.main.masterdataset_v" # source table VIEW_FQN = "my_db.main.positions_v" # normalized view created by this app PRODUCT_ASSETS = [ "loan", "overdraft", "advances", "bills", "bill", "tbond", "t-bond", "tbill", "t-bill", "repo_asset" ] PRODUCT_SOF = [ "fd", "term_deposit", "td", "savings", "current", "call", "repo_liab" ] # ========================= # Helpers # ========================= def connect_md() -> duckdb.DuckDBPyConnection: token = os.environ.get("MOTHERDUCK_TOKEN", "") if not token: raise RuntimeError("MOTHERDUCK_TOKEN is not set. Add it in Space → Settings → Secrets.") return duckdb.connect(f"md:?motherduck_token={token}") def discover_columns(conn: duckdb.DuckDBPyConnection, table_fqn: str) -> List[str]: # Try DESCRIBE first (fast), fall back to information_schema try: df = conn.execute(f"DESCRIBE {table_fqn};").fetchdf() name_col = "column_name" if "column_name" in df.columns else df.columns[0] return [str(c).lower() for c in df[name_col].tolist()] except Exception: df = conn.execute( f""" SELECT lower(column_name) AS col FROM information_schema.columns WHERE table_catalog = split_part('{table_fqn}', '.', 1) AND table_schema = split_part('{table_fqn}', '.', 2) AND table_name = split_part('{table_fqn}', '.', 3) """ ).fetchdf() return df["col"].tolist() def build_view_sql(existing_cols: List[str]) -> str: wanted = [ "as_of_date", "product", "months", "segments", "currency", "Portfolio_value", "Interest_rate", "days_to_maturity" ] sel = [] for c in wanted: if c.lower() in existing_cols: sel.append(c) else: if c in ("Portfolio_value", "Interest_rate", "days_to_maturity", "months"): sel.append(f"CAST(NULL AS DOUBLE) AS {c}") else: sel.append(f"CAST(NULL AS VARCHAR) AS {c}") sof_list = ", ".join([f"'{p}'" for p in PRODUCT_SOF]) asset_list = ", ".join([f"'{p}'" for p in PRODUCT_ASSETS]) bucket_case = ( f"CASE " f"WHEN lower(product) IN ({sof_list}) THEN 'SoF' " f"WHEN lower(product) IN ({asset_list}) THEN 'Assets' " f"ELSE 'Unknown' END AS bucket" ) select_sql = ",\n ".join(sel + [bucket_case]) return f""" CREATE OR REPLACE VIEW {VIEW_FQN} AS SELECT {select_sql} FROM {TABLE_FQN}; """ def ensure_view(conn: duckdb.DuckDBPyConnection, cols: List[str]) -> None: required = {"product", "portfolio_value", "days_to_maturity"} if not required.issubset(set(cols)): raise RuntimeError( f"Source table {TABLE_FQN} must contain columns {sorted(required)}; found {sorted(cols)}" ) conn.execute(build_view_sql(cols)) def safe_num(x) -> float: try: return float(0.0 if x is None or (isinstance(x, float) and np.isnan(x)) else x) except Exception: return 0.0 def zeros_like_index(index) -> pd.Series: return pd.Series([0] * len(index), index=index) def plot_ladder(df: pd.DataFrame): try: if df is None or df.empty: fig, ax = plt.subplots(figsize=(7, 3)) ax.text(0.5, 0.5, "No data", ha="center", va="center") ax.axis("off") return fig pivot = df.pivot(index="time_bucket", columns="bucket", values="amount").fillna(0) order = ["T+1", "T+2..7", "T+8..30", "T+31+"] pivot = pivot.reindex(order) fig, ax = plt.subplots(figsize=(7, 4)) assets = pivot["Assets"] if "Assets" in pivot.columns else zeros_like_index(pivot.index) sof = pivot["SoF"] if "SoF" in pivot.columns else zeros_like_index(pivot.index) ax.bar(pivot.index, assets, label="Assets") ax.bar(pivot.index, -sof, label="SoF") ax.axhline(0, color="gray", lw=1) ax.set_ylabel("LKR") ax.set_title("Maturity Ladder (Assets vs SoF)") ax.legend() fig.tight_layout() return fig except Exception as e: fig, ax = plt.subplots(figsize=(7, 3)) ax.text(0.01, 0.8, "Chart Error:", fontsize=12, ha="left") ax.text(0.01, 0.5, str(e), fontsize=10, ha="left", wrap=True) ax.axis("off") return fig # ========================= # Query fragments # ========================= KPI_SQL = f""" SELECT COALESCE(SUM(CASE WHEN bucket='Assets' AND days_to_maturity<=1 THEN Portfolio_value END),0) AS assets_t1, COALESCE(SUM(CASE WHEN bucket='SoF' AND days_to_maturity<=1 THEN Portfolio_value END),0) AS sof_t1, COALESCE(SUM(CASE WHEN bucket='Assets' AND days_to_maturity<=1 THEN Portfolio_value END),0) - COALESCE(SUM(CASE WHEN bucket='SoF' AND days_to_maturity<=1 THEN Portfolio_value END),0) AS net_gap_t1 FROM {VIEW_FQN}; """ LADDER_SQL = f""" SELECT CASE WHEN days_to_maturity <= 1 THEN 'T+1' WHEN days_to_maturity BETWEEN 2 AND 7 THEN 'T+2..7' WHEN days_to_maturity BETWEEN 8 AND 30 THEN 'T+8..30' ELSE 'T+31+' END AS time_bucket, bucket, SUM(Portfolio_value) AS amount FROM {VIEW_FQN} GROUP BY 1,2 ORDER BY 1,2; """ def irr_sql(cols: List[str]) -> str: has_months = "months" in cols has_ir = "interest_rate" in cols t_expr = "CASE WHEN days_to_maturity IS NOT NULL THEN days_to_maturity/365.0" if has_months: t_expr += " WHEN months IS NOT NULL THEN months/12.0" t_expr += " ELSE NULL END" y_expr = "(Interest_rate/100.0)" if has_ir else "0.0" return f""" SELECT bucket, SUM(Portfolio_value) AS pv_sum, SUM(Portfolio_value * {t_expr}) / NULLIF(SUM(Portfolio_value),0) AS dur_mac, SUM(Portfolio_value * ({t_expr})/(1+({y_expr}))) / NULLIF(SUM(Portfolio_value),0) AS dur_mod FROM {VIEW_FQN} GROUP BY bucket; """ # ========================= # Dashboard callback # ========================= def run_dashboard() -> Tuple[str, str, str, str, str, Any, pd.DataFrame, pd.DataFrame]: """ Returns: status, as_of, assets_t1, sof_t1, net_gap_t1, figure, ladder_df, irr_df (text KPIs to avoid component type errors) """ try: conn = connect_md() # 1) Discover columns & build view cols = discover_columns(conn, TABLE_FQN) ensure_view(conn, cols) # 2) As-of (optional) as_of = "N/A" if "as_of_date" in cols: tmp = conn.execute(f"SELECT max(as_of_date) AS d FROM {VIEW_FQN}").fetchdf() if not tmp.empty and not pd.isna(tmp["d"].iloc[0]): as_of = str(tmp["d"].iloc[0])[:10] # 3) KPIs kpi = conn.execute(KPI_SQL).fetchdf() assets_t1 = safe_num(kpi["assets_t1"].iloc[0]) if not kpi.empty else 0.0 sof_t1 = safe_num(kpi["sof_t1"].iloc[0]) if not kpi.empty else 0.0 net_gap = safe_num(kpi["net_gap_t1"].iloc[0]) if not kpi.empty else 0.0 # 4) Ladder & IRR ladder = conn.execute(LADDER_SQL).fetchdf() irr = conn.execute(irr_sql(cols)).fetchdf() # 5) Chart fig = plot_ladder(ladder) status = "✅ OK" return ( status, as_of, f"{assets_t1:,.0f}", f"{sof_t1:,.0f}", f"{net_gap:,.0f}", fig, ladder, irr, ) except Exception as e: tb = traceback.format_exc() empty_df = pd.DataFrame() fig = plot_ladder(empty_df) return ( f"❌ Error: {e}\n\n{tb}", "N/A", "0", "0", "0", fig, empty_df, empty_df, ) # ========================= # Build Gradio UI # ========================= with gr.Blocks(title=APP_TITLE) as demo: gr.Markdown(f"# {APP_TITLE}\n_Source:_ `{TABLE_FQN}` → `{VIEW_FQN}`") status = gr.Textbox(label="Status", interactive=False, lines=8) with gr.Row(): refresh_btn = gr.Button("🔄 Refresh", variant="primary") with gr.Row(): as_of = gr.Textbox(label="As of date", interactive=False) with gr.Row(): a1 = gr.Textbox(label="Assets T+1 (LKR)", interactive=False) a2 = gr.Textbox(label="SoF T+1 (LKR)", interactive=False) a3 = gr.Textbox(label="Net Gap T+1 (LKR)", interactive=False) chart = gr.Plot(label="Maturity Ladder") ladder_df = gr.Dataframe(label="Ladder Detail") irr_df = gr.Dataframe(label="Interest-Rate Risk (approx)") refresh_btn.click( fn=run_dashboard, outputs=[status, as_of, a1, a2, a3, chart, ladder_df, irr_df], ) if __name__ == "__main__": demo.launch()