Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import numpy as np | |
| import matplotlib.pyplot as plt | |
| from scipy.io import loadmat | |
| from scipy.interpolate import PchipInterpolator, interp1d | |
| import io | |
| import time | |
| import tempfile | |
| import os | |
| ########################################################################### | |
# Page configuration for the Streamlit app.
st.set_page_config(
    layout="wide",
    initial_sidebar_state="expanded",
    page_title="Bubble Dynamics Analysis",
    page_icon="π«§",
)

# TensorFlow is optional: record whether it could be imported so that the
# ML-related pages can check TENSORFLOW_AVAILABLE before using it.
try:
    import tensorflow as tf
    from tensorflow import keras
    from tensorflow.keras import layers
except ImportError:
    TENSORFLOW_AVAILABLE = False
else:
    TENSORFLOW_AVAILABLE = True
| # ULTRA-AGGRESSIVE CSS FIX - Complete stability with UT Austin background and Logo | |
# Global stylesheet injected into the page: background image + overlay, the
# fixed ".main-header" bar, UT-themed helper classes (.section-header,
# .metric-card, .success-box, .warning-box) and blanket animation disabling.
# NOTE(review): 'BACKGROUND_IMAGE_BASE64_HERE' is still a placeholder, so the
# data: URL below does not resolve to an image until it is filled in.
st.markdown("""
<style>
/* BACKGROUND IMAGE STYLING */
.stApp {
    background-image: url('data:image/jpeg;base64,BACKGROUND_IMAGE_BASE64_HERE');
    background-size: cover;
    background-position: center;
    background-repeat: no-repeat;
    background-attachment: fixed;
}
/* ALTERNATIVE: If you have the image in assets folder, use this instead:
.stApp {
    background-image: url('./src/ut_yang_background.jpg');
    background-size: cover;
    background-position: center;
    background-repeat: no-repeat;
    background-attachment: fixed;
}
*/
/* OVERLAY FOR BETTER TEXT READABILITY */
.stApp::before {
    content: '';
    position: fixed;
    top: 0;
    left: 0;
    width: 100%;
    height: 100%;
    background: rgba(255, 255, 255, 0.85);
    z-index: -1;
    pointer-events: none;
}
/* FORCE HEADER TO BE COMPLETELY FIXED AND STABLE WITH LOGO */
.main-header {
    position: fixed !important;
    top: 0 !important;
    left: 0 !important;
    right: 0 !important;
    width: 100% !important;
    background: rgba(255, 255, 255, 0.95) !important;
    backdrop-filter: blur(10px) !important;
    z-index: 99999 !important;
    padding: 1rem 2rem !important;
    border-bottom: 3px solid #bf5700 !important;
    box-shadow: 0 4px 8px rgba(0,0,0,0.15) !important;
    transform: translateZ(0) !important;
    will-change: auto !important;
    display: flex !important;
    align-items: center !important;
    justify-content: center !important;
    flex-direction: row !important;
    gap: 20px !important;
}
/* LOGO STYLING */
.header-logo {
    max-height: 70px !important;
    max-width: 280px !important;
    height: auto !important;
    width: auto !important;
    object-fit: contain !important;
    border-radius: 8px !important;
    box-shadow: 0 2px 8px rgba(0,0,0,0.1) !important;
    flex-shrink: 0 !important;
}
/* HEADER TEXT STYLING */
.header-text {
    font-size: 2.2rem !important;
    color: #bf5700 !important; /* UT Austin burnt orange */
    text-align: left !important;
    margin: 0 !important;
    font-weight: bold !important;
    text-shadow: 1px 1px 2px rgba(0,0,0,0.1) !important;
    white-space: nowrap !important;
}
/* ADD TOP MARGIN TO MAIN CONTENT TO AVOID OVERLAP - ADJUSTED FOR HORIZONTAL LOGO */
.main > .block-container {
    margin-top: 120px !important; /* Reduced back to 120px for horizontal layout */
    padding-top: 20px !important;
    background: rgba(255, 255, 255, 0.9) !important;
    border-radius: 10px !important;
    box-shadow: 0 4px 12px rgba(0,0,0,0.1) !important;
    backdrop-filter: blur(5px) !important;
}
/* RESPONSIVE DESIGN FOR MOBILE */
@media (max-width: 768px) {
    .header-logo {
        max-height: 50px !important;
        max-width: 200px !important;
    }
    .header-text {
        font-size: 1.2rem !important;
    }
    .main-header {
        padding: 0.8rem 1rem !important;
        gap: 15px !important;
    }
    .main > .block-container {
        margin-top: 100px !important;
    }
}
/* EXTRA SMALL MOBILE */
@media (max-width: 480px) {
    .main-header {
        flex-direction: column !important;
        gap: 8px !important;
        padding: 0.8rem 0.5rem !important;
    }
    .header-logo {
        max-height: 40px !important;
        max-width: 180px !important;
    }
    .header-text {
        font-size: 1.1rem !important;
    }
    .main > .block-container {
        margin-top: 110px !important;
    }
}
/* UT AUSTIN THEMING */
.section-header {
    font-size: 1.5rem;
    color: #bf5700; /* UT Austin burnt orange */
    margin-top: 2rem;
    margin-bottom: 1rem;
    font-weight: bold;
}
/* SIDEBAR STYLING WITH UT THEME */
/* The data-testid selector is the stable hook; the generated class name is
   kept only for Streamlit builds that still emit it. */
[data-testid="stSidebar"],
.css-1d391kg {
    background: rgba(255, 255, 255, 0.95) !important;
    backdrop-filter: blur(10px) !important;
}
/* ENHANCED CONTAINERS WITH UT THEMING */
.metric-card {
    background: linear-gradient(135deg, #bf5700, #d67700) !important;
    color: white !important;
    padding: 1rem;
    border-radius: 0.5rem;
    margin: 0.5rem 0;
}
.success-box {
    background: linear-gradient(135deg, #28a745, #20c997) !important;
    color: white !important;
    border-left: 5px solid #155724;
    padding: 1rem;
    margin: 1rem 0;
    border-radius: 8px;
}
.warning-box {
    background: linear-gradient(135deg, #ffc107, #fd7e14) !important;
    color: #212529 !important;
    border-left: 5px solid #856404;
    padding: 1rem;
    margin: 1rem 0;
    border-radius: 8px;
}
/* NUCLEAR OPTION: DISABLE ALL ANIMATIONS AND TRANSITIONS EVERYWHERE */
*, *::before, *::after {
    transition: none !important;
    animation: none !important;
    animation-duration: 0s !important;
    animation-delay: 0s !important;
    transform: none !important;
}
/* FORCE STABILITY ON ALL STREAMLIT ELEMENTS */
.stProgress, .stProgress > div, .stProgress * {
    transition: none !important;
    animation: none !important;
    transform: none !important;
}
.stSpinner, .stSpinner > div, .stSpinner * {
    transition: none !important;
    animation: none !important;
    transform: none !important;
    position: relative !important;
}
/* STABILIZE CONTAINERS */
.element-container, .stMarkdown, .stButton {
    transition: none !important;
    animation: none !important;
    transform: none !important;
}
/* PREVENT LAYOUT SHIFTS */
.main .block-container .element-container {
    transition: none !important;
    animation: none !important;
}
/* FORCE GPU ACCELERATION FOR STABILITY */
.main-header {
    transform: translate3d(0,0,0) !important;
    backface-visibility: hidden !important;
    perspective: 1000px !important;
}
/* HIDE STREAMLIT'S RERUN INDICATOR */
.stAppViewMain > .main > .block-container > div:first-child {
    visibility: hidden !important;
    height: 0 !important;
    margin: 0 !important;
    padding: 0 !important;
}
</style>
""", unsafe_allow_html=True)
| # Initialize session state | |
# Workflow flags default to False the first time the session runs; values
# already present in the session are left untouched.
for _flag in ('data_loaded', 'processed_data', 'model_loaded'):
    if _flag not in st.session_state:
        setattr(st.session_state, _flag, False)
| # ULTRA-OPTIMIZED BubbleSimulation class - ZERO UI UPDATES during simulation | |
class OptimizedBubbleSimulation:
    """Bubble radius-vs-time simulation for a given shear modulus and viscosity.

    The integrated state vector is ``[R, U, P, S, Theta[0:NT], k[0:NT]]`` in
    non-dimensional form (see :meth:`bubble_optimized`). The class never calls
    Streamlit: progress is reported through ``print`` and the
    ``simulation_status`` attribute only.
    """

    def __init__(self):
        self.setup_parameters()

    def setup_parameters(self):
        """Set the fixed physical, material and numerical parameters."""
        # Far-field / initial conditions
        self.R0 = 35e-6
        self.P_inf = 101325
        self.T_inf = 298.15
        self.cav_type = 'LIC'
        # Material parameters
        self.c_long = 1700
        self.alpha = 0.0
        self.rho = 1000
        self.gamma = 0.0725
        # Parameters for bubble contents
        self.D0 = 24.2e-6
        self.kappa = 1.4
        self.Ru = 8.3144598
        self.Rv = self.Ru / (18.01528e-3)
        self.Ra = self.Ru / (28.966e-3)
        self.A = 5.28e-5
        self.B = 1.17e-2
        self.P_ref = 1.17e11
        self.T_ref = 5200
        # Numerical parameters: NT grid nodes inside the bubble, solver rtol
        self.NT = 100
        self.RelTol = 1e-5

    def run_optimized_simulation(self, G, mu, lambda_max_mean=None):
        """Integrate the bubble model and return the radius history.

        Args:
            G: Shear modulus used to form ``Ca = P_inf / G``.
            mu: Viscosity used to form the Reynolds number.
            lambda_max_mean: Value stored as ``lambdamax`` (``Rmax`` is
                ``lambdamax * R0``); 5.99 is used when ``None``.

        Returns:
            ``(t, R)`` arrays in physical units multiplied by 1e4 (the log
            output labels them "0.1 ms" and "0.1 mm"). If no solver succeeds
            or fewer than 10 valid points remain, the result of
            :meth:`fast_fallback` is returned instead.

        Raises:
            ValueError: If ``cav_type`` is not ``'LIC'``; only that case has
                initial conditions defined here.
        """
        from scipy.integrate import solve_ivp

        if lambda_max_mean is not None:
            self.lambdamax = lambda_max_mean
            print(f"Using lambda_max_mean = {self.lambdamax} from .mat file")
        else:
            self.lambdamax = 5.99  # Fallback default
            print(f"Warning: Using default lambda_max = {self.lambdamax}")
        print(f"Running ULTRA-OPTIMIZED simulation with predicted G={G:.2e} Pa, μ={mu:.4f} Pa·s")

        self.G = G
        self.mu = mu

        if self.cav_type != 'LIC':
            # Previously this surfaced later as an AttributeError/NameError.
            raise ValueError(f"Unsupported cavitation type: {self.cav_type!r}")

        # Characteristic scales
        self.Rmax = self.lambdamax * self.R0
        self.PA = 0
        self.omega = 0
        self.delta = 0
        self.n = 0
        self.Rc = self.Rmax
        self.Uc = np.sqrt(self.P_inf / self.rho)
        self.tc = self.Rmax / self.Uc
        self.tspan = 3 * self.tc

        self.Pv = self.P_ref * np.exp(-self.T_ref / self.T_inf)
        self.K_inf = self.A * self.T_inf + self.B

        # Non-dimensional groups
        self.C_star = self.c_long / self.Uc
        self.We = self.P_inf * self.Rc / (2 * self.gamma)
        self.Ca = self.P_inf / self.G
        self.Re = self.P_inf * self.Rc / (self.mu * self.Uc)
        self.fom = self.D0 / (self.Uc * self.Rc)
        self.chi = self.T_inf * self.K_inf / (self.P_inf * self.Rc * self.Uc)
        self.A_star = self.A * self.T_inf / self.K_inf
        self.B_star = self.B / self.K_inf
        self.Pv_star = self.Pv / self.P_inf
        self.tspan_star = self.tspan / self.tc
        self.Req = self.R0 / self.Rmax
        self.PA_star = self.PA / self.P_inf
        self.omega_star = self.omega * self.tc
        self.delta_star = self.delta / self.tc

        # Order matters: bubble_optimized unpacks this list by position.
        self.params = [self.NT, self.C_star, self.We, self.Ca, self.alpha, self.Re,
                       self.Rv, self.Ra, self.kappa, self.fom, self.chi, self.A_star,
                       self.B_star, self.Pv_star, self.Req, self.PA_star,
                       self.omega_star, self.delta_star, self.n]

        # Initial conditions
        R0_star = 1
        U0_star = 0
        Theta0 = np.zeros(self.NT)
        P0 = (self.Pv + (self.P_inf + 2 * self.gamma / self.R0 - self.Pv) *
              ((self.R0 / self.Rmax) ** 3))
        P0_star = P0 / self.P_inf
        S0 = ((3 * self.alpha - 1) * (5 - 4 * self.Req - self.Req ** 4) / (2 * self.Ca) +
              2 * self.alpha * (27 / 40 + self.Req ** 8 / 8 + self.Req ** 5 / 5 +
                                self.Req ** 2 - 2 / self.Req) / self.Ca)
        k0 = ((1 + (self.Rv / self.Ra) * (P0_star / self.Pv_star - 1)) ** (-1)) * np.ones(self.NT)
        X0 = np.concatenate([[R0_star, U0_star, P0_star, S0], Theta0, k0])
        print(f"State vector size: {len(X0)} (4 + {self.NT} + {self.NT})")
        print(f"Time span: 0 to {self.tspan_star:.4f}")

        self.simulation_status = "Starting simulation..."

        # Try the solvers in order. solve_ivp usually signals failure through
        # ``success=False`` rather than by raising, so both outcomes must move
        # on to the next solver for the backup to be of any use.
        solver_attempts = (
            dict(method='BDF', rtol=self.RelTol, atol=1e-8,
                 max_step=self.tspan_star / 200, dense_output=False),
            dict(method='LSODA', rtol=1e-4, atol=1e-7,
                 max_step=self.tspan_star / 100),
        )
        sol = None
        for attempt, options in enumerate(solver_attempts):
            if attempt > 0:
                self.simulation_status = "Trying backup solver..."
            try:
                candidate = solve_ivp(self.bubble_optimized, [0, self.tspan_star],
                                      X0, **options)
            except Exception as e:
                print(f"{options['method']} failed: {str(e)}")
                continue
            if candidate.success:
                sol = candidate
                break
            print(f"Solver failed: {candidate.message}")

        if sol is None:
            print("All solvers failed")
            return self.fast_fallback()
        self.simulation_status = "Processing results..."

        t_nondim = sol.t
        R_nondim = sol.y.T[:, 0]

        # Keep only finite radii inside (0.01, 20) non-dimensional units
        valid_mask = (R_nondim > 0.01) & (R_nondim < 20) & np.isfinite(R_nondim)
        t_nondim = t_nondim[valid_mask]
        R_nondim = R_nondim[valid_mask]
        if len(t_nondim) < 10:
            print("Too few valid points, using fast fallback")
            return self.fast_fallback()

        # Back to physical units, then rescale for output
        scale = 1e4
        t_newunit = t_nondim * self.tc * scale
        R_newunit = R_nondim * self.Rc * scale

        self.simulation_status = "Simulation complete!"
        print(f"ULTRA-OPTIMIZED simulation completed in {len(t_newunit)} points!")
        print(f"Time range: {t_newunit[0]:.3f} to {t_newunit[-1]:.3f} (0.1 ms)")
        print(f"Radius range: {np.min(R_newunit):.3f} to {np.max(R_newunit):.3f} (0.1 mm)")
        return t_newunit, R_newunit

    def bubble_optimized(self, t, x):
        """Right-hand side of the ODE system passed to ``solve_ivp``.

        Args:
            t: Non-dimensional time (unused by the equations).
            x: State ``[R, U, P, S, Theta[0:NT], k[0:NT]]``.

        Returns:
            Array of the same length as ``x`` holding the time derivatives.
            The last ``Theta`` and ``k`` entries (wall node) have zero
            derivative.
        """
        (NT, C_star, We, Ca, alpha, Re, Rv, Ra, kappa, fom, chi,
         A_star, B_star, Pv_star, Req) = self.params[:15]
        NT = int(NT)

        R, U, P, S = x[0], x[1], x[2], x[3]
        Theta = x[4:4 + NT]
        k = x[4 + NT:4 + 2 * NT].copy()  # copy: the wall value is overwritten below

        # Uniform grid y in [0, 1]
        deltaY = 1 / (NT - 1)
        yk = np.arange(NT) * deltaY

        # Wall boundary condition for the vapor fraction
        k[-1] = (1 + (Rv / Ra) * (P / Pv_star - 1)) ** (-1)

        # Mixture fields
        T = (A_star - 1 + np.sqrt(1 + 2 * A_star * Theta)) / A_star
        K_star = A_star * T + B_star
        Rmix = k * Rv + (1 - k) * Ra

        # First derivatives: zero at the origin, central inside, one-sided at the wall
        DTheta = np.zeros(NT)
        Dk = np.zeros(NT)
        if NT >= 3:
            DTheta[1:-1] = (Theta[2:] - Theta[:-2]) / (2 * deltaY)
            Dk[1:-1] = (k[2:] - k[:-2]) / (2 * deltaY)
            DTheta[-1] = (3 * Theta[-1] - 4 * Theta[-2] + Theta[-3]) / (2 * deltaY)
            Dk[-1] = (3 * k[-1] - 4 * k[-2] + k[-3]) / (2 * deltaY)

        # Laplacians. yk >= deltaY for every node past the origin, so the
        # (2 / y) term never divides by zero there.
        DDTheta = np.zeros(NT)
        DDk = np.zeros(NT)
        DDTheta[0] = 6 * (Theta[1] - Theta[0]) / deltaY ** 2
        DDk[0] = 6 * (k[1] - k[0]) / deltaY ** 2
        if NT >= 3:
            DDTheta[1:-1] = ((Theta[2:] - 2 * Theta[1:-1] + Theta[:-2]) / deltaY ** 2 +
                             (2 / yk[1:-1]) * DTheta[1:-1])
            DDk[1:-1] = ((k[2:] - 2 * k[1:-1] + k[:-2]) / deltaY ** 2 +
                         (2 / yk[1:-1]) * Dk[1:-1])
        if NT >= 4:
            DDTheta[-1] = ((2 * Theta[-1] - 5 * Theta[-2] + 4 * Theta[-3] - Theta[-4]) / deltaY ** 2 +
                           (2 / yk[-1]) * DTheta[-1])
            DDk[-1] = ((2 * k[-1] - 5 * k[-2] + 4 * k[-3] - k[-4]) / deltaY ** 2 +
                       (2 / yk[-1]) * Dk[-1])

        # Internal pressure evolution
        if Rmix[-1] > 1e-12 and (1 - k[-1]) > 1e-12 and R > 1e-12:
            pdot = (3 / R * (-kappa * P * U + (kappa - 1) * chi * DTheta[-1] / R +
                             kappa * P * fom * Rv * Dk[-1] / (R * Rmix[-1] * (1 - k[-1]))))
        else:
            pdot = -3 * kappa * P * U / R if R > 1e-12 else 0

        # Mixture velocity (left at zero where the guard fails)
        Umix = np.zeros(NT)
        idx = np.flatnonzero((Rmix > 1e-12) & (kappa * P > 1e-12))
        if idx.size:
            Umix[idx] = (((kappa - 1) * chi / R * DTheta[idx] - R * yk[idx] * pdot / 3) / (kappa * P) +
                         fom / R * (Rv - Ra) / Rmix[idx] * Dk[idx])

        # Temperature evolution on all nodes except the wall (Dirichlet BC)
        Theta_prime = np.zeros(NT)
        if P > 1e-12:
            i = np.flatnonzero(Rmix[:-1] > 1e-12)
            Theta_prime[i] = (
                (pdot + DDTheta[i] * chi / R ** 2) * (K_star[i] * T[i] / P * (kappa - 1) / kappa) -
                DTheta[i] * (Umix[i] - yk[i] * U) / R +
                fom / R ** 2 * (Rv - Ra) / Rmix[i] * Dk[i] * DTheta[i])

        # Vapor concentration evolution on all nodes except the wall (Dirichlet BC)
        k_prime = np.zeros(NT)
        j = np.flatnonzero((Rmix[:-1] > 1e-12) & (T[:-1] > 1e-12))
        term1 = fom / R ** 2 * (DDk[j] + Dk[j] * (-((Rv - Ra) / Rmix[j]) * Dk[j] -
                                                   DTheta[j] / np.sqrt(1 + 2 * A_star * Theta[j]) / T[j]))
        term2 = -(Umix[j] - U * yk[j]) / R * Dk[j]
        k_prime[j] = term1 + term2

        # Elastic stress evolution; defaults to zero so the name is always bound
        Sdot = 0
        if self.cav_type == 'LIC' and Req > 1e-12:
            Rst = R / Req
            if Rst > 1e-12:
                Sdot = (2 * U / R * (3 * alpha - 1) * (1 / Rst + 1 / Rst ** 4) / Ca -
                        2 * alpha * U / R * (1 / Rst ** 8 + 1 / Rst ** 5 + 2 / Rst ** 2 + 2 * Rst) / Ca)

        # Keller-Miksis equations
        rdot = U
        udot = 0
        if R > 1e-12:
            numerator = ((1 + U / C_star) * (P - 1 / (We * R) + S - 4 * U / (Re * R) - 1) +
                         R / C_star * (pdot + U / (We * R ** 2) + Sdot + 4 * U ** 2 / (Re * R ** 2)) -
                         (3 / 2) * (1 - U / (3 * C_star)) * U ** 2)
            denominator = (1 - U / C_star) * R + 4 / (C_star * Re)
            if abs(denominator) > 1e-12:
                udot = numerator / denominator

        return np.concatenate([[rdot, udot, pdot, Sdot], Theta_prime, k_prime])

    def fast_fallback(self):
        """Return a damped-cosine stand-in curve when the ODE solve is unusable.

        Reads ``tc``, ``Req`` and ``Rc``, which ``run_optimized_simulation``
        sets before it can reach this method. Returns ``(t, R)`` with 200
        points, scaled by 1e4 like the main result.
        """
        print("Using fast analytical approximation for validation")
        t_nondim = np.linspace(0, 3, 200)

        if hasattr(self, 'P_inf') and hasattr(self, 'rho') and hasattr(self, 'lambdamax'):
            Rmax = self.lambdamax * self.R0
            omega_natural = np.sqrt(3 * self.P_inf / (self.rho * Rmax ** 2))
            damping = self.mu / (self.rho * Rmax ** 2) if hasattr(self, 'mu') else 0.1
        else:
            omega_natural = 1000
            damping = 0.1

        # NOTE(review): ``damping`` is compared against 1 like a damping ratio
        # although it is computed from dimensional quantities - confirm intent.
        omega_d = omega_natural * np.sqrt(1 - damping ** 2) if damping < 1 else omega_natural
        decay = np.exp(-damping * omega_natural * t_nondim * self.tc)
        R_nondim = self.Req + (1 - self.Req) * decay * np.cos(omega_d * t_nondim * self.tc)
        R_nondim = np.maximum(R_nondim, 0.05)  # keep the radius positive

        t = t_nondim * self.tc
        R = R_nondim * self.Rc
        scale = 1e4
        return t * scale, R * scale
| # Main Streamlit App | |
def main():
    """Draw the fixed header and sidebar, then render the currently selected page."""
    # Fixed header bar with the group logo and the app title
    header_html = """
    <div class="main-header">
    <img src="https://huggingface.co/spaces/lehuaaa/Bubble/resolve/main/src/group_logo.JPG"
    alt="YANG Research Group Logo"
    class="header-logo">
    <h1 class="header-text">Bubble Dynamics Analysis</h1>
    </div>
    """
    st.markdown(header_html, unsafe_allow_html=True)

    # The selected page lives in session state so it survives reruns
    if 'current_page' not in st.session_state:
        st.session_state.current_page = "π Home"

    st.sidebar.title("π Navigation")
    menu_items = [
        "π Home",
        "π Data Loading",
        "βοΈ Data Processing",
        "π€ ML Prediction",
        "β Validation",
        "π Results & Export"
    ]

    # One button per page; the active page gets a marker in front of its label
    for entry in menu_items:
        is_active = st.session_state.current_page == entry
        caption = f"βΆοΈ {entry}" if is_active else f" {entry}"
        if st.sidebar.button(caption, key=f"nav_{entry}", use_container_width=True):
            st.session_state.current_page = entry
            st.rerun()

    st.sidebar.markdown("---")

    # Workflow status: (session flag, text when set, text when not set)
    st.sidebar.markdown("### π Status")
    status_rows = (
        ('data_loaded', "β Data loaded", "π No data loaded"),
        ('processed_data', "β Data processed", "βοΈ Data not processed"),
        ('model_loaded', "β Model loaded", "π€ No model loaded"),
    )
    for flag, done_text, pending_text in status_rows:
        if getattr(st.session_state, flag):
            st.sidebar.success(done_text)
        else:
            st.sidebar.info(pending_text)

    # Dispatch to the page renderer matching the stored selection
    page = st.session_state.current_page
    if page == "π Home":
        show_home()
    elif page == "π Data Loading":
        show_data_loading()
    elif page == "βοΈ Data Processing":
        show_data_processing()
    elif page == "π€ ML Prediction":
        show_ml_prediction()
    elif page == "β Validation":
        show_validation()
    elif page == "π Results & Export":
        show_results()
def show_home():
    """Render the landing page: overview text, system status and session progress."""
    overview_col, status_col = st.columns([2, 1])

    with overview_col:
        st.markdown("""
        ### Welcome to the YANG Research Group Bubble Dynamics Analysis Platform
        **The University of Texas at Austin - Aerospace Engineering and Engineering Mechanics**
        **Cockrell School of Engineering**
        This advanced web application provides comprehensive tools for analyzing bubble dynamics data:
        **Features:**
        - π **Data Loading**: Upload and analyze .mat files containing bubble dynamics data
        - βοΈ **Data Processing**: Interpolate and process experimental data
        - π€ **ML Prediction**: Use machine learning to predict material properties (G & ΞΌ)
        - β **Validation**: Compare experimental vs simulated bubble behavior
        - π **Export**: Download processed results and visualizations
        **Getting Started:**
        1. Navigate to "Data Loading" to upload your .mat file
        2. Process your data in "Data Processing"
        3. Use ML models in "ML Prediction"
        4. Validate results in "Validation"
        5. Export your findings in "Results & Export"
        """)

    with status_col:
        # The banner depends on whether the optional TensorFlow import succeeded
        if not TENSORFLOW_AVAILABLE:
            st.warning("""
            **β οΈ System Status: Limited Features**
            β Core Features: Ready
            β Data Processing: Ready
            β Visualization: Ready
            β ML Models: TensorFlow not installed
            β Simulations: Ready
            π‘ Install TensorFlow to enable ML predictions:
            `pip install tensorflow-cpu`
            """)
        else:
            st.success("""
            **β System Status: Full Features Available**
            β Core Features: Ready
            β Data Processing: Ready
            β Visualization: Ready
            β ML Models: Ready
            β Simulations: Ready
            """)

    # One confirmation line per workflow step already completed in this session
    progress_notes = (
        ('data_loaded', "π Data loaded successfully"),
        ('processed_data', "βοΈ Data processed"),
        ('model_loaded', "π€ ML model loaded"),
    )
    for flag, note in progress_notes:
        if getattr(st.session_state, flag):
            st.success(note)
def show_data_loading():
    """Render the upload page and load a .mat file into session state.

    On success the following session-state entries are written: ``data``,
    ``R_nondim_all``, ``t_nondim_all``, ``lambda_max_mean``, ``num_datasets``,
    ``physical_params`` and ``data_loaded`` (set to True). Any failure while
    reading the file is reported with ``st.error`` instead of being raised.
    """
    st.markdown('<h2 class="section-header">π Data Loading</h2>', unsafe_allow_html=True)
    uploaded_file = st.file_uploader(
        "Upload your .mat file",
        type=['mat'],
        help="Upload a MATLAB .mat file containing 'R_nondim_All', 't_nondim_All', and 'lambda_max_mean'"
    )
    if uploaded_file is None:
        return

    try:
        # Read straight from memory: loadmat accepts file-like objects, so no
        # temporary file is created (the old one was leaked on every error or
        # early-return path).
        data = loadmat(io.BytesIO(uploaded_file.getvalue()))

        required_vars = ['R_nondim_All', 't_nondim_All']
        missing_vars = [var for var in required_vars if var not in data]
        if missing_vars:
            st.error(f"Missing required variables: {missing_vars}")
            return

        R_nondim_all = data['R_nondim_All']
        t_nondim_all = data['t_nondim_All']
        num_datasets = R_nondim_all.shape[1]

        if 'lambda_max_mean' in data:
            # .item() extracts the single stored value; calling float() on a
            # multi-dimensional array is deprecated in recent NumPy.
            lambda_max_mean = float(np.asarray(data['lambda_max_mean']).item())
        else:
            st.warning("lambda_max_mean not found in file. Using default value 5.99")
            lambda_max_mean = 5.99

        st.session_state.data = data
        st.session_state.R_nondim_all = R_nondim_all
        st.session_state.t_nondim_all = t_nondim_all
        st.session_state.lambda_max_mean = lambda_max_mean
        st.session_state.num_datasets = num_datasets
        st.session_state.data_loaded = True

        # Characteristic scales used later to convert to physical units
        R0_sim = 35e-6
        P_inf_exp = 101325
        rho_exp = 1000
        Rmax_exp = lambda_max_mean * R0_sim
        Rc_exp = Rmax_exp
        Uc_exp = np.sqrt(P_inf_exp / rho_exp)
        tc_exp = Rc_exp / Uc_exp
        st.session_state.physical_params = {
            'Rmax_exp': Rmax_exp,
            'Rc_exp': Rc_exp,
            'Uc_exp': Uc_exp,
            'tc_exp': tc_exp
        }

        st.markdown(f"""
        <div class="success-box">
        <strong>β Data loaded successfully!</strong><br>
        π Datasets found: {num_datasets}<br>
        π― Lambda max: {lambda_max_mean:.3f}<br>
        π Physical parameters calculated
        </div>
        """, unsafe_allow_html=True)

        col1, col2 = st.columns(2)
        with col1:
            st.subheader("π Data Overview")
            st.write(f"**Number of datasets:** {num_datasets}")
            st.write(f"**Lambda max mean:** {lambda_max_mean:.3f}")
            st.write(f"**Data shape:** {R_nondim_all.shape}")
        with col2:
            st.subheader("π§ Physical Parameters")
            st.write(f"**R_max:** {Rmax_exp * 1e6:.1f} μm")
            st.write(f"**Time scale:** {tc_exp * 1e6:.1f} μs")
            st.write(f"**Velocity scale:** {Uc_exp:.1f} m/s")
    except Exception as e:
        # UI boundary: show the problem to the user rather than crash the page
        st.error(f"Error loading file: {str(e)}")
def show_data_processing():
    """Render the processing page: pick a dataset, interpolate it, plot and export.

    Processing converts the selected dataset to physical units, keeps the
    samples from the t = 0 index onward, rescales by 1e4 and evaluates a PCHIP
    interpolant on a uniform time grid. Results are stored in session state
    (``t_interp_newunit``, ``R_interp_newunit``, ``t_original``,
    ``R_original``, ``selected_dataset``, ``processed_data``).
    """
    st.markdown('<h2 class="section-header">βοΈ Data Processing</h2>', unsafe_allow_html=True)
    if not st.session_state.data_loaded:
        st.warning("Please load data first in the 'Data Loading' section.")
        return

    dataset_idx = st.selectbox(
        "Select dataset to process:",
        range(st.session_state.num_datasets),
        format_func=lambda x: f"Dataset {x + 1}"
    )

    col1, col2 = st.columns(2)
    with col1:
        interp_range = st.number_input(
            "Interpolation Range",
            min_value=0.1,
            max_value=2.0,
            value=0.8,
            step=0.1,
            help="Time range for interpolation"
        )
    with col2:
        time_step = st.number_input(
            "Time Step",
            min_value=0.001,
            max_value=0.1,
            value=0.008,
            step=0.001,
            format="%.3f",
            help="Time step for interpolation"
        )

    if st.button("π Process Data", type="primary"):
        with st.spinner("Processing data..."):
            try:
                R_nondim_exp = np.array(st.session_state.R_nondim_all[0, dataset_idx]).flatten()
                t_nondim_exp = np.array(st.session_state.t_nondim_all[0, dataset_idx]).flatten()

                # Index of t = 0, or of the sample closest to it
                zero_candidates = np.where(np.abs(t_nondim_exp) < 1e-10)[0]
                if len(zero_candidates) > 0:
                    zero_idx = zero_candidates[0]
                else:
                    zero_idx = np.argmin(np.abs(t_nondim_exp))

                # Physical units, from the zero point onward, rescaled by 1e4
                tc_exp = st.session_state.physical_params['tc_exp']
                Rc_exp = st.session_state.physical_params['Rc_exp']
                scale_exp = 1e4
                t_newunit_exp = (t_nondim_exp * tc_exp)[zero_idx:] * scale_exp
                R_newunit_exp = (R_nondim_exp * Rc_exp)[zero_idx:] * scale_exp

                # PCHIP needs finite, strictly increasing abscissae: drop
                # NaN/inf samples, then sort and keep the first of any
                # duplicated time stamp.
                finite = np.isfinite(t_newunit_exp) & np.isfinite(R_newunit_exp)
                t_newunit_exp, first_seen = np.unique(t_newunit_exp[finite], return_index=True)
                R_newunit_exp = R_newunit_exp[finite][first_seen]

                # Uniform grid 0, dt, 2*dt, ... covering interp_range. The
                # point count is computed explicitly because np.arange with a
                # float stop/step can gain or lose the last point to rounding.
                n_steps = int(np.ceil(interp_range / time_step - 1e-9))
                t_interp_newunit = np.arange(n_steps + 1) * time_step
                pchip_interpolator = PchipInterpolator(t_newunit_exp, R_newunit_exp)
                R_interp_newunit = pchip_interpolator(t_interp_newunit)

                st.session_state.t_interp_newunit = t_interp_newunit
                st.session_state.R_interp_newunit = R_interp_newunit
                st.session_state.t_original = t_newunit_exp
                st.session_state.R_original = R_newunit_exp
                st.session_state.processed_data = True
                st.session_state.selected_dataset = dataset_idx
                st.success(f"β Data processed successfully! {len(R_interp_newunit)} interpolated points created.")
            except Exception as e:
                # UI boundary: report the failure instead of crashing the page
                st.error(f"Processing failed: {str(e)}")

    if st.session_state.processed_data:
        st.subheader("π Processing Results")
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))

        # Left: original samples with the interpolated points on top
        ax1.plot(st.session_state.t_original, st.session_state.R_original, 'b-', linewidth=2, label='Original Data')
        ax1.plot(st.session_state.t_interp_newunit, st.session_state.R_interp_newunit, 'ro', markersize=3,
                 label='Interpolated Points')
        ax1.set_xlabel('Time (0.1 ms)')
        ax1.set_ylabel('Radius (0.1 mm)')
        ax1.set_title('Original vs Interpolated Data')
        ax1.grid(True, alpha=0.3)
        ax1.legend()

        # Right: interpolated points only
        ax2.plot(st.session_state.t_interp_newunit, st.session_state.R_interp_newunit, 'ro', markersize=4)
        ax2.set_xlabel('Time (0.1 ms)')
        ax2.set_ylabel('Radius (0.1 mm)')
        ax2.set_title('Interpolated R-t Curve')
        ax2.grid(True, alpha=0.3)

        plt.tight_layout()
        st.pyplot(fig)
        # Close explicitly: every rerun otherwise leaves another open figure
        plt.close(fig)

        # The download button is rendered directly. Nested inside st.button it
        # disappeared on the next rerun, because the outer button is only True
        # for the single run that follows its click.
        # The export leaves out the last interpolated point, as before.
        data_str = ' '.join([f'{val:.6f}' for val in st.session_state.R_interp_newunit[:-1]])
        # Name the file after the dataset that was processed, which can differ
        # from the current selectbox value.
        processed_idx = st.session_state.get('selected_dataset', dataset_idx)
        st.download_button(
            label="π₯ Download as TXT",
            data=data_str,
            file_name=f"interpolated_data_dataset_{processed_idx + 1}.txt",
            mime="text/plain"
        )
# Loading-method label stored in session state when a SavedModel could only be
# opened through keras.layers.TFSMLayer; the prediction step branches on it.
_TFSM_LOADING_METHOD = "TFSMLayer (uploaded ZIP fallback)"


def _build_custom_objects():
    """Return the ``custom_objects`` mapping used to deserialize uploaded models.

    The two layer classes are defined here (not at module level) because they
    subclass ``layers.Layer``, which only exists when TensorFlow imported
    successfully. Callers must check ``TENSORFLOW_AVAILABLE`` first.

    Returns:
        dict: class name -> class, suitable for ``keras.models.load_model``.
    """

    class CustomMultiHeadAttention(layers.Layer):
        """Multi-head scaled dot-product self-attention over the input sequence."""

        def __init__(self, embed_dim, num_heads=8, **kwargs):
            super().__init__(**kwargs)
            self.embed_dim = embed_dim
            self.num_heads = num_heads
            self.projection_dim = embed_dim // num_heads
            self.query_dense = layers.Dense(embed_dim)
            self.key_dense = layers.Dense(embed_dim)
            self.value_dense = layers.Dense(embed_dim)
            self.combine_heads = layers.Dense(embed_dim)

        def attention(self, query, key, value):
            """Return ``(softmax(QK^T / sqrt(d)) V, attention weights)``."""
            score = tf.matmul(query, key, transpose_b=True)
            dim_key = tf.cast(tf.shape(key)[-1], tf.float32)
            scaled_score = score / tf.math.sqrt(dim_key)
            weights = tf.nn.softmax(scaled_score, axis=-1)
            output = tf.matmul(weights, value)
            return output, weights

        def separate_heads(self, x, batch_size):
            """Reshape to (batch, heads, seq, projection_dim)."""
            x = tf.reshape(x, (batch_size, -1, self.num_heads, self.projection_dim))
            return tf.transpose(x, perm=[0, 2, 1, 3])

        def call(self, inputs):
            batch_size = tf.shape(inputs)[0]
            query = self.query_dense(inputs)
            key = self.key_dense(inputs)
            value = self.value_dense(inputs)
            query = self.separate_heads(query, batch_size)
            key = self.separate_heads(key, batch_size)
            value = self.separate_heads(value, batch_size)
            attention, _weights = self.attention(query, key, value)
            attention = tf.transpose(attention, perm=[0, 2, 1, 3])
            concat_attention = tf.reshape(attention, (batch_size, -1, self.embed_dim))
            return self.combine_heads(concat_attention)

        def get_config(self):
            config = super().get_config()
            config.update({
                'embed_dim': self.embed_dim,
                'num_heads': self.num_heads,
            })
            return config

        # Must be a classmethod: Keras invokes ``cls.from_config(config)`` on the
        # class itself while deserializing, which fails for a plain function.
        @classmethod
        def from_config(cls, config):
            return cls(**config)

    class CustomTransformerEncoderLayer(layers.Layer):
        """Transformer encoder block: self-attention + feed-forward, each with
        dropout, a residual connection and layer normalization."""

        def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1, **kwargs):
            super().__init__(**kwargs)
            self.embed_dim = embed_dim
            self.num_heads = num_heads
            self.ff_dim = ff_dim
            self.rate = rate
            self.att = CustomMultiHeadAttention(embed_dim, num_heads)
            self.ffn = keras.Sequential(
                [layers.Dense(ff_dim, activation="softplus"), layers.Dense(embed_dim)])
            self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
            self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
            self.dropout1 = layers.Dropout(rate)
            self.dropout2 = layers.Dropout(rate)

        # ``training`` defaults to None so the layer can also be called without
        # the argument (Keras then resolves the learning phase itself).
        def call(self, inputs, training=None):
            attn_output = self.att(inputs)
            attn_output = self.dropout1(attn_output, training=training)
            out1 = self.layernorm1(inputs + attn_output)
            ffn_output = self.ffn(out1)
            ffn_output = self.dropout2(ffn_output, training=training)
            return self.layernorm2(out1 + ffn_output)

        def get_config(self):
            config = super().get_config()
            config.update({
                'embed_dim': self.embed_dim,
                'num_heads': self.num_heads,
                'ff_dim': self.ff_dim,
                'rate': self.rate,
            })
            return config

        @classmethod
        def from_config(cls, config):
            return cls(**config)

    return {
        'CustomMultiHeadAttention': CustomMultiHeadAttention,
        'CustomTransformerEncoderLayer': CustomTransformerEncoderLayer,
    }


def _load_uploaded_model(uploaded_model, file_extension, temp_dir, custom_objects):
    """Write the uploaded model into ``temp_dir`` and try to load it.

    Args:
        uploaded_model: Streamlit uploaded-file object (``.name``, ``.getvalue()``).
        file_extension: lower-cased extension of the upload ('h5', 'keras', 'zip').
        temp_dir: existing directory the upload is written/extracted into.
        custom_objects: mapping passed to ``keras.models.load_model``.

    Returns:
        tuple: ``(model, loading_method)``; ``model`` is ``None`` when loading
        failed (an ``st.error`` describing the failure has then been shown).

    Raises:
        Exception: I/O or ZIP errors (e.g. ``zipfile.BadZipFile``) propagate to
        the caller, which reports them.
    """
    not_loaded = (None, "Unknown")

    if file_extension in ('h5', 'keras'):
        label = 'H5' if file_extension == 'h5' else 'Keras'
        model_path = os.path.join(temp_dir, os.path.basename(uploaded_model.name))
        with open(model_path, 'wb') as f:
            f.write(uploaded_model.getvalue())
        try:
            model = keras.models.load_model(model_path, custom_objects=custom_objects)
        except Exception as e:
            st.error(f"{label} loading failed: {str(e)}")
            return not_loaded
        st.success(f"β Loaded {label} model successfully")
        return model, f"{label} format (uploaded)"

    if file_extension == 'zip':
        import zipfile

        zip_path = os.path.join(temp_dir, os.path.basename(uploaded_model.name))
        with open(zip_path, 'wb') as f:
            f.write(uploaded_model.getvalue())

        extract_dir = os.path.join(temp_dir, 'extracted_model')
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(extract_dir)

        # A SavedModel directory is identified by the saved_model.pb it contains.
        savedmodel_dirs = [
            root for root, _dirs, files in os.walk(extract_dir)
            if 'saved_model.pb' in files
        ]
        if not savedmodel_dirs:
            st.error("β No SavedModel found in ZIP file. Ensure ZIP contains saved_model.pb")
            return not_loaded

        try:
            model = keras.models.load_model(savedmodel_dirs[0], custom_objects=custom_objects)
        except Exception as e:
            # Fall back to wrapping the SavedModel as an inference-only layer.
            try:
                model = layers.TFSMLayer(savedmodel_dirs[0], call_endpoint='serving_default')
            except Exception as e2:
                st.error(f"SavedModel loading failed: {str(e)}, TFSMLayer failed: {str(e2)}")
                return not_loaded
            st.success("β Loaded using TFSMLayer")
            return model, _TFSM_LOADING_METHOD
        st.success("β Loaded SavedModel successfully")
        return model, "SavedModel format (uploaded ZIP)"

    return not_loaded


def _show_model_info(uploaded_model, model, loading_method, model_config):
    """Render the collapsible "Model Information" panel for a loaded model."""
    with st.expander("π Model Information", expanded=False):
        info_col1, info_col2 = st.columns(2)
        with info_col1:
            st.write(f"**Model file:** {uploaded_model.name}")
            st.write(f"**Loading method:** {loading_method}")
            st.write(f"**Model type:** {type(model).__name__}")
            st.write(f"**File size:** {len(uploaded_model.getvalue()):,} bytes")
        with info_col2:
            try:
                if hasattr(model, 'count_params'):
                    total_params = model.count_params()
                    st.write(f"**Total parameters:** {total_params:,}")
                if hasattr(model, 'input_shape'):
                    st.write(f"**Input shape:** {model.input_shape}")
                elif hasattr(model, 'input_spec'):
                    st.write("**Input spec:** Available")
                # Show config info if available
                if model_config:
                    st.write(f"**Sequence length:** {model_config.get('sequence_length', 'Unknown')}")
                    st.write(f"**Model type:** {model_config.get('model_type', 'Unknown')}")
            except Exception:
                # Best effort: model introspection is optional display-only info.
                st.write("**Configuration:** Unable to read")


def _prepare_prediction_inputs(input_data, sequence_length=100):
    """Shape raw R-t samples into the two arrays the model is fed.

    Each curve is truncated or zero-padded to ``sequence_length`` samples.

    Args:
        input_data: scalar, 1-D curve or 2-D ``(n_curves, n_points)`` array-like.
        sequence_length: number of samples per curve the model is fed
            (100 in the original code).

    Returns:
        tuple: ``(curves, positions)`` with shapes
        ``(n_curves, sequence_length, 1)`` and ``(n_curves, sequence_length)``.

    Raises:
        ValueError: if ``input_data`` contains no values.
    """
    curves = np.atleast_2d(np.asarray(input_data, dtype=float))
    if curves.size == 0:
        raise ValueError("Input data is empty")

    n_points = curves.shape[1]
    if n_points > sequence_length:
        curves = curves[:, :sequence_length]
    elif n_points < sequence_length:
        padding = np.zeros((curves.shape[0], sequence_length - n_points))
        curves = np.concatenate([curves, padding], axis=1)

    curves = curves.reshape(-1, sequence_length, 1)
    positions = np.tile(np.arange(sequence_length), (curves.shape[0], 1))
    return curves, positions


def _predict_g_mu(model, loading_method, curves, positions):
    """Run ``model`` on the prepared inputs and return ``(g, mu)`` as NumPy arrays.

    A TFSMLayer-wrapped SavedModel is called directly and may return a dict of
    named outputs; every other model goes through ``model.predict``.
    """
    if loading_method == _TFSM_LOADING_METHOD:
        predictions = model([curves, positions])
        if isinstance(predictions, dict):
            def pick(names, position):
                # Prefer a named output; only fall back to positional order
                # when none of the expected names is present.
                for name in names:
                    if name in predictions:
                        return predictions[name]
                return list(predictions.values())[position]

            predictions_g = pick(('g_output', 'output_1'), 0)
            predictions_mu = pick(('mu_output', 'output_2'), 1)
        else:
            predictions_g, predictions_mu = predictions
    else:
        predictions_g, predictions_mu = model.predict([curves, positions])

    # Normalize tensors to NumPy so downstream indexing/formatting is uniform.
    return np.asarray(predictions_g), np.asarray(predictions_mu)


def show_ml_prediction():
    """Render the ML prediction page: model upload, input selection, prediction.

    Flow:
      1. Bail out with instructions if TensorFlow is missing or no data has
         been processed yet.
      2. Let the user upload a model (.h5 / .keras / zipped SavedModel) plus an
         optional ``.npy`` config, load it and keep it in ``st.session_state``.
      3. Let the user choose the input curve (current processed data or an
         uploaded text file) and predict G and mu.

    Session-state keys written: ``loaded_model``, ``model_name``,
    ``model_loaded``, ``loading_method``, ``temp_model_dir``, ``model_config``,
    ``temp_input_data``, ``using_current_data``, ``pred_G``, ``pred_mu``,
    ``validation_ready``.

    The temporary directory holding the uploaded model is removed when loading
    fails and when a newer model replaces it, so repeated uploads do not
    accumulate directories on disk.
    """
    st.markdown('<h2 class="section-header">π€ ML Prediction</h2>', unsafe_allow_html=True)

    if not TENSORFLOW_AVAILABLE:
        st.error("β **TensorFlow not available.** ML prediction features are disabled.")
        with st.expander("π§ How to Enable ML Features", expanded=True):
            st.markdown("""
**To enable ML predictions:**
1. **Install TensorFlow:**
```bash
pip install tensorflow-cpu # Recommended (smaller)
# or
pip install tensorflow # Full version
```
2. **Restart the web app:**
- Stop the app (Ctrl+C in terminal)
- Run: `streamlit run streamlit_bubble_app.py`
- Refresh your browser
""")
        return

    if not st.session_state.processed_data:
        st.warning("Please process data first in the 'Data Processing' section.")
        return

    # ------------------------------------------------------------------
    # Model upload
    # ------------------------------------------------------------------
    st.subheader("π Upload ML Model")
    col1, col2 = st.columns([2, 1])
    with col1:
        st.markdown("**Upload your trained model files:**")
        # Primary model file upload
        uploaded_model = st.file_uploader(
            "Upload model file (.h5, .keras, or .zip for SavedModel)",
            type=['h5', 'keras', 'zip'],
            help="Upload your trained model in H5, Keras, or ZIP format (for SavedModel)",
            key="model_file_upload"
        )
        # Optional config file upload
        uploaded_config = st.file_uploader(
            "Upload model config (optional)",
            type=['npy'],
            help="Upload model_config.npy if available",
            key="config_file_upload"
        )
    with col2:
        if st.button("π Model Format Help"):
            st.info("""
**Supported model formats:**
**π― H5 Format (.h5)** - Recommended
- Single file containing model architecture and weights
- Most compatible format
**β‘ Keras Format (.keras)**
- Native Keras 3.0 format
- Single file format
**π¦ SavedModel (.zip)**
- Zip the entire SavedModel folder
- Should contain: saved_model.pb, variables/, assets/
**βοΈ Config File (.npy)** - Optional
- Contains model configuration metadata
- Helps with model information display
""")

    # ------------------------------------------------------------------
    # Model loading
    # ------------------------------------------------------------------
    if uploaded_model is not None and st.button("π Load Uploaded Model", type="primary"):
        import shutil

        with st.spinner("Loading uploaded ML model..."):
            temp_dir = None
            keep_temp_dir = False  # becomes True once session state owns temp_dir
            try:
                temp_dir = tempfile.mkdtemp()
                file_extension = uploaded_model.name.split('.')[-1].lower()
                model, loading_method = _load_uploaded_model(
                    uploaded_model, file_extension, temp_dir, _build_custom_objects())

                if model is None:
                    st.error("β Failed to load uploaded model")
                    # Show debug information
                    st.subheader("π Debug Information")
                    st.write(f"**File name:** {uploaded_model.name}")
                    st.write(f"**File size:** {len(uploaded_model.getvalue()):,} bytes")
                    st.write(f"**File extension:** {file_extension}")
                    st.info("""
**Troubleshooting:**
- Ensure your model is saved in a compatible format
- For H5: Use `model.save('model.h5', save_format='h5')`
- For Keras: Use `model.save('model.keras')`
- For SavedModel: ZIP the entire SavedModel folder
- Verify custom layers are properly saved
""")
                    return

                # Store in session state; remember the directory being replaced.
                previous_dir = st.session_state.get('temp_model_dir')
                st.session_state.loaded_model = model
                st.session_state.model_name = uploaded_model.name
                st.session_state.model_loaded = True
                st.session_state.loading_method = loading_method
                st.session_state.temp_model_dir = temp_dir
                keep_temp_dir = True
                if previous_dir and previous_dir != temp_dir:
                    shutil.rmtree(previous_dir, ignore_errors=True)

                # Load config file if provided
                model_config = None
                if uploaded_config is not None:
                    try:
                        config_path = os.path.join(temp_dir, os.path.basename(uploaded_config.name))
                        with open(config_path, 'wb') as f:
                            f.write(uploaded_config.getvalue())
                        # SECURITY: allow_pickle=True unpickles the uploaded file
                        # and can execute arbitrary code. Only accept config
                        # files from trusted sources.
                        model_config = np.load(config_path, allow_pickle=True).item()
                        st.session_state.model_config = model_config
                        st.success("β Config file loaded successfully")
                    except Exception as e:
                        st.warning(f"Config file loading failed: {str(e)}")

                # Display model info
                st.success("β Model uploaded and loaded successfully!")
                _show_model_info(uploaded_model, model, loading_method, model_config)
            except Exception as e:
                st.error(f"β Failed to load uploaded model: {str(e)}")
                with st.expander("π Error Details"):
                    st.write(f"**Error type:** {type(e).__name__}")
                    st.write(f"**Error message:** {str(e)}")
                    st.write(f"**File name:** {uploaded_model.name if uploaded_model else 'None'}")
            finally:
                # Do not leave the upload on disk unless a loaded model owns it.
                if temp_dir is not None and not keep_temp_dir:
                    shutil.rmtree(temp_dir, ignore_errors=True)

    # Show current model status
    if st.session_state.model_loaded:
        method = st.session_state.get('loading_method', 'Unknown method')
        model_name = st.session_state.get('model_name', 'Unknown model')
        st.success(f"π€ **Model Ready:** `{model_name}` ({method})")

    # ------------------------------------------------------------------
    # Input selection and prediction
    # ------------------------------------------------------------------
    if st.session_state.model_loaded:
        st.subheader("π₯ Input Data")
        # Kept so the page layout is unchanged; the columns are not populated.
        st.columns([2, 1])
        # File uploader for input data
        uploaded_input = st.file_uploader(
            "Upload input data file",
            type=['txt'],
            help="Upload a text file with R-t curve data"
        )
        # Use current data button
        if st.button("π Use Current Processed Data"):
            if st.session_state.processed_data and 'R_interp_newunit' in st.session_state:
                # The last interpolated sample is dropped, as in the exported file.
                temp_data = ' '.join(f'{val:.6f}' for val in st.session_state.R_interp_newunit[:-1])
                st.session_state.temp_input_data = temp_data
                st.session_state.using_current_data = True
                st.success("β Current interpolated data ready for prediction")
            else:
                st.error("No processed data available. Please process data first.")

        # Prediction interface
        st.subheader("π― Make Predictions")
        if st.button("π Predict G & ΞΌ", type="primary"):
            # Determine input data source
            if st.session_state.get('using_current_data', False) and 'temp_input_data' in st.session_state:
                try:
                    input_data = np.array([float(x) for x in st.session_state.temp_input_data.split()])
                    st.info("Using current processed data for prediction")
                except Exception as e:
                    st.error(f"Error processing current data: {e}")
                    return
            elif uploaded_input is not None:
                try:
                    input_data = np.loadtxt(io.StringIO(uploaded_input.getvalue().decode()))
                    st.info("Using uploaded file for prediction")
                except Exception as e:
                    st.error(f"Error reading uploaded file: {e}")
                    return
            else:
                st.error("Please select input data: use current processed data or upload a file")
                return

            with st.spinner("Running ML prediction..."):
                test_input_curves = test_position_inputs = None
                try:
                    test_input_curves, test_position_inputs = _prepare_prediction_inputs(input_data)

                    start_time = time.time()
                    predictions_g, predictions_mu = _predict_g_mu(
                        st.session_state.loaded_model,
                        st.session_state.get('loading_method'),
                        test_input_curves,
                        test_position_inputs,
                    )
                    prediction_time = time.time() - start_time

                    # Only the first curve's prediction is reported.
                    num_samples = 1
                    pred_G = predictions_g[:num_samples]
                    pred_mu = predictions_mu[:num_samples]

                    # Undo the output scaling: 10 ** (3 * y + 3) for G and
                    # 10 ** (3 * y - 3) for mu.
                    # NOTE(review): presumably the training-time normalization
                    # of log10(G) and log10(mu) - confirm against training code.
                    pred_G_scaled = 10 ** (pred_G * (6 - 3) + 6 - 3)
                    pred_mu_scaled = 10 ** (pred_mu * (0 + 3) - 3)

                    # Store results
                    st.session_state.pred_G = pred_G_scaled
                    st.session_state.pred_mu = pred_mu_scaled

                    # Extract scalar values for display
                    G_value = pred_G_scaled[0][0] if pred_G_scaled.ndim > 1 else pred_G_scaled[0]
                    mu_value = pred_mu_scaled[0][0] if pred_mu_scaled.ndim > 1 else pred_mu_scaled[0]

                    # Display results
                    res_col1, res_col2, res_col3 = st.columns(3)
                    with res_col1:
                        st.metric(
                            label="Shear Modulus (G)",
                            value=f"{G_value:.2e} Pa",
                            help="Predicted shear modulus of the material"
                        )
                    with res_col2:
                        st.metric(
                            label="Viscosity (ΞΌ)",
                            value=f"{mu_value:.4f} PaΒ·s",
                            help="Predicted viscosity of the material"
                        )
                    with res_col3:
                        st.metric(
                            label="Prediction Time",
                            value=f"{prediction_time:.3f} s",
                            help="Time taken for ML inference"
                        )

                    # Show detailed results
                    result_text = f"G: {G_value:.2e} Pa, ΞΌ: {mu_value:.4f}"
                    st.success(f"π **Prediction Results:** {result_text}")
                    detailed_results = f"""**Prediction completed successfully!**
**Shear Modulus (G):** {G_value:.2e} Pa
**Viscosity (ΞΌ):** {mu_value:.4f} PaΒ·s
**Prediction Time:** {prediction_time:.4f} seconds ({prediction_time * 1000:.2f} ms)
Ready for validation simulation!"""
                    st.info(detailed_results)

                    # Enable validation
                    st.session_state.validation_ready = True
                except Exception as e:
                    st.error(f"β Prediction failed: {str(e)}")
                    with st.expander("π Debug Information"):
                        st.write(f"**Error:** {str(e)}")
                        st.write(f"**Model type:** {type(st.session_state.loaded_model).__name__}")
                        st.write(f"**Loading method:** {st.session_state.get('loading_method', 'Unknown')}")
                        if test_input_curves is not None:
                            st.write(f"**Input shape:** {test_input_curves.shape}")
                        if test_position_inputs is not None:
                            st.write(f"**Position shape:** {test_position_inputs.shape}")

        # Reset using_current_data flag when file is uploaded
        if uploaded_input is not None:
            st.session_state.using_current_data = False
# Session-state keys that share the 'validation_' prefix but are owned by other
# pages; the "Run New Simulation" reset must not delete them.
_VALIDATION_KEYS_TO_KEEP = frozenset({'validation_ready'})


def _snapshot_validation_inputs():
    """Copy the current predictions and lambda_max into the validation cache.

    Writes ``validation_G_cached``, ``validation_mu_cached`` and
    ``validation_lambda_cached`` from ``pred_G``, ``pred_mu`` and
    ``lambda_max_mean`` in ``st.session_state``.
    """
    pred_G = st.session_state.pred_G
    pred_mu = st.session_state.pred_mu
    st.session_state.validation_G_cached = pred_G[0][0] if pred_G.ndim > 1 else pred_G[0]
    st.session_state.validation_mu_cached = pred_mu[0][0] if pred_mu.ndim > 1 else pred_mu[0]
    st.session_state.validation_lambda_cached = st.session_state.get('lambda_max_mean')


def show_validation():
    """Render the validation page: simulate with predicted G/mu and compare.

    The page is driven by three session-state flags
    (``validation_button_clicked``, ``validation_running``,
    ``validation_complete``): clicking the run button sets the flags and
    reruns, the simulation then executes on the next script run, stores its
    results under ``validation_*`` keys and reruns once more to display them.

    The cached simulation inputs are refreshed every time the run button is
    clicked, so a newer ML prediction is never silently ignored. The reset
    button clears only this page's ``validation_*`` state and keeps
    ``validation_ready``, which is set by the ML prediction page.
    """
    st.markdown('<h2 class="section-header">β Validation</h2>', unsafe_allow_html=True)

    if not st.session_state.processed_data:
        st.warning("Please process data first.")
        return

    predictions_available = (
        st.session_state.get('validation_ready', False)
        and 'pred_G' in st.session_state
        and 'pred_mu' in st.session_state
    )
    if not predictions_available:
        st.warning("Please run ML prediction first to get material properties for validation.")
        st.info("""
**Validation Process:**
1. π Load experimental data
2. βοΈ Process and interpolate data
3. π€ Use ML model to predict G & ΞΌ
4. β **Run validation simulation** (you are here)
5. π Compare experimental vs simulated results
""")
        return

    st.subheader("π¬ IMR Simulation using predicted G and ΞΌ vs Experimental R-t curve")

    # Cache the simulation inputs so the displayed values stay tied to the
    # simulation that produced the results.
    if 'validation_G_cached' not in st.session_state:
        _snapshot_validation_inputs()

    # Initialize the state-machine flags on first visit.
    for flag in ('validation_button_clicked', 'validation_running', 'validation_complete'):
        if flag not in st.session_state:
            st.session_state[flag] = False

    # Run button: only shown while no simulation is in progress.
    if not st.session_state.validation_running:
        if st.button("π Run Validation Simulation", type="primary", key="validation_btn_stable"):
            # Re-read the predictions so a re-run of the ML step takes effect.
            _snapshot_validation_inputs()
            # A previous failure must not be shown next to the new run.
            if 'validation_error' in st.session_state:
                del st.session_state['validation_error']
            st.session_state.validation_button_clicked = True
            st.session_state.validation_running = True
            st.session_state.validation_complete = False
            st.rerun()

    # Simulation execution
    if st.session_state.validation_running and not st.session_state.validation_complete:
        if st.session_state.get('lambda_max_mean') is None:
            st.error("No lambda_max_mean loaded. Please load data first.")
            st.session_state.validation_running = False
            return

        status_placeholder = st.empty()
        status_placeholder.info("π Running simulation... Please wait (this will not cause trembling).")
        try:
            G_value = st.session_state.validation_G_cached
            mu_value = st.session_state.validation_mu_cached

            bubble_sim = OptimizedBubbleSimulation()
            start_time = time.time()
            t_sim, R_sim = bubble_sim.run_optimized_simulation(
                G_value, mu_value, st.session_state.validation_lambda_cached
            )
            simulation_time = time.time() - start_time

            st.session_state.validation_t_sim = t_sim
            st.session_state.validation_R_sim = R_sim
            st.session_state.validation_simulation_time = simulation_time

            # Error metrics over the time window covered by both curves.
            rmse = mae = max_error = 0
            t_exp = st.session_state.t_interp_newunit
            if len(t_sim) > 0 and len(t_exp) > 0:
                t_min = max(t_exp[0], t_sim[0])
                t_max = min(t_exp[-1], t_sim[-1])
                if t_max > t_min:
                    f_sim = interp1d(t_sim, R_sim, kind='linear', bounds_error=False,
                                     fill_value='extrapolate')
                    mask = (t_exp >= t_min) & (t_exp <= t_max)
                    t_common = t_exp[mask]
                    R_exp_common = st.session_state.R_interp_newunit[mask]
                    R_sim_common = f_sim(t_common)
                    if len(R_exp_common) > 0:
                        residual = R_exp_common - R_sim_common
                        rmse = np.sqrt(np.mean(residual ** 2))
                        mae = np.mean(np.abs(residual))
                        max_error = np.max(np.abs(residual))

            st.session_state.validation_rmse = rmse
            st.session_state.validation_mae = mae
            st.session_state.validation_max_error = max_error
        except Exception as e:
            st.session_state.validation_running = False
            st.session_state.validation_error = str(e)
        else:
            status_placeholder.empty()
            st.session_state.validation_running = False
            st.session_state.validation_complete = True
        # Rerun outside the try block so Streamlit's rerun control-flow
        # exception can never be mistaken for a simulation failure.
        st.rerun()

    # Results (only once a simulation has completed)
    if st.session_state.validation_complete and 'validation_t_sim' in st.session_state:
        fig, ax = plt.subplots(figsize=(10, 6))
        ax.plot(st.session_state.t_interp_newunit, st.session_state.R_interp_newunit,
                'ro', markersize=4, label='Interpolated (Experimental)', alpha=0.7)
        ax.plot(st.session_state.validation_t_sim, st.session_state.validation_R_sim,
                'b-', linewidth=2, label='Simulated (Predicted G & ΞΌ)')
        ax.set_xlabel('Time (0.1 ms)')
        ax.set_ylabel('Radius (0.1 mm)')
        ax.set_title('Validation: Experimental vs Simulated R-t Curves')
        ax.grid(True, alpha=0.3)
        ax.legend()
        plt.tight_layout()
        st.pyplot(fig)

        st.success("β Validation simulation completed!")
        st.info(f"""**Validation Results:**
**Predicted Values:**
- Shear Modulus (G): {st.session_state.validation_G_cached:.2e} Pa
- Viscosity (ΞΌ): {st.session_state.validation_mu_cached:.4f} PaΒ·s
- Lambda Max: {st.session_state.validation_lambda_cached:.3f}
**Performance:**
- Simulation Time: {st.session_state.validation_simulation_time:.2f} seconds
- Simulated Points: {len(st.session_state.validation_R_sim)}
- Time Range: {st.session_state.validation_t_sim[0]:.3f} to {st.session_state.validation_t_sim[-1]:.3f} (0.1 ms)
""")

        # Reset button: clear this page's state but keep keys owned elsewhere.
        if st.button("π Run New Simulation", key="reset_validation"):
            for key in list(st.session_state.keys()):
                if (isinstance(key, str) and key.startswith('validation_')
                        and key not in _VALIDATION_KEYS_TO_KEEP):
                    del st.session_state[key]
            st.rerun()

    # Simulation error from the last attempt
    if 'validation_error' in st.session_state:
        st.error(f"Simulation failed: {st.session_state.validation_error}")
        if st.button("π Try Again", key="retry_validation"):
            del st.session_state['validation_error']
            st.session_state.validation_running = False
            st.rerun()

    # Additional CSS disabling transitions/animations on buttons and containers
    st.markdown("""
<style>
/* ULTIMATE ANTI-TREMBLING CSS */
.stButton > button {
transition: none !important;
animation: none !important;
transform: none !important;
}
.element-container {
transition: none !important;
animation: none !important;
transform: none !important;
position: relative !important;
}
/* Force container stability */
[data-testid="column"] {
transition: none !important;
animation: none !important;
transform: none !important;
}
/* Prevent any layout shifts */
.main .block-container .element-container {
will-change: auto !important;
transform: translateZ(0) !important;
backface-visibility: hidden !important;
}
</style>
""", unsafe_allow_html=True)
def show_results():
    """Render the results page: analysis summary, downloads and session reset.

    Reads processed data, predictions and simulation results from
    ``st.session_state`` and offers each as a text download. Missing optional
    values (selected dataset, lambda max) are shown as ``N/A`` instead of
    raising while formatting. "Clear All Data" removes every session-state key
    and deletes the temporary directory of the uploaded model, if any.
    """
    st.markdown('<h2 class="section-header">π Results & Export</h2>', unsafe_allow_html=True)

    if not st.session_state.processed_data:
        st.warning("No processed data available.")
        return

    # Display labels that tolerate missing values.
    selected_dataset = st.session_state.get('selected_dataset')
    # selected_dataset is a zero-based index; show it one-based.
    dataset_label = selected_dataset + 1 if selected_dataset is not None else 'N/A'
    lambda_max = st.session_state.get('lambda_max_mean')
    lambda_label = f"{lambda_max:.3f}" if lambda_max is not None else 'N/A'

    # Scalar predictions, extracted once for both the summary and the export.
    has_predictions = 'pred_G' in st.session_state and 'pred_mu' in st.session_state
    if has_predictions:
        pred_G = st.session_state.pred_G
        pred_mu = st.session_state.pred_mu
        G_value = pred_G[0][0] if pred_G.ndim > 1 else pred_G[0]
        mu_value = pred_mu[0][0] if pred_mu.ndim > 1 else pred_mu[0]

    # Summary of all results
    st.subheader("π Analysis Summary")
    col1, col2 = st.columns(2)
    with col1:
        st.markdown("**Data Information:**")
        if st.session_state.data_loaded:
            st.write(f"β Datasets loaded: {st.session_state.num_datasets}")
            st.write(f"β Lambda max: {lambda_label}")
            st.write(f"β Selected dataset: {dataset_label}")
        if st.session_state.processed_data:
            st.write(f"β Interpolated points: {len(st.session_state.R_interp_newunit)}")
    with col2:
        st.markdown("**ML Predictions:**")
        if has_predictions:
            st.write(f"π― Shear Modulus (G): {G_value:.2e} Pa")
            st.write(f"π― Viscosity (ΞΌ): {mu_value:.4f} PaΒ·s")
        else:
            st.write("β No predictions available")

    # Export options
    st.subheader("πΎ Export Options")
    export_col1, export_col2, export_col3 = st.columns(3)
    with export_col1:
        if st.session_state.processed_data:
            # Interpolated radii, space separated; the last sample is omitted.
            data_str = ' '.join(f'{val:.6f}' for val in st.session_state.R_interp_newunit[:-1])
            st.download_button(
                label="π₯ Download Interpolated Data",
                data=data_str,
                file_name="interpolated_bubble_data.txt",
                mime="text/plain"
            )
    with export_col2:
        if has_predictions:
            pred_summary = f"""Bubble Dynamics Analysis Results
Dataset: {dataset_label}
Lambda Max: {lambda_label}
ML Predictions:
Shear Modulus (G): {G_value:.2e} Pa
Viscosity (ΞΌ): {mu_value:.4f} PaΒ·s
Analysis completed on: {time.strftime('%Y-%m-%d %H:%M:%S')}
"""
            st.download_button(
                label="π Download Results Summary",
                data=pred_summary,
                file_name="bubble_analysis_results.txt",
                mime="text/plain"
            )
    with export_col3:
        if 'validation_t_sim' in st.session_state:
            # One "time<TAB>radius" row per simulated sample.
            sim_str = '\n'.join(
                f'{t:.6f}\t{r:.6f}'
                for t, r in zip(st.session_state.validation_t_sim,
                                st.session_state.validation_R_sim)
            )
            st.download_button(
                label="π¬ Download Simulation Data",
                data=sim_str,
                file_name="simulation_results.txt",
                mime="text/plain"
            )

    # Session reset
    st.subheader("π Reset Session")
    if st.button("ποΈ Clear All Data", type="secondary"):
        import shutil

        # Remove the uploaded model's temporary directory before its path is
        # forgotten together with the rest of the session state.
        temp_model_dir = st.session_state.get('temp_model_dir')
        if temp_model_dir:
            shutil.rmtree(temp_model_dir, ignore_errors=True)
        for key in list(st.session_state.keys()):
            del st.session_state[key]
        st.success("β Session cleared! Refresh the page to start over.")
# Script entry point: start the app only when this file is executed directly
# (e.g. via `streamlit run`), not when it is imported as a module.
if __name__ == "__main__":
    main()