"""Gradio front end for the SurfDisp2k25 surface-wave dispersion simulator.

The module validates a user-supplied layered Vs model, packs it into the
``theta`` tensor passed to ``dispsurf2k25_simulator``, and renders the model
and the resulting dispersion curve with matplotlib.  Gradio is optional: when
it is not installed the helpers remain importable and ``demo`` is ``None``.
"""

from __future__ import annotations

import math
from typing import NoReturn, Sequence

import matplotlib.pyplot as plt
import numpy as np
import torch

try:
    import gradio as gr
except ModuleNotFoundError:  # pragma: no cover - optional dependency for tests
    gr = None  # type: ignore[assignment]

from surfdisp2k25 import dispsurf2k25_simulator

DISPERSION_SAMPLES = 60
MAX_LAYERS = 100

# Default layered model with a half-space.
DEFAULT_CUSTOM_PROFILE: list[tuple[float, float]] = [
    (1.0, 2.0),
    (1.0, 2.5),
    (1.0, 3.0),
    (0.0, 3.5),
]
DEFAULT_THICKNESS = np.asarray(
    [layer[0] for layer in DEFAULT_CUSTOM_PROFILE], dtype=np.float32
)
DEFAULT_VS = np.asarray(
    [layer[1] for layer in DEFAULT_CUSTOM_PROFILE], dtype=np.float32
)
DEFAULT_TABLE = [[float(h), float(vs)] for h, vs in DEFAULT_CUSTOM_PROFILE]

# UI label -> integer flag forwarded to the simulator.
WAVE_TYPE_OPTIONS = {
    "Rayleigh waves (iwave=2)": 2,
    "Love waves (iwave=1)": 1,
}
GROUP_VELOCITY_OPTIONS = {
    "Phase velocity only (igr=0)": 0,
    "Phase & group velocity (igr=1)": 1,
}

# Introductory text shown at the top of the interface.
_INTRO_MARKDOWN = """## SurfDisp2k25 - Interactive Surface Wave Dispersion Simulator (Alpha)

This simulator computes surface wave dispersion curves (Love and Rayleigh waves) for layered Earth models.
You can define layer thicknesses and shear-wave velocities manually or generate a random model.

**Parameters**

- Minimum/Maximum period (s) - Range of periods used to compute the dispersion curve.
- Wave type - Choose between Rayleigh (vertical-radial motion) and Love (horizontal shear) waves.
- Mode number - Number of modes (fundamental plus higher harmonics) to compute.
- Velocity output - Select whether to compute only phase velocity or both phase and group velocity.
- Vp/Vs ratio - Defines the P-wave velocity from the S-wave velocity.

The simulator displays both the layered Vs model and the resulting dispersion curve.
This is an alpha version; results may contain numerical artefacts, so use with caution for testing and visualization."""


class ValidationError(ValueError):
    """Raised when user input is invalid."""


def _fail(message: str) -> NoReturn:
    """Abort with a user-facing validation error.

    Raises:
        gr.Error: when Gradio is installed, so the message reaches the UI.
        ValidationError: when Gradio is not available.
    """
    if gr is not None:
        raise gr.Error(message)
    raise ValidationError(message)


def _normalize_table(
    table_values: Sequence[Sequence[float]] | None,
) -> list[list[float]]:
    """Convert the table payload into a plain list of row lists.

    Accepts ``None`` (returns ``[]``), any object exposing ``to_numpy()``,
    a NumPy array, or a sequence of rows.  A row that is not a list, tuple or
    array is wrapped into a single-element list.
    """
    if table_values is None:
        return []
    if hasattr(table_values, "to_numpy"):
        return table_values.to_numpy().tolist()  # type: ignore[no-any-return]
    if isinstance(table_values, np.ndarray):
        return table_values.tolist()
    return [
        list(row) if isinstance(row, (list, tuple, np.ndarray)) else [row]
        for row in table_values
    ]  # type: ignore[misc]


def _is_blank(value: object) -> bool:
    """Return True for ``None``, the empty string, or a floating-point NaN."""
    if value in (None, ""):
        return True
    if isinstance(value, (float, np.floating)):
        return math.isnan(value)
    return False


def _table_is_empty(table_values: Sequence[Sequence[float]] | None) -> bool:
    """Return True when no row holds a non-blank thickness or Vs entry.

    Rows that are not lists/tuples or that have fewer than two cells are
    ignored rather than counted as content.
    """
    for row in _normalize_table(table_values):
        if not isinstance(row, (list, tuple)) or len(row) < 2:
            continue
        if not (_is_blank(row[0]) and _is_blank(row[1])):
            return False
    return True


def _extract_layer_values(
    table_values: Sequence[Sequence[float]] | None,
) -> tuple[np.ndarray, np.ndarray]:
    """Validate the layer table and return ``(thickness, vs)`` float32 arrays.

    An empty table yields copies of the default model.  Fully blank rows are
    skipped.  The last layer's thickness is always forced to ``0.0`` so that it
    acts as the half-space; every other layer must have positive thickness.

    Raises:
        gr.Error / ValidationError: (via ``_fail``) for missing, non-numeric,
            non-finite or out-of-range entries, or too many layers.
    """
    normalized = _normalize_table(table_values)
    if _table_is_empty(normalized):
        return DEFAULT_THICKNESS.copy(), DEFAULT_VS.copy()

    thickness_values: list[float] = []
    vs_values: list[float] = []
    for idx, row in enumerate(normalized, start=1):
        if not isinstance(row, (list, tuple)):
            _fail("Each layer must provide thickness and Vs.")
        if len(row) < 2:
            _fail(f"Layer {idx} must include both thickness and Vs.")

        raw_thickness, raw_vs = row[0], row[1]
        if _is_blank(raw_thickness) and _is_blank(raw_vs):
            continue  # Ignore empty rows for dynamic tables
        if _is_blank(raw_thickness) or _is_blank(raw_vs):
            _fail(f"Layer {idx} must include both thickness and Vs.")

        try:
            thickness = float(raw_thickness)
            vs = float(raw_vs)
        except (TypeError, ValueError):
            _fail("Layer thickness and Vs must be numeric values.")

        # inf (and NaN supplied as text, e.g. "nan") would slip through the
        # range comparisons below, so reject them explicitly.
        if not (math.isfinite(thickness) and math.isfinite(vs)):
            _fail(f"Layer {idx} thickness and Vs must be finite numbers.")
        if thickness < 0.0:
            _fail(f"Layer {idx} thickness must be non-negative.")
        if vs <= 0.0:
            _fail(f"Layer {idx} shear-wave velocity must be greater than 0.")

        thickness_values.append(thickness)
        vs_values.append(vs)

    if not thickness_values:
        _fail("Custom model must contain at least one layer.")
    if len(thickness_values) > MAX_LAYERS:
        _fail(f"Models cannot exceed {MAX_LAYERS} layers.")

    thickness_array = np.asarray(thickness_values, dtype=np.float32)
    vs_array = np.asarray(vs_values, dtype=np.float32)

    # The deepest layer is treated as the half-space regardless of user input.
    thickness_array[-1] = 0.0
    if np.any(thickness_array[:-1] <= 0.0):
        _fail("Only the final layer may have zero thickness.")
    return thickness_array, vs_array


def _build_theta(
    thickness_values: np.ndarray, vs_values: np.ndarray, vp_vs_ratio: float
) -> torch.Tensor:
    """Pack the model into the simulator's ``theta`` tensor.

    The result has shape ``(1, 2 + 2 * n_layers)`` and float32 dtype, laid out
    as ``[n_layers, vp_vs_ratio, thickness..., vs...]``.

    Raises:
        gr.Error / ValidationError: (via ``_fail``) when the ratio is not
            above 1.0, the arrays differ in size, the layer count is out of
            range, a thickness is negative, the last thickness is non-zero, or
            a velocity is not positive.
    """
    if vp_vs_ratio <= 1.0:
        _fail("Vp/Vs ratio must be greater than 1.0.")
    if thickness_values.size != vs_values.size:
        _fail("Each layer must have a corresponding Vs value.")

    n_layers = vs_values.size
    if n_layers == 0:
        _fail("Model must contain at least one layer.")
    if n_layers > MAX_LAYERS:
        _fail(f"Models cannot exceed {MAX_LAYERS} layers.")
    if np.any(thickness_values < 0.0):
        _fail("Layer thickness values must be non-negative.")
    if thickness_values[-1] != 0.0:
        _fail("The final layer must have zero thickness (half-space).")
    if np.any(vs_values <= 0.0):
        _fail("Shear-wave velocity must be positive.")

    theta = np.concatenate(
        [
            np.array([float(n_layers), float(vp_vs_ratio)], dtype=np.float32),
            thickness_values.astype(np.float32),
            vs_values.astype(np.float32),
        ]
    )
    return torch.from_numpy(theta).unsqueeze(0)


def _make_model_plot(thickness_values: np.ndarray, vs_values: np.ndarray) -> plt.Figure:
    """Draw the layered Vs model as a step plot of velocity against depth.

    The profile is extended 1 km beyond the deepest interface so the half-space
    velocity is visible.  The figure is detached from pyplot before being
    returned so repeated calls do not accumulate open figures.
    """
    min_depth = 0.0
    cumulative_depth = np.cumsum(thickness_values)
    depth_edges = min_depth + np.concatenate(([0.0], cumulative_depth))
    velocity_steps = np.concatenate((vs_values, [vs_values[-1]]))

    plot_depth = np.concatenate((depth_edges, [depth_edges[-1] + 1.0]))
    plot_velocity = np.concatenate((velocity_steps, [velocity_steps[-1]]))

    max_depth = depth_edges[-1] + 1.0
    if max_depth <= min_depth:
        max_depth = min_depth + 1.0

    fig, ax = plt.subplots(figsize=(7, 4))
    ax.step(plot_depth, plot_velocity, where="post", linewidth=1.5)
    ax.set_xlabel("Depth (km)")
    ax.set_ylabel("Shear velocity (km/s)")
    ax.set_title("Layered Vs model")
    ax.set_xlim(min_depth, max_depth)
    ax.grid(True, linestyle="--", linewidth=0.6, alpha=0.5)
    fig.tight_layout()
    # pyplot keeps a global reference to every figure it creates; drop it so a
    # long-running app does not leak one figure per callback.  The Figure
    # object itself stays valid for rendering.
    plt.close(fig)
    return fig


def _make_plot(periods: np.ndarray, velocities: np.ndarray) -> plt.Figure:
    """Draw the dispersion curve (velocity against period).

    The figure is detached from pyplot before being returned so repeated calls
    do not accumulate open figures.
    """
    fig, ax = plt.subplots(figsize=(7, 4))
    ax.plot(periods, velocities, marker="o", markersize=3, linewidth=1.5)
    ax.set_xlabel("Period (s)")
    ax.set_ylabel("Phase velocity (km/s)")
    ax.set_title("SurfDisp2k25 Dispersion Curve")
    ax.grid(True, linestyle="--", linewidth=0.6, alpha=0.5)
    fig.tight_layout()
    plt.close(fig)  # release pyplot's global reference; see _make_model_plot
    return fig


def _layers_to_table(thickness: np.ndarray, vs: np.ndarray) -> list[list[float]]:
    """Pair thickness and Vs values into ``[thickness, vs]`` rows of floats."""
    return [[float(h), float(v)] for h, v in zip(thickness, vs)]


def _generate_random_model() -> tuple[np.ndarray, np.ndarray]:
    """Return a random model with 2 to 20 layers, the last being a half-space.

    Finite thicknesses are drawn from ``[0.1, 3.0)`` and velocities from
    ``[2.0, 8.0)``.  The half-space velocity is redrawn until it differs from
    the layer above by at least 1e-3.
    """
    rng = np.random.default_rng()
    n_layers = int(rng.integers(2, 21))
    finite_thickness = rng.uniform(0.1, 3.0, size=n_layers - 1).astype(np.float32)
    vs_values = rng.uniform(2.0, 8.0, size=n_layers - 1).astype(np.float32)
    thickness = np.concatenate([finite_thickness, np.array([0.0], dtype=np.float32)])

    last_vs = float(rng.uniform(2.0, 8.0))
    while abs(last_vs - float(vs_values[-1])) < 1e-3:
        last_vs = float(rng.uniform(2.0, 8.0))
    vs = np.concatenate([vs_values, np.array([last_vs], dtype=np.float32)])
    return thickness, vs


def on_layer_table_change(
    table_values: Sequence[Sequence[float]] | None,
):
    """Validate the edited table and return ``(sanitized_table, model_plot)``.

    Raises:
        RuntimeError: if Gradio is not installed.
    """
    if gr is None:
        raise RuntimeError("Gradio is required for interactive updates.")
    thickness, vs = _extract_layer_values(table_values)
    sanitized_table = _layers_to_table(thickness, vs)
    plot = _make_model_plot(thickness, vs)
    return sanitized_table, plot


def on_random_model_click():
    """Generate a random model and return ``(table, model_plot)``.

    Raises:
        RuntimeError: if Gradio is not installed.
    """
    if gr is None:
        raise RuntimeError("Gradio is required for interactive updates.")
    thickness, vs = _generate_random_model()
    table = _layers_to_table(thickness, vs)
    plot = _make_model_plot(thickness, vs)
    return table, plot


def run_simulation(
    custom_values: Sequence[Sequence[float]] | None,
    vp_vs_ratio: float,
    p_min: float,
    p_max: float,
    wave_type_label: str,
    mode: int,
    group_label: str,
):
    """Validate the inputs, run the simulator and build the UI outputs.

    Args:
        custom_values: Layer table of ``[thickness, vs]`` rows.
        vp_vs_ratio: Vp/Vs ratio; must exceed 1.0.
        p_min: Minimum period, positive.
        p_max: Maximum period, greater than ``p_min``.
        wave_type_label: Key of ``WAVE_TYPE_OPTIONS``.
        mode: Mode number, 1 to 3.
        group_label: Key of ``GROUP_VELOCITY_OPTIONS``.

    Returns:
        ``(model_plot, dispersion_plot, table, summary)`` where ``table`` is a
        list of ``[period, velocity]`` rows and ``summary`` is Markdown text.

    Raises:
        gr.Error / ValidationError: (via ``_fail``) on invalid input or when
            the simulator raises ``RuntimeError``.
    """
    thickness_values, vs_values = _extract_layer_values(custom_values)

    # An emptied numeric field can arrive as None, and NaN passes every
    # comparison below, so screen both before the range checks.
    if p_min is None or p_max is None:
        _fail("Minimum and maximum period are required.")
    if not (math.isfinite(p_min) and math.isfinite(p_max)):
        _fail("Periods must be finite numbers.")
    if p_min <= 0.0 or p_max <= 0.0:
        _fail("Periods must be positive numbers.")
    if p_max <= p_min:
        _fail("Maximum period must be greater than minimum period.")
    if mode is None:
        _fail("Mode number is required.")
    if mode < 1:
        _fail("Mode number must be at least 1.")
    if mode > 3:
        _fail("Mode number cannot exceed 3 in this simulator.")

    theta = _build_theta(thickness_values, vs_values, vp_vs_ratio)

    iflsph = 0  # Always use flat Earth
    iwave = WAVE_TYPE_OPTIONS.get(wave_type_label)
    if iwave is None:
        _fail(f"Unknown wave type: {wave_type_label!r}.")
    igr = GROUP_VELOCITY_OPTIONS.get(group_label)
    if igr is None:
        _fail(f"Unknown velocity output option: {group_label!r}.")

    try:
        disp = dispsurf2k25_simulator(
            theta=theta,
            p_min=float(p_min),
            p_max=float(p_max),
            kmax=DISPERSION_SAMPLES,
            iflsph=iflsph,
            iwave=iwave,
            mode=int(mode),
            igr=igr,
            dtype=torch.float32,
        )
    except RuntimeError as exc:
        _fail(f"Simulation failed: {exc}")

    velocities = disp.squeeze(0).detach().cpu().numpy()
    # NOTE(review): assumes the simulator samples periods linearly between
    # p_min and p_max - confirm against dispsurf2k25_simulator.
    periods = np.linspace(p_min, p_max, velocities.size)

    dispersion_plot = _make_plot(periods, velocities)
    model_plot = _make_model_plot(thickness_values, vs_values)
    table = [[float(p), float(v)] for p, v in zip(periods, velocities)]

    # Exclude the zero-thickness half-space from the finite-layer statistics.
    finite_thickness = (
        thickness_values[:-1] if thickness_values.size > 1 else thickness_values
    )
    total_depth = float(np.sum(finite_thickness))
    n_finite_layers = finite_thickness.size

    summary = "\n".join(
        [
            "**Model**: Custom layered model",
            f"**Layers**: {vs_values.size} (including half-space)",
            f"**Finite thickness layers**: {n_finite_layers}",
            f"**Depth window**: 0.00 – {total_depth:.2f} km",
            f"**Vp/Vs ratio**: {vp_vs_ratio:.2f}",
            f"**Vs range**: {vs_values.min():.2f} – {vs_values.max():.2f} km/s",
            f"**Total thickness**: {total_depth:.2f} km",
            f"**Periods**: {p_min:.2f} – {p_max:.2f} s ({velocities.size} samples)",
            f"**Wave type**: {wave_type_label}; mode = {int(mode)}",
            f"**Phase velocity range**: {velocities.min():.2f} – {velocities.max():.2f} km/s",
        ]
    )
    return model_plot, dispersion_plot, table, summary


if gr is not None:
    with gr.Blocks(title="SurfDisp2k25 Simulator") as demo:
        gr.Markdown(_INTRO_MARKDOWN)

        with gr.Row():
            with gr.Column():
                random_button = gr.Button("Generate random model")
                custom_model = gr.Dataframe(
                    headers=["Thickness (km)", "Vs (km/s)"],
                    value=DEFAULT_TABLE,
                    row_count=(len(DEFAULT_TABLE), "dynamic"),
                    col_count=(2, "fixed"),
                    datatype=["float", "float"],
                    label="Custom layered model",
                )
                vpvs_input = gr.Slider(
                    minimum=1.5,
                    maximum=2.2,
                    value=1.75,
                    step=0.01,
                    label="Vp/Vs ratio",
                )
            with gr.Column():
                p_min_input = gr.Number(value=1.0, label="Minimum period (s)")
                p_max_input = gr.Number(value=30.0, label="Maximum period (s)")
                wave_type_input = gr.Radio(
                    choices=list(WAVE_TYPE_OPTIONS.keys()),
                    value="Rayleigh waves (iwave=2)",
                    label="Wave type",
                )
                mode_input = gr.Slider(
                    minimum=1,
                    maximum=3,
                    value=1,
                    step=1,
                    label="Mode number",
                )
                group_mode_input = gr.Radio(
                    choices=list(GROUP_VELOCITY_OPTIONS.keys()),
                    value="Phase velocity only (igr=0)",
                    label="Velocity output",
                )
                run_button = gr.Button("Run simulation", variant="primary")

        with gr.Row():
            model_plot_output = gr.Plot(label="Shear-wave velocity profile")
            plot_output = gr.Plot(label="Dispersion curve")

        with gr.Row():
            table_output = gr.Dataframe(
                headers=["Period (s)", "Phase velocity (km/s)"],
                datatype="float",
                col_count=(2, "fixed"),
                row_count=(DISPERSION_SAMPLES, "dynamic"),
                label="Sampled dispersion values",
            )
            summary_output = gr.Markdown()

        # Show the default model as soon as the page loads.
        demo.load(
            fn=lambda: _make_model_plot(DEFAULT_THICKNESS, DEFAULT_VS),
            inputs=None,
            outputs=model_plot_output,
        )
        # Re-validate and redraw whenever the layer table is edited.
        custom_model.change(
            fn=on_layer_table_change,
            inputs=custom_model,
            outputs=[custom_model, model_plot_output],
            trigger_mode="always_last",
            queue=False,
        )
        random_button.click(
            fn=on_random_model_click,
            inputs=None,
            outputs=[custom_model, model_plot_output],
        )
        run_button.click(
            fn=run_simulation,
            inputs=[
                custom_model,
                vpvs_input,
                p_min_input,
                p_max_input,
                wave_type_input,
                mode_input,
                group_mode_input,
            ],
            outputs=[model_plot_output, plot_output, table_output, summary_output],
        )
else:  # pragma: no cover - allows importing without gradio installed
    demo = None


if __name__ == "__main__":
    if demo is None:
        raise ImportError("gradio must be installed to launch the interface.")
    demo.launch()