| import os |
| import sys |
| import traceback |
| from pathlib import Path |
| from typing import List, Tuple, Any |
|
|
| import duckdb |
| import pandas as pd |
| import numpy as np |
| import matplotlib |
| matplotlib.use("Agg") |
| import matplotlib.pyplot as plt |
| import gradio as gr |
|
|
| |
| |
| |
| APP_TITLE = "ALCO Liquidity & Interest-Rate Risk Dashboard" |
| TABLE_FQN = "my_db.main.masterdataset_v" |
| VIEW_FQN = "my_db.main.positions_v" |
|
|
| PRODUCT_ASSETS = [ |
| "loan", "overdraft", "advances", "bills", "bill", |
| "tbond", "t-bond", "tbill", "t-bill", "repo_asset", "assets" |
| ] |
| PRODUCT_SOF = [ |
| "fd", "term_deposit", "td", "savings", "current", |
| "call", "repo_liab" |
| ] |
|
|
| |
| |
| |
| def connect_md() -> duckdb.DuckDBPyConnection: |
| token = os.environ.get("MOTHERDUCK_TOKEN", "") |
| if not token: |
| raise RuntimeError("MOTHERDUCK_TOKEN is not set. Add it in Space β Settings β Secrets.") |
| return duckdb.connect(f"md:?motherduck_token={token}") |
|
|
| def discover_columns(conn: duckdb.DuckDBPyConnection, table_fqn: str) -> List[str]: |
| |
| try: |
| df = conn.execute(f"DESCRIBE {table_fqn};").fetchdf() |
| name_col = "column_name" if "column_name" in df.columns else df.columns[0] |
| return [str(c).lower() for c in df[name_col].tolist()] |
| except Exception: |
| df = conn.execute( |
| f""" |
| SELECT lower(column_name) AS col |
| FROM information_schema.columns |
| WHERE table_catalog = split_part('{table_fqn}', '.', 1) |
| AND table_schema = split_part('{table_fqn}', '.', 2) |
| AND table_name = split_part('{table_fqn}', '.', 3) |
| """ |
| ).fetchdf() |
| return df["col"].tolist() |
|
|
| def build_view_sql(existing_cols: List[str]) -> str: |
| wanted = [ |
| "as_of_date", "product", "months", "segments", |
| "currency", "Portfolio_value", "Interest_rate", |
| "days_to_maturity" |
| ] |
| sel = [] |
| for c in wanted: |
| if c.lower() in existing_cols: |
| sel.append(c) |
| else: |
| if c in ("Portfolio_value", "Interest_rate", "days_to_maturity", "months"): |
| sel.append(f"CAST(NULL AS DOUBLE) AS {c}") |
| else: |
| sel.append(f"CAST(NULL AS VARCHAR) AS {c}") |
|
|
| sof_list = ", ".join([f"'{p}'" for p in PRODUCT_SOF]) |
| asset_list = ", ".join([f"'{p}'" for p in PRODUCT_ASSETS]) |
|
|
| bucket_case = ( |
| f"CASE " |
| f"WHEN lower(product) IN ({sof_list}) THEN 'SoF' " |
| f"WHEN lower(product) IN ({asset_list}) THEN 'Assets' " |
| f"ELSE 'Unknown' END AS bucket" |
| ) |
| select_sql = ",\n ".join(sel + [bucket_case]) |
| return f""" |
| CREATE OR REPLACE VIEW {VIEW_FQN} AS |
| SELECT |
| {select_sql} |
| FROM {TABLE_FQN}; |
| """ |
|
|
| def ensure_view(conn: duckdb.DuckDBPyConnection, cols: List[str]) -> None: |
| required = {"product", "portfolio_value", "days_to_maturity"} |
| if not required.issubset(set(cols)): |
| raise RuntimeError( |
| f"Source table {TABLE_FQN} must contain columns {sorted(required)}; found {sorted(cols)}" |
| ) |
| conn.execute(build_view_sql(cols)) |
|
|
| def safe_num(x) -> float: |
| try: |
| return float(0.0 if x is None or (isinstance(x, float) and np.isnan(x)) else x) |
| except Exception: |
| return 0.0 |
|
|
| def zeros_like_index(index) -> pd.Series: |
| return pd.Series([0] * len(index), index=index) |
|
|
| def plot_ladder(df: pd.DataFrame): |
| try: |
| if df is None or df.empty: |
| fig, ax = plt.subplots(figsize=(7, 3)) |
| ax.text(0.5, 0.5, "No data", ha="center", va="center") |
| ax.axis("off") |
| return fig |
| pivot = df.pivot(index="time_bucket", columns="bucket", values="Amount (LKR Mn)").fillna(0) |
| order = ["T+1", "T+2..7", "T+8..30", "T+31+"] |
| pivot = pivot.reindex(order) |
| fig, ax = plt.subplots(figsize=(7, 4)) |
| assets = pivot["Assets"] if "Assets" in pivot.columns else zeros_like_index(pivot.index) |
| sof = pivot["SoF"] if "SoF" in pivot.columns else zeros_like_index(pivot.index) |
| ax.bar(pivot.index, assets, label="Assets") |
| ax.bar(pivot.index, -sof, label="SoF") |
| ax.axhline(0, color="gray", lw=1) |
| ax.set_ylabel("LKR (Mn)") |
| ax.set_title("Maturity Ladder (Assets vs SoF)") |
| ax.legend() |
| fig.tight_layout() |
| return fig |
| except Exception as e: |
| fig, ax = plt.subplots(figsize=(7, 3)) |
| ax.text(0.01, 0.8, "Chart Error:", fontsize=12, ha="left") |
| ax.text(0.01, 0.5, str(e), fontsize=10, ha="left", wrap=True) |
| ax.axis("off") |
| return fig |
|
|
| |
| |
| |
| KPI_SQL = f""" |
| SELECT |
| COALESCE(SUM(CASE WHEN bucket='Assets' AND days_to_maturity<=1 THEN Portfolio_value END),0) AS assets_t1, |
| COALESCE(SUM(CASE WHEN bucket='SoF' AND days_to_maturity<=1 THEN Portfolio_value END),0) AS sof_t1, |
| COALESCE(SUM(CASE WHEN bucket='Assets' AND days_to_maturity<=1 THEN Portfolio_value END),0) |
| - COALESCE(SUM(CASE WHEN bucket='SoF' AND days_to_maturity<=1 THEN Portfolio_value END),0) AS net_gap_t1 |
| FROM {VIEW_FQN}; |
| """ |
|
|
| LADDER_SQL = f""" |
| SELECT |
| CASE |
| WHEN days_to_maturity <= 1 THEN 'T+1' |
| WHEN days_to_maturity BETWEEN 2 AND 7 THEN 'T+2..7' |
| WHEN days_to_maturity BETWEEN 8 AND 30 THEN 'T+8..30' |
| ELSE 'T+31+' |
| END AS time_bucket, |
| bucket, |
| SUM(Portfolio_value) / 1000000.0 AS "Amount (LKR Mn)" |
| FROM {VIEW_FQN} |
| GROUP BY 1,2 |
| ORDER BY 1,2; |
| """ |
|
|
| GAP_DRIVERS_SQL = f""" |
| SELECT |
| product, |
| bucket, |
| SUM(Portfolio_value) / 1000000.0 AS "Amount (LKR Mn)" |
| FROM {VIEW_FQN} |
| WHERE days_to_maturity <= 1 |
| GROUP BY 1, 2 |
| ORDER BY 3 DESC; |
| """ |
|
|
| def irr_sql(cols: List[str]) -> str: |
| has_months = "months" in cols |
| has_ir = "interest_rate" in cols |
| t_expr = "CASE WHEN days_to_maturity IS NOT NULL THEN days_to_maturity/365.0" |
| if has_months: |
| t_expr += " WHEN months IS NOT NULL THEN months/12.0" |
| t_expr += " ELSE NULL END" |
| y_expr = "(Interest_rate/100.0)" if has_ir else "0.0" |
| return f""" |
| WITH irr_calcs AS ( |
| SELECT |
| bucket, |
| Portfolio_value AS pv, |
| -- Modified Duration = Macaulay Duration / (1 + yield) |
| -- We approximate Macaulay Duration with time-to-maturity in years (t_expr) |
| ({t_expr}) / (1 + {y_expr}) AS mod_dur |
| FROM {VIEW_FQN} |
| ) |
| SELECT |
| bucket, |
| SUM(pv) / 1000000.0 AS "Portfolio Value (LKR Mn)", |
| -- BPV (DV01) = SUM(Portfolio Value * Modified Duration * 0.0001) |
| SUM(pv * mod_dur * 0.0001) AS "BPV (DV01)" |
| FROM irr_calcs |
| GROUP BY bucket; |
| """ |
|
|
| |
| |
| |
| def run_dashboard(scenario: str, runoff_pct: float, rate_shock_bps_input: float) -> Tuple[str, str, str, str, str, Any, pd.DataFrame, pd.DataFrame, str, pd.DataFrame]: |
| """ |
| Returns: |
| status, as_of, a1_text, a2_text, a3_text, figure, ladder_df, irr_df, |
| explain_text, drivers_df |
| """ |
| try: |
| conn = connect_md() |
|
|
| |
| |
| |
| stressed_view_fqn = "positions_v_stressed" |
| runoff_factor = 1.0 |
| rate_shock_bps = 0.0 |
|
|
| if scenario == "Liquidity Stress: High Deposit Runoff" and runoff_pct > 0: |
| runoff_factor = (100.0 - runoff_pct) / 100.0 |
| elif scenario == "IRR Stress: Rate Shock" and rate_shock_bps_input != 0: |
| rate_shock_bps = rate_shock_bps_input |
|
|
| scenario_sql = f""" |
| CREATE OR REPLACE TEMP VIEW {stressed_view_fqn} AS |
| SELECT *, |
| CASE WHEN lower(product) IN ('savings', 'fd', 'td', 'term_deposit') THEN Portfolio_value * {runoff_factor} ELSE Portfolio_value END AS stressed_pv |
| FROM {VIEW_FQN}; |
| """ |
| conn.execute(scenario_sql) |
|
|
| |
| cols = discover_columns(conn, TABLE_FQN) |
| ensure_view(conn, cols) |
|
|
| |
| as_of = "N/A" |
| if "as_of_date" in cols: |
| tmp = conn.execute(f"SELECT max(as_of_date) AS d FROM {VIEW_FQN}").fetchdf() |
| if not tmp.empty and not pd.isna(tmp["d"].iloc[0]): |
| as_of = str(tmp["d"].iloc[0])[:10] |
|
|
| |
| |
| kpi_sql_stressed = KPI_SQL.replace(f"FROM {VIEW_FQN}", f"FROM {stressed_view_fqn}").replace("Portfolio_value", "stressed_pv") |
| kpi = conn.execute(kpi_sql_stressed).fetchdf() |
| assets_t1 = safe_num(kpi["assets_t1"].iloc[0]) if not kpi.empty else 0.0 |
| sof_t1 = safe_num(kpi["sof_t1"].iloc[0]) if not kpi.empty else 0.0 |
| net_gap = safe_num(kpi["net_gap_t1"].iloc[0]) if not kpi.empty else 0.0 |
|
|
| |
| ladder_sql_stressed = LADDER_SQL.replace(f"FROM {VIEW_FQN}", f"FROM {stressed_view_fqn}").replace("Portfolio_value", "stressed_pv") |
| drivers_sql_stressed = GAP_DRIVERS_SQL.replace(f"FROM {VIEW_FQN}", f"FROM {stressed_view_fqn}").replace("Portfolio_value", "stressed_pv") |
| irr_sql_stressed = irr_sql(cols).replace(f"FROM {VIEW_FQN}", f"FROM {stressed_view_fqn}").replace("Portfolio_value", "stressed_pv") |
|
|
| ladder = conn.execute(ladder_sql_stressed).fetchdf() |
| irr = conn.execute(irr_sql_stressed).fetchdf() |
| drivers = conn.execute(drivers_sql_stressed).fetchdf() |
|
|
| |
| ladder_display = ladder.copy() |
| if "Amount (LKR Mn)" in ladder.columns: |
| ladder_display["Amount (LKR Mn)"] = ladder_display["Amount (LKR Mn)"].map('{:,.2f}'.format) |
| else: |
| ladder_display = pd.DataFrame() |
|
|
| |
| irr_display = irr.copy() |
| if not irr_display.empty: |
| irr_display["Portfolio Value (LKR Mn)"] = irr_display["Portfolio Value (LKR Mn)"].map('{:,.2f}'.format) |
| irr_display["BPV (DV01)"] = irr_display["BPV (DV01)"].map('{:,.2f}'.format) |
|
|
| if "Amount (LKR Mn)" in drivers.columns: |
| drivers_display = drivers.copy() |
| drivers_display["Amount (LKR Mn)"] = drivers_display["Amount (LKR Mn)"].map('{:,.2f}'.format) |
| else: |
| drivers_display = pd.DataFrame() |
|
|
| |
| fig = plot_ladder(ladder) |
|
|
| |
| assets_t1_mn_str = f"{(assets_t1 / 1_000_000):,.2f}" |
| sof_t1_mn_str = f"{(sof_t1 / 1_000_000):,.2f}" |
| net_gap_mn_str = f"{(net_gap / 1_000_000):,.2f}" |
| gap_sign_str = "positive" if net_gap >= 0 else "negative" |
|
|
| a1_text = f"The amount of Assets maturing tomorrow (T+1) is **LKR {assets_t1_mn_str} Mn**." |
| a2_text = f"The amount of Sources of Funds (SoF) maturing tomorrow (T+1) is **LKR {sof_t1_mn_str} Mn**." |
| a3_text = f"The resulting Net Liquidity Gap for tomorrow (T+1) is **LKR {net_gap_mn_str} Mn**." |
|
|
| |
| sof_drivers = drivers[drivers["bucket"] == "SoF"] |
| asset_drivers = drivers[drivers["bucket"] == "Assets"] |
| top_sof_prod = sof_drivers.iloc[0] if not sof_drivers.empty else None |
| top_asset_prod = asset_drivers.iloc[0] if not asset_drivers.empty else None |
|
|
| explain_text = f"### Why is the T+1 Gap {gap_sign_str}?\n\n" |
| if top_sof_prod is not None: |
| explain_text += f"* **Largest Liability Maturity:** The largest outflow comes from `{top_sof_prod['product']}`, with **LKR {top_sof_prod['Amount (LKR Mn)']:,.2f} Mn** maturing.\n" |
| else: |
| explain_text += "* **Largest Liability Maturity:** No significant liabilities are maturing tomorrow.\n" |
|
|
| if top_asset_prod is not None: |
| explain_text += f"* **Largest Asset Inflow:** The largest inflow comes from `{top_asset_prod['product']}`, with **LKR {top_asset_prod['Amount (LKR Mn)']:,.2f} Mn** maturing.\n" |
| else: |
| explain_text += "* **Largest Asset Inflow:** No significant assets are maturing to provide inflows tomorrow.\n" |
|
|
| |
| explain_text += "* **Seasonal Pattern:** Analysis not possible without relevant time-series features in the source data." |
|
|
| |
| if scenario == "IRR Stress: Rate Shock" and rate_shock_bps != 0 and not irr.empty: |
| net_bpv = irr["BPV (DV01)"].sum() |
| eve_impact = net_bpv * rate_shock_bps |
| eve_impact_mn = eve_impact / 1_000_000 |
| explain_text += f"\n\n### IRR Stress Scenario Impact\n* A **+{rate_shock_bps:.0f} bps** rate shock is projected to change the portfolio's Economic Value by **LKR {eve_impact_mn:,.2f} Mn**." |
|
|
|
|
| status = f"β
OK (as of {pd.Timestamp.now().strftime('%Y-%m-%d %H:%M:%S')})" |
| return ( |
| status, |
| as_of, |
| a1_text, |
| a2_text, |
| a3_text, |
| fig, |
| ladder_display, |
| irr_display, |
| explain_text, |
| drivers_display, |
| ) |
|
|
| except Exception as e: |
| tb = traceback.format_exc() |
| empty_df = pd.DataFrame() |
| fig = plot_ladder(empty_df) |
| return ( |
| f"β Error: {e}\n\n{tb}", |
| "N/A", |
| "0", |
| "0", |
| "0", |
| fig, |
| empty_df, |
| empty_df, |
| "Analysis could not be performed.", |
| empty_df, |
| ) |
|
|
| |
| |
| |
| with gr.Blocks(title=APP_TITLE) as demo: |
| gr.Markdown(f"# {APP_TITLE}\n_Source:_ `{TABLE_FQN}` β `{VIEW_FQN}`") |
|
|
| status = gr.Textbox(label="Status", interactive=False, lines=8) |
|
|
| with gr.Row(): |
| refresh_btn = gr.Button("π Refresh", variant="primary") |
| theme_btn = gr.Button("π Toggle Theme") |
| theme_btn.click( |
| None, |
| None, |
| js="() => { document.querySelector('html').classList.toggle('dark'); }" |
| ) |
|
|
| with gr.Row(): |
| |
| with gr.Column(scale=1): |
| scenario_dd = gr.Dropdown( |
| label="Select Stress Scenario", |
| choices=["Baseline", "Liquidity Stress: High Deposit Runoff", "IRR Stress: Rate Shock"], |
| value="Baseline" |
| ) |
| with gr.Accordion("Stress Scenario Parameters", open=True): |
| runoff_slider = gr.Slider( |
| label="Deposit Runoff (%)", |
| minimum=0, maximum=100, step=1, value=20, |
| info="For Liquidity Stress: Percentage of key deposits that run off." |
| ) |
| shock_slider = gr.Slider( |
| label="Rate Shock (bps)", |
| minimum=-500, maximum=500, step=25, value=200, |
| info="For IRR Stress: Parallel shift in the yield curve in basis points." |
| ) |
| explain_text = gr.Markdown("Analysis of the T+1 gap will appear here...") |
|
|
| |
| with gr.Column(scale=3): |
| with gr.Row(): |
| as_of = gr.Textbox(label="As of date", interactive=False) |
| a1 = gr.Markdown("The amount of Assets maturing tomorrow (T+1) is...") |
| a2 = gr.Markdown("The amount of Sources of Funds (SoF) maturing tomorrow (T+1) is...") |
| a3 = gr.Markdown("The resulting Net Liquidity Gap for tomorrow (T+1) is...") |
|
|
| chart = gr.Plot(label="Maturity Ladder") |
|
|
| with gr.Tabs(): |
| with gr.TabItem("Ladder Detail"): |
| ladder_df = gr.Dataframe() |
| with gr.TabItem("T+1 Gap Drivers"): |
| drivers_df = gr.Dataframe( |
| headers=["Product", "Bucket", "Amount (LKR Mn)"], |
| ) |
| with gr.TabItem("Interest-Rate Risk (BPV/DV01)"): |
| irr_df = gr.Dataframe( |
| headers=["Bucket", "Portfolio Value (LKR Mn)", "BPV (DV01)"] |
| ) |
|
|
| refresh_btn.click( |
| fn=run_dashboard, |
| inputs=[scenario_dd, runoff_slider, shock_slider], |
| outputs=[status, as_of, a1, a2, a3, chart, ladder_df, irr_df, explain_text, drivers_df], |
| ) |
|
|
| if __name__ == "__main__": |
| demo.launch() |
|
|