# app_FP.py — ST_GeoMech_FP (Fracture Pressure) # Mirrors the SHmin app's specs & workflow (password gate, preview panel, train/validate/predict, Excel export). # Self-contained: trains a fixed, optimized RF pipeline in-app. No external model files. import io import os import base64 import math from pathlib import Path from datetime import datetime import streamlit as st import pandas as pd import numpy as np # Matplotlib (static previews & cross-plot) import matplotlib matplotlib.use("Agg") import matplotlib.pyplot as plt from matplotlib.ticker import FuncFormatter import plotly.graph_objects as go from sklearn.metrics import mean_squared_error from sklearn.ensemble import RandomForestRegressor from sklearn.pipeline import Pipeline from sklearn.impute import SimpleImputer # ========================= # App constants / defaults # ========================= APP_NAME = "ST_GeoMech_FP" TAGLINE = "Real-Time Fracture Pressure Prediction" # Canonical features (match SHmin app) FEATURES = ["Q (gpm)", "SPP (psi)", "T (kft.lbf)", "WOB (klbf)", "ROP (ft/h)"] # Canonical prediction/target labels TARGET_CANON = "FracPress_Actual" PRED_COL = "FracPress_Pred" UNITS = "Psi" # Target aliases accepted in input workbooks TARGET_ALIASES = [ "FracPress_Actual", "FracturePressure_Actual", "Fracture Pressure (psi)", "Frac Pressure (psi)", "FracPressure", "Frac_Pressure", "FracturePressure", "FP_Actual", "FP (psi)" ] # Optional transform (kept for parity; RF is used on raw scale) TRANSFORM = "none" # "none" | "log10" | "ln" # Fixed "best" RandomForest params BEST_PARAMS = dict( n_estimators=400, max_depth=None, min_samples_split=2, min_samples_leaf=1, max_features=0.6, bootstrap=True, random_state=42, n_jobs=-1, ) # Color / layout COLORS = {"pred": "#1f77b4", "actual": "#f2b702", "ref": "#5a5a5a"} CROSS_W, CROSS_H = 350, 350 TRACK_H, TRACK_W = 1000, 500 FONT_SZ = 13 BOLD_FONT = "Arial Black, Arial, sans-serif" # ========================= # Page / CSS # ========================= st.set_page_config(page_title=APP_NAME, page_icon="logo.png", layout="wide") st.markdown(""" """, unsafe_allow_html=True) TABLE_CENTER_CSS = [ dict(selector="th", props=[("text-align", "center")]), dict(selector="td", props=[("text-align", "center")]), ] # ========================= # Password gate (optional) # ========================= def inline_logo(path: str = "logo.png") -> str: try: p = Path(path) if not p.exists(): return "" return f"data:image/png;base64,{base64.b64encode(p.read_bytes()).decode('ascii')}" except Exception: return "" def add_password_gate() -> None: try: required = st.secrets.get("APP_PASSWORD", "") except Exception: required = os.environ.get("APP_PASSWORD", "") if not required: return if st.session_state.get("auth_ok", False): return st.sidebar.markdown(f"""
{APP_NAME}
Smart Thinking • Secure Access
""", unsafe_allow_html=True) pwd = st.sidebar.text_input("Access key", type="password", placeholder="••••••••") if st.sidebar.button("Unlock", type="primary"): if pwd == required: st.session_state.auth_ok = True st.rerun() else: st.error("Incorrect key.") st.stop() add_password_gate() # ========================= # Utilities # ========================= def rmse(y_true, y_pred) -> float: return float(np.sqrt(mean_squared_error(y_true, y_pred))) def mape(y_true, y_pred, eps: float = 1e-9) -> float: a = np.asarray(y_true, dtype=float) p = np.asarray(y_pred, dtype=float) den = np.maximum(np.abs(a), eps) return float(np.mean(np.abs((a - p) / den)) * 100.0) def pearson_r(y_true, y_pred) -> float: a = np.asarray(y_true, dtype=float) p = np.asarray(y_pred, dtype=float) if a.size < 2: return float("nan") if np.all(a == a[0]) or np.all(p == p[0]): return float("nan") return float(np.corrcoef(a, p)[0, 1]) @st.cache_data(show_spinner=False) def parse_excel(data_bytes: bytes) -> dict[str, pd.DataFrame]: bio = io.BytesIO(data_bytes) xl = pd.ExcelFile(bio) return {sh: xl.parse(sh) for sh in xl.sheet_names} def read_book_bytes(b: bytes) -> dict[str, pd.DataFrame]: return parse_excel(b) if b else {} def _excel_engine() -> str: try: import xlsxwriter # noqa: F401 return "xlsxwriter" except Exception: return "openpyxl" def _excel_safe_name(name: str) -> str: bad = '[]:*?/\\' safe = ''.join('_' if ch in bad else ch for ch in str(name)) return safe[:31] def _round_numeric(df: pd.DataFrame, ndigits: int = 3) -> pd.DataFrame: out = df.copy() for c in out.columns: if pd.api.types.is_float_dtype(out[c]) or pd.api.types.is_integer_dtype(out[c]): out[c] = pd.to_numeric(out[c], errors="coerce").round(ndigits) return out def df_centered_rounded(df: pd.DataFrame, hide_index: bool = True) -> None: out = df.copy() numcols = out.select_dtypes(include=[np.number]).columns styler = ( out.style .format({c: "{:.3f}" for c in numcols}) .set_properties(**{"text-align": "center"}) .set_table_styles(TABLE_CENTER_CSS) ) st.dataframe(styler, use_container_width=True, hide_index=hide_index) def ensure_cols(df: pd.DataFrame, cols: list[str]) -> bool: miss = [c for c in cols if c not in df.columns] if miss: st.error(f"Missing columns: {miss}\nFound: {list(df.columns)}") return False return True def _nice_tick0(xmin: float, step: float = 0.1) -> float: return step * math.floor(xmin / step) if np.isfinite(xmin) else xmin # ---------- Transform helpers ---------- def _inv_transform(x: np.ndarray, transform: str) -> np.ndarray: t = (transform or "none").lower() if t in ("log10", "log_10", "log10()"): return np.power(10.0, x) if t in ("ln", "log", "loge", "log_e", "natural"): return np.exp(x) return x # ---------- Build X exactly as trained ---------- def _make_X(df: pd.DataFrame, features: list[str]) -> pd.DataFrame: X = df.reindex(columns=features, copy=False) for c in X.columns: X[c] = pd.to_numeric(X[c], errors="coerce") return X # ---------- Target resolver (use aliases) ---------- def _resolve_target_col(df: pd.DataFrame) -> str | None: cols_lower = {c.lower(): c for c in df.columns} for cand in TARGET_ALIASES: if cand.lower() in cols_lower: return cols_lower[cand.lower()] return None # ========================= # Export helpers # ========================= def _summary_table(df: pd.DataFrame, cols: list[str]) -> pd.DataFrame: cols = [c for c in cols if c in df.columns] if not cols: return pd.DataFrame() tbl = ( df[cols] .agg(["min", "max", "mean", "std"]) .T.rename(columns={"min": "Min", "max": "Max", "mean": "Mean", "std": "Std"}) .reset_index(names="Field") ) return _round_numeric(tbl, 3) def _train_ranges_df(ranges: dict[str, tuple[float, float]]) -> pd.DataFrame: if not ranges: return pd.DataFrame() df = pd.DataFrame(ranges).T.reset_index() df.columns = ["Feature", "Min", "Max"] return _round_numeric(df, 3) def _excel_autofit(writer, sheet_name: str, df: pd.DataFrame, min_w: int = 8, max_w: int = 40) -> None: try: import xlsxwriter # noqa: F401 except Exception: return ws = writer.sheets[sheet_name] for i, col in enumerate(df.columns): series = df[col].astype(str) max_len = max([len(str(col))] + series.map(len).tolist()) ws.set_column(i, i, max(min_w, min(max_len + 2, max_w))) ws.freeze_panes(1, 0) def _available_sections() -> list[str]: res = st.session_state.get("results", {}) sections: list[str] = [] if "Train" in res: sections += ["Training", "Training_Metrics", "Training_Summary"] if "Test" in res: sections += ["Testing", "Testing_Metrics", "Testing_Summary"] if "Validate" in res: sections += ["Validation", "Validation_Metrics", "Validation_Summary", "Validation_OOR"] if "PredictOnly" in res: sections += ["Prediction", "Prediction_Summary"] if st.session_state.get("train_ranges"): sections += ["Training_Ranges"] sections += ["Info"] return sections def build_export_workbook(selected: list[str], ndigits: int = 3, do_autofit: bool = True) -> tuple[bytes | None, str | None, list[str]]: res = st.session_state.get("results", {}) if not res: return None, None, [] sheets: dict[str, pd.DataFrame] = {} order: list[str] = [] def _add(name: str, df: pd.DataFrame) -> None: if df is None or (isinstance(df, pd.DataFrame) and df.empty): return sheets[name] = _round_numeric(df, ndigits) order.append(name) # Training / Testing if "Training" in selected and "Train" in res: _add("Training", res["Train"]) if "Training_Metrics" in selected and res.get("m_train"): _add("Training_Metrics", pd.DataFrame([res["m_train"]])) if "Training_Summary" in selected and "Train" in res: tr_cols = FEATURES + [c for c in [st.session_state.get("tcol_train", TARGET_CANON), PRED_COL] if c in res["Train"].columns] _add("Training_Summary", _summary_table(res["Train"], tr_cols)) if "Testing" in selected and "Test" in res: _add("Testing", res["Test"]) if "Testing_Metrics" in selected and res.get("m_test"): _add("Testing_Metrics", pd.DataFrame([res["m_test"]])) if "Testing_Summary" in selected and "Test" in res: te_cols = FEATURES + [c for c in [st.session_state.get("tcol_test", TARGET_CANON), PRED_COL] if c in res["Test"].columns] _add("Testing_Summary", _summary_table(res["Test"], te_cols)) # Validation / Prediction if "Validation" in selected and "Validate" in res: _add("Validation", res["Validate"]) if "Validation_Metrics" in selected and res.get("m_val"): _add("Validation_Metrics", pd.DataFrame([res["m_val"]])) if "Validation_Summary" in selected and res.get("sv_val"): _add("Validation_Summary", pd.DataFrame([res["sv_val"]])) if "Validation_OOR" in selected and isinstance(res.get("oor_tbl"), pd.DataFrame) and not res["oor_tbl"].empty: _add("Validation_OOR", res["oor_tbl"].reset_index(drop=True)) if "Prediction" in selected and "PredictOnly" in res: _add("Prediction", res["PredictOnly"]) if "Prediction_Summary" in selected and res.get("sv_pred"): _add("Prediction_Summary", pd.DataFrame([res["sv_pred"]])) if "Training_Ranges" in selected and st.session_state.get("train_ranges"): _add("Training_Ranges", _train_ranges_df(st.session_state["train_ranges"])) if "Info" in selected: info = pd.DataFrame([ {"Key": "AppName", "Value": APP_NAME}, {"Key": "Tagline", "Value": TAGLINE}, {"Key": "Target", "Value": st.session_state.get("tcol_train", TARGET_CANON)}, {"Key": "PredColumn", "Value": PRED_COL}, {"Key": "Features", "Value": ", ".join(FEATURES)}, {"Key": "ExportedAt", "Value": datetime.now().strftime("%Y-%m-%d %H:%M:%S")}, ]) _add("Info", info) if not order: return None, None, [] bio = io.BytesIO() engine = _excel_engine() with pd.ExcelWriter(bio, engine=engine) as writer: for name in order: df = sheets[name] sheet = _excel_safe_name(name) df.to_excel(writer, sheet_name=sheet, index=False) if do_autofit: _excel_autofit(writer, sheet, df) bio.seek(0) fname = f"FracPressure_Export_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx" return bio.getvalue(), fname, order def render_export_button(phase_key: str) -> None: res = st.session_state.get("results", {}) if not res: return st.divider() st.markdown("### Export to Excel") options = _available_sections() selected_sheets = st.multiselect( "Sheets to include", options=options, default=[], placeholder="Choose option(s)", help="Pick the sheets you want in the Excel export.", key=f"sheets_{phase_key}", ) if not selected_sheets: st.caption("Select one or more sheets above to enable export.") st.download_button( "⬇️ Export Excel", data=b"", file_name="FracPressure_Export.xlsx", mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", disabled=True, key=f"download_{phase_key}", ) return data, fname, names = build_export_workbook(selected=selected_sheets, ndigits=3, do_autofit=True) if names: st.caption("Will include: " + ", ".join(names)) st.download_button( "⬇️ Export Excel", data=(data or b""), file_name=(fname or "FracPressure_Export.xlsx"), mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", disabled=(data is None), key=f"download_{phase_key}", ) # ========================= # Plots (integer ticks) # ========================= def cross_plot_static(actual, pred, label: str = "Fracture Pressure"): a = pd.Series(actual, dtype=float) p = pd.Series(pred, dtype=float) lo = float(min(a.min(), p.min())) hi = float(max(a.max(), p.max())) pad = 0.03 * (hi - lo if hi > lo else 1.0) lo2, hi2 = lo - pad, hi + pad ticks = np.linspace(lo2, hi2, 5) dpi = 110 fig, ax = plt.subplots(figsize=(CROSS_W / dpi, CROSS_H / dpi), dpi=dpi, constrained_layout=False) ax.scatter(a, p, s=14, c=COLORS["pred"], alpha=0.9, linewidths=0) ax.plot([lo2, hi2], [lo2, hi2], linestyle="--", linewidth=1.2, color=COLORS["ref"]) ax.set_xlim(lo2, hi2); ax.set_ylim(lo2, hi2) ax.set_xticks(ticks); ax.set_yticks(ticks) ax.set_aspect("equal", adjustable="box") fmt = FuncFormatter(lambda x, _: f"{x:.0f}") ax.xaxis.set_major_formatter(fmt); ax.yaxis.set_major_formatter(fmt) ax.set_xlabel(f"Actual {label} ({UNITS})", fontweight="bold", fontsize=10, color="black") ax.set_ylabel(f"Predicted {label} ({UNITS})", fontweight="bold", fontsize=10, color="black") ax.tick_params(labelsize=6, colors="black") ax.grid(True, linestyle=":", alpha=0.3) for spine in ax.spines.values(): spine.set_linewidth(1.1); spine.set_color("#444") fig.subplots_adjust(left=0.16, bottom=0.16, right=0.98, top=0.98) return fig def track_plot(df: pd.DataFrame, actual_col: str | None, include_actual: bool = True): depth_col = next((c for c in df.columns if ("depth" in str(c).lower()) or ("tvd" in str(c).lower())), None) if depth_col is not None: y = pd.to_numeric(df[depth_col], errors="coerce") ylab = depth_col y_range = [float(np.nanmax(y)), float(np.nanmin(y))] # reversed else: y = pd.Series(np.arange(1, len(df) + 1)) ylab = "Point Index" y_range = [float(y.max()), float(y.min())] x_series = pd.Series(df.get(PRED_COL, pd.Series(dtype=float))).astype(float) if include_actual and actual_col and actual_col in df.columns: x_series = pd.concat([x_series, pd.Series(df[actual_col]).astype(float)], ignore_index=True) x_lo, x_hi = float(x_series.min()), float(x_series.max()) x_pad = 0.03 * (x_hi - x_lo if x_hi > x_lo else 1.0) xmin, xmax = x_lo - x_pad, x_hi + x_pad tick0 = _nice_tick0(xmin, step=max((xmax - xmin) / 10.0, 0.1)) fig = go.Figure() if PRED_COL in df.columns: fig.add_trace(go.Scatter( x=df[PRED_COL], y=y, mode="lines", line=dict(color=COLORS["pred"], width=1.8), name=PRED_COL, hovertemplate=f"{PRED_COL}: "+"%{x:.0f}
"+ylab+": %{y}" )) if include_actual and actual_col and actual_col in df.columns: fig.add_trace(go.Scatter( x=df[actual_col], y=y, mode="lines", line=dict(color=COLORS["actual"], width=2.0, dash="dot"), name=f"{actual_col} (actual)", hovertemplate=f"{actual_col}: "+"%{x:.0f}
"+ylab+": %{y}" )) fig.update_layout( height=TRACK_H, width=TRACK_W, autosize=False, paper_bgcolor="#fff", plot_bgcolor="#fff", margin=dict(l=64, r=16, t=36, b=48), hovermode="closest", font=dict(size=FONT_SZ, color="#000"), legend=dict(x=0.98, y=0.05, xanchor="right", yanchor="bottom", bgcolor="rgba(255,255,255,0.75)", bordercolor="#ccc", borderwidth=1), legend_title_text="" ) fig.update_xaxes( title_text=f"Fracture Pressure ({UNITS})", title_font=dict(size=20, family=BOLD_FONT, color="#000"), tickfont=dict(size=15, family=BOLD_FONT, color="#000"), side="top", range=[xmin, xmax], ticks="outside", tickformat=",.0f", tickmode="auto", tick0=tick0, showline=True, linewidth=1.2, linecolor="#444", mirror=True, showgrid=True, gridcolor="rgba(0,0,0,0.12)", automargin=True ) fig.update_yaxes( title_text=ylab, title_font=dict(size=20, family=BOLD_FONT, color="#000"), tickfont=dict(size=15, family=BOLD_FONT, color="#000"), range=y_range, ticks="outside", showline=True, linewidth=1.2, linecolor="#444", mirror=True, showgrid=True, gridcolor="rgba(0,0,0,0.12)", automargin=True ) return fig def preview_tracks(df: pd.DataFrame, cols: list[str]): cols = [c for c in cols if c in df.columns] n = len(cols) if n == 0: fig, ax = plt.subplots(figsize=(4, 2)) ax.text(0.5, 0.5, "No selected columns", ha="center", va="center") ax.axis("off") return fig depth_col = next((c for c in df.columns if ("depth" in str(c).lower()) or ("tvd" in str(c).lower())), None) if depth_col is not None: idx = pd.to_numeric(df[depth_col], errors="coerce") y_label = depth_col y_min, y_max = float(np.nanmin(idx)), float(np.nanmax(idx)) else: idx = pd.Series(np.arange(1, len(df) + 1)) y_label = "Point Index" y_min, y_max = float(idx.min()), float(idx.max()) cmap = plt.get_cmap("tab20") col_colors = {col: cmap(i % cmap.N) for i, col in enumerate(cols)} fig, axes = plt.subplots(1, n, figsize=(2.4 * n, 7.0), sharey=True, dpi=100) if n == 1: axes = [axes] for i, (ax, col) in enumerate(zip(axes, cols)): x = pd.to_numeric(df[col], errors="coerce") ax.plot(x, idx, '-', lw=1.6, color=col_colors[col]) ax.set_xlabel(col); ax.xaxis.set_label_position('top'); ax.xaxis.tick_top() ax.set_ylim(y_max, y_min) # reversed depth down ax.grid(True, linestyle=":", alpha=0.3) if i == 0: ax.set_ylabel(y_label) else: ax.tick_params(labelleft=False); ax.set_ylabel("") fig.tight_layout() return fig # ========================= # Fixed training pipeline # ========================= def build_pipeline() -> Pipeline: model = RandomForestRegressor(**BEST_PARAMS) pipe = Pipeline(steps=[ ("imputer", SimpleImputer(strategy="median")), ("model", model), ]) return pipe # ========================= # Session state (mirrors SHmin) # ========================= st.session_state.setdefault("app_step", "intro") st.session_state.setdefault("results", {}) st.session_state.setdefault("train_ranges", None) st.session_state.setdefault("dev_file_name", "") st.session_state.setdefault("dev_file_bytes", b"") st.session_state.setdefault("dev_file_loaded", False) st.session_state.setdefault("fitted_model", None) # Persistent top-of-page preview panel st.session_state.setdefault("show_preview_panel", False) st.session_state.setdefault("preview_book", {}) # ========================= # Sidebar branding # ========================= st.sidebar.markdown(f"""
{APP_NAME}
{TAGLINE}
""", unsafe_allow_html=True) def sticky_header(title: str, message: str) -> None: st.markdown( f"""

{title}

{message}

""", unsafe_allow_html=True ) def render_preview_panel() -> None: """Top-of-page preview panel (same behavior as SHmin).""" if not st.session_state.get("show_preview_panel"): return st.markdown("## 🔎 Data preview") book = st.session_state.get("preview_book", {}) or {} if not book: st.info("No data loaded yet.") col = st.columns(2)[1] with col: if st.button("Hide preview"): st.session_state.show_preview_panel = False st.session_state.preview_book = {} st.rerun() return names = list(book.keys()) tabs = st.tabs(names + ["✖ Hide preview"]) for i, name in enumerate(names): with tabs[i]: df = book[name] t1, t2 = st.tabs(["Tracks", "Summary"]) with t1: st.pyplot(preview_tracks(df, FEATURES), use_container_width=True) with t2: feat_present = [c for c in FEATURES if c in df.columns] if not feat_present: st.info("No feature columns found to summarize.") else: tbl = ( df[feat_present] .agg(["min", "max", "mean", "std"]) .T.rename(columns={"min": "Min", "max": "Max", "mean": "Mean", "std": "Std"}) .reset_index(names="Feature") ) df_centered_rounded(tbl) with tabs[-1]: if st.button("Hide preview", use_container_width=True): st.session_state.show_preview_panel = False st.session_state.preview_book = {} st.rerun() # ========================= # INTRO # ========================= if st.session_state.app_step == "intro": st.header("Welcome!") st.markdown( f"This software is developed by *Smart Thinking AI-Solutions Team* to estimate **Fracture Pressure** ({UNITS}) from drilling/offset data." ) st.subheader("How It Works") st.markdown( "1) **Upload your data file** and click **Run Model** to fit the baked-in pipeline. \n" "2) **Validate** on held-out wells (with actual). \n" "3) **Predict** on wells without actual." ) if st.button("Start Showcase", type="primary"): st.session_state.app_step = "dev" st.rerun() # ========================= # CASE BUILDING (Train/Test) # ========================= def _find_sheet(book: dict[str, pd.DataFrame], names: list[str]) -> str | None: low2orig = {k.lower(): k for k in book.keys()} for nm in names: if nm.lower() in low2orig: return low2orig[nm.lower()] return None if st.session_state.app_step == "dev": st.sidebar.header("Case Building") up = st.sidebar.file_uploader("Upload Your Data File", type=["xlsx", "xls"]) if up is not None: st.session_state.dev_file_bytes = up.getvalue() st.session_state.dev_file_name = up.name st.session_state.dev_file_loaded = True st.session_state.fitted_model = None # show preview panel immediately st.session_state.preview_book = read_book_bytes(st.session_state.dev_file_bytes) if st.session_state.dev_file_bytes else {} st.session_state.show_preview_panel = True st.rerun() if st.session_state.dev_file_loaded: tmp = read_book_bytes(st.session_state.dev_file_bytes) if tmp: df0 = next(iter(tmp.values())) st.sidebar.caption(f"**Data loaded:** {st.session_state.dev_file_name} • {df0.shape[0]} rows × {df0.shape[1]} cols") run = st.sidebar.button("Run Model", type="primary", use_container_width=True) if st.sidebar.button("Proceed to Validation ▶", use_container_width=True): st.session_state.app_step = "validate"; st.rerun() if st.sidebar.button("Proceed to Prediction ▶", use_container_width=True): st.session_state.app_step = "predict"; st.rerun() if st.session_state.dev_file_loaded and st.session_state.show_preview_panel: sticky_header("Case Building", "Previewed ✓ — now click **Run Model**.") elif st.session_state.dev_file_loaded: sticky_header("Case Building", "📄 **Preview uploaded data** using the sidebar button, then click **Run Model**.") else: sticky_header("Case Building", "**Upload your data to build a case, then run the model to review performance.**") render_preview_panel() if run and st.session_state.dev_file_bytes: book = read_book_bytes(st.session_state.dev_file_bytes) sh_train = _find_sheet(book, ["Train", "Training", "training2", "train", "training"]) sh_test = _find_sheet(book, ["Test", "Testing", "testing2", "test", "testing"]) if sh_train is None or sh_test is None: st.markdown('
Workbook must include Train/Training and Test/Testing sheets.
', unsafe_allow_html=True) st.stop() tr0 = book[sh_train].copy() te0 = book[sh_test].copy() # Resolve target name per-sheet from aliases tcol_tr = _resolve_target_col(tr0) tcol_te = _resolve_target_col(te0) if tcol_tr is None or tcol_te is None: st.error(f"Missing target column. Expected one of: {TARGET_ALIASES}") st.stop() # Ensure feature columns exist if not (ensure_cols(tr0, FEATURES) and ensure_cols(te0, FEATURES)): st.markdown('
Missing required feature columns.
', unsafe_allow_html=True) st.stop() # Prepare X,y X_tr = _make_X(tr0, FEATURES) y_tr = pd.to_numeric(tr0[tcol_tr], errors="coerce") X_te = _make_X(te0, FEATURES) y_te = pd.to_numeric(te0[tcol_te], errors="coerce") # Drop rows with NA in y mask_tr = np.isfinite(y_tr); X_tr, y_tr = X_tr.loc[mask_tr], y_tr.loc[mask_tr] mask_te = np.isfinite(y_te); X_te, y_te = X_te.loc[mask_te], y_te.loc[mask_te] pipe = build_pipeline() pipe.fit(X_tr, y_tr) st.session_state.fitted_model = pipe # Predictions tr = tr0.copy(); te = te0.copy() tr[PRED_COL] = _inv_transform(pipe.predict(_make_X(tr0, FEATURES)), TRANSFORM) te[PRED_COL] = _inv_transform(pipe.predict(_make_X(te0, FEATURES)), TRANSFORM) # Save results & metrics st.session_state.results["Train"] = tr st.session_state.results["Test"] = te st.session_state.results["m_train"] = { "R": pearson_r(tr[tcol_tr], tr[PRED_COL]), "RMSE": rmse(tr[tcol_tr], tr[PRED_COL]), "MAPE%": mape(tr[tcol_tr], tr[PRED_COL]), } st.session_state.results["m_test"] = { "R": pearson_r(te[tcol_te], te[PRED_COL]), "RMSE": rmse(te[tcol_te], te[PRED_COL]), "MAPE%": mape(te[tcol_te], te[PRED_COL]), } # Persist used target names (for export/plots) st.session_state["tcol_train"] = tcol_tr st.session_state["tcol_test"] = tcol_te # Training min–max ranges tr_min = tr[FEATURES].min().to_dict() tr_max = tr[FEATURES].max().to_dict() st.session_state.train_ranges = {f: (float(tr_min[f]), float(tr_max[f])) for f in FEATURES} st.markdown('
Case has been built and results are displayed below.
', unsafe_allow_html=True) def _dev_block(df: pd.DataFrame, actual_col: str, m: dict): c1, c2, c3 = st.columns(3) c1.metric("R", f"{m['R']:.3f}") c2.metric("RMSE", f"{m['RMSE']:.2f}") c3.metric("MAPE%", f"{m['MAPE%']:.2f}") st.markdown("""
R: Pearson Correlation Coefficient
RMSE: Root Mean Square Error
MAPE: Mean Absolute Percentage Error
""", unsafe_allow_html=True) col_track, col_cross = st.columns([2, 3], gap="large") with col_track: st.plotly_chart( track_plot(df, actual_col, include_actual=True), use_container_width=False, config={"displayModeBar": False, "scrollZoom": True} ) with col_cross: st.pyplot( cross_plot_static(df[actual_col], df[PRED_COL], label="Fracture Pressure"), use_container_width=False ) if "Train" in st.session_state.results or "Test" in st.session_state.results: tab1, tab2 = st.tabs(["Training", "Testing"]) if "Train" in st.session_state.results: with tab1: _dev_block(st.session_state.results["Train"], st.session_state.get("tcol_train", TARGET_CANON), st.session_state.results["m_train"]) if "Test" in st.session_state.results: with tab2: _dev_block(st.session_state.results["Test"], st.session_state.get("tcol_test", TARGET_CANON), st.session_state.results["m_test"]) render_export_button(phase_key="dev") # ========================= # VALIDATION (with actual) # ========================= if st.session_state.app_step == "validate": st.sidebar.header("Validate the Model") up = st.sidebar.file_uploader("Upload Validation Excel", type=["xlsx", "xls"]) if up is not None: book = read_book_bytes(up.getvalue()) if book: df0 = next(iter(book.values())) st.sidebar.caption(f"**Data loaded:** {up.name} • {df0.shape[0]} rows × {df0.shape[1]} cols") # Preview button if st.sidebar.button("Preview data", use_container_width=True, disabled=(up is None)): st.session_state.preview_book = read_book_bytes(up.getvalue()) if up is not None else {} st.session_state.show_preview_panel = True st.rerun() go_btn = st.sidebar.button("Predict & Validate", type="primary", use_container_width=True) if st.sidebar.button("⬅ Back to Case Building", use_container_width=True): st.session_state.app_step = "dev"; st.rerun() if st.sidebar.button("Proceed to Prediction ▶", use_container_width=True): st.session_state.app_step = "predict"; st.rerun() sticky_header("Validate the Model", "Upload a dataset with the same **features** and an **actual fracture pressure** column.") render_preview_panel() if go_btn and up is not None: if st.session_state.fitted_model is None: st.error("Please train the model first in Case Building.") st.stop() book = read_book_bytes(up.getvalue()) names = list(book.keys()) name = next((s for s in names if s.lower() in ("validation", "validate", "validation2", "val", "val2")), names[0]) df0 = book[name].copy() tcol = _resolve_target_col(df0) if tcol is None: st.error(f"Missing target column. Expected one of: {TARGET_ALIASES}") st.stop() if not ensure_cols(df0, FEATURES): st.markdown('
Missing required feature columns.
', unsafe_allow_html=True) st.stop() df = df0.copy() df[PRED_COL] = _inv_transform(st.session_state.fitted_model.predict(_make_X(df0, FEATURES)), TRANSFORM) st.session_state.results["Validate"] = df # Range checks ranges = st.session_state.train_ranges oor_pct = 0.0 tbl = None if ranges: any_viol = pd.DataFrame({f: (df[f] < ranges[f][0]) | (df[f] > ranges[f][1]) for f in FEATURES}).any(axis=1) oor_pct = float(any_viol.mean() * 100.0) if any_viol.any(): tbl = df.loc[any_viol, FEATURES].copy() for c in FEATURES: if pd.api.types.is_numeric_dtype(tbl[c]): tbl[c] = tbl[c].round(3) tbl["Violations"] = pd.DataFrame({f: (df[f] < ranges[f][0]) | (df[f] > ranges[f][1]) for f in FEATURES}).loc[any_viol].apply( lambda r: ", ".join([c for c, v in r.items() if v]), axis=1 ) st.session_state.results["m_val"] = { "R": pearson_r(df[tcol], df[PRED_COL]), "RMSE": rmse(df[tcol], df[PRED_COL]), "MAPE%": mape(df[tcol], df[PRED_COL]), } st.session_state.results["sv_val"] = {"n": len(df), "pred_min": float(df[PRED_COL].min()), "pred_max": float(df[PRED_COL].max()), "oor": oor_pct} st.session_state.results["oor_tbl"] = tbl st.session_state["tcol_val"] = tcol if "Validate" in st.session_state.results: m = st.session_state.results["m_val"] tcol = st.session_state.get("tcol_val", TARGET_CANON) c1, c2, c3 = st.columns(3) c1.metric("R", f"{m['R']:.3f}"); c2.metric("RMSE", f"{m['RMSE']:.2f}"); c3.metric("MAPE%", f"{m['MAPE%']:.2f}") st.markdown("""
R: Pearson Correlation Coefficient
RMSE: Root Mean Square Error
MAPE: Mean Absolute Percentage Error
""", unsafe_allow_html=True) col_track, col_cross = st.columns([2, 3], gap="large") with col_track: st.plotly_chart( track_plot(st.session_state.results["Validate"], tcol, include_actual=True), use_container_width=False, config={"displayModeBar": False, "scrollZoom": True} ) with col_cross: st.pyplot( cross_plot_static(st.session_state.results["Validate"][tcol], st.session_state.results["Validate"][PRED_COL], label="Fracture Pressure"), use_container_width=False ) render_export_button(phase_key="validate") sv = st.session_state.results["sv_val"] if sv["oor"] > 0: st.markdown('
Some inputs fall outside **training min–max** ranges.
', unsafe_allow_html=True) if st.session_state.results["oor_tbl"] is not None: st.write("*Out-of-range rows (vs. Training min–max):*") df_centered_rounded(st.session_state.results["oor_tbl"]) # ========================= # PREDICTION (no actual) # ========================= if st.session_state.app_step == "predict": st.sidebar.header("Prediction (No Actual)") up = st.sidebar.file_uploader("Upload Prediction Excel", type=["xlsx", "xls"]) if up is not None: book = read_book_bytes(up.getvalue()) if book: df0 = next(iter(book.values())) st.sidebar.caption(f"**Data loaded:** {up.name} • {df0.shape[0]} rows × {df0.shape[1]} cols") # Preview button if st.sidebar.button("Preview data", use_container_width=True, disabled=(up is None)): st.session_state.preview_book = read_book_bytes(up.getvalue()) if up is not None else {} st.session_state.show_preview_panel = True st.rerun() go_btn = st.sidebar.button("Predict", type="primary", use_container_width=True) if st.sidebar.button("⬅ Back to Case Building", use_container_width=True): st.session_state.app_step = "dev"; st.rerun() sticky_header("Prediction", "Upload a dataset with the 5 feature columns (no actual column).") render_preview_panel() if go_btn and up is not None: if st.session_state.fitted_model is None: st.error("Please train the model first in Case Building.") st.stop() book = read_book_bytes(up.getvalue()) name = list(book.keys())[0] df0 = book[name].copy() if not ensure_cols(df0, FEATURES): st.markdown('
Missing required columns.
', unsafe_allow_html=True) st.stop() df = df0.copy() df[PRED_COL] = _inv_transform(st.session_state.fitted_model.predict(_make_X(df0, FEATURES)), TRANSFORM) st.session_state.results["PredictOnly"] = df ranges = st.session_state.train_ranges oor_pct = 0.0 if ranges: any_viol = pd.DataFrame({f: (df[f] < ranges[f][0]) | (df[f] > ranges[f][1]) for f in FEATURES}).any(axis=1) oor_pct = float(any_viol.mean() * 100.0) st.session_state.results["sv_pred"] = { "n": len(df), "pred_min": float(df[PRED_COL].min()), "pred_max": float(df[PRED_COL].max()), "pred_mean": float(df[PRED_COL].mean()), "pred_std": float(df[PRED_COL].std(ddof=0)), "oor": oor_pct } if "PredictOnly" in st.session_state.results: df = st.session_state.results["PredictOnly"] sv = st.session_state.results["sv_pred"] col_left, col_right = st.columns([2, 3], gap="large") with col_left: table = pd.DataFrame({ "Metric": ["# points", "Pred min", "Pred max", "Pred mean", "Pred std", "OOR %"], "Value": [sv["n"], round(sv["pred_min"], 3), round(sv["pred_max"], 3), round(sv["pred_mean"], 3), round(sv["pred_std"], 3), f'{sv["oor"]:.1f}%'] }) st.markdown('
Predictions ready ✓
', unsafe_allow_html=True) df_centered_rounded(table, hide_index=True) st.caption("**★ OOR** = % of rows with input features outside the training min–max range.") with col_right: st.plotly_chart( track_plot(df, actual_col=None, include_actual=False), use_container_width=False, config={"displayModeBar": False, "scrollZoom": True} ) render_export_button(phase_key="predict") # ========================= # Footer # ========================= st.markdown("""



© 2025 Smart Thinking AI-Solutions Team. All rights reserved.
Website: smartthinking.com.sa
""", unsafe_allow_html=True)