surfdisp2k25 / app.py
schattin's picture
feature(first version): First version to run the simulator with parameters.
d7eb045
from __future__ import annotations
from typing import Sequence
import matplotlib.pyplot as plt
import numpy as np
import torch
import math
try:
import gradio as gr
except ModuleNotFoundError: # pragma: no cover - optional dependency for tests
gr = None # type: ignore[assignment]
from surfdisp2k25 import dispsurf2k25_simulator
# Number of period samples requested from the simulator (passed as ``kmax``).
DISPERSION_SAMPLES = 60
# Largest number of layers, half-space included, that a model may contain.
MAX_LAYERS = 100

# Default layered model as (thickness, Vs) pairs. The last entry has zero
# thickness and acts as the half-space.
DEFAULT_CUSTOM_PROFILE: list[tuple[float, float]] = [
    (1.0, 2.0),
    (1.0, 2.5),
    (1.0, 3.0),
    (0.0, 3.5),
]

# Column-wise float32 views of the default profile, plus the row-wise table
# used to pre-populate the editable layer grid.
DEFAULT_THICKNESS = np.array(
    [thickness for thickness, _ in DEFAULT_CUSTOM_PROFILE], dtype=np.float32
)
DEFAULT_VS = np.array([vs for _, vs in DEFAULT_CUSTOM_PROFILE], dtype=np.float32)
DEFAULT_TABLE = [list(map(float, layer)) for layer in DEFAULT_CUSTOM_PROFILE]

# UI label -> ``iwave`` value handed to the simulator.
WAVE_TYPE_OPTIONS = {
    "Rayleigh waves (iwave=2)": 2,
    "Love waves (iwave=1)": 1,
}

# UI label -> ``igr`` value handed to the simulator.
GROUP_VELOCITY_OPTIONS = {
    "Phase velocity only (igr=0)": 0,
    "Phase & group velocity (igr=1)": 1,
}
class ValidationError(ValueError):
    """Signal that a user-supplied value failed validation.

    ``_fail`` raises this instead of ``gr.Error`` when Gradio is not
    installed.
    """
def _fail(message: str) -> None:
    """Abort the current operation with a validation error.

    This function never returns normally.

    Args:
        message: Human-readable description of the problem.

    Raises:
        gr.Error: When Gradio was imported successfully.
        ValidationError: When Gradio is unavailable (``gr`` is ``None``).
    """
    if gr is None:
        raise ValidationError(message)
    raise gr.Error(message)
def _normalize_table(table_values: Sequence[Sequence[float]]) -> list[list[float]]:
if table_values is None: # type: ignore[comparison-overlap]
return []
if hasattr(table_values, "to_numpy"):
return table_values.to_numpy().tolist() # type: ignore[no-any-return]
if isinstance(table_values, np.ndarray):
return table_values.tolist()
return [list(row) if isinstance(row, (list, tuple, np.ndarray)) else [row] for row in table_values] # type: ignore[misc]
def _is_blank(value: object) -> bool:
if value in (None, ""):
return True
if isinstance(value, (float, np.floating)):
return math.isnan(value)
return False
def _table_is_empty(table_values: Sequence[Sequence[float]]) -> bool:
    """Return ``True`` when no row holds a thickness or Vs value.

    Rows that are not lists/tuples, or that have fewer than two cells, are
    ignored; every remaining row must have both of its first two cells blank.
    """
    candidate_rows = (
        row
        for row in _normalize_table(table_values)
        if isinstance(row, (list, tuple)) and len(row) >= 2
    )
    return all(_is_blank(row[0]) and _is_blank(row[1]) for row in candidate_rows)
def _extract_layer_values(
    table_values: Sequence[Sequence[float]] | None,
) -> tuple[np.ndarray, np.ndarray]:
    """Validate the layer table and split it into thickness and Vs arrays.

    Fully blank rows are skipped. If the whole table is blank, copies of the
    default model are returned. The thickness of the last layer is always
    forced to ``0.0`` because that layer is treated as the half-space.

    Args:
        table_values: Rows of ``[thickness, Vs]``; may be ``None``, a nested
            sequence, a NumPy array or an object with ``to_numpy()``.

    Returns:
        ``(thickness, vs)`` as float32 arrays of equal length.

    Raises:
        gr.Error or ValidationError: Via ``_fail`` when a row is incomplete,
            non-numeric, non-finite, out of range, or when the model has too
            many layers.
    """
    normalized = _normalize_table(table_values)
    if _table_is_empty(normalized):
        return DEFAULT_THICKNESS.copy(), DEFAULT_VS.copy()

    thickness_values: list[float] = []
    vs_values: list[float] = []
    for idx, row in enumerate(normalized, start=1):
        if not isinstance(row, (list, tuple)):
            _fail("Each layer must provide thickness and Vs.")
        if len(row) < 2:
            _fail(f"Layer {idx} must include both thickness and Vs.")
        raw_thickness, raw_vs = row[0], row[1]
        if _is_blank(raw_thickness) and _is_blank(raw_vs):
            continue  # Ignore empty rows for dynamic tables
        if _is_blank(raw_thickness) or _is_blank(raw_vs):
            _fail(f"Layer {idx} must include both thickness and Vs.")
        try:
            thickness = float(raw_thickness)
            vs = float(raw_vs)
        except (TypeError, ValueError):
            _fail("Layer thickness and Vs must be numeric values.")
        # Strings such as "nan"/"inf" (and float infinity) survive float()
        # and the blank check, and NaN fails neither range comparison below,
        # so non-finite values must be rejected explicitly.
        if not (math.isfinite(thickness) and math.isfinite(vs)):
            _fail(f"Layer {idx} thickness and Vs must be finite numbers.")
        if thickness < 0.0:
            _fail(f"Layer {idx} thickness must be non-negative.")
        if vs <= 0.0:
            _fail(f"Layer {idx} shear-wave velocity must be greater than 0.")
        thickness_values.append(thickness)
        vs_values.append(vs)

    if not thickness_values:
        _fail("Custom model must contain at least one layer.")
    if len(thickness_values) > MAX_LAYERS:
        _fail(f"Models cannot exceed {MAX_LAYERS} layers.")

    thickness_array = np.asarray(thickness_values, dtype=np.float32)
    vs_array = np.asarray(vs_values, dtype=np.float32)

    # The final layer is the half-space: whatever thickness was entered for
    # it is overwritten with zero.
    thickness_array[-1] = 0.0
    if np.any(thickness_array[:-1] <= 0.0):
        _fail("Only the final layer may have zero thickness.")
    return thickness_array, vs_array
def _build_theta(
    thickness_values: np.ndarray, vs_values: np.ndarray, vp_vs_ratio: float
) -> torch.Tensor:
    """Pack a layered model into the simulator's parameter tensor.

    The result is a float32 tensor of shape ``(1, 2 + 2 * n_layers)`` laid out
    as ``[n_layers, vp_vs_ratio, thickness..., vs...]``.

    Args:
        thickness_values: Layer thicknesses; the last one must be ``0.0``.
        vs_values: Shear-wave velocity per layer, same length as
            ``thickness_values``.
        vp_vs_ratio: Vp/Vs ratio, must be greater than 1.

    Raises:
        gr.Error or ValidationError: Via ``_fail`` when any check fails.
    """
    if vp_vs_ratio <= 1.0:
        _fail("Vp/Vs ratio must be greater than 1.0.")
    if vs_values.size != thickness_values.size:
        _fail("Each layer must have a corresponding Vs value.")

    layer_count = vs_values.size
    if layer_count == 0:
        _fail("Model must contain at least one layer.")
    if layer_count > MAX_LAYERS:
        _fail(f"Models cannot exceed {MAX_LAYERS} layers.")

    if (thickness_values < 0.0).any():
        _fail("Layer thickness values must be non-negative.")
    if thickness_values[-1] != 0.0:
        _fail("The final layer must have zero thickness (half-space).")
    if (vs_values <= 0.0).any():
        _fail("Shear-wave velocity must be positive.")

    header = np.array([float(layer_count), float(vp_vs_ratio)], dtype=np.float32)
    packed = np.concatenate(
        [
            header,
            thickness_values.astype(np.float32),
            vs_values.astype(np.float32),
        ]
    )
    # Add the leading batch dimension expected by the caller.
    return torch.from_numpy(packed).unsqueeze(0)
def _make_model_plot(
    thickness_values: np.ndarray, vs_values: np.ndarray
):
    """Draw the layered Vs model as a step profile (depth on x, Vs on y).

    The figure is built with ``matplotlib.figure.Figure`` instead of
    ``pyplot.subplots``. pyplot keeps every figure it creates in a global
    registry until it is explicitly closed, so creating one per callback in a
    long-running server would leak figures.

    Args:
        thickness_values: Layer thicknesses; the last entry is the half-space.
        vs_values: Shear-wave velocity per layer; must be non-empty.

    Returns:
        The matplotlib ``Figure`` holding the profile.
    """
    # Local import: only the Figure class is needed, not pyplot's state.
    from matplotlib.figure import Figure

    min_depth = 0.0
    cumulative_depth = np.cumsum(thickness_values)
    depth_edges = min_depth + np.concatenate(([0.0], cumulative_depth))

    # Extend the profile 1 unit below the deepest interface so the half-space
    # is visible; the last velocity is repeated to cover the added points.
    tail_depth = depth_edges[-1] + 1.0
    plot_depth = np.concatenate((depth_edges, [tail_depth]))
    plot_velocity = np.concatenate((vs_values, [vs_values[-1], vs_values[-1]]))

    max_depth = tail_depth
    if max_depth <= min_depth:
        max_depth = min_depth + 1.0

    fig = Figure(figsize=(7, 4))
    ax = fig.subplots()
    ax.step(plot_depth, plot_velocity, where="post", linewidth=1.5)
    ax.set_xlabel("Depth (km)")
    ax.set_ylabel("Shear velocity (km/s)")
    ax.set_title("Layered Vs model")
    ax.set_xlim(min_depth, max_depth)
    ax.grid(True, linestyle="--", linewidth=0.6, alpha=0.5)
    fig.tight_layout()
    return fig
def _make_plot(periods: np.ndarray, velocities: np.ndarray):
    """Plot the dispersion curve (velocity against period).

    The figure is built with ``matplotlib.figure.Figure`` instead of
    ``pyplot.subplots``. pyplot keeps every figure it creates in a global
    registry until it is explicitly closed, so creating one per simulation in
    a long-running server would leak figures.

    Args:
        periods: Period samples for the x axis.
        velocities: Velocity samples for the y axis, same length as
            ``periods``.

    Returns:
        The matplotlib ``Figure`` holding the curve.
    """
    # Local import: only the Figure class is needed, not pyplot's state.
    from matplotlib.figure import Figure

    fig = Figure(figsize=(7, 4))
    ax = fig.subplots()
    ax.plot(periods, velocities, marker="o", markersize=3, linewidth=1.5)
    ax.set_xlabel("Period (s)")
    ax.set_ylabel("Phase velocity (km/s)")
    ax.set_title("SurfDisp2k25 Dispersion Curve")
    ax.grid(True, linestyle="--", linewidth=0.6, alpha=0.5)
    fig.tight_layout()
    return fig
def _layers_to_table(thickness: np.ndarray, vs: np.ndarray) -> list[list[float]]:
return [[float(h), float(v)] for h, v in zip(thickness, vs)]
def _generate_random_model() -> tuple[np.ndarray, np.ndarray]:
rng = np.random.default_rng()
n_layers = int(rng.integers(2, 21))
finite_thickness = rng.uniform(0.1, 3.0, size=n_layers - 1).astype(np.float32)
vs_values = rng.uniform(2.0, 8.0, size=n_layers - 1).astype(np.float32)
thickness = np.concatenate([finite_thickness, np.array([0.0], dtype=np.float32)])
last_vs = float(rng.uniform(2.0, 8.0))
while abs(last_vs - float(vs_values[-1])) < 1e-3:
last_vs = float(rng.uniform(2.0, 8.0))
vs = np.concatenate([vs_values, np.array([last_vs], dtype=np.float32)])
return thickness, vs
def on_layer_table_change(
    table_values: Sequence[Sequence[float]] | None,
):
    """Re-validate the edited layer table and redraw the model plot.

    Args:
        table_values: Current contents of the layer table.

    Returns:
        ``(sanitized_table, figure)``: the validated rows and the Vs profile
        plot built from them.

    Raises:
        RuntimeError: When Gradio is not installed.
    """
    if gr is None:
        raise RuntimeError("Gradio is required for interactive updates.")
    layer_thickness, layer_vs = _extract_layer_values(table_values)
    return (
        _layers_to_table(layer_thickness, layer_vs),
        _make_model_plot(layer_thickness, layer_vs),
    )
def on_random_model_click():
    """Generate a random layered model for the table and the model plot.

    Returns:
        ``(table, figure)``: the random model as table rows and its Vs
        profile plot.

    Raises:
        RuntimeError: When Gradio is not installed.
    """
    if gr is None:
        raise RuntimeError("Gradio is required for interactive updates.")
    layer_thickness, layer_vs = _generate_random_model()
    return (
        _layers_to_table(layer_thickness, layer_vs),
        _make_model_plot(layer_thickness, layer_vs),
    )
def run_simulation(
    custom_values: Sequence[Sequence[float]] | None,
    vp_vs_ratio: float,
    p_min: float,
    p_max: float,
    wave_type_label: str,
    mode: int,
    group_label: str,
):
    """Validate the inputs, run the dispersion simulator and build the outputs.

    Args:
        custom_values: Layer table rows of ``[thickness, Vs]``.
        vp_vs_ratio: Vp/Vs ratio used for every layer.
        p_min: Minimum period; must be positive.
        p_max: Maximum period; must be greater than ``p_min``.
        wave_type_label: A key of ``WAVE_TYPE_OPTIONS``.
        mode: Mode number, 1 to 3.
        group_label: A key of ``GROUP_VELOCITY_OPTIONS``.

    Returns:
        ``(model_plot, dispersion_plot, table, summary)`` where ``table`` is a
        list of ``[period, velocity]`` rows and ``summary`` is Markdown text.

    Raises:
        gr.Error or ValidationError: Via ``_fail`` on invalid input or when
            the simulator raises ``RuntimeError``.
    """
    thickness_values, vs_values = _extract_layer_values(custom_values)

    # An emptied numeric field may arrive as None (presumably what Gradio's
    # Number component sends when cleared - confirm for the installed
    # version); reject it here instead of failing on the comparison below.
    if p_min is None or p_max is None:
        _fail("Periods must be positive numbers.")
    if p_min <= 0.0 or p_max <= 0.0:
        _fail("Periods must be positive numbers.")
    if p_max <= p_min:
        _fail("Maximum period must be greater than minimum period.")
    if mode < 1:
        _fail("Mode number must be at least 1.")
    if mode > 3:
        _fail("Mode number cannot exceed 3 in this simulator.")

    theta = _build_theta(thickness_values, vs_values, vp_vs_ratio)
    iflsph = 0  # Always use flat Earth
    iwave = WAVE_TYPE_OPTIONS[wave_type_label]
    igr = GROUP_VELOCITY_OPTIONS[group_label]

    try:
        disp = dispsurf2k25_simulator(
            theta=theta,
            p_min=float(p_min),
            p_max=float(p_max),
            kmax=DISPERSION_SAMPLES,
            iflsph=iflsph,
            iwave=iwave,
            mode=int(mode),
            igr=igr,
            dtype=torch.float32,
        )
    except RuntimeError as exc:
        _fail(f"Simulation failed: {exc}")

    velocities = disp.squeeze(0).detach().cpu().numpy()
    # NOTE(review): the period axis is rebuilt here as an evenly spaced grid;
    # this assumes the simulator samples periods linearly between p_min and
    # p_max - confirm against the simulator.
    periods = np.linspace(p_min, p_max, velocities.size)

    dispersion_plot = _make_plot(periods, velocities)
    model_plot = _make_model_plot(thickness_values, vs_values)
    table = [[float(p), float(v)] for p, v in zip(periods, velocities)]

    # The half-space (last layer) is excluded from the thickness totals unless
    # it is the only layer.
    finite_thickness = thickness_values[:-1] if thickness_values.size > 1 else thickness_values
    total_depth = float(np.sum(finite_thickness))
    n_finite_layers = finite_thickness.size

    summary = "\n".join(
        [
            "**Model**: Custom layered model",
            f"**Layers**: {vs_values.size} (including half-space)",
            f"**Finite thickness layers**: {n_finite_layers}",
            f"**Depth window**: 0.00 – {total_depth:.2f} km",
            f"**Vp/Vs ratio**: {vp_vs_ratio:.2f}",
            f"**Vs range**: {vs_values.min():.2f} – {vs_values.max():.2f} km/s",
            f"**Total thickness**: {total_depth:.2f} km",
            f"**Periods**: {p_min:.2f} – {p_max:.2f} s ({velocities.size} samples)",
            f"**Wave type**: {wave_type_label}; mode = {int(mode)}",
            f"**Phase velocity range**: {velocities.min():.2f} – {velocities.max():.2f} km/s",
        ]
    )
    return model_plot, dispersion_plot, table, summary
# Build the interface only when Gradio was imported; otherwise ``demo`` is set
# to None so the module can still be imported without it.
if gr is not None:
    with gr.Blocks(title="SurfDisp2k25 Simulator") as demo:
        # Introductory help text.
        gr.Markdown(
            """## SurfDisp2k25 - Interactive Surface Wave Dispersion Simulator (Alpha)
This simulator computes surface wave dispersion curves (Love and Rayleigh waves) for layered Earth models.
You can define layer thicknesses and shear-wave velocities manually or generate a random model.
**Parameters**
- Minimum/Maximum period (s) - Range of periods used to compute the dispersion curve.
- Wave type - Choose between Rayleigh (vertical-radial motion) and Love (horizontal shear) waves.
- Mode number - Number of modes (fundamental plus higher harmonics) to compute.
- Velocity output - Select whether to compute only phase velocity or both phase and group velocity.
- Vp/Vs ratio - Defines the P-wave velocity from the S-wave velocity.
The simulator displays both the layered Vs model and the resulting dispersion curve.
This is an alpha version; results may contain numerical artefacts, so use with caution for testing and visualization."""
        )
        # Input row: model definition in the first column, simulation settings
        # in the second.
        with gr.Row():
            with gr.Column():
                random_button = gr.Button("Generate random model")
                # Editable layer table, pre-filled with the default profile.
                custom_model = gr.Dataframe(
                    headers=["Thickness (km)", "Vs (km/s)"],
                    value=DEFAULT_TABLE,
                    row_count=(len(DEFAULT_TABLE), "dynamic"),
                    col_count=(2, "fixed"),
                    datatype=["float", "float"],
                    label="Custom layered model",
                )
                vpvs_input = gr.Slider(
                    minimum=1.5,
                    maximum=2.2,
                    value=1.75,
                    step=0.01,
                    label="Vp/Vs ratio",
                )
            with gr.Column():
                p_min_input = gr.Number(value=1.0, label="Minimum period (s)")
                p_max_input = gr.Number(value=30.0, label="Maximum period (s)")
                wave_type_input = gr.Radio(
                    choices=list(WAVE_TYPE_OPTIONS.keys()),
                    value="Rayleigh waves (iwave=2)",
                    label="Wave type",
                )
                # Slider bounds match the 1..3 mode range that run_simulation
                # accepts.
                mode_input = gr.Slider(
                    minimum=1,
                    maximum=3,
                    value=1,
                    step=1,
                    label="Mode number",
                )
                group_mode_input = gr.Radio(
                    choices=list(GROUP_VELOCITY_OPTIONS.keys()),
                    value="Phase velocity only (igr=0)",
                    label="Velocity output",
                )
                run_button = gr.Button("Run simulation", variant="primary")
        # Output row: the Vs profile next to the dispersion curve.
        with gr.Row():
            model_plot_output = gr.Plot(label="Shear-wave velocity profile")
            plot_output = gr.Plot(label="Dispersion curve")
        # Output row: sampled values and the textual summary.
        with gr.Row():
            table_output = gr.Dataframe(
                headers=["Period (s)", "Phase velocity (km/s)"],
                datatype="float",
                col_count=(2, "fixed"),
                row_count=(DISPERSION_SAMPLES, "dynamic"),
                label="Sampled dispersion values",
            )
            summary_output = gr.Markdown()
        # Show the default model's profile through the load event.
        demo.load(
            fn=lambda: _make_model_plot(DEFAULT_THICKNESS, DEFAULT_VS),
            inputs=None,
            outputs=model_plot_output,
        )
        # Re-validate the table and redraw the profile on every edit. The
        # handler writes the sanitized rows back into the same table.
        # NOTE(review): writing to the component that fired the change event
        # may re-trigger it - confirm this settles with the installed Gradio.
        custom_model.change(
            fn=on_layer_table_change,
            inputs=custom_model,
            outputs=[custom_model, model_plot_output],
            trigger_mode="always_last",
            queue=False,
        )
        # Replace the table and profile with a random model.
        random_button.click(
            fn=on_random_model_click,
            inputs=None,
            outputs=[custom_model, model_plot_output],
        )
        # Run the simulator; the input order must match run_simulation's
        # positional parameters and the output order its returned tuple.
        run_button.click(
            fn=run_simulation,
            inputs=[
                custom_model,
                vpvs_input,
                p_min_input,
                p_max_input,
                wave_type_input,
                mode_input,
                group_mode_input,
            ],
            outputs=[model_plot_output, plot_output, table_output, summary_output],
        )
else:  # pragma: no cover - allows importing without gradio installed
    demo = None
if __name__ == "__main__":
    # ``demo`` is None when Gradio could not be imported, in which case there
    # is no interface to launch.
    if demo is not None:
        demo.launch()
    else:
        raise ImportError("gradio must be installed to launch the interface.")