|
|
import os |
|
|
import sys |
|
|
import traceback |
|
|
from pathlib import Path |
|
|
from typing import List, Tuple, Any |
|
|
|
|
|
import duckdb |
|
|
import pandas as pd |
|
|
import numpy as np |
|
|
import matplotlib |
|
|
matplotlib.use("Agg") |
|
|
import matplotlib.pyplot as plt |
|
|
import gradio as gr |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Passed to gr.Blocks(title=...) and rendered as the page heading.
APP_TITLE = "ALCO Liquidity & Interest-Rate Risk Dashboard"

# Fully qualified names of the source relation and of the view the app
# creates on top of it.
TABLE_FQN = "my_db.main.masterdataset_v"
VIEW_FQN = "my_db.main.positions_v"

# Lower-case product codes that the view classifies into the 'Assets' bucket.
PRODUCT_ASSETS = [
    "loan",
    "overdraft",
    "advances",
    "bills",
    "bill",
    "tbond",
    "t-bond",
    "tbill",
    "t-bill",
    "repo_asset",
]

# Lower-case product codes that the view classifies into the 'SoF' bucket.
PRODUCT_SOF = [
    "fd",
    "term_deposit",
    "td",
    "savings",
    "current",
    "call",
    "repo_liab",
]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def connect_md() -> duckdb.DuckDBPyConnection:
    """Open a MotherDuck connection using the token from the environment.

    Returns:
        The connection returned by ``duckdb.connect`` for the ``md:`` target.

    Raises:
        RuntimeError: If ``MOTHERDUCK_TOKEN`` is unset, empty, or contains
            only whitespace.
    """
    # Secrets pasted into a settings form frequently carry stray whitespace
    # or a trailing newline; strip it so it can neither pass the emptiness
    # check nor end up inside the connection string.
    token = os.environ.get("MOTHERDUCK_TOKEN", "").strip()
    if not token:
        # Arrows are written as escapes so the message survives re-encoding.
        raise RuntimeError(
            "MOTHERDUCK_TOKEN is not set. "
            "Add it in Space \u2192 Settings \u2192 Secrets."
        )
    return duckdb.connect(f"md:?motherduck_token={token}")
|
|
|
|
|
def discover_columns(conn: duckdb.DuckDBPyConnection, table_fqn: str) -> List[str]:
    """Return the lower-cased column names of ``table_fqn``.

    ``DESCRIBE`` is tried first. If it fails for any reason, the names are
    read from ``information_schema.columns`` instead, splitting ``table_fqn``
    on ``.`` into catalog, schema and table.

    Args:
        conn: Open connection used to run both queries.
        table_fqn: Relation name in ``catalog.schema.table`` form.

    Returns:
        Column names in lower case; empty if the fallback finds no match.
    """
    try:
        described = conn.execute(f"DESCRIBE {table_fqn};").fetchdf()
        # Use the 'column_name' column when present, otherwise assume the
        # first column of the DESCRIBE result holds the names.
        if "column_name" in described.columns:
            name_col = "column_name"
        else:
            name_col = described.columns[0]
        return [str(name).lower() for name in described[name_col].tolist()]
    except Exception:
        # Best-effort fallback. The name is bound as a parameter rather than
        # spliced into SQL string literals, so a quote inside it cannot break
        # or alter the statement.
        fallback_sql = """
            SELECT lower(column_name) AS col
            FROM information_schema.columns
            WHERE table_catalog = split_part(?, '.', 1)
              AND table_schema = split_part(?, '.', 2)
              AND table_name = split_part(?, '.', 3)
        """
        listed = conn.execute(fallback_sql, [table_fqn] * 3).fetchdf()
        return listed["col"].tolist()
|
|
|
|
|
def build_view_sql(existing_cols: List[str]) -> str:
    """Compose the ``CREATE OR REPLACE VIEW`` statement for ``VIEW_FQN``.

    Every wanted column is projected as-is when its lower-cased name appears
    in ``existing_cols``; otherwise it is replaced by a typed NULL so the view
    always exposes the same column set. A derived ``bucket`` column labels
    each row 'SoF', 'Assets' or 'Unknown' from its lower-cased product code.

    Args:
        existing_cols: Column names of the source table; they are compared
            against lower-cased wanted names, so lower case is expected.

    Returns:
        The SQL text that creates the view over ``TABLE_FQN``.
    """
    wanted = (
        "as_of_date", "product", "months", "segments",
        "currency", "Portfolio_value", "Interest_rate",
        "days_to_maturity",
    )
    # Missing columns in this group become DOUBLE NULLs, the rest VARCHAR.
    numeric_cols = ("Portfolio_value", "Interest_rate", "days_to_maturity", "months")

    def projection(col: str) -> str:
        if col.lower() in existing_cols:
            return col
        null_type = "DOUBLE" if col in numeric_cols else "VARCHAR"
        return f"CAST(NULL AS {null_type}) AS {col}"

    projections = [projection(col) for col in wanted]

    sof_list = ", ".join(f"'{code}'" for code in PRODUCT_SOF)
    asset_list = ", ".join(f"'{code}'" for code in PRODUCT_ASSETS)
    projections.append(
        "CASE "
        f"WHEN lower(product) IN ({sof_list}) THEN 'SoF' "
        f"WHEN lower(product) IN ({asset_list}) THEN 'Assets' "
        "ELSE 'Unknown' END AS bucket"
    )

    select_sql = ",\n ".join(projections)
    return f"""
CREATE OR REPLACE VIEW {VIEW_FQN} AS
SELECT
{select_sql}
FROM {TABLE_FQN};
"""
|
|
|
|
|
def ensure_view(conn: duckdb.DuckDBPyConnection, cols: List[str]) -> None:
    """Create or replace the positions view after validating the source columns.

    Args:
        conn: Open connection on which the view DDL is executed.
        cols: Column names discovered on the source table (lower case).

    Raises:
        RuntimeError: If any of ``product``, ``portfolio_value`` or
            ``days_to_maturity`` is absent from ``cols``.
    """
    required = {"product", "portfolio_value", "days_to_maturity"}
    missing = required.difference(cols)
    if missing:
        message = (
            f"Source table {TABLE_FQN} must contain columns {sorted(required)}; found {sorted(cols)}"
        )
        raise RuntimeError(message)

    view_ddl = build_view_sql(cols)
    conn.execute(view_ddl)
|
|
|
|
|
def safe_num(x) -> float:
    """Convert ``x`` to a float, mapping missing or unparsable values to 0.0.

    Args:
        x: Any value; typically a scalar read from a query result.

    Returns:
        ``float(x)`` when the conversion succeeds and is not NaN, else ``0.0``.
    """
    try:
        value = float(x)
    except (TypeError, ValueError, OverflowError):
        # None, pd.NA, non-numeric strings, oversized ints, ...
        return 0.0
    # Test for NaN after conversion so that NaNs arriving as np.float32,
    # Decimal or the string "nan" are caught too, not only builtin floats.
    return 0.0 if np.isnan(value) else value
|
|
|
|
|
def zeros_like_index(index) -> pd.Series:
    """Return a Series of integer zeros with one entry per label in ``index``."""
    zeros = [0] * len(index)
    return pd.Series(data=zeros, index=index)
|
|
|
|
|
def plot_ladder(df: pd.DataFrame):
    """Draw the maturity ladder as a bar chart of Assets against SoF.

    Assets are plotted as positive bars and SoF as negated bars, one pair per
    time bucket in the fixed order T+1, T+2..7, T+8..30, T+31+.

    Args:
        df: Frame with ``time_bucket``, ``bucket`` and ``amount`` columns.
            ``None`` or an empty frame produces a "No data" placeholder.

    Returns:
        A matplotlib figure. This function never raises: on any failure it
        returns a figure that displays the error message instead.
    """
    fig = None
    try:
        if df is None or df.empty:
            fig, ax = plt.subplots(figsize=(7, 3))
            ax.text(0.5, 0.5, "No data", ha="center", va="center")
            ax.axis("off")
            return fig

        order = ["T+1", "T+2..7", "T+8..30", "T+31+"]
        # Fill after reindexing: a time bucket that is absent from the data
        # only appears (as NaN) once the fixed order is imposed, and should
        # plot as a zero bar.
        pivot = (
            df.pivot(index="time_bucket", columns="bucket", values="amount")
            .reindex(order)
            .fillna(0)
        )

        fig, ax = plt.subplots(figsize=(7, 4))
        if "Assets" in pivot.columns:
            assets = pivot["Assets"]
        else:
            assets = zeros_like_index(pivot.index)
        if "SoF" in pivot.columns:
            sof = pivot["SoF"]
        else:
            sof = zeros_like_index(pivot.index)

        ax.bar(pivot.index, assets, label="Assets")
        # SoF is drawn below the axis so the two sides face each other.
        ax.bar(pivot.index, -sof, label="SoF")
        ax.axhline(0, color="gray", lw=1)
        ax.set_ylabel("LKR")
        ax.set_title("Maturity Ladder (Assets vs SoF)")
        ax.legend()
        fig.tight_layout()
        return fig
    except Exception as e:
        # pyplot keeps every figure it created alive until it is closed;
        # release the half-built one before drawing the error placeholder.
        if fig is not None:
            plt.close(fig)
        fig, ax = plt.subplots(figsize=(7, 3))
        ax.text(0.01, 0.8, "Chart Error:", fontsize=12, ha="left")
        ax.text(0.01, 0.5, str(e), fontsize=10, ha="left", wrap=True)
        ax.axis("off")
        return fig
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# T+1 liquidity KPIs: totals of Assets and SoF maturing within one day, and
# their difference. COALESCE turns an empty SUM (NULL) into 0.
KPI_SQL = f"""
SELECT
    COALESCE(SUM(CASE WHEN bucket='Assets' AND days_to_maturity<=1 THEN Portfolio_value END),0) AS assets_t1,
    COALESCE(SUM(CASE WHEN bucket='SoF' AND days_to_maturity<=1 THEN Portfolio_value END),0) AS sof_t1,
    COALESCE(SUM(CASE WHEN bucket='Assets' AND days_to_maturity<=1 THEN Portfolio_value END),0)
    - COALESCE(SUM(CASE WHEN bucket='SoF' AND days_to_maturity<=1 THEN Portfolio_value END),0) AS net_gap_t1
FROM {VIEW_FQN};
"""

# Maturity ladder: amount per (time bucket, bucket).
# The thresholds cascade (<= 1, <= 7, <= 30) instead of using BETWEEN ranges,
# so a fractional maturity such as 1.5 or 7.5 days lands in the next bucket
# up rather than falling through every range into 'T+31+'. For whole-day
# values the result is unchanged.
# NOTE(review): rows with a NULL days_to_maturity still end up in 'T+31+'
# through the ELSE branch - confirm that this is intended.
LADDER_SQL = f"""
SELECT
    CASE
        WHEN days_to_maturity <= 1 THEN 'T+1'
        WHEN days_to_maturity <= 7 THEN 'T+2..7'
        WHEN days_to_maturity <= 30 THEN 'T+8..30'
        ELSE 'T+31+'
    END AS time_bucket,
    bucket,
    SUM(Portfolio_value) AS amount
FROM {VIEW_FQN}
GROUP BY 1,2
ORDER BY 1,2;
"""
|
|
|
|
|
def irr_sql(cols: List[str]) -> str:
    """Build the per-bucket interest-rate-risk query over ``VIEW_FQN``.

    The query returns, for each bucket, the summed portfolio value, a
    value-weighted average term in years (``dur_mac``) and the same measure
    with each term divided by ``1 + yield`` (``dur_mod``). The term is
    ``days_to_maturity/365.0``, falling back to ``months/12.0`` when the
    source has a ``months`` column; the yield is ``Interest_rate/100.0`` when
    the source has an ``interest_rate`` column and ``0.0`` otherwise.

    Args:
        cols: Lower-case column names available on the source table.

    Returns:
        The SQL text of the aggregation query.
    """
    term_parts = ["CASE WHEN days_to_maturity IS NOT NULL THEN days_to_maturity/365.0"]
    if "months" in cols:
        term_parts.append(" WHEN months IS NOT NULL THEN months/12.0")
    term_parts.append(" ELSE NULL END")
    t_expr = "".join(term_parts)

    y_expr = "0.0"
    if "interest_rate" in cols:
        y_expr = "(Interest_rate/100.0)"

    return f"""
SELECT
bucket,
SUM(Portfolio_value) AS pv_sum,
SUM(Portfolio_value * {t_expr}) / NULLIF(SUM(Portfolio_value),0) AS dur_mac,
SUM(Portfolio_value * ({t_expr})/(1+({y_expr}))) / NULLIF(SUM(Portfolio_value),0) AS dur_mod
FROM {VIEW_FQN}
GROUP BY bucket;
"""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def run_dashboard() -> Tuple[str, str, str, str, str, Any, pd.DataFrame, pd.DataFrame]:
    """Refresh every dashboard output from MotherDuck.

    Connects, rebuilds the positions view, then reads the as-of date, the T+1
    KPIs, the maturity ladder and the interest-rate-risk table.

    Returns:
        ``(status, as_of, assets_t1, sof_t1, net_gap_t1, figure, ladder_df,
        irr_df)``. The KPIs are pre-formatted text to avoid component type
        errors. On failure the status carries the error and traceback, the
        KPIs are "0", the figure is a "No data" placeholder and both frames
        are empty; this function does not raise.
    """
    conn = None
    try:
        conn = connect_md()

        cols = discover_columns(conn, TABLE_FQN)
        ensure_view(conn, cols)

        as_of = "N/A"
        if "as_of_date" in cols:
            tmp = conn.execute(f"SELECT max(as_of_date) AS d FROM {VIEW_FQN}").fetchdf()
            if not tmp.empty and not pd.isna(tmp["d"].iloc[0]):
                # Keep only the leading YYYY-MM-DD part of the rendered value.
                as_of = str(tmp["d"].iloc[0])[:10]

        kpi = conn.execute(KPI_SQL).fetchdf()
        if kpi.empty:
            assets_t1 = sof_t1 = net_gap = 0.0
        else:
            first = kpi.iloc[0]
            assets_t1 = safe_num(first["assets_t1"])
            sof_t1 = safe_num(first["sof_t1"])
            net_gap = safe_num(first["net_gap_t1"])

        ladder = conn.execute(LADDER_SQL).fetchdf()
        irr = conn.execute(irr_sql(cols)).fetchdf()

        fig = plot_ladder(ladder)

        # Status glyphs are written as escapes so they survive re-encoding.
        status = "\u2705 OK"
        return (
            status,
            as_of,
            f"{assets_t1:,.0f}",
            f"{sof_t1:,.0f}",
            f"{net_gap:,.0f}",
            fig,
            ladder,
            irr,
        )
    except Exception as e:
        # UI boundary: report the failure in the status box instead of
        # letting the click handler crash.
        tb = traceback.format_exc()
        empty_df = pd.DataFrame()
        fig = plot_ladder(empty_df)
        return (
            f"\u274c Error: {e}\n\n{tb}",
            "N/A",
            "0",
            "0",
            "0",
            fig,
            empty_df,
            empty_df,
        )
    finally:
        # A new connection is opened on every refresh; always release it so
        # repeated clicks do not accumulate open sessions.
        if conn is not None:
            conn.close()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# UI layout: a status box, a refresh button, the as-of date, three T+1 KPI
# boxes, the ladder chart and the two detail tables. Non-ASCII glyphs in
# user-facing strings are written as escapes so they survive re-encoding.
with gr.Blocks(title=APP_TITLE) as demo:
    gr.Markdown(f"# {APP_TITLE}\n_Source:_ `{TABLE_FQN}` \u2192 `{VIEW_FQN}`")

    # Several lines tall because failures put a full traceback here.
    status = gr.Textbox(label="Status", interactive=False, lines=8)

    with gr.Row():
        refresh_btn = gr.Button("\U0001F504 Refresh", variant="primary")

    with gr.Row():
        as_of = gr.Textbox(label="As of date", interactive=False)

    with gr.Row():
        a1 = gr.Textbox(label="Assets T+1 (LKR)", interactive=False)
        a2 = gr.Textbox(label="SoF T+1 (LKR)", interactive=False)
        a3 = gr.Textbox(label="Net Gap T+1 (LKR)", interactive=False)

    chart = gr.Plot(label="Maturity Ladder")
    ladder_df = gr.Dataframe(label="Ladder Detail")
    irr_df = gr.Dataframe(label="Interest-Rate Risk (approx)")

    # The output order must match the tuple returned by run_dashboard.
    refresh_btn.click(
        fn=run_dashboard,
        outputs=[status, as_of, a1, a2, a3, chart, ladder_df, irr_df],
    )


if __name__ == "__main__":
    demo.launch()
|
|
|