import io, json, os, base64, math
from pathlib import Path
import streamlit as st
import pandas as pd
import numpy as np
import joblib
# matplotlib only for PREVIEW modal
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
import plotly.graph_objects as go
from sklearn.metrics import r2_score, mean_squared_error, mean_absolute_error
# =========================
# Constants (simple & robust)
# =========================
# Input columns handed to model.predict; may be replaced by models/meta.json further down.
FEATURES = ["Q, gpm", "SPP(psi)", "T (kft.lbf)", "WOB (klbf)", "ROP (ft/h)"]
# Column holding the measured value that predictions are scored against; also overridable via meta.json.
TARGET = "UCS"
# Directory searched for the serialized model and the optional meta.json.
MODELS_DIR = Path("models")
# Preferred model file; also the destination of a MODEL_URL download.
DEFAULT_MODEL = MODELS_DIR / "ucs_rf.joblib"
# Alternative model file names tried, in order, when DEFAULT_MODEL is absent or empty.
MODEL_FALLBACKS = [MODELS_DIR / "model.joblib", MODELS_DIR / "model.pkl"]
# Trace colours: predicted series, actual series, and the 1:1 reference line.
COLORS = {"pred": "#1f77b4", "actual": "#f2b702", "ref": "#5a5a5a"}
# ---- Plot sizing controls (edit here) ----
CROSS_W = 500; CROSS_H = 500 # square cross-plot (Build + Validate)
TRACK_W = 400; TRACK_H = 950 # log-strip style (all pages)
# Base font size applied to both Plotly figures.
FONT_SZ = 15
PLOT_COLS = [30, 1, 20] # 3-column band: left • spacer • right (Build + Validate)
CROSS_NUDGE = 0.06 # push cross-plot to the RIGHT inside its band:
# inner columns [CROSS_NUDGE : 1] → bigger = more right
# =========================
# Page / CSS
# =========================
# Set the browser-tab title/icon and switch to the wide page layout.
st.set_page_config(page_title="ST_GeoMech_UCS", page_icon="logo.png", layout="wide")
# NOTE(review): both markdown payloads below are empty/whitespace-only in this copy of the
# file — presumably the CSS/HTML they carried was stripped; confirm against version control.
st.markdown("", unsafe_allow_html=True)
st.markdown(
"""
""",
unsafe_allow_html=True
)
# =========================
# Password gate (define first, then call)
# =========================
def inline_logo(path="logo.png") -> str:
    """Return the file at *path* as a base64 ``data:image/png`` URI.

    An empty string is returned when the file does not exist or cannot be
    read, so callers can embed the result without further checks.
    """
    try:
        logo_file = Path(path)
        if logo_file.exists():
            encoded = base64.b64encode(logo_file.read_bytes()).decode("ascii")
            return f"data:image/png;base64,{encoded}"
        return ""
    except Exception:
        # Best-effort helper: any read/encode problem degrades to "no logo".
        return ""
def add_password_gate() -> None:
    """Halt the script with ``st.stop()`` until the session is authenticated.

    The expected key is read from ``st.secrets["APP_PASSWORD"]`` and, when
    that is unavailable or empty, from the ``APP_PASSWORD`` environment
    variable. If no key is configured at all, a notice is rendered and the
    script stops. Once the correct key has been entered,
    ``st.session_state.auth_ok`` is set and later calls return immediately.
    """
    import hmac  # function-scope import: only this gate needs it

    try:
        required = st.secrets.get("APP_PASSWORD", "")
    except Exception:
        # Accessing st.secrets failed; fall through to the environment.
        required = ""
    if not required:
        # Also covers "secrets exist but do not define APP_PASSWORD", which the
        # on-screen hint below ("Secrets (or environment)") promises to support.
        required = os.environ.get("APP_PASSWORD", "")

    if not required:
        st.markdown(
            f"""
ST_GeoMech_UCS
Smart Thinking • Secure Access
Protected Area
Set APP_PASSWORD in Settings → Secrets (or environment) and restart.
""",
            unsafe_allow_html=True,
        )
        st.stop()

    if st.session_state.get("auth_ok", False):
        return

    st.markdown(
        f"""
ST_GeoMech_UCS
Smart Thinking • Secure Access
Protected
Please enter your access key to continue.
""",
        unsafe_allow_html=True
    )
    pwd = st.text_input("Access key", type="password", placeholder="••••••••")
    if st.button("Unlock", type="primary"):
        # Constant-time comparison; encode to bytes because compare_digest
        # rejects non-ASCII str arguments.
        supplied = pwd.encode("utf-8")
        expected = str(required).encode("utf-8")
        if hmac.compare_digest(supplied, expected):
            st.session_state.auth_ok = True
            st.rerun()
        else:
            st.error("Incorrect key.")
    # Not authenticated (yet): render nothing below the gate.
    st.stop()
# Run the gate now: it calls st.stop() unless the session is already authenticated,
# so the rest of the script only executes after a successful unlock.
add_password_gate()
# =========================
# Utilities
# =========================
# Use st.dialog when this Streamlit build provides it; otherwise fall back to a
# decorator that renders the wrapped function inside an expanded st.expander.
try:
    dialog = st.dialog
except AttributeError:
    import functools

    def dialog(title):
        """Fallback for ``st.dialog``: return a decorator that runs the function in an expander titled *title*."""
        def deco(fn):
            @functools.wraps(fn)  # keep the wrapped function's name/docstring
            def wrapper(*args, **kwargs):
                with st.expander(title, expanded=True):
                    return fn(*args, **kwargs)
            return wrapper
        return deco
def rmse(y_true, y_pred):
    """Return the root-mean-squared error of *y_pred* against *y_true* as a Python float."""
    mse = mean_squared_error(y_true, y_pred)
    return float(np.sqrt(mse))
@st.cache_resource(show_spinner=False)
def load_model(model_path: str):
    """Load the object stored at *model_path* with joblib; the result is cached by ``st.cache_resource``."""
    # NOTE(review): joblib.load unpickles the file, and ensure_model() may have
    # downloaded it from MODEL_URL — only point the app at trusted model files.
    loaded = joblib.load(model_path)
    return loaded
@st.cache_data(show_spinner=False)
def parse_excel(data_bytes: bytes):
    """Parse an Excel workbook held in memory.

    Args:
        data_bytes: raw bytes of the workbook.

    Returns:
        dict mapping each sheet name to the DataFrame produced by
        ``ExcelFile.parse`` for that sheet. Results are cached by
        ``st.cache_data``.
    """
    # Context manager so the workbook handle is released even if a sheet fails to parse.
    with pd.ExcelFile(io.BytesIO(data_bytes)) as xl:
        return {sh: xl.parse(sh) for sh in xl.sheet_names}
def read_book_bytes(b: bytes):
    """Return the parsed workbook for *b*, or an empty dict when *b* is empty/None."""
    if not b:
        return {}
    return parse_excel(b)
def ensure_cols(df, cols):
    """Check that every name in *cols* is a column of *df*.

    Returns True when nothing is missing. Otherwise shows an ``st.error``
    listing the missing and the available columns and returns False.
    """
    missing = [name for name in cols if name not in df.columns]
    if not missing:
        return True
    st.error(f"Missing columns: {missing}\nFound: {list(df.columns)}")
    return False
def find_sheet(book, names):
    """Return the first sheet key in *book* matching one of *names*, ignoring case.

    Candidates are tried in the order given; the key is returned with its
    original casing. Returns None when no candidate matches.
    """
    by_lower = {sheet.lower(): sheet for sheet in book}
    matches = (by_lower[cand.lower()] for cand in names if cand.lower() in by_lower)
    return next(matches, None)
def _nice_tick0(xmin: float, step: int = 100) -> float:
    """Round *xmin* down to the nearest multiple of *step*; non-finite input is returned unchanged."""
    if np.isfinite(xmin):
        return math.floor(xmin / step) * step
    return xmin
# ---------- Plot builders ----------
def cross_plot(actual, pred):
    """Build the square Actual-vs-Predicted scatter with a dashed 1:1 reference line.

    Args:
        actual: measured values plotted on the x axis (converted to float).
        pred: predicted values plotted on the y axis (converted to float).

    Returns:
        A ``plotly.graph_objects.Figure`` sized CROSS_W x CROSS_H with both
        axes locked (no zoom/pan).
    """
    a = pd.Series(actual).astype(float)
    p = pd.Series(pred).astype(float)
    # NOTE: both axes are pinned to this window, so points outside it are not visible.
    fixed_min = 6000
    fixed_max = 10000

    fig = go.Figure()
    # Scatter points
    fig.add_trace(go.Scatter(
        x=a, y=p, mode="markers",
        marker=dict(size=6, color=COLORS["pred"]),
        # The literal was split across two source lines (the <br> separator was
        # lost); <extra></extra> hides the secondary "trace N" hover box.
        hovertemplate="Actual: %{x:.0f}<br>Pred: %{y:.0f}<extra></extra>",
        showlegend=False
    ))
    # 1:1 reference line from bottom-left to top-right
    fig.add_trace(go.Scatter(
        x=[fixed_min, fixed_max], y=[fixed_min, fixed_max], mode="lines",
        line=dict(color=COLORS["ref"], width=1.2, dash="dash"),
        hoverinfo="skip", showlegend=False
    ))
    fig.update_layout(
        width=CROSS_W, height=CROSS_H,
        paper_bgcolor="#fff", plot_bgcolor="#fff",
        margin=dict(l=64, r=18, t=10, b=48),
        hovermode="closest",
        font=dict(size=FONT_SZ),
        dragmode=False  # disables zooming
    )
    fig.update_xaxes(
        title_text="Actual UCS (psi)",
        range=[fixed_min, fixed_max],
        tickformat=",.0f", ticks="outside",
        showline=True, linewidth=1.2, linecolor="#444", mirror=True,
        showgrid=True, gridcolor="rgba(0,0,0,0.12)",
        automargin=True, fixedrange=True  # disables zooming
    )
    fig.update_yaxes(
        title_text="Predicted UCS (psi)",
        range=[fixed_min, fixed_max],
        tickformat=",.0f", ticks="outside",
        showline=True, linewidth=1.2, linecolor="#444", mirror=True,
        showgrid=True, gridcolor="rgba(0,0,0,0.12)",
        scaleanchor="x", scaleratio=1,  # keep the plot area square in data units
        automargin=True, fixedrange=True  # disables zooming
    )
    return fig
def track_plot(df, include_actual=True):
    """Build the vertical log-strip figure of UCS_Pred (and optionally TARGET).

    The y axis uses the first column whose name contains "depth"
    (case-insensitive); without one, a 1-based point index is used. The y axis
    is reversed so values increase downwards.

    Args:
        df: frame containing a ``UCS_Pred`` column and, optionally, TARGET
            and a depth column.
        include_actual: when True and TARGET is a column of *df*, the actual
            series is drawn as a second, dotted trace.

    Returns:
        A ``plotly.graph_objects.Figure`` sized TRACK_W x TRACK_H.

    Raises:
        KeyError: if *df* has no ``UCS_Pred`` column.
    """
    depth_col = next((c for c in df.columns if "depth" in str(c).lower()), None)
    if depth_col is not None:
        y = pd.Series(df[depth_col]).astype(float)
        ylab = str(depth_col)  # str() so the hover-label concatenation below cannot fail
    else:
        y = pd.Series(np.arange(1, len(df) + 1))
        ylab = "Point Index"
    y_min, y_max = float(y.min()), float(y.max())
    if np.isfinite(y_min) and np.isfinite(y_max):
        y_axis_range = dict(range=[y_max, y_min])  # reversed for log profile style
    else:
        # Empty or all-NaN axis: let Plotly pick the limits but keep the reversal.
        y_axis_range = dict(autorange="reversed")

    show_actual = include_actual and TARGET in df.columns

    # X (UCS) range & ticks, padded by 3 % of the data span on each side.
    x_series = pd.Series(df["UCS_Pred"]).astype(float)
    if show_actual:
        x_series = pd.concat([x_series, pd.Series(df[TARGET]).astype(float)], ignore_index=True)
    x_lo, x_hi = float(x_series.min()), float(x_series.max())
    if np.isfinite(x_lo) and np.isfinite(x_hi):
        x_pad = 0.03 * (x_hi - x_lo if x_hi > x_lo else 1.0)
        xmin, xmax = x_lo - x_pad, x_hi + x_pad
        x_axis_range = dict(range=[xmin, xmax], tick0=_nice_tick0(xmin, step=100))
    else:
        # No finite values: do not hand NaN limits to Plotly.
        x_axis_range = {}

    fig = go.Figure()
    fig.add_trace(go.Scatter(
        x=df["UCS_Pred"], y=y, mode="lines",
        line=dict(color=COLORS["pred"], width=1.8),
        name="UCS_Pred",
        # Literal was split across two source lines; <br> restores the line break.
        hovertemplate="UCS_Pred: %{x:.0f}<br>" + ylab + ": %{y}"
    ))
    if show_actual:
        fig.add_trace(go.Scatter(
            x=df[TARGET], y=y, mode="lines",
            line=dict(color=COLORS["actual"], width=2.0, dash="dot"),
            name="UCS (actual)",
            hovertemplate="UCS (actual): %{x:.0f}<br>" + ylab + ": %{y}"
        ))

    fig.update_layout(
        width=TRACK_W, height=TRACK_H,
        paper_bgcolor="#fff", plot_bgcolor="#fff",
        margin=dict(l=72, r=18, t=36, b=48),
        hovermode="closest",
        font=dict(size=FONT_SZ),
        legend=dict(
            x=0.98, y=0.05, xanchor="right", yanchor="bottom",
            bgcolor="rgba(255,255,255,0.75)", bordercolor="#ccc", borderwidth=1
        ),
        legend_title_text=""
    )
    fig.update_xaxes(
        title_text="UCS (psi)",
        title_font=dict(size=16),
        side="top",
        ticks="outside", tickformat=",.0f",
        tickmode="auto",
        showline=True, linewidth=1.2, linecolor="#444", mirror=True,
        showgrid=True, gridcolor="rgba(0,0,0,0.12)", automargin=True,
        **x_axis_range
    )
    fig.update_yaxes(
        title_text=f"{ylab}",
        title_font=dict(size=16),
        ticks="outside",
        showline=True, linewidth=1.2, linecolor="#444", mirror=True,
        showgrid=True, gridcolor="rgba(0,0,0,0.12)", automargin=True,
        **y_axis_range
    )
    # Add a border rectangle to enclose the full figure space
    fig.add_shape(
        type="rect",
        xref="paper", yref="paper",
        x0=-0.12, y0=-0.12, x1=1.12, y1=1.12,
        line=dict(color="#000", width=1.5),
        layer="below"
    )
    return fig
# ---------- Preview modal (matplotlib) ----------
def preview_tracks(df: pd.DataFrame, cols: list[str]):
    """Draw one vertical matplotlib track per requested column.

    Args:
        df: sheet to preview.
        cols: column names to plot; names absent from *df* are skipped.

    Returns:
        A matplotlib Figure with one panel per available column plotted
        against a 1-based point index, or a small placeholder figure when
        none of *cols* exist in *df*.
    """
    present = [c for c in cols if c in df.columns]
    if not present:
        fig, ax = plt.subplots(figsize=(4, 2))
        ax.text(0.5, 0.5, "No selected columns", ha="center", va="center")
        ax.axis("off")
        return fig

    count = len(present)
    # squeeze=False always yields a 2-D array of axes, even for a single panel.
    fig, grid = plt.subplots(
        1, count, figsize=(2.2 * count, 7.0), sharey=True, dpi=100, squeeze=False
    )
    panels = grid[0]
    point_index = np.arange(1, len(df) + 1)
    for panel, name in zip(panels, present):
        panel.plot(df[name], point_index, '-', lw=1.4, color="#333")
        panel.set_xlabel(name)
        panel.xaxis.set_label_position('top')
        panel.xaxis.tick_top()
        panel.grid(True, linestyle=":", alpha=0.3)
        for spine in panel.spines.values():
            spine.set_visible(True)
    # The y axis is shared, so invert it exactly once: calling invert_yaxis()
    # on every panel toggles the shared limits and cancels out for an even
    # number of panels.
    panels[0].invert_yaxis()
    panels[0].set_ylabel("Point Index")
    return fig
# NOTE: the `dialog` decorator (st.dialog with an st.expander fallback) is already
# defined in the Utilities section above; the verbatim second copy that stood here
# was redundant and has been removed.
@dialog("Preview data")
def preview_modal(book: dict[str, pd.DataFrame]):
    """Show a per-sheet preview: feature tracks plus a min/max/mean/std table.

    Args:
        book: mapping of sheet name to DataFrame. An empty mapping shows an
            informational message instead.
    """
    if not book:
        st.info("No data loaded yet.")
        return
    names = list(book.keys())
    tabs = st.tabs(names)
    for tab, name in zip(tabs, names):
        with tab:
            df = book[name]
            # Only summarise features this sheet actually has; indexing with
            # the full FEATURES list raises KeyError on partial sheets.
            present = [c for c in FEATURES if c in df.columns]
            t1, t2 = st.tabs(["Tracks", "Summary"])
            with t1:
                fig = preview_tracks(df, FEATURES)
                st.pyplot(fig, use_container_width=True)
                # Release the figure; pyplot keeps it registered otherwise and
                # figures pile up across Streamlit reruns.
                plt.close(fig)
            with t2:
                if not present:
                    st.info("No feature columns found in this sheet.")
                else:
                    tbl = (
                        df[present]
                        .agg(['min', 'max', 'mean', 'std'])
                        .T
                        .rename(columns={"min": "Min", "max": "Max", "mean": "Mean", "std": "Std"})
                    )
                    # rename_axis + reset_index gives the same "Feature" column as
                    # reset_index(names=...) without needing a recent pandas.
                    st.dataframe(tbl.rename_axis("Feature").reset_index(), use_container_width=True)
# =========================
# Load model (simple)
# =========================
def ensure_model() -> Path|None:
    """Locate a model file on disk, downloading it from ``MODEL_URL`` if needed.

    Returns:
        The first of DEFAULT_MODEL / MODEL_FALLBACKS that exists and is
        non-empty; otherwise DEFAULT_MODEL after a successful download from
        the ``MODEL_URL`` environment variable; otherwise None (no URL set,
        or the download failed).
    """
    for p in [DEFAULT_MODEL, *MODEL_FALLBACKS]:
        if p.exists() and p.stat().st_size > 0:
            return p

    url = os.environ.get("MODEL_URL", "")
    if not url:
        return None

    # Download to a side file and rename only on success, so an interrupted
    # transfer can never be mistaken for a valid model on the next run.
    part_path = DEFAULT_MODEL.with_name(DEFAULT_MODEL.name + ".part")
    try:
        import requests
        DEFAULT_MODEL.parent.mkdir(parents=True, exist_ok=True)
        with requests.get(url, stream=True, timeout=30) as r:
            r.raise_for_status()
            with open(part_path, "wb") as f:
                for chunk in r.iter_content(1 << 20):
                    if chunk:
                        f.write(chunk)
        if part_path.stat().st_size == 0:
            raise OSError("downloaded model file is empty")
        os.replace(part_path, DEFAULT_MODEL)
        return DEFAULT_MODEL
    except Exception as exc:
        try:
            part_path.unlink()
        except OSError:
            pass  # nothing to clean up
        # Report only the exception type: the message may embed the URL.
        st.warning(f"Model download failed ({type(exc).__name__}).")
        return None
# Resolve and load the model; the app cannot continue without one.
mpath = ensure_model()
if not mpath:
    st.error("Model not found. Upload models/ucs_rf.joblib (or set MODEL_URL).")
    st.stop()
try:
    model = load_model(str(mpath))
except Exception as e:
    st.error(f"Failed to load model: {e}")
    st.stop()

# Optional models/meta.json may override the feature list and target column.
meta_path = MODELS_DIR / "meta.json"
if meta_path.exists():
    try:
        meta = json.loads(meta_path.read_text(encoding="utf-8"))
    except (OSError, ValueError) as e:
        # Unreadable or malformed metadata: keep the built-in defaults, but say so.
        st.warning(f"Ignoring models/meta.json: {e}")
    else:
        if isinstance(meta, dict):
            # `or` keeps the defaults when a key is present but null/empty.
            FEATURES = meta.get("features") or FEATURES
            TARGET = meta.get("target") or TARGET
        else:
            st.warning("Ignoring models/meta.json: expected a JSON object.")
# =========================
# Session state
# =========================
# Seed every session key the pages below read, without overwriting existing values.
for _key, _default in (
    ("app_step", "intro"),
    ("results", {}),
    ("train_ranges", None),
    ("dev_file_name", ""),
    ("dev_file_bytes", b""),
    ("dev_file_loaded", False),
    ("dev_preview", False),
):
    st.session_state.setdefault(_key, _default)
# =========================
# Hero
# =========================
# Title banner rendered on every page after the password gate.
# NOTE(review): the payload is plain text in this copy although unsafe_allow_html is set —
# presumably the HTML markup was stripped; confirm against version control.
st.markdown(
f"""
ST_GeoMech_UCS
Real-Time UCS Tracking While Drilling
""",
unsafe_allow_html=True,
)
# =========================
# INTRO
# =========================
# Landing page: short description plus a button that moves on to Case Building.
if st.session_state.app_step == "intro":
    st.header("Welcome!")
    st.markdown("This software is developed by *Smart Thinking AI-Solutions Team* to estimate UCS from drilling data.")
    st.subheader("How It Works")
    how_it_works = "".join((
        "1) **Upload your data to build the case and preview the performance of our model.** \n",
        "2) Click **Run Model** to compute metrics and plots. \n",
        "3) **Proceed to Validation** (with actual UCS) or **Proceed to Prediction** (no UCS).",
    ))
    st.markdown(how_it_works)
    start_clicked = st.button("Start Showcase", type="primary")
    if start_clicked:
        st.session_state.app_step = "dev"
        st.rerun()
# =========================
# CASE BUILDING
# =========================
# Case Building page: upload a Train/Test workbook, run the model on both
# sheets, and show metrics, cross-plot and track plot per sheet.
if st.session_state.app_step == "dev":
    st.sidebar.header("Case Building")
    up = st.sidebar.file_uploader("Upload Train/Test Excel", type=["xlsx","xls"])
    if up is not None:
        uploaded_bytes = up.getvalue()
        # The uploader returns the same file on every rerun. Only reset the
        # "previewed" flag when the file actually changed; resetting it
        # unconditionally made the "Previewed ✓" hint vanish on the next rerun.
        is_new_upload = (
            up.name != st.session_state.dev_file_name
            or uploaded_bytes != st.session_state.dev_file_bytes
        )
        if is_new_upload:
            st.session_state.dev_file_bytes = uploaded_bytes
            st.session_state.dev_file_name = up.name
            st.session_state.dev_preview = False
        st.session_state.dev_file_loaded = True

    if st.session_state.dev_file_loaded:
        tmp = read_book_bytes(st.session_state.dev_file_bytes)
        if tmp:
            df0 = next(iter(tmp.values()))
            st.sidebar.caption(f"**Data loaded:** {st.session_state.dev_file_name} • {df0.shape[0]} rows × {df0.shape[1]} cols")

    if st.sidebar.button("Preview data", use_container_width=True, disabled=not st.session_state.dev_file_loaded):
        preview_modal(read_book_bytes(st.session_state.dev_file_bytes))
        st.session_state.dev_preview = True

    run = st.sidebar.button("Run Model", type="primary", use_container_width=True)

    # always available nav
    if st.sidebar.button("Proceed to Validation ▶", use_container_width=True):
        st.session_state.app_step = "validate"
        st.rerun()
    if st.sidebar.button("Proceed to Prediction ▶", use_container_width=True):
        st.session_state.app_step = "predict"
        st.rerun()

    # ---- Pinned helper at the very top of the page ----
    helper_top = st.container()
    with helper_top:
        st.subheader("Case Building")
        if st.session_state.dev_file_loaded and st.session_state.dev_preview:
            st.info("Previewed ✓ — now click **Run Model**.")
        elif st.session_state.dev_file_loaded:
            st.info("📄 **Preview uploaded data** using the sidebar button, then click **Run Model**.")
        else:
            st.write("**Upload your data to build a case, then run the model to review development performance.**")

    def _dev_metrics(frame):
        """Return R², RMSE and MAE of UCS_Pred against the TARGET column of *frame*."""
        y_true, y_hat = frame[TARGET], frame["UCS_Pred"]
        return {
            "R2": r2_score(y_true, y_hat),
            "RMSE": rmse(y_true, y_hat),
            "MAE": mean_absolute_error(y_true, y_hat),
        }

    if run and st.session_state.dev_file_bytes:
        book = read_book_bytes(st.session_state.dev_file_bytes)
        sh_train = find_sheet(book, ["Train","Training","training2","train","training"])
        sh_test = find_sheet(book, ["Test","Testing","testing2","test","testing"])
        if sh_train is None or sh_test is None:
            st.error("Workbook must include Train/Training/training2 and Test/Testing/testing2 sheets.")
            st.stop()
        tr = book[sh_train].copy()
        te = book[sh_test].copy()
        if not (ensure_cols(tr, FEATURES+[TARGET]) and ensure_cols(te, FEATURES+[TARGET])):
            st.error("Missing required columns.")
            st.stop()
        tr["UCS_Pred"] = model.predict(tr[FEATURES])
        te["UCS_Pred"] = model.predict(te[FEATURES])
        st.session_state.results["Train"] = tr
        st.session_state.results["Test"] = te
        st.session_state.results["m_train"] = _dev_metrics(tr)
        st.session_state.results["m_test"] = _dev_metrics(te)
        # Per-feature (min, max) of the training sheet; the Validate and Predict
        # pages use these to flag out-of-range rows.
        tr_min = tr[FEATURES].min().to_dict()
        tr_max = tr[FEATURES].max().to_dict()
        st.session_state.train_ranges = {f: (float(tr_min[f]), float(tr_max[f])) for f in FEATURES}
        st.success("Case has been built and results are displayed below.")

    def _dev_block(df, m):
        """Render the metric row, cross-plot and track plot for one result sheet."""
        c1, c2, c3 = st.columns(3)
        c1.metric("R²", f"{m['R2']:.4f}")
        c2.metric("RMSE", f"{m['RMSE']:.4f}")
        c3.metric("MAE", f"{m['MAE']:.4f}")
        left, spacer, right = st.columns(PLOT_COLS)
        with left:
            pad, plotcol = left.columns([CROSS_NUDGE, 1])  # shift cross-plot right inside its band
            with plotcol:
                st.plotly_chart(
                    cross_plot(df[TARGET], df["UCS_Pred"]),
                    use_container_width=False,
                    config={"displayModeBar": False, "scrollZoom": True}
                )
        with right:
            st.plotly_chart(
                track_plot(df, include_actual=True),
                use_container_width=False,
                config={"displayModeBar": False, "scrollZoom": True}
            )

    if "Train" in st.session_state.results or "Test" in st.session_state.results:
        tab1, tab2 = st.tabs(["Training", "Testing"])
        if "Train" in st.session_state.results:
            with tab1:
                _dev_block(st.session_state.results["Train"], st.session_state.results["m_train"])
        if "Test" in st.session_state.results:
            with tab2:
                _dev_block(st.session_state.results["Test"], st.session_state.results["m_test"])
# =========================
# VALIDATION (with actual UCS)
# =========================
# Validation page: score the model on an uploaded sheet that includes TARGET
# and flag rows whose features fall outside the training min–max ranges.
if st.session_state.app_step == "validate":
    st.sidebar.header("Validate the Model")
    up = st.sidebar.file_uploader("Upload Validation Excel", type=["xlsx","xls"])
    if up is not None:
        book = read_book_bytes(up.getvalue())
        if book:
            df0 = next(iter(book.values()))
            st.sidebar.caption(f"**Data loaded:** {up.name} • {df0.shape[0]} rows × {df0.shape[1]} cols")
    if st.sidebar.button("Preview data", use_container_width=True, disabled=(up is None)):
        preview_modal(read_book_bytes(up.getvalue()))
    go_btn = st.sidebar.button("Predict", type="primary", use_container_width=True)
    if st.sidebar.button("⬅ Back to Case Building", use_container_width=True):
        st.session_state.app_step = "dev"
        st.rerun()
    if st.sidebar.button("Proceed to Prediction ▶", use_container_width=True):
        st.session_state.app_step = "predict"
        st.rerun()

    st.subheader("Validate the Model")
    st.write("Upload a dataset with the same **features** and **UCS** to evaluate performance.")

    if go_btn and up is not None:
        book = read_book_bytes(up.getvalue())
        # Prefer a sheet with a validation-like name; otherwise use the first sheet.
        name = find_sheet(book, ["Validation","Validate","validation2","Val","val"]) or list(book.keys())[0]
        df = book[name].copy()
        if not ensure_cols(df, FEATURES+[TARGET]):
            st.error("Missing required columns.")
            st.stop()
        df["UCS_Pred"] = model.predict(df[FEATURES])
        st.session_state.results["Validate"] = df

        ranges = st.session_state.train_ranges
        oor_pct = 0.0
        tbl = None
        if ranges:
            # Per-feature mask: True where the value is below the training
            # minimum or above the training maximum. Built once and reused.
            viol = pd.DataFrame({
                f: (df[f] < ranges[f][0]) | (df[f] > ranges[f][1]) for f in FEATURES
            })
            any_viol = viol.any(axis=1)
            oor_pct = float(any_viol.mean() * 100.0)
            if any_viol.any():
                tbl = df.loc[any_viol, FEATURES].copy()
                # Comma-separated list of the features that are out of range in each row.
                tbl["Violations"] = viol.loc[any_viol].apply(
                    lambda r: ", ".join([c for c, v in r.items() if v]), axis=1
                )
        st.session_state.results["m_val"] = {
            "R2": r2_score(df[TARGET], df["UCS_Pred"]),
            "RMSE": rmse(df[TARGET], df["UCS_Pred"]),
            "MAE": mean_absolute_error(df[TARGET], df["UCS_Pred"]),
        }
        st.session_state.results["sv_val"] = {
            "n": len(df),
            "pred_min": float(df["UCS_Pred"].min()),
            "pred_max": float(df["UCS_Pred"].max()),
            "oor": oor_pct,
        }
        st.session_state.results["oor_tbl"] = tbl

    if "Validate" in st.session_state.results:
        m = st.session_state.results["m_val"]
        c1, c2, c3 = st.columns(3)
        c1.metric("R²", f"{m['R2']:.4f}")
        c2.metric("RMSE", f"{m['RMSE']:.4f}")
        c3.metric("MAE", f"{m['MAE']:.4f}")
        left, spacer, right = st.columns(PLOT_COLS)
        with left:
            pad, plotcol = left.columns([CROSS_NUDGE, 1])  # same nudge
            with plotcol:
                st.plotly_chart(
                    cross_plot(st.session_state.results["Validate"][TARGET],
                               st.session_state.results["Validate"]["UCS_Pred"]),
                    use_container_width=False, config={"displayModeBar": False, "scrollZoom": True}
                )
        with right:
            st.plotly_chart(
                track_plot(st.session_state.results["Validate"], include_actual=True),
                use_container_width=False, config={"displayModeBar": False, "scrollZoom": True}
            )
        sv = st.session_state.results["sv_val"]
        if sv["oor"] > 0:
            st.warning("Some inputs fall outside **training min–max** ranges.")
        if st.session_state.results["oor_tbl"] is not None:
            st.write("*Out-of-range rows (vs. Training min–max):*")
            st.dataframe(st.session_state.results["oor_tbl"], use_container_width=True)
# =========================
# PREDICTION (no actual UCS)
# =========================
# Prediction page: run the model on a sheet without TARGET and summarise the
# predictions, including the share of rows outside the training ranges.
if st.session_state.app_step == "predict":
    st.sidebar.header("Prediction (No Actual UCS)")
    up = st.sidebar.file_uploader("Upload Prediction Excel", type=["xlsx","xls"])
    if up is not None:
        book = read_book_bytes(up.getvalue())
        if book:
            df0 = next(iter(book.values()))
            st.sidebar.caption(f"**Data loaded:** {up.name} • {df0.shape[0]} rows × {df0.shape[1]} cols")
    if st.sidebar.button("Preview data", use_container_width=True, disabled=(up is None)):
        preview_modal(read_book_bytes(up.getvalue()))
    go_btn = st.sidebar.button("Predict", type="primary", use_container_width=True)
    if st.sidebar.button("⬅ Back to Case Building", use_container_width=True):
        st.session_state.app_step = "dev"
        st.rerun()

    st.subheader("Prediction")
    st.write("Upload a dataset with the feature columns (no **UCS**).")

    if go_btn and up is not None:
        book = read_book_bytes(up.getvalue())
        name = list(book.keys())[0]  # always the first sheet
        df = book[name].copy()
        if not ensure_cols(df, FEATURES):
            st.error("Missing required columns.")
            st.stop()
        df["UCS_Pred"] = model.predict(df[FEATURES])
        st.session_state.results["PredictOnly"] = df

        ranges = st.session_state.train_ranges
        oor_pct = 0.0
        if ranges:
            # A row is out of range when any feature is below the training
            # minimum or above the training maximum.
            any_viol = pd.DataFrame({
                f: (df[f] < ranges[f][0]) | (df[f] > ranges[f][1]) for f in FEATURES
            }).any(axis=1)
            oor_pct = float(any_viol.mean() * 100.0)
        st.session_state.results["sv_pred"] = {
            "n": len(df),
            "pred_min": float(df["UCS_Pred"].min()),
            "pred_max": float(df["UCS_Pred"].max()),
            "pred_mean": float(df["UCS_Pred"].mean()),
            "pred_std": float(df["UCS_Pred"].std(ddof=0)),
            "oor": oor_pct
        }

    if "PredictOnly" in st.session_state.results:
        df = st.session_state.results["PredictOnly"]
        sv = st.session_state.results["sv_pred"]
        left, spacer, right = st.columns(PLOT_COLS)
        with left:
            table = pd.DataFrame({
                "Metric": ["# points","Pred min","Pred max","Pred mean","Pred std","OOR %"],
                "Value": [sv["n"], sv["pred_min"], sv["pred_max"], sv["pred_mean"], sv["pred_std"], f'{sv["oor"]:.1f}%']
            })
            st.success("Predictions ready ✓")
            st.dataframe(table, use_container_width=True, hide_index=True)
            st.caption("**★ OOR** = % of rows whose input features fall outside the training min–max range.")
        with right:
            st.plotly_chart(
                track_plot(df, include_actual=False),
                use_container_width=False, config={"displayModeBar": False, "scrollZoom": True}
            )
# =========================
# Footer
# =========================
# Horizontal rule followed by the footer block.
st.markdown("---")
# NOTE(review): the footer payload is whitespace-only in this copy — presumably the HTML
# was stripped; confirm against version control.
st.markdown(
"""
""",
unsafe_allow_html=True
)