# ST_TOC — Total Organic Carbon Estimation Using AI
# Abbrev-only UI + model-order-safe predictions (bypass sklearn feature-name check)
import io, json, os, base64, math
from pathlib import Path
from datetime import datetime
import streamlit as st
import pandas as pd
import numpy as np
import joblib
# Matplotlib (preview + cross-plot)
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
from matplotlib.ticker import FuncFormatter
import plotly.graph_objects as go
from sklearn.metrics import mean_squared_error
# =========================
# Constants / Defaults
# =========================
APP_NAME = "ST_TOC"
TAGLINE = "Total Organic Carbon Estimation Using AI"
# UI feature list (abbreviations only)
FEATURES = ["AHT90", "DT", "GR", "K", "RHOB", "TNPH", "Th", "Ur"]
TARGET = "TOC"
PRED_COL = "TOC_Pred"
MODELS_DIR = Path("models")
DEFAULT_MODEL = MODELS_DIR / "toc_rf.joblib"
MODEL_FALLBACKS = [MODELS_DIR / "model.joblib", MODELS_DIR / "model.pkl"]
COLORS = {"pred": "#1f77b4", "actual": "#f2b702", "ref": "#5a5a5a"}
STRICT_VERSION_CHECK = False # optional env banner
# ---- Plot sizing ----
CROSS_W = 350
CROSS_H = 350
TRACK_H = 1000
TRACK_W = 600
FONT_SZ = 13
BOLD_FONT = "Arial Black, Arial, sans-serif"
# =========================
# Page / CSS
# =========================
st.set_page_config(page_title=APP_NAME, page_icon="logo.png", layout="wide")
st.markdown("""
""", unsafe_allow_html=True)
TABLE_CENTER_CSS = [
dict(selector="th", props=[("text-align", "center")]),
dict(selector="td", props=[("text-align", "center")]),
]
# =========================
# Password gate
# =========================
def inline_logo(path="logo.png") -> str:
try:
p = Path(path)
if not p.exists(): return ""
return f"data:image/png;base64,{base64.b64encode(p.read_bytes()).decode('ascii')}"
except Exception:
return ""
def add_password_gate() -> None:
try:
required = st.secrets.get("APP_PASSWORD", "")
except Exception:
required = os.environ.get("APP_PASSWORD", "")
if not required:
st.warning("Set APP_PASSWORD in Secrets (or environment) and restart.")
st.stop()
if st.session_state.get("auth_ok", False):
return
st.sidebar.markdown(f"""
{APP_NAME}
Smart Thinking • Secure Access
""", unsafe_allow_html=True
)
pwd = st.sidebar.text_input("Access key", type="password", placeholder="••••••••")
if st.sidebar.button("Unlock", type="primary"):
if pwd == required:
st.session_state.auth_ok = True
st.rerun()
else:
st.error("Incorrect key.")
st.stop()
add_password_gate()
# =========================
# Utilities
# =========================
def rmse(y_true, y_pred) -> float:
return float(np.sqrt(mean_squared_error(y_true, y_pred)))
def pearson_r(y_true, y_pred) -> float:
a = np.asarray(y_true, dtype=float)
p = np.asarray(y_pred, dtype=float)
if a.size < 2: return float("nan")
if np.all(a == a[0]) or np.all(p == p[0]): return float("nan")
return float(np.corrcoef(a, p)[0, 1])
def mape(y_true, y_pred, eps: float = 1e-8) -> float:
a = np.asarray(y_true, dtype=float)
p = np.asarray(y_pred, dtype=float)
denom = np.where(np.abs(a) < eps, np.nan, np.abs(a))
pct = np.abs(a - p) / denom * 100.0
val = np.nanmean(pct)
return float(val) if np.isfinite(val) else float("nan")
@st.cache_resource(show_spinner=False)
def load_model(model_path: str):
return joblib.load(model_path)
@st.cache_data(show_spinner=False)
def parse_excel(data_bytes: bytes):
bio = io.BytesIO(data_bytes)
xl = pd.ExcelFile(bio)
return {sh: xl.parse(sh) for sh in xl.sheet_names}
def read_book_bytes(b: bytes):
return parse_excel(b) if b else {}
# ---------- Header normalization (to abbreviations for UI) ----------
def _strip_parens(name: str) -> str:
s = str(name).strip()
if "(" in s and s.endswith(")"):
s = s.split("(", 1)[0].strip()
return s
def _abbr(name: str) -> str:
"""Turn any variant into the canonical abbreviation used in UI FEATURES."""
n = _strip_parens(name)
n = n.replace(" ", "").replace("_", "").replace("-", "")
alias = {
"AC": "DT",
"DTus/ft": "DT", "DTusft": "DT",
"NPHI": "TNPH", "TNPHPercent": "TNPH", "TNPH%": "TNPH",
"GammaRay": "GR", "GRAPI": "GR",
"BulkDensity": "RHOB", "RHOBgcc": "RHOB",
"Thorium": "Th", "TH": "Th",
"U": "Ur", "UR": "Ur", "Uranium": "Ur",
"KPercent": "K", "K%": "K", "Potassium": "K",
"AHT_90": "AHT90", "AHT90AverageHydrocarbonTool90°Phase": "AHT90",
}
if n.upper() in {"GR", "DT", "RHOB"}: return n.upper() if n.upper() != "DT" else "DT"
if n.upper() == "AHT90": return "AHT90"
if n.upper() == "TNPH": return "TNPH"
if n.capitalize() == "Th": return "Th"
if n.capitalize() == "Ur": return "Ur"
return alias.get(n, n)
def normalize_to_abbr(df: pd.DataFrame) -> pd.DataFrame:
out = df.copy()
newcols = []
for c in out.columns:
ac = _abbr(c)
if ac in FEATURES:
newcols.append(ac)
elif str(c).strip().lower() in {"toc", "toc (%)", "totalorganiccarbon"}:
newcols.append(TARGET)
elif "depth" in str(c).lower():
newcols.append("Depth")
else:
newcols.append(str(c))
out.columns = newcols
return out
# ---- Model feature order + X builder (returns NumPy to bypass name checks) ----
def _training_feature_order(model, fallback_features: list[str]) -> list[str]:
names = list(getattr(model, "feature_names_in_", []))
if names:
return [str(n) for n in names]
return list(fallback_features)
def _make_X(df_raw: pd.DataFrame, model, fallback_features: list[str]) -> np.ndarray:
df_abbr = normalize_to_abbr(df_raw)
colmap = { _abbr(c): c for c in df_abbr.columns }
train_names = _training_feature_order(model, fallback_features)
order_cols, missing = [], []
for nm in train_names:
ab = _abbr(nm)
if ab in colmap:
order_cols.append(colmap[ab])
else:
missing.append(nm)
if missing:
st.markdown(
'Missing required columns for prediction (by model training): '
+ ", ".join(missing) + '
', unsafe_allow_html=True
)
st.stop()
X_df = df_abbr[order_cols].apply(pd.to_numeric, errors="coerce")
return np.asarray(X_df.to_numpy(dtype=float, copy=False), dtype=float)
def ensure_required_features(df: pd.DataFrame, model, fallback_features: list[str]) -> bool:
df_abbr = normalize_to_abbr(df)
need = [_abbr(nm) for nm in _training_feature_order(model, fallback_features)]
have = {_abbr(c) for c in df_abbr.columns}
miss = [n for n in need if n not in have]
if miss:
st.error(f"Missing columns: {miss}\nFound: {sorted(list(have))}")
return False
return True
def safe_predict(model, df_raw: pd.DataFrame, fallback_features: list[str]) -> np.ndarray:
X = _make_X(df_raw, model, fallback_features)
try:
return model.predict(X)
except Exception:
return model.predict(np.asarray(X, dtype=float))
def find_sheet(book, names):
low2orig = {k.lower(): k for k in book.keys()}
for nm in names:
if nm.lower() in low2orig: return low2orig[nm.lower()]
return None
def _nice_tick0(xmin: float, step: int = 100) -> float:
return step * math.floor(xmin / step) if np.isfinite(xmin) else xmin
def df_centered_rounded(df: pd.DataFrame, hide_index=True):
out = df.copy()
numcols = out.select_dtypes(include=[np.number]).columns
styler = (
out.style
.format({c: "{:.2f}" for c in numcols})
.set_properties(**{"text-align": "center"})
.set_table_styles(TABLE_CENTER_CSS)
)
st.dataframe(styler, use_container_width=True, hide_index=hide_index)
# ---- Excel writer engine (robust to missing xlsxwriter) ----
def _excel_engine() -> str | None:
try:
import xlsxwriter # noqa: F401
return "xlsxwriter"
except Exception:
try:
import openpyxl # noqa: F401
return "openpyxl"
except Exception:
return None # let pandas choose if possible
# =========================
# Cross plot (Matplotlib)
# =========================
def cross_plot_static(actual, pred):
a = pd.Series(actual, dtype=float)
p = pd.Series(pred, dtype=float)
lo = float(min(a.min(), p.min()))
hi = float(max(a.max(), p.max()))
pad = 0.03 * (hi - lo if hi > lo else 1.0)
lo2, hi2 = lo - pad, hi + pad
ticks = np.linspace(lo2, hi2, 5)
dpi = 110
fig, ax = plt.subplots(figsize=(CROSS_W / dpi, CROSS_H / dpi), dpi=dpi, constrained_layout=False)
ax.scatter(a, p, s=14, c=COLORS["pred"], alpha=0.9, linewidths=0)
ax.plot([lo2, hi2], [lo2, hi2], linestyle="--", linewidth=1.2, color=COLORS["ref"])
ax.set_xlim(lo2, hi2); ax.set_ylim(lo2, hi2)
ax.set_xticks(ticks); ax.set_yticks(ticks)
ax.set_aspect("equal", adjustable="box")
fmt = FuncFormatter(lambda x, _: f"{x:,.1f}")
ax.xaxis.set_major_formatter(fmt); ax.yaxis.set_major_formatter(fmt)
ax.set_xlabel("Actual TOC (%)", fontweight="bold", fontsize=10, color="black")
ax.set_ylabel("Predicted TOC (%)", fontweight="bold", fontsize=10, color="black")
ax.tick_params(labelsize=6, colors="black")
ax.grid(True, linestyle=":", alpha=0.3)
for spine in ax.spines.values():
spine.set_linewidth(1.1); spine.set_color("#444")
fig.subplots_adjust(left=0.16, bottom=0.16, right=0.98, top=0.98)
return fig
# =========================
# Track plot (Plotly)
# =========================
def track_plot(df, include_actual=True):
df = normalize_to_abbr(df)
depth_col = next((c for c in df.columns if 'depth' in str(c).lower() or c == "Depth"), None)
if depth_col is not None:
y = pd.Series(df[depth_col]).astype(float); ylab = depth_col
y_range = [float(y.max()), float(y.min())]
else:
y = pd.Series(np.arange(1, len(df) + 1)); ylab = "Point Index"
y_range = [float(y.max()), float(y.min())]
x_series = pd.Series(df.get(PRED_COL, pd.Series(dtype=float))).astype(float)
if include_actual and TARGET in df.columns:
x_series = pd.concat([x_series, pd.Series(df[TARGET]).astype(float)], ignore_index=True)
x_lo, x_hi = float(x_series.min()), float(x_series.max())
x_pad = 0.03 * (x_hi - x_lo if x_hi > x_lo else 1.0)
xmin, xmax = x_lo - x_pad, x_hi + x_pad
tick0 = _nice_tick0(xmin, step=0.5)
fig = go.Figure()
if PRED_COL in df.columns:
fig.add_trace(go.Scatter(
x=df[PRED_COL], y=y, mode="lines",
line=dict(color=COLORS["pred"], width=1.8),
name=PRED_COL,
hovertemplate=f"{PRED_COL}: "+"%{x:.2f}
"+ylab+": %{y}"
))
if include_actual and TARGET in df.columns:
fig.add_trace(go.Scatter(
x=df[TARGET], y=y, mode="lines",
line=dict(color=COLORS["actual"], width=2.0, dash="dot"),
name=f"{TARGET} (actual)",
hovertemplate=f"{TARGET}: "+"%{x:.2f}
"+ylab+": %{y}"
))
fig.update_layout(
height=TRACK_H, width=TRACK_W, autosize=False,
paper_bgcolor="#fff", plot_bgcolor="#fff",
margin=dict(l=64, r=16, t=36, b=48), hovermode="closest",
font=dict(size=FONT_SZ, color="#000"),
legend=dict(x=0.98, y=0.05, xanchor="right", yanchor="bottom",
bgcolor="rgba(255,255,255,0.75)", bordercolor="#ccc", borderwidth=1),
legend_title_text=""
)
fig.update_xaxes(
title_text="TOC (%)",
title_font=dict(size=20, family=BOLD_FONT, color="#000"),
tickfont=dict(size=12, family=BOLD_FONT, color="#000"),
side="top", range=[xmin, xmax],
ticks="outside", tickformat=",.2f", tickmode="auto", tick0=tick0,
showline=True, linewidth=1.2, linecolor="#444", mirror=True,
showgrid=True, gridcolor="rgba(0,0,0,0.12)", automargin=True
)
fig.update_yaxes(
title_text=ylab,
title_font=dict(size=20, family=BOLD_FONT, color="#000"),
tickfont=dict(size=15, family=BOLD_FONT, color="#000"),
range=y_range, ticks="outside",
showline=True, linewidth=1.2, linecolor="#444", mirror=True,
showgrid=True, gridcolor="rgba(0,0,0,0.12)", automargin=True
)
return fig
# ---------- Preview tracks (Matplotlib) ----------
def preview_tracks(df: pd.DataFrame, cols: list[str]):
df = normalize_to_abbr(df)
cols = [c for c in cols if c in df.columns]
n = len(cols)
if n == 0:
fig, ax = plt.subplots(figsize=(4, 2))
ax.text(0.5, 0.5, "No selected columns", ha="center", va="center"); ax.axis("off")
return fig
depth_col = next((c for c in df.columns if 'depth' in str(c).lower() or c == "Depth"), None)
if depth_col is not None:
idx = pd.to_numeric(df[depth_col], errors="coerce")
y_label = depth_col
else:
idx = pd.Series(np.arange(1, len(df) + 1))
y_label = "Point Index"
cmap = plt.get_cmap("tab20")
col_colors = {col: cmap(i % cmap.N) for i, col in enumerate(cols)}
fig, axes = plt.subplots(1, n, figsize=(2.3 * n, 7.0), sharey=True, dpi=100)
if n == 1: axes = [axes]
y_min, y_max = float(idx.min()), float(idx.max())
for i, (ax, col) in enumerate(zip(axes, cols)):
x = pd.to_numeric(df[col], errors="coerce")
ax.plot(x, idx, '-', lw=1.8, color=col_colors[col])
ax.set_xlabel(col)
ax.xaxis.set_label_position('top'); ax.xaxis.tick_top()
ax.set_ylim(y_max, y_min)
ax.grid(True, linestyle=":", alpha=0.3)
if i == 0:
ax.set_ylabel(y_label)
else:
ax.tick_params(labelleft=False)
ax.set_ylabel("")
fig.tight_layout()
return fig
# =========================
# Load model + meta
# =========================
def ensure_model() -> Path|None:
for p in [DEFAULT_MODEL, *MODEL_FALLBACKS]:
if p.exists() and p.stat().st_size > 0: return p
url = os.environ.get("MODEL_URL", "")
if not url: return None
try:
import requests
DEFAULT_MODEL.parent.mkdir(parents=True, exist_ok=True)
with requests.get(url, stream=True, timeout=30) as r:
r.raise_for_status()
with open(DEFAULT_MODEL, "wb") as f:
for chunk in r.iter_content(1<<20):
if chunk: f.write(chunk)
return DEFAULT_MODEL
except Exception:
return None
mpath = ensure_model()
if not mpath:
st.error("Model not found. Upload models/toc_rf.joblib (or set MODEL_URL).")
st.stop()
try:
model = load_model(str(mpath))
except Exception as e:
st.error(f"Failed to load model: {e}")
st.stop()
# Optional meta to override defaults
meta = {}
meta_candidates = [MODELS_DIR / "toc_meta.json", MODELS_DIR / "meta.json"]
meta_path = next((p for p in meta_candidates if p.exists()), None)
if meta_path:
try:
meta = json.loads(meta_path.read_text(encoding="utf-8"))
FEATURES = meta.get("features", FEATURES)
TARGET = meta.get("target", TARGET)
PRED_COL = meta.get("pred_col", PRED_COL)
except Exception as e:
st.warning(f"Could not parse meta file ({meta_path.name}): {e}")
if STRICT_VERSION_CHECK and meta.get("versions"):
import numpy as _np, sklearn as _skl
mv = meta["versions"]; msg=[]
if mv.get("numpy") and mv["numpy"] != _np.__version__:
msg.append(f"NumPy {mv['numpy']} expected, running {_np.__version__}")
if mv.get("scikit_learn") and mv["scikit_learn"] != _skl.__version__:
msg.append(f"scikit-learn {mv['scikit_learn']} expected, running {_skl.__version__}")
if msg:
st.warning("Environment mismatch: " + " | ".join(msg))
# =========================
# Session state
# =========================
st.session_state.setdefault("app_step", "intro")
st.session_state.setdefault("results", {})
st.session_state.setdefault("train_ranges", None)
st.session_state.setdefault("dev_file_name","")
st.session_state.setdefault("dev_file_bytes",b"")
st.session_state.setdefault("dev_file_loaded",False)
st.session_state.setdefault("dev_preview",False)
st.session_state.setdefault("show_preview_modal", False)
# =========================
# Sidebar branding
# =========================
st.sidebar.markdown(f"""
{APP_NAME}
{TAGLINE}
""", unsafe_allow_html=True
)
def sticky_header(title, message):
st.markdown(
f"""
""",
unsafe_allow_html=True
)
# =========================
# INTRO
# =========================
if st.session_state.app_step == "intro":
st.header("Welcome!")
st.markdown("This software is developed by *Smart Thinking AI-Solutions Team* to estimate **Total Organic Carbon (TOC)** from logging data.")
st.subheader("How It Works")
st.markdown(
"1) **Upload your data to build the case and preview the model performance.** \n"
"2) Click **Run Model** to compute metrics and plots. \n"
"3) **Proceed to Validation** (with actual TOC) or **Proceed to Prediction** (no TOC)."
)
st.subheader("Input Features Used by the Model")
st.markdown("""
The TOC estimation model uses the following eight well-logging features:
- **AHT90 (Average Hydrocarbon Tool 90° Phase)**
- **DT (Delta-T Sonic Travel Time)**
- **GR (Gamma Ray)**
- **K (Potassium)**
- **RHOB (Bulk Density)**
- **TNPH (Thermal Neutron Porosity)**
- **Th (Thorium)**
- **Ur (Uranium)**
""")
if st.button("Start Showcase", type="primary"):
st.session_state.app_step = "dev"; st.rerun()
# =========================
# CASE BUILDING
# =========================
if st.session_state.app_step == "dev":
st.sidebar.header("Case Building")
up = st.sidebar.file_uploader("Upload Your Data File", type=["xlsx","xls"])
if up is not None:
st.session_state.dev_file_bytes = up.getvalue()
st.session_state.dev_file_name = up.name
st.session_state.dev_file_loaded = True
st.session_state.dev_preview = False
if st.session_state.dev_file_loaded:
tmp = read_book_bytes(st.session_state.dev_file_bytes)
if tmp:
df0 = next(iter(tmp.values()))
st.sidebar.caption(f"**Data loaded:** {st.session_state.dev_file_name} • {df0.shape[0]} rows × {df0.shape[1]} cols")
if st.sidebar.button("Preview data", use_container_width=True, disabled=not st.session_state.dev_file_loaded):
st.session_state.show_preview_modal = True
st.session_state.dev_preview = True
run = st.sidebar.button("Run Model", type="primary", use_container_width=True)
if st.sidebar.button("Proceed to Validation ▶", use_container_width=True): st.session_state.app_step="validate"; st.rerun()
if st.sidebar.button("Proceed to Prediction ▶", use_container_width=True): st.session_state.app_step="predict"; st.rerun()
if st.session_state.dev_file_loaded and st.session_state.dev_preview:
sticky_header("Case Building", "Previewed ✓ — now click **Run Model**.")
elif st.session_state.dev_file_loaded:
sticky_header("Case Building", "📄 **Preview uploaded data** using the sidebar button, then click **Run Model**.")
else:
sticky_header("Case Building", "**Upload your data to build a case, then run the model to review development performance.**")
if run and st.session_state.dev_file_bytes:
book = read_book_bytes(st.session_state.dev_file_bytes)
sh_train = find_sheet(book, ["Train","Training","training2","train","training"])
sh_test = find_sheet(book, ["Test","Testing","testing2","test","testing"])
if sh_train is None or sh_test is None:
st.markdown('Workbook must include Train/Training/training2 and Test/Testing/testing2 sheets.
', unsafe_allow_html=True)
st.stop()
tr_raw = book[sh_train].copy()
te_raw = book[sh_test].copy()
if not (ensure_required_features(tr_raw, model, FEATURES) and ensure_required_features(te_raw, model, FEATURES)):
st.stop()
tr = normalize_to_abbr(tr_raw)
te = normalize_to_abbr(te_raw)
tr[PRED_COL] = safe_predict(model, tr_raw, FEATURES)
te[PRED_COL] = safe_predict(model, te_raw, FEATURES)
st.session_state.results["Train"]=tr; st.session_state.results["Test"]=te
st.session_state.results["m_train"]={
"R": pearson_r(tr[TARGET], tr[PRED_COL]),
"RMSE": rmse(tr[TARGET], tr[PRED_COL]),
"MAPE": mape(tr[TARGET], tr[PRED_COL])
}
st.session_state.results["m_test"]={
"R": pearson_r(te[TARGET], te[PRED_COL]),
"RMSE": rmse(te[TARGET], te[PRED_COL]),
"MAPE": mape(te[TARGET], te[PRED_COL])
}
tr_min = tr[FEATURES].min().to_dict(); tr_max = tr[FEATURES].max().to_dict()
st.session_state.train_ranges = {f:(float(tr_min[f]), float(tr_max[f])) for f in FEATURES}
st.markdown('Case has been built and results are displayed below.
', unsafe_allow_html=True)
def _dev_block(df, m):
c1,c2,c3 = st.columns(3)
c1.metric("R", f"{m['R']:.3f}")
c2.metric("RMSE", f"{m['RMSE']:.2f}")
c3.metric("MAPE (%)", f"{m['MAPE']:.2f}%")
st.markdown("""
R: Pearson Correlation Coefficient
RMSE: Root Mean Square Error
MAPE: Mean Absolute Percentage Error (percent of actual; rows with near-zero actuals are ignored).
""", unsafe_allow_html=True)
col_track, col_cross = st.columns([2, 3], gap="large")
with col_track:
st.plotly_chart(track_plot(df, include_actual=True), use_container_width=False, config={"displayModeBar": False, "scrollZoom": True})
with col_cross:
st.pyplot(cross_plot_static(df[TARGET], df[PRED_COL]), use_container_width=False)
if "Train" in st.session_state.results or "Test" in st.session_state.results:
tab1, tab2 = st.tabs(["Training", "Testing"])
if "Train" in st.session_state.results:
with tab1: _dev_block(st.session_state.results["Train"], st.session_state.results["m_train"])
if "Test" in st.session_state.results:
with tab2: _dev_block(st.session_state.results["Test"], st.session_state.results["m_test"])
st.divider()
st.markdown("### Export to Excel")
def _excel_safe_name(name: str) -> str:
bad = '[]:*?/\\'
safe = ''.join('_' if ch in bad else ch for ch in str(name))
return safe[:31]
def _round_numeric(df: pd.DataFrame, ndigits: int = 2) -> pd.DataFrame:
out = df.copy()
for c in out.columns:
if pd.api.types.is_float_dtype(out[c]) or pd.api.types.is_integer_dtype(out[c]):
out[c] = pd.to_numeric(out[c], errors="coerce").round(ndigits)
return out
def _summary_table(df: pd.DataFrame, cols: list[str]) -> pd.DataFrame:
cols = [c for c in cols if c in df.columns]
if not cols:
return pd.DataFrame()
tbl = (df[cols]
.agg(['min','max','mean','std'])
.T.rename(columns={"min":"Min","max":"Max","mean":"Mean","std":"Std"})
.reset_index(names="Field"))
return _round_numeric(tbl)
def _train_ranges_df(ranges: dict[str, tuple[float, float]]) -> pd.DataFrame:
if not ranges:
return pd.DataFrame()
df = pd.DataFrame(ranges).T.reset_index()
df.columns = ["Feature", "Min", "Max"]
return _round_numeric(df)
def _available_sections() -> list[str]:
res = st.session_state.get("results", {})
sections = []
if "Train" in res: sections += ["Training","Training_Metrics","Training_Summary"]
if "Test" in res: sections += ["Testing","Testing_Metrics","Testing_Summary"]
if "Validate" in res: sections += ["Validation","Validation_Metrics","Validation_Summary","Validation_OOR"]
if "PredictOnly" in res: sections += ["Prediction","Prediction_Summary"]
if st.session_state.get("train_ranges"): sections += ["Training_Ranges"]
sections += ["Info"]
return sections
def build_export_workbook(selected: list[str] | None = None) -> tuple[bytes|None, str|None, list[str]]:
res = st.session_state.get("results", {})
if not res: return None, None, []
sheets: dict[str, pd.DataFrame] = {}
order: list[str] = []
if ("Training" in (selected or _available_sections())) and "Train" in res:
tr = _round_numeric(res["Train"]); sheets["Training"] = tr; order.append("Training")
m = st.session_state.get("results", {}).get("m_train", {})
if m: sheets["Training_Metrics"] = _round_numeric(pd.DataFrame([m])); order.append("Training_Metrics")
s = _summary_table(tr, FEATURES + [c for c in [TARGET, PRED_COL] if c in tr.columns])
if not s.empty: sheets["Training_Summary"] = s; order.append("Training_Summary")
if ("Testing" in (selected or _available_sections())) and "Test" in res:
te = _round_numeric(res["Test"]); sheets["Testing"] = te; order.append("Testing")
m = st.session_state.get("results", {}).get("m_test", {})
if m: sheets["Testing_Metrics"] = _round_numeric(pd.DataFrame([m])); order.append("Testing_Metrics")
s = _summary_table(te, FEATURES + [c for c in [TARGET, PRED_COL] if c in te.columns])
if not s.empty: sheets["Testing_Summary"] = s; order.append("Testing_Summary")
if ("Validation" in (selected or _available_sections())) and "Validate" in res:
va = _round_numeric(res["Validate"]); sheets["Validation"] = va; order.append("Validation")
m = st.session_state.get("results", {}).get("m_val", {})
if m: sheets["Validation_Metrics"] = _round_numeric(pd.DataFrame([m])); order.append("Validation_Metrics")
sv = st.session_state.get("results", {}).get("sv_val", {})
if sv: sheets["Validation_Summary"] = _round_numeric(pd.DataFrame([sv])); order.append("Validation_Summary")
oor_tbl = st.session_state.get("results", {}).get("oor_tbl")
if isinstance(oor_tbl, pd.DataFrame) and not oor_tbl.empty:
sheets["Validation_OOR"] = _round_numeric(oor_tbl.reset_index(drop=True)); order.append("Validation_OOR")
if ("Prediction" in (selected or _available_sections())) and "PredictOnly" in res:
pr = _round_numeric(res["PredictOnly"]); sheets["Prediction"] = pr; order.append("Prediction")
sv = st.session_state.get("results", {}).get("sv_pred", {})
if sv: sheets["Prediction_Summary"] = _round_numeric(pd.DataFrame([sv])); order.append("Prediction_Summary")
tr_ranges = st.session_state.get("train_ranges")
if ("Training_Ranges" in (selected or _available_sections())) and tr_ranges:
rr = _train_ranges_df(tr_ranges)
if not rr.empty: sheets["Training_Ranges"] = rr; order.append("Training_Ranges")
info = pd.DataFrame([
{"Key": "AppName", "Value": APP_NAME},
{"Key": "Tagline", "Value": TAGLINE},
{"Key": "Target", "Value": TARGET},
{"Key": "PredColumn", "Value": PRED_COL},
{"Key": "Features", "Value": ", ".join(FEATURES)},
{"Key": "ExportedAt", "Value": datetime.now().strftime("%Y-%m-%d %H:%M:%S")},
])
sheets["Info"] = info; order.append("Info")
bio = io.BytesIO()
engine = _excel_engine()
with pd.ExcelWriter(bio, engine=engine) as writer:
for name in order:
sheets[name].to_excel(writer, sheet_name=_excel_safe_name(name), index=False)
bio.seek(0)
fname = f"TOC_Export_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"
return bio.getvalue(), fname, order
options = _available_sections()
selected_sheets = st.multiselect(
"Sheets to include",
options=options, default=[],
placeholder="Choose option(s)",
help="Pick the sheets you want to include in the Excel export.",
key="sheets_dev",
)
if not selected_sheets:
st.caption("Select one or more sheets above to enable the export.")
st.download_button("⬇️ Export Excel", data=b"", file_name="TOC_Export.xlsx",
mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
disabled=True, key="download_dev_disabled")
else:
data, fname, names = build_export_workbook(selected=selected_sheets)
if names: st.caption("Will include: " + ", ".join(names))
st.download_button("⬇️ Export Excel", data=(data or b""), file_name=(fname or "TOC_Export.xlsx"),
mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
disabled=(data is None), key="download_dev")
# =========================
# VALIDATION (with actual TOC)
# =========================
if st.session_state.app_step == "validate":
st.sidebar.header("Validate the Model")
up = st.sidebar.file_uploader("Upload Validation Excel", type=["xlsx","xls"])
if up is not None:
book = read_book_bytes(up.getvalue())
if book:
df0 = next(iter(book.values()))
st.sidebar.caption(f"**Data loaded:** {up.name} • {df0.shape[0]} rows × {df0.shape[1]} cols")
if st.sidebar.button("Preview data", use_container_width=True, disabled=(up is None)):
st.session_state.show_preview_modal = True
go_btn = st.sidebar.button("Predict & Validate", type="primary", use_container_width=True)
if st.sidebar.button("⬅ Back to Case Building", use_container_width=True): st.session_state.app_step="dev"; st.rerun()
if st.sidebar.button("Proceed to Prediction ▶", use_container_width=True): st.session_state.app_step="predict"; st.rerun()
sticky_header("Validate the Model", "Upload a dataset with the same **features** and **TOC** to evaluate performance.")
if go_btn and up is not None:
book = read_book_bytes(up.getvalue())
name = find_sheet(book, ["Validation","Validate","validation2","Val","val"]) or list(book.keys())[0]
df_raw = book[name].copy()
if not ensure_required_features(df_raw, model, FEATURES):
st.stop()
df = normalize_to_abbr(df_raw)
df[PRED_COL] = safe_predict(model, df_raw, FEATURES)
st.session_state.results["Validate"]=df
ranges = st.session_state.train_ranges; oor_pct = 0.0; tbl=None
if ranges:
any_viol = pd.DataFrame({f:(df[f]ranges[f][1]) for f in FEATURES if f in df.columns}).any(axis=1)
oor_pct = float(any_viol.mean()*100.0)
if any_viol.any():
tbl = df.loc[any_viol, [c for c in FEATURES if c in df.columns]].copy()
for c in [c for c in FEATURES if c in tbl.columns]:
if pd.api.types.is_numeric_dtype(tbl[c]): tbl[c] = tbl[c].round(2)
tbl["Violations"] = pd.DataFrame({f:(df[f]ranges[f][1]) for f in FEATURES if f in df.columns}).loc[any_viol].apply(
lambda r:", ".join([c for c,v in r.items() if v]), axis=1
)
st.session_state.results["m_val"]={
"R": pearson_r(df[TARGET], df[PRED_COL]),
"RMSE": rmse(df[TARGET], df[PRED_COL]),
"MAPE": mape(df[TARGET], df[PRED_COL])
}
st.session_state.results["sv_val"]={"n":len(df), "pred_min":float(df[PRED_COL].min()), "pred_max":float(df[PRED_COL].max()), "oor":oor_pct}
st.session_state.results["oor_tbl"]=tbl
if "Validate" in st.session_state.results:
m = st.session_state.results["m_val"]
c1,c2,c3 = st.columns(3)
c1.metric("R", f"{m['R']:.3f}")
c2.metric("RMSE", f"{m['RMSE']:.2f}")
c3.metric("MAPE (%)", f"{m['MAPE']:.2f}%")
st.markdown("""
R: Pearson Correlation Coefficient
RMSE: Root Mean Square Error
MAPE: Mean Absolute Percentage Error (percent of actual; rows with near-zero actuals are ignored).
""", unsafe_allow_html=True)
col_track, col_cross = st.columns([2, 3], gap="large")
with col_track:
st.plotly_chart(track_plot(st.session_state.results["Validate"], include_actual=True),
use_container_width=False, config={"displayModeBar": False, "scrollZoom": True})
with col_cross:
st.pyplot(cross_plot_static(st.session_state.results["Validate"][TARGET],
st.session_state.results["Validate"][PRED_COL]),
use_container_width=False)
st.divider()
st.markdown("### Export to Excel")
def _export_val():
res = st.session_state.get("results", {})
sheets = {}
sheets["Validation"] = res["Validate"]
sheets["Validation_Metrics"] = pd.DataFrame([res.get("m_val", {})])
if "sv_val" in res: sheets["Validation_Summary"] = pd.DataFrame([res["sv_val"]])
if isinstance(res.get("oor_tbl"), pd.DataFrame) and not res["oor_tbl"].empty:
sheets["Validation_OOR"] = res["oor_tbl"].reset_index(drop=True)
sheets["Info"] = pd.DataFrame([
{"Key":"AppName","Value":APP_NAME},
{"Key":"Target","Value":TARGET},
{"Key":"PredColumn","Value":PRED_COL},
{"Key":"ExportedAt","Value":datetime.now().strftime("%Y-%m-%d %H:%M:%S")},
])
bio = io.BytesIO()
engine = _excel_engine()
with pd.ExcelWriter(bio, engine=engine) as writer:
for k,v in sheets.items():
v.to_excel(writer, sheet_name=k[:31], index=False)
bio.seek(0)
return bio.getvalue(), f"TOC_Validation_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"
data_x, fn_x = _export_val()
st.download_button("⬇️ Export Excel", data=data_x, file_name=fn_x,
mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
sv = st.session_state.results["sv_val"]
if sv["oor"] > 0: st.markdown('Some inputs fall outside **training min–max** ranges.
', unsafe_allow_html=True)
if st.session_state.results["oor_tbl"] is not None:
st.write("*Out-of-range rows (vs. Training min–max):*")
df_centered_rounded(st.session_state.results["oor_tbl"])
# =========================
# PREDICTION (no actual TOC)
# =========================
if st.session_state.app_step == "predict":
st.sidebar.header("Prediction (No Actual TOC)")
up = st.sidebar.file_uploader("Upload Prediction Excel", type=["xlsx","xls"])
if up is not None:
book = read_book_bytes(up.getvalue())
if book:
df0 = next(iter(book.values()))
st.sidebar.caption(f"**Data loaded:** {up.name} • {df0.shape[0]} rows × {df0.shape[1]} cols")
if st.sidebar.button("Preview data", use_container_width=True, disabled=(up is None)):
st.session_state.show_preview_modal = True
go_btn = st.sidebar.button("Predict", type="primary", use_container_width=True)
if st.sidebar.button("⬅ Back to Case Building", use_container_width=True): st.session_state.app_step="dev"; st.rerun()
sticky_header("Prediction", "Upload a dataset with the feature columns (no **TOC**).")
if go_btn and up is not None:
book = read_book_bytes(up.getvalue()); name = list(book.keys())[0]
df_raw = book[name].copy()
if not ensure_required_features(df_raw, model, FEATURES):
st.stop()
df = normalize_to_abbr(df_raw)
df[PRED_COL] = safe_predict(model, df_raw, FEATURES)
st.session_state.results["PredictOnly"]=df
ranges = st.session_state.train_ranges; oor_pct = 0.0
if ranges:
any_viol = pd.DataFrame({f:(df[f]ranges[f][1]) for f in FEATURES if f in df.columns}).any(axis=1)
oor_pct = float(any_viol.mean()*100.0)
st.session_state.results["sv_pred"]={
"n":len(df),
"pred_min":float(df[PRED_COL].min()),
"pred_max":float(df[PRED_COL].max()),
"pred_mean":float(df[PRED_COL].mean()),
"pred_std":float(df[PRED_COL].std(ddof=0)),
"oor":oor_pct
}
if "PredictOnly" in st.session_state.results:
df = st.session_state.results["PredictOnly"]; sv = st.session_state.results["sv_pred"]
col_left, col_right = st.columns([2,3], gap="large")
with col_left:
table = pd.DataFrame({
"Metric": ["# points","Pred min","Pred max","Pred mean","Pred std","OOR %"],
"Value": [sv["n"], round(sv["pred_min"],2), round(sv["pred_max"],2),
round(sv["pred_mean"],2), round(sv["pred_std"],2), f'{sv["oor"]:.1f}%']
})
st.markdown('Predictions ready ✓
', unsafe_allow_html=True)
df_centered_rounded(table, hide_index=True)
st.caption("**★ OOR** = % of rows whose input features fall outside the training min–max range.")
with col_right:
st.plotly_chart(track_plot(df, include_actual=False),
use_container_width=False, config={"displayModeBar": False, "scrollZoom": True})
st.divider()
def _export_pred():
res = st.session_state.get("results", {})
sheets = {"Prediction": res["PredictOnly"], "Prediction_Summary": pd.DataFrame([sv])}
sheets["Info"] = pd.DataFrame([
{"Key":"AppName","Value":APP_NAME},
{"Key":"Target","Value":TARGET},
{"Key":"PredColumn","Value":PRED_COL},
{"Key":"ExportedAt","Value":datetime.now().strftime("%Y-%m-%d %H:%M:%S")},
])
bio = io.BytesIO()
engine = _excel_engine()
with pd.ExcelWriter(bio, engine=engine) as writer:
for k,v in sheets.items():
v.to_excel(writer, sheet_name=k[:31], index=False)
bio.seek(0)
return bio.getvalue(), f"TOC_Prediction_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"
data_x, fn_x = _export_pred()
st.download_button("⬇️ Export Excel", data=data_x, file_name=fn_x,
mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
# =========================
# Preview modal
# =========================
if st.session_state.show_preview_modal:
book_to_preview = {}
if st.session_state.app_step == "dev":
book_to_preview = read_book_bytes(st.session_state.dev_file_bytes)
elif st.session_state.app_step in ["validate", "predict"] and 'up' in locals() and up is not None:
book_to_preview = read_book_bytes(up.getvalue())
with st.expander("Preview data", expanded=True):
if not book_to_preview:
st.markdown('No data loaded yet.
', unsafe_allow_html=True)
else:
names = list(book_to_preview.keys())
tabs = st.tabs(names)
for t, name in zip(tabs, names):
with t:
df = normalize_to_abbr(book_to_preview[name])
t1, t2 = st.tabs(["Tracks", "Summary"])
with t1:
st.pyplot(preview_tracks(df, FEATURES), use_container_width=True)
with t2:
feat_present = [c for c in FEATURES if c in df.columns]
if not feat_present:
st.info("No feature columns found to summarize.")
else:
tbl = (
df[feat_present]
.agg(['min','max','mean','std'])
.T.rename(columns={"min":"Min","max":"Max","mean":"Mean","std":"Std"})
.reset_index(names="Feature")
)
df_centered_rounded(tbl)
st.session_state.show_preview_modal = False
# =========================
# Footer
# =========================
st.markdown("""
""", unsafe_allow_html=True)