# app_FP.py — ST_GeoMech_FP (Fracture Pressure)
# Mirrors the SHmin app's specs & workflow (password gate, preview panel, train/validate/predict, Excel export).
# Self-contained: trains a fixed, optimized RF pipeline in-app. No external model files.
import io
import os
import base64
import math
from pathlib import Path
from datetime import datetime
import streamlit as st
import pandas as pd
import numpy as np
# Matplotlib (static previews & cross-plot)
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
from matplotlib.ticker import FuncFormatter
import plotly.graph_objects as go
from sklearn.metrics import mean_squared_error
from sklearn.ensemble import RandomForestRegressor
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
# =========================
# App constants / defaults
# =========================
APP_NAME = "ST_GeoMech_FP"
TAGLINE = "Real-Time Fracture Pressure Prediction"
# Canonical features (match SHmin app)
# Column headers expected in every uploaded workbook (units embedded in name).
FEATURES = ["Q (gpm)", "SPP (psi)", "T (kft.lbf)", "WOB (klbf)", "ROP (ft/h)"]
# Canonical prediction/target labels
TARGET_CANON = "FracPress_Actual"
PRED_COL = "FracPress_Pred"
UNITS = "Psi"
# Target aliases accepted in input workbooks
# Matched case-insensitively by _resolve_target_col().
TARGET_ALIASES = [
"FracPress_Actual", "FracturePressure_Actual", "Fracture Pressure (psi)",
"Frac Pressure (psi)", "FracPressure", "Frac_Pressure", "FracturePressure",
"FP_Actual", "FP (psi)"
]
# Optional transform (kept for parity; RF is used on raw scale)
# _inv_transform() is a no-op while this stays "none".
TRANSFORM = "none" # "none" | "log10" | "ln"
# Fixed "best" RandomForest params
# Baked-in hyperparameters — no in-app tuning; random_state pins reproducibility.
BEST_PARAMS = dict(
n_estimators=400,
max_depth=None,
min_samples_split=2,
min_samples_leaf=1,
max_features=0.6,
bootstrap=True,
random_state=42,
n_jobs=-1,
)
# Color / layout
COLORS = {"pred": "#1f77b4", "actual": "#f2b702", "ref": "#5a5a5a"}
# Pixel sizes for the matplotlib cross-plot and the plotly depth track.
CROSS_W, CROSS_H = 350, 350
TRACK_H, TRACK_W = 1000, 500
FONT_SZ = 13
BOLD_FONT = "Arial Black, Arial, sans-serif"
# =========================
# Page / CSS
# =========================
st.set_page_config(page_title=APP_NAME, page_icon="logo.png", layout="wide")
# NOTE(review): the markdown body below is empty — the original inline CSS
# appears to have been stripped at extraction; confirm against the repo copy.
st.markdown("""
""", unsafe_allow_html=True)
# Styler rules used by df_centered_rounded() to center table headers/cells.
TABLE_CENTER_CSS = [
dict(selector="th", props=[("text-align", "center")]),
dict(selector="td", props=[("text-align", "center")]),
]
# =========================
# Password gate (optional)
# =========================
def inline_logo(path: str = "logo.png") -> str:
    """Return the logo file as a base64 ``data:`` URI, or "" if missing/unreadable."""
    try:
        raw = Path(path).read_bytes()
    except Exception:
        # Missing file or any read error -> caller renders no logo.
        return ""
    encoded = base64.b64encode(raw).decode("ascii")
    return f"data:image/png;base64,{encoded}"
def add_password_gate() -> None:
    """Optionally gate the app behind APP_PASSWORD (st.secrets, else env var).

    No-op when no password is configured or the session is already unlocked;
    otherwise renders the sidebar login form and halts the script run.
    """
    try:
        required = st.secrets.get("APP_PASSWORD", "")
    except Exception:
        # st.secrets raises when no secrets file exists — fall back to env.
        required = os.environ.get("APP_PASSWORD", "")
    if not required or st.session_state.get("auth_ok", False):
        return
    st.sidebar.markdown(f"""
{APP_NAME}
Smart Thinking • Secure Access
""", unsafe_allow_html=True)
    entered = st.sidebar.text_input("Access key", type="password", placeholder="••••••••")
    if st.sidebar.button("Unlock", type="primary"):
        if entered == required:
            st.session_state.auth_ok = True
            st.rerun()
        else:
            st.error("Incorrect key.")
    # Halt rendering of everything below until the key is accepted.
    st.stop()
# Enforce the optional password gate before any page content renders.
add_password_gate()
# =========================
# Utilities
# =========================
def rmse(y_true, y_pred) -> float:
    """Root-mean-square error between two equal-length numeric sequences.

    Computed directly with numpy (consistent with the sibling metrics
    ``mape``/``pearson_r``) instead of round-tripping through sklearn's
    ``mean_squared_error`` — same value, one fewer heavyweight call.
    """
    a = np.asarray(y_true, dtype=float)
    p = np.asarray(y_pred, dtype=float)
    return float(np.sqrt(np.mean((a - p) ** 2)))
def mape(y_true, y_pred, eps: float = 1e-9) -> float:
    """Mean absolute percentage error (%), flooring |actual| at eps to avoid /0."""
    actual = np.asarray(y_true, dtype=float)
    predicted = np.asarray(y_pred, dtype=float)
    rel_err = np.abs(actual - predicted) / np.maximum(np.abs(actual), eps)
    return float(100.0 * rel_err.mean())
def pearson_r(y_true, y_pred) -> float:
    """Pearson correlation coefficient; NaN for <2 points or a constant series."""
    a = np.asarray(y_true, dtype=float)
    b = np.asarray(y_pred, dtype=float)
    # corrcoef is undefined (0/0) for constant inputs; short-circuit keeps
    # the size check first so a[0]/b[0] are never read on empty arrays.
    if a.size < 2 or np.all(a == a[0]) or np.all(b == b[0]):
        return float("nan")
    return float(np.corrcoef(a, b)[0, 1])
@st.cache_data(show_spinner=False)
def parse_excel(data_bytes: bytes) -> dict[str, pd.DataFrame]:
    """Parse every sheet of an Excel workbook; cached by Streamlit on the bytes."""
    workbook = pd.ExcelFile(io.BytesIO(data_bytes))
    return {name: workbook.parse(name) for name in workbook.sheet_names}
def read_book_bytes(b: bytes) -> dict[str, pd.DataFrame]:
    """Empty dict for empty/None bytes; otherwise the cached sheet-name -> frame map."""
    if not b:
        return {}
    return parse_excel(b)
def _excel_engine() -> str:
try:
import xlsxwriter # noqa: F401
return "xlsxwriter"
except Exception:
return "openpyxl"
def _excel_safe_name(name: str) -> str:
bad = '[]:*?/\\'
safe = ''.join('_' if ch in bad else ch for ch in str(name))
return safe[:31]
def _round_numeric(df: pd.DataFrame, ndigits: int = 3) -> pd.DataFrame:
out = df.copy()
for c in out.columns:
if pd.api.types.is_float_dtype(out[c]) or pd.api.types.is_integer_dtype(out[c]):
out[c] = pd.to_numeric(out[c], errors="coerce").round(ndigits)
return out
def df_centered_rounded(df: pd.DataFrame, hide_index: bool = True) -> None:
    """Render df with centered cells and numeric columns formatted to 3 decimals."""
    view = df.copy()
    numeric_fmt = {col: "{:.3f}" for col in view.select_dtypes(include=[np.number]).columns}
    styled = (
        view.style
        .format(numeric_fmt)
        .set_properties(**{"text-align": "center"})
        .set_table_styles(TABLE_CENTER_CSS)
    )
    st.dataframe(styled, use_container_width=True, hide_index=hide_index)
def ensure_cols(df: pd.DataFrame, cols: list[str]) -> bool:
    """True when every required column is present; otherwise show an error and return False."""
    miss = [c for c in cols if c not in df.columns]
    if not miss:
        return True
    st.error(f"Missing columns: {miss}\nFound: {list(df.columns)}")
    return False
def _nice_tick0(xmin: float, step: float = 0.1) -> float:
return step * math.floor(xmin / step) if np.isfinite(xmin) else xmin
# ---------- Transform helpers ----------
def _inv_transform(x: np.ndarray, transform: str) -> np.ndarray:
t = (transform or "none").lower()
if t in ("log10", "log_10", "log10()"):
return np.power(10.0, x)
if t in ("ln", "log", "loge", "log_e", "natural"):
return np.exp(x)
return x
# ---------- Build X exactly as trained ----------
def _make_X(df: pd.DataFrame, features: list[str]) -> pd.DataFrame:
X = df.reindex(columns=features, copy=False)
for c in X.columns:
X[c] = pd.to_numeric(X[c], errors="coerce")
return X
# ---------- Target resolver (use aliases) ----------
def _resolve_target_col(df: pd.DataFrame) -> str | None:
    """Return the dataframe's actual target column, matching TARGET_ALIASES case-insensitively."""
    by_lower = {col.lower(): col for col in df.columns}
    return next(
        (by_lower[alias.lower()] for alias in TARGET_ALIASES if alias.lower() in by_lower),
        None,
    )
# =========================
# Export helpers
# =========================
def _summary_table(df: pd.DataFrame, cols: list[str]) -> pd.DataFrame:
    """Min/Max/Mean/Std per requested column; absent columns are silently skipped."""
    present = [c for c in cols if c in df.columns]
    if not present:
        return pd.DataFrame()
    stats = df[present].agg(["min", "max", "mean", "std"]).T
    stats = stats.rename(columns={"min": "Min", "max": "Max", "mean": "Mean", "std": "Std"})
    return _round_numeric(stats.reset_index(names="Field"), 3)
def _train_ranges_df(ranges: dict[str, tuple[float, float]]) -> pd.DataFrame:
    """Flatten {feature: (min, max)} into a Feature/Min/Max frame for export."""
    if not ranges:
        return pd.DataFrame()
    table = pd.DataFrame(ranges).T.reset_index()
    table.columns = ["Feature", "Min", "Max"]
    return _round_numeric(table, 3)
def _excel_autofit(writer, sheet_name: str, df: pd.DataFrame, min_w: int = 8, max_w: int = 40) -> None:
try:
import xlsxwriter # noqa: F401
except Exception:
return
ws = writer.sheets[sheet_name]
for i, col in enumerate(df.columns):
series = df[col].astype(str)
max_len = max([len(str(col))] + series.map(len).tolist())
ws.set_column(i, i, max(min_w, min(max_len + 2, max_w)))
ws.freeze_panes(1, 0)
def _available_sections() -> list[str]:
    """List the exportable sheet names implied by what is in session-state results."""
    res = st.session_state.get("results", {})
    out: list[str] = []
    if "Train" in res:
        out.extend(["Training", "Training_Metrics", "Training_Summary"])
    if "Test" in res:
        out.extend(["Testing", "Testing_Metrics", "Testing_Summary"])
    if "Validate" in res:
        out.extend(["Validation", "Validation_Metrics", "Validation_Summary", "Validation_OOR"])
    if "PredictOnly" in res:
        out.extend(["Prediction", "Prediction_Summary"])
    if st.session_state.get("train_ranges"):
        out.append("Training_Ranges")
    out.append("Info")  # always offered last
    return out
# Assemble the selected result sheets from st.session_state into one Excel
# workbook. Returns (bytes, filename, sheet-order) or (None, None, []) when
# there is nothing to export. Sheet insertion order is preserved in the file.
def build_export_workbook(selected: list[str], ndigits: int = 3, do_autofit: bool = True) -> tuple[bytes | None, str | None, list[str]]:
res = st.session_state.get("results", {})
if not res:
return None, None, []
sheets: dict[str, pd.DataFrame] = {}
order: list[str] = []
# Helper: register a sheet unless it is None/empty; rounds numerics first.
def _add(name: str, df: pd.DataFrame) -> None:
if df is None or (isinstance(df, pd.DataFrame) and df.empty):
return
sheets[name] = _round_numeric(df, ndigits)
order.append(name)
# Training / Testing
if "Training" in selected and "Train" in res:
_add("Training", res["Train"])
if "Training_Metrics" in selected and res.get("m_train"):
_add("Training_Metrics", pd.DataFrame([res["m_train"]]))
if "Training_Summary" in selected and "Train" in res:
# Summaries cover the features plus whichever of target/prediction exist.
tr_cols = FEATURES + [c for c in [st.session_state.get("tcol_train", TARGET_CANON), PRED_COL] if c in res["Train"].columns]
_add("Training_Summary", _summary_table(res["Train"], tr_cols))
if "Testing" in selected and "Test" in res:
_add("Testing", res["Test"])
if "Testing_Metrics" in selected and res.get("m_test"):
_add("Testing_Metrics", pd.DataFrame([res["m_test"]]))
if "Testing_Summary" in selected and "Test" in res:
te_cols = FEATURES + [c for c in [st.session_state.get("tcol_test", TARGET_CANON), PRED_COL] if c in res["Test"].columns]
_add("Testing_Summary", _summary_table(res["Test"], te_cols))
# Validation / Prediction
if "Validation" in selected and "Validate" in res:
_add("Validation", res["Validate"])
if "Validation_Metrics" in selected and res.get("m_val"):
_add("Validation_Metrics", pd.DataFrame([res["m_val"]]))
if "Validation_Summary" in selected and res.get("sv_val"):
_add("Validation_Summary", pd.DataFrame([res["sv_val"]]))
# oor_tbl holds rows whose features violate the training min-max ranges.
if "Validation_OOR" in selected and isinstance(res.get("oor_tbl"), pd.DataFrame) and not res["oor_tbl"].empty:
_add("Validation_OOR", res["oor_tbl"].reset_index(drop=True))
if "Prediction" in selected and "PredictOnly" in res:
_add("Prediction", res["PredictOnly"])
if "Prediction_Summary" in selected and res.get("sv_pred"):
_add("Prediction_Summary", pd.DataFrame([res["sv_pred"]]))
if "Training_Ranges" in selected and st.session_state.get("train_ranges"):
_add("Training_Ranges", _train_ranges_df(st.session_state["train_ranges"]))
if "Info" in selected:
# Provenance sheet: app identity, column names, export timestamp.
info = pd.DataFrame([
{"Key": "AppName", "Value": APP_NAME},
{"Key": "Tagline", "Value": TAGLINE},
{"Key": "Target", "Value": st.session_state.get("tcol_train", TARGET_CANON)},
{"Key": "PredColumn", "Value": PRED_COL},
{"Key": "Features", "Value": ", ".join(FEATURES)},
{"Key": "ExportedAt", "Value": datetime.now().strftime("%Y-%m-%d %H:%M:%S")},
])
_add("Info", info)
if not order:
return None, None, []
bio = io.BytesIO()
engine = _excel_engine()
with pd.ExcelWriter(bio, engine=engine) as writer:
for name in order:
df = sheets[name]
# Sheet names must respect Excel's forbidden chars / 31-char limit.
sheet = _excel_safe_name(name)
df.to_excel(writer, sheet_name=sheet, index=False)
if do_autofit:
_excel_autofit(writer, sheet, df)
bio.seek(0)
fname = f"FracPressure_Export_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"
return bio.getvalue(), fname, order
def render_export_button(phase_key: str) -> None:
    """Sheet picker plus an Excel download button, disabled until a sheet is chosen.

    phase_key namespaces the widget keys so the control can appear on
    several pages (dev / validate / predict) without key collisions.
    """
    if not st.session_state.get("results", {}):
        return
    st.divider()
    st.markdown("### Export to Excel")
    chosen = st.multiselect(
        "Sheets to include",
        options=_available_sections(),
        default=[],
        placeholder="Choose option(s)",
        help="Pick the sheets you want in the Excel export.",
        key=f"sheets_{phase_key}",
    )
    xlsx_mime = "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
    if not chosen:
        # Keep a (disabled) button visible so the layout doesn't jump.
        st.caption("Select one or more sheets above to enable export.")
        st.download_button(
            "⬇️ Export Excel", data=b"", file_name="FracPressure_Export.xlsx",
            mime=xlsx_mime, disabled=True, key=f"download_{phase_key}",
        )
        return
    payload, fname, included = build_export_workbook(selected=chosen, ndigits=3, do_autofit=True)
    if included:
        st.caption("Will include: " + ", ".join(included))
    st.download_button(
        "⬇️ Export Excel", data=(payload or b""), file_name=(fname or "FracPressure_Export.xlsx"),
        mime=xlsx_mime, disabled=(payload is None), key=f"download_{phase_key}",
    )
# =========================
# Plots (integer ticks)
# =========================
def cross_plot_static(actual, pred, label: str = "Fracture Pressure"):
    """Square actual-vs-predicted scatter with a dashed 1:1 reference line (Matplotlib)."""
    act = pd.Series(actual, dtype=float)
    prd = pd.Series(pred, dtype=float)
    vmin = float(min(act.min(), prd.min()))
    vmax = float(max(act.max(), prd.max()))
    # 3% padding around the joint data range; guard the all-equal case.
    margin = 0.03 * ((vmax - vmin) if vmax > vmin else 1.0)
    lo, hi = vmin - margin, vmax + margin
    tick_vals = np.linspace(lo, hi, 5)
    dpi = 110
    fig, ax = plt.subplots(figsize=(CROSS_W / dpi, CROSS_H / dpi), dpi=dpi, constrained_layout=False)
    ax.scatter(act, prd, s=14, c=COLORS["pred"], alpha=0.9, linewidths=0)
    ax.plot([lo, hi], [lo, hi], linestyle="--", linewidth=1.2, color=COLORS["ref"])  # parity line
    ax.set_xlim(lo, hi)
    ax.set_ylim(lo, hi)
    ax.set_xticks(tick_vals)
    ax.set_yticks(tick_vals)
    ax.set_aspect("equal", adjustable="box")
    whole_numbers = FuncFormatter(lambda v, _: f"{v:.0f}")
    ax.xaxis.set_major_formatter(whole_numbers)
    ax.yaxis.set_major_formatter(whole_numbers)
    ax.set_xlabel(f"Actual {label} ({UNITS})", fontweight="bold", fontsize=10, color="black")
    ax.set_ylabel(f"Predicted {label} ({UNITS})", fontweight="bold", fontsize=10, color="black")
    ax.tick_params(labelsize=6, colors="black")
    ax.grid(True, linestyle=":", alpha=0.3)
    for spine in ax.spines.values():
        spine.set_linewidth(1.1)
        spine.set_color("#444")
    fig.subplots_adjust(left=0.16, bottom=0.16, right=0.98, top=0.98)
    return fig
def track_plot(df: pd.DataFrame, actual_col: str | None, include_actual: bool = True):
    """Vertical Plotly track of predicted (and optionally actual) fracture pressure.

    The y-axis uses the first depth/TVD-like column when present (drawn with
    depth increasing downward), otherwise the 1-based row index.

    Fix vs. original: both hovertemplate strings contained a raw line break
    inside a single-quoted literal (a SyntaxError). The break is restored as
    the explicit Plotly hovertemplate separator "<br>".
    """
    depth_col = next((c for c in df.columns if ("depth" in str(c).lower()) or ("tvd" in str(c).lower())), None)
    if depth_col is not None:
        y = pd.to_numeric(df[depth_col], errors="coerce")
        ylab = depth_col
        y_range = [float(np.nanmax(y)), float(np.nanmin(y))]  # reversed: depth grows downward
    else:
        y = pd.Series(np.arange(1, len(df) + 1))
        ylab = "Point Index"
        y_range = [float(y.max()), float(y.min())]
    # Joint x-range over prediction (and actual when drawn), with 3% padding.
    x_series = pd.Series(df.get(PRED_COL, pd.Series(dtype=float))).astype(float)
    if include_actual and actual_col and actual_col in df.columns:
        x_series = pd.concat([x_series, pd.Series(df[actual_col]).astype(float)], ignore_index=True)
    x_lo, x_hi = float(x_series.min()), float(x_series.max())
    x_pad = 0.03 * (x_hi - x_lo if x_hi > x_lo else 1.0)
    xmin, xmax = x_lo - x_pad, x_hi + x_pad
    tick0 = _nice_tick0(xmin, step=max((xmax - xmin) / 10.0, 0.1))
    fig = go.Figure()
    if PRED_COL in df.columns:
        fig.add_trace(go.Scatter(
            x=df[PRED_COL], y=y, mode="lines",
            line=dict(color=COLORS["pred"], width=1.8),
            name=PRED_COL,
            hovertemplate=f"{PRED_COL}: " + "%{x:.0f}<br>" + ylab + ": %{y}"
        ))
    if include_actual and actual_col and actual_col in df.columns:
        fig.add_trace(go.Scatter(
            x=df[actual_col], y=y, mode="lines",
            line=dict(color=COLORS["actual"], width=2.0, dash="dot"),
            name=f"{actual_col} (actual)",
            hovertemplate=f"{actual_col}: " + "%{x:.0f}<br>" + ylab + ": %{y}"
        ))
    fig.update_layout(
        height=TRACK_H, width=TRACK_W, autosize=False,
        paper_bgcolor="#fff", plot_bgcolor="#fff",
        margin=dict(l=64, r=16, t=36, b=48), hovermode="closest",
        font=dict(size=FONT_SZ, color="#000"),
        legend=dict(x=0.98, y=0.05, xanchor="right", yanchor="bottom",
                    bgcolor="rgba(255,255,255,0.75)", bordercolor="#ccc", borderwidth=1),
        legend_title_text=""
    )
    fig.update_xaxes(
        title_text=f"Fracture Pressure ({UNITS})",
        title_font=dict(size=20, family=BOLD_FONT, color="#000"),
        tickfont=dict(size=15, family=BOLD_FONT, color="#000"),
        side="top", range=[xmin, xmax],
        ticks="outside", tickformat=",.0f", tickmode="auto", tick0=tick0,
        showline=True, linewidth=1.2, linecolor="#444", mirror=True,
        showgrid=True, gridcolor="rgba(0,0,0,0.12)", automargin=True
    )
    fig.update_yaxes(
        title_text=ylab,
        title_font=dict(size=20, family=BOLD_FONT, color="#000"),
        tickfont=dict(size=15, family=BOLD_FONT, color="#000"),
        range=y_range, ticks="outside",
        showline=True, linewidth=1.2, linecolor="#444", mirror=True,
        showgrid=True, gridcolor="rgba(0,0,0,0.12)", automargin=True
    )
    return fig
def preview_tracks(df: pd.DataFrame, cols: list[str]):
    """One vertical matplotlib track per present column, shared reversed y-axis.

    The y-axis is the first depth/TVD-like column when present, otherwise the
    1-based row index; depth is drawn increasing downward.
    """
    present = [c for c in cols if c in df.columns]
    if not present:
        # Nothing to draw: return a small placeholder figure.
        fig, ax = plt.subplots(figsize=(4, 2))
        ax.text(0.5, 0.5, "No selected columns", ha="center", va="center")
        ax.axis("off")
        return fig
    depth_col = next((c for c in df.columns if ("depth" in str(c).lower()) or ("tvd" in str(c).lower())), None)
    if depth_col is None:
        y_axis = pd.Series(np.arange(1, len(df) + 1))
        y_label = "Point Index"
        y_lo, y_hi = float(y_axis.min()), float(y_axis.max())
    else:
        y_axis = pd.to_numeric(df[depth_col], errors="coerce")
        y_label = depth_col
        y_lo, y_hi = float(np.nanmin(y_axis)), float(np.nanmax(y_axis))
    palette = plt.get_cmap("tab20")
    n_tracks = len(present)
    fig, axes = plt.subplots(1, n_tracks, figsize=(2.4 * n_tracks, 7.0), sharey=True, dpi=100)
    axes = [axes] if n_tracks == 1 else list(axes)
    for pos, (ax, col) in enumerate(zip(axes, present)):
        values = pd.to_numeric(df[col], errors="coerce")
        ax.plot(values, y_axis, '-', lw=1.6, color=palette(pos % palette.N))
        ax.set_xlabel(col)
        ax.xaxis.set_label_position('top')
        ax.xaxis.tick_top()
        ax.set_ylim(y_hi, y_lo)  # reversed: depth down
        ax.grid(True, linestyle=":", alpha=0.3)
        if pos == 0:
            ax.set_ylabel(y_label)
        else:
            ax.tick_params(labelleft=False)
            ax.set_ylabel("")
    fig.tight_layout()
    return fig
# =========================
# Fixed training pipeline
# =========================
def build_pipeline() -> Pipeline:
    """Median-impute the features, then the fixed-hyperparameter RandomForest."""
    return Pipeline(steps=[
        ("imputer", SimpleImputer(strategy="median")),
        ("model", RandomForestRegressor(**BEST_PARAMS)),
    ])
# =========================
# Session state (mirrors SHmin)
# =========================
# One-time defaults; setdefault keeps existing values across Streamlit reruns.
st.session_state.setdefault("app_step", "intro")  # "intro" | "dev" | "validate" | "predict"
st.session_state.setdefault("results", {})  # all result frames/metric dicts live here
st.session_state.setdefault("train_ranges", None)  # {feature: (min, max)} from training sheet
st.session_state.setdefault("dev_file_name", "")
st.session_state.setdefault("dev_file_bytes", b"")
st.session_state.setdefault("dev_file_loaded", False)
st.session_state.setdefault("fitted_model", None)  # the in-app-trained sklearn Pipeline
# Persistent top-of-page preview panel
st.session_state.setdefault("show_preview_panel", False)
st.session_state.setdefault("preview_book", {})  # {sheet name: DataFrame} being previewed
# =========================
# Sidebar branding
# =========================
# NOTE(review): the styled HTML wrapper appears stripped at extraction; only
# the app name and tagline remain inside this markdown block — confirm against
# the repo copy before shipping.
st.sidebar.markdown(f"""
{APP_NAME}
{TAGLINE}
""", unsafe_allow_html=True)
# Render the page's sticky header bar.
# NOTE(review): the f-string template below is empty apart from a newline — the
# original HTML/CSS (which presumably consumed `title` and `message`) appears
# to have been stripped at extraction. As written, both parameters are unused;
# restore the template from the repo copy.
def sticky_header(title: str, message: str) -> None:
st.markdown(
f"""
""",
unsafe_allow_html=True
)
def render_preview_panel() -> None:
"""Top-of-page preview panel (same behavior as SHmin)."""
# Hidden unless a preview was explicitly requested via the sidebar.
if not st.session_state.get("show_preview_panel"):
return
st.markdown("## 🔎 Data preview")
book = st.session_state.get("preview_book", {}) or {}
if not book:
# Nothing loaded yet: show a notice plus a hide button, then bail out.
st.info("No data loaded yet.")
col = st.columns(2)[1]
with col:
if st.button("Hide preview"):
st.session_state.show_preview_panel = False
st.session_state.preview_book = {}
st.rerun()
return
# One tab per workbook sheet, plus a trailing tab that closes the panel.
names = list(book.keys())
tabs = st.tabs(names + ["✖ Hide preview"])
for i, name in enumerate(names):
with tabs[i]:
df = book[name]
# Each sheet gets a Tracks view and a feature-statistics Summary view.
t1, t2 = st.tabs(["Tracks", "Summary"])
with t1:
st.pyplot(preview_tracks(df, FEATURES), use_container_width=True)
with t2:
feat_present = [c for c in FEATURES if c in df.columns]
if not feat_present:
st.info("No feature columns found to summarize.")
else:
tbl = (
df[feat_present]
.agg(["min", "max", "mean", "std"])
.T.rename(columns={"min": "Min", "max": "Max", "mean": "Mean", "std": "Std"})
.reset_index(names="Feature")
)
df_centered_rounded(tbl)
# Closing the panel clears the cached preview data and reruns the page.
with tabs[-1]:
if st.button("Hide preview", use_container_width=True):
st.session_state.show_preview_panel = False
st.session_state.preview_book = {}
st.rerun()
# =========================
# INTRO
# =========================
# Landing page: explains the three-phase workflow and starts the showcase.
if st.session_state.app_step == "intro":
st.header("Welcome!")
st.markdown(
f"This software is developed by *Smart Thinking AI-Solutions Team* to estimate **Fracture Pressure** ({UNITS}) from drilling/offset data."
)
st.subheader("How It Works")
st.markdown(
"1) **Upload your data file** and click **Run Model** to fit the baked-in pipeline. \n"
"2) **Validate** on held-out wells (with actual). \n"
"3) **Predict** on wells without actual."
)
# Advance to the Case Building (train/test) step.
if st.button("Start Showcase", type="primary"):
st.session_state.app_step = "dev"
st.rerun()
# =========================
# CASE BUILDING (Train/Test)
# =========================
def _find_sheet(book: dict[str, pd.DataFrame], names: list[str]) -> str | None:
low2orig = {k.lower(): k for k in book.keys()}
for nm in names:
if nm.lower() in low2orig:
return low2orig[nm.lower()]
return None
# Case Building: upload a workbook with Train+Test sheets, fit the pipeline,
# compute metrics, and remember per-feature training ranges for later OOR checks.
if st.session_state.app_step == "dev":
st.sidebar.header("Case Building")
up = st.sidebar.file_uploader("Upload Your Data File", type=["xlsx", "xls"])
if up is not None:
# Persist the raw bytes so reruns don't depend on the uploader widget.
st.session_state.dev_file_bytes = up.getvalue()
st.session_state.dev_file_name = up.name
st.session_state.dev_file_loaded = True
st.session_state.fitted_model = None
# show preview panel immediately
st.session_state.preview_book = read_book_bytes(st.session_state.dev_file_bytes) if st.session_state.dev_file_bytes else {}
st.session_state.show_preview_panel = True
st.rerun()
if st.session_state.dev_file_loaded:
tmp = read_book_bytes(st.session_state.dev_file_bytes)
if tmp:
df0 = next(iter(tmp.values()))
st.sidebar.caption(f"**Data loaded:** {st.session_state.dev_file_name} • {df0.shape[0]} rows × {df0.shape[1]} cols")
# NOTE(review): with indentation lost in this copy it is unclear whether
# `run` is assigned on every code path before its use below — verify that
# the original guards against a NameError when no file is loaded.
run = st.sidebar.button("Run Model", type="primary", use_container_width=True)
if st.sidebar.button("Proceed to Validation ▶", use_container_width=True):
st.session_state.app_step = "validate"; st.rerun()
if st.sidebar.button("Proceed to Prediction ▶", use_container_width=True):
st.session_state.app_step = "predict"; st.rerun()
# Header message reflects the current stage of the workflow.
if st.session_state.dev_file_loaded and st.session_state.show_preview_panel:
sticky_header("Case Building", "Previewed ✓ — now click **Run Model**.")
elif st.session_state.dev_file_loaded:
sticky_header("Case Building", "📄 **Preview uploaded data** using the sidebar button, then click **Run Model**.")
else:
sticky_header("Case Building", "**Upload your data to build a case, then run the model to review performance.**")
render_preview_panel()
if run and st.session_state.dev_file_bytes:
book = read_book_bytes(st.session_state.dev_file_bytes)
# Accept several spellings of the Train/Test sheet names (case-insensitive).
sh_train = _find_sheet(book, ["Train", "Training", "training2", "train", "training"])
sh_test = _find_sheet(book, ["Test", "Testing", "testing2", "test", "testing"])
if sh_train is None or sh_test is None:
# NOTE(review): the string literal below is split by a raw line break —
# extraction damage; restore the original single-line literal.
st.markdown('Workbook must include Train/Training and Test/Testing sheets.
', unsafe_allow_html=True)
st.stop()
tr0 = book[sh_train].copy()
te0 = book[sh_test].copy()
# Resolve target name per-sheet from aliases
tcol_tr = _resolve_target_col(tr0)
tcol_te = _resolve_target_col(te0)
if tcol_tr is None or tcol_te is None:
st.error(f"Missing target column. Expected one of: {TARGET_ALIASES}")
st.stop()
# Ensure feature columns exist
if not (ensure_cols(tr0, FEATURES) and ensure_cols(te0, FEATURES)):
# NOTE(review): broken literal (raw line break) — extraction damage.
st.markdown('Missing required feature columns.
', unsafe_allow_html=True)
st.stop()
# Prepare X,y
X_tr = _make_X(tr0, FEATURES)
y_tr = pd.to_numeric(tr0[tcol_tr], errors="coerce")
X_te = _make_X(te0, FEATURES)
y_te = pd.to_numeric(te0[tcol_te], errors="coerce")
# Drop rows with NA in y
mask_tr = np.isfinite(y_tr); X_tr, y_tr = X_tr.loc[mask_tr], y_tr.loc[mask_tr]
mask_te = np.isfinite(y_te); X_te, y_te = X_te.loc[mask_te], y_te.loc[mask_te]
pipe = build_pipeline()
pipe.fit(X_tr, y_tr)
st.session_state.fitted_model = pipe
# Predictions
tr = tr0.copy(); te = te0.copy()
# _inv_transform is a no-op while TRANSFORM == "none".
tr[PRED_COL] = _inv_transform(pipe.predict(_make_X(tr0, FEATURES)), TRANSFORM)
te[PRED_COL] = _inv_transform(pipe.predict(_make_X(te0, FEATURES)), TRANSFORM)
# Save results & metrics
st.session_state.results["Train"] = tr
st.session_state.results["Test"] = te
st.session_state.results["m_train"] = {
"R": pearson_r(tr[tcol_tr], tr[PRED_COL]),
"RMSE": rmse(tr[tcol_tr], tr[PRED_COL]),
"MAPE%": mape(tr[tcol_tr], tr[PRED_COL]),
}
st.session_state.results["m_test"] = {
"R": pearson_r(te[tcol_te], te[PRED_COL]),
"RMSE": rmse(te[tcol_te], te[PRED_COL]),
"MAPE%": mape(te[tcol_te], te[PRED_COL]),
}
# Persist used target names (for export/plots)
st.session_state["tcol_train"] = tcol_tr
st.session_state["tcol_test"] = tcol_te
# Training min–max ranges
# Used later by validation/prediction for out-of-range (OOR) flagging.
tr_min = tr[FEATURES].min().to_dict()
tr_max = tr[FEATURES].max().to_dict()
st.session_state.train_ranges = {f: (float(tr_min[f]), float(tr_max[f])) for f in FEATURES}
# NOTE(review): broken literal (raw line break) — extraction damage.
st.markdown('Case has been built and results are displayed below.
', unsafe_allow_html=True)
def _dev_block(df: pd.DataFrame, actual_col: str, m: dict):
    """Metrics row plus the depth-track / cross-plot pair for one results frame."""
    metric_cols = st.columns(3)
    metric_cols[0].metric("R", f"{m['R']:.3f}")
    metric_cols[1].metric("RMSE", f"{m['RMSE']:.2f}")
    metric_cols[2].metric("MAPE%", f"{m['MAPE%']:.2f}")
    st.markdown("""
R: Pearson Correlation Coefficient
RMSE: Root Mean Square Error
MAPE: Mean Absolute Percentage Error
""", unsafe_allow_html=True)
    track_col, cross_col = st.columns([2, 3], gap="large")
    with track_col:
        st.plotly_chart(
            track_plot(df, actual_col, include_actual=True),
            use_container_width=False,
            config={"displayModeBar": False, "scrollZoom": True}
        )
    with cross_col:
        st.pyplot(
            cross_plot_static(df[actual_col], df[PRED_COL], label="Fracture Pressure"),
            use_container_width=False
        )
# Show Training/Testing result tabs once the model has been run at least once.
if "Train" in st.session_state.results or "Test" in st.session_state.results:
tab1, tab2 = st.tabs(["Training", "Testing"])
if "Train" in st.session_state.results:
with tab1:
_dev_block(st.session_state.results["Train"], st.session_state.get("tcol_train", TARGET_CANON), st.session_state.results["m_train"])
if "Test" in st.session_state.results:
with tab2:
_dev_block(st.session_state.results["Test"], st.session_state.get("tcol_test", TARGET_CANON), st.session_state.results["m_test"])
render_export_button(phase_key="dev")
# =========================
# VALIDATION (with actual)
# =========================
# Validation: predict on a new workbook that still contains the actual target,
# score it, and flag rows whose features fall outside the training ranges.
if st.session_state.app_step == "validate":
st.sidebar.header("Validate the Model")
up = st.sidebar.file_uploader("Upload Validation Excel", type=["xlsx", "xls"])
if up is not None:
book = read_book_bytes(up.getvalue())
if book:
df0 = next(iter(book.values()))
st.sidebar.caption(f"**Data loaded:** {up.name} • {df0.shape[0]} rows × {df0.shape[1]} cols")
# Preview button
if st.sidebar.button("Preview data", use_container_width=True, disabled=(up is None)):
st.session_state.preview_book = read_book_bytes(up.getvalue()) if up is not None else {}
st.session_state.show_preview_panel = True
st.rerun()
go_btn = st.sidebar.button("Predict & Validate", type="primary", use_container_width=True)
if st.sidebar.button("⬅ Back to Case Building", use_container_width=True):
st.session_state.app_step = "dev"; st.rerun()
if st.sidebar.button("Proceed to Prediction ▶", use_container_width=True):
st.session_state.app_step = "predict"; st.rerun()
sticky_header("Validate the Model", "Upload a dataset with the same **features** and an **actual fracture pressure** column.")
render_preview_panel()
if go_btn and up is not None:
# The pipeline only exists after Run Model in Case Building.
if st.session_state.fitted_model is None:
st.error("Please train the model first in Case Building.")
st.stop()
book = read_book_bytes(up.getvalue())
names = list(book.keys())
# Prefer a validation-named sheet; otherwise fall back to the first sheet.
name = next((s for s in names if s.lower() in ("validation", "validate", "validation2", "val", "val2")), names[0])
df0 = book[name].copy()
tcol = _resolve_target_col(df0)
if tcol is None:
st.error(f"Missing target column. Expected one of: {TARGET_ALIASES}")
st.stop()
if not ensure_cols(df0, FEATURES):
# NOTE(review): the string literal below is split by a raw line break —
# extraction damage; restore the original single-line literal.
st.markdown('Missing required feature columns.
', unsafe_allow_html=True)
st.stop()
df = df0.copy()
df[PRED_COL] = _inv_transform(st.session_state.fitted_model.predict(_make_X(df0, FEATURES)), TRANSFORM)
st.session_state.results["Validate"] = df
# Range checks
# Flag rows where any feature leaves the training min-max envelope (OOR).
ranges = st.session_state.train_ranges
oor_pct = 0.0
tbl = None
if ranges:
any_viol = pd.DataFrame({f: (df[f] < ranges[f][0]) | (df[f] > ranges[f][1]) for f in FEATURES}).any(axis=1)
oor_pct = float(any_viol.mean() * 100.0)
if any_viol.any():
tbl = df.loc[any_viol, FEATURES].copy()
for c in FEATURES:
if pd.api.types.is_numeric_dtype(tbl[c]):
tbl[c] = tbl[c].round(3)
# Per-row comma list of which features violated their range.
tbl["Violations"] = pd.DataFrame({f: (df[f] < ranges[f][0]) | (df[f] > ranges[f][1]) for f in FEATURES}).loc[any_viol].apply(
lambda r: ", ".join([c for c, v in r.items() if v]), axis=1
)
st.session_state.results["m_val"] = {
"R": pearson_r(df[tcol], df[PRED_COL]),
"RMSE": rmse(df[tcol], df[PRED_COL]),
"MAPE%": mape(df[tcol], df[PRED_COL]),
}
st.session_state.results["sv_val"] = {"n": len(df), "pred_min": float(df[PRED_COL].min()), "pred_max": float(df[PRED_COL].max()), "oor": oor_pct}
st.session_state.results["oor_tbl"] = tbl
st.session_state["tcol_val"] = tcol
# Render stored validation results (survives reruns until overwritten).
if "Validate" in st.session_state.results:
m = st.session_state.results["m_val"]
tcol = st.session_state.get("tcol_val", TARGET_CANON)
c1, c2, c3 = st.columns(3)
c1.metric("R", f"{m['R']:.3f}"); c2.metric("RMSE", f"{m['RMSE']:.2f}"); c3.metric("MAPE%", f"{m['MAPE%']:.2f}")
st.markdown("""
R: Pearson Correlation Coefficient
RMSE: Root Mean Square Error
MAPE: Mean Absolute Percentage Error
""", unsafe_allow_html=True)
col_track, col_cross = st.columns([2, 3], gap="large")
with col_track:
st.plotly_chart(
track_plot(st.session_state.results["Validate"], tcol, include_actual=True),
use_container_width=False, config={"displayModeBar": False, "scrollZoom": True}
)
with col_cross:
st.pyplot(
cross_plot_static(st.session_state.results["Validate"][tcol], st.session_state.results["Validate"][PRED_COL], label="Fracture Pressure"),
use_container_width=False
)
render_export_button(phase_key="validate")
sv = st.session_state.results["sv_val"]
if sv["oor"] > 0:
# NOTE(review): broken literal (raw line break) — extraction damage.
st.markdown('Some inputs fall outside **training min–max** ranges.
', unsafe_allow_html=True)
if st.session_state.results["oor_tbl"] is not None:
st.write("*Out-of-range rows (vs. Training min–max):*")
df_centered_rounded(st.session_state.results["oor_tbl"])
# =========================
# PREDICTION (no actual)
# =========================
# Prediction: run the fitted pipeline on a workbook WITHOUT an actual target;
# report summary stats and the out-of-range percentage instead of accuracy.
if st.session_state.app_step == "predict":
st.sidebar.header("Prediction (No Actual)")
up = st.sidebar.file_uploader("Upload Prediction Excel", type=["xlsx", "xls"])
if up is not None:
book = read_book_bytes(up.getvalue())
if book:
df0 = next(iter(book.values()))
st.sidebar.caption(f"**Data loaded:** {up.name} • {df0.shape[0]} rows × {df0.shape[1]} cols")
# Preview button
if st.sidebar.button("Preview data", use_container_width=True, disabled=(up is None)):
st.session_state.preview_book = read_book_bytes(up.getvalue()) if up is not None else {}
st.session_state.show_preview_panel = True
st.rerun()
go_btn = st.sidebar.button("Predict", type="primary", use_container_width=True)
if st.sidebar.button("⬅ Back to Case Building", use_container_width=True):
st.session_state.app_step = "dev"; st.rerun()
sticky_header("Prediction", "Upload a dataset with the 5 feature columns (no actual column).")
render_preview_panel()
if go_btn and up is not None:
# The pipeline only exists after Run Model in Case Building.
if st.session_state.fitted_model is None:
st.error("Please train the model first in Case Building.")
st.stop()
book = read_book_bytes(up.getvalue())
# Only the first sheet is used for prediction.
name = list(book.keys())[0]
df0 = book[name].copy()
if not ensure_cols(df0, FEATURES):
# NOTE(review): the string literal below is split by a raw line break —
# extraction damage; restore the original single-line literal.
st.markdown('Missing required columns.
', unsafe_allow_html=True)
st.stop()
df = df0.copy()
df[PRED_COL] = _inv_transform(st.session_state.fitted_model.predict(_make_X(df0, FEATURES)), TRANSFORM)
st.session_state.results["PredictOnly"] = df
# OOR % = share of rows with any feature outside the training ranges.
ranges = st.session_state.train_ranges
oor_pct = 0.0
if ranges:
any_viol = pd.DataFrame({f: (df[f] < ranges[f][0]) | (df[f] > ranges[f][1]) for f in FEATURES}).any(axis=1)
oor_pct = float(any_viol.mean() * 100.0)
st.session_state.results["sv_pred"] = {
"n": len(df),
"pred_min": float(df[PRED_COL].min()),
"pred_max": float(df[PRED_COL].max()),
"pred_mean": float(df[PRED_COL].mean()),
"pred_std": float(df[PRED_COL].std(ddof=0)),
"oor": oor_pct
}
# Render stored prediction results (survives reruns until overwritten).
if "PredictOnly" in st.session_state.results:
df = st.session_state.results["PredictOnly"]
sv = st.session_state.results["sv_pred"]
col_left, col_right = st.columns([2, 3], gap="large")
with col_left:
table = pd.DataFrame({
"Metric": ["# points", "Pred min", "Pred max", "Pred mean", "Pred std", "OOR %"],
"Value": [sv["n"], round(sv["pred_min"], 3), round(sv["pred_max"], 3),
round(sv["pred_mean"], 3), round(sv["pred_std"], 3), f'{sv["oor"]:.1f}%']
})
# NOTE(review): broken literal (raw line break) — extraction damage.
st.markdown('Predictions ready ✓
', unsafe_allow_html=True)
df_centered_rounded(table, hide_index=True)
st.caption("**★ OOR** = % of rows with input features outside the training min–max range.")
with col_right:
st.plotly_chart(
track_plot(df, actual_col=None, include_actual=False),
use_container_width=False, config={"displayModeBar": False, "scrollZoom": True}
)
render_export_button(phase_key="predict")
# =========================
# Footer
# =========================
# NOTE(review): footer markup appears stripped at extraction — the markdown
# body is empty; restore from the repo copy if a footer is expected.
st.markdown("""
""", unsafe_allow_html=True)