import io, json, os, base64, math from pathlib import Path import streamlit as st import pandas as pd import numpy as np import joblib # matplotlib only for PREVIEW modal import matplotlib matplotlib.use("Agg") import matplotlib.pyplot as plt import plotly.graph_objects as go from sklearn.metrics import r2_score, mean_squared_error, mean_absolute_error # ========================= # Constants (simple & robust) # ========================= FEATURES = ["Q, gpm", "SPP(psi)", "T (kft.lbf)", "WOB (klbf)", "ROP (ft/h)"] TARGET = "UCS" MODELS_DIR = Path("models") DEFAULT_MODEL = MODELS_DIR / "ucs_rf.joblib" MODEL_FALLBACKS = [MODELS_DIR / "model.joblib", MODELS_DIR / "model.pkl"] COLORS = {"pred": "#1f77b4", "actual": "#f2b702", "ref": "#5a5a5a"} # ---- Plot sizing controls (edit here) ---- CROSS_W = 500; CROSS_H = 500 # square cross-plot (Build + Validate) TRACK_W = 400; TRACK_H = 950 # log-strip style (all pages) FONT_SZ = 15 PLOT_COLS = [30, 1, 20] # 3-column band: left • spacer • right (Build + Validate) CROSS_NUDGE = 0.06 # push cross-plot to the RIGHT inside its band: # inner columns [CROSS_NUDGE : 1] → bigger = more right # ========================= # Page / CSS # ========================= st.set_page_config(page_title="ST_GeoMech_UCS", page_icon="logo.png", layout="wide") st.markdown("", unsafe_allow_html=True) st.markdown( """ """, unsafe_allow_html=True ) # ========================= # Password gate (define first, then call) # ========================= def inline_logo(path="logo.png") -> str: try: p = Path(path) if not p.exists(): return "" return f"data:image/png;base64,{base64.b64encode(p.read_bytes()).decode('ascii')}" except Exception: return "" def add_password_gate() -> None: try: required = st.secrets.get("APP_PASSWORD", "") except Exception: required = os.environ.get("APP_PASSWORD", "") if not required: st.markdown( f"""
ST_GeoMech_UCS
Smart Thinking • Secure Access
Protected Area
Set APP_PASSWORD in Settings → Secrets (or environment) and restart.
""", unsafe_allow_html=True, ) st.stop() if st.session_state.get("auth_ok", False): return st.markdown( f"""
ST_GeoMech_UCS
Smart Thinking • Secure Access
Protected
Please enter your access key to continue.
""", unsafe_allow_html=True ) pwd = st.text_input("Access key", type="password", placeholder="••••••••") if st.button("Unlock", type="primary"): if pwd == required: st.session_state.auth_ok = True st.rerun() else: st.error("Incorrect key.") st.stop() add_password_gate() # ========================= # Utilities # ========================= try: dialog = st.dialog except AttributeError: def dialog(title): def deco(fn): def wrapper(*args, **kwargs): with st.expander(title, expanded=True): return fn(*args, **kwargs) return wrapper return deco def rmse(y_true, y_pred): return float(np.sqrt(mean_squared_error(y_true, y_pred))) @st.cache_resource(show_spinner=False) def load_model(model_path: str): return joblib.load(model_path) @st.cache_data(show_spinner=False) def parse_excel(data_bytes: bytes): bio = io.BytesIO(data_bytes) xl = pd.ExcelFile(bio) return {sh: xl.parse(sh) for sh in xl.sheet_names} def read_book_bytes(b: bytes): return parse_excel(b) if b else {} def ensure_cols(df, cols): miss = [c for c in cols if c not in df.columns] if miss: st.error(f"Missing columns: {miss}\nFound: {list(df.columns)}") return False return True def find_sheet(book, names): low2orig = {k.lower(): k for k in book.keys()} for nm in names: if nm.lower() in low2orig: return low2orig[nm.lower()] return None def _nice_tick0(xmin: float, step: int = 100) -> float: """Round xmin down to a sensible multiple so the first tick sits at the left edge.""" if not np.isfinite(xmin): return xmin return step * math.floor(xmin / step) # ---------- Plot builders ---------- def cross_plot(actual, pred): a = pd.Series(actual).astype(float) p = pd.Series(pred).astype(float) fixed_min = 6000 fixed_max = 10000 fig = go.Figure() # Scatter points fig.add_trace(go.Scatter( x=a, y=p, mode="markers", marker=dict(size=6, color=COLORS["pred"]), hovertemplate="Actual: %{x:.0f}
Pred: %{y:.0f}", showlegend=False )) # 1:1 reference line from bottom-left to top-right fig.add_trace(go.Scatter( x=[fixed_min, fixed_max], y=[fixed_min, fixed_max], mode="lines", line=dict(color=COLORS["ref"], width=1.2, dash="dash"), hoverinfo="skip", showlegend=False )) fig.update_layout( width=CROSS_W, height=CROSS_H, paper_bgcolor="#fff", plot_bgcolor="#fff", margin=dict(l=64, r=18, t=10, b=48), hovermode="closest", font=dict(size=FONT_SZ), dragmode=False # disables zooming ) fig.update_xaxes( title_text="Actual UCS (psi)", range=[fixed_min, fixed_max], tickformat=",.0f", ticks="outside", showline=True, linewidth=1.2, linecolor="#444", mirror=True, showgrid=True, gridcolor="rgba(0,0,0,0.12)", automargin=True, fixedrange=True # disables zooming ) fig.update_yaxes( title_text="Predicted UCS (psi)", range=[fixed_min, fixed_max], tickformat=",.0f", ticks="outside", showline=True, linewidth=1.2, linecolor="#444", mirror=True, showgrid=True, gridcolor="rgba(0,0,0,0.12)", scaleanchor="x", scaleratio=1, automargin=True, fixedrange=True # disables zooming ) return fig def track_plot(df, include_actual=True): depth_col = next((c for c in df.columns if 'depth' in str(c).lower()), None) if depth_col is not None: y = pd.Series(df[depth_col]).astype(float) ylab = depth_col y_min, y_max = float(y.min()), float(y.max()) y_range = [y_max, y_min] # reversed for log profile style else: y = pd.Series(np.arange(1, len(df) + 1)) ylab = "Point Index" y_min, y_max = float(y.min()), float(y.max()) y_range = [y_max, y_min] # X (UCS) range & ticks x_series = pd.Series(df.get("UCS_Pred", pd.Series(dtype=float))).astype(float) if include_actual and TARGET in df.columns: x_series = pd.concat([x_series, pd.Series(df[TARGET]).astype(float)], ignore_index=True) x_lo, x_hi = float(x_series.min()), float(x_series.max()) x_pad = 0.03 * (x_hi - x_lo if x_hi > x_lo else 1.0) xmin, xmax = x_lo - x_pad, x_hi + x_pad tick0 = _nice_tick0(xmin, step=100) fig = go.Figure() fig.add_trace(go.Scatter( x=df["UCS_Pred"], y=y, mode="lines", line=dict(color=COLORS["pred"], width=1.8), name="UCS_Pred", hovertemplate="UCS_Pred: %{x:.0f}
" + ylab + ": %{y}" )) if include_actual and TARGET in df.columns: fig.add_trace(go.Scatter( x=df[TARGET], y=y, mode="lines", line=dict(color=COLORS["actual"], width=2.0, dash="dot"), name="UCS (actual)", hovertemplate="UCS (actual): %{x:.0f}
" + ylab + ": %{y}" )) fig.update_layout( width=TRACK_W, height=TRACK_H, paper_bgcolor="#fff", plot_bgcolor="#fff", margin=dict(l=72, r=18, t=36, b=48), hovermode="closest", font=dict(size=FONT_SZ), legend=dict( x=0.98, y=0.05, xanchor="right", yanchor="bottom", bgcolor="rgba(255,255,255,0.75)", bordercolor="#ccc", borderwidth=1 ), legend_title_text="" ) fig.update_xaxes( title_text="UCS (psi)", title_font=dict(size=16), side="top", range=[xmin, xmax], ticks="outside", tickformat=",.0f", tickmode="auto", tick0=tick0, showline=True, linewidth=1.2, linecolor="#444", mirror=True, showgrid=True, gridcolor="rgba(0,0,0,0.12)", automargin=True ) fig.update_yaxes( title_text=f"{ylab}", title_font=dict(size=16), range=y_range, ticks="outside", showline=True, linewidth=1.2, linecolor="#444", mirror=True, showgrid=True, gridcolor="rgba(0,0,0,0.12)", automargin=True ) # Add a border rectangle to enclose the full figure space fig.add_shape( type="rect", xref="paper", yref="paper", x0=-0.12, y0=-0.12, x1=1.12, y1=1.12, line=dict(color="#000", width=1.5), layer="below" ) return fig # ---------- Preview modal (matplotlib) ---------- def preview_tracks(df: pd.DataFrame, cols: list[str]): cols = [c for c in cols if c in df.columns] n = len(cols) if n == 0: fig, ax = plt.subplots(figsize=(4, 2)) ax.text(0.5,0.5,"No selected columns",ha="center",va="center") ax.axis("off") return fig fig, axes = plt.subplots(1, n, figsize=(2.2*n, 7.0), sharey=True, dpi=100) if n == 1: axes = [axes] idx = np.arange(1, len(df) + 1) for ax, col in zip(axes, cols): ax.plot(df[col], idx, '-', lw=1.4, color="#333") ax.set_xlabel(col); ax.xaxis.set_label_position('top'); ax.xaxis.tick_top(); ax.invert_yaxis() ax.grid(True, linestyle=":", alpha=0.3) for s in ax.spines.values(): s.set_visible(True) axes[0].set_ylabel("Point Index") return fig try: dialog = st.dialog except AttributeError: def dialog(title): def deco(fn): def wrapper(*args, **kwargs): with st.expander(title, expanded=True): return fn(*args, **kwargs) return wrapper return deco @dialog("Preview data") def preview_modal(book: dict[str, pd.DataFrame]): if not book: st.info("No data loaded yet."); return names = list(book.keys()) tabs = st.tabs(names) for t, name in zip(tabs, names): with t: df = book[name] t1, t2 = st.tabs(["Tracks", "Summary"]) with t1: st.pyplot(preview_tracks(df, FEATURES), use_container_width=True) with t2: tbl = df[FEATURES].agg(['min','max','mean','std']).T.rename(columns={"min":"Min","max":"Max","mean":"Mean","std":"Std"}) st.dataframe(tbl.reset_index(names="Feature"), use_container_width=True) # ========================= # Load model (simple) # ========================= def ensure_model() -> Path|None: for p in [DEFAULT_MODEL, *MODEL_FALLBACKS]: if p.exists() and p.stat().st_size > 0: return p url = os.environ.get("MODEL_URL", "") if not url: return None try: import requests DEFAULT_MODEL.parent.mkdir(parents=True, exist_ok=True) with requests.get(url, stream=True, timeout=30) as r: r.raise_for_status() with open(DEFAULT_MODEL, "wb") as f: for chunk in r.iter_content(1<<20): if chunk: f.write(chunk) return DEFAULT_MODEL except Exception: return None mpath = ensure_model() if not mpath: st.error("Model not found. Upload models/ucs_rf.joblib (or set MODEL_URL).") st.stop() try: model = load_model(str(mpath)) except Exception as e: st.error(f"Failed to load model: {e}") st.stop() meta_path = MODELS_DIR / "meta.json" if meta_path.exists(): try: meta = json.loads(meta_path.read_text(encoding="utf-8")) FEATURES = meta.get("features", FEATURES); TARGET = meta.get("target", TARGET) except Exception: pass # ========================= # Session state # ========================= st.session_state.setdefault("app_step", "intro") st.session_state.setdefault("results", {}) st.session_state.setdefault("train_ranges", None) st.session_state.setdefault("dev_file_name","") st.session_state.setdefault("dev_file_bytes",b"") st.session_state.setdefault("dev_file_loaded",False) st.session_state.setdefault("dev_preview",False) # ========================= # Hero # ========================= st.markdown( f"""

ST_GeoMech_UCS

Real-Time UCS Tracking While Drilling
""", unsafe_allow_html=True, ) # ========================= # INTRO # ========================= if st.session_state.app_step == "intro": st.header("Welcome!") st.markdown("This software is developed by *Smart Thinking AI-Solutions Team* to estimate UCS from drilling data.") st.subheader("How It Works") st.markdown( "1) **Upload your data to build the case and preview the performance of our model.** \n" "2) Click **Run Model** to compute metrics and plots. \n" "3) **Proceed to Validation** (with actual UCS) or **Proceed to Prediction** (no UCS)." ) if st.button("Start Showcase", type="primary"): st.session_state.app_step = "dev"; st.rerun() # ========================= # CASE BUILDING # ========================= if st.session_state.app_step == "dev": st.sidebar.header("Case Building") up = st.sidebar.file_uploader("Upload Train/Test Excel", type=["xlsx","xls"]) if up is not None: st.session_state.dev_file_bytes = up.getvalue() st.session_state.dev_file_name = up.name st.session_state.dev_file_loaded = True st.session_state.dev_preview = False if st.session_state.dev_file_loaded: tmp = read_book_bytes(st.session_state.dev_file_bytes) if tmp: df0 = next(iter(tmp.values())) st.sidebar.caption(f"**Data loaded:** {st.session_state.dev_file_name} • {df0.shape[0]} rows × {df0.shape[1]} cols") if st.sidebar.button("Preview data", use_container_width=True, disabled=not st.session_state.dev_file_loaded): preview_modal(read_book_bytes(st.session_state.dev_file_bytes)) st.session_state.dev_preview = True run = st.sidebar.button("Run Model", type="primary", use_container_width=True) # always available nav if st.sidebar.button("Proceed to Validation ▶", use_container_width=True): st.session_state.app_step="validate"; st.rerun() if st.sidebar.button("Proceed to Prediction ▶", use_container_width=True): st.session_state.app_step="predict"; st.rerun() # ---- Pinned helper at the very top of the page ---- helper_top = st.container() with helper_top: st.subheader("Case Building") if st.session_state.dev_file_loaded and st.session_state.dev_preview: st.info("Previewed ✓ — now click **Run Model**.") elif st.session_state.dev_file_loaded: st.info("📄 **Preview uploaded data** using the sidebar button, then click **Run Model**.") else: st.write("**Upload your data to build a case, then run the model to review development performance.**") if run and st.session_state.dev_file_bytes: book = read_book_bytes(st.session_state.dev_file_bytes) sh_train = find_sheet(book, ["Train","Training","training2","train","training"]) sh_test = find_sheet(book, ["Test","Testing","testing2","test","testing"]) if sh_train is None or sh_test is None: st.error("Workbook must include Train/Training/training2 and Test/Testing/testing2 sheets."); st.stop() tr = book[sh_train].copy(); te = book[sh_test].copy() if not (ensure_cols(tr, FEATURES+[TARGET]) and ensure_cols(te, FEATURES+[TARGET])): st.error("Missing required columns."); st.stop() tr["UCS_Pred"] = model.predict(tr[FEATURES]) te["UCS_Pred"] = model.predict(te[FEATURES]) st.session_state.results["Train"]=tr; st.session_state.results["Test"]=te st.session_state.results["m_train"]={"R2":r2_score(tr[TARGET],tr["UCS_Pred"]), "RMSE":rmse(tr[TARGET],tr["UCS_Pred"]), "MAE":mean_absolute_error(tr[TARGET],tr["UCS_Pred"])} st.session_state.results["m_test"] ={"R2":r2_score(te[TARGET],te["UCS_Pred"]), "RMSE":rmse(te[TARGET],te["UCS_Pred"]), "MAE":mean_absolute_error(te[TARGET],te["UCS_Pred"])} tr_min = tr[FEATURES].min().to_dict(); tr_max = tr[FEATURES].max().to_dict() st.session_state.train_ranges = {f:(float(tr_min[f]), float(tr_max[f])) for f in FEATURES} st.success("Case has been built and results are displayed below.") def _dev_block(df, m): c1,c2,c3 = st.columns(3) c1.metric("R²", f"{m['R2']:.4f}"); c2.metric("RMSE", f"{m['RMSE']:.4f}"); c3.metric("MAE", f"{m['MAE']:.4f}") left, spacer, right = st.columns(PLOT_COLS) with left: pad, plotcol = left.columns([CROSS_NUDGE, 1]) # shift cross-plot right inside its band with plotcol: st.plotly_chart( cross_plot(df[TARGET], df["UCS_Pred"]), use_container_width=False, config={"displayModeBar": False, "scrollZoom": True} ) with right: st.plotly_chart( track_plot(df, include_actual=True), use_container_width=False, config={"displayModeBar": False, "scrollZoom": True} ) if "Train" in st.session_state.results or "Test" in st.session_state.results: tab1, tab2 = st.tabs(["Training", "Testing"]) if "Train" in st.session_state.results: with tab1: _dev_block(st.session_state.results["Train"], st.session_state.results["m_train"]) if "Test" in st.session_state.results: with tab2: _dev_block(st.session_state.results["Test"], st.session_state.results["m_test"]) # ========================= # VALIDATION (with actual UCS) # ========================= if st.session_state.app_step == "validate": st.sidebar.header("Validate the Model") up = st.sidebar.file_uploader("Upload Validation Excel", type=["xlsx","xls"]) if up is not None: book = read_book_bytes(up.getvalue()) if book: df0 = next(iter(book.values())) st.sidebar.caption(f"**Data loaded:** {up.name} • {df0.shape[0]} rows × {df0.shape[1]} cols") if st.sidebar.button("Preview data", use_container_width=True, disabled=(up is None)): preview_modal(read_book_bytes(up.getvalue())) go_btn = st.sidebar.button("Predict", type="primary", use_container_width=True) if st.sidebar.button("⬅ Back to Case Building", use_container_width=True): st.session_state.app_step="dev"; st.rerun() if st.sidebar.button("Proceed to Prediction ▶", use_container_width=True): st.session_state.app_step="predict"; st.rerun() st.subheader("Validate the Model") st.write("Upload a dataset with the same **features** and **UCS** to evaluate performance.") if go_btn and up is not None: book = read_book_bytes(up.getvalue()) name = find_sheet(book, ["Validation","Validate","validation2","Val","val"]) or list(book.keys())[0] df = book[name].copy() if not ensure_cols(df, FEATURES+[TARGET]): st.error("Missing required columns."); st.stop() df["UCS_Pred"] = model.predict(df[FEATURES]) st.session_state.results["Validate"]=df ranges = st.session_state.train_ranges; oor_pct = 0.0; tbl=None if ranges: any_viol = pd.DataFrame({f:(df[f]ranges[f][1]) for f in FEATURES}).any(axis=1) oor_pct = float(any_viol.mean()*100.0) if any_viol.any(): tbl = df.loc[any_viol, FEATURES].copy() tbl["Violations"] = pd.DataFrame({f:(df[f]ranges[f][1]) for f in FEATURES}).loc[any_viol].apply(lambda r:", ".join([c for c,v in r.items() if v]), axis=1) st.session_state.results["m_val"]={"R2":r2_score(df[TARGET],df["UCS_Pred"]), "RMSE":rmse(df[TARGET],df["UCS_Pred"]), "MAE":mean_absolute_error(df[TARGET],df["UCS_Pred"])} st.session_state.results["sv_val"]={"n":len(df),"pred_min":float(df["UCS_Pred"].min()),"pred_max":float(df["UCS_Pred"].max()),"oor":oor_pct} st.session_state.results["oor_tbl"]=tbl if "Validate" in st.session_state.results: m = st.session_state.results["m_val"] c1,c2,c3 = st.columns(3) c1.metric("R²", f"{m['R2']:.4f}"); c2.metric("RMSE", f"{m['RMSE']:.4f}"); c3.metric("MAE", f"{m['MAE']:.4f}") left, spacer, right = st.columns(PLOT_COLS) with left: pad, plotcol = left.columns([CROSS_NUDGE, 1]) # same nudge with plotcol: st.plotly_chart( cross_plot(st.session_state.results["Validate"][TARGET], st.session_state.results["Validate"]["UCS_Pred"]), use_container_width=False, config={"displayModeBar": False, "scrollZoom": True} ) with right: st.plotly_chart( track_plot(st.session_state.results["Validate"], include_actual=True), use_container_width=False, config={"displayModeBar": False, "scrollZoom": True} ) sv = st.session_state.results["sv_val"] if sv["oor"] > 0: st.warning("Some inputs fall outside **training min–max** ranges.") if st.session_state.results["oor_tbl"] is not None: st.write("*Out-of-range rows (vs. Training min–max):*") st.dataframe(st.session_state.results["oor_tbl"], use_container_width=True) # ========================= # PREDICTION (no actual UCS) # ========================= if st.session_state.app_step == "predict": st.sidebar.header("Prediction (No Actual UCS)") up = st.sidebar.file_uploader("Upload Prediction Excel", type=["xlsx","xls"]) if up is not None: book = read_book_bytes(up.getvalue()) if book: df0 = next(iter(book.values())) st.sidebar.caption(f"**Data loaded:** {up.name} • {df0.shape[0]} rows × {df0.shape[1]} cols") if st.sidebar.button("Preview data", use_container_width=True, disabled=(up is None)): preview_modal(read_book_bytes(up.getvalue())) go_btn = st.sidebar.button("Predict", type="primary", use_container_width=True) if st.sidebar.button("⬅ Back to Case Building", use_container_width=True): st.session_state.app_step="dev"; st.rerun() st.subheader("Prediction") st.write("Upload a dataset with the feature columns (no **UCS**).") if go_btn and up is not None: book = read_book_bytes(up.getvalue()); name = list(book.keys())[0] df = book[name].copy() if not ensure_cols(df, FEATURES): st.error("Missing required columns."); st.stop() df["UCS_Pred"] = model.predict(df[FEATURES]) st.session_state.results["PredictOnly"]=df ranges = st.session_state.train_ranges; oor_pct = 0.0 if ranges: any_viol = pd.DataFrame({f:(df[f]ranges[f][1]) for f in FEATURES}).any(axis=1) oor_pct = float(any_viol.mean()*100.0) st.session_state.results["sv_pred"]={ "n":len(df), "pred_min":float(df["UCS_Pred"].min()), "pred_max":float(df["UCS_Pred"].max()), "pred_mean":float(df["UCS_Pred"].mean()), "pred_std":float(df["UCS_Pred"].std(ddof=0)), "oor":oor_pct } if "PredictOnly" in st.session_state.results: df = st.session_state.results["PredictOnly"]; sv = st.session_state.results["sv_pred"] left, spacer, right = st.columns(PLOT_COLS) with left: table = pd.DataFrame({ "Metric": ["# points","Pred min","Pred max","Pred mean","Pred std","OOR %"], "Value": [sv["n"], sv["pred_min"], sv["pred_max"], sv["pred_mean"], sv["pred_std"], f'{sv["oor"]:.1f}%'] }) st.success("Predictions ready ✓") st.dataframe(table, use_container_width=True, hide_index=True) st.caption("**★ OOR** = % of rows whose input features fall outside the training min–max range.") with right: st.plotly_chart( track_plot(df, include_actual=False), use_container_width=False, config={"displayModeBar": False, "scrollZoom": True} ) # ========================= # Footer # ========================= st.markdown("---") st.markdown( """
ST_GeoMech_UCS • © Smart Thinking
Visit our website: smartthinking.com.sa
""", unsafe_allow_html=True )