import streamlit as st
import numpy as np
import matplotlib.pyplot as plt
from scipy.io import loadmat
from scipy.interpolate import PchipInterpolator, interp1d
import io
import time
import tempfile
import os

###########################################################################
# Configure Streamlit page
st.set_page_config(
    page_title="Bubble Dynamics Analysis",
    page_icon="๐Ÿซง",
    layout="wide",
    initial_sidebar_state="expanded"
)

# Try importing TensorFlow. It is an optional dependency: when missing, the
# ML-prediction page is disabled gracefully instead of crashing the app.
try:
    import tensorflow as tf
    from tensorflow import keras
    from tensorflow.keras import layers
    TENSORFLOW_AVAILABLE = True
except ImportError:
    TENSORFLOW_AVAILABLE = False

# ULTRA-AGGRESSIVE CSS FIX - Complete stability with UT Austin background and Logo
st.markdown(""" """, unsafe_allow_html=True)

# Initialize session state flags used by the page guards below.
if 'data_loaded' not in st.session_state:
    st.session_state.data_loaded = False
if 'processed_data' not in st.session_state:
    st.session_state.processed_data = False
if 'model_loaded' not in st.session_state:
    st.session_state.model_loaded = False


# ULTRA-OPTIMIZED BubbleSimulation class - ZERO UI UPDATES during simulation
class OptimizedBubbleSimulation:
    """
    ULTRA-OPTIMIZED version - ZERO UI updates during simulation to prevent any trembling.

    Solves a Keller-Miksis bubble model with interior heat and vapor transport
    (finite differences on a uniform radial grid of NT points). Physics mirrors
    the original MATLAB implementation; only numerics are relaxed for speed.
    """

    def __init__(self):
        self.setup_parameters()

    def setup_parameters(self):
        """Define fixed physical, material and numerical parameters.

        Values mirror the original MATLAB code, except NT and RelTol which are
        deliberately relaxed for interactive speed.
        """
        # Fixed parameters (same as original MATLAB)
        self.R0 = 35e-6          # initial bubble radius [m]
        self.P_inf = 101325      # far-field pressure [Pa]
        self.T_inf = 298.15      # far-field temperature [K]
        self.cav_type = 'LIC'    # laser-induced cavitation

        # Material parameters (same as original)
        self.c_long = 1700       # longitudinal sound speed [m/s]
        self.alpha = 0.0
        self.rho = 1000          # liquid density [kg/m^3]
        self.gamma = 0.0725      # surface tension [N/m]

        # Parameters for bubble contents (same as original)
        self.D0 = 24.2e-6
        self.kappa = 1.4                      # ratio of specific heats
        self.Ru = 8.3144598                   # universal gas constant
        self.Rv = self.Ru / (18.01528e-3)     # specific gas constant, water vapor
        self.Ra = self.Ru / (28.966e-3)       # specific gas constant, air
        self.A = 5.28e-5
        self.B = 1.17e-2
        self.P_ref = 1.17e11
        self.T_ref = 5200

        # OPTIMIZED numerical parameters for speed
        self.NT = 100       # Reduced from 500 to 100 (5x faster, still accurate)
        self.RelTol = 1e-5  # Relaxed from 1e-7 to 1e-5 (faster convergence)

    def run_optimized_simulation(self, G, mu, lambda_max_mean=None):
        """ULTRA-OPTIMIZED simulation - ZERO UI updates to prevent trembling.

        Parameters
        ----------
        G : float
            Shear modulus [Pa] (typically the ML prediction).
        mu : float
            Viscosity [Pa*s] (typically the ML prediction).
        lambda_max_mean : float, optional
            Maximum stretch ratio from the loaded .mat file; defaults to 5.99.

        Returns
        -------
        (t, R) : tuple of ndarray
            Time [0.1 ms] and radius [0.1 mm] histories. Falls back to an
            analytical approximation if both ODE solvers fail.
        """
        from scipy.integrate import solve_ivp

        # Set lambdamax from loaded data or use default
        if lambda_max_mean is not None:
            self.lambdamax = lambda_max_mean
            print(f"Using lambda_max_mean = {self.lambdamax} from .mat file")
        else:
            self.lambdamax = 5.99  # Fallback default
            print(f"Warning: Using default lambda_max = {self.lambdamax}")

        print(f"Running ULTRA-OPTIMIZED simulation with predicted G={G:.2e} Pa, ฮผ={mu:.4f} Paยทs")

        # Use predicted values
        self.G = G
        self.mu = mu

        # Setup (same as original)
        if self.cav_type == 'LIC':
            self.Rmax = self.lambdamax * self.R0
            self.PA = 0
            self.omega = 0
            self.delta = 0
            self.n = 0

        if self.cav_type == 'LIC':
            self.Rc = self.Rmax

        self.Uc = np.sqrt(self.P_inf / self.rho)   # characteristic velocity
        self.tc = self.Rmax / self.Uc              # characteristic time
        self.tspan = 3 * self.tc                   # Reduced for speed

        # Calculate parameters (same as original)
        self.Pv = self.P_ref * np.exp(-self.T_ref / self.T_inf)
        self.K_inf = self.A * self.T_inf + self.B

        # Non-dimensional variables (same as original)
        self.C_star = self.c_long / self.Uc
        self.We = self.P_inf * self.Rc / (2 * self.gamma)
        self.Ca = self.P_inf / self.G
        self.Re = self.P_inf * self.Rc / (self.mu * self.Uc)
        self.fom = self.D0 / (self.Uc * self.Rc)
        self.chi = self.T_inf * self.K_inf / (self.P_inf * self.Rc * self.Uc)
        self.A_star = self.A * self.T_inf / self.K_inf
        self.B_star = self.B / self.K_inf
        self.Pv_star = self.Pv / self.P_inf
        self.tspan_star = self.tspan / self.tc
        self.Req = self.R0 / self.Rmax
        self.PA_star = self.PA / self.P_inf
        self.omega_star = self.omega * self.tc
        self.delta_star = self.delta / self.tc

        # Parameters vector (same layout the RHS function indexes into)
        self.params = [self.NT, self.C_star, self.We, self.Ca, self.alpha,
                       self.Re, self.Rv, self.Ra, self.kappa, self.fom,
                       self.chi, self.A_star, self.B_star, self.Pv_star,
                       self.Req, self.PA_star, self.omega_star,
                       self.delta_star, self.n]

        # Initial conditions (same as original)
        R0_star = 1
        U0_star = 0
        Theta0 = np.zeros(self.NT)

        if self.cav_type == 'LIC':
            P0 = (self.Pv + (self.P_inf + 2 * self.gamma / self.R0 - self.Pv)
                  * ((self.R0 / self.Rmax) ** 3))
            P0_star = P0 / self.P_inf

        S0 = ((3 * self.alpha - 1) * (5 - 4 * self.Req - self.Req ** 4) / (2 * self.Ca)
              + 2 * self.alpha * (27 / 40 + self.Req ** 8 / 8 + self.Req ** 5 / 5
                                  + self.Req ** 2 - 2 / self.Req) / self.Ca)

        k0 = ((1 + (self.Rv / self.Ra) * (P0_star / self.Pv_star - 1)) ** (-1)) * np.ones(self.NT)

        X0 = np.concatenate([[R0_star, U0_star, P0_star, S0], Theta0, k0])

        print(f"State vector size: {len(X0)} (4 + {self.NT} + {self.NT})")
        print(f"Time span: 0 to {self.tspan_star:.4f}")

        # CRITICAL FIX: NO UI UPDATES AT ALL - store them for later
        self.simulation_status = "Starting simulation..."

        # ULTRA-OPTIMIZED ODE solving - NO UI updates during solving.
        # BDF first (stiff problem), LSODA with looser tolerances as backup.
        try:
            sol = solve_ivp(
                self.bubble_optimized,
                [0, self.tspan_star],
                X0,
                method='BDF',
                rtol=self.RelTol,
                atol=1e-8,
                max_step=self.tspan_star / 200,
                dense_output=False
            )
            self.simulation_status = "Processing results..."
        except Exception as e:
            print(f"BDF failed: {str(e)}, trying LSODA...")
            self.simulation_status = "Trying backup solver..."
            try:
                sol = solve_ivp(
                    self.bubble_optimized,
                    [0, self.tspan_star],
                    X0,
                    method='LSODA',
                    rtol=1e-4,
                    atol=1e-7,
                    max_step=self.tspan_star / 100,
                )
            except Exception as e2:
                print(f"All solvers failed: {str(e2)}")
                return self.fast_fallback()

        if not sol.success:
            print(f"Solver failed: {sol.message}")
            return self.fast_fallback()

        # Extract solution
        t_nondim = sol.t
        X_nondim = sol.y.T
        R_nondim = X_nondim[:, 0]

        # Filter out non-physical / diverged samples
        valid_mask = (R_nondim > 0.01) & (R_nondim < 20) & np.isfinite(R_nondim)
        t_nondim = t_nondim[valid_mask]
        R_nondim = R_nondim[valid_mask]

        if len(t_nondim) < 10:
            print("Too few valid points, using fast fallback")
            return self.fast_fallback()

        # Back to physical units
        t = t_nondim * self.tc
        R = R_nondim * self.Rc

        # Change units: seconds -> 0.1 ms, meters -> 0.1 mm
        scale = 1e4
        t_newunit = t * scale
        R_newunit = R * scale

        self.simulation_status = "Simulation complete!"
        print(f"ULTRA-OPTIMIZED simulation completed in {len(t_newunit)} points!")
        print(f"Time range: {t_newunit[0]:.3f} to {t_newunit[-1]:.3f} (0.1 ms)")
        print(f"Radius range: {np.min(R_newunit):.3f} to {np.max(R_newunit):.3f} (0.1 mm)")

        return t_newunit, R_newunit

    def bubble_optimized(self, t, x):
        """
        OPTIMIZED bubble physics function - same physics, no UI updates.

        Right-hand side for solve_ivp. State layout:
        x = [R, U, P, S, Theta(NT), k(NT)].

        FIX: the original claimed "vectorized" stencils but looped in Python
        over the NT grid points on every RHS evaluation (the hot path). Since
        yk[i] = i/(NT-1) >= 1/(NT-1) for i >= 1, the per-point `yk > 1e-12`
        guards were dead code, so all interior stencils are now true NumPy
        slice operations with element-wise-identical results.
        """
        # Extract parameters (same layout as self.params)
        NT = int(self.params[0])
        C_star = self.params[1]
        We = self.params[2]
        Ca = self.params[3]
        alpha = self.params[4]
        Re = self.params[5]
        Rv = self.params[6]
        Ra = self.params[7]
        kappa = self.params[8]
        fom = self.params[9]
        chi = self.params[10]
        A_star = self.params[11]
        B_star = self.params[12]
        Pv_star = self.params[13]
        Req = self.params[14]

        # Extract state variables
        R = x[0]
        U = x[1]
        P = x[2]
        S = x[3]
        Theta = x[4:4 + NT]
        k = x[4 + NT:4 + 2 * NT]

        # Uniform grid on [0, 1] (bubble center -> wall)
        deltaY = 1 / (NT - 1)
        ii = np.arange(1, NT + 1)
        yk = (ii - 1) * deltaY

        # Wall boundary condition; copy so we never mutate the solver's state
        k = k.copy()
        k[-1] = (1 + (Rv / Ra) * (P / Pv_star - 1)) ** (-1)

        # Mixture fields
        T = (A_star - 1 + np.sqrt(1 + 2 * A_star * Theta)) / A_star
        K_star = A_star * T + B_star
        Rmix = k * Rv + (1 - k) * Ra

        # First derivatives: Neumann BC at the origin, central differences in
        # the interior, one-sided backward stencil at the wall.
        DTheta = np.zeros(NT)
        DDTheta = np.zeros(NT)
        Dk = np.zeros(NT)
        DDk = np.zeros(NT)

        DTheta[0] = 0
        Dk[0] = 0
        if NT >= 3:
            DTheta[1:-1] = (Theta[2:] - Theta[:-2]) / (2 * deltaY)
            Dk[1:-1] = (k[2:] - k[:-2]) / (2 * deltaY)
            DTheta[-1] = (3 * Theta[-1] - 4 * Theta[-2] + Theta[-3]) / (2 * deltaY)
            Dk[-1] = (3 * k[-1] - 4 * k[-2] + k[-3]) / (2 * deltaY)

        # Spherical Laplacians: regularized 6*(f1-f0)/dY^2 at the origin,
        # standard stencil + curvature term (2/y)*f' elsewhere (vectorized).
        DDTheta[0] = 6 * (Theta[1] - Theta[0]) / deltaY ** 2
        DDk[0] = 6 * (k[1] - k[0]) / deltaY ** 2
        if NT >= 3:
            DDTheta[1:-1] = ((Theta[2:] - 2 * Theta[1:-1] + Theta[:-2]) / deltaY ** 2
                             + (2 / yk[1:-1]) * DTheta[1:-1])
            DDk[1:-1] = ((k[2:] - 2 * k[1:-1] + k[:-2]) / deltaY ** 2
                         + (2 / yk[1:-1]) * Dk[1:-1])
        if NT >= 4:
            # One-sided second derivative at the wall (yk[-1] == 1 exactly)
            DDTheta[-1] = ((2 * Theta[-1] - 5 * Theta[-2] + 4 * Theta[-3] - Theta[-4])
                           / deltaY ** 2 + (2 / yk[-1]) * DTheta[-1])
            DDk[-1] = ((2 * k[-1] - 5 * k[-2] + 4 * k[-3] - k[-4])
                       / deltaY ** 2 + (2 / yk[-1]) * Dk[-1])

        # Internal pressure evolution (same physics)
        if Rmix[-1] > 1e-12 and (1 - k[-1]) > 1e-12 and R > 1e-12:
            pdot = (3 / R * (-kappa * P * U + (kappa - 1) * chi * DTheta[-1] / R
                             + kappa * P * fom * Rv * Dk[-1]
                             / (R * Rmix[-1] * (1 - k[-1]))))
        else:
            pdot = -3 * kappa * P * U / R if R > 1e-12 else 0

        # Mixture velocity, evaluated only where denominators are safe
        Umix = np.zeros(NT)
        valid_indices = (Rmix > 1e-12) & (kappa * P > 1e-12)
        if np.any(valid_indices):
            idx = np.where(valid_indices)[0]
            Umix[idx] = (((kappa - 1) * chi / R * DTheta[idx]
                          - R * yk[idx] * pdot / 3) / (kappa * P)
                         + fom / R * (Rv - Ra) / Rmix[idx] * Dk[idx])

        # Temperature evolution over interior points (wall is Dirichlet).
        # Vectorized over the indices where the original loop's guard held.
        Theta_prime = np.zeros(NT)
        if P > 1e-12:
            j = np.where(Rmix[:-1] > 1e-12)[0]
            Theta_prime[j] = ((pdot + DDTheta[j] * chi / R ** 2)
                              * (K_star[j] * T[j] / P * (kappa - 1) / kappa)
                              - DTheta[j] * (Umix[j] - yk[j] * U) / R
                              + fom / R ** 2 * (Rv - Ra) / Rmix[j] * Dk[j] * DTheta[j])
        Theta_prime[-1] = 0  # Dirichlet BC

        # Vapor concentration evolution (wall is Dirichlet), same guard set
        k_prime = np.zeros(NT)
        j = np.where((Rmix[:-1] > 1e-12) & (T[:-1] > 1e-12))[0]
        term1 = fom / R ** 2 * (DDk[j] + Dk[j] * (-((Rv - Ra) / Rmix[j]) * Dk[j]
                                                  - DTheta[j]
                                                  / np.sqrt(1 + 2 * A_star * Theta[j])
                                                  / T[j]))
        term2 = -(Umix[j] - U * yk[j]) / R * Dk[j]
        k_prime[j] = term1 + term2
        k_prime[-1] = 0  # Dirichlet BC

        # Elastic stress evolution (same physics).
        # FIX: initialize Sdot so a non-'LIC' cav_type cannot raise NameError.
        Sdot = 0
        if self.cav_type == 'LIC':
            if Req > 1e-12:
                Rst = R / Req
                if Rst > 1e-12:
                    Sdot = (2 * U / R * (3 * alpha - 1)
                            * (1 / Rst + 1 / Rst ** 4) / Ca
                            - 2 * alpha * U / R
                            * (1 / Rst ** 8 + 1 / Rst ** 5
                               + 2 / Rst ** 2 + 2 * Rst) / Ca)

        # Keller-Miksis equation for the bubble wall
        rdot = U
        udot = 0
        if R > 1e-12:
            numerator = ((1 + U / C_star)
                         * (P - 1 / (We * R) + S - 4 * U / (Re * R) - 1)
                         + R / C_star * (pdot + U / (We * R ** 2) + Sdot
                                         + 4 * U ** 2 / (Re * R ** 2))
                         - (3 / 2) * (1 - U / (3 * C_star)) * U ** 2)
            denominator = (1 - U / C_star) * R + 4 / (C_star * Re)
            if abs(denominator) > 1e-12:
                udot = numerator / denominator

        # Return derivatives in the same layout as the state vector
        return np.concatenate([[rdot, udot, pdot, Sdot], Theta_prime, k_prime])

    def fast_fallback(self):
        """Faster fallback for validation"""
        print("Using fast analytical approximation for validation")
        # Quick analytical approximation based on Rayleigh-Plesset equation
t_nondim = np.linspace(0, 3, 200) # Fewer points for speed # Simple damped oscillation model using actual parameters if hasattr(self, 'P_inf') and hasattr(self, 'rho') and hasattr(self, 'lambdamax'): Rmax = self.lambdamax * self.R0 # Use dynamic lambda value omega_natural = np.sqrt(3 * self.P_inf / (self.rho * Rmax ** 2)) damping = self.mu / (self.rho * Rmax ** 2) if hasattr(self, 'mu') else 0.1 else: omega_natural = 1000 damping = 0.1 # Analytical solution approximation omega_d = omega_natural * np.sqrt(1 - damping ** 2) if damping < 1 else omega_natural decay = np.exp(-damping * omega_natural * t_nondim * self.tc) R_nondim = self.Req + (1 - self.Req) * decay * np.cos(omega_d * t_nondim * self.tc) R_nondim = np.maximum(R_nondim, 0.05) # Prevent negative values # Convert to physical units t = t_nondim * self.tc R = R_nondim * self.Rc scale = 1e4 return t * scale, R * scale # Main Streamlit App def main(): # Header - ULTRA-STABLE with fixed positioning and UT Austin branding + Logo st.markdown("""

Bubble Dynamics Analysis

""", unsafe_allow_html=True) # Initialize current page in session state if 'current_page' not in st.session_state: st.session_state.current_page = "๐Ÿ  Home" # Sidebar for navigation with clickable menu st.sidebar.title("๐Ÿ“‹ Navigation") # Create clickable menu buttons menu_items = [ "๐Ÿ  Home", "๐Ÿ“‚ Data Loading", "โš™๏ธ Data Processing", "๐Ÿค– ML Prediction", "โœ… Validation", "๐Ÿ“Š Results & Export" ] # Display menu buttons for item in menu_items: # Check if this is the current page to highlight it if st.session_state.current_page == item: # Use different styling for active page if st.sidebar.button(f"โ–ถ๏ธ {item}", key=f"nav_{item}", use_container_width=True): st.session_state.current_page = item st.rerun() else: if st.sidebar.button(f" {item}", key=f"nav_{item}", use_container_width=True): st.session_state.current_page = item st.rerun() # Add some spacing st.sidebar.markdown("---") # Show current status in sidebar st.sidebar.markdown("### ๐Ÿ“Š Status") if st.session_state.data_loaded: st.sidebar.success("โœ… Data loaded") else: st.sidebar.info("๐Ÿ“‚ No data loaded") if st.session_state.processed_data: st.sidebar.success("โœ… Data processed") else: st.sidebar.info("โš™๏ธ Data not processed") if st.session_state.model_loaded: st.sidebar.success("โœ… Model loaded") else: st.sidebar.info("๐Ÿค– No model loaded") # Display the selected page page = st.session_state.current_page if page == "๐Ÿ  Home": show_home() elif page == "๐Ÿ“‚ Data Loading": show_data_loading() elif page == "โš™๏ธ Data Processing": show_data_processing() elif page == "๐Ÿค– ML Prediction": show_ml_prediction() elif page == "โœ… Validation": show_validation() elif page == "๐Ÿ“Š Results & Export": show_results() def show_home(): """Home page with overview""" col1, col2 = st.columns([2, 1]) with col1: st.markdown(""" ### Welcome to the YANG Research Group Bubble Dynamics Analysis Platform **The University of Texas at Austin - Aerospace Engineering and Engineering Mechanics** **Cockrell 
School of Engineering** This advanced web application provides comprehensive tools for analyzing bubble dynamics data: **Features:** - ๐Ÿ“‚ **Data Loading**: Upload and analyze .mat files containing bubble dynamics data - โš™๏ธ **Data Processing**: Interpolate and process experimental data - ๐Ÿค– **ML Prediction**: Use machine learning to predict material properties (G & ฮผ) - โœ… **Validation**: Compare experimental vs simulated bubble behavior - ๐Ÿ“Š **Export**: Download processed results and visualizations **Getting Started:** 1. Navigate to "Data Loading" to upload your .mat file 2. Process your data in "Data Processing" 3. Use ML models in "ML Prediction" 4. Validate results in "Validation" 5. Export your findings in "Results & Export" """) with col2: if TENSORFLOW_AVAILABLE: st.success(""" **โœ… System Status: Full Features Available** โœ… Core Features: Ready โœ… Data Processing: Ready โœ… Visualization: Ready โœ… ML Models: Ready โœ… Simulations: Ready """) else: st.warning(""" **โš ๏ธ System Status: Limited Features** โœ… Core Features: Ready โœ… Data Processing: Ready โœ… Visualization: Ready โŒ ML Models: TensorFlow not installed โœ… Simulations: Ready ๐Ÿ’ก Install TensorFlow to enable ML predictions: `pip install tensorflow-cpu` """) # Show current session state if st.session_state.data_loaded: st.success("๐Ÿ“‚ Data loaded successfully") if st.session_state.processed_data: st.success("โš™๏ธ Data processed") if st.session_state.model_loaded: st.success("๐Ÿค– ML model loaded") def show_data_loading(): """Data loading interface""" st.markdown('

๐Ÿ“‚ Data Loading

', unsafe_allow_html=True) uploaded_file = st.file_uploader( "Upload your .mat file", type=['mat'], help="Upload a MATLAB .mat file containing 'R_nondim_All', 't_nondim_All', and 'lambda_max_mean'" ) if uploaded_file is not None: try: # Save uploaded file temporarily with tempfile.NamedTemporaryFile(delete=False, suffix='.mat') as tmp_file: tmp_file.write(uploaded_file.getvalue()) tmp_file_path = tmp_file.name # Load the .mat file data = loadmat(tmp_file_path) # Check required variables required_vars = ['R_nondim_All', 't_nondim_All'] missing_vars = [var for var in required_vars if var not in data] if missing_vars: st.error(f"Missing required variables: {missing_vars}") return # Extract data R_nondim_all = data['R_nondim_All'] t_nondim_all = data['t_nondim_All'] num_datasets = R_nondim_all.shape[1] # Extract lambda_max_mean if 'lambda_max_mean' in data: lambda_max_mean = float(data['lambda_max_mean']) else: st.warning("lambda_max_mean not found in file. Using default value 5.99") lambda_max_mean = 5.99 # Store in session state st.session_state.data = data st.session_state.R_nondim_all = R_nondim_all st.session_state.t_nondim_all = t_nondim_all st.session_state.lambda_max_mean = lambda_max_mean st.session_state.num_datasets = num_datasets st.session_state.data_loaded = True # Calculate physical parameters R0_sim = 35e-6 P_inf_exp = 101325 rho_exp = 1000 Rmax_exp = lambda_max_mean * R0_sim Rc_exp = Rmax_exp Uc_exp = np.sqrt(P_inf_exp / rho_exp) tc_exp = Rc_exp / Uc_exp st.session_state.physical_params = { 'Rmax_exp': Rmax_exp, 'Rc_exp': Rc_exp, 'Uc_exp': Uc_exp, 'tc_exp': tc_exp } # Display success message st.markdown(f"""
โœ… Data loaded successfully!
๐Ÿ“Š Datasets found: {num_datasets}
๐ŸŽฏ Lambda max: {lambda_max_mean:.3f}
๐Ÿ“ Physical parameters calculated
""", unsafe_allow_html=True) # Show data preview col1, col2 = st.columns(2) with col1: st.subheader("๐Ÿ“ˆ Data Overview") st.write(f"**Number of datasets:** {num_datasets}") st.write(f"**Lambda max mean:** {lambda_max_mean:.3f}") st.write(f"**Data shape:** {R_nondim_all.shape}") with col2: st.subheader("๐Ÿ”ง Physical Parameters") st.write(f"**R_max:** {Rmax_exp * 1e6:.1f} ฮผm") st.write(f"**Time scale:** {tc_exp * 1e6:.1f} ฮผs") st.write(f"**Velocity scale:** {Uc_exp:.1f} m/s") # Clean up temporary file os.unlink(tmp_file_path) except Exception as e: st.error(f"Error loading file: {str(e)}") def show_data_processing(): """Data processing interface""" st.markdown('

โš™๏ธ Data Processing

', unsafe_allow_html=True) if not st.session_state.data_loaded: st.warning("Please load data first in the 'Data Loading' section.") return # Dataset selection dataset_idx = st.selectbox( "Select dataset to process:", range(st.session_state.num_datasets), format_func=lambda x: f"Dataset {x + 1}" ) # Processing parameters col1, col2 = st.columns(2) with col1: interp_range = st.number_input( "Interpolation Range", min_value=0.1, max_value=2.0, value=0.8, step=0.1, help="Time range for interpolation" ) with col2: time_step = st.number_input( "Time Step", min_value=0.001, max_value=0.1, value=0.008, step=0.001, format="%.3f", help="Time step for interpolation" ) if st.button("๐Ÿ”„ Process Data", type="primary"): with st.spinner("Processing data..."): try: # Extract data for selected dataset R_nondim_exp = np.array(st.session_state.R_nondim_all[0, dataset_idx]).flatten() t_nondim_exp = np.array(st.session_state.t_nondim_all[0, dataset_idx]).flatten() # Find zero index zero_candidates = np.where(np.abs(t_nondim_exp) < 1e-10)[0] if len(zero_candidates) > 0: zero_idx = zero_candidates[0] else: zero_idx = np.argmin(np.abs(t_nondim_exp)) # Convert to physical units tc_exp = st.session_state.physical_params['tc_exp'] Rc_exp = st.session_state.physical_params['Rc_exp'] t_exp = t_nondim_exp * tc_exp R_exp = R_nondim_exp * Rc_exp # Process from zero point t_fromzero = t_exp[zero_idx:].flatten() R_frommax = R_exp[zero_idx:].flatten() # Scale to new units scale_exp = 1e4 t_newunit_exp = t_fromzero * scale_exp R_newunit_exp = R_frommax * scale_exp # Sort data sort_indices = np.argsort(t_newunit_exp) t_newunit_exp = t_newunit_exp[sort_indices] R_newunit_exp = R_newunit_exp[sort_indices] # Interpolate t_interp_newunit = np.arange(0, interp_range + time_step, time_step) pchip_interpolator = PchipInterpolator(t_newunit_exp, R_newunit_exp) R_interp_newunit = pchip_interpolator(t_interp_newunit) # Store results st.session_state.t_interp_newunit = t_interp_newunit 
st.session_state.R_interp_newunit = R_interp_newunit st.session_state.t_original = t_newunit_exp st.session_state.R_original = R_newunit_exp st.session_state.processed_data = True st.session_state.selected_dataset = dataset_idx st.success(f"โœ… Data processed successfully! {len(R_interp_newunit)} interpolated points created.") except Exception as e: st.error(f"Processing failed: {str(e)}") # Show results if data is processed if st.session_state.processed_data: st.subheader("๐Ÿ“Š Processing Results") # Create plot fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5)) # Original vs interpolated ax1.plot(st.session_state.t_original, st.session_state.R_original, 'b-', linewidth=2, label='Original Data') ax1.plot(st.session_state.t_interp_newunit, st.session_state.R_interp_newunit, 'ro', markersize=3, label='Interpolated Points') ax1.set_xlabel('Time (0.1 ms)') ax1.set_ylabel('Radius (0.1 mm)') ax1.set_title('Original vs Interpolated Data') ax1.grid(True, alpha=0.3) ax1.legend() # Interpolated data only ax2.plot(st.session_state.t_interp_newunit, st.session_state.R_interp_newunit, 'ro', markersize=4) ax2.set_xlabel('Time (0.1 ms)') ax2.set_ylabel('Radius (0.1 mm)') ax2.set_title('Interpolated R-t Curve') ax2.grid(True, alpha=0.3) plt.tight_layout() st.pyplot(fig) # Download processed data if st.button("๐Ÿ’พ Download Processed Data"): # Create download data data_str = ' '.join([f'{val:.6f}' for val in st.session_state.R_interp_newunit[:-1]]) st.download_button( label="๐Ÿ“ฅ Download as TXT", data=data_str, file_name=f"interpolated_data_dataset_{dataset_idx + 1}.txt", mime="text/plain" ) def show_ml_prediction(): """ML prediction interface - MODIFIED FOR FILE UPLOAD""" st.markdown('

๐Ÿค– ML Prediction

', unsafe_allow_html=True) if not TENSORFLOW_AVAILABLE: st.error("โŒ **TensorFlow not available.** ML prediction features are disabled.") with st.expander("๐Ÿ”ง How to Enable ML Features", expanded=True): st.markdown(""" **To enable ML predictions:** 1. **Install TensorFlow:** ```bash pip install tensorflow-cpu # Recommended (smaller) # or pip install tensorflow # Full version ``` 2. **Restart the web app:** - Stop the app (Ctrl+C in terminal) - Run: `streamlit run streamlit_bubble_app.py` - Refresh your browser """) return if not st.session_state.processed_data: st.warning("Please process data first in the 'Data Processing' section.") return # Model file upload section (MODIFIED) st.subheader("๐Ÿ“ Upload ML Model") col1, col2 = st.columns([2, 1]) with col1: st.markdown("**Upload your trained model files:**") # Primary model file upload uploaded_model = st.file_uploader( "Upload model file (.h5, .keras, or .zip for SavedModel)", type=['h5', 'keras', 'zip'], help="Upload your trained model in H5, Keras, or ZIP format (for SavedModel)", key="model_file_upload" ) # Optional config file upload uploaded_config = st.file_uploader( "Upload model config (optional)", type=['npy'], help="Upload model_config.npy if available", key="config_file_upload" ) with col2: if st.button("๐Ÿ“– Model Format Help"): st.info(""" **Supported model formats:** **๐ŸŽฏ H5 Format (.h5)** - Recommended - Single file containing model architecture and weights - Most compatible format **โšก Keras Format (.keras)** - Native Keras 3.0 format - Single file format **๐Ÿ“ฆ SavedModel (.zip)** - Zip the entire SavedModel folder - Should contain: saved_model.pb, variables/, assets/ **โš™๏ธ Config File (.npy)** - Optional - Contains model configuration metadata - Helps with model information display """) # Model loading with uploaded files (MODIFIED) if uploaded_model is not None and st.button("๐Ÿ”„ Load Uploaded Model", type="primary"): with st.spinner("Loading uploaded ML model..."): try: # Create 
temporary directory for uploaded files temp_dir = tempfile.mkdtemp() model_loaded = False loading_method = "Unknown" model = None # Get file extension file_extension = uploaded_model.name.split('.')[-1].lower() # Define custom layers exactly matching your training code class CustomMultiHeadAttention(layers.Layer): def __init__(self, embed_dim, num_heads=8, **kwargs): super(CustomMultiHeadAttention, self).__init__(**kwargs) self.embed_dim = embed_dim self.num_heads = num_heads self.projection_dim = embed_dim // num_heads self.query_dense = layers.Dense(embed_dim) self.key_dense = layers.Dense(embed_dim) self.value_dense = layers.Dense(embed_dim) self.combine_heads = layers.Dense(embed_dim) def attention(self, query, key, value): score = tf.matmul(query, key, transpose_b=True) dim_key = tf.cast(tf.shape(key)[-1], tf.float32) scaled_score = score / tf.math.sqrt(dim_key) weights = tf.nn.softmax(scaled_score, axis=-1) output = tf.matmul(weights, value) return output, weights def separate_heads(self, x, batch_size): x = tf.reshape(x, (batch_size, -1, self.num_heads, self.projection_dim)) return tf.transpose(x, perm=[0, 2, 1, 3]) def call(self, inputs): batch_size = tf.shape(inputs)[0] query = self.query_dense(inputs) key = self.key_dense(inputs) value = self.value_dense(inputs) query = self.separate_heads(query, batch_size) key = self.separate_heads(key, batch_size) value = self.separate_heads(value, batch_size) attention, weights = self.attention(query, key, value) attention = tf.transpose(attention, perm=[0, 2, 1, 3]) concat_attention = tf.reshape(attention, (batch_size, -1, self.embed_dim)) output = self.combine_heads(concat_attention) return output def get_config(self): config = super(CustomMultiHeadAttention, self).get_config() config.update({ 'embed_dim': self.embed_dim, 'num_heads': self.num_heads, }) return config @classmethod def from_config(cls, config): return cls(**config) class CustomTransformerEncoderLayer(layers.Layer): def __init__(self, embed_dim, 
num_heads, ff_dim, rate=0.1, **kwargs): super(CustomTransformerEncoderLayer, self).__init__(**kwargs) self.embed_dim = embed_dim self.num_heads = num_heads self.ff_dim = ff_dim self.rate = rate self.att = CustomMultiHeadAttention(embed_dim, num_heads) self.ffn = keras.Sequential( [layers.Dense(ff_dim, activation="softplus"), layers.Dense(embed_dim)]) self.layernorm1 = layers.LayerNormalization(epsilon=1e-6) self.layernorm2 = layers.LayerNormalization(epsilon=1e-6) self.dropout1 = layers.Dropout(rate) self.dropout2 = layers.Dropout(rate) def call(self, inputs, training): attn_output = self.att(inputs) attn_output = self.dropout1(attn_output, training=training) out1 = self.layernorm1(inputs + attn_output) ffn_output = self.ffn(out1) ffn_output = self.dropout2(ffn_output, training=training) return self.layernorm2(out1 + ffn_output) def get_config(self): config = super(CustomTransformerEncoderLayer, self).get_config() config.update({ 'embed_dim': self.embed_dim, 'num_heads': self.num_heads, 'ff_dim': self.ff_dim, 'rate': self.rate, }) return config @classmethod def from_config(cls, config): return cls(**config) # Custom objects for model loading custom_objects = { 'CustomMultiHeadAttention': CustomMultiHeadAttention, 'CustomTransformerEncoderLayer': CustomTransformerEncoderLayer } # Handle different file formats if file_extension == 'h5': # Handle H5 format model_path = os.path.join(temp_dir, uploaded_model.name) with open(model_path, 'wb') as f: f.write(uploaded_model.getvalue()) try: model = keras.models.load_model(model_path, custom_objects=custom_objects) loading_method = "H5 format (uploaded)" model_loaded = True st.success("โœ… Loaded H5 model successfully") except Exception as e: st.error(f"H5 loading failed: {str(e)}") elif file_extension == 'keras': # Handle Keras format model_path = os.path.join(temp_dir, uploaded_model.name) with open(model_path, 'wb') as f: f.write(uploaded_model.getvalue()) try: model = keras.models.load_model(model_path, 
custom_objects=custom_objects) loading_method = "Keras format (uploaded)" model_loaded = True st.success("โœ… Loaded Keras model successfully") except Exception as e: st.error(f"Keras loading failed: {str(e)}") elif file_extension == 'zip': # Handle SavedModel ZIP format import zipfile zip_path = os.path.join(temp_dir, uploaded_model.name) with open(zip_path, 'wb') as f: f.write(uploaded_model.getvalue()) # Extract ZIP file extract_dir = os.path.join(temp_dir, 'extracted_model') with zipfile.ZipFile(zip_path, 'r') as zip_ref: zip_ref.extractall(extract_dir) # Look for SavedModel directory savedmodel_dirs = [] for root, dirs, files in os.walk(extract_dir): if 'saved_model.pb' in files: savedmodel_dirs.append(root) if savedmodel_dirs: try: model = keras.models.load_model(savedmodel_dirs[0], custom_objects=custom_objects) loading_method = "SavedModel format (uploaded ZIP)" model_loaded = True st.success("โœ… Loaded SavedModel successfully") except Exception as e: # Try TFSMLayer as fallback try: model = layers.TFSMLayer(savedmodel_dirs[0], call_endpoint='serving_default') loading_method = "TFSMLayer (uploaded ZIP fallback)" model_loaded = True st.success("โœ… Loaded using TFSMLayer") except Exception as e2: st.error(f"SavedModel loading failed: {str(e)}, TFSMLayer failed: {str(e2)}") else: st.error("โŒ No SavedModel found in ZIP file. 
Ensure ZIP contains saved_model.pb") if not model_loaded: st.error("โŒ Failed to load uploaded model") # Show debug information st.subheader("๐Ÿ” Debug Information") st.write(f"**File name:** {uploaded_model.name}") st.write(f"**File size:** {len(uploaded_model.getvalue()):,} bytes") st.write(f"**File extension:** {file_extension}") st.info(""" **Troubleshooting:** - Ensure your model is saved in a compatible format - For H5: Use `model.save('model.h5', save_format='h5')` - For Keras: Use `model.save('model.keras')` - For SavedModel: ZIP the entire SavedModel folder - Verify custom layers are properly saved """) return # Store in session state st.session_state.loaded_model = model st.session_state.model_name = uploaded_model.name st.session_state.model_loaded = True st.session_state.loading_method = loading_method st.session_state.temp_model_dir = temp_dir # Load config file if provided model_config = None if uploaded_config is not None: try: config_path = os.path.join(temp_dir, uploaded_config.name) with open(config_path, 'wb') as f: f.write(uploaded_config.getvalue()) model_config = np.load(config_path, allow_pickle=True).item() st.session_state.model_config = model_config st.success("โœ… Config file loaded successfully") except Exception as e: st.warning(f"Config file loading failed: {str(e)}") # Display model info st.success(f"โœ… Model uploaded and loaded successfully!") with st.expander("๐Ÿ“Š Model Information", expanded=False): col1, col2 = st.columns(2) with col1: st.write(f"**Model file:** {uploaded_model.name}") st.write(f"**Loading method:** {loading_method}") st.write(f"**Model type:** {type(model).__name__}") st.write(f"**File size:** {len(uploaded_model.getvalue()):,} bytes") with col2: try: if hasattr(model, 'count_params'): total_params = model.count_params() st.write(f"**Total parameters:** {total_params:,}") if hasattr(model, 'input_shape'): st.write(f"**Input shape:** {model.input_shape}") elif hasattr(model, 'input_spec'): st.write(f"**Input 
spec:** Available") # Show config info if available if model_config: st.write(f"**Sequence length:** {model_config.get('sequence_length', 'Unknown')}") st.write(f"**Model type:** {model_config.get('model_type', 'Unknown')}") except Exception as config_error: st.write("**Configuration:** Unable to read") except Exception as e: st.error(f"โŒ Failed to load uploaded model: {str(e)}") with st.expander("๐Ÿ” Error Details"): st.write(f"**Error type:** {type(e).__name__}") st.write(f"**Error message:** {str(e)}") st.write(f"**File name:** {uploaded_model.name if uploaded_model else 'None'}") # Show current model status if st.session_state.model_loaded: method = st.session_state.get('loading_method', 'Unknown method') model_name = st.session_state.get('model_name', 'Unknown model') st.success(f"๐Ÿค– **Model Ready:** `{model_name}` ({method})") # Input file selection and prediction (UNCHANGED) if st.session_state.model_loaded: st.subheader("๐Ÿ“ฅ Input Data") col1, col2 = st.columns([2, 1]) # File uploader for input data uploaded_input = st.file_uploader( "Upload input data file", type=['txt'], help="Upload a text file with R-t curve data" ) # Use current data button if st.button("๐Ÿ“Š Use Current Processed Data"): if st.session_state.processed_data and 'R_interp_newunit' in st.session_state: temp_data = ' '.join([f'{val:.6f}' for val in st.session_state.R_interp_newunit[:-1]]) st.session_state.temp_input_data = temp_data st.session_state.using_current_data = True st.success("โœ… Current interpolated data ready for prediction") else: st.error("No processed data available. 
Please process data first.") # Prediction interface st.subheader("๐ŸŽฏ Make Predictions") if st.button("๐Ÿš€ Predict G & ฮผ", type="primary"): # Determine input data source input_data = None if st.session_state.get('using_current_data', False) and 'temp_input_data' in st.session_state: try: input_values = [float(x) for x in st.session_state.temp_input_data.split()] input_data = np.array(input_values) st.info("Using current processed data for prediction") except Exception as e: st.error(f"Error processing current data: {e}") return elif uploaded_input is not None: try: input_data = np.loadtxt(io.StringIO(uploaded_input.getvalue().decode())) st.info("Using uploaded file for prediction") except Exception as e: st.error(f"Error reading uploaded file: {e}") return else: st.error("Please select input data: use current processed data or upload a file") return # Run prediction (exactly matching your training code format) - UNCHANGED with st.spinner("Running ML prediction..."): try: # Process input data exactly as in training test_input_curves = input_data if test_input_curves.ndim == 1: test_input_curves = test_input_curves.reshape(1, -1) # Ensure input size is 100 (matching your training) if test_input_curves.shape[1] != 100: if test_input_curves.shape[1] > 100: test_input_curves = test_input_curves[:, :100] else: padding = np.zeros((test_input_curves.shape[0], 100 - test_input_curves.shape[1])) test_input_curves = np.concatenate([test_input_curves, padding], axis=1) # Reshape for model input (matching training: sequence_length, 1) test_input_curves = test_input_curves.reshape(-1, 100, 1) # Position inputs for transformer (exactly from training) position_inputs = np.arange(100) test_position_inputs = np.tile(position_inputs, (test_input_curves.shape[0], 1)) # Make prediction start_time = time.time() # Handle different model types if st.session_state.loading_method == "TFSMLayer (uploaded ZIP fallback)": # For TFSMLayer, call directly predictions = 
st.session_state.loaded_model([test_input_curves, test_position_inputs]) if isinstance(predictions, dict): # Extract from TFSMLayer output dictionary predictions_g = predictions.get('g_output', predictions.get('output_1', list(predictions.values())[0])) predictions_mu = predictions.get('mu_output', predictions.get('output_2', list(predictions.values())[1])) else: predictions_g, predictions_mu = predictions else: # Standard model prediction (matching training) predictions_g, predictions_mu = st.session_state.loaded_model.predict( [test_input_curves, test_position_inputs]) prediction_time = time.time() - start_time # Process predictions (exactly from desktop GUI) num_samples = 1 pred_G = predictions_g[:num_samples] pred_mu = predictions_mu[:num_samples] # Apply scaling (exactly matching training scaling) pred_G_scaled = 10 ** (pred_G * (6 - 3) + 6 - 3) pred_mu_scaled = 10 ** (pred_mu * (0 + 3) - 3) # Store results st.session_state.pred_G = pred_G_scaled st.session_state.pred_mu = pred_mu_scaled # Extract values for display G_value = st.session_state.pred_G[0][0] if st.session_state.pred_G.ndim > 1 else \ st.session_state.pred_G[0] mu_value = st.session_state.pred_mu[0][0] if st.session_state.pred_mu.ndim > 1 else \ st.session_state.pred_mu[0] # Display results col1, col2, col3 = st.columns(3) with col1: st.metric( label="Shear Modulus (G)", value=f"{G_value:.2e} Pa", help="Predicted shear modulus of the material" ) with col2: st.metric( label="Viscosity (ฮผ)", value=f"{mu_value:.4f} Paยทs", help="Predicted viscosity of the material" ) with col3: st.metric( label="Prediction Time", value=f"{prediction_time:.3f} s", help="Time taken for ML inference" ) # Show detailed results result_text = f"G: {G_value:.2e} Pa, ฮผ: {mu_value:.4f}" st.success(f"๐ŸŽ‰ **Prediction Results:** {result_text}") detailed_results = f"""**Prediction completed successfully!** **Shear Modulus (G):** {G_value:.2e} Pa **Viscosity (ฮผ):** {mu_value:.4f} Paยทs **Prediction Time:** 
def show_validation():
    """ULTRA-STABLE Validation interface - ZERO trembling using session state control.

    Flow: guard on processed data and ML predictions -> cache predicted G/mu/lambda
    once in session state -> run the bubble simulation exactly once per button click
    (controlled by validation_running / validation_complete flags) -> display the
    experimental-vs-simulated comparison and error metrics.
    """
    st.markdown('\nโœ… Validation\n', unsafe_allow_html=True)

    if not st.session_state.processed_data:
        st.warning("Please process data first.")
        return

    # Validation needs both a completed prediction and the stored G/mu values.
    if (not st.session_state.get('validation_ready', False)
            or 'pred_G' not in st.session_state
            or 'pred_mu' not in st.session_state):
        st.warning("Please run ML prediction first to get material properties for validation.")
        st.info("""
**Validation Process:**
1. ๐Ÿ“‚ Load experimental data
2. โš™๏ธ Process and interpolate data
3. ๐Ÿค– Use ML model to predict G & ฮผ
4. โœ… **Run validation simulation** (you are here)
5. ๐Ÿ“Š Compare experimental vs simulated results
""")
        return

    st.subheader("๐Ÿ”ฌ IMR Simulation using predicted G and ฮผ vs Experimental R-t curve")

    # Cache all static values in session state once so reruns never re-index
    # the prediction arrays (prevents UI "trembling" on redraw).
    if 'validation_G_cached' not in st.session_state:
        st.session_state.validation_G_cached = (
            st.session_state.pred_G[0][0] if st.session_state.pred_G.ndim > 1
            else st.session_state.pred_G[0])
        st.session_state.validation_mu_cached = (
            st.session_state.pred_mu[0][0] if st.session_state.pred_mu.ndim > 1
            else st.session_state.pred_mu[0])
        st.session_state.validation_lambda_cached = st.session_state.lambda_max_mean

    # Session-state flags controlling the run-once button behaviour.
    if 'validation_button_clicked' not in st.session_state:
        st.session_state.validation_button_clicked = False
    if 'validation_running' not in st.session_state:
        st.session_state.validation_running = False
    if 'validation_complete' not in st.session_state:
        st.session_state.validation_complete = False

    # STABLE BUTTON: only shown while no simulation is in flight.
    if not st.session_state.validation_running:
        if st.button("๐Ÿš€ Run Validation Simulation", type="primary", key="validation_btn_stable"):
            st.session_state.validation_button_clicked = True
            st.session_state.validation_running = True
            st.session_state.validation_complete = False
            st.rerun()

    # HANDLE SIMULATION EXECUTION
    if st.session_state.validation_running and not st.session_state.validation_complete:
        if st.session_state.lambda_max_mean is None:
            st.error("No lambda_max_mean loaded. Please load data first.")
            st.session_state.validation_running = False
            return

        # Fixed placeholder so status text never shifts the layout.
        status_placeholder = st.empty()
        status_placeholder.info("๐Ÿ”„ Running simulation... Please wait")

        try:
            # Use cached values - absolutely no re-computation.
            G_value = st.session_state.validation_G_cached
            mu_value = st.session_state.validation_mu_cached

            bubble_sim = OptimizedBubbleSimulation()
            start_time = time.time()
            t_sim, R_sim = bubble_sim.run_optimized_simulation(
                G_value, mu_value, st.session_state.validation_lambda_cached)
            simulation_time = time.time() - start_time

            # Persist all results so the display phase needs no recomputation.
            st.session_state.validation_t_sim = t_sim
            st.session_state.validation_R_sim = R_sim
            st.session_state.validation_simulation_time = simulation_time

            # Error metrics on the overlapping time window of both curves.
            rmse = mae = max_error = 0
            if len(t_sim) > 0 and len(st.session_state.t_interp_newunit) > 0:
                t_min = max(st.session_state.t_interp_newunit[0], t_sim[0])
                t_max = min(st.session_state.t_interp_newunit[-1], t_sim[-1])
                if t_max > t_min:
                    f_sim = interp1d(t_sim, R_sim, kind='linear',
                                     bounds_error=False, fill_value='extrapolate')
                    mask = ((st.session_state.t_interp_newunit >= t_min)
                            & (st.session_state.t_interp_newunit <= t_max))
                    t_common = st.session_state.t_interp_newunit[mask]
                    R_exp_common = st.session_state.R_interp_newunit[mask]
                    R_sim_common = f_sim(t_common)
                    if len(R_exp_common) > 0:
                        rmse = np.sqrt(np.mean((R_exp_common - R_sim_common) ** 2))
                        mae = np.mean(np.abs(R_exp_common - R_sim_common))
                        max_error = np.max(np.abs(R_exp_common - R_sim_common))

            st.session_state.validation_rmse = rmse
            st.session_state.validation_mae = mae
            st.session_state.validation_max_error = max_error

            # Clear status and mark complete.
            status_placeholder.empty()
            st.session_state.validation_running = False
            st.session_state.validation_complete = True
        except Exception as e:
            st.session_state.validation_running = False
            st.session_state.validation_error = str(e)

        # BUGFIX: st.rerun() works by raising a control-flow exception (a
        # subclass of Exception). The original called it inside the try above,
        # so `except Exception` swallowed the rerun of a SUCCESSFUL simulation
        # and recorded a bogus validation_error. A single rerun outside the
        # try/except serves both the success and the failure path.
        st.rerun()

    # DISPLAY RESULTS (only once a simulation has completed).
    if st.session_state.validation_complete and 'validation_t_sim' in st.session_state:
        fig, ax = plt.subplots(figsize=(10, 6))
        ax.plot(st.session_state.t_interp_newunit, st.session_state.R_interp_newunit,
                'ro', markersize=4, label='Interpolated (Experimental)', alpha=0.7)
        ax.plot(st.session_state.validation_t_sim, st.session_state.validation_R_sim,
                'b-', linewidth=2, label='Simulated (Predicted G & ฮผ)')
        ax.set_xlabel('Time (0.1 ms)')
        ax.set_ylabel('Radius (0.1 mm)')
        ax.set_title('Validation: Experimental vs Simulated R-t Curves')
        ax.grid(True, alpha=0.3)
        ax.legend()
        # Error metrics intentionally kept off the plot to avoid layout shifts.
        plt.tight_layout()
        st.pyplot(fig)

        st.success("โœ… Validation simulation completed!")
        st.info(f"""**Validation Results:**

**Predicted Values:**
- Shear Modulus (G): {st.session_state.validation_G_cached:.2e} Pa
- Viscosity (ฮผ): {st.session_state.validation_mu_cached:.4f} Paยทs
- Lambda Max: {st.session_state.validation_lambda_cached:.3f}

**Performance:**
- Simulation Time: {st.session_state.validation_simulation_time:.2f} seconds
- Simulated Points: {len(st.session_state.validation_R_sim)}
- Time Range: {st.session_state.validation_t_sim[0]:.3f} to {st.session_state.validation_t_sim[-1]:.3f} (0.1 ms)
""")

        # Reset button: wipe every validation_* key and start over.
        if st.button("๐Ÿ”„ Run New Simulation", key="reset_validation"):
            for key in list(st.session_state.keys()):
                if key.startswith('validation_'):
                    del st.session_state[key]
            st.rerun()

    # Surface a recorded simulation error, with a retry control.
    if hasattr(st.session_state, 'validation_error'):
        st.error(f"Simulation failed: {st.session_state.validation_error}")
        if st.button("๐Ÿ”„ Try Again", key="retry_validation"):
            del st.session_state.validation_error
            st.session_state.validation_running = False
            st.rerun()

    # Additional CSS to ensure zero movement.
    st.markdown(""" """, unsafe_allow_html=True)
def show_results():
    """Results and export interface.

    Summarizes the loaded data and ML predictions, offers three downloads
    (interpolated data, results summary, simulation data), and provides a
    full session reset.
    """
    st.markdown('\n๐Ÿ“Š Results & Export\n', unsafe_allow_html=True)

    if not st.session_state.processed_data:
        st.warning("No processed data available.")
        return

    # BUGFIX: the original used st.session_state.get('selected_dataset', 'N/A') + 1,
    # which raises TypeError ('N/A' + 1) whenever the key is absent. Resolve the
    # 1-based display label safely, once, and reuse it below.
    _selected = st.session_state.get('selected_dataset')
    dataset_label = _selected + 1 if isinstance(_selected, int) else 'N/A'

    # Summary of all results
    st.subheader("๐Ÿ“‹ Analysis Summary")
    col1, col2 = st.columns(2)

    with col1:
        st.markdown("**Data Information:**")
        if st.session_state.data_loaded:
            st.write(f"โœ… Datasets loaded: {st.session_state.num_datasets}")
            st.write(f"โœ… Lambda max: {st.session_state.lambda_max_mean:.3f}")
            st.write(f"โœ… Selected dataset: {dataset_label}")
        if st.session_state.processed_data:
            st.write(f"โœ… Interpolated points: {len(st.session_state.R_interp_newunit)}")

    with col2:
        st.markdown("**ML Predictions:**")
        if 'pred_G' in st.session_state:
            G_value = (st.session_state.pred_G[0][0] if st.session_state.pred_G.ndim > 1
                       else st.session_state.pred_G[0])
            mu_value = (st.session_state.pred_mu[0][0] if st.session_state.pred_mu.ndim > 1
                        else st.session_state.pred_mu[0])
            st.write(f"๐ŸŽฏ Shear Modulus (G): {G_value:.2e} Pa")
            st.write(f"๐ŸŽฏ Viscosity (ฮผ): {mu_value:.4f} Paยทs")
        else:
            st.write("โŒ No predictions available")

    # Export options
    st.subheader("๐Ÿ’พ Export Options")
    export_col1, export_col2, export_col3 = st.columns(3)

    with export_col1:
        if st.session_state.processed_data:
            # Export interpolated data (last point dropped, matching the
            # format used when feeding the ML model).
            data_str = ' '.join([f'{val:.6f}' for val in st.session_state.R_interp_newunit[:-1]])
            st.download_button(
                label="๐Ÿ“ฅ Download Interpolated Data",
                data=data_str,
                file_name="interpolated_bubble_data.txt",
                mime="text/plain"
            )

    with export_col2:
        if 'pred_G' in st.session_state:
            # Export predictions as a human-readable summary.
            G_value = (st.session_state.pred_G[0][0] if st.session_state.pred_G.ndim > 1
                       else st.session_state.pred_G[0])
            mu_value = (st.session_state.pred_mu[0][0] if st.session_state.pred_mu.ndim > 1
                        else st.session_state.pred_mu[0])
            pred_summary = f"""Bubble Dynamics Analysis Results
Dataset: {dataset_label}
Lambda Max: {st.session_state.lambda_max_mean:.3f}

ML Predictions:
Shear Modulus (G): {G_value:.2e} Pa
Viscosity (ฮผ): {mu_value:.4f} Paยทs

Analysis completed on: {time.strftime('%Y-%m-%d %H:%M:%S')}
"""
            st.download_button(
                label="๐Ÿ“‹ Download Results Summary",
                data=pred_summary,
                file_name="bubble_analysis_results.txt",
                mime="text/plain"
            )

    with export_col3:
        if 'validation_t_sim' in st.session_state:
            # Export simulation data as tab-separated (t, R) pairs.
            sim_data = np.column_stack([st.session_state.validation_t_sim,
                                        st.session_state.validation_R_sim])
            sim_str = '\n'.join([f'{t:.6f}\t{r:.6f}' for t, r in sim_data])
            st.download_button(
                label="๐Ÿ”ฌ Download Simulation Data",
                data=sim_str,
                file_name="simulation_results.txt",
                mime="text/plain"
            )

    # Session reset
    st.subheader("๐Ÿ”„ Reset Session")
    if st.button("๐Ÿ—‘๏ธ Clear All Data", type="secondary"):
        for key in list(st.session_state.keys()):
            del st.session_state[key]
        st.success("โœ… Session cleared! Refresh the page to start over.")


if __name__ == "__main__":
    main()