# app.py — ST_GeoMech_YM (mirrors your UCS GUI, adapted for Young's Modulus)
import base64
import hmac
import io
import json
import math
import os
from datetime import datetime
from pathlib import Path

import joblib
import numpy as np
import pandas as pd
import streamlit as st

# Matplotlib for PREVIEW modal and the CROSS-PLOT (static).
# The backend must be selected before pyplot is imported.
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt  # noqa: E402
from matplotlib.ticker import FuncFormatter  # noqa: E402

import plotly.graph_objects as go  # noqa: E402
from sklearn.metrics import mean_squared_error  # noqa: E402

# =========================
# Constants (Ym variant)
# =========================
APP_NAME = "ST_GeoMech_YM"
TAGLINE = "Real-Time Young's Modulus Tracking"
FEATURES = ["WOB(klbf)", "TORQUE(kft.lbf)", "SPP(psi)", "RPM(1/min)", "ROP(ft/h)", "Flow Rate, gpm"]
TARGET = "Actual Ym"
PRED_COL = "Ym_Pred"
MODELS_DIR = Path("models")
DEFAULT_MODEL = MODELS_DIR / "ym_rf.joblib"
MODEL_FALLBACKS = [MODELS_DIR / "model.joblib", MODELS_DIR / "model.pkl"]
COLORS = {"pred": "#1f77b4", "actual": "#f2b702", "ref": "#5a5a5a"}

# ---- Plot sizing controls ----
CROSS_W = 350
CROSS_H = 350
TRACK_H = 1000
TRACK_W = 500
FONT_SZ = 13
BOLD_FONT = "Arial Black, Arial, sans-serif"

# =========================
# Page / CSS
# =========================
st.set_page_config(page_title=APP_NAME, page_icon="logo.png", layout="wide")

# NOTE(review): the payload of each CSS injection below is blank in this copy
# of the file — presumably the <style> markup was stripped. Confirm against
# version control and restore it if so.
st.markdown(""" """, unsafe_allow_html=True)

# Sticky helpers
st.markdown(""" """, unsafe_allow_html=True)

# Hide uploader helper text
st.markdown(""" """, unsafe_allow_html=True)

# Make the Preview expander title & tabs sticky (pinned to the top)
st.markdown(""" """, unsafe_allow_html=True)

# Center text in all pandas Styler tables
TABLE_CENTER_CSS = [
    dict(selector="th", props=[("text-align", "center")]),
    dict(selector="td", props=[("text-align", "center")]),
]

# Message box CSS
st.markdown(""" """, unsafe_allow_html=True)


# =========================
# Password gate
# =========================
def inline_logo(path="logo.png") -> str:
    """Return the PNG at *path* as a base64 ``data:`` URI, or "" if unavailable."""
    try:
        p = Path(path)
        if not p.exists():
            return ""
        return f"data:image/png;base64,{base64.b64encode(p.read_bytes()).decode('ascii')}"
    except Exception:
        # Best effort: the logo is optional, so any failure yields no image.
        return ""


def add_password_gate() -> None:
    """Halt the script run until the user supplies the configured access key.

    The key is read from Streamlit secrets (``APP_PASSWORD``) and, when that
    is missing or empty, from the ``APP_PASSWORD`` environment variable.
    Once unlocked, ``st.session_state.auth_ok`` is set and later runs pass.
    """
    try:
        required = st.secrets.get("APP_PASSWORD", "")
    except Exception:
        # No secrets file/provider available at all.
        required = ""
    if not required:
        # Fall back to the environment, as the warning below promises.
        required = os.environ.get("APP_PASSWORD", "")
    if not required:
        st.warning("Set APP_PASSWORD in Secrets (or environment) and restart.")
        st.stop()

    if st.session_state.get("auth_ok", False):
        return

    st.sidebar.markdown(f"""
{APP_NAME}
Smart Thinking • Secure Access
""", unsafe_allow_html=True)
    pwd = st.sidebar.text_input("Access key", type="password", placeholder="••••••••")
    if st.sidebar.button("Unlock", type="primary"):
        # Constant-time comparison; bytes are used so non-ASCII keys work.
        if hmac.compare_digest(str(pwd).encode("utf-8"), str(required).encode("utf-8")):
            st.session_state.auth_ok = True
            st.rerun()
        else:
            st.error("Incorrect key.")
    # Not authenticated: nothing below the gate may render.
    st.stop()


add_password_gate()


# =========================
# Utilities
# =========================
def rmse(y_true, y_pred) -> float:
    """Return the root mean squared error between *y_true* and *y_pred*."""
    return float(np.sqrt(mean_squared_error(y_true, y_pred)))


def pearson_r(y_true, y_pred) -> float:
    """Return Pearson's r, or NaN for fewer than two points or a constant series."""
    a = np.asarray(y_true, dtype=float)
    p = np.asarray(y_pred, dtype=float)
    if a.size < 2:
        return float("nan")
    # Guard constant series (correlation is undefined for zero variance).
    if np.all(a == a[0]) or np.all(p == p[0]):
        return float("nan")
    return float(np.corrcoef(a, p)[0, 1])


@st.cache_resource(show_spinner=False)
def load_model(model_path: str):
    """Load (and cache) the model stored at *model_path* with joblib."""
    # SECURITY NOTE: joblib.load unpickles the file and can execute arbitrary
    # code. Only load model files from a trusted source (this includes
    # whatever MODEL_URL points to).
    return joblib.load(model_path)


@st.cache_data(show_spinner=False)
def parse_excel(data_bytes: bytes):
    """Parse workbook bytes into ``{sheet_name: DataFrame}`` (cached)."""
    # The context manager closes the underlying reader once parsing is done.
    with pd.ExcelFile(io.BytesIO(data_bytes)) as xl:
        return {sh: xl.parse(sh) for sh in xl.sheet_names}


def read_book_bytes(b: bytes):
    """Return the parsed workbook for *b*, or an empty dict when *b* is empty."""
    return parse_excel(b) if b else {}


def _normalize_columns(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of *df* with stripped column names and the flow-rate typo fixed."""
    out = df.copy()
    # str() so numeric or otherwise non-string headers do not crash .strip().
    out.columns = [str(c).strip() for c in out.columns]
    # Fix flow-rate typo variant (trailing-space variants are gone after strip).
    return out.rename(columns={"Fow Rate, gpm": "Flow Rate, gpm"})


def ensure_cols(df: pd.DataFrame, cols: list[str]) -> bool:
    """Return True if *df* has every column in *cols*; otherwise show an error."""
    miss = [c for c in cols if c not in df.columns]
    if miss:
        st.error(f"Missing columns: {miss}\nFound: {list(df.columns)}")
        return False
    return True


def find_sheet(book, names):
    """Return the first sheet of *book* matching any of *names* (case-insensitive), else None."""
    low2orig = {k.lower(): k for k in book}
    for nm in names:
        if nm.lower() in low2orig:
            return low2orig[nm.lower()]
    return None


def _nice_tick0(xmin: float, step: float = 0.1) -> float:
    """Round *xmin* down to a multiple of *step*; non-finite input is returned as-is."""
    # Rounded start tick for continuous Ym scales (unit-agnostic)
    return step * math.floor(xmin / step) if np.isfinite(xmin) else xmin


def df_centered_rounded(df: pd.DataFrame, hide_index=True):
    """Render *df* with centred cells and numeric columns shown to 2 decimals."""
    out = df.copy()
    numcols = out.select_dtypes(include=[np.number]).columns
    styler = (
        out.style
        .format({c: "{:.2f}" for c in numcols})
        .set_properties(**{"text-align": "center"})
        .set_table_styles(TABLE_CENTER_CSS)
    )
    st.dataframe(styler, use_container_width=True, hide_index=hide_index)


def _show_fig(fig, **kwargs) -> None:
    """Render a Matplotlib figure, then close it so figures do not pile up across reruns."""
    st.pyplot(fig, **kwargs)
    plt.close(fig)


def _oor_flags(df: pd.DataFrame, ranges: dict) -> pd.DataFrame:
    """Return a boolean frame: True where a feature lies outside its (min, max) in *ranges*."""
    return pd.DataFrame(
        {f: (df[f] < ranges[f][0]) | (df[f] > ranges[f][1]) for f in FEATURES}
    )


# === Excel export helpers =================================================
def _excel_engine() -> str:
    """Return "xlsxwriter" when it is importable, otherwise "openpyxl"."""
    try:
        import xlsxwriter  # noqa: F401
        return "xlsxwriter"
    except Exception:
        return "openpyxl"


def _excel_safe_name(name: str) -> str:
    """Replace characters Excel forbids in sheet names and cap the length at 31."""
    bad = '[]:*?/\\'
    safe = ''.join('_' if ch in bad else ch for ch in str(name))
    return safe[:31]


def mape(y_true, y_pred, eps: float = 1e-8) -> float:
    """Return the mean absolute percentage error (in percent).

    Rows whose actual value is within *eps* of zero are ignored.
    """
    a = np.asarray(y_true, dtype=float)
    p = np.asarray(y_pred, dtype=float)
    denom = np.where(np.abs(a) < eps, np.nan, np.abs(a))  # ignore near-zero actuals
    return float(np.nanmean(np.abs(a - p) / denom) * 100.0)  # percent


def _round_numeric(df: pd.DataFrame, ndigits: int = 3) -> pd.DataFrame:
    """Return a copy of *df* with float/integer columns rounded to *ndigits*."""
    out = df.copy()
    for c in out.columns:
        if pd.api.types.is_float_dtype(out[c]) or pd.api.types.is_integer_dtype(out[c]):
            out[c] = pd.to_numeric(out[c], errors="coerce").round(ndigits)
    return out


def _summary_table(df: pd.DataFrame, cols: list[str]) -> pd.DataFrame:
    """Return a Min/Max/Mean/Std table for those of *cols* present in *df*."""
    cols = [c for c in cols if c in df.columns]
    if not cols:
        return pd.DataFrame()
    tbl = (
        df[cols]
        .agg(['min', 'max', 'mean', 'std'])
        .T.rename(columns={"min": "Min", "max": "Max", "mean": "Mean", "std": "Std"})
        .reset_index(names="Field")
    )
    return _round_numeric(tbl, 3)


def _train_ranges_df(ranges: dict[str, tuple[float, float]]) -> pd.DataFrame:
    """Convert ``{feature: (min, max)}`` into a Feature/Min/Max table."""
    if not ranges:
        return pd.DataFrame()
    df = pd.DataFrame(ranges).T.reset_index()
    df.columns = ["Feature", "Min", "Max"]
    return _round_numeric(df, 3)


def _excel_autofit(writer, sheet_name: str, df: pd.DataFrame, min_w: int = 8, max_w: int = 40):
    """Auto-fit columns when using xlsxwriter."""
    try:
        import xlsxwriter  # noqa: F401
    except Exception:
        # Without xlsxwriter the writer uses openpyxl; skip the cosmetic pass.
        return
    ws = writer.sheets[sheet_name]
    for i, col in enumerate(df.columns):
        series = df[col].astype(str)
        max_len = max([len(str(col))] + series.map(len).tolist())
        ws.set_column(i, i, max(min_w, min(max_len + 2, max_w)))
    ws.freeze_panes(1, 0)


def _add_sheet(sheets: dict, order: list, name: str, df: pd.DataFrame, ndigits: int):
    """Register a rounded copy of *df* under *name*; None/empty frames are skipped."""
    if df is None or df.empty:
        return
    sheets[name] = _round_numeric(df, ndigits)
    order.append(name)


def _available_sections() -> list[str]:
    """Compute which sections exist (offered in the export dropdown)."""
    res = st.session_state.get("results", {})
    sections = []
    if "Train" in res:
        sections += ["Training", "Training_Metrics", "Training_Summary"]
    if "Test" in res:
        sections += ["Testing", "Testing_Metrics", "Testing_Summary"]
    if "Validate" in res:
        sections += ["Validation", "Validation_Metrics", "Validation_Summary", "Validation_OOR"]
    if "PredictOnly" in res:
        sections += ["Prediction", "Prediction_Summary"]
    if st.session_state.get("train_ranges"):
        sections += ["Training_Ranges"]
    sections += ["Info"]
    return sections


def build_export_workbook(selected: list[str], ndigits: int = 3, do_autofit: bool = True) -> tuple[bytes|None, str|None, list[str]]:
    """Build an in-memory Excel workbook from the selected result sections.

    Args:
        selected: Section names to include (see ``_available_sections``).
        ndigits: Decimal places applied to numeric columns.
        do_autofit: Whether to auto-size columns (xlsxwriter only).

    Returns:
        ``(workbook_bytes, file_name, sheet_names)``, or ``(None, None, [])``
        when there are no results or nothing selected produced a sheet.
    """
    res = st.session_state.get("results", {})
    if not res:
        return None, None, []

    sheets: dict[str, pd.DataFrame] = {}
    order: list[str] = []

    # Training
    if "Training" in selected and "Train" in res:
        _add_sheet(sheets, order, "Training", res["Train"], ndigits)
    if "Training_Metrics" in selected and res.get("m_train"):
        _add_sheet(sheets, order, "Training_Metrics", pd.DataFrame([res["m_train"]]), ndigits)
    if "Training_Summary" in selected and "Train" in res:
        tr_cols = FEATURES + [c for c in [TARGET, PRED_COL] if c in res["Train"].columns]
        _add_sheet(sheets, order, "Training_Summary", _summary_table(res["Train"], tr_cols), ndigits)

    # Testing
    if "Testing" in selected and "Test" in res:
        _add_sheet(sheets, order, "Testing", res["Test"], ndigits)
    if "Testing_Metrics" in selected and res.get("m_test"):
        _add_sheet(sheets, order, "Testing_Metrics", pd.DataFrame([res["m_test"]]), ndigits)
    if "Testing_Summary" in selected and "Test" in res:
        te_cols = FEATURES + [c for c in [TARGET, PRED_COL] if c in res["Test"].columns]
        _add_sheet(sheets, order, "Testing_Summary", _summary_table(res["Test"], te_cols), ndigits)

    # Validation
    if "Validation" in selected and "Validate" in res:
        _add_sheet(sheets, order, "Validation", res["Validate"], ndigits)
    if "Validation_Metrics" in selected and res.get("m_val"):
        _add_sheet(sheets, order, "Validation_Metrics", pd.DataFrame([res["m_val"]]), ndigits)
    if "Validation_Summary" in selected and res.get("sv_val"):
        _add_sheet(sheets, order, "Validation_Summary", pd.DataFrame([res["sv_val"]]), ndigits)
    if "Validation_OOR" in selected and isinstance(res.get("oor_tbl"), pd.DataFrame) and not res["oor_tbl"].empty:
        _add_sheet(sheets, order, "Validation_OOR", res["oor_tbl"].reset_index(drop=True), ndigits)

    # Prediction
    if "Prediction" in selected and "PredictOnly" in res:
        _add_sheet(sheets, order, "Prediction", res["PredictOnly"], ndigits)
    if "Prediction_Summary" in selected and res.get("sv_pred"):
        _add_sheet(sheets, order, "Prediction_Summary", pd.DataFrame([res["sv_pred"]]), ndigits)

    # Training ranges
    if "Training_Ranges" in selected and st.session_state.get("train_ranges"):
        rr = _train_ranges_df(st.session_state["train_ranges"])
        _add_sheet(sheets, order, "Training_Ranges", rr, ndigits)

    # Info
    if "Info" in selected:
        info = pd.DataFrame([
            {"Key": "AppName", "Value": APP_NAME},
            {"Key": "Tagline", "Value": TAGLINE},
            {"Key": "Target", "Value": TARGET},
            {"Key": "PredColumn", "Value": PRED_COL},
            {"Key": "Features", "Value": ", ".join(FEATURES)},
            {"Key": "ExportedAt", "Value": datetime.now().strftime("%Y-%m-%d %H:%M:%S")},
        ])
        _add_sheet(sheets, order, "Info", info, ndigits)

    if not order:
        return None, None, []

    bio = io.BytesIO()
    engine = _excel_engine()
    with pd.ExcelWriter(bio, engine=engine) as writer:
        for name in order:
            df = sheets[name]
            sheet = _excel_safe_name(name)
            df.to_excel(writer, sheet_name=sheet, index=False)
            if do_autofit:
                _excel_autofit(writer, sheet, df)

    fname = f"YM_Export_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx"
    return bio.getvalue(), fname, order


# --------- SIMPLE export UI (dropdown checklist, starts empty) ----------
def render_export_button(phase_key: str) -> None:
    """
    Export UI — one multiselect dropdown that starts EMPTY.
    The download button is disabled until at least one sheet is selected.
    Renders nothing when there are no results yet. *phase_key* keeps the
    widget keys distinct between app steps.
    """
    res = st.session_state.get("results", {})
    if not res:
        return

    st.divider()
    st.markdown("### Export to Excel")

    options = _available_sections()  # only what exists right now
    selected_sheets = st.multiselect(
        "Sheets to include",
        options=options,
        default=[],
        placeholder="Choose option(s)",
        help="Pick the sheets you want to include in the Excel export.",
        key=f"sheets_{phase_key}",
    )

    if not selected_sheets:
        st.caption("Select one or more sheets above to enable the export.")
        st.download_button(
            label="⬇️ Export Excel",
            data=b"",
            file_name="YM_Export.xlsx",
            mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
            disabled=True,
            key=f"download_{phase_key}",
        )
        return

    data, fname, names = build_export_workbook(selected=selected_sheets, ndigits=3, do_autofit=True)
    if names:
        st.caption("Will include: " + ", ".join(names))
    st.download_button(
        "⬇️ Export Excel",
        data=(data or b""),
        file_name=(fname or "YM_Export.xlsx"),
        mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        disabled=(data is None),
        key=f"download_{phase_key}",
    )


# =========================
# Cross plot (Matplotlib) — auto-scaled for Ym
# =========================
def cross_plot_static(actual, pred, xlabel="Actual Ym", ylabel="Predicted Ym"):
    """Return a square actual-vs-predicted scatter figure with a 1:1 reference line."""
    a = pd.Series(actual, dtype=float)
    p = pd.Series(pred, dtype=float)

    # Shared limits on both axes so the 1:1 line is a true diagonal.
    lo = float(min(a.min(), p.min()))
    hi = float(max(a.max(), p.max()))
    pad = 0.03 * (hi - lo if hi > lo else 1.0)
    lo2, hi2 = lo - pad, hi + pad
    ticks = np.linspace(lo2, hi2, 5)

    dpi = 110
    fig, ax = plt.subplots(figsize=(CROSS_W / dpi, CROSS_H / dpi), dpi=dpi, constrained_layout=False)
    ax.scatter(a, p, s=14, c=COLORS["pred"], alpha=0.9, linewidths=0)
    ax.plot([lo2, hi2], [lo2, hi2], linestyle="--", linewidth=1.2, color=COLORS["ref"])
    ax.set_xlim(lo2, hi2)
    ax.set_ylim(lo2, hi2)
    ax.set_xticks(ticks)
    ax.set_yticks(ticks)
    ax.set_aspect("equal", adjustable="box")

    # Generic numeric formatting (2 decimals)
    fmt = FuncFormatter(lambda x, _: f"{x:.2f}")
    ax.xaxis.set_major_formatter(fmt)
    ax.yaxis.set_major_formatter(fmt)

    ax.set_xlabel(xlabel, fontweight="bold", fontsize=10, color="black")
    ax.set_ylabel(ylabel, fontweight="bold", fontsize=10, color="black")
    ax.tick_params(labelsize=6, colors="black")
    ax.grid(True, linestyle=":", alpha=0.3)
    for spine in ax.spines.values():
        spine.set_linewidth(1.1)
        spine.set_color("#444")

    fig.subplots_adjust(left=0.16, bottom=0.16, right=0.98, top=0.98)
    return fig


# =========================
# Track plot (Plotly)
# =========================
def track_plot(df, include_actual=True):
    """Return a Plotly track of predicted (and optionally actual) Ym versus depth.

    The first column whose name contains "depth" is used for the Y axis;
    otherwise a 1-based point index is used. The Y axis is reversed.
    """
    # Depth (or index) on Y
    depth_col = next((c for c in df.columns if 'depth' in str(c).lower()), None)
    if depth_col is not None:
        y = pd.Series(df[depth_col]).astype(float)
        ylab = depth_col
    else:
        y = pd.Series(np.arange(1, len(df) + 1))
        ylab = "Point Index"
    y_range = [float(y.max()), float(y.min())]  # reverse

    # X range from prediction/actual
    x_series = pd.Series(df.get(PRED_COL, pd.Series(dtype=float))).astype(float)
    if include_actual and TARGET in df.columns:
        x_series = pd.concat([x_series, pd.Series(df[TARGET]).astype(float)], ignore_index=True)
    x_lo, x_hi = float(x_series.min()), float(x_series.max())
    x_pad = 0.03 * (x_hi - x_lo if x_hi > x_lo else 1.0)
    xmin, xmax = x_lo - x_pad, x_hi + x_pad
    tick0 = _nice_tick0(xmin, step=max((xmax - xmin) / 10.0, 0.1))

    fig = go.Figure()
    if PRED_COL in df.columns:
        fig.add_trace(go.Scatter(
            x=df[PRED_COL], y=y, mode="lines",
            line=dict(color=COLORS["pred"], width=1.8),
            name=PRED_COL,
            # "<br>" is Plotly's hover line break; doubled braces keep the
            # %{...} placeholders literal inside the f-string.
            hovertemplate=f"{PRED_COL}: %{{x:.0f}}<br>{ylab}: %{{y}}",
        ))
    if include_actual and TARGET in df.columns:
        fig.add_trace(go.Scatter(
            x=df[TARGET], y=y, mode="lines",
            line=dict(color=COLORS["actual"], width=2.0, dash="dot"),
            name=f"{TARGET} (actual)",
            hovertemplate=f"{TARGET}: %{{x:.0f}}<br>{ylab}: %{{y}}",
        ))

    fig.update_layout(
        height=TRACK_H, width=TRACK_W, autosize=False,
        paper_bgcolor="#fff", plot_bgcolor="#fff",
        margin=dict(l=64, r=16, t=36, b=48),
        hovermode="closest",
        font=dict(size=FONT_SZ, color="#000"),
        legend=dict(x=0.98, y=0.05, xanchor="right", yanchor="bottom",
                    bgcolor="rgba(255,255,255,0.75)", bordercolor="#ccc", borderwidth=1),
        legend_title_text="",
    )

    # X axis with NO decimals
    fig.update_xaxes(
        title_text="Ym",
        title_font=dict(size=20, family=BOLD_FONT, color="#000"),
        tickfont=dict(size=15, family=BOLD_FONT, color="#000"),
        side="top",
        range=[xmin, xmax],
        ticks="outside",
        tickformat=",.0f",  # integers, thousands separated
        tickmode="auto",
        tick0=tick0,
        showline=True, linewidth=1.2, linecolor="#444", mirror=True,
        showgrid=True, gridcolor="rgba(0,0,0,0.12)",
        automargin=True,
    )
    fig.update_yaxes(
        title_text=ylab,
        title_font=dict(size=20, family=BOLD_FONT, color="#000"),
        tickfont=dict(size=15, family=BOLD_FONT, color="#000"),
        range=y_range,
        ticks="outside",
        showline=True, linewidth=1.2, linecolor="#444", mirror=True,
        showgrid=True, gridcolor="rgba(0,0,0,0.12)",
        automargin=True,
    )
    return fig


def preview_tracks(df: pd.DataFrame, cols: list[str]):
    """
    Render quick-look tracks for the selected columns with DISTINCT colors per input,
    and reserve the Y-axis across all plots so depth/index is aligned.
    Returns a Matplotlib figure (a placeholder figure when no column is present).
    """
    cols = [c for c in cols if c in df.columns]
    n = len(cols)
    if n == 0:
        fig, ax = plt.subplots(figsize=(4, 2))
        ax.text(0.5, 0.5, "No selected columns", ha="center", va="center")
        ax.axis("off")
        return fig

    # Depth or fallback to point index
    depth_col = next((c for c in df.columns if 'depth' in str(c).lower()), None)
    if depth_col is not None:
        idx = pd.to_numeric(df[depth_col], errors="coerce")
        y_label = depth_col
    else:
        idx = pd.Series(np.arange(1, len(df) + 1))
        y_label = "Point Index"

    # Y range is reserved across all subplots
    y_min, y_max = float(idx.min()), float(idx.max())

    fig, axes = plt.subplots(1, n, figsize=(2.2 * n, 7.0), sharey=True, dpi=100)
    if n == 1:
        axes = [axes]

    # Stable color palette
    cmap = plt.get_cmap("tab20")
    col_colors = {col: cmap(i % cmap.N) for i, col in enumerate(cols)}

    for i, (ax, col) in enumerate(zip(axes, cols)):
        x = pd.to_numeric(df[col], errors="coerce")
        ax.plot(x, idx, '-', lw=1.6, color=col_colors[col])
        ax.set_xlabel(col)
        ax.xaxis.set_label_position('top')
        ax.xaxis.tick_top()
        ax.set_ylim(y_max, y_min)  # reserve and invert depth axis
        ax.grid(True, linestyle=":", alpha=0.3)
        # Only show y-axis label + ticks on the first subplot
        if i == 0:
            ax.set_ylabel(y_label)
        else:
            ax.tick_params(labelleft=False)
            ax.set_ylabel("")

    fig.tight_layout()
    return fig


# Modal wrapper: use st.dialog when this Streamlit version has it, otherwise
# fall back to a decorator that renders the wrapped function in an expander.
try:
    dialog = st.dialog
except AttributeError:
    def dialog(title):
        def deco(fn):
            def wrapper(*args, **kwargs):
                with st.expander(title, expanded=True):
                    return fn(*args, **kwargs)
            return wrapper
        return deco


def preview_modal(book: dict[str, pd.DataFrame]):
    """Render one tab per sheet of *book*, each with feature tracks and a summary table."""
    if not book:
        st.info("No data loaded yet.")
        return
    names = list(book.keys())
    tabs = st.tabs(names)
    for t, name in zip(tabs, names):
        with t:
            df = _normalize_columns(book[name])
            t1, t2 = st.tabs(["Tracks", "Summary"])
            with t1:
                _show_fig(preview_tracks(df, FEATURES), use_container_width=True)
            with t2:
                present = [c for c in FEATURES if c in df.columns]
                if present:
                    tbl = (
                        df[present]
                        .agg(['min', 'max', 'mean', 'std'])
                        .T.rename(columns={"min": "Min", "max": "Max", "mean": "Mean", "std": "Std"})
                    )
                    df_centered_rounded(tbl.reset_index(names="Feature"))
                else:
                    st.info("No expected feature columns found to summarize.")


# =========================
# Load model
# =========================
def ensure_model() -> Path|None:
    """Return the path of a usable model file, downloading it if necessary.

    Local candidates (``DEFAULT_MODEL`` then ``MODEL_FALLBACKS``) win when
    they exist and are non-empty. Otherwise, if the ``MODEL_URL`` environment
    variable is set, the file is downloaded to ``DEFAULT_MODEL``. Returns
    None when no model could be obtained.
    """
    for p in [DEFAULT_MODEL, *MODEL_FALLBACKS]:
        if p.exists() and p.stat().st_size > 0:
            return p

    url = os.environ.get("MODEL_URL", "")
    if not url:
        return None

    # Download to a temporary sibling and rename at the end, so an interrupted
    # transfer can never leave a truncated file that later passes the
    # "exists and non-empty" check above.
    tmp_path = DEFAULT_MODEL.with_name(DEFAULT_MODEL.name + ".part")
    try:
        import requests
        DEFAULT_MODEL.parent.mkdir(parents=True, exist_ok=True)
        with requests.get(url, stream=True, timeout=30) as r:
            r.raise_for_status()
            with open(tmp_path, "wb") as f:
                for chunk in r.iter_content(1 << 20):
                    if chunk:
                        f.write(chunk)
        if tmp_path.stat().st_size == 0:
            tmp_path.unlink(missing_ok=True)
            return None
        tmp_path.replace(DEFAULT_MODEL)
        return DEFAULT_MODEL
    except Exception:
        # Best effort: the caller reports "Model not found" on None.
        try:
            tmp_path.unlink(missing_ok=True)
        except OSError:
            pass
        return None


mpath = ensure_model()
if not mpath:
    st.error("Model not found. Upload models/ym_rf.joblib (or set MODEL_URL).")
    st.stop()

try:
    model = load_model(str(mpath))
except Exception as e:
    st.error(f"Failed to load model: {e}")
    st.stop()

# ---------- Load meta (optional) ----------
meta = {}
meta_candidates = [MODELS_DIR / "meta.json", MODELS_DIR / "ym_meta.json"]
meta_path = next((p for p in meta_candidates if p.exists()), None)
if meta_path:
    try:
        _loaded_meta = json.loads(meta_path.read_text(encoding="utf-8"))
        # Only a JSON object is usable; anything else would break the
        # .get() lookups here and in the version check below.
        if not isinstance(_loaded_meta, dict):
            raise ValueError("top-level JSON value must be an object")
        meta = _loaded_meta
        FEATURES = meta.get("features", FEATURES)
        TARGET = meta.get("target", TARGET)
    except Exception as e:
        st.warning(f"Could not parse meta file ({meta_path.name}): {e}")

# Optional: version mismatch warning
import numpy as _np  # noqa: E402
import sklearn as _skl  # noqa: E402

mv = meta.get("versions", {})
if mv:
    msg = []
    if mv.get("numpy") and mv["numpy"] != _np.__version__:
        msg.append(f"NumPy {mv['numpy']} expected, running {_np.__version__}")
    if mv.get("scikit_learn") and mv["scikit_learn"] != _skl.__version__:
        msg.append(f"scikit-learn {mv['scikit_learn']} expected, running {_skl.__version__}")
    if msg:
        st.warning("Environment mismatch: " + " | ".join(msg))

# =========================
# Session state
# =========================
st.session_state.setdefault("app_step", "intro")
st.session_state.setdefault("results", {})
st.session_state.setdefault("train_ranges", None)
st.session_state.setdefault("dev_file_name", "")
st.session_state.setdefault("dev_file_bytes", b"")
st.session_state.setdefault("dev_file_loaded", False)
st.session_state.setdefault("dev_preview", False)
st.session_state.setdefault("show_preview_modal", False)

# =========================
# Branding in Sidebar
# =========================
st.sidebar.markdown(f"""
{APP_NAME}
{TAGLINE}
""", unsafe_allow_html=True)


# =========================
# Reusable Sticky Header Function
# =========================
def sticky_header(title, message):
    """Render the page header made of *title* followed by *message*."""
    st.markdown(f"""

{title}

{message}

""", unsafe_allow_html=True)


# Caption explaining the three metrics shown by _render_metrics.
_METRIC_LEGEND = """
R: Pearson Correlation Coefficient
RMSE: Root Mean Square Error
MAPE: Mean Absolute Percentage Error
"""


def _compute_metrics(df: pd.DataFrame) -> dict:
    """Return R / RMSE / MAPE of PRED_COL against TARGET for *df*."""
    return {
        "R": pearson_r(df[TARGET], df[PRED_COL]),
        "RMSE": rmse(df[TARGET], df[PRED_COL]),
        "MAPE": mape(df[TARGET], df[PRED_COL]),
    }


def _render_metrics(m: dict) -> None:
    """Show the R / RMSE / MAPE metric row followed by the legend."""
    c1, c2, c3 = st.columns(3)
    c1.metric("R", f"{m['R']:.3f}")
    c2.metric("RMSE", f"{m['RMSE']:.2f}")
    c3.metric("MAPE (%)", f"{m['MAPE']:.2f}")
    st.markdown(_METRIC_LEGEND, unsafe_allow_html=True)


# =========================
# INTRO
# =========================
if st.session_state.app_step == "intro":
    st.header("Welcome!")
    st.markdown("This software is developed by *Smart Thinking AI-Solutions Team* to estimate Young's Modulus (Ym) from drilling data.")
    st.subheader("How It Works")
    st.markdown(
        "1) **Upload your data to build the case and preview the model performance.** \n"
        "2) Click **Run Model** to compute metrics and plots. \n"
        "3) **Proceed to Validation** (with actual Ym) or **Proceed to Prediction** (no Ym)."
    )
    if st.button("Start Showcase", type="primary"):
        st.session_state.app_step = "dev"
        st.rerun()

# =========================
# CASE BUILDING
# =========================
if st.session_state.app_step == "dev":
    st.sidebar.header("Case Building")
    up = st.sidebar.file_uploader("Upload Your Data File", type=["xlsx", "xls"])
    if up is not None:
        st.session_state.dev_file_bytes = up.getvalue()
        st.session_state.dev_file_name = up.name
        st.session_state.dev_file_loaded = True
        st.session_state.dev_preview = False

    if st.session_state.dev_file_loaded:
        tmp = read_book_bytes(st.session_state.dev_file_bytes)
        if tmp:
            df0 = next(iter(tmp.values()))
            st.sidebar.caption(f"**Data loaded:** {st.session_state.dev_file_name} • {df0.shape[0]} rows × {df0.shape[1]} cols")

    if st.sidebar.button("Preview data", use_container_width=True, disabled=not st.session_state.dev_file_loaded):
        st.session_state.show_preview_modal = True
        st.session_state.dev_preview = True

    run = st.sidebar.button("Run Model", type="primary", use_container_width=True)
    if st.sidebar.button("Proceed to Validation ▶", use_container_width=True):
        st.session_state.app_step = "validate"
        st.rerun()
    if st.sidebar.button("Proceed to Prediction ▶", use_container_width=True):
        st.session_state.app_step = "predict"
        st.rerun()

    if st.session_state.dev_file_loaded and st.session_state.dev_preview:
        sticky_header("Case Building", "Previewed ✓ — now click **Run Model**.")
    elif st.session_state.dev_file_loaded:
        sticky_header("Case Building", "📄 **Preview uploaded data** using the sidebar button, then click **Run Model**.")
    else:
        sticky_header("Case Building", "**Upload your data to build a case, then run the model to review development performance.**")

    if run and st.session_state.dev_file_bytes:
        book = read_book_bytes(st.session_state.dev_file_bytes)
        sh_train = find_sheet(book, ["Train", "Training", "training2", "train", "training"])
        sh_test = find_sheet(book, ["Test", "Testing", "testing2", "test", "testing"])
        if sh_train is None or sh_test is None:
            st.markdown("\nWorkbook must include Train/Training/training2 and Test/Testing/testing2 sheets.\n", unsafe_allow_html=True)
            st.stop()

        tr = _normalize_columns(book[sh_train].copy())
        te = _normalize_columns(book[sh_test].copy())
        if not (ensure_cols(tr, FEATURES + [TARGET]) and ensure_cols(te, FEATURES + [TARGET])):
            st.markdown("\nMissing required columns.\n", unsafe_allow_html=True)
            st.stop()

        tr[PRED_COL] = model.predict(tr[FEATURES])
        te[PRED_COL] = model.predict(te[FEATURES])
        st.session_state.results["Train"] = tr
        st.session_state.results["Test"] = te
        st.session_state.results["m_train"] = _compute_metrics(tr)
        st.session_state.results["m_test"] = _compute_metrics(te)

        # Training min/max per feature, used later for out-of-range checks.
        tr_min = tr[FEATURES].min().to_dict()
        tr_max = tr[FEATURES].max().to_dict()
        st.session_state.train_ranges = {f: (float(tr_min[f]), float(tr_max[f])) for f in FEATURES}
        st.markdown("\nCase has been built and results are displayed below.\n", unsafe_allow_html=True)

    def _dev_block(df, m):
        """Render metrics, track plot and cross plot for one development split."""
        _render_metrics(m)
        col_track, col_cross = st.columns([2, 3], gap="large")
        with col_track:
            st.plotly_chart(
                track_plot(df, include_actual=True),
                use_container_width=False,
                config={"displayModeBar": False, "scrollZoom": True},
            )
        with col_cross:
            _show_fig(cross_plot_static(df[TARGET], df[PRED_COL]), use_container_width=False)

    if "Train" in st.session_state.results or "Test" in st.session_state.results:
        tab1, tab2 = st.tabs(["Training", "Testing"])
        if "Train" in st.session_state.results:
            with tab1:
                _dev_block(st.session_state.results["Train"], st.session_state.results["m_train"])
        if "Test" in st.session_state.results:
            with tab2:
                _dev_block(st.session_state.results["Test"], st.session_state.results["m_test"])
        # Export UI for this phase (dropdown checklist starts empty)
        render_export_button(phase_key="dev")

# =========================
# VALIDATION (with actual Ym)
# =========================
if st.session_state.app_step == "validate":
    st.sidebar.header("Validate the Model")
    up = st.sidebar.file_uploader("Upload Validation Excel", type=["xlsx", "xls"])
    if up is not None:
        book = read_book_bytes(up.getvalue())
        if book:
            df0 = next(iter(book.values()))
            st.sidebar.caption(f"**Data loaded:** {up.name} • {df0.shape[0]} rows × {df0.shape[1]} cols")

    if st.sidebar.button("Preview data", use_container_width=True, disabled=(up is None)):
        st.session_state.show_preview_modal = True

    go_btn = st.sidebar.button("Predict & Validate", type="primary", use_container_width=True)
    if st.sidebar.button("⬅ Back to Case Building", use_container_width=True):
        st.session_state.app_step = "dev"
        st.rerun()
    if st.sidebar.button("Proceed to Prediction ▶", use_container_width=True):
        st.session_state.app_step = "predict"
        st.rerun()

    sticky_header("Validate the Model", "Upload a dataset with the same **features** and **Actual Ym** to evaluate performance.")

    if go_btn and up is not None:
        book = read_book_bytes(up.getvalue())
        if not book:
            # Empty upload: there is no first sheet to fall back to.
            st.error("Could not read any sheets from the uploaded workbook.")
            st.stop()
        name = find_sheet(book, ["Validation", "Validate", "validation2", "Val", "val"]) or list(book.keys())[0]
        df = _normalize_columns(book[name].copy())
        if not ensure_cols(df, FEATURES + [TARGET]):
            st.markdown("\nMissing required columns.\n", unsafe_allow_html=True)
            st.stop()

        df[PRED_COL] = model.predict(df[FEATURES])
        st.session_state.results["Validate"] = df

        ranges = st.session_state.train_ranges
        oor_pct = 0.0
        tbl = None
        if ranges:
            flags = _oor_flags(df, ranges)
            any_viol = flags.any(axis=1)
            oor_pct = float(any_viol.mean() * 100.0)
            if any_viol.any():
                tbl = df.loc[any_viol, FEATURES].copy()
                for c in FEATURES:
                    if pd.api.types.is_numeric_dtype(tbl[c]):
                        tbl[c] = tbl[c].round(2)
                # Comma-separated names of the features that are out of range.
                tbl["Violations"] = flags.loc[any_viol].apply(
                    lambda r: ", ".join([c for c, v in r.items() if v]), axis=1
                )

        st.session_state.results["m_val"] = _compute_metrics(df)
        st.session_state.results["sv_val"] = {
            "n": len(df),
            "pred_min": float(df[PRED_COL].min()),
            "pred_max": float(df[PRED_COL].max()),
            "oor": oor_pct,
        }
        st.session_state.results["oor_tbl"] = tbl

    if "Validate" in st.session_state.results:
        _render_metrics(st.session_state.results["m_val"])

        col_track, col_cross = st.columns([2, 3], gap="large")
        with col_track:
            st.plotly_chart(
                track_plot(st.session_state.results["Validate"], include_actual=True),
                use_container_width=False,
                config={"displayModeBar": False, "scrollZoom": True},
            )
        with col_cross:
            _show_fig(
                cross_plot_static(st.session_state.results["Validate"][TARGET],
                                  st.session_state.results["Validate"][PRED_COL]),
                use_container_width=False,
            )

        # Export UI for this phase (dropdown checklist starts empty)
        render_export_button(phase_key="validate")

        sv = st.session_state.results["sv_val"]
        if sv["oor"] > 0:
            st.markdown("\nSome inputs fall outside **training min–max** ranges.\n", unsafe_allow_html=True)
        if st.session_state.results["oor_tbl"] is not None:
            st.write("*Out-of-range rows (vs. Training min–max):*")
            df_centered_rounded(st.session_state.results["oor_tbl"])

# =========================
# PREDICTION (no actual Ym)
# =========================
if st.session_state.app_step == "predict":
    st.sidebar.header("Prediction (No Actual Ym)")
    up = st.sidebar.file_uploader("Upload Prediction Excel", type=["xlsx", "xls"])
    if up is not None:
        book = read_book_bytes(up.getvalue())
        if book:
            df0 = next(iter(book.values()))
            st.sidebar.caption(f"**Data loaded:** {up.name} • {df0.shape[0]} rows × {df0.shape[1]} cols")

    if st.sidebar.button("Preview data", use_container_width=True, disabled=(up is None)):
        st.session_state.show_preview_modal = True

    go_btn = st.sidebar.button("Predict", type="primary", use_container_width=True)
    if st.sidebar.button("⬅ Back to Case Building", use_container_width=True):
        st.session_state.app_step = "dev"
        st.rerun()

    sticky_header("Prediction", "Upload a dataset with the feature columns (no **Actual Ym**).")

    if go_btn and up is not None:
        book = read_book_bytes(up.getvalue())
        if not book:
            # Empty upload: there is no first sheet to use.
            st.error("Could not read any sheets from the uploaded workbook.")
            st.stop()
        name = list(book.keys())[0]
        df = _normalize_columns(book[name].copy())
        if not ensure_cols(df, FEATURES):
            st.markdown("\nMissing required columns.\n", unsafe_allow_html=True)
            st.stop()

        df[PRED_COL] = model.predict(df[FEATURES])
        st.session_state.results["PredictOnly"] = df

        ranges = st.session_state.train_ranges
        oor_pct = 0.0
        if ranges:
            any_viol = _oor_flags(df, ranges).any(axis=1)
            oor_pct = float(any_viol.mean() * 100.0)

        st.session_state.results["sv_pred"] = {
            "n": len(df),
            "pred_min": float(df[PRED_COL].min()),
            "pred_max": float(df[PRED_COL].max()),
            "pred_mean": float(df[PRED_COL].mean()),
            "pred_std": float(df[PRED_COL].std(ddof=0)),
            "oor": oor_pct,
        }

    if "PredictOnly" in st.session_state.results:
        df = st.session_state.results["PredictOnly"]
        sv = st.session_state.results["sv_pred"]

        col_left, col_right = st.columns([2, 3], gap="large")
        with col_left:
            table = pd.DataFrame({
                "Metric": ["# points", "Pred min", "Pred max", "Pred mean", "Pred std", "OOR %"],
                "Value": [sv["n"], round(sv["pred_min"], 3), round(sv["pred_max"], 3),
                          round(sv["pred_mean"], 3), round(sv["pred_std"], 3), f'{sv["oor"]:.1f}%'],
            })
            st.markdown("\nPredictions ready ✓\n", unsafe_allow_html=True)
            df_centered_rounded(table, hide_index=True)
            st.caption("**★ OOR** = % of rows whose input features fall outside the training min–max range.")
        with col_right:
            st.plotly_chart(
                track_plot(df, include_actual=False),
                use_container_width=False,
                config={"displayModeBar": False, "scrollZoom": True},
            )

        # Export UI for this phase (dropdown checklist starts empty)
        render_export_button(phase_key="predict")

# =========================
# Run preview modal after all other elements
# =========================
if st.session_state.show_preview_modal:
    # Select the correct workbook bytes for this step
    book_to_preview = {}
    if st.session_state.app_step == "dev":
        book_to_preview = read_book_bytes(st.session_state.dev_file_bytes)
    elif st.session_state.app_step in ["validate", "predict"] and 'up' in locals() and up is not None:
        # `up` only exists when one of the step blocks above ran in this pass.
        book_to_preview = read_book_bytes(up.getvalue())

    with st.expander("Preview data", expanded=True):
        if not book_to_preview:
            st.markdown("\nNo data loaded yet.\n", unsafe_allow_html=True)
        else:
            names = list(book_to_preview.keys())
            tabs = st.tabs(names)
            for t, name in zip(tabs, names):
                with t:
                    df = _normalize_columns(book_to_preview[name])
                    t1, t2 = st.tabs(["Tracks", "Summary"])
                    with t1:
                        _show_fig(preview_tracks(df, FEATURES), use_container_width=True)
                    with t2:
                        feat_present = [c for c in FEATURES if c in df.columns]
                        if not feat_present:
                            st.info("No feature columns found to summarize.")
                        else:
                            tbl = (
                                df[feat_present]
                                .agg(['min', 'max', 'mean', 'std'])
                                .T.rename(columns={"min": "Min", "max": "Max", "mean": "Mean", "std": "Std"})
                                .reset_index(names="Feature")
                            )
                            df_centered_rounded(tbl)

    # One-shot flag: the preview is shown only on the run that requested it.
    st.session_state.show_preview_modal = False

# =========================
# Footer
# =========================
st.markdown("""



© 2025 Smart Thinking AI-Solutions Team. All rights reserved.
Website: smartthinking.com.sa
""", unsafe_allow_html=True)