# app_rhob.py — ST_Log_RHOB (Formation Bulk Density, g/cc) — MAPE version import io, json, os, base64, math from pathlib import Path import streamlit as st import pandas as pd import numpy as np import joblib from datetime import datetime # Matplotlib (static plots) import matplotlib matplotlib.use("Agg") import matplotlib.pyplot as plt from matplotlib.ticker import FuncFormatter import plotly.graph_objects as go from sklearn.metrics import mean_squared_error # ========================= # Constants (RHOB variant) # ========================= APP_NAME = "ST_Log_RHOB" TAGLINE = "Real-Time Formation Bulk Density (RHOB) Prediction" # Defaults (overridden by rhob_meta.json if present) FEATURES = [ "WOB (klbf)", "Torque (kft.lbf)", "SPP (psi)", "RPM (1/min)", "ROP (ft/h)", "Flow Rate (gpm)", ] TARGET = "RHOB" # canonical target name PRED_COL = "RHOB_Pred" MODELS_DIR = Path("models") DEFAULT_MODEL = MODELS_DIR / "rhob_model.joblib" MODEL_FALLBACKS = [MODELS_DIR / "model.joblib", MODELS_DIR / "model.pkl"] COLORS = {"pred": "#1f77b4", "actual": "#f2b702", "ref": "#5a5a5a"} # Optional env banner from meta STRICT_VERSION_CHECK = False # ---- Plot sizing ---- CROSS_W = 350 CROSS_H = 350 TRACK_H = 1000 TRACK_W = 500 FONT_SZ = 13 BOLD_FONT = "Arial Black, Arial, sans-serif" # ========================= # Page / CSS # ========================= st.set_page_config(page_title=APP_NAME, page_icon="logo.png", layout="wide") st.markdown(""" """, unsafe_allow_html=True) TABLE_CENTER_CSS = [ dict(selector="th", props=[("text-align", "center")]), dict(selector="td", props=[("text-align", "center")]), ] # ========================= # Password gate # ========================= def inline_logo(path="logo.png") -> str: try: p = Path(path) if not p.exists(): return "" return f"data:image/png;base64,{base64.b64encode(p.read_bytes()).decode('ascii')}" except Exception: return "" def add_password_gate() -> None: try: required = st.secrets.get("APP_PASSWORD", "") except Exception: required = os.environ.get("APP_PASSWORD", "") if not required: st.warning("Set APP_PASSWORD in Secrets (or environment) and restart.") st.stop() if st.session_state.get("auth_ok", False): return st.sidebar.markdown(f"""
{APP_NAME}
Smart Thinking • Secure Access
""", unsafe_allow_html=True ) pwd = st.sidebar.text_input("Access key", type="password", placeholder="••••••••") if st.sidebar.button("Unlock", type="primary"): if pwd == required: st.session_state.auth_ok = True st.rerun() else: st.error("Incorrect key.") st.stop() add_password_gate() # ========================= # Utilities # ========================= def rmse(y_true, y_pred) -> float: return float(np.sqrt(mean_squared_error(y_true, y_pred))) def pearson_r(y_true, y_pred) -> float: a = np.asarray(y_true, dtype=float) p = np.asarray(y_pred, dtype=float) if a.size < 2: return float("nan") if np.all(a == a[0]) or np.all(p == p[0]): return float("nan") return float(np.corrcoef(a, p)[0, 1]) def mape(y_true, y_pred) -> float: """ Mean Absolute Percentage Error in PERCENT. Ignores rows where true==0 or non-finite. """ a = np.asarray(y_true, dtype=float) p = np.asarray(y_pred, dtype=float) mask = np.isfinite(a) & np.isfinite(p) & (a != 0) if not np.any(mask): return float("nan") return float(np.mean(np.abs((p[mask] - a[mask]) / a[mask])) * 100.0) @st.cache_resource(show_spinner=False) def load_model(model_path: str): return joblib.load(model_path) @st.cache_data(show_spinner=False) def parse_excel(data_bytes: bytes): bio = io.BytesIO(data_bytes) xl = pd.ExcelFile(bio) return {sh: xl.parse(sh) for sh in xl.sheet_names} def read_book_bytes(b: bytes): return parse_excel(b) if b else {} # ---- Canonical feature aliasing ------------------------------------------ def _build_alias_map(canonical_features: list[str], target_name: str) -> dict: """ Map common header variants -> the *canonical* names in canonical_features. Whatever appears in canonical_features (from rhob_meta.json) wins. """ def pick(expected_list, variants): for v in variants: if v in expected_list: return v return variants[0] can_WOB = pick(canonical_features, ["WOB (klbf)", "WOB, klbf", "WOB(klbf)", "WOB( klbf)"]) can_TORQUE = pick(canonical_features, ["Torque (kft.lbf)", "Torque(kft.lbf)", "TORQUE(kft.lbf)"]) can_SPP = pick(canonical_features, ["SPP (psi)", "SPP(psi)"]) can_RPM = pick(canonical_features, ["RPM (1/min)", "RPM(1/min)"]) can_ROP = pick(canonical_features, ["ROP (ft/h)", "ROP(ft/h)"]) can_FR = pick(canonical_features, [ "Flow Rate (gpm)","Flow Rate, gpm","Flow Rate,gpm","Flow Rate , gpm","Fow Rate, gpm","Fow Rate, gpm " ]) can_DEPTH = "Depth (ft)" alias = { # Features "WOB (klbf)": can_WOB, "WOB, klbf": can_WOB, "WOB(klbf)": can_WOB, "WOB( klbf)": can_WOB, "Torque (kft.lbf)": can_TORQUE, "Torque(kft.lbf)": can_TORQUE, "TORQUE(kft.lbf)": can_TORQUE, "SPP (psi)": can_SPP, "SPP(psi)": can_SPP, "RPM (1/min)": can_RPM, "RPM(1/min)": can_RPM, "ROP (ft/h)": can_ROP, "ROP(ft/h)": can_ROP, "Flow Rate (gpm)": can_FR, "Flow Rate, gpm": can_FR, "Flow Rate,gpm": can_FR, "Flow Rate , gpm": can_FR, "Fow Rate, gpm": can_FR, "Fow Rate, gpm ": can_FR, # Depth (plot only) "Depth (ft)": can_DEPTH, "Depth, ft": can_DEPTH, "Depth(ft)": can_DEPTH, "DEPTH, ft": can_DEPTH, } # ---- Target family (RHOB) ---- target_variants = [ "RHOB", "RHOB (g/cc)", "RHOB (g/cm3)", "RHOB (g/cm³)", "RHOB_Actual", "RHOB_Actual (g/cc)", "RHOB_Actual (g/cm3)", "RHOB_Actual(g/cc)", "RHOB_Actual(g/cm3)", "RhoB", "RhoB (g/cc)", "RhoB (g/cm3)", "RhoB_Actual", "RhoB_Actual (g/cc)", "RhoB_Actual (g/cm3)" ] for t in target_variants: alias[t] = target_name return alias def _normalize_columns(df: pd.DataFrame, canonical_features: list[str], target_name: str) -> pd.DataFrame: out = df.copy() out.columns = [str(c).strip().replace(" ,", ",").replace(", ", ", ").replace(" ", " ") for c in out.columns] alias = _build_alias_map(canonical_features, target_name) actual = {k: v for k, v in alias.items() if k in out.columns and k != v} return out.rename(columns=actual) def ensure_cols(df: pd.DataFrame, cols: list[str]) -> bool: miss = [c for c in cols if c not in df.columns] if miss: st.error(f"Missing columns: {miss}\nFound: {list(df.columns)}") return False return True def find_sheet(book, names): low2orig = {k.lower(): k for k in book.keys()} for nm in names: if nm.lower() in low2orig: return low2orig[nm.lower()] return None def _nice_tick0(xmin: float, step: float = 0.1) -> float: return step * math.floor(xmin / step) if np.isfinite(xmin) else xmin def df_centered_rounded(df: pd.DataFrame, hide_index=True): out = df.copy() numcols = out.select_dtypes(include=[np.number]).columns styler = ( out.style .format({c: "{:.2f}" for c in numcols}) .set_properties(**{"text-align": "center"}) .set_table_styles(TABLE_CENTER_CSS) ) st.dataframe(styler, use_container_width=True, hide_index=hide_index) # ---------- Build X exactly as trained ---------- def _make_X(df: pd.DataFrame, features: list[str]) -> pd.DataFrame: """ Reindex columns to the exact training feature order and coerce to numeric. Prevents scikit-learn 'feature names should match' errors. """ X = df.reindex(columns=features, copy=False) for c in X.columns: X[c] = pd.to_numeric(X[c], errors="coerce") return X # === Excel export helpers ================================================= def _excel_engine() -> str: try: import xlsxwriter # noqa: F401 return "xlsxwriter" except Exception: return "openpyxl" def _excel_safe_name(name: str) -> str: bad = '[]:*?/\\' safe = ''.join('_' if ch in bad else ch for ch in str(name)) return safe[:31] def _round_numeric(df: pd.DataFrame, ndigits: int = 3) -> pd.DataFrame: out = df.copy() for c in out.columns: if pd.api.types.is_float_dtype(out[c]) or pd.api.types.is_integer_dtype(out[c]): out[c] = pd.to_numeric(out[c], errors="coerce").round(ndigits) return out def _summary_table(df: pd.DataFrame, cols: list[str]) -> pd.DataFrame: cols = [c for c in cols if c in df.columns] if not cols: return pd.DataFrame() tbl = (df[cols] .agg(['min','max','mean','std']) .T.rename(columns={"min":"Min","max":"Max","mean":"Mean","std":"Std"}) .reset_index(names="Field")) return _round_numeric(tbl, 3) def _train_ranges_df(ranges: dict[str, tuple[float, float]]) -> pd.DataFrame: if not ranges: return pd.DataFrame() df = pd.DataFrame(ranges).T.reset_index() df.columns = ["Feature", "Min", "Max"] return _round_numeric(df, 3) def _excel_autofit(writer, sheet_name: str, df: pd.DataFrame, min_w: int = 8, max_w: int = 40): try: import xlsxwriter # noqa: F401 except Exception: return ws = writer.sheets[sheet_name] for i, col in enumerate(df.columns): series = df[col].astype(str) max_len = max([len(str(col))] + series.map(len).tolist()) ws.set_column(i, i, max(min_w, min(max_len + 2, max_w))) ws.freeze_panes(1, 0) def _add_sheet(sheets: dict, order: list, name: str, df: pd.DataFrame, ndigits: int): if df is None or df.empty: return sheets[name] = _round_numeric(df, ndigits) order.append(name) def _available_sections() -> list[str]: res = st.session_state.get("results", {}) sections = [] if "Train" in res: sections += ["Training","Training_Metrics","Training_Summary"] if "Test" in res: sections += ["Testing","Testing_Metrics","Testing_Summary"] if "Validate" in res: sections += ["Validation","Validation_Metrics","Validation_Summary","Validation_OOR"] if "PredictOnly" in res: sections += ["Prediction","Prediction_Summary"] if st.session_state.get("train_ranges"): sections += ["Training_Ranges"] sections += ["Info"] return sections def build_export_workbook(selected: list[str], ndigits: int = 3, do_autofit: bool = True) -> tuple[bytes|None, str|None, list[str]]: res = st.session_state.get("results", {}) if not res: return None, None, [] sheets: dict[str, pd.DataFrame] = {} order: list[str] = [] if "Training" in selected and "Train" in res: _add_sheet(sheets, order, "Training", res["Train"], ndigits) if "Training_Metrics" in selected and res.get("m_train"): _add_sheet(sheets, order, "Training_Metrics", pd.DataFrame([res["m_train"]]), ndigits) if "Training_Summary" in selected and "Train" in res: tr_cols = FEATURES + [c for c in [TARGET, PRED_COL] if c in res["Train"].columns] _add_sheet(sheets, order, "Training_Summary", _summary_table(res["Train"], tr_cols), ndigits) if "Testing" in selected and "Test" in res: _add_sheet(sheets, order, "Testing", res["Test"], ndigits) if "Testing_Metrics" in selected and res.get("m_test"): _add_sheet(sheets, order, "Testing_Metrics", pd.DataFrame([res["m_test"]]), ndigits) if "Testing_Summary" in selected and "Test" in res: te_cols = FEATURES + [c for c in [TARGET, PRED_COL] if c in res["Test"].columns] _add_sheet(sheets, order, "Testing_Summary", _summary_table(res["Test"], te_cols), ndigits) if "Validation" in selected and "Validate" in res: _add_sheet(sheets, order, "Validation", res["Validate"], ndigits) if "Validation_Metrics" in selected and res.get("m_val"): _add_sheet(sheets, order, "Validation_Metrics", pd.DataFrame([res["m_val"]]), ndigits) if "Validation_Summary" in selected and res.get("sv_val"): _add_sheet(sheets, order, "Validation_Summary", pd.DataFrame([res["sv_val"]]), ndigits) if "Validation_OOR" in selected and isinstance(res.get("oor_tbl"), pd.DataFrame) and not res["oor_tbl"].empty: _add_sheet(sheets, order, "Validation_OOR", res["oor_tbl"].reset_index(drop=True), ndigits) if "Prediction" in selected and "PredictOnly" in res: _add_sheet(sheets, order, "Prediction", res["PredictOnly"], ndigits) if "Prediction_Summary" in selected and res.get("sv_pred"): _add_sheet(sheets, order, "Prediction_Summary", pd.DataFrame([res["sv_pred"]]), ndigits) if "Training_Ranges" in selected and st.session_state.get("train_ranges"): rr = _train_ranges_df(st.session_state["train_ranges"]) _add_sheet(sheets, order, "Training_Ranges", rr, ndigits) if "Info" in selected: info = pd.DataFrame([ {"Key": "AppName", "Value": APP_NAME}, {"Key": "Tagline", "Value": TAGLINE}, {"Key": "Target", "Value": TARGET}, {"Key": "PredColumn", "Value": PRED_COL}, {"Key": "Features", "Value": ", ".join(FEATURES)}, {"Key": "ExportedAt", "Value": datetime.now().strftime("%Y-%m-%d %H:%M:%S")}, ]) _add_sheet(sheets, order, "Info", info, ndigits) if not order: return None, None, [] bio = io.BytesIO() engine = _excel_engine() with pd.ExcelWriter(bio, engine=engine) as writer: for name in order: df = sheets[name] sheet = _excel_safe_name(name) df.to_excel(writer, sheet_name=sheet, index=False) if do_autofit: _excel_autofit(writer, sheet, df) bio.seek(0) fname = f"RHOB_Export_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx" return bio.getvalue(), fname, order # --------- SIMPLE export UI ---------- def render_export_button(phase_key: str) -> None: res = st.session_state.get("results", {}) if not res: return st.divider() st.markdown("### Export to Excel") options = _available_sections() selected_sheets = st.multiselect( "Sheets to include", options=options, default=[], placeholder="Choose option(s)", help="Pick the sheets you want to include in the Excel export.", key=f"sheets_{phase_key}", ) if not selected_sheets: st.caption("Select one or more sheets above to enable the export.") st.download_button( label="⬇️ Export Excel", data=b"", file_name="RHOB_Export.xlsx", mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", disabled=True, key=f"download_{phase_key}", ) return data, fname, names = build_export_workbook(selected=selected_sheets, ndigits=3, do_autofit=True) if names: st.caption("Will include: " + ", ".join(names)) st.download_button( "⬇️ Export Excel", data=(data or b""), file_name=(fname or "RHOB_Export.xlsx"), mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", disabled=(data is None), key=f"download_{phase_key}", ) # ========================= # Cross plot (Matplotlib) # ========================= def cross_plot_static(actual, pred, xlabel="Actual RHOB (g/cc)", ylabel="Predicted RHOB (g/cc)"): a = pd.Series(actual, dtype=float) p = pd.Series(pred, dtype=float) lo = float(min(a.min(), p.min())) hi = float(max(a.max(), p.max())) pad = 0.03 * (hi - lo if hi > lo else 1.0) lo2, hi2 = lo - pad, hi + pad ticks = np.linspace(lo2, hi2, 5) dpi = 110 fig, ax = plt.subplots(figsize=(CROSS_W / dpi, CROSS_H / dpi), dpi=dpi, constrained_layout=False) ax.scatter(a, p, s=14, c=COLORS["pred"], alpha=0.9, linewidths=0) ax.plot([lo2, hi2], [lo2, hi2], linestyle="--", linewidth=1.2, color=COLORS["ref"]) ax.set_xlim(lo2, hi2) ax.set_ylim(lo2, hi2) ax.set_xticks(ticks); ax.set_yticks(ticks) ax.set_aspect("equal", adjustable="box") fmt = FuncFormatter(lambda x, _: f"{x:.2f}") ax.xaxis.set_major_formatter(fmt); ax.yaxis.set_major_formatter(fmt) ax.set_xlabel(xlabel, fontweight="bold", fontsize=10, color="black") ax.set_ylabel(ylabel, fontweight="bold", fontsize=10, color="black") ax.tick_params(labelsize=6, colors="black") ax.grid(True, linestyle=":", alpha=0.3) for spine in ax.spines.values(): spine.set_linewidth(1.1); spine.set_color("#444") fig.subplots_adjust(left=0.16, bottom=0.16, right=0.98, top=0.98) return fig # ========================= # Track plot (Plotly) # ========================= def track_plot(df, include_actual=True): depth_col = next((c for c in df.columns if 'depth' in str(c).lower()), None) if depth_col is not None: y = pd.Series(df[depth_col]).astype(float); ylab = depth_col y_range = [float(y.max()), float(y.min())] else: y = pd.Series(np.arange(1, len(df) + 1)); ylab = "Point Index" y_range = [float(y.max()), float(y.min())] x_series = pd.Series(df.get(PRED_COL, pd.Series(dtype=float))).astype(float) if include_actual and TARGET in df.columns: x_series = pd.concat([x_series, pd.Series(df[TARGET]).astype(float)], ignore_index=True) x_lo, x_hi = float(x_series.min()), float(x_series.max()) x_pad = 0.03 * (x_hi - x_lo if x_hi > x_lo else 1.0) xmin, xmax = x_lo - x_pad, x_hi + x_pad tick0 = _nice_tick0(xmin, step=max((xmax - xmin) / 10.0, 0.1)) fig = go.Figure() if PRED_COL in df.columns: fig.add_trace(go.Scatter( x=df[PRED_COL], y=y, mode="lines", line=dict(color=COLORS["pred"], width=1.8), name=PRED_COL, hovertemplate=f"{PRED_COL}: "+"%{x:.3f}
"+ylab+": %{y}" )) if include_actual and TARGET in df.columns: fig.add_trace(go.Scatter( x=df[TARGET], y=y, mode="lines", line=dict(color=COLORS["actual"], width=2.0, dash="dot"), name=f"{TARGET} (actual)", hovertemplate=f"{TARGET}: "+"%{x:.3f}
"+ylab+": %{y}" )) fig.update_layout( height=TRACK_H, width=TRACK_W, autosize=False, paper_bgcolor="#fff", plot_bgcolor="#fff", margin=dict(l=64, r=16, t=36, b=48), hovermode="closest", font=dict(size=FONT_SZ, color="#000"), legend=dict(x=0.98, y=0.05, xanchor="right", yanchor="bottom", bgcolor="rgba(255,255,255,0.75)", bordercolor="#ccc", borderwidth=1), legend_title_text="" ) fig.update_xaxes( title_text="RHOB (g/cc)", title_font=dict(size=20, family=BOLD_FONT, color="#000"), tickfont=dict(size=15, family=BOLD_FONT, color="#000"), side="top", range=[xmin, xmax], ticks="outside", tickformat=".2f", tickmode="auto", tick0=tick0, showline=True, linewidth=1.2, linecolor="#444", mirror=True, showgrid=True, gridcolor="rgba(0,0,0,0.12)", automargin=True, ) fig.update_yaxes( title_text=ylab, title_font=dict(size=20, family=BOLD_FONT, color="#000"), tickfont=dict(size=15, family=BOLD_FONT, color="#000"), range=y_range, ticks="outside", showline=True, linewidth=1.2, linecolor="#444", mirror=True, showgrid=True, gridcolor="rgba(0,0,0,0.12)", automargin=True ) return fig # ---------- Preview (matplotlib) ---------- def preview_tracks(df: pd.DataFrame, cols: list[str]): """ Quick-look multi-track preview: - one subplot per selected column - distinct stable colors per column - shared & reversed Y-axis (Depth downwards) """ cols = [c for c in cols if c in df.columns] n = len(cols) if n == 0: fig, ax = plt.subplots(figsize=(4, 2)) ax.text(0.5, 0.5, "No selected columns", ha="center", va="center") ax.axis("off") return fig # Depth or fallback to index depth_col = next((c for c in df.columns if 'depth' in str(c).lower()), None) if depth_col is not None: idx = pd.to_numeric(df[depth_col], errors="coerce") y_label = depth_col else: idx = pd.Series(np.arange(1, len(df) + 1)) y_label = "Point Index" y_min, y_max = float(idx.min()), float(idx.max()) # Stable qualitative palette cmap = plt.get_cmap("tab20") col_colors = {col: cmap(i % cmap.N) for i, col in enumerate(cols)} fig, axes = plt.subplots(1, n, figsize=(2.3 * n, 7.0), sharey=True, dpi=100) if n == 1: axes = [axes] for i, (ax, col) in enumerate(zip(axes, cols)): x = pd.to_numeric(df[col], errors="coerce") ax.plot(x, idx, '-', lw=1.8, color=col_colors[col]) ax.set_xlabel(col) ax.xaxis.set_label_position('top') ax.xaxis.tick_top() ax.set_ylim(y_max, y_min) # reversed Y (Depth down) ax.grid(True, linestyle=":", alpha=0.3) if i == 0: ax.set_ylabel(y_label) else: ax.tick_params(labelleft=False) ax.set_ylabel("") fig.tight_layout() return fig # ========================= # Load model + meta # ========================= def ensure_model() -> Path|None: for p in [DEFAULT_MODEL, *MODEL_FALLBACKS]: if p.exists() and p.stat().st_size > 0: return p url = os.environ.get("MODEL_URL", "") if not url: return None try: import requests DEFAULT_MODEL.parent.mkdir(parents=True, exist_ok=True) with requests.get(url, stream=True, timeout=30) as r: r.raise_for_status() with open(DEFAULT_MODEL, "wb") as f: for chunk in r.iter_content(1<<20): if chunk: f.write(chunk) return DEFAULT_MODEL except Exception: return None mpath = ensure_model() if not mpath: st.error("Model not found. Upload models/rhob_model.joblib (or set MODEL_URL).") st.stop() try: model = load_model(str(mpath)) except Exception as e: st.error(f"Failed to load model: {e}") st.stop() # Load meta (prefer RHOB-specific) meta = {} meta_candidates = [MODELS_DIR / "rhob_meta.json", MODELS_DIR / "meta.json"] meta_path = next((p for p in meta_candidates if p.exists()), None) if meta_path: try: meta = json.loads(meta_path.read_text(encoding="utf-8")) FEATURES = meta.get("features", FEATURES) TARGET = meta.get("target", TARGET) PRED_COL = meta.get("pred_col", PRED_COL) # if training ranges were saved in meta, seed them so OOR works before any dev step if isinstance(meta.get("train_ranges"), dict) and "train_ranges" not in st.session_state: st.session_state["train_ranges"] = meta["train_ranges"] except Exception as e: st.warning(f"Could not parse meta file ({meta_path.name}): {e}") # Optional: version banner if STRICT_VERSION_CHECK and meta.get("versions"): import numpy as _np, sklearn as _skl mv = meta["versions"]; msg=[] if mv.get("numpy") and mv["numpy"] != _np.__version__: msg.append(f"NumPy {mv['numpy']} expected, running {_np.__version__}") if mv.get("scikit_learn") and mv["scikit_learn"] != _skl.__version__: msg.append(f"scikit-learn {mv['scikit_learn']} expected, running {_skl.__version__}") if msg: st.warning("Environment mismatch: " + " | ".join(msg)) # ========================= # Session state # ========================= st.session_state.setdefault("app_step", "intro") st.session_state.setdefault("results", {}) st.session_state.setdefault("train_ranges", st.session_state.get("train_ranges", None)) st.session_state.setdefault("dev_file_name","") st.session_state.setdefault("dev_file_bytes",b"") st.session_state.setdefault("dev_file_loaded",False) st.session_state.setdefault("dev_preview",False) st.session_state.setdefault("show_preview_modal", False) # ========================= # Sidebar branding # ========================= st.sidebar.markdown(f"""
{APP_NAME}
{TAGLINE}
""", unsafe_allow_html=True ) def sticky_header(title, message): st.markdown( f"""

{title}

{message}

""", unsafe_allow_html=True ) # ========================= # INTRO # ========================= if st.session_state.app_step == "intro": st.header("Welcome!") st.markdown("This software is developed by *Smart Thinking AI-Solutions Team* to estimate **RHOB (Formation Bulk Density)** from drilling data.") st.subheader("How It Works") st.markdown( "1) **Upload your data to build the case and preview the model performance.** \n" "2) Click **Run Model** to compute metrics and plots. \n" "3) **Proceed to Validation** (with actual RHOB) or **Proceed to Prediction** (no RHOB)." ) if st.button("Start Showcase", type="primary"): st.session_state.app_step = "dev"; st.rerun() # ========================= # CASE BUILDING # ========================= if st.session_state.app_step == "dev": st.sidebar.header("Case Building") up = st.sidebar.file_uploader("Upload Your Data File", type=["xlsx","xls"]) if up is not None: st.session_state.dev_file_bytes = up.getvalue() st.session_state.dev_file_name = up.name st.session_state.dev_file_loaded = True st.session_state.dev_preview = False if st.session_state.dev_file_loaded: tmp = read_book_bytes(st.session_state.dev_file_bytes) if tmp: df0 = next(iter(tmp.values())) st.sidebar.caption(f"**Data loaded:** {st.session_state.dev_file_name} • {df0.shape[0]} rows × {df0.shape[1]} cols") if st.sidebar.button("Preview data", use_container_width=True, disabled=not st.session_state.dev_file_loaded): st.session_state.show_preview_modal = True st.session_state.dev_preview = True run = st.sidebar.button("Run Model", type="primary", use_container_width=True) if st.sidebar.button("Proceed to Validation ▶", use_container_width=True): st.session_state.app_step="validate"; st.rerun() if st.sidebar.button("Proceed to Prediction ▶", use_container_width=True): st.session_state.app_step="predict"; st.rerun() if st.session_state.dev_file_loaded and st.session_state.dev_preview: sticky_header("Case Building", "Previewed ✓ — now click **Run Model**.") elif st.session_state.dev_file_loaded: sticky_header("Case Building", "📄 **Preview uploaded data** using the sidebar button, then click **Run Model**.") else: sticky_header("Case Building", "**Upload your data to build a case, then run the model to review development performance.**") if run and st.session_state.dev_file_bytes: book = read_book_bytes(st.session_state.dev_file_bytes) sh_train = find_sheet(book, ["Train","Training","training2","train","training"]) sh_test = find_sheet(book, ["Test","Testing","testing2","test","testing"]) if sh_train is None or sh_test is None: st.markdown('
Workbook must include Train/Training/training2 and Test/Testing/testing2 sheets.
', unsafe_allow_html=True) st.stop() tr = _normalize_columns(book[sh_train].copy(), FEATURES, TARGET) te = _normalize_columns(book[sh_test].copy(), FEATURES, TARGET) if not (ensure_cols(tr, FEATURES+[TARGET]) and ensure_cols(te, FEATURES+[TARGET])): st.markdown('
Missing required columns.
', unsafe_allow_html=True) st.stop() # Predict with exactly the training feature order tr[PRED_COL] = model.predict(_make_X(tr, FEATURES)) te[PRED_COL] = model.predict(_make_X(te, FEATURES)) st.session_state.results["Train"]=tr; st.session_state.results["Test"]=te st.session_state.results["m_train"]={ "R": pearson_r(tr[TARGET], tr[PRED_COL]), "RMSE": rmse(tr[TARGET], tr[PRED_COL]), "MAPE": mape(tr[TARGET], tr[PRED_COL]) } st.session_state.results["m_test"]={ "R": pearson_r(te[TARGET], te[PRED_COL]), "RMSE": rmse(te[TARGET], te[PRED_COL]), "MAPE": mape(te[TARGET], te[PRED_COL]) } tr_min = tr[FEATURES].min().to_dict(); tr_max = tr[FEATURES].max().to_dict() st.session_state.train_ranges = {f:(float(tr_min[f]), float(tr_max[f])) for f in FEATURES} st.markdown('
Case has been built and results are displayed below.
', unsafe_allow_html=True) def _dev_block(df, m): c1,c2,c3 = st.columns(3) c1.metric("R", f"{m['R']:.3f}") c2.metric("RMSE", f"{m['RMSE']:.3f}") c3.metric("MAPE (%)", f"{m['MAPE']:.2f}") st.markdown("""
R: Pearson Correlation Coefficient
RMSE: Root Mean Square Error
MAPE: Mean Absolute Percentage Error
""", unsafe_allow_html=True) col_track, col_cross = st.columns([2, 3], gap="large") with col_track: st.plotly_chart(track_plot(df, include_actual=True), use_container_width=False, config={"displayModeBar": False, "scrollZoom": True}) with col_cross: st.pyplot(cross_plot_static(df[TARGET], df[PRED_COL]), use_container_width=False) if "Train" in st.session_state.results or "Test" in st.session_state.results: tab1, tab2 = st.tabs(["Training", "Testing"]) if "Train" in st.session_state.results: with tab1: _dev_block(st.session_state.results["Train"], st.session_state.results["m_train"]) if "Test" in st.session_state.results: with tab2: _dev_block(st.session_state.results["Test"], st.session_state.results["m_test"]) render_export_button(phase_key="dev") # ========================= # VALIDATION (with actual RHOB) # ========================= if st.session_state.app_step == "validate": st.sidebar.header("Validate the Model") up = st.sidebar.file_uploader("Upload Validation Excel", type=["xlsx","xls"]) if up is not None: book = read_book_bytes(up.getvalue()) if book: df0 = next(iter(book.values())) st.sidebar.caption(f"**Data loaded:** {up.name} • {df0.shape[0]} rows × {df0.shape[1]} cols") if st.sidebar.button("Preview data", use_container_width=True, disabled=(up is None)): st.session_state.show_preview_modal = True go_btn = st.sidebar.button("Predict & Validate", type="primary", use_container_width=True) if st.sidebar.button("⬅ Back to Case Building", use_container_width=True): st.session_state.app_step="dev"; st.rerun() if st.sidebar.button("Proceed to Prediction ▶", use_container_width=True): st.session_state.app_step="predict"; st.rerun() sticky_header("Validate the Model", "Upload a dataset with the same **features** and **RHOB** to evaluate performance.") if go_btn and up is not None: book = read_book_bytes(up.getvalue()) name = find_sheet(book, ["Validation","Validate","validation2","Val","val"]) or list(book.keys())[0] df = _normalize_columns(book[name].copy(), FEATURES, TARGET) if not ensure_cols(df, FEATURES+[TARGET]): st.markdown('
Missing required columns.
', unsafe_allow_html=True); st.stop() df[PRED_COL] = model.predict(_make_X(df, FEATURES)) st.session_state.results["Validate"]=df ranges = st.session_state.train_ranges; oor_pct = 0.0; tbl=None if ranges: any_viol = pd.DataFrame({f:(df[f]ranges[f][1]) for f in FEATURES}).any(axis=1) oor_pct = float(any_viol.mean()*100.0) if any_viol.any(): tbl = df.loc[any_viol, FEATURES].copy() for c in FEATURES: if pd.api.types.is_numeric_dtype(tbl[c]): tbl[c] = tbl[c].round(3) tbl["Violations"] = pd.DataFrame({f:(df[f]ranges[f][1]) for f in FEATURES}).loc[any_viol].apply( lambda r:", ".join([c for c,v in r.items() if v]), axis=1 ) st.session_state.results["m_val"]={ "R": pearson_r(df[TARGET], df[PRED_COL]), "RMSE": rmse(df[TARGET], df[PRED_COL]), "MAPE": mape(df[TARGET], df[PRED_COL]) } st.session_state.results["sv_val"]={"n":len(df), "pred_min":float(df[PRED_COL].min()), "pred_max":float(df[PRED_COL].max()), "oor":oor_pct} st.session_state.results["oor_tbl"]=tbl if "Validate" in st.session_state.results: m = st.session_state.results["m_val"] c1,c2,c3 = st.columns(3) c1.metric("R", f"{m['R']:.3f}") c2.metric("RMSE", f"{m['RMSE']:.3f}") c3.metric("MAPE (%)", f"{m['MAPE']:.2f}") st.markdown("""
R: Pearson Correlation Coefficient
RMSE: Root Mean Square Error
MAPE: Mean Absolute Percentage Error
""", unsafe_allow_html=True) col_track, col_cross = st.columns([2, 3], gap="large") with col_track: st.plotly_chart(track_plot(st.session_state.results["Validate"], include_actual=True), use_container_width=False, config={"displayModeBar": False, "scrollZoom": True}) with col_cross: st.pyplot(cross_plot_static(st.session_state.results["Validate"][TARGET], st.session_state.results["Validate"][PRED_COL]), use_container_width=False) render_export_button(phase_key="validate") sv = st.session_state.results["sv_val"] if sv["oor"] > 0: st.markdown('
Some inputs fall outside **training min–max** ranges.
', unsafe_allow_html=True) if st.session_state.results["oor_tbl"] is not None: st.write("*Out-of-range rows (vs. Training min–max):*") df_centered_rounded(st.session_state.results["oor_tbl"]) # ========================= # PREDICTION (no actual RHOB) # ========================= if st.session_state.app_step == "predict": st.sidebar.header("Prediction (No Actual RHOB)") up = st.sidebar.file_uploader("Upload Prediction Excel", type=["xlsx","xls"]) if up is not None: book = read_book_bytes(up.getvalue()) if book: df0 = next(iter(book.values())) st.sidebar.caption(f"**Data loaded:** {up.name} • {df0.shape[0]} rows × {df0.shape[1]} cols") if st.sidebar.button("Preview data", use_container_width=True, disabled=(up is None)): st.session_state.show_preview_modal = True go_btn = st.sidebar.button("Predict", type="primary", use_container_width=True) if st.sidebar.button("⬅ Back to Case Building", use_container_width=True): st.session_state.app_step="dev"; st.rerun() sticky_header("Prediction", "Upload a dataset with the feature columns (no **RHOB**).") if go_btn and up is not None: book = read_book_bytes(up.getvalue()); name = list(book.keys())[0] df = _normalize_columns(book[name].copy(), FEATURES, TARGET) if not ensure_cols(df, FEATURES): st.markdown('
Missing required columns.
', unsafe_allow_html=True); st.stop() df[PRED_COL] = model.predict(_make_X(df, FEATURES)) st.session_state.results["PredictOnly"]=df ranges = st.session_state.train_ranges; oor_pct = 0.0 if ranges: any_viol = pd.DataFrame({f:(df[f]ranges[f][1]) for f in FEATURES}).any(axis=1) oor_pct = float(any_viol.mean()*100.0) st.session_state.results["sv_pred"]={ "n":len(df), "pred_min":float(df[PRED_COL].min()), "pred_max":float(df[PRED_COL].max()), "pred_mean":float(df[PRED_COL].mean()), "pred_std":float(df[PRED_COL].std(ddof=0)), "oor":oor_pct } if "PredictOnly" in st.session_state.results: df = st.session_state.results["PredictOnly"]; sv = st.session_state.results["sv_pred"] col_left, col_right = st.columns([2,3], gap="large") with col_left: table = pd.DataFrame({ "Metric": ["# points","Pred min","Pred max","Pred mean","Pred std","OOR %"], "Value": [sv["n"], round(sv["pred_min"],3), round(sv["pred_max"],3), round(sv["pred_mean"],3), round(sv["pred_std"],3), f'{sv["oor"]:.1f}%'] }) st.markdown('
Predictions ready ✓
', unsafe_allow_html=True) df_centered_rounded(table, hide_index=True) st.caption("**★ OOR** = % of rows whose input features fall outside the training min–max range.") with col_right: st.plotly_chart(track_plot(df, include_actual=False), use_container_width=False, config={"displayModeBar": False, "scrollZoom": True}) render_export_button(phase_key="predict") # ========================= # Preview modal # ========================= if st.session_state.show_preview_modal: book_to_preview = {} if st.session_state.app_step == "dev": book_to_preview = read_book_bytes(st.session_state.dev_file_bytes) elif st.session_state.app_step in ["validate", "predict"] and 'up' in locals() and up is not None: book_to_preview = read_book_bytes(up.getvalue()) with st.expander("Preview data", expanded=True): if not book_to_preview: st.markdown('
No data loaded yet.
', unsafe_allow_html=True) else: names = list(book_to_preview.keys()) tabs = st.tabs(names) for t, name in zip(tabs, names): with t: df = _normalize_columns(book_to_preview[name], FEATURES, TARGET) t1, t2 = st.tabs(["Tracks", "Summary"]) with t1: st.pyplot(preview_tracks(df, FEATURES), use_container_width=True) with t2: feat_present = [c for c in FEATURES if c in df.columns] if not feat_present: st.info("No feature columns found to summarize.") else: tbl = ( df[feat_present] .agg(['min','max','mean','std']) .T.rename(columns={"min":"Min","max":"Max","mean":"Mean","std":"Std"}) .reset_index(names="Feature") ) df_centered_rounded(tbl) st.session_state.show_preview_modal = False # ========================= # Footer # ========================= st.markdown("""



© 2025 Smart Thinking AI-Solutions Team. All rights reserved.
Website: smartthinking.com.sa
""", unsafe_allow_html=True)