# app.py — ST_Sonic_Ts (Shear Slowness Ts)
import io, json, os, base64, math
from pathlib import Path
import streamlit as st
import pandas as pd
import numpy as np
import joblib
from datetime import datetime
# Matplotlib (static plots)
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
from matplotlib.ticker import FuncFormatter
import plotly.graph_objects as go
from sklearn.metrics import mean_squared_error
# =========================
# Constants (Ts variant)
# =========================
# Application identity shown in the page title and sidebar branding.
APP_NAME = "ST_Log_Sonic (Ts)"
TAGLINE = "Real-Time Shear Slowness (Ts) Prediction"
# Defaults (overridden by ts_meta.json if present)
# Canonical feature column names expected in uploaded workbooks.
FEATURES = [
    "WOB (klbf)",
    "Torque (kft.lbf)",
    "SPP (psi)",
    "RPM (1/min)",
    "ROP (ft/h)",
    "Flow Rate (gpm)",
]
# Column holding the measured (actual) shear slowness.
TARGET = "Ts (us/ft_Actual)"
# Column name under which model predictions are stored.
PRED_COL = "Ts_Pred"
MODELS_DIR = Path("models")
DEFAULT_MODEL = MODELS_DIR / "ts_model.joblib"
# Alternative model filenames probed when the default is absent.
MODEL_FALLBACKS = [MODELS_DIR / "model.joblib", MODELS_DIR / "model.pkl"]
# Shared plot colors: prediction trace, actual trace, 1:1 reference line.
COLORS = {"pred": "#1f77b4", "actual": "#f2b702", "ref": "#5a5a5a"}
# Optional env banner from meta
STRICT_VERSION_CHECK = False
# ---- Plot sizing ----
CROSS_W = 350
CROSS_H = 350
TRACK_H = 1000
TRACK_W = 500
FONT_SZ = 13
BOLD_FONT = "Arial Black, Arial, sans-serif"
# =========================
# Page / CSS
# =========================
# Page config must be the first Streamlit call in the script.
st.set_page_config(page_title=APP_NAME, page_icon="logo.png", layout="wide")
st.markdown("""
""", unsafe_allow_html=True)
# Styler rules that center table headers and cells in rendered DataFrames.
TABLE_CENTER_CSS = [
    dict(selector="th", props=[("text-align", "center")]),
    dict(selector="td", props=[("text-align", "center")]),
]
# =========================
# Password gate
# =========================
def inline_logo(path="logo.png") -> str:
    """Return the logo file as a base64 data-URI string, or "" when unavailable."""
    try:
        logo = Path(path)
        if not logo.exists():
            return ""
        encoded = base64.b64encode(logo.read_bytes()).decode("ascii")
        return f"data:image/png;base64,{encoded}"
    except Exception:
        # Any read/encode failure degrades to "no logo".
        return ""
def add_password_gate() -> None:
    """Gate the app behind a shared access key.

    The key is read from Streamlit secrets (APP_PASSWORD) or, failing that,
    the environment. If no key is configured the app halts with a warning;
    if this session already authenticated, the function is a no-op.
    """
    try:
        required = st.secrets.get("APP_PASSWORD", "")
    except Exception:
        # st.secrets raises when no secrets file exists; fall back to env var.
        required = os.environ.get("APP_PASSWORD", "")
    if not required:
        st.warning("Set APP_PASSWORD in Secrets (or environment) and restart.")
        st.stop()
    if st.session_state.get("auth_ok", False):
        return
    # NOTE(review): the branding markup here appears stripped of its HTML;
    # only the app name and tagline text remain in the template.
    st.sidebar.markdown(f"""
{APP_NAME}
Smart Thinking • Secure Access
""", unsafe_allow_html=True
    )
    pwd = st.sidebar.text_input("Access key", type="password", placeholder="••••••••")
    if st.sidebar.button("Unlock", type="primary"):
        if pwd == required:
            st.session_state.auth_ok = True
            st.rerun()
        else:
            st.error("Incorrect key.")
    # Halt rendering of everything below until authenticated.
    st.stop()


add_password_gate()
# =========================
# Utilities
# =========================
def rmse(y_true, y_pred) -> float:
    """Root-mean-square error between actual and predicted values.

    Computed directly with NumPy — numerically identical to
    sqrt(mean_squared_error(y_true, y_pred)) — so this metric does not
    depend on the scikit-learn API, whose RMSE interface has changed
    across versions (`squared=` deprecation, `root_mean_squared_error`).
    """
    diff = np.asarray(y_true, dtype=float) - np.asarray(y_pred, dtype=float)
    return float(np.sqrt(np.mean(np.square(diff))))
def pearson_r(y_true, y_pred) -> float:
    """Pearson correlation coefficient; NaN for degenerate inputs."""
    actual = np.asarray(y_true, dtype=float)
    predicted = np.asarray(y_pred, dtype=float)
    # Correlation is undefined for fewer than two points or a constant series.
    if actual.size < 2:
        return float("nan")
    if np.all(actual == actual[0]) or np.all(predicted == predicted[0]):
        return float("nan")
    return float(np.corrcoef(actual, predicted)[0, 1])
def mape(y_true, y_pred, eps: float = 1e-8) -> float:
    """Mean Absolute Percentage Error in percent; ignores near-zero actuals."""
    actual = np.asarray(y_true, dtype=float)
    predicted = np.asarray(y_pred, dtype=float)
    # Replace |actual| below eps with NaN so nanmean skips those rows
    # instead of dividing by (near) zero.
    magnitude = np.abs(actual)
    denom = np.where(magnitude < eps, np.nan, magnitude)
    ratios = np.abs(actual - predicted) / denom
    return float(np.nanmean(ratios) * 100.0)
@st.cache_resource(show_spinner=False)
def load_model(model_path: str):
    """Load and memoize the serialized model (joblib) for the app's lifetime."""
    return joblib.load(model_path)
@st.cache_data(show_spinner=False)
def parse_excel(data_bytes: bytes):
    """Parse an Excel workbook's raw bytes into {sheet_name: DataFrame}.

    Cached on the bytes, so re-uploading the same file skips re-parsing.
    """
    bio = io.BytesIO(data_bytes)
    xl = pd.ExcelFile(bio)
    return {sh: xl.parse(sh) for sh in xl.sheet_names}
def read_book_bytes(b: bytes):
    """Return {sheet: DataFrame} for workbook bytes; {} for empty input."""
    if not b:
        return {}
    return parse_excel(b)
# ---- Canonical feature aliasing ------------------------------------------
def _build_alias_map(canonical_features: list[str], target_name: str) -> dict:
    """
    Map common header variants -> the *canonical* names in canonical_features.
    Whatever appears in canonical_features (from ts_meta.json) wins.
    """
    def pick(expected_list, variants):
        # Return the first variant that is already canonical; otherwise the
        # first variant acts as the default canonical spelling.
        for v in variants:
            if v in expected_list:
                return v
        return variants[0]
    can_WOB = pick(canonical_features, ["WOB (klbf)", "WOB, klbf", "WOB(klbf)", "WOB( klbf)"])
    can_TORQUE = pick(canonical_features, ["Torque (kft.lbf)", "Torque(kft.lbf)", "TORQUE(kft.lbf)"])
    can_SPP = pick(canonical_features, ["SPP (psi)", "SPP(psi)"])
    can_RPM = pick(canonical_features, ["RPM (1/min)", "RPM(1/min)"])
    can_ROP = pick(canonical_features, ["ROP (ft/h)", "ROP(ft/h)"])
    # Variant list deliberately includes observed misspellings ("Fow Rate").
    can_FR = pick(canonical_features, ["Flow Rate (gpm)", "Flow Rate, gpm", "Flow Rate,gpm", "Flow Rate , gpm", "Fow Rate, gpm", "Fow Rate, gpm "])
    can_DEPTH = "Depth (ft)"
    alias = {
        # Features
        "WOB (klbf)": can_WOB, "WOB, klbf": can_WOB, "WOB(klbf)": can_WOB, "WOB( klbf)": can_WOB,
        "Torque (kft.lbf)": can_TORQUE, "Torque(kft.lbf)": can_TORQUE, "TORQUE(kft.lbf)": can_TORQUE,
        "SPP (psi)": can_SPP, "SPP(psi)": can_SPP,
        "RPM (1/min)": can_RPM, "RPM(1/min)": can_RPM,
        "ROP (ft/h)": can_ROP, "ROP(ft/h)": can_ROP,
        "Flow Rate (gpm)": can_FR, "Flow Rate, gpm": can_FR, "Flow Rate,gpm": can_FR, "Flow Rate , gpm": can_FR,
        "Fow Rate, gpm": can_FR, "Fow Rate, gpm ": can_FR,
        # Depth (plot only)
        "Depth (ft)": can_DEPTH, "Depth, ft": can_DEPTH, "Depth(ft)": can_DEPTH, "DEPTH, ft": can_DEPTH,
        # Target family
        "Ts (us/ft_Actual)": target_name,
        "Ts,us/ft_Actual": target_name,
        "Ts, us/ft_Actual": target_name,
        "Ts": target_name,
        "TS_Actual": target_name,
        "Ts (us/ft)_Actual": target_name,
    }
    return alias
def _normalize_columns(df: pd.DataFrame, canonical_features: list[str], target_name: str) -> pd.DataFrame:
    """Return a copy of *df* with headers cleaned and aliased to canonical names.

    Header cleanup strips leading/trailing blanks, collapses runs of internal
    whitespace to single spaces, and removes a space before commas. (The
    original chain contained no-op replaces such as `.replace(" ", " ")`,
    which indicate double-space collapsing was the intent.) Known header
    variants are then renamed via the alias map.
    """
    out = df.copy()
    out.columns = [
        " ".join(str(c).split()).replace(" ,", ",") for c in out.columns
    ]
    alias = _build_alias_map(canonical_features, target_name)
    # Rename only columns that are present and whose name actually changes.
    actual = {k: v for k, v in alias.items() if k in out.columns and k != v}
    return out.rename(columns=actual)
def ensure_cols(df: pd.DataFrame, cols: list[str]) -> bool:
    """Check that every required column exists; report missing ones via st.error."""
    miss = [c for c in cols if c not in df.columns]
    if not miss:
        return True
    st.error(f"Missing columns: {miss}\nFound: {list(df.columns)}")
    return False
def find_sheet(book, names):
    """Case-insensitively find the first of *names* among the workbook's sheets.

    Returns the sheet's original (cased) name, or None when absent.
    """
    lookup = {sheet.lower(): sheet for sheet in book}
    for candidate in names:
        original = lookup.get(candidate.lower())
        if original is not None:
            return original
    return None
def _nice_tick0(xmin: float, step: float = 0.1) -> float:
return step * math.floor(xmin / step) if np.isfinite(xmin) else xmin
def df_centered_rounded(df: pd.DataFrame, hide_index=True):
    """Render *df* in Streamlit with centered cells and 2-decimal numeric format."""
    out = df.copy()
    # Apply the 2-decimal format only to numeric columns.
    numcols = out.select_dtypes(include=[np.number]).columns
    styler = (
        out.style
        .format({c: "{:.2f}" for c in numcols})
        .set_properties(**{"text-align": "center"})
        .set_table_styles(TABLE_CENTER_CSS)
    )
    st.dataframe(styler, use_container_width=True, hide_index=hide_index)
# ---------- Build X exactly as trained ----------
def _make_X(df: pd.DataFrame, features: list[str]) -> pd.DataFrame:
"""
Reindex columns to the exact training feature order and coerce to numeric.
Prevents scikit-learn 'feature names should match' errors.
"""
X = df.reindex(columns=features, copy=False)
for c in X.columns:
X[c] = pd.to_numeric(X[c], errors="coerce")
return X
# === Excel export helpers =================================================
def _excel_engine() -> str:
try:
import xlsxwriter # noqa: F401
return "xlsxwriter"
except Exception:
return "openpyxl"
def _excel_safe_name(name: str) -> str:
bad = '[]:*?/\\'
safe = ''.join('_' if ch in bad else ch for ch in str(name))
return safe[:31]
def _round_numeric(df: pd.DataFrame, ndigits: int = 3) -> pd.DataFrame:
out = df.copy()
for c in out.columns:
if pd.api.types.is_float_dtype(out[c]) or pd.api.types.is_integer_dtype(out[c]):
out[c] = pd.to_numeric(out[c], errors="coerce").round(ndigits)
return out
def _summary_table(df: pd.DataFrame, cols: list[str]) -> pd.DataFrame:
    """Min/Max/Mean/Std summary (one row per field) for columns present in *df*."""
    present = [c for c in cols if c in df.columns]
    if not present:
        return pd.DataFrame()
    stats = df[present].agg(['min', 'max', 'mean', 'std']).T
    stats = stats.rename(columns={"min": "Min", "max": "Max", "mean": "Mean", "std": "Std"})
    tbl = stats.reset_index(names="Field")
    return _round_numeric(tbl, 3)
def _train_ranges_df(ranges: dict[str, tuple[float, float]]) -> pd.DataFrame:
    """Convert {feature: (min, max)} into a Feature/Min/Max table (empty-safe)."""
    if not ranges:
        return pd.DataFrame()
    table = pd.DataFrame(ranges).T.reset_index()
    table.columns = ["Feature", "Min", "Max"]
    return _round_numeric(table, 3)
def _excel_autofit(writer, sheet_name: str, df: pd.DataFrame, min_w: int = 8, max_w: int = 40):
    """Auto-size columns and freeze the header row (xlsxwriter engine only)."""
    try:
        import xlsxwriter  # noqa: F401
    except Exception:
        # openpyxl worksheets lack set_column/freeze_panes; silently skip.
        return
    ws = writer.sheets[sheet_name]
    for i, col in enumerate(df.columns):
        series = df[col].astype(str)
        # Width = widest of header/body + padding, clamped to [min_w, max_w].
        max_len = max([len(str(col))] + series.map(len).tolist())
        ws.set_column(i, i, max(min_w, min(max_len + 2, max_w)))
    ws.freeze_panes(1, 0)
def _add_sheet(sheets: dict, order: list, name: str, df: pd.DataFrame, ndigits: int):
    """Register a non-empty sheet: store its rounded copy and record the order."""
    if df is None or df.empty:
        return
    sheets[name] = _round_numeric(df, ndigits)
    order.append(name)
def _available_sections() -> list[str]:
    """List the export sheet names available given the phases run this session."""
    res = st.session_state.get("results", {})
    # Each session-state result key unlocks its family of export sheets.
    phase_sheets = [
        ("Train", ["Training", "Training_Metrics", "Training_Summary"]),
        ("Test", ["Testing", "Testing_Metrics", "Testing_Summary"]),
        ("Validate", ["Validation", "Validation_Metrics", "Validation_Summary", "Validation_OOR"]),
        ("PredictOnly", ["Prediction", "Prediction_Summary"]),
    ]
    sections = [name for key, names in phase_sheets if key in res for name in names]
    if st.session_state.get("train_ranges"):
        sections.append("Training_Ranges")
    # The Info provenance sheet is always offered.
    sections.append("Info")
    return sections
def build_export_workbook(selected: list[str], ndigits: int = 3, do_autofit: bool = True) -> tuple[bytes|None, str|None, list[str]]:
    """Assemble the selected result sections into an in-memory .xlsx workbook.

    Parameters:
        selected: sheet names (from _available_sections) to include.
        ndigits: decimal places for numeric rounding.
        do_autofit: auto-size columns (effective with xlsxwriter engine only).

    Returns (workbook_bytes, filename, sheet_order); (None, None, []) when
    there are no results or nothing selectable was chosen.
    """
    res = st.session_state.get("results", {})
    if not res:
        return None, None, []
    sheets: dict[str, pd.DataFrame] = {}
    order: list[str] = []
    # Each section is added only when selected AND its data exists in session.
    if "Training" in selected and "Train" in res:
        _add_sheet(sheets, order, "Training", res["Train"], ndigits)
    if "Training_Metrics" in selected and res.get("m_train"):
        _add_sheet(sheets, order, "Training_Metrics", pd.DataFrame([res["m_train"]]), ndigits)
    if "Training_Summary" in selected and "Train" in res:
        # Summarize features plus whichever of target/prediction are present.
        tr_cols = FEATURES + [c for c in [TARGET, PRED_COL] if c in res["Train"].columns]
        _add_sheet(sheets, order, "Training_Summary", _summary_table(res["Train"], tr_cols), ndigits)
    if "Testing" in selected and "Test" in res:
        _add_sheet(sheets, order, "Testing", res["Test"], ndigits)
    if "Testing_Metrics" in selected and res.get("m_test"):
        _add_sheet(sheets, order, "Testing_Metrics", pd.DataFrame([res["m_test"]]), ndigits)
    if "Testing_Summary" in selected and "Test" in res:
        te_cols = FEATURES + [c for c in [TARGET, PRED_COL] if c in res["Test"].columns]
        _add_sheet(sheets, order, "Testing_Summary", _summary_table(res["Test"], te_cols), ndigits)
    if "Validation" in selected and "Validate" in res:
        _add_sheet(sheets, order, "Validation", res["Validate"], ndigits)
    if "Validation_Metrics" in selected and res.get("m_val"):
        _add_sheet(sheets, order, "Validation_Metrics", pd.DataFrame([res["m_val"]]), ndigits)
    if "Validation_Summary" in selected and res.get("sv_val"):
        _add_sheet(sheets, order, "Validation_Summary", pd.DataFrame([res["sv_val"]]), ndigits)
    if "Validation_OOR" in selected and isinstance(res.get("oor_tbl"), pd.DataFrame) and not res["oor_tbl"].empty:
        _add_sheet(sheets, order, "Validation_OOR", res["oor_tbl"].reset_index(drop=True), ndigits)
    if "Prediction" in selected and "PredictOnly" in res:
        _add_sheet(sheets, order, "Prediction", res["PredictOnly"], ndigits)
    if "Prediction_Summary" in selected and res.get("sv_pred"):
        _add_sheet(sheets, order, "Prediction_Summary", pd.DataFrame([res["sv_pred"]]), ndigits)
    if "Training_Ranges" in selected and st.session_state.get("train_ranges"):
        rr = _train_ranges_df(st.session_state["train_ranges"])
        _add_sheet(sheets, order, "Training_Ranges", rr, ndigits)
    if "Info" in selected:
        # Provenance sheet: app identity, schema, and export timestamp.
        info = pd.DataFrame([
            {"Key": "AppName", "Value": APP_NAME},
            {"Key": "Tagline", "Value": TAGLINE},
            {"Key": "Target", "Value": TARGET},
            {"Key": "PredColumn", "Value": PRED_COL},
            {"Key": "Features", "Value": ", ".join(FEATURES)},
            {"Key": "ExportedAt", "Value": datetime.now().strftime("%Y-%m-%d %H:%M:%S")},
        ])
        _add_sheet(sheets, order, "Info", info, ndigits)
    if not order:
        return None, None, []
    bio = io.BytesIO()
    engine = _excel_engine()
    with pd.ExcelWriter(bio, engine=engine) as writer:
        for name in order:
            df = sheets[name]
            sheet = _excel_safe_name(name)
            df.to_excel(writer, sheet_name=sheet, index=False)
            if do_autofit:
                _excel_autofit(writer, sheet, df)
    bio.seek(0)
    # Timestamped filename keeps successive exports distinct.
    fname = f"TS_Export_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"
    return bio.getvalue(), fname, order
# --------- SIMPLE export UI ----------
def render_export_button(phase_key: str) -> None:
    """Render the sheet picker and Excel download button for one phase.

    *phase_key* namespaces the widget keys so each phase (dev/validate/
    predict) keeps an independent multiselect and download button.
    """
    res = st.session_state.get("results", {})
    if not res:
        return
    st.divider()
    st.markdown("### Export to Excel")
    options = _available_sections()
    selected_sheets = st.multiselect(
        "Sheets to include",
        options=options,
        default=[],
        placeholder="Choose option(s)",
        help="Pick the sheets you want to include in the Excel export.",
        key=f"sheets_{phase_key}",
    )
    if not selected_sheets:
        # Nothing chosen yet: show a disabled placeholder button.
        st.caption("Select one or more sheets above to enable the export.")
        st.download_button(
            label="⬇️ Export Excel",
            data=b"",
            file_name="TS_Export.xlsx",
            mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
            disabled=True,
            key=f"download_{phase_key}",
        )
        return
    # Build the workbook eagerly so the button can serve the ready bytes.
    data, fname, names = build_export_workbook(selected=selected_sheets, ndigits=3, do_autofit=True)
    if names:
        st.caption("Will include: " + ", ".join(names))
    st.download_button(
        "⬇️ Export Excel",
        data=(data or b""),
        file_name=(fname or "TS_Export.xlsx"),
        mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        disabled=(data is None),
        key=f"download_{phase_key}",
    )
# =========================
# Cross plot (Matplotlib)
# =========================
def cross_plot_static(actual, pred, xlabel="Actual Ts (µs/ft)", ylabel="Predicted Ts (µs/ft)"):
    """Square actual-vs-predicted scatter with a dashed 1:1 reference diagonal.

    Both axes share the same padded range, so a perfect prediction falls on
    the diagonal. Returns a Matplotlib figure for st.pyplot.
    """
    a = pd.Series(actual, dtype=float)
    p = pd.Series(pred, dtype=float)
    lo = float(min(a.min(), p.min()))
    hi = float(max(a.max(), p.max()))
    # 3% padding; guard against a zero-width data range.
    pad = 0.03 * (hi - lo if hi > lo else 1.0)
    lo2, hi2 = lo - pad, hi + pad
    ticks = np.linspace(lo2, hi2, 5)
    dpi = 110
    # Figure size is fixed in pixels (CROSS_W x CROSS_H) via the dpi.
    fig, ax = plt.subplots(figsize=(CROSS_W / dpi, CROSS_H / dpi), dpi=dpi, constrained_layout=False)
    ax.scatter(a, p, s=14, c=COLORS["pred"], alpha=0.9, linewidths=0)
    ax.plot([lo2, hi2], [lo2, hi2], linestyle="--", linewidth=1.2, color=COLORS["ref"])
    ax.set_xlim(lo2, hi2)
    ax.set_ylim(lo2, hi2)
    ax.set_xticks(ticks); ax.set_yticks(ticks)
    ax.set_aspect("equal", adjustable="box")
    fmt = FuncFormatter(lambda x, _: f"{x:.2f}")
    ax.xaxis.set_major_formatter(fmt); ax.yaxis.set_major_formatter(fmt)
    ax.set_xlabel(xlabel, fontweight="bold", fontsize=10, color="black")
    ax.set_ylabel(ylabel, fontweight="bold", fontsize=10, color="black")
    ax.tick_params(labelsize=6, colors="black")
    ax.grid(True, linestyle=":", alpha=0.3)
    for spine in ax.spines.values():
        spine.set_linewidth(1.1); spine.set_color("#444")
    fig.subplots_adjust(left=0.16, bottom=0.16, right=0.98, top=0.98)
    return fig
# =========================
# Track plot (Plotly)
# =========================
def track_plot(df, include_actual=True):
    """Depth-track line plot of predicted (and optionally actual) Ts.

    The Y axis is the first depth-like column (reversed so depth increases
    downward) or, failing that, the 1-based point index. Returns a Plotly
    figure for st.plotly_chart.
    """
    depth_col = next((c for c in df.columns if 'depth' in str(c).lower()), None)
    if depth_col is not None:
        y = pd.Series(df[depth_col]).astype(float)
        ylab = depth_col
    else:
        y = pd.Series(np.arange(1, len(df) + 1))
        ylab = "Point Index"
    # Reversed range: largest value at the top entry puts depth downward.
    y_range = [float(y.max()), float(y.min())]
    # X range spans the prediction curve plus (optionally) the actual curve.
    x_series = pd.Series(df.get(PRED_COL, pd.Series(dtype=float))).astype(float)
    if include_actual and TARGET in df.columns:
        x_series = pd.concat([x_series, pd.Series(df[TARGET]).astype(float)], ignore_index=True)
    x_lo, x_hi = float(x_series.min()), float(x_series.max())
    x_pad = 0.03 * (x_hi - x_lo if x_hi > x_lo else 1.0)
    xmin, xmax = x_lo - x_pad, x_hi + x_pad
    tick0 = _nice_tick0(xmin, step=max((xmax - xmin) / 10.0, 0.1))
    fig = go.Figure()
    if PRED_COL in df.columns:
        fig.add_trace(go.Scatter(
            x=df[PRED_COL], y=y, mode="lines",
            line=dict(color=COLORS["pred"], width=1.8),
            name=PRED_COL,
            # FIX: this hovertemplate literal was split across two source lines
            # (a syntax error); restored with Plotly's "<br>" line break.
            hovertemplate=f"{PRED_COL}: " + "%{x:.0f}<br>" + ylab + ": %{y}"
        ))
    if include_actual and TARGET in df.columns:
        fig.add_trace(go.Scatter(
            x=df[TARGET], y=y, mode="lines",
            line=dict(color=COLORS["actual"], width=2.0, dash="dot"),
            name=f"{TARGET} (actual)",
            # FIX: same broken-literal repair as the prediction trace above.
            hovertemplate=f"{TARGET}: " + "%{x:.0f}<br>" + ylab + ": %{y}"
        ))
    fig.update_layout(
        height=TRACK_H, width=TRACK_W, autosize=False,
        paper_bgcolor="#fff", plot_bgcolor="#fff",
        margin=dict(l=64, r=16, t=36, b=48), hovermode="closest",
        font=dict(size=FONT_SZ, color="#000"),
        legend=dict(x=0.98, y=0.05, xanchor="right", yanchor="bottom",
                    bgcolor="rgba(255,255,255,0.75)", bordercolor="#ccc", borderwidth=1),
        legend_title_text=""
    )
    fig.update_xaxes(
        title_text="Ts (μs/ft)",
        title_font=dict(size=20, family=BOLD_FONT, color="#000"),
        tickfont=dict(size=15, family=BOLD_FONT, color="#000"),
        side="top", range=[xmin, xmax],
        ticks="outside", tickformat=",.0f", tickmode="auto", tick0=tick0,
        showline=True, linewidth=1.2, linecolor="#444", mirror=True,
        showgrid=True, gridcolor="rgba(0,0,0,0.12)", automargin=True
    )
    fig.update_yaxes(
        title_text=ylab,
        title_font=dict(size=20, family=BOLD_FONT, color="#000"),
        tickfont=dict(size=15, family=BOLD_FONT, color="#000"),
        range=y_range, ticks="outside",
        showline=True, linewidth=1.2, linecolor="#444", mirror=True,
        showgrid=True, gridcolor="rgba(0,0,0,0.12)", automargin=True
    )
    return fig
# ---------- Preview (matplotlib) ----------
def preview_tracks(df: pd.DataFrame, cols: list[str]):
    """
    Quick-look multi-track preview:
    - one subplot per selected column
    - distinct stable colors per column
    - shared & reversed Y-axis (Depth downwards)
    """
    cols = [c for c in cols if c in df.columns]
    n = len(cols)
    if n == 0:
        # Nothing to plot: return a small placeholder figure instead of failing.
        fig, ax = plt.subplots(figsize=(4, 2))
        ax.text(0.5, 0.5, "No selected columns", ha="center", va="center")
        ax.axis("off")
        return fig
    # Depth or fallback to index
    depth_col = next((c for c in df.columns if 'depth' in str(c).lower()), None)
    if depth_col is not None:
        idx = pd.to_numeric(df[depth_col], errors="coerce")
        y_label = depth_col
    else:
        idx = pd.Series(np.arange(1, len(df) + 1))
        y_label = "Point Index"
    y_min, y_max = float(idx.min()), float(idx.max())
    # Stable qualitative palette
    cmap = plt.get_cmap("tab20")
    col_colors = {col: cmap(i % cmap.N) for i, col in enumerate(cols)}
    fig, axes = plt.subplots(1, n, figsize=(2.3 * n, 7.0), sharey=True, dpi=100)
    if n == 1:
        # plt.subplots returns a bare Axes (not an array) when n == 1.
        axes = [axes]
    for i, (ax, col) in enumerate(zip(axes, cols)):
        x = pd.to_numeric(df[col], errors="coerce")
        ax.plot(x, idx, '-', lw=1.8, color=col_colors[col])
        ax.set_xlabel(col)
        ax.xaxis.set_label_position('top')
        ax.xaxis.tick_top()
        ax.set_ylim(y_max, y_min)  # reversed Y (Depth down)
        ax.grid(True, linestyle=":", alpha=0.3)
        if i == 0:
            ax.set_ylabel(y_label)
        else:
            # Only the left-most track shows Y tick labels.
            ax.tick_params(labelleft=False)
            ax.set_ylabel("")
    fig.tight_layout()
    return fig
# =========================
# Load model + meta
# =========================
def ensure_model() -> Path|None:
    """Locate the model file locally, else download it from MODEL_URL.

    Returns the path to a usable model file, or None when nothing is
    available (no local file, no MODEL_URL, or the download failed).
    """
    for p in [DEFAULT_MODEL, *MODEL_FALLBACKS]:
        # A zero-byte file counts as missing (e.g. a failed earlier download).
        if p.exists() and p.stat().st_size > 0:
            return p
    url = os.environ.get("MODEL_URL", "")
    if not url:
        return None
    try:
        import requests
        DEFAULT_MODEL.parent.mkdir(parents=True, exist_ok=True)
        with requests.get(url, stream=True, timeout=30) as r:
            r.raise_for_status()
            with open(DEFAULT_MODEL, "wb") as f:
                # Stream in 1 MiB chunks to bound memory use.
                for chunk in r.iter_content(1 << 20):
                    if chunk:
                        f.write(chunk)
        return DEFAULT_MODEL
    except Exception:
        # Best-effort download: any failure falls back to "no model".
        return None
# Resolve and load the model once at startup; abort the app if unavailable.
mpath = ensure_model()
if not mpath:
    st.error("Model not found. Upload models/ts_model.joblib (or set MODEL_URL).")
    st.stop()
try:
    model = load_model(str(mpath))
except Exception as e:
    st.error(f"Failed to load model: {e}")
    st.stop()
# Load meta (prefer Ts-specific)
meta = {}
meta_candidates = [MODELS_DIR / "ts_meta.json", MODELS_DIR / "meta.json", MODELS_DIR / "ym_meta.json"]
meta_path = next((p for p in meta_candidates if p.exists()), None)
if meta_path:
    try:
        meta = json.loads(meta_path.read_text(encoding="utf-8"))
        # Meta overrides the module-level schema defaults declared above.
        FEATURES = meta.get("features", FEATURES)
        TARGET = meta.get("target", TARGET)
        PRED_COL = meta.get("pred_col", PRED_COL)
    except Exception as e:
        st.warning(f"Could not parse meta file ({meta_path.name}): {e}")
# Optional: version banner
if STRICT_VERSION_CHECK and meta.get("versions"):
    import numpy as _np, sklearn as _skl
    mv = meta["versions"]; msg=[]
    if mv.get("numpy") and mv["numpy"] != _np.__version__:
        msg.append(f"NumPy {mv['numpy']} expected, running {_np.__version__}")
    if mv.get("scikit_learn") and mv["scikit_learn"] != _skl.__version__:
        msg.append(f"scikit-learn {mv['scikit_learn']} expected, running {_skl.__version__}")
    if msg:
        st.warning("Environment mismatch: " + " | ".join(msg))
# =========================
# Session state
# =========================
# Initialize per-session flags/holders only when not already present,
# so reruns keep earlier values.
st.session_state.setdefault("app_step", "intro")
st.session_state.setdefault("results", {})
st.session_state.setdefault("train_ranges", None)
st.session_state.setdefault("dev_file_name","")
st.session_state.setdefault("dev_file_bytes",b"")
st.session_state.setdefault("dev_file_loaded",False)
st.session_state.setdefault("dev_preview",False)
st.session_state.setdefault("show_preview_modal", False)
# =========================
# Sidebar branding
# =========================
# NOTE(review): the branding markup appears stripped of its HTML; only the
# app name and tagline text remain in the template.
st.sidebar.markdown(f"""
{APP_NAME}
{TAGLINE}
""", unsafe_allow_html=True
)
def sticky_header(title, message):
    """Render the phase header banner.

    NOTE(review): the HTML template that should interpolate *title* and
    *message* appears to have been stripped from this build — the markdown
    body is an empty f-string and both parameters are currently unused.
    """
    st.markdown(
        f"""
""",
        unsafe_allow_html=True
    )
# =========================
# INTRO
# =========================
# Landing page: explains the workflow and advances to case building.
if st.session_state.app_step == "intro":
    st.header("Welcome!")
    st.markdown("This software is developed by *Smart Thinking AI-Solutions Team* to estimate **Shear Slowness (Ts)** from drilling data.")
    st.subheader("How It Works")
    st.markdown(
        "1) **Upload your data to build the case and preview the model performance.** \n"
        "2) Click **Run Model** to compute metrics and plots. \n"
        "3) **Proceed to Validation** (with actual Ts) or **Proceed to Prediction** (no Ts)."
    )
    if st.button("Start Showcase", type="primary"):
        # Advance to the case-building phase and rerun immediately.
        st.session_state.app_step = "dev"; st.rerun()
# =========================
# CASE BUILDING
# =========================
# Dev phase: upload a Train/Test workbook, run the model on both splits,
# and review metrics plus track/cross plots.
if st.session_state.app_step == "dev":
    st.sidebar.header("Case Building")
    up = st.sidebar.file_uploader("Upload Your Data File", type=["xlsx","xls"])
    if up is not None:
        # Cache upload bytes in session state so later reruns keep the file.
        st.session_state.dev_file_bytes = up.getvalue()
        st.session_state.dev_file_name = up.name
        st.session_state.dev_file_loaded = True
        st.session_state.dev_preview = False
    if st.session_state.dev_file_loaded:
        tmp = read_book_bytes(st.session_state.dev_file_bytes)
        if tmp:
            df0 = next(iter(tmp.values()))
            st.sidebar.caption(f"**Data loaded:** {st.session_state.dev_file_name} • {df0.shape[0]} rows × {df0.shape[1]} cols")
    if st.sidebar.button("Preview data", use_container_width=True, disabled=not st.session_state.dev_file_loaded):
        st.session_state.show_preview_modal = True
        st.session_state.dev_preview = True
    run = st.sidebar.button("Run Model", type="primary", use_container_width=True)
    if st.sidebar.button("Proceed to Validation ▶", use_container_width=True):
        st.session_state.app_step = "validate"; st.rerun()
    if st.sidebar.button("Proceed to Prediction ▶", use_container_width=True):
        st.session_state.app_step = "predict"; st.rerun()
    # Phase banner reflects how far the user has progressed.
    if st.session_state.dev_file_loaded and st.session_state.dev_preview:
        sticky_header("Case Building", "Previewed ✓ — now click **Run Model**.")
    elif st.session_state.dev_file_loaded:
        sticky_header("Case Building", "📄 **Preview uploaded data** using the sidebar button, then click **Run Model**.")
    else:
        sticky_header("Case Building", "**Upload your data to build a case, then run the model to review development performance.**")
    if run and st.session_state.dev_file_bytes:
        book = read_book_bytes(st.session_state.dev_file_bytes)
        sh_train = find_sheet(book, ["Train","Training","training2","train","training"])
        sh_test = find_sheet(book, ["Test","Testing","testing2","test","testing"])
        if sh_train is None or sh_test is None:
            # FIX: this message literal was split across two source lines
            # (a syntax error); rejoined onto one line.
            st.markdown('Workbook must include Train/Training/training2 and Test/Testing/testing2 sheets.', unsafe_allow_html=True)
            st.stop()
        tr = _normalize_columns(book[sh_train].copy(), FEATURES, TARGET)
        te = _normalize_columns(book[sh_test].copy(), FEATURES, TARGET)
        if not (ensure_cols(tr, FEATURES+[TARGET]) and ensure_cols(te, FEATURES+[TARGET])):
            # FIX: split string literal rejoined (see above).
            st.markdown('Missing required columns.', unsafe_allow_html=True)
            st.stop()
        # Predict with exactly the training feature order
        tr[PRED_COL] = model.predict(_make_X(tr, FEATURES))
        te[PRED_COL] = model.predict(_make_X(te, FEATURES))
        st.session_state.results["Train"] = tr
        st.session_state.results["Test"] = te
        st.session_state.results["m_train"] = {
            "R": pearson_r(tr[TARGET], tr[PRED_COL]),
            "RMSE": rmse(tr[TARGET], tr[PRED_COL]),
            "MAPE": mape(tr[TARGET], tr[PRED_COL])
        }
        st.session_state.results["m_test"] = {
            "R": pearson_r(te[TARGET], te[PRED_COL]),
            "RMSE": rmse(te[TARGET], te[PRED_COL]),
            "MAPE": mape(te[TARGET], te[PRED_COL])
        }
        # Record training min/max per feature for later out-of-range checks.
        tr_min = tr[FEATURES].min().to_dict(); tr_max = tr[FEATURES].max().to_dict()
        st.session_state.train_ranges = {f: (float(tr_min[f]), float(tr_max[f])) for f in FEATURES}
        # FIX: split string literal rejoined (see above).
        st.markdown('Case has been built and results are displayed below.', unsafe_allow_html=True)
    def _dev_block(df, m):
        # One split's view: metrics row, then track plot beside cross plot.
        c1, c2, c3 = st.columns(3)
        c1.metric("R", f"{m['R']:.3f}")
        c2.metric("RMSE", f"{m['RMSE']:.2f}")
        c3.metric("MAPE (%)", f"{m['MAPE']:.2f}")
        st.markdown("""
R: Pearson Correlation Coefficient
RMSE: Root Mean Square Error
MAPE: Mean Absolute Percentage Error
""", unsafe_allow_html=True)
        col_track, col_cross = st.columns([2, 3], gap="large")
        with col_track:
            st.plotly_chart(track_plot(df, include_actual=True), use_container_width=False, config={"displayModeBar": False, "scrollZoom": True})
        with col_cross:
            st.pyplot(cross_plot_static(df[TARGET], df[PRED_COL]), use_container_width=False)
    if "Train" in st.session_state.results or "Test" in st.session_state.results:
        tab1, tab2 = st.tabs(["Training", "Testing"])
        if "Train" in st.session_state.results:
            with tab1:
                _dev_block(st.session_state.results["Train"], st.session_state.results["m_train"])
        if "Test" in st.session_state.results:
            with tab2:
                _dev_block(st.session_state.results["Test"], st.session_state.results["m_test"])
    render_export_button(phase_key="dev")
# =========================
# VALIDATION (with actual Ts)
# =========================
# Validation phase: predict on a labeled dataset, compute metrics, and flag
# rows whose inputs fall outside the training min–max ranges.
if st.session_state.app_step == "validate":
    st.sidebar.header("Validate the Model")
    up = st.sidebar.file_uploader("Upload Validation Excel", type=["xlsx","xls"])
    if up is not None:
        book = read_book_bytes(up.getvalue())
        if book:
            df0 = next(iter(book.values()))
            st.sidebar.caption(f"**Data loaded:** {up.name} • {df0.shape[0]} rows × {df0.shape[1]} cols")
    if st.sidebar.button("Preview data", use_container_width=True, disabled=(up is None)):
        st.session_state.show_preview_modal = True
    go_btn = st.sidebar.button("Predict & Validate", type="primary", use_container_width=True)
    if st.sidebar.button("⬅ Back to Case Building", use_container_width=True):
        st.session_state.app_step = "dev"; st.rerun()
    if st.sidebar.button("Proceed to Prediction ▶", use_container_width=True):
        st.session_state.app_step = "predict"; st.rerun()
    sticky_header("Validate the Model", "Upload a dataset with the same **features** and **Ts** to evaluate performance.")
    if go_btn and up is not None:
        book = read_book_bytes(up.getvalue())
        # Prefer a validation-named sheet, else fall back to the first sheet.
        name = find_sheet(book, ["Validation","Validate","validation2","Val","val"]) or list(book.keys())[0]
        df = _normalize_columns(book[name].copy(), FEATURES, TARGET)
        if not ensure_cols(df, FEATURES+[TARGET]):
            # FIX: this message literal was split across two source lines
            # (a syntax error); rejoined onto one line.
            st.markdown('Missing required columns.', unsafe_allow_html=True)
            st.stop()
        df[PRED_COL] = model.predict(_make_X(df, FEATURES))
        st.session_state.results["Validate"] = df
        ranges = st.session_state.train_ranges
        oor_pct = 0.0
        tbl = None
        if ranges:
            # FIX: the comparison operators were stripped from the source
            # (`df[f]ranges[f][1]`); restored as below-min OR above-max,
            # matching the "training min–max" captions below. Computed once
            # and reused for both the row mask and the Violations column.
            viol = pd.DataFrame(
                {f: (df[f] < ranges[f][0]) | (df[f] > ranges[f][1]) for f in FEATURES}
            )
            any_viol = viol.any(axis=1)
            oor_pct = float(any_viol.mean() * 100.0)
            if any_viol.any():
                tbl = df.loc[any_viol, FEATURES].copy()
                for c in FEATURES:
                    if pd.api.types.is_numeric_dtype(tbl[c]):
                        tbl[c] = tbl[c].round(2)
                # Summarize which features are out of range for each row.
                tbl["Violations"] = viol.loc[any_viol].apply(
                    lambda r: ", ".join([c for c, v in r.items() if v]), axis=1
                )
        st.session_state.results["m_val"] = {
            "R": pearson_r(df[TARGET], df[PRED_COL]),
            "RMSE": rmse(df[TARGET], df[PRED_COL]),
            "MAPE": mape(df[TARGET], df[PRED_COL])
        }
        st.session_state.results["sv_val"] = {"n": len(df), "pred_min": float(df[PRED_COL].min()), "pred_max": float(df[PRED_COL].max()), "oor": oor_pct}
        st.session_state.results["oor_tbl"] = tbl
    if "Validate" in st.session_state.results:
        m = st.session_state.results["m_val"]
        c1, c2, c3 = st.columns(3)
        c1.metric("R", f"{m['R']:.3f}"); c2.metric("RMSE", f"{m['RMSE']:.2f}"); c3.metric("MAPE (%)", f"{m['MAPE']:.2f}")
        st.markdown("""
R: Pearson Correlation Coefficient
RMSE: Root Mean Square Error
MAPE: Mean Absolute Percentage Error
""", unsafe_allow_html=True)
        col_track, col_cross = st.columns([2, 3], gap="large")
        with col_track:
            st.plotly_chart(track_plot(st.session_state.results["Validate"], include_actual=True),
                            use_container_width=False, config={"displayModeBar": False, "scrollZoom": True})
        with col_cross:
            st.pyplot(cross_plot_static(st.session_state.results["Validate"][TARGET],
                                        st.session_state.results["Validate"][PRED_COL]),
                      use_container_width=False)
        render_export_button(phase_key="validate")
        sv = st.session_state.results["sv_val"]
        if sv["oor"] > 0:
            # FIX: split string literal rejoined (see above).
            st.markdown('Some inputs fall outside **training min–max** ranges.', unsafe_allow_html=True)
        if st.session_state.results["oor_tbl"] is not None:
            st.write("*Out-of-range rows (vs. Training min–max):*")
            df_centered_rounded(st.session_state.results["oor_tbl"])
# =========================
# PREDICTION (no actual Ts)
# =========================
# Prediction phase: predict on an unlabeled dataset and summarize the
# prediction distribution plus out-of-range input percentage.
if st.session_state.app_step == "predict":
    st.sidebar.header("Prediction (No Actual Ts)")
    up = st.sidebar.file_uploader("Upload Prediction Excel", type=["xlsx","xls"])
    if up is not None:
        book = read_book_bytes(up.getvalue())
        if book:
            df0 = next(iter(book.values()))
            st.sidebar.caption(f"**Data loaded:** {up.name} • {df0.shape[0]} rows × {df0.shape[1]} cols")
    if st.sidebar.button("Preview data", use_container_width=True, disabled=(up is None)):
        st.session_state.show_preview_modal = True
    go_btn = st.sidebar.button("Predict", type="primary", use_container_width=True)
    if st.sidebar.button("⬅ Back to Case Building", use_container_width=True):
        st.session_state.app_step = "dev"; st.rerun()
    sticky_header("Prediction", "Upload a dataset with the feature columns (no **Ts**).")
    if go_btn and up is not None:
        book = read_book_bytes(up.getvalue())
        name = list(book.keys())[0]
        df = _normalize_columns(book[name].copy(), FEATURES, TARGET)
        if not ensure_cols(df, FEATURES):
            # FIX: this message literal was split across two source lines
            # (a syntax error); rejoined onto one line.
            st.markdown('Missing required columns.', unsafe_allow_html=True)
            st.stop()
        df[PRED_COL] = model.predict(_make_X(df, FEATURES))
        st.session_state.results["PredictOnly"] = df
        ranges = st.session_state.train_ranges
        oor_pct = 0.0
        if ranges:
            # FIX: comparison operators were stripped from the source
            # (`df[f]ranges[f][1]`); restored as below-min OR above-max,
            # matching the OOR caption below.
            any_viol = pd.DataFrame(
                {f: (df[f] < ranges[f][0]) | (df[f] > ranges[f][1]) for f in FEATURES}
            ).any(axis=1)
            oor_pct = float(any_viol.mean() * 100.0)
        st.session_state.results["sv_pred"] = {
            "n": len(df),
            "pred_min": float(df[PRED_COL].min()),
            "pred_max": float(df[PRED_COL].max()),
            "pred_mean": float(df[PRED_COL].mean()),
            "pred_std": float(df[PRED_COL].std(ddof=0)),
            "oor": oor_pct
        }
    if "PredictOnly" in st.session_state.results:
        df = st.session_state.results["PredictOnly"]
        sv = st.session_state.results["sv_pred"]
        col_left, col_right = st.columns([2, 3], gap="large")
        with col_left:
            table = pd.DataFrame({
                "Metric": ["# points","Pred min","Pred max","Pred mean","Pred std","OOR %"],
                "Value": [sv["n"], round(sv["pred_min"],3), round(sv["pred_max"],3),
                          round(sv["pred_mean"],3), round(sv["pred_std"],3), f'{sv["oor"]:.1f}%']
            })
            # FIX: split string literal rejoined (see above).
            st.markdown('Predictions ready ✓', unsafe_allow_html=True)
            df_centered_rounded(table, hide_index=True)
            st.caption("**★ OOR** = % of rows whose input features fall outside the training min–max range.")
        with col_right:
            st.plotly_chart(track_plot(df, include_actual=False),
                            use_container_width=False, config={"displayModeBar": False, "scrollZoom": True})
        render_export_button(phase_key="predict")
# =========================
# Preview modal
# =========================
# One-shot preview: show each sheet of the relevant uploaded workbook as
# track plots and a summary table, then clear the flag.
if st.session_state.show_preview_modal:
    book_to_preview = {}
    if st.session_state.app_step == "dev":
        book_to_preview = read_book_bytes(st.session_state.dev_file_bytes)
    elif st.session_state.app_step in ["validate", "predict"] and 'up' in locals() and up is not None:
        # Validate/predict phases keep the upload only in the local `up`.
        book_to_preview = read_book_bytes(up.getvalue())
    with st.expander("Preview data", expanded=True):
        if not book_to_preview:
            # FIX: this message literal was split across two source lines
            # (a syntax error); rejoined onto one line.
            st.markdown('No data loaded yet.', unsafe_allow_html=True)
        else:
            names = list(book_to_preview.keys())
            tabs = st.tabs(names)
            for t, name in zip(tabs, names):
                with t:
                    df = _normalize_columns(book_to_preview[name], FEATURES, TARGET)
                    t1, t2 = st.tabs(["Tracks", "Summary"])
                    with t1:
                        st.pyplot(preview_tracks(df, FEATURES), use_container_width=True)
                    with t2:
                        feat_present = [c for c in FEATURES if c in df.columns]
                        if not feat_present:
                            st.info("No feature columns found to summarize.")
                        else:
                            tbl = (
                                df[feat_present]
                                .agg(['min','max','mean','std'])
                                .T.rename(columns={"min":"Min","max":"Max","mean":"Mean","std":"Std"})
                                .reset_index(names="Feature")
                            )
                            df_centered_rounded(tbl)
    # Clear the flag so the expander does not persist on the next rerun.
    st.session_state.show_preview_modal = False
# =========================
# Footer
# =========================
# NOTE(review): the footer HTML appears to have been stripped from this
# build; the markdown body is an empty string.
st.markdown("""
""", unsafe_allow_html=True)