Max_Hz_Stresses / app.py
UCS2014's picture
Update app.py
862a511 verified
# app.py — ST_Max_Horizontal_Stress (σHmax)
# Self-contained Streamlit app that TRAINS a fixed, optimized ML pipeline in-app.
# No external model files, no model-source UI. Upload Excel and go.
import io, json, os, base64, math
from pathlib import Path
from datetime import datetime
import streamlit as st
import pandas as pd
import numpy as np
# Matplotlib for static previews & cross-plot
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
from matplotlib.ticker import FuncFormatter
import plotly.graph_objects as go
from sklearn.metrics import mean_squared_error
from sklearn.ensemble import RandomForestRegressor
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
# =========================
# App constants / defaults
# =========================
APP_NAME = "ST_GeoMech_SHmax"
TAGLINE = "Real-Time Maximum Horizontal Stress Prediction"
# -------- Canonical names (match your files) --------
FEATURES = ["Q (gpm)", "SPP (psi)", "T (kft.lbf)", "WOB (klbf)", "ROP (ft/h)"]
TARGET = "MaxStress_Actual" # <-- matches your sheet (was 'MaxStress', causing the error)
PRED_COL = "SHmax_Pred"
ACTUAL_COL = TARGET
TRANSFORM = "none" # "none" | "log10" | "ln"
UNITS = "Psi"
# ---- Fixed ("best") model params baked into the code ----
BEST_PARAMS = dict(
n_estimators=100,
max_depth=22,
max_features="log2",
random_state=10,
n_jobs=-1,
)
# Color / layout
COLORS = {"pred": "#1f77b4", "actual": "#f2b702", "ref": "#5a5a5a"}
CROSS_W, CROSS_H = 350, 350
TRACK_H, TRACK_W = 1000, 500
FONT_SZ = 13
BOLD_FONT = "Arial Black, Arial, sans-serif"
# =========================
# Page / CSS
# =========================
st.set_page_config(page_title=APP_NAME, page_icon="logo.png", layout="wide")
st.markdown("""
<style>
.brand-logo { width: 200px; height: auto; object-fit: contain; }
.centered-container { display:flex; flex-direction:column; align-items:center; text-align:center; }
.st-message-box { background:#f0f2f6; color:#333; padding:10px; border-radius:10px; border:1px solid #e6e9ef; }
.st-message-box.st-success { background:#d4edda; color:#155724; border-color:#c3e6cb; }
.st-message-box.st-warning { background:#fff3cd; color:#856404; border-color:#ffeeba; }
.st-message-box.st-error { background:#f8d7da; color:#721c24; border-color:#f5c6cb; }
.main .block-container { overflow: unset !important; }
div[data-testid="stVerticalBlock"] { overflow: unset !important; }
div[data-testid="stExpander"] > details > summary {
position: sticky; top: 0; z-index: 10; background: #fff; border-bottom: 1px solid #eee;
}
div[data-testid="stExpander"] div[data-baseweb="tab-list"] {
position: sticky; top: 42px; z-index: 9; background: #fff; padding-top: 6px;
}
</style>
""", unsafe_allow_html=True)
TABLE_CENTER_CSS = [
dict(selector="th", props=[("text-align", "center")]),
dict(selector="td", props=[("text-align", "center")]),
]
# =========================
# Password gate (same as shmin)
# =========================
def inline_logo(path="logo.png") -> str:
try:
p = Path(path)
if not p.exists(): return ""
return f"data:image/png;base64,{base64.b64encode(p.read_bytes()).decode('ascii')}"
except Exception:
return ""
def add_password_gate() -> None:
try:
required = st.secrets.get("APP_PASSWORD", "")
except Exception:
required = os.environ.get("APP_PASSWORD", "")
if not required:
return # no password configured
if st.session_state.get("auth_ok", False):
return
st.sidebar.markdown(f"""
<div class="centered-container">
<img src="{inline_logo('logo.png')}" class="brand-logo">
<div style='font-weight:800;font-size:1.2rem; margin-top: 10px;'>{APP_NAME}</div>
<div style='color:#667085;'>Smart Thinking • Secure Access</div>
</div>
""", unsafe_allow_html=True)
pwd = st.sidebar.text_input("Access key", type="password", placeholder="••••••••")
if st.sidebar.button("Unlock", type="primary"):
if pwd == required:
st.session_state.auth_ok = True
st.rerun()
else:
st.error("Incorrect key.")
st.stop()
add_password_gate()
# =========================
# Utilities
# =========================
def rmse(y_true, y_pred) -> float:
return float(np.sqrt(mean_squared_error(y_true, y_pred)))
def mape(y_true, y_pred, eps: float = 1e-9) -> float:
a = np.asarray(y_true, dtype=float)
p = np.asarray(y_pred, dtype=float)
den = np.maximum(np.abs(a), eps)
return float(np.mean(np.abs((a - p) / den)) * 100.0)
def pearson_r(y_true, y_pred) -> float:
a = np.asarray(y_true, dtype=float)
p = np.asarray(y_pred, dtype=float)
if a.size < 2: return float("nan")
if np.all(a == a[0]) or np.all(p == p[0]): return float("nan")
return float(np.corrcoef(a, p)[0, 1])
@st.cache_data(show_spinner=False)
def parse_excel(data_bytes: bytes):
bio = io.BytesIO(data_bytes)
xl = pd.ExcelFile(bio)
return {sh: xl.parse(sh) for sh in xl.sheet_names}
def read_book_bytes(b: bytes):
return parse_excel(b) if b else {}
def _excel_engine() -> str:
try:
import xlsxwriter # noqa: F401
return "xlsxwriter"
except Exception:
return "openpyxl"
def _excel_safe_name(name: str) -> str:
bad = '[]:*?/\\'
safe = ''.join('_' if ch in bad else ch for ch in str(name))
return safe[:31]
def _round_numeric(df: pd.DataFrame, ndigits: int = 3) -> pd.DataFrame:
out = df.copy()
for c in out.columns:
if pd.api.types.is_float_dtype(out[c]) or pd.api.types.is_integer_dtype(out[c]):
out[c] = pd.to_numeric(out[c], errors="coerce").round(ndigits)
return out
def df_centered_rounded(df: pd.DataFrame, hide_index=True):
out = df.copy()
numcols = out.select_dtypes(include=[np.number]).columns
styler = (
out.style
.format({c: "{:.3f}" for c in numcols})
.set_properties(**{"text-align": "center"})
.set_table_styles(TABLE_CENTER_CSS)
)
st.dataframe(styler, use_container_width=True, hide_index=hide_index)
def ensure_cols(df: pd.DataFrame, cols: list[str]) -> bool:
miss = [c for c in cols if c not in df.columns]
if miss:
st.error(f"Missing columns: {miss}\nFound: {list(df.columns)}")
return False
return True
def _nice_tick0(xmin: float, step: float = 0.1) -> float:
return step * math.floor(xmin / step) if np.isfinite(xmin) else xmin
# ---------- Transform helpers ----------
def _inv_transform(x: np.ndarray, transform: str) -> np.ndarray:
t = (transform or "none").lower()
if t in ("log10", "log_10", "log10()"): return np.power(10.0, x)
if t in ("ln", "log", "loge", "log_e", "natural"): return np.exp(x)
return x
# ---------- Build X exactly as trained ----------
def _make_X(df: pd.DataFrame, features: list[str]) -> pd.DataFrame:
X = df.reindex(columns=features, copy=False)
for c in X.columns:
X[c] = pd.to_numeric(X[c], errors="coerce")
return X
# =========================
# Export helpers
# =========================
def _summary_table(df: pd.DataFrame, cols: list[str]) -> pd.DataFrame:
cols = [c for c in cols if c in df.columns]
if not cols: return pd.DataFrame()
tbl = (df[cols]
.agg(['min','max','mean','std'])
.T.rename(columns={"min":"Min","max":"Max","mean":"Mean","std":"Std"})
.reset_index(names="Field"))
return _round_numeric(tbl, 3)
def _train_ranges_df(ranges: dict[str, tuple[float, float]]) -> pd.DataFrame:
if not ranges: return pd.DataFrame()
df = pd.DataFrame(ranges).T.reset_index()
df.columns = ["Feature", "Min", "Max"]
return _round_numeric(df, 3)
def _excel_autofit(writer, sheet_name: str, df: pd.DataFrame, min_w: int = 8, max_w: int = 40):
try:
import xlsxwriter # noqa: F401
except Exception:
return
ws = writer.sheets[sheet_name]
for i, col in enumerate(df.columns):
series = df[col].astype(str)
max_len = max([len(str(col))] + series.map(len).tolist())
ws.set_column(i, i, max(min_w, min(max_len + 2, max_w)))
ws.freeze_panes(1, 0)
def _available_sections() -> list[str]:
res = st.session_state.get("results", {})
sections = []
if "Train" in res: sections += ["Training","Training_Metrics","Training_Summary"]
if "Test" in res: sections += ["Testing","Testing_Metrics","Testing_Summary"]
if "Validate" in res: sections += ["Validation","Validation_Metrics","Validation_Summary","Validation_OOR"]
if "PredictOnly" in res: sections += ["Prediction","Prediction_Summary"]
if st.session_state.get("train_ranges"): sections += ["Training_Ranges"]
sections += ["Info"]
return sections
def build_export_workbook(selected: list[str], ndigits: int = 3, do_autofit: bool = True) -> tuple[bytes|None, str|None, list[str]]:
res = st.session_state.get("results", {})
if not res: return None, None, []
sheets: dict[str, pd.DataFrame] = {}
order: list[str] = []
def _add(name: str, df: pd.DataFrame):
if df is None or (isinstance(df, pd.DataFrame) and df.empty): return
sheets[name] = _round_numeric(df, ndigits); order.append(name)
if "Training" in selected and "Train" in res: _add("Training", res["Train"])
if "Training_Metrics" in selected and res.get("m_train"): _add("Training_Metrics", pd.DataFrame([res["m_train"]]))
if "Training_Summary" in selected and "Train" in res:
tr_cols = FEATURES + [c for c in [TARGET, PRED_COL] if c in res["Train"].columns]
_add("Training_Summary", _summary_table(res["Train"], tr_cols))
if "Testing" in selected and "Test" in res: _add("Testing", res["Test"])
if "Testing_Metrics" in selected and res.get("m_test"): _add("Testing_Metrics", pd.DataFrame([res["m_test"]]))
if "Testing_Summary" in selected and "Test" in res:
te_cols = FEATURES + [c for c in [TARGET, PRED_COL] if c in res["Test"].columns]
_add("Testing_Summary", _summary_table(res["Test"], te_cols))
if "Validation" in selected and "Validate" in res: _add("Validation", res["Validate"])
if "Validation_Metrics" in selected and res.get("m_val"): _add("Validation_Metrics", pd.DataFrame([res["m_val"]]))
if "Validation_Summary" in selected and res.get("sv_val"): _add("Validation_Summary", pd.DataFrame([res["sv_val"]]))
if "Validation_OOR" in selected and isinstance(res.get("oor_tbl"), pd.DataFrame) and not res["oor_tbl"].empty:
_add("Validation_OOR", res["oor_tbl"].reset_index(drop=True))
if "Prediction" in selected and "PredictOnly" in res: _add("Prediction", res["PredictOnly"])
if "Prediction_Summary" in selected and res.get("sv_pred"): _add("Prediction_Summary", pd.DataFrame([res["sv_pred"]]))
if "Training_Ranges" in selected and st.session_state.get("train_ranges"):
_add("Training_Ranges", _train_ranges_df(st.session_state["train_ranges"]))
if "Info" in selected:
info = pd.DataFrame([
{"Key": "AppName", "Value": APP_NAME},
{"Key": "Tagline", "Value": TAGLINE},
{"Key": "Target", "Value": TARGET},
{"Key": "PredColumn", "Value": PRED_COL},
{"Key": "Features", "Value": ", ".join(FEATURES)},
{"Key": "ExportedAt", "Value": datetime.now().strftime("%Y-%m-%d %H:%M:%S")},
])
_add("Info", info)
if not order: return None, None, []
bio = io.BytesIO()
engine = _excel_engine()
with pd.ExcelWriter(bio, engine=engine) as writer:
for name in order:
df = sheets[name]; sheet = _excel_safe_name(name)
df.to_excel(writer, sheet_name=sheet, index=False)
if do_autofit: _excel_autofit(writer, sheet, df)
bio.seek(0)
fname = f"MaxStress_Export_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"
return bio.getvalue(), fname, order
def render_export_button(phase_key: str) -> None:
res = st.session_state.get("results", {})
if not res: return
st.divider()
st.markdown("### Export to Excel")
options = _available_sections()
selected_sheets = st.multiselect(
"Sheets to include",
options=options,
default=[],
placeholder="Choose option(s)",
help="Pick the sheets you want in the Excel export.",
key=f"sheets_{phase_key}",
)
if not selected_sheets:
st.caption("Select one or more sheets above to enable export.")
st.download_button("⬇️ Export Excel", data=b"", file_name="MaxStress_Export.xlsx",
mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
disabled=True, key=f"download_{phase_key}")
return
data, fname, names = build_export_workbook(selected=selected_sheets, ndigits=3, do_autofit=True)
if names: st.caption("Will include: " + ", ".join(names))
st.download_button("⬇️ Export Excel", data=(data or b""), file_name=(fname or "MaxStress_Export.xlsx"),
mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
disabled=(data is None), key=f"download_{phase_key}")
# =========================
# Plots (no decimals on X)
# =========================
def cross_plot_static(actual, pred):
a = pd.Series(actual, dtype=float)
p = pd.Series(pred, dtype=float)
lo = float(min(a.min(), p.min())); hi = float(max(a.max(), p.max()))
pad = 0.03 * (hi - lo if hi > lo else 1.0)
lo2, hi2 = lo - pad, hi + pad
ticks = np.linspace(lo2, hi2, 5)
dpi = 110
fig, ax = plt.subplots(figsize=(CROSS_W / dpi, CROSS_H / dpi), dpi=dpi, constrained_layout=False)
ax.scatter(a, p, s=14, c=COLORS["pred"], alpha=0.9, linewidths=0)
ax.plot([lo2, hi2], [lo2, hi2], linestyle="--", linewidth=1.2, color=COLORS["ref"])
ax.set_xlim(lo2, hi2); ax.set_ylim(lo2, hi2)
ax.set_xticks(ticks); ax.set_yticks(ticks)
ax.set_aspect("equal", adjustable="box")
fmt = FuncFormatter(lambda x, _: f"{x:.0f}") # no decimals
ax.xaxis.set_major_formatter(fmt); ax.yaxis.set_major_formatter(fmt)
ax.set_xlabel(f"Actual Max Stress ({UNITS})", fontweight="bold", fontsize=10, color="black")
ax.set_ylabel(f"Predicted Max Stress ({UNITS})", fontweight="bold", fontsize=10, color="black")
ax.tick_params(labelsize=6, colors="black")
ax.grid(True, linestyle=":", alpha=0.3)
for spine in ax.spines.values():
spine.set_linewidth(1.1); spine.set_color("#444")
fig.subplots_adjust(left=0.16, bottom=0.16, right=0.98, top=0.98)
return fig
def track_plot(df, include_actual=True):
depth_col = next((c for c in df.columns if 'depth' in str(c).lower()), None)
if depth_col is not None:
y = pd.to_numeric(df[depth_col], errors="coerce"); ylab = depth_col
y_range = [float(np.nanmax(y)), float(np.nanmin(y))] # reversed
else:
y = pd.Series(np.arange(1, len(df) + 1)); ylab = "Point Index"
y_range = [float(y.max()), float(y.min())]
x_series = pd.Series(df.get(PRED_COL, pd.Series(dtype=float))).astype(float)
act_col = ACTUAL_COL if (ACTUAL_COL and ACTUAL_COL in df.columns) else TARGET
if include_actual and act_col in df.columns:
x_series = pd.concat([x_series, pd.Series(df[act_col]).astype(float)], ignore_index=True)
x_lo, x_hi = float(x_series.min()), float(x_series.max())
x_pad = 0.03 * (x_hi - x_lo if x_hi > x_lo else 1.0)
xmin, xmax = x_lo - x_pad, x_hi + x_pad
tick0 = _nice_tick0(xmin, step=max((xmax - xmin) / 10.0, 0.1))
fig = go.Figure()
if PRED_COL in df.columns:
fig.add_trace(go.Scatter(
x=df[PRED_COL], y=y, mode="lines",
line=dict(color=COLORS["pred"], width=1.8),
name=PRED_COL,
hovertemplate=f"{PRED_COL}: "+"%{x:.2f}<br>"+ylab+": %{y}<extra></extra>"
))
if include_actual and act_col in df.columns:
fig.add_trace(go.Scatter(
x=df[act_col], y=y, mode="lines",
line=dict(color=COLORS["actual"], width=2.0, dash="dot"),
name=f"{act_col} (actual)",
hovertemplate=f"{act_col}: "+"%{x:.2f}<br>"+ylab+": %{y}<extra></extra>"
))
fig.update_layout(
height=TRACK_H, width=TRACK_W, autosize=False,
paper_bgcolor="#fff", plot_bgcolor="#fff",
margin=dict(l=64, r=16, t=36, b=48), hovermode="closest",
font=dict(size=FONT_SZ, color="#000"),
legend=dict(x=0.98, y=0.05, xanchor="right", yanchor="bottom",
bgcolor="rgba(255,255,255,0.75)", bordercolor="#ccc", borderwidth=1),
legend_title_text=""
)
fig.update_xaxes(
title_text=f"Max Stress ({UNITS})",
title_font=dict(size=20, family=BOLD_FONT, color="#000"),
tickfont=dict(size=15, family=BOLD_FONT, color="#000"),
side="top", range=[xmin, xmax],
ticks="outside",
tickformat=",.0f", # <— no decimals on ticks
tickmode="auto", tick0=tick0,
showline=True, linewidth=1.2, linecolor="#444", mirror=True,
showgrid=True, gridcolor="rgba(0,0,0,0.12)", automargin=True
)
fig.update_yaxes(
title_text=ylab,
title_font=dict(size=20, family=BOLD_FONT, color="#000"),
tickfont=dict(size=15, family=BOLD_FONT, color="#000"),
range=y_range, ticks="outside",
showline=True, linewidth=1.2, linecolor="#444", mirror=True,
showgrid=True, gridcolor="rgba(0,0,0,0.12)", automargin=True
)
return fig
def preview_tracks(df: pd.DataFrame, cols: list[str]):
cols = [c for c in cols if c in df.columns]
n = len(cols)
if n == 0:
fig, ax = plt.subplots(figsize=(4, 2))
ax.text(0.5, 0.5, "No selected columns", ha="center", va="center")
ax.axis("off")
return fig
depth_col = next((c for c in df.columns if 'depth' in str(c).lower()), None)
if depth_col is not None:
idx = pd.to_numeric(df[depth_col], errors="coerce")
y_label = depth_col
y_min, y_max = float(np.nanmin(idx)), float(np.nanmax(idx))
else:
idx = pd.Series(np.arange(1, len(df) + 1))
y_label = "Point Index"
y_min, y_max = float(idx.min()), float(idx.max())
cmap = plt.get_cmap("tab20")
col_colors = {col: cmap(i % cmap.N) for i, col in enumerate(cols)}
fig, axes = plt.subplots(1, n, figsize=(2.4 * n, 7.0), sharey=True, dpi=100)
if n == 1:
axes = [axes]
for i, (ax, col) in enumerate(zip(axes, cols)):
x = pd.to_numeric(df[col], errors="coerce")
ax.plot(x, idx, '-', lw=1.6, color=col_colors[col])
ax.set_xlabel(col); ax.xaxis.set_label_position('top'); ax.xaxis.tick_top()
ax.set_ylim(y_max, y_min) # reversed depth down
ax.grid(True, linestyle=":", alpha=0.3)
if i == 0:
ax.set_ylabel(y_label)
else:
ax.tick_params(labelleft=False); ax.set_ylabel("")
fig.tight_layout()
return fig
# =========================
# Fixed training pipeline
# =========================
def build_pipeline() -> Pipeline:
"""
Fixed, optimized pipeline:
- Numeric imputation (median)
- RandomForestRegressor with tuned params (BEST_PARAMS)
Trees don't need scaling; robust to feature distributions.
"""
model = RandomForestRegressor(**BEST_PARAMS)
pipe = Pipeline(steps=[
("imputer", SimpleImputer(strategy="median")),
("model", model),
])
return pipe
# =========================
# Session state
# =========================
st.session_state.setdefault("app_step", "intro")
st.session_state.setdefault("results", {})
st.session_state.setdefault("train_ranges", None)
st.session_state.setdefault("dev_file_name","")
st.session_state.setdefault("dev_file_bytes",b"")
st.session_state.setdefault("dev_file_loaded",False)
st.session_state.setdefault("dev_preview",False)
st.session_state.setdefault("fitted_model", None) # cache trained pipeline
# NEW: persistent top-of-page preview panel state (same as shmin)
st.session_state.setdefault("show_preview_panel", False)
st.session_state.setdefault("preview_book", {}) # parsed Excel sheets to preview
# =========================
# Sidebar branding
# =========================
st.sidebar.markdown(f"""
<div class="centered-container">
<img src="{inline_logo('logo.png')}" class="brand-logo">
<div style='font-weight:800;font-size:1.2rem;'>{APP_NAME}</div>
<div style='color:#667085;'>{TAGLINE}</div>
</div>
""", unsafe_allow_html=True)
def sticky_header(title, message):
st.markdown(
f"""
<style>
.sticky-container {{
position: sticky; top: 0; background-color: white; z-index: 100;
padding-top: 10px; padding-bottom: 10px; border-bottom: 1px solid #eee;
}}
</style>
<div class="sticky-container">
<h3>{title}</h3>
<p>{message}</p>
</div>
""",
unsafe_allow_html=True
)
# ---------- Top-of-page Preview Panel ----------
def render_preview_panel():
"""If enabled, draws a preview panel at the very top of the page."""
if not st.session_state.get("show_preview_panel"):
return
st.markdown("## 🔎 Data preview")
book = st.session_state.get("preview_book", {}) or {}
if not book:
st.info("No data loaded yet.")
col = st.columns(2)[1]
with col:
if st.button("Hide preview"):
st.session_state.show_preview_panel = False
st.session_state.preview_book = {}
st.rerun()
return
names = list(book.keys())
tabs = st.tabs(names + ["✖ Hide preview"])
for i, name in enumerate(names):
with tabs[i]:
df = book[name]
t1, t2 = st.tabs(["Tracks", "Summary"])
with t1:
st.pyplot(preview_tracks(df, FEATURES), use_container_width=True)
with t2:
feat_present = [c for c in FEATURES if c in df.columns]
if not feat_present:
st.info("No feature columns found to summarize.")
else:
tbl = (
df[feat_present]
.agg(['min','max','mean','std'])
.T.rename(columns={"Min":"Min","Max":"Max","mean":"Mean","std":"Std"})
.reset_index(names="Feature")
)
df_centered_rounded(tbl)
with tabs[-1]:
if st.button("Hide preview", use_container_width=True):
st.session_state.show_preview_panel = False
st.session_state.preview_book = {}
st.rerun()
# =========================
# INTRO
# =========================
if st.session_state.app_step == "intro":
st.header("Welcome!")
st.markdown(f"This software is developed by *Smart Thinking AI-Solutions Team* to estimate **Maximum Horizontal Stress** ({UNITS}) from drilling/offset data.")
st.subheader("How It Works")
st.markdown(
"1) **Upload your data file** and click **Run Model** to fit the baked-in pipeline. \n"
"2) **Validate** on held-out wells (with actual). \n"
"3) **Predict** on wells without actual."
)
if st.button("Start Showcase", type="primary"):
st.session_state.app_step = "dev"; st.rerun()
# =========================
# CASE BUILDING (Train/Test)
# =========================
def _find_sheet(book, names):
low2orig = {k.lower(): k for k in book.keys()}
for nm in names:
if nm.lower() in low2orig: return low2orig[nm.lower()]
return None
if st.session_state.app_step == "dev":
st.sidebar.header("Case Building")
up = st.sidebar.file_uploader("Upload Your Data File", type=["xlsx","xls"])
if up is not None:
st.session_state.dev_file_bytes = up.getvalue()
st.session_state.dev_file_name = up.name
st.session_state.dev_file_loaded = True
st.session_state.dev_preview = False
st.session_state.fitted_model = None # reset
if st.session_state.dev_file_loaded:
tmp = read_book_bytes(st.session_state.dev_file_bytes)
if tmp:
df0 = next(iter(tmp.values()))
st.sidebar.caption(f"**Data loaded:** {st.session_state.dev_file_name}{df0.shape[0]} rows × {df0.shape[1]} cols")
# PREVIEW button -> show preview panel at top
if st.sidebar.button("Preview data", use_container_width=True, disabled=not st.session_state.dev_file_loaded):
st.session_state.preview_book = read_book_bytes(st.session_state.dev_file_bytes) if st.session_state.dev_file_bytes else {}
st.session_state.show_preview_panel = True
st.rerun()
run = st.sidebar.button("Run Model", type="primary", use_container_width=True)
if st.sidebar.button("Proceed to Validation ▶", use_container_width=True): st.session_state.app_step="validate"; st.rerun()
if st.sidebar.button("Proceed to Prediction ▶", use_container_width=True): st.session_state.app_step="predict"; st.rerun()
if st.session_state.dev_file_loaded and st.session_state.show_preview_panel:
sticky_header("Case Building", "Previewed ✓ — now click **Run Model**.")
elif st.session_state.dev_file_loaded:
sticky_header("Case Building", "📄 **Preview uploaded data** using the sidebar button, then click **Run Model**.")
else:
sticky_header("Case Building", "**Upload your data to build a case, then run the model to review performance.**")
# Render the preview panel at the very top (above results)
render_preview_panel()
if run and st.session_state.dev_file_bytes:
book = read_book_bytes(st.session_state.dev_file_bytes)
sh_train = _find_sheet(book, ["Train","Training","training2","train","training"])
sh_test = _find_sheet(book, ["Test","Testing","testing2","test","testing"])
if sh_train is None or sh_test is None:
st.markdown('<div class="st-message-box st-error">Workbook must include Train/Training and Test/Testing sheets.</div>', unsafe_allow_html=True)
st.stop()
tr0 = book[sh_train].copy()
te0 = book[sh_test].copy()
# Ensure columns exist
if not (ensure_cols(tr0, FEATURES+[TARGET]) and ensure_cols(te0, FEATURES+[TARGET])):
st.markdown('<div class="st-message-box st-error">Missing required columns.</div>', unsafe_allow_html=True)
st.stop()
# Prepare X,y
X_tr = _make_X(tr0, FEATURES)
y_tr = pd.to_numeric(tr0[TARGET], errors="coerce")
X_te = _make_X(te0, FEATURES)
y_te = pd.to_numeric(te0[TARGET], errors="coerce")
# Drop rows with NA in y
mask_tr = np.isfinite(y_tr)
X_tr, y_tr = X_tr.loc[mask_tr], y_tr.loc[mask_tr]
mask_te = np.isfinite(y_te)
X_te, y_te = X_te.loc[mask_te], y_te.loc[mask_te]
pipe = build_pipeline()
pipe.fit(X_tr, y_tr)
st.session_state.fitted_model = pipe # cache
# Predictions
tr = tr0.copy(); te = te0.copy()
tr[PRED_COL] = _inv_transform(pipe.predict(_make_X(tr0, FEATURES)), TRANSFORM)
te[PRED_COL] = _inv_transform(pipe.predict(_make_X(te0, FEATURES)), TRANSFORM)
st.session_state.results["Train"] = tr
st.session_state.results["Test"] = te
st.session_state.results["m_train"] = {
"R": pearson_r(tr[TARGET], tr[PRED_COL]),
"RMSE": rmse(tr[TARGET], tr[PRED_COL]),
"MAPE%": mape(tr[TARGET], tr[PRED_COL]),
}
st.session_state.results["m_test"] = {
"R": pearson_r(te[TARGET], te[PRED_COL]),
"RMSE": rmse(te[TARGET], te[PRED_COL]),
"MAPE%": mape(te[TARGET], te[PRED_COL]),
}
tr_min = tr[FEATURES].min().to_dict(); tr_max = tr[FEATURES].max().to_dict()
st.session_state.train_ranges = {f:(float(tr_min[f]), float(tr_max[f])) for f in FEATURES}
st.markdown('<div class="st-message-box st-success">Case has been built and results are displayed below.</div>', unsafe_allow_html=True)
def _dev_block(df, m):
c1,c2,c3 = st.columns(3)
c1.metric("R", f"{m['R']:.3f}")
c2.metric("RMSE", f"{m['RMSE']:.2f}")
c3.metric("MAPE%", f"{m['MAPE%']:.2f}")
st.markdown("""
<div style='text-align: left; font-size: 0.8em; color: #6b7280; margin-top: -16px; margin-bottom: 8px;'>
<strong>R:</strong> Pearson Correlation Coefficient<br>
<strong>RMSE:</strong> Root Mean Square Error<br>
<strong>MAPE:</strong> Mean Absolute Percentage Error
</div>
""", unsafe_allow_html=True)
col_track, col_cross = st.columns([2, 3], gap="large")
with col_track:
st.plotly_chart(track_plot(df, include_actual=True),
use_container_width=False, config={"displayModeBar": False, "scrollZoom": True})
with col_cross:
st.pyplot(cross_plot_static(df[TARGET], df[PRED_COL]), use_container_width=False)
if "Train" in st.session_state.results or "Test" in st.session_state.results:
tab1, tab2 = st.tabs(["Training", "Testing"])
if "Train" in st.session_state.results:
with tab1: _dev_block(st.session_state.results["Train"], st.session_state.results["m_train"])
if "Test" in st.session_state.results:
with tab2: _dev_block(st.session_state.results["Test"], st.session_state.results["m_test"])
render_export_button(phase_key="dev")
# =========================
# VALIDATION (with actual)
# =========================
if st.session_state.app_step == "validate":
st.sidebar.header("Validate the Model")
up = st.sidebar.file_uploader("Upload Validation Excel", type=["xlsx","xls"])
if up is not None:
book = read_book_bytes(up.getvalue())
if book:
df0 = next(iter(book.values()))
st.sidebar.caption(f"**Data loaded:** {up.name}{df0.shape[0]} rows × {df0.shape[1]} cols")
# PREVIEW button -> show preview panel at top
if st.sidebar.button("Preview data", use_container_width=True, disabled=(up is None)):
st.session_state.preview_book = read_book_bytes(up.getvalue()) if up is not None else {}
st.session_state.show_preview_panel = True
st.rerun()
go_btn = st.sidebar.button("Predict & Validate", type="primary", use_container_width=True)
if st.sidebar.button("⬅ Back to Case Building", use_container_width=True): st.session_state.app_step="dev"; st.rerun()
if st.sidebar.button("Proceed to Prediction ▶", use_container_width=True): st.session_state.app_step="predict"; st.rerun()
sticky_header("Validate the Model", "Upload a dataset with the same **features** and **MaxStress_Actual** to evaluate performance.")
render_preview_panel() # top-of-page preview
if go_btn and up is not None:
if st.session_state.fitted_model is None:
st.error("Please train the model first in Case Building.")
st.stop()
book = read_book_bytes(up.getvalue())
names = list(book.keys())
name = next((s for s in names if s.lower() in ("validation","validate","validation2","val","val2")), names[0])
df0 = book[name].copy()
if not ensure_cols(df0, FEATURES+[TARGET]):
st.markdown('<div class="st-message-box st-error">Missing required columns.</div>', unsafe_allow_html=True); st.stop()
df = df0.copy()
df[PRED_COL] = _inv_transform(st.session_state.fitted_model.predict(_make_X(df0, FEATURES)), TRANSFORM)
st.session_state.results["Validate"] = df
ranges = st.session_state.train_ranges; oor_pct = 0.0; tbl=None
if ranges:
any_viol = pd.DataFrame({f:(df[f] < ranges[f][0]) | (df[f] > ranges[f][1]) for f in FEATURES}).any(axis=1)
oor_pct = float(any_viol.mean() * 100.0)
if any_viol.any():
tbl = df.loc[any_viol, FEATURES].copy()
for c in FEATURES:
if pd.api.types.is_numeric_dtype(tbl[c]): tbl[c] = tbl[c].round(3)
tbl["Violations"] = pd.DataFrame({f:(df[f] < ranges[f][0]) | (df[f] > ranges[f][1]) for f in FEATURES}).loc[any_viol].apply(
lambda r:", ".join([c for c,v in r.items() if v]), axis=1
)
st.session_state.results["m_val"] = {
"R": pearson_r(df[TARGET], df[PRED_COL]),
"RMSE": rmse(df[TARGET], df[PRED_COL]),
"MAPE%": mape(df[TARGET], df[PRED_COL]),
}
st.session_state.results["sv_val"] = {"n":len(df), "pred_min":float(df[PRED_COL].min()), "pred_max":float(df[PRED_COL].max()), "oor":oor_pct}
st.session_state.results["oor_tbl"] = tbl
if "Validate" in st.session_state.results:
m = st.session_state.results["m_val"]
c1,c2,c3 = st.columns(3)
c1.metric("R", f"{m['R']:.3f}"); c2.metric("RMSE", f"{m['RMSE']:.2f}"); c3.metric("MAPE%", f"{m['MAPE%']:.2f}")
st.markdown("""
<div style='text-align: left; font-size: 0.8em; color: #6b7280; margin-top: -16px; margin-bottom: 8px;'>
<strong>R:</strong> Pearson Correlation Coefficient<br>
<strong>RMSE:</strong> Root Mean Square Error<br>
<strong>MAPE:</strong> Mean Absolute Percentage Error
</div>
""", unsafe_allow_html=True)
col_track, col_cross = st.columns([2, 3], gap="large")
with col_track:
st.plotly_chart(track_plot(st.session_state.results["Validate"], include_actual=True),
use_container_width=False, config={"displayModeBar": False, "scrollZoom": True})
with col_cross:
st.pyplot(cross_plot_static(st.session_state.results["Validate"][TARGET],
st.session_state.results["Validate"][PRED_COL]),
use_container_width=False)
render_export_button(phase_key="validate")
sv = st.session_state.results["sv_val"]
if sv["oor"] > 0: st.markdown('<div class="st-message-box st-warning">Some inputs fall outside **training min–max** ranges.</div>', unsafe_allow_html=True)
if st.session_state.results["oor_tbl"] is not None:
st.write("*Out-of-range rows (vs. Training min–max):*")
df_centered_rounded(st.session_state.results["oor_tbl"])
# =========================
# PREDICTION (no actual)
# =========================
if st.session_state.app_step == "predict":
st.sidebar.header("Prediction (No Actual)")
up = st.sidebar.file_uploader("Upload Prediction Excel", type=["xlsx","xls"])
if up is not None:
book = read_book_bytes(up.getvalue())
if book:
df0 = next(iter(book.values()))
st.sidebar.caption(f"**Data loaded:** {up.name}{df0.shape[0]} rows × {df0.shape[1]} cols")
# PREVIEW button -> show preview panel at top
if st.sidebar.button("Preview data", use_container_width=True, disabled=(up is None)):
st.session_state.preview_book = read_book_bytes(up.getvalue()) if up is not None else {}
st.session_state.show_preview_panel = True
st.rerun()
go_btn = st.sidebar.button("Predict", type="primary", use_container_width=True)
if st.sidebar.button("⬅ Back to Case Building", use_container_width=True): st.session_state.app_step="dev"; st.rerun()
sticky_header("Prediction", "Upload a dataset with the 5 feature columns (no actual column).")
render_preview_panel() # top-of-page preview
if go_btn and up is not None:
if st.session_state.fitted_model is None:
st.error("Please train the model first in Case Building.")
st.stop()
book = read_book_bytes(up.getvalue()); name = list(book.keys())[0]
df0 = book[name].copy()
if not ensure_cols(df0, FEATURES):
st.markdown('<div class="st-message-box st-error">Missing required columns.</div>', unsafe_allow_html=True); st.stop()
df = df0.copy()
df[PRED_COL] = _inv_transform(st.session_state.fitted_model.predict(_make_X(df0, FEATURES)), TRANSFORM)
st.session_state.results["PredictOnly"] = df
ranges = st.session_state.train_ranges; oor_pct = 0.0
if ranges:
any_viol = pd.DataFrame({f:(df[f] < ranges[f][0]) | (df[f] > ranges[f][1]) for f in FEATURES}).any(axis=1)
oor_pct = float(any_viol.mean() * 100.0)
st.session_state.results["sv_pred"] = {
"n":len(df),
"pred_min":float(df[PRED_COL].min()),
"pred_max":float(df[PRED_COL].max()),
"pred_mean":float(df[PRED_COL].mean()),
"pred_std":float(df[PRED_COL].std(ddof=0)),
"oor":oor_pct
}
if "PredictOnly" in st.session_state.results:
df = st.session_state.results["PredictOnly"]; sv = st.session_state.results["sv_pred"]
col_left, col_right = st.columns([2,3], gap="large")
with col_left:
table = pd.DataFrame({
"Metric": ["# points","Pred min","Pred max","Pred mean","Pred std","OOR %"],
"Value": [sv["n"], round(sv["pred_min"],3), round(sv["pred_max"],3),
round(sv["pred_mean"],3), round(sv["pred_std"],3), f'{sv["oor"]:.1f}%']
})
st.markdown('<div class="st-message-box st-success">Predictions ready ✓</div>', unsafe_allow_html=True)
df_centered_rounded(table, hide_index=True)
st.caption("**★ OOR** = % of rows with input features outside the training min–max range.")
with col_right:
st.plotly_chart(track_plot(df, include_actual=False),
use_container_width=False, config={"displayModeBar": False, "scrollZoom": True})
render_export_button(phase_key="predict")
# =========================
# Footer
# =========================
st.markdown("""
<br><br><br>
<hr>
<div style='text-align:center;color:#6b7280;font-size:1.0em;'>
© 2025 Smart Thinking AI-Solutions Team. All rights reserved.<br>
Website: <a href="https://smartthinking.com.sa" target="_blank" rel="noopener noreferrer">smartthinking.com.sa</a>
</div>
""", unsafe_allow_html=True)