import streamlit as st
import numpy as np
import matplotlib.pyplot as plt
from scipy.io import loadmat
from scipy.interpolate import PchipInterpolator, interp1d
import io
import time
import tempfile
import os
###########################################################################
# Configure Streamlit page
st.set_page_config(
    page_title="Bubble Dynamics Analysis",
    # The icon had been mis-decoded (UTF-8 bytes read as a Thai code page);
    # the surviving bytes correspond to the "bubbles" emoji.
    page_icon="🫧",
    layout="wide",
    initial_sidebar_state="expanded",
)

# TensorFlow is optional: the flag lets the pages that need it degrade
# gracefully instead of failing at import time.
try:
    import tensorflow as tf
    from tensorflow import keras
    from tensorflow.keras import layers
except ImportError:
    TENSORFLOW_AVAILABLE = False
else:
    TENSORFLOW_AVAILABLE = True

# Global CSS hook (stability fix / UT Austin background and logo).
# NOTE(review): the style-sheet body is empty in this copy of the file.
st.markdown("""
""", unsafe_allow_html=True)

# Initialize the session-state flags read by every page; existing values
# (set during an earlier rerun) are left untouched.
for _flag in ("data_loaded", "processed_data", "model_loaded"):
    if _flag not in st.session_state:
        st.session_state[_flag] = False
# ULTRA-OPTIMIZED BubbleSimulation class - ZERO UI UPDATES during simulation
class OptimizedBubbleSimulation:
    """Reduced-cost bubble simulation that performs no UI updates while solving.

    The instance stores fixed physical/material/numerical parameters
    (``setup_parameters``), integrates the non-dimensional ODE system with
    ``scipy.integrate.solve_ivp`` (``run_optimized_simulation``) and falls back
    to a damped-cosine approximation (``fast_fallback``) when the integration
    fails.  Progress is reported only through ``print`` and the
    ``simulation_status`` attribute.
    """

    def __init__(self):
        self.setup_parameters()

    def setup_parameters(self):
        """Set the fixed parameters used by every simulation run."""
        # Fixed parameters (same as original MATLAB)
        self.R0 = 35e-6
        self.P_inf = 101325
        self.T_inf = 298.15
        self.cav_type = 'LIC'
        # Material parameters (same as original)
        self.c_long = 1700
        self.alpha = 0.0
        self.rho = 1000
        self.gamma = 0.0725
        # Parameters for bubble contents (same as original)
        self.D0 = 24.2e-6
        self.kappa = 1.4
        self.Ru = 8.3144598
        self.Rv = self.Ru / (18.01528e-3)
        self.Ra = self.Ru / (28.966e-3)
        self.A = 5.28e-5
        self.B = 1.17e-2
        self.P_ref = 1.17e11
        self.T_ref = 5200
        # Numerical parameters tuned for speed
        self.NT = 100  # grid points inside the bubble (reduced from 500)
        self.RelTol = 1e-5  # relative tolerance (relaxed from 1e-7)

    def run_optimized_simulation(self, G, mu, lambda_max_mean=None):
        """Integrate the bubble model for the given material properties.

        Args:
            G: Predicted modulus; used as ``Ca = P_inf / G``.
            mu: Predicted viscosity; used in ``Re = P_inf * Rc / (mu * Uc)``.
            lambda_max_mean: Maximum stretch ``Rmax / R0``.  When ``None`` the
                default 5.99 is used.

        Returns:
            ``(t, R)``: two 1-D arrays, time and radius in physical units
            multiplied by 1e4 (labelled "0.1 ms" / "0.1 mm" in the log).  If
            both solvers fail, the solver reports failure, or fewer than 10
            valid radius samples remain, the result of ``fast_fallback`` is
            returned instead.

        Raises:
            ValueError: If ``cav_type`` is not ``'LIC'`` (the only setup
                implemented here).
        """
        from scipy.integrate import solve_ivp

        # Set lambdamax from loaded data or use default
        if lambda_max_mean is not None:
            self.lambdamax = lambda_max_mean
            print(f"Using lambda_max_mean = {self.lambdamax} from .mat file")
        else:
            self.lambdamax = 5.99  # Fallback default
            print(f"Warning: Using default lambda_max = {self.lambdamax}")
        print(f"Running ULTRA-OPTIMIZED simulation with predicted G={G:.2e} Pa, μ={mu:.4f} Pa·s")

        # Use predicted values
        self.G = G
        self.mu = mu

        # Only the 'LIC' configuration defines Rmax/Rc and the initial
        # pressure/stress; fail clearly instead of with a missing-name error.
        if self.cav_type != 'LIC':
            raise ValueError(
                f"Unsupported cav_type {self.cav_type!r}: only 'LIC' is implemented")

        # Setup (same as original)
        self.Rmax = self.lambdamax * self.R0
        self.PA = 0
        self.omega = 0
        self.delta = 0
        self.n = 0
        self.Rc = self.Rmax
        self.Uc = np.sqrt(self.P_inf / self.rho)
        self.tc = self.Rmax / self.Uc
        self.tspan = 3 * self.tc  # Reduced for speed

        # Calculate parameters (same as original)
        self.Pv = self.P_ref * np.exp(-self.T_ref / self.T_inf)
        self.K_inf = self.A * self.T_inf + self.B

        # Non-dimensional variables (same as original)
        self.C_star = self.c_long / self.Uc
        self.We = self.P_inf * self.Rc / (2 * self.gamma)
        self.Ca = self.P_inf / self.G
        self.Re = self.P_inf * self.Rc / (self.mu * self.Uc)
        self.fom = self.D0 / (self.Uc * self.Rc)
        self.chi = self.T_inf * self.K_inf / (self.P_inf * self.Rc * self.Uc)
        self.A_star = self.A * self.T_inf / self.K_inf
        self.B_star = self.B / self.K_inf
        self.Pv_star = self.Pv / self.P_inf
        self.tspan_star = self.tspan / self.tc
        self.Req = self.R0 / self.Rmax
        self.PA_star = self.PA / self.P_inf
        self.omega_star = self.omega * self.tc
        self.delta_star = self.delta / self.tc

        # Parameters vector (same as original); bubble_optimized reads the
        # first 15 entries by position.
        self.params = [self.NT, self.C_star, self.We, self.Ca, self.alpha, self.Re,
                       self.Rv, self.Ra, self.kappa, self.fom, self.chi, self.A_star,
                       self.B_star, self.Pv_star, self.Req, self.PA_star,
                       self.omega_star, self.delta_star, self.n]

        # Initial conditions (same as original)
        R0_star = 1
        U0_star = 0
        Theta0 = np.zeros(self.NT)
        P0 = (self.Pv + (self.P_inf + 2 * self.gamma / self.R0 - self.Pv) *
              ((self.R0 / self.Rmax) ** 3))
        P0_star = P0 / self.P_inf
        S0 = ((3 * self.alpha - 1) * (5 - 4 * self.Req - self.Req ** 4) / (2 * self.Ca) +
              2 * self.alpha * (27 / 40 + self.Req ** 8 / 8 + self.Req ** 5 / 5 +
                                self.Req ** 2 - 2 / self.Req) / self.Ca)
        k0 = ((1 + (self.Rv / self.Ra) * (P0_star / self.Pv_star - 1)) ** (-1)) * np.ones(self.NT)

        # State layout: [R, U, P, S, Theta(NT), k(NT)]
        X0 = np.concatenate([[R0_star, U0_star, P0_star, S0], Theta0, k0])
        print(f"State vector size: {len(X0)} (4 + {self.NT} + {self.NT})")
        print(f"Time span: 0 to {self.tspan_star:.4f}")

        # No UI updates here: status is only recorded for later display.
        self.simulation_status = "Starting simulation..."
        try:
            sol = solve_ivp(
                self.bubble_optimized,
                [0, self.tspan_star],
                X0,
                method='BDF',
                rtol=self.RelTol,
                atol=1e-8,
                max_step=self.tspan_star / 200,
                dense_output=False
            )
            self.simulation_status = "Processing results..."
        except Exception as e:
            print(f"BDF failed: {str(e)}, trying LSODA...")
            self.simulation_status = "Trying backup solver..."
            try:
                sol = solve_ivp(
                    self.bubble_optimized,
                    [0, self.tspan_star],
                    X0,
                    method='LSODA',
                    rtol=1e-4,
                    atol=1e-7,
                    max_step=self.tspan_star / 100,
                )
            except Exception as e2:
                print(f"All solvers failed: {str(e2)}")
                return self.fast_fallback()

        if not sol.success:
            print(f"Solver failed: {sol.message}")
            return self.fast_fallback()

        # Extract solution
        t_nondim = sol.t
        R_nondim = sol.y.T[:, 0]

        # Filter valid solutions
        valid_mask = (R_nondim > 0.01) & (R_nondim < 20) & np.isfinite(R_nondim)
        t_nondim = t_nondim[valid_mask]
        R_nondim = R_nondim[valid_mask]
        if len(t_nondim) < 10:
            print("Too few valid points, using fast fallback")
            return self.fast_fallback()

        # Back to physical units, then rescale by 1e4
        scale = 1e4
        t_newunit = t_nondim * self.tc * scale
        R_newunit = R_nondim * self.Rc * scale

        self.simulation_status = "Simulation complete!"
        print(f"ULTRA-OPTIMIZED simulation completed in {len(t_newunit)} points!")
        print(f"Time range: {t_newunit[0]:.3f} to {t_newunit[-1]:.3f} (0.1 ms)")
        print(f"Radius range: {np.min(R_newunit):.3f} to {np.max(R_newunit):.3f} (0.1 mm)")
        return t_newunit, R_newunit

    def bubble_optimized(self, t, x):
        """Right-hand side of the non-dimensional ODE system (no UI updates).

        Args:
            t: Current (non-dimensional) time; not used by the equations.
            x: State vector laid out as ``[R, U, P, S, Theta(NT), k(NT)]``.

        Returns:
            1-D array of time derivatives with the same layout as ``x``.  The
            last entry of the ``Theta`` and ``k`` segments (the wall point) is
            always 0 (Dirichlet condition).
        """
        # Extract parameters (positional, as packed in run_optimized_simulation)
        (NT, C_star, We, Ca, alpha, Re, Rv, Ra, kappa, fom, chi,
         A_star, B_star, Pv_star, Req) = self.params[:15]
        NT = int(NT)

        # Extract state variables
        R = x[0]
        U = x[1]
        P = x[2]
        S = x[3]
        Theta = x[4:4 + NT]
        # Copy so that imposing the wall value never writes into the solver's state
        k = x[4 + NT:4 + 2 * NT].copy()

        # Uniform grid on [0, 1]
        deltaY = 1 / (NT - 1)
        yk = np.arange(NT) * deltaY
        two_dy = 2 * deltaY
        dy2 = deltaY ** 2

        # Boundary condition for the vapor fraction at the wall
        k[-1] = (1 + (Rv / Ra) * (P / Pv_star - 1)) ** (-1)

        # Mixture fields
        T = (A_star - 1 + np.sqrt(1 + 2 * A_star * Theta)) / A_star
        K_star = A_star * T + B_star
        Rmix = k * Rv + (1 - k) * Ra

        # Spatial derivatives.  Entry 0 of the first derivatives stays 0
        # (Neumann condition at the origin).
        DTheta = np.zeros(NT)
        DDTheta = np.zeros(NT)
        Dk = np.zeros(NT)
        DDk = np.zeros(NT)
        if NT >= 3:
            # Central differences in the interior
            DTheta[1:-1] = (Theta[2:] - Theta[:-2]) / two_dy
            Dk[1:-1] = (k[2:] - k[:-2]) / two_dy
            # One-sided (backward) difference at the wall; needs three points
            DTheta[-1] = (3 * Theta[-1] - 4 * Theta[-2] + Theta[-3]) / two_dy
            Dk[-1] = (3 * k[-1] - 4 * k[-2] + k[-3]) / two_dy

        # Laplacians: special formula at the origin
        DDTheta[0] = 6 * (Theta[1] - Theta[0]) / dy2
        DDk[0] = 6 * (k[1] - k[0]) / dy2
        if NT >= 3:
            # Interior points.  yk >= deltaY > 0 there, so the 2/y term is
            # always well defined and no per-point guard is required.
            DDTheta[1:-1] = ((Theta[2:] - 2 * Theta[1:-1] + Theta[:-2]) / dy2 +
                             (2 / yk[1:-1]) * DTheta[1:-1])
            DDk[1:-1] = ((k[2:] - 2 * k[1:-1] + k[:-2]) / dy2 +
                         (2 / yk[1:-1]) * Dk[1:-1])
        if NT >= 4:
            # One-sided second difference at the wall (yk[-1] is ~1)
            DDTheta[-1] = ((2 * Theta[-1] - 5 * Theta[-2] + 4 * Theta[-3] - Theta[-4]) / dy2 +
                           (2 / yk[-1]) * DTheta[-1])
            DDk[-1] = ((2 * k[-1] - 5 * k[-2] + 4 * k[-3] - k[-4]) / dy2 +
                       (2 / yk[-1]) * Dk[-1])

        # Internal pressure evolution
        if Rmix[-1] > 1e-12 and (1 - k[-1]) > 1e-12 and R > 1e-12:
            pdot = (3 / R * (-kappa * P * U + (kappa - 1) * chi * DTheta[-1] / R +
                             kappa * P * fom * Rv * Dk[-1] / (R * Rmix[-1] * (1 - k[-1]))))
        else:
            pdot = -3 * kappa * P * U / R if R > 1e-12 else 0

        # Mixture velocity (only where the denominators are safe)
        Umix = np.zeros(NT)
        valid_indices = (Rmix > 1e-12) & (kappa * P > 1e-12)
        if np.any(valid_indices):
            idx = np.where(valid_indices)[0]
            Umix[idx] = (((kappa - 1) * chi / R * DTheta[idx] - R * yk[idx] * pdot / 3) / (kappa * P) +
                         fom / R * (Rv - Ra) / Rmix[idx] * Dk[idx])

        # Temperature evolution on all points except the wall
        Theta_prime = np.zeros(NT)
        if P > 1e-12:
            m = np.flatnonzero(Rmix[:-1] > 1e-12)
            Theta_prime[m] = (
                (pdot + DDTheta[m] * chi / R ** 2) * (K_star[m] * T[m] / P * (kappa - 1) / kappa) -
                DTheta[m] * (Umix[m] - yk[m] * U) / R +
                fom / R ** 2 * (Rv - Ra) / Rmix[m] * Dk[m] * DTheta[m])
        Theta_prime[-1] = 0  # Dirichlet BC

        # Vapor concentration evolution on all points except the wall
        k_prime = np.zeros(NT)
        m = np.flatnonzero((Rmix[:-1] > 1e-12) & (T[:-1] > 1e-12))
        term1 = fom / R ** 2 * (DDk[m] + Dk[m] * (-((Rv - Ra) / Rmix[m]) * Dk[m] -
                                                 DTheta[m] / np.sqrt(1 + 2 * A_star * Theta[m]) / T[m]))
        term2 = -(Umix[m] - U * yk[m]) / R * Dk[m]
        k_prime[m] = term1 + term2
        k_prime[-1] = 0  # Dirichlet BC

        # Elastic stress evolution.  Defaults to 0 so the name is always bound,
        # including for cav_type values other than 'LIC'.
        Sdot = 0.0
        if self.cav_type == 'LIC' and Req > 1e-12:
            Rst = R / Req
            if Rst > 1e-12:
                Sdot = (2 * U / R * (3 * alpha - 1) * (1 / Rst + 1 / Rst ** 4) / Ca -
                        2 * alpha * U / R * (1 / Rst ** 8 + 1 / Rst ** 5 + 2 / Rst ** 2 + 2 * Rst) / Ca)

        # Keller-Miksis equations
        rdot = U
        udot = 0
        if R > 1e-12:
            numerator = ((1 + U / C_star) * (P - 1 / (We * R) + S - 4 * U / (Re * R) - 1) +
                         R / C_star * (pdot + U / (We * R ** 2) + Sdot + 4 * U ** 2 / (Re * R ** 2)) -
                         (3 / 2) * (1 - U / (3 * C_star)) * U ** 2)
            denominator = (1 - U / C_star) * R + 4 / (C_star * Re)
            if abs(denominator) > 1e-12:
                udot = numerator / denominator

        # Return derivatives
        return np.concatenate([[rdot, udot, pdot, Sdot], Theta_prime, k_prime])

    def fast_fallback(self):
        """Return a cheap damped-cosine radius curve when the ODE solve fails.

        Reads ``tc``, ``Rc`` and ``Req``, which are set by
        ``run_optimized_simulation``; calling this before a run raises
        ``AttributeError``.

        Returns:
            ``(t, R)``: 200 samples over three characteristic times, in the
            same 1e4-scaled units as ``run_optimized_simulation``.
        """
        print("Using fast analytical approximation for validation")
        t_nondim = np.linspace(0, 3, 200)  # Fewer points for speed

        # Simple damped oscillation model using actual parameters
        if hasattr(self, 'P_inf') and hasattr(self, 'rho') and hasattr(self, 'lambdamax'):
            Rmax = self.lambdamax * self.R0  # Use dynamic lambda value
            omega_natural = np.sqrt(3 * self.P_inf / (self.rho * Rmax ** 2))
            damping = self.mu / (self.rho * Rmax ** 2) if hasattr(self, 'mu') else 0.1
        else:
            omega_natural = 1000
            damping = 0.1

        # Over-damped values keep the undamped frequency to avoid sqrt of a
        # negative number.
        omega_d = omega_natural * np.sqrt(1 - damping ** 2) if damping < 1 else omega_natural
        decay = np.exp(-damping * omega_natural * t_nondim * self.tc)
        R_nondim = self.Req + (1 - self.Req) * decay * np.cos(omega_d * t_nondim * self.tc)
        R_nondim = np.maximum(R_nondim, 0.05)  # Prevent negative values

        # Convert to physical units
        t = t_nondim * self.tc
        R = R_nondim * self.Rc
        scale = 1e4
        return t * scale, R * scale
# Main Streamlit App
def main():
    """Render the header, the sidebar (navigation + status) and the active page.

    The active page label is kept in ``st.session_state.current_page``; clicking
    a sidebar button stores the new label and triggers a rerun.
    """
    # Header hook (fixed positioning / UT Austin branding + logo).
    # NOTE(review): the HTML body is empty in this copy of the file.
    st.markdown("""
""", unsafe_allow_html=True)

    # Page labels.  The emoji had been mis-decoded (UTF-8 read as a Thai code
    # page, with one literal even split across two lines); they are restored
    # here.  NOTE(review): where the trailing bytes were lost the emoji is a
    # best guess - confirm against the original design.
    page_home = "🏠 Home"
    page_loading = "📁 Data Loading"
    page_processing = "⚙️ Data Processing"
    page_ml = "🤖 ML Prediction"
    page_validation = "✅ Validation"
    page_results = "📊 Results & Export"

    # Initialize current page in session state
    if 'current_page' not in st.session_state:
        st.session_state.current_page = page_home

    # Sidebar for navigation with clickable menu
    st.sidebar.title("📋 Navigation")
    menu_items = [
        page_home,
        page_loading,
        page_processing,
        page_ml,
        page_validation,
        page_results,
    ]
    for item in menu_items:
        # The active page is marked with a play-arrow prefix; the click
        # handling is identical for active and inactive entries.
        is_active = st.session_state.current_page == item
        label = f"▶️ {item}" if is_active else f" {item}"
        if st.sidebar.button(label, key=f"nav_{item}", use_container_width=True):
            st.session_state.current_page = item
            st.rerun()

    # Add some spacing
    st.sidebar.markdown("---")

    # Show current status in sidebar: (flag, message when set, message when not)
    st.sidebar.markdown("### 📊 Status")
    status_rows = [
        ("data_loaded", "✅ Data loaded", "📁 No data loaded"),
        ("processed_data", "✅ Data processed", "⚙️ Data not processed"),
        ("model_loaded", "✅ Model loaded", "🤖 No model loaded"),
    ]
    for flag, done_msg, pending_msg in status_rows:
        if st.session_state[flag]:
            st.sidebar.success(done_msg)
        else:
            st.sidebar.info(pending_msg)

    # Display the selected page.  An if/elif chain keeps name resolution lazy:
    # only the handler of the selected page has to exist when main() runs.
    page = st.session_state.current_page
    if page == page_home:
        show_home()
    elif page == page_loading:
        show_data_loading()
    elif page == page_processing:
        show_data_processing()
    elif page == page_ml:
        show_ml_prediction()
    elif page == page_validation:
        show_validation()
    elif page == page_results:
        show_results()
def show_home():
    """Home page: overview text, feature availability and session progress.

    The left column shows the welcome/usage text; the right column reports
    whether TensorFlow was importable and which workflow steps have already
    been completed in this session.
    """
    # NOTE(review): emoji and Greek letters below were restored from
    # mis-decoded text; where bytes were lost the emoji is a best guess.
    col1, col2 = st.columns([2, 1])
    with col1:
        st.markdown("""
        ### Welcome to the YANG Research Group Bubble Dynamics Analysis Platform
        **The University of Texas at Austin - Aerospace Engineering and Engineering Mechanics**
        **Cockrell School of Engineering**
        This advanced web application provides comprehensive tools for analyzing bubble dynamics data:
        **Features:**
        - 📁 **Data Loading**: Upload and analyze .mat files containing bubble dynamics data
        - ⚙️ **Data Processing**: Interpolate and process experimental data
        - 🤖 **ML Prediction**: Use machine learning to predict material properties (G & μ)
        - ✅ **Validation**: Compare experimental vs simulated bubble behavior
        - 📊 **Export**: Download processed results and visualizations
        **Getting Started:**
        1. Navigate to "Data Loading" to upload your .mat file
        2. Process your data in "Data Processing"
        3. Use ML models in "ML Prediction"
        4. Validate results in "Validation"
        5. Export your findings in "Results & Export"
        """)
    with col2:
        if TENSORFLOW_AVAILABLE:
            st.success("""
            **✅ System Status: Full Features Available**
            ✅ Core Features: Ready
            ✅ Data Processing: Ready
            ✅ Visualization: Ready
            ✅ ML Models: Ready
            ✅ Simulations: Ready
            """)
        else:
            st.warning("""
            **⚠️ System Status: Limited Features**
            ✅ Core Features: Ready
            ✅ Data Processing: Ready
            ✅ Visualization: Ready
            ❌ ML Models: TensorFlow not installed
            ✅ Simulations: Ready
            💡 Install TensorFlow to enable ML predictions:
            `pip install tensorflow-cpu`
            """)
        # Show current session state
        if st.session_state.data_loaded:
            st.success("📁 Data loaded successfully")
        if st.session_state.processed_data:
            st.success("⚙️ Data processed")
        if st.session_state.model_loaded:
            st.success("🤖 ML model loaded")
def show_data_loading():
    """Data loading page: read an uploaded ``.mat`` file into session state.

    Requires the variables ``R_nondim_All`` and ``t_nondim_All``;
    ``lambda_max_mean`` is optional and defaults to 5.99.  On success the raw
    data, the dataset count, ``lambda_max_mean`` and the derived scales
    (``Rmax_exp``, ``Rc_exp``, ``Uc_exp``, ``tc_exp``) are stored in
    ``st.session_state`` and ``data_loaded`` is set.  Any failure is reported
    with ``st.error`` rather than raised.
    """
    st.markdown('', unsafe_allow_html=True)
    uploaded_file = st.file_uploader(
        "Upload your .mat file",
        type=['mat'],
        help="Upload a MATLAB .mat file containing 'R_nondim_All', 't_nondim_All', and 'lambda_max_mean'"
    )
    if uploaded_file is None:
        return

    try:
        # Parse directly from memory.  The previous temp-file round trip
        # leaked the file whenever validation returned early or loading
        # raised, because the unlink was only reached on full success.
        data = loadmat(io.BytesIO(uploaded_file.getvalue()))

        # Check required variables
        required_vars = ['R_nondim_All', 't_nondim_All']
        missing_vars = [var for var in required_vars if var not in data]
        if missing_vars:
            st.error(f"Missing required variables: {missing_vars}")
            return

        # Extract data
        R_nondim_all = data['R_nondim_All']
        t_nondim_all = data['t_nondim_All']
        num_datasets = R_nondim_all.shape[1]

        # Extract lambda_max_mean.  loadmat returns scalars as 2-D arrays and
        # float() on an array with ndim > 0 is deprecated in NumPy, so reduce
        # to a 0-d array first (still fails for multi-element arrays).
        if 'lambda_max_mean' in data:
            lambda_max_mean = float(np.squeeze(data['lambda_max_mean']))
        else:
            st.warning("lambda_max_mean not found in file. Using default value 5.99")
            lambda_max_mean = 5.99

        # Store in session state
        st.session_state.data = data
        st.session_state.R_nondim_all = R_nondim_all
        st.session_state.t_nondim_all = t_nondim_all
        st.session_state.lambda_max_mean = lambda_max_mean
        st.session_state.num_datasets = num_datasets
        st.session_state.data_loaded = True

        # Calculate physical parameters.  The constants repeat the values
        # set in OptimizedBubbleSimulation.setup_parameters (R0, P_inf, rho).
        R0_sim = 35e-6
        P_inf_exp = 101325
        rho_exp = 1000
        Rmax_exp = lambda_max_mean * R0_sim
        Rc_exp = Rmax_exp
        Uc_exp = np.sqrt(P_inf_exp / rho_exp)
        tc_exp = Rc_exp / Uc_exp
        st.session_state.physical_params = {
            'Rmax_exp': Rmax_exp,
            'Rc_exp': Rc_exp,
            'Uc_exp': Uc_exp,
            'tc_exp': tc_exp
        }

        # Display success message.
        # NOTE(review): emoji restored from mis-decoded text; some are a best guess.
        st.markdown(f"""
        ✅ Data loaded successfully!
        📊 Datasets found: {num_datasets}
        🎯 Lambda max: {lambda_max_mean:.3f}
        📐 Physical parameters calculated
        """, unsafe_allow_html=True)

        # Show data preview
        col1, col2 = st.columns(2)
        with col1:
            st.subheader("📊 Data Overview")
            st.write(f"**Number of datasets:** {num_datasets}")
            st.write(f"**Lambda max mean:** {lambda_max_mean:.3f}")
            st.write(f"**Data shape:** {R_nondim_all.shape}")
        with col2:
            st.subheader("🔧 Physical Parameters")
            st.write(f"**R_max:** {Rmax_exp * 1e6:.1f} μm")
            st.write(f"**Time scale:** {tc_exp * 1e6:.1f} μs")
            st.write(f"**Velocity scale:** {Uc_exp:.1f} m/s")
    except Exception as e:
        # UI boundary: surface the problem to the user instead of crashing the page.
        st.error(f"Error loading file: {str(e)}")
def _interpolate_radius_curve(t_nondim_exp, R_nondim_exp, tc_exp, Rc_exp,
                              interp_range, time_step, scale=1e4):
    """Convert one experimental R-t curve to scaled units and resample it.

    Samples before the time origin (first ``|t| < 1e-10``, else the sample
    closest to zero) are dropped, the rest is converted with ``tc_exp`` /
    ``Rc_exp``, multiplied by ``scale``, sorted by time and interpolated with
    PCHIP on ``np.arange(0, interp_range + time_step, time_step)``.

    Returns:
        ``(t_sorted, R_sorted, t_interp, R_interp)`` as 1-D arrays.

    Raises:
        ValueError: Propagated from ``PchipInterpolator`` (e.g. fewer than two
            distinct time samples).
    """
    t_nondim_exp = np.array(t_nondim_exp).flatten()
    R_nondim_exp = np.array(R_nondim_exp).flatten()

    # Find zero index
    zero_candidates = np.where(np.abs(t_nondim_exp) < 1e-10)[0]
    if len(zero_candidates) > 0:
        zero_idx = zero_candidates[0]
    else:
        zero_idx = np.argmin(np.abs(t_nondim_exp))

    # Physical units from the time origin onwards, then rescale
    t_newunit_exp = (t_nondim_exp * tc_exp)[zero_idx:] * scale
    R_newunit_exp = (R_nondim_exp * Rc_exp)[zero_idx:] * scale

    # Sort by time and keep the first sample of any repeated time stamp:
    # PchipInterpolator requires strictly increasing x.
    t_newunit_exp, first_idx = np.unique(t_newunit_exp, return_index=True)
    R_newunit_exp = R_newunit_exp[first_idx]

    # Interpolate
    t_interp_newunit = np.arange(0, interp_range + time_step, time_step)
    R_interp_newunit = PchipInterpolator(t_newunit_exp, R_newunit_exp)(t_interp_newunit)
    return t_newunit_exp, R_newunit_exp, t_interp_newunit, R_interp_newunit


def show_data_processing():
    """Data processing page: resample one dataset and plot/download the result.

    Requires ``st.session_state.data_loaded``.  Processing stores the sorted
    original curve, the interpolated curve and the processed dataset index in
    ``st.session_state`` and sets ``processed_data``.
    """
    # NOTE(review): emoji in the labels were restored from mis-decoded text;
    # where bytes were lost the emoji is a best guess.
    st.markdown('', unsafe_allow_html=True)
    if not st.session_state.data_loaded:
        st.warning("Please load data first in the 'Data Loading' section.")
        return

    # Dataset selection
    dataset_idx = st.selectbox(
        "Select dataset to process:",
        range(st.session_state.num_datasets),
        format_func=lambda x: f"Dataset {x + 1}"
    )

    # Processing parameters
    col1, col2 = st.columns(2)
    with col1:
        interp_range = st.number_input(
            "Interpolation Range",
            min_value=0.1,
            max_value=2.0,
            value=0.8,
            step=0.1,
            help="Time range for interpolation"
        )
    with col2:
        time_step = st.number_input(
            "Time Step",
            min_value=0.001,
            max_value=0.1,
            value=0.008,
            step=0.001,
            format="%.3f",
            help="Time step for interpolation"
        )

    if st.button("🔄 Process Data", type="primary"):
        with st.spinner("Processing data..."):
            try:
                scales = st.session_state.physical_params
                (t_newunit_exp, R_newunit_exp,
                 t_interp_newunit, R_interp_newunit) = _interpolate_radius_curve(
                    st.session_state.t_nondim_all[0, dataset_idx],
                    st.session_state.R_nondim_all[0, dataset_idx],
                    scales['tc_exp'],
                    scales['Rc_exp'],
                    interp_range,
                    time_step,
                )
                # Store results
                st.session_state.t_interp_newunit = t_interp_newunit
                st.session_state.R_interp_newunit = R_interp_newunit
                st.session_state.t_original = t_newunit_exp
                st.session_state.R_original = R_newunit_exp
                st.session_state.processed_data = True
                st.session_state.selected_dataset = dataset_idx
                st.success(f"✅ Data processed successfully! {len(R_interp_newunit)} interpolated points created.")
            except Exception as e:
                # UI boundary: report instead of crashing the page.
                st.error(f"Processing failed: {str(e)}")

    # Show results if data is processed
    if st.session_state.processed_data:
        st.subheader("📈 Processing Results")
        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))
        # Original vs interpolated
        ax1.plot(st.session_state.t_original, st.session_state.R_original, 'b-', linewidth=2, label='Original Data')
        ax1.plot(st.session_state.t_interp_newunit, st.session_state.R_interp_newunit, 'ro', markersize=3,
                 label='Interpolated Points')
        ax1.set_xlabel('Time (0.1 ms)')
        ax1.set_ylabel('Radius (0.1 mm)')
        ax1.set_title('Original vs Interpolated Data')
        ax1.grid(True, alpha=0.3)
        ax1.legend()
        # Interpolated data only
        ax2.plot(st.session_state.t_interp_newunit, st.session_state.R_interp_newunit, 'ro', markersize=4)
        ax2.set_xlabel('Time (0.1 ms)')
        ax2.set_ylabel('Radius (0.1 mm)')
        ax2.set_title('Interpolated R-t Curve')
        ax2.grid(True, alpha=0.3)
        plt.tight_layout()
        st.pyplot(fig)
        # pyplot keeps every figure alive until it is closed; without this a
        # new figure would accumulate on each rerun of the script.
        plt.close(fig)

        # Download processed data
        if st.button("💾 Download Processed Data"):
            # The export omits the final interpolated sample.
            # NOTE(review): presumably to match a fixed input length expected
            # downstream - confirm.
            data_str = ' '.join([f'{val:.6f}' for val in st.session_state.R_interp_newunit[:-1]])
            # Name the file after the dataset that was actually processed,
            # not whatever the select box currently shows.
            processed_idx = st.session_state.get('selected_dataset', dataset_idx)
            st.download_button(
                label="📥 Download as TXT",
                data=data_str,
                file_name=f"interpolated_data_dataset_{processed_idx + 1}.txt",
                mime="text/plain"
            )
def show_ml_prediction():
"""ML prediction interface - MODIFIED FOR FILE UPLOAD"""
st.markdown('', unsafe_allow_html=True)
if not TENSORFLOW_AVAILABLE:
st.error("โ **TensorFlow not available.** ML prediction features are disabled.")
with st.expander("๐ง How to Enable ML Features", expanded=True):
st.markdown("""
**To enable ML predictions:**
1. **Install TensorFlow:**
```bash
pip install tensorflow-cpu # Recommended (smaller)
# or
pip install tensorflow # Full version
```
2. **Restart the web app:**
- Stop the app (Ctrl+C in terminal)
- Run: `streamlit run streamlit_bubble_app.py`
- Refresh your browser
""")
return
if not st.session_state.processed_data:
st.warning("Please process data first in the 'Data Processing' section.")
return
# Model file upload section (MODIFIED)
st.subheader("๐ Upload ML Model")
col1, col2 = st.columns([2, 1])
with col1:
st.markdown("**Upload your trained model files:**")
# Primary model file upload
uploaded_model = st.file_uploader(
"Upload model file (.h5, .keras, or .zip for SavedModel)",
type=['h5', 'keras', 'zip'],
help="Upload your trained model in H5, Keras, or ZIP format (for SavedModel)",
key="model_file_upload"
)
# Optional config file upload
uploaded_config = st.file_uploader(
"Upload model config (optional)",
type=['npy'],
help="Upload model_config.npy if available",
key="config_file_upload"
)
with col2:
if st.button("๐ Model Format Help"):
st.info("""
**Supported model formats:**
**๐ฏ H5 Format (.h5)** - Recommended
- Single file containing model architecture and weights
- Most compatible format
**โก Keras Format (.keras)**
- Native Keras 3.0 format
- Single file format
**๐ฆ SavedModel (.zip)**
- Zip the entire SavedModel folder
- Should contain: saved_model.pb, variables/, assets/
**โ๏ธ Config File (.npy)** - Optional
- Contains model configuration metadata
- Helps with model information display
""")
# Model loading with uploaded files (MODIFIED)
if uploaded_model is not None and st.button("๐ Load Uploaded Model", type="primary"):
with st.spinner("Loading uploaded ML model..."):
try:
# Create temporary directory for uploaded files
temp_dir = tempfile.mkdtemp()
model_loaded = False
loading_method = "Unknown"
model = None
# Get file extension
file_extension = uploaded_model.name.split('.')[-1].lower()
# Define custom layers exactly matching your training code
class CustomMultiHeadAttention(layers.Layer):
def __init__(self, embed_dim, num_heads=8, **kwargs):
super(CustomMultiHeadAttention, self).__init__(**kwargs)
self.embed_dim = embed_dim
self.num_heads = num_heads
self.projection_dim = embed_dim // num_heads
self.query_dense = layers.Dense(embed_dim)
self.key_dense = layers.Dense(embed_dim)
self.value_dense = layers.Dense(embed_dim)
self.combine_heads = layers.Dense(embed_dim)
def attention(self, query, key, value):
score = tf.matmul(query, key, transpose_b=True)
dim_key = tf.cast(tf.shape(key)[-1], tf.float32)
scaled_score = score / tf.math.sqrt(dim_key)
weights = tf.nn.softmax(scaled_score, axis=-1)
output = tf.matmul(weights, value)
return output, weights
def separate_heads(self, x, batch_size):
x = tf.reshape(x, (batch_size, -1, self.num_heads, self.projection_dim))
return tf.transpose(x, perm=[0, 2, 1, 3])
def call(self, inputs):
batch_size = tf.shape(inputs)[0]
query = self.query_dense(inputs)
key = self.key_dense(inputs)
value = self.value_dense(inputs)
query = self.separate_heads(query, batch_size)
key = self.separate_heads(key, batch_size)
value = self.separate_heads(value, batch_size)
attention, weights = self.attention(query, key, value)
attention = tf.transpose(attention, perm=[0, 2, 1, 3])
concat_attention = tf.reshape(attention, (batch_size, -1, self.embed_dim))
output = self.combine_heads(concat_attention)
return output
def get_config(self):
config = super(CustomMultiHeadAttention, self).get_config()
config.update({
'embed_dim': self.embed_dim,
'num_heads': self.num_heads,
})
return config
@classmethod
def from_config(cls, config):
return cls(**config)
class CustomTransformerEncoderLayer(layers.Layer):
def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1, **kwargs):
super(CustomTransformerEncoderLayer, self).__init__(**kwargs)
self.embed_dim = embed_dim
self.num_heads = num_heads
self.ff_dim = ff_dim
self.rate = rate
self.att = CustomMultiHeadAttention(embed_dim, num_heads)
self.ffn = keras.Sequential(
[layers.Dense(ff_dim, activation="softplus"), layers.Dense(embed_dim)])
self.layernorm1 = layers.LayerNormalization(epsilon=1e-6)
self.layernorm2 = layers.LayerNormalization(epsilon=1e-6)
self.dropout1 = layers.Dropout(rate)
self.dropout2 = layers.Dropout(rate)
def call(self, inputs, training):
attn_output = self.att(inputs)
attn_output = self.dropout1(attn_output, training=training)
out1 = self.layernorm1(inputs + attn_output)
ffn_output = self.ffn(out1)
ffn_output = self.dropout2(ffn_output, training=training)
return self.layernorm2(out1 + ffn_output)
def get_config(self):
config = super(CustomTransformerEncoderLayer, self).get_config()
config.update({
'embed_dim': self.embed_dim,
'num_heads': self.num_heads,
'ff_dim': self.ff_dim,
'rate': self.rate,
})
return config
@classmethod
def from_config(cls, config):
return cls(**config)
# Custom objects for model loading
custom_objects = {
'CustomMultiHeadAttention': CustomMultiHeadAttention,
'CustomTransformerEncoderLayer': CustomTransformerEncoderLayer
}
# Handle different file formats
if file_extension == 'h5':
# Handle H5 format
model_path = os.path.join(temp_dir, uploaded_model.name)
with open(model_path, 'wb') as f:
f.write(uploaded_model.getvalue())
try:
model = keras.models.load_model(model_path, custom_objects=custom_objects)
loading_method = "H5 format (uploaded)"
model_loaded = True
st.success("โ
Loaded H5 model successfully")
except Exception as e:
st.error(f"H5 loading failed: {str(e)}")
elif file_extension == 'keras':
# Handle Keras format
model_path = os.path.join(temp_dir, uploaded_model.name)
with open(model_path, 'wb') as f:
f.write(uploaded_model.getvalue())
try:
model = keras.models.load_model(model_path, custom_objects=custom_objects)
loading_method = "Keras format (uploaded)"
model_loaded = True
st.success("โ
Loaded Keras model successfully")
except Exception as e:
st.error(f"Keras loading failed: {str(e)}")
elif file_extension == 'zip':
# Handle SavedModel ZIP format
import zipfile
zip_path = os.path.join(temp_dir, uploaded_model.name)
with open(zip_path, 'wb') as f:
f.write(uploaded_model.getvalue())
# Extract ZIP file
extract_dir = os.path.join(temp_dir, 'extracted_model')
with zipfile.ZipFile(zip_path, 'r') as zip_ref:
zip_ref.extractall(extract_dir)
# Look for SavedModel directory
savedmodel_dirs = []
for root, dirs, files in os.walk(extract_dir):
if 'saved_model.pb' in files:
savedmodel_dirs.append(root)
if savedmodel_dirs:
try:
model = keras.models.load_model(savedmodel_dirs[0], custom_objects=custom_objects)
loading_method = "SavedModel format (uploaded ZIP)"
model_loaded = True
st.success("โ
Loaded SavedModel successfully")
except Exception as e:
# Try TFSMLayer as fallback
try:
model = layers.TFSMLayer(savedmodel_dirs[0], call_endpoint='serving_default')
loading_method = "TFSMLayer (uploaded ZIP fallback)"
model_loaded = True
st.success("โ
Loaded using TFSMLayer")
except Exception as e2:
st.error(f"SavedModel loading failed: {str(e)}, TFSMLayer failed: {str(e2)}")
else:
st.error("โ No SavedModel found in ZIP file. Ensure ZIP contains saved_model.pb")
if not model_loaded:
st.error("โ Failed to load uploaded model")
# Show debug information
st.subheader("๐ Debug Information")
st.write(f"**File name:** {uploaded_model.name}")
st.write(f"**File size:** {len(uploaded_model.getvalue()):,} bytes")
st.write(f"**File extension:** {file_extension}")
st.info("""
**Troubleshooting:**
- Ensure your model is saved in a compatible format
- For H5: Use `model.save('model.h5', save_format='h5')`
- For Keras: Use `model.save('model.keras')`
- For SavedModel: ZIP the entire SavedModel folder
- Verify custom layers are properly saved
""")
return
# Store in session state
st.session_state.loaded_model = model
st.session_state.model_name = uploaded_model.name
st.session_state.model_loaded = True
st.session_state.loading_method = loading_method
st.session_state.temp_model_dir = temp_dir
# Load config file if provided
model_config = None
if uploaded_config is not None:
try:
config_path = os.path.join(temp_dir, uploaded_config.name)
with open(config_path, 'wb') as f:
f.write(uploaded_config.getvalue())
model_config = np.load(config_path, allow_pickle=True).item()
st.session_state.model_config = model_config
st.success("โ
Config file loaded successfully")
except Exception as e:
st.warning(f"Config file loading failed: {str(e)}")
# Display model info
st.success(f"โ
Model uploaded and loaded successfully!")
with st.expander("๐ Model Information", expanded=False):
col1, col2 = st.columns(2)
with col1:
st.write(f"**Model file:** {uploaded_model.name}")
st.write(f"**Loading method:** {loading_method}")
st.write(f"**Model type:** {type(model).__name__}")
st.write(f"**File size:** {len(uploaded_model.getvalue()):,} bytes")
with col2:
try:
if hasattr(model, 'count_params'):
total_params = model.count_params()
st.write(f"**Total parameters:** {total_params:,}")
if hasattr(model, 'input_shape'):
st.write(f"**Input shape:** {model.input_shape}")
elif hasattr(model, 'input_spec'):
st.write(f"**Input spec:** Available")
# Show config info if available
if model_config:
st.write(f"**Sequence length:** {model_config.get('sequence_length', 'Unknown')}")
st.write(f"**Model type:** {model_config.get('model_type', 'Unknown')}")
except Exception as config_error:
st.write("**Configuration:** Unable to read")
except Exception as e:
st.error(f"โ Failed to load uploaded model: {str(e)}")
with st.expander("๐ Error Details"):
st.write(f"**Error type:** {type(e).__name__}")
st.write(f"**Error message:** {str(e)}")
st.write(f"**File name:** {uploaded_model.name if uploaded_model else 'None'}")
# Show current model status
if st.session_state.model_loaded:
method = st.session_state.get('loading_method', 'Unknown method')
model_name = st.session_state.get('model_name', 'Unknown model')
st.success(f"๐ค **Model Ready:** `{model_name}` ({method})")
# Input file selection and prediction (UNCHANGED)
if st.session_state.model_loaded:
st.subheader("๐ฅ Input Data")
col1, col2 = st.columns([2, 1])
# File uploader for input data
uploaded_input = st.file_uploader(
"Upload input data file",
type=['txt'],
help="Upload a text file with R-t curve data"
)
# Use current data button
if st.button("๐ Use Current Processed Data"):
if st.session_state.processed_data and 'R_interp_newunit' in st.session_state:
temp_data = ' '.join([f'{val:.6f}' for val in st.session_state.R_interp_newunit[:-1]])
st.session_state.temp_input_data = temp_data
st.session_state.using_current_data = True
st.success("โ
Current interpolated data ready for prediction")
else:
st.error("No processed data available. Please process data first.")
# Prediction interface
st.subheader("๐ฏ Make Predictions")
if st.button("๐ Predict G & ฮผ", type="primary"):
# Determine input data source
input_data = None
if st.session_state.get('using_current_data', False) and 'temp_input_data' in st.session_state:
try:
input_values = [float(x) for x in st.session_state.temp_input_data.split()]
input_data = np.array(input_values)
st.info("Using current processed data for prediction")
except Exception as e:
st.error(f"Error processing current data: {e}")
return
elif uploaded_input is not None:
try:
input_data = np.loadtxt(io.StringIO(uploaded_input.getvalue().decode()))
st.info("Using uploaded file for prediction")
except Exception as e:
st.error(f"Error reading uploaded file: {e}")
return
else:
st.error("Please select input data: use current processed data or upload a file")
return
# Run prediction (exactly matching your training code format) - UNCHANGED
with st.spinner("Running ML prediction..."):
try:
# Process input data exactly as in training
test_input_curves = input_data
if test_input_curves.ndim == 1:
test_input_curves = test_input_curves.reshape(1, -1)
# Ensure input size is 100 (matching your training)
if test_input_curves.shape[1] != 100:
if test_input_curves.shape[1] > 100:
test_input_curves = test_input_curves[:, :100]
else:
padding = np.zeros((test_input_curves.shape[0], 100 - test_input_curves.shape[1]))
test_input_curves = np.concatenate([test_input_curves, padding], axis=1)
# Reshape for model input (matching training: sequence_length, 1)
test_input_curves = test_input_curves.reshape(-1, 100, 1)
# Position inputs for transformer (exactly from training)
position_inputs = np.arange(100)
test_position_inputs = np.tile(position_inputs, (test_input_curves.shape[0], 1))
# Make prediction
start_time = time.time()
# Handle different model types
if st.session_state.loading_method == "TFSMLayer (uploaded ZIP fallback)":
# For TFSMLayer, call directly
predictions = st.session_state.loaded_model([test_input_curves, test_position_inputs])
if isinstance(predictions, dict):
# Extract from TFSMLayer output dictionary
predictions_g = predictions.get('g_output',
predictions.get('output_1', list(predictions.values())[0]))
predictions_mu = predictions.get('mu_output',
predictions.get('output_2', list(predictions.values())[1]))
else:
predictions_g, predictions_mu = predictions
else:
# Standard model prediction (matching training)
predictions_g, predictions_mu = st.session_state.loaded_model.predict(
[test_input_curves, test_position_inputs])
prediction_time = time.time() - start_time
# Process predictions (exactly from desktop GUI)
num_samples = 1
pred_G = predictions_g[:num_samples]
pred_mu = predictions_mu[:num_samples]
# Apply scaling (exactly matching training scaling)
pred_G_scaled = 10 ** (pred_G * (6 - 3) + 6 - 3)
pred_mu_scaled = 10 ** (pred_mu * (0 + 3) - 3)
# Store results
st.session_state.pred_G = pred_G_scaled
st.session_state.pred_mu = pred_mu_scaled
# Extract values for display
G_value = st.session_state.pred_G[0][0] if st.session_state.pred_G.ndim > 1 else \
st.session_state.pred_G[0]
mu_value = st.session_state.pred_mu[0][0] if st.session_state.pred_mu.ndim > 1 else \
st.session_state.pred_mu[0]
# Display results
col1, col2, col3 = st.columns(3)
with col1:
st.metric(
label="Shear Modulus (G)",
value=f"{G_value:.2e} Pa",
help="Predicted shear modulus of the material"
)
with col2:
st.metric(
label="Viscosity (ฮผ)",
value=f"{mu_value:.4f} Paยทs",
help="Predicted viscosity of the material"
)
with col3:
st.metric(
label="Prediction Time",
value=f"{prediction_time:.3f} s",
help="Time taken for ML inference"
)
# Show detailed results
result_text = f"G: {G_value:.2e} Pa, ฮผ: {mu_value:.4f}"
st.success(f"๐ **Prediction Results:** {result_text}")
detailed_results = f"""**Prediction completed successfully!**
**Shear Modulus (G):** {G_value:.2e} Pa
**Viscosity (ฮผ):** {mu_value:.4f} Paยทs
**Prediction Time:** {prediction_time:.4f} seconds ({prediction_time * 1000:.2f} ms)
Ready for validation simulation!"""
st.info(detailed_results)
# Enable validation
st.session_state.validation_ready = True
except Exception as e:
st.error(f"โ Prediction failed: {str(e)}")
with st.expander("๐ Debug Information"):
st.write(f"**Error:** {str(e)}")
st.write(f"**Model type:** {type(st.session_state.loaded_model).__name__}")
st.write(f"**Loading method:** {st.session_state.get('loading_method', 'Unknown')}")
if 'test_input_curves' in locals():
st.write(f"**Input shape:** {test_input_curves.shape}")
if 'test_position_inputs' in locals():
st.write(f"**Position shape:** {test_position_inputs.shape}")
# Reset using_current_data flag when file is uploaded
if uploaded_input is not None:
st.session_state.using_current_data = False
# Cleanup temporary files when session ends
if hasattr(st.session_state, 'temp_model_dir') and st.session_state.temp_model_dir:
# Note: In a real deployment, you might want to implement proper cleanup
# For now, the temporary directory will be cleaned up when the container restarts
pass
def _validation_first_value(pred):
    """Return the first predicted value from a 1-D or 2-D prediction array."""
    return pred[0][0] if pred.ndim > 1 else pred[0]


def _validation_snapshot_inputs():
    """Copy the current prediction and lambda into the ``validation_*_cached`` keys."""
    state = st.session_state
    state.validation_G_cached = _validation_first_value(state.pred_G)
    state.validation_mu_cached = _validation_first_value(state.pred_mu)
    # .get() so a missing key becomes None and is reported by the caller
    # instead of raising AttributeError here.
    state.validation_lambda_cached = state.get('lambda_max_mean')


def _validation_error_metrics(t_exp, R_exp, t_sim, R_sim):
    """Return ``(rmse, mae, max_error)`` between experiment and simulation.

    The simulated curve is linearly interpolated onto the experimental time
    samples lying inside the overlap of both time ranges. All three metrics
    are 0 when either series is empty or the ranges do not overlap.
    """
    if len(t_sim) == 0 or len(t_exp) == 0:
        return 0, 0, 0
    t_min = max(t_exp[0], t_sim[0])
    t_max = min(t_exp[-1], t_sim[-1])
    if not t_max > t_min:
        return 0, 0, 0
    f_sim = interp1d(t_sim, R_sim, kind='linear', bounds_error=False, fill_value='extrapolate')
    mask = (t_exp >= t_min) & (t_exp <= t_max)
    R_exp_common = R_exp[mask]
    if len(R_exp_common) == 0:
        return 0, 0, 0
    abs_residual = np.abs(R_exp_common - f_sim(t_exp[mask]))
    return np.sqrt(np.mean(abs_residual ** 2)), np.mean(abs_residual), np.max(abs_residual)


def show_validation():
    """Validation page: simulate with the predicted G and μ and compare to experiment.

    The page is driven by ``validation_*`` session-state flags so that the
    simulation runs exactly once per button click:

    1. Clicking the run button snapshots the current prediction, sets
       ``validation_running`` and reruns the script.
    2. The rerun executes ``OptimizedBubbleSimulation``, stores the curve,
       timing and error metrics in session state, then reruns again.
    3. The final rerun plots the experimental and simulated curves and
       shows a text summary.

    Requires processed data and a completed ML prediction; otherwise a
    warning is shown and the function returns early.
    """
    state = st.session_state
    st.markdown('', unsafe_allow_html=True)
    if not state.processed_data:
        st.warning("Please process data first.")
        return
    if not state.get('validation_ready', False) or 'pred_G' not in state or 'pred_mu' not in state:
        st.warning("Please run ML prediction first to get material properties for validation.")
        st.info("""
**Validation Process:**
1. 📁 Load experimental data
2. ⚙️ Process and interpolate data
3. 🤖 Use ML model to predict G & μ
4. ✅ **Run validation simulation** (you are here)
5. 📊 Compare experimental vs simulated results
""")
        return
    st.subheader("🔬 IMR Simulation using predicted G and μ vs Experimental R-t curve")

    # Keep the cached inputs available even before the first click.
    if 'validation_G_cached' not in state:
        _validation_snapshot_inputs()

    # Initialise the control flags on first visit.
    for flag in ('validation_button_clicked', 'validation_running', 'validation_complete'):
        if flag not in state:
            state[flag] = False

    # The run button is hidden while a simulation is in progress.
    if not state.validation_running:
        if st.button("🚀 Run Validation Simulation", type="primary", key="validation_btn_stable"):
            # Re-snapshot on every click so a newer prediction is not
            # shadowed by values cached during an earlier visit.
            _validation_snapshot_inputs()
            # A fresh run must not keep showing the previous failure.
            if 'validation_error' in state:
                del state['validation_error']
            state.validation_button_clicked = True
            state.validation_running = True
            state.validation_complete = False
            st.rerun()

    # Simulation execution pass.
    if state.validation_running and not state.validation_complete:
        if state.get('validation_lambda_cached') is None:
            st.error("No lambda_max_mean loaded. Please load data first.")
            state.validation_running = False
            return
        status_placeholder = st.empty()
        status_placeholder.info("🔄 Running simulation... Please wait")
        try:
            bubble_sim = OptimizedBubbleSimulation()
            start_time = time.time()
            t_sim, R_sim = bubble_sim.run_optimized_simulation(
                state.validation_G_cached,
                state.validation_mu_cached,
                state.validation_lambda_cached,
            )
            simulation_time = time.time() - start_time
            rmse, mae, max_error = _validation_error_metrics(
                state.t_interp_newunit, state.R_interp_newunit, t_sim, R_sim
            )
        except Exception as e:
            # Remember the failure; it is rendered after the rerun below.
            state.validation_running = False
            state.validation_error = str(e)
        else:
            state.validation_t_sim = t_sim
            state.validation_R_sim = R_sim
            state.validation_simulation_time = simulation_time
            state.validation_rmse = rmse
            state.validation_mae = mae
            state.validation_max_error = max_error
            status_placeholder.empty()
            state.validation_running = False
            state.validation_complete = True
        # Rerun so the result (or the error) is drawn from session state.
        st.rerun()

    # Results pass.
    if state.validation_complete and 'validation_t_sim' in state:
        fig, ax = plt.subplots(figsize=(10, 6))
        ax.plot(state.t_interp_newunit, state.R_interp_newunit,
                'ro', markersize=4, label='Interpolated (Experimental)', alpha=0.7)
        ax.plot(state.validation_t_sim, state.validation_R_sim,
                'b-', linewidth=2, label='Simulated (Predicted G & μ)')
        ax.set_xlabel('Time (0.1 ms)')
        ax.set_ylabel('Radius (0.1 mm)')
        ax.set_title('Validation: Experimental vs Simulated R-t Curves')
        ax.grid(True, alpha=0.3)
        ax.legend()
        fig.tight_layout()
        st.pyplot(fig)
        # pyplot keeps every figure alive until closed; without this each
        # rerun of the page leaks one figure.
        plt.close(fig)

        st.success("✅ Validation simulation completed!")
        t_sim = state.validation_t_sim
        if len(t_sim) > 0:
            time_range = f"{t_sim[0]:.3f} to {t_sim[-1]:.3f} (0.1 ms)"
        else:
            time_range = "n/a"
        st.info(f"""**Validation Results:**
**Predicted Values:**
- Shear Modulus (G): {state.validation_G_cached:.2e} Pa
- Viscosity (μ): {state.validation_mu_cached:.4f} Pa·s
- Lambda Max: {state.validation_lambda_cached:.3f}
**Performance:**
- Simulation Time: {state.validation_simulation_time:.2f} seconds
- Simulated Points: {len(state.validation_R_sim)}
- Time Range: {time_range}
""")
        if st.button("🔄 Run New Simulation", key="reset_validation"):
            # Clear this page's state only. 'validation_ready' is owned by
            # the prediction step; deleting it would force the user to
            # re-run the ML prediction before validating again.
            for key in list(state.keys()):
                if key.startswith('validation_') and key != 'validation_ready':
                    del state[key]
            st.rerun()

    # Failure from the last execution pass, if any.
    if 'validation_error' in state:
        st.error(f"Simulation failed: {state.validation_error}")
        if st.button("🔄 Try Again", key="retry_validation"):
            del state['validation_error']
            state.validation_running = False
            st.rerun()

    # Additional CSS to ensure zero movement
    st.markdown("""
""", unsafe_allow_html=True)
def _results_first_value(pred):
    """Return the first predicted value from a 1-D or 2-D prediction array."""
    return pred[0][0] if pred.ndim > 1 else pred[0]


def show_results():
    """Results page: analysis summary, download buttons and session reset.

    Shows what has been loaded, processed and predicted so far, then offers
    up to three downloads (interpolated data, a text summary of the
    prediction, and the simulated curve) depending on which results exist
    in session state. The last section lets the user clear every
    session-state key.
    """
    state = st.session_state
    st.markdown('', unsafe_allow_html=True)
    if not state.processed_data:
        st.warning("No processed data available.")
        return

    # The stored index is displayed with +1. The previous code added 1 to
    # the 'N/A' fallback string and raised TypeError when the key was
    # missing, so fall back to 'N/A' only after the addition fails.
    try:
        dataset_label = state.get('selected_dataset') + 1
    except TypeError:
        dataset_label = 'N/A'
    lambda_max = state.get('lambda_max_mean')
    lambda_label = f"{lambda_max:.3f}" if lambda_max is not None else 'N/A'

    # Extract the predicted scalars once; both sections below reuse them.
    has_predictions = 'pred_G' in state and 'pred_mu' in state
    if has_predictions:
        G_value = _results_first_value(state.pred_G)
        mu_value = _results_first_value(state.pred_mu)

    # Summary of all results
    st.subheader("📋 Analysis Summary")
    col1, col2 = st.columns(2)
    with col1:
        st.markdown("**Data Information:**")
        if state.data_loaded:
            st.write(f"✅ Datasets loaded: {state.get('num_datasets', 'N/A')}")
            st.write(f"✅ Lambda max: {lambda_label}")
            st.write(f"✅ Selected dataset: {dataset_label}")
        # processed_data is guaranteed True here by the early return above.
        st.write(f"✅ Interpolated points: {len(state.R_interp_newunit)}")
    with col2:
        st.markdown("**ML Predictions:**")
        if has_predictions:
            st.write(f"🎯 Shear Modulus (G): {G_value:.2e} Pa")
            st.write(f"🎯 Viscosity (μ): {mu_value:.4f} Pa·s")
        else:
            st.write("❌ No predictions available")

    # Export options
    st.subheader("💾 Export Options")
    export_col1, export_col2, export_col3 = st.columns(3)
    with export_col1:
        # Space-separated radii; the final sample is dropped, matching the
        # text handed to the predictor by "Use Current Processed Data".
        data_str = ' '.join(f'{val:.6f}' for val in state.R_interp_newunit[:-1])
        st.download_button(
            label="📥 Download Interpolated Data",
            data=data_str,
            file_name="interpolated_bubble_data.txt",
            mime="text/plain"
        )
    with export_col2:
        if has_predictions:
            timestamp = time.strftime('%Y-%m-%d %H:%M:%S')
            pred_summary = f"""Bubble Dynamics Analysis Results
Dataset: {dataset_label}
Lambda Max: {lambda_label}
ML Predictions:
Shear Modulus (G): {G_value:.2e} Pa
Viscosity (μ): {mu_value:.4f} Pa·s
Analysis completed on: {timestamp}
"""
            st.download_button(
                label="📄 Download Results Summary",
                data=pred_summary,
                file_name="bubble_analysis_results.txt",
                mime="text/plain"
            )
    with export_col3:
        if 'validation_t_sim' in state:
            # One "time<TAB>radius" row per simulated sample.
            sim_str = '\n'.join(
                f'{t:.6f}\t{r:.6f}'
                for t, r in zip(state.validation_t_sim, state.validation_R_sim)
            )
            st.download_button(
                label="🔬 Download Simulation Data",
                data=sim_str,
                file_name="simulation_results.txt",
                mime="text/plain"
            )

    # Session reset
    st.subheader("🔄 Reset Session")
    if st.button("🗑️ Clear All Data", type="secondary"):
        for key in list(state.keys()):
            del state[key]
        st.success("✅ Session cleared! Refresh the page to start over.")
# Script entry point: start the app only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    main()