Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| from typing import Sequence | |
| import matplotlib.pyplot as plt | |
| import numpy as np | |
| import torch | |
| import math | |
| try: | |
| import gradio as gr | |
| except ModuleNotFoundError: # pragma: no cover - optional dependency for tests | |
| gr = None # type: ignore[assignment] | |
| from surfdisp2k25 import dispsurf2k25_simulator | |
| DISPERSION_SAMPLES = 60 | |
| MAX_LAYERS = 100 | |
| # Default layered model with a half-space. | |
| DEFAULT_CUSTOM_PROFILE: list[tuple[float, float]] = [ | |
| (1.0, 2.0), | |
| (1.0, 2.5), | |
| (1.0, 3.0), | |
| (0.0, 3.5), | |
| ] | |
| DEFAULT_THICKNESS = np.asarray([layer[0] for layer in DEFAULT_CUSTOM_PROFILE], dtype=np.float32) | |
| DEFAULT_VS = np.asarray([layer[1] for layer in DEFAULT_CUSTOM_PROFILE], dtype=np.float32) | |
| DEFAULT_TABLE = [[float(h), float(vs)] for h, vs in DEFAULT_CUSTOM_PROFILE] | |
| WAVE_TYPE_OPTIONS = { | |
| "Rayleigh waves (iwave=2)": 2, | |
| "Love waves (iwave=1)": 1, | |
| } | |
| GROUP_VELOCITY_OPTIONS = { | |
| "Phase velocity only (igr=0)": 0, | |
| "Phase & group velocity (igr=1)": 1, | |
| } | |
| class ValidationError(ValueError): | |
| """Raised when user input is invalid.""" | |
| def _fail(message: str) -> None: | |
| if gr is not None: | |
| raise gr.Error(message) | |
| raise ValidationError(message) | |
| def _normalize_table(table_values: Sequence[Sequence[float]]) -> list[list[float]]: | |
| if table_values is None: # type: ignore[comparison-overlap] | |
| return [] | |
| if hasattr(table_values, "to_numpy"): | |
| return table_values.to_numpy().tolist() # type: ignore[no-any-return] | |
| if isinstance(table_values, np.ndarray): | |
| return table_values.tolist() | |
| return [list(row) if isinstance(row, (list, tuple, np.ndarray)) else [row] for row in table_values] # type: ignore[misc] | |
| def _is_blank(value: object) -> bool: | |
| if value in (None, ""): | |
| return True | |
| if isinstance(value, (float, np.floating)): | |
| return math.isnan(value) | |
| return False | |
| def _table_is_empty(table_values: Sequence[Sequence[float]]) -> bool: | |
| rows = _normalize_table(table_values) | |
| for row in rows: | |
| if not isinstance(row, (list, tuple)): | |
| continue | |
| if len(row) < 2: | |
| continue | |
| if not (_is_blank(row[0]) and _is_blank(row[1])): | |
| return False | |
| return True | |
| def _extract_layer_values( | |
| table_values: Sequence[Sequence[float]] | None, | |
| ) -> tuple[np.ndarray, np.ndarray]: | |
| normalized = _normalize_table(table_values) | |
| if _table_is_empty(normalized): | |
| return DEFAULT_THICKNESS.copy(), DEFAULT_VS.copy() | |
| thickness_values: list[float] = [] | |
| vs_values: list[float] = [] | |
| for idx, row in enumerate(normalized, start=1): | |
| if not isinstance(row, (list, tuple)): | |
| _fail("Each layer must provide thickness and Vs.") | |
| if len(row) < 2: | |
| _fail(f"Layer {idx} must include both thickness and Vs.") | |
| raw_thickness, raw_vs = row[0], row[1] | |
| if _is_blank(raw_thickness) and _is_blank(raw_vs): | |
| continue # Ignore empty rows for dynamic tables | |
| if _is_blank(raw_thickness) or _is_blank(raw_vs): | |
| _fail(f"Layer {idx} must include both thickness and Vs.") | |
| try: | |
| thickness = float(raw_thickness) | |
| vs = float(raw_vs) | |
| except (TypeError, ValueError): | |
| _fail("Layer thickness and Vs must be numeric values.") | |
| if thickness < 0.0: | |
| _fail(f"Layer {idx} thickness must be non-negative.") | |
| if vs <= 0.0: | |
| _fail(f"Layer {idx} shear-wave velocity must be greater than 0.") | |
| thickness_values.append(thickness) | |
| vs_values.append(vs) | |
| if not thickness_values: | |
| _fail("Custom model must contain at least one layer.") | |
| if len(thickness_values) > MAX_LAYERS: | |
| _fail(f"Models cannot exceed {MAX_LAYERS} layers.") | |
| thickness_array = np.asarray(thickness_values, dtype=np.float32) | |
| vs_array = np.asarray(vs_values, dtype=np.float32) | |
| if thickness_array.size == 0: | |
| _fail("Custom model must contain at least one layer.") | |
| if thickness_array[-1] != 0.0: | |
| thickness_array[-1] = 0.0 | |
| if np.any(thickness_array[:-1] <= 0.0): | |
| _fail("Only the final layer may have zero thickness.") | |
| return thickness_array, vs_array | |
| def _build_theta( | |
| thickness_values: np.ndarray, vs_values: np.ndarray, vp_vs_ratio: float | |
| ) -> torch.Tensor: | |
| if vp_vs_ratio <= 1.0: | |
| _fail("Vp/Vs ratio must be greater than 1.0.") | |
| if thickness_values.size != vs_values.size: | |
| _fail("Each layer must have a corresponding Vs value.") | |
| n_layers = vs_values.size | |
| if n_layers == 0: | |
| _fail("Model must contain at least one layer.") | |
| if n_layers > MAX_LAYERS: | |
| _fail(f"Models cannot exceed {MAX_LAYERS} layers.") | |
| if np.any(thickness_values < 0.0): | |
| _fail("Layer thickness values must be non-negative.") | |
| if thickness_values[-1] != 0.0: | |
| _fail("The final layer must have zero thickness (half-space).") | |
| if np.any(vs_values <= 0.0): | |
| _fail("Shear-wave velocity must be positive.") | |
| theta = np.concatenate( | |
| [ | |
| np.array([float(n_layers), float(vp_vs_ratio)], dtype=np.float32), | |
| thickness_values.astype(np.float32), | |
| vs_values.astype(np.float32), | |
| ] | |
| ) | |
| return torch.from_numpy(theta).unsqueeze(0) | |
| def _make_model_plot( | |
| thickness_values: np.ndarray, vs_values: np.ndarray | |
| ): | |
| min_depth = 0.0 | |
| cumulative_depth = np.cumsum(thickness_values) | |
| depth_edges = min_depth + np.concatenate(([0.0], cumulative_depth)) | |
| velocity_steps = np.concatenate((vs_values, [vs_values[-1]])) | |
| plot_depth = np.concatenate((depth_edges, [depth_edges[-1] + 1.0])) | |
| plot_velocity = np.concatenate((velocity_steps, [velocity_steps[-1]])) | |
| max_depth = depth_edges[-1] + 1.0 | |
| if max_depth <= min_depth: | |
| max_depth = min_depth + 1.0 | |
| fig, ax = plt.subplots(figsize=(7, 4)) | |
| ax.step(plot_depth, plot_velocity, where="post", linewidth=1.5) | |
| ax.set_xlabel("Depth (km)") | |
| ax.set_ylabel("Shear velocity (km/s)") | |
| ax.set_title("Layered Vs model") | |
| ax.set_xlim(min_depth, max_depth) | |
| ax.grid(True, linestyle="--", linewidth=0.6, alpha=0.5) | |
| fig.tight_layout() | |
| return fig | |
| def _make_plot(periods: np.ndarray, velocities: np.ndarray): | |
| fig, ax = plt.subplots(figsize=(7, 4)) | |
| ax.plot(periods, velocities, marker="o", markersize=3, linewidth=1.5) | |
| ax.set_xlabel("Period (s)") | |
| ax.set_ylabel("Phase velocity (km/s)") | |
| ax.set_title("SurfDisp2k25 Dispersion Curve") | |
| ax.grid(True, linestyle="--", linewidth=0.6, alpha=0.5) | |
| fig.tight_layout() | |
| return fig | |
| def _layers_to_table(thickness: np.ndarray, vs: np.ndarray) -> list[list[float]]: | |
| return [[float(h), float(v)] for h, v in zip(thickness, vs)] | |
| def _generate_random_model() -> tuple[np.ndarray, np.ndarray]: | |
| rng = np.random.default_rng() | |
| n_layers = int(rng.integers(2, 21)) | |
| finite_thickness = rng.uniform(0.1, 3.0, size=n_layers - 1).astype(np.float32) | |
| vs_values = rng.uniform(2.0, 8.0, size=n_layers - 1).astype(np.float32) | |
| thickness = np.concatenate([finite_thickness, np.array([0.0], dtype=np.float32)]) | |
| last_vs = float(rng.uniform(2.0, 8.0)) | |
| while abs(last_vs - float(vs_values[-1])) < 1e-3: | |
| last_vs = float(rng.uniform(2.0, 8.0)) | |
| vs = np.concatenate([vs_values, np.array([last_vs], dtype=np.float32)]) | |
| return thickness, vs | |
| def on_layer_table_change( | |
| table_values: Sequence[Sequence[float]] | None, | |
| ): | |
| if gr is None: | |
| raise RuntimeError("Gradio is required for interactive updates.") | |
| thickness, vs = _extract_layer_values(table_values) | |
| sanitized_table = _layers_to_table(thickness, vs) | |
| plot = _make_model_plot(thickness, vs) | |
| return sanitized_table, plot | |
| def on_random_model_click(): | |
| if gr is None: | |
| raise RuntimeError("Gradio is required for interactive updates.") | |
| thickness, vs = _generate_random_model() | |
| table = _layers_to_table(thickness, vs) | |
| plot = _make_model_plot(thickness, vs) | |
| return table, plot | |
| def run_simulation( | |
| custom_values: Sequence[Sequence[float]] | None, | |
| vp_vs_ratio: float, | |
| p_min: float, | |
| p_max: float, | |
| wave_type_label: str, | |
| mode: int, | |
| group_label: str, | |
| ): | |
| thickness_values, vs_values = _extract_layer_values(custom_values) | |
| if p_min <= 0.0 or p_max <= 0.0: | |
| _fail("Periods must be positive numbers.") | |
| if p_max <= p_min: | |
| _fail("Maximum period must be greater than minimum period.") | |
| if mode < 1: | |
| _fail("Mode number must be at least 1.") | |
| if mode > 3: | |
| _fail("Mode number cannot exceed 3 in this simulator.") | |
| theta = _build_theta(thickness_values, vs_values, vp_vs_ratio) | |
| iflsph = 0 # Always use flat Earth | |
| iwave = WAVE_TYPE_OPTIONS[wave_type_label] | |
| igr = GROUP_VELOCITY_OPTIONS[group_label] | |
| try: | |
| disp = dispsurf2k25_simulator( | |
| theta=theta, | |
| p_min=float(p_min), | |
| p_max=float(p_max), | |
| kmax=DISPERSION_SAMPLES, | |
| iflsph=iflsph, | |
| iwave=iwave, | |
| mode=int(mode), | |
| igr=igr, | |
| dtype=torch.float32, | |
| ) | |
| except RuntimeError as exc: | |
| _fail(f"Simulation failed: {exc}") | |
| velocities = disp.squeeze(0).detach().cpu().numpy() | |
| periods = np.linspace(p_min, p_max, velocities.size) | |
| dispersion_plot = _make_plot(periods, velocities) | |
| model_plot = _make_model_plot(thickness_values, vs_values) | |
| table = [[float(p), float(v)] for p, v in zip(periods, velocities)] | |
| finite_thickness = thickness_values[:-1] if thickness_values.size > 1 else thickness_values | |
| total_depth = float(np.sum(finite_thickness)) | |
| max_depth = total_depth | |
| n_finite_layers = finite_thickness.size | |
| summary = "\n".join( | |
| [ | |
| "**Model**: Custom layered model", | |
| f"**Layers**: {vs_values.size} (including half-space)", | |
| f"**Finite thickness layers**: {n_finite_layers}", | |
| f"**Depth window**: 0.00 – {max_depth:.2f} km", | |
| f"**Vp/Vs ratio**: {vp_vs_ratio:.2f}", | |
| f"**Vs range**: {vs_values.min():.2f} – {vs_values.max():.2f} km/s", | |
| f"**Total thickness**: {total_depth:.2f} km", | |
| f"**Periods**: {p_min:.2f} – {p_max:.2f} s ({velocities.size} samples)", | |
| f"**Wave type**: {wave_type_label}; mode = {int(mode)}", | |
| f"**Phase velocity range**: {velocities.min():.2f} – {velocities.max():.2f} km/s", | |
| ] | |
| ) | |
| return model_plot, dispersion_plot, table, summary | |
| if gr is not None: | |
| with gr.Blocks(title="SurfDisp2k25 Simulator") as demo: | |
| gr.Markdown( | |
| """## SurfDisp2k25 - Interactive Surface Wave Dispersion Simulator (Alpha) | |
| This simulator computes surface wave dispersion curves (Love and Rayleigh waves) for layered Earth models. | |
| You can define layer thicknesses and shear-wave velocities manually or generate a random model. | |
| **Parameters** | |
| - Minimum/Maximum period (s) - Range of periods used to compute the dispersion curve. | |
| - Wave type - Choose between Rayleigh (vertical-radial motion) and Love (horizontal shear) waves. | |
| - Mode number - Number of modes (fundamental plus higher harmonics) to compute. | |
| - Velocity output - Select whether to compute only phase velocity or both phase and group velocity. | |
| - Vp/Vs ratio - Defines the P-wave velocity from the S-wave velocity. | |
| The simulator displays both the layered Vs model and the resulting dispersion curve. | |
| This is an alpha version; results may contain numerical artefacts, so use with caution for testing and visualization.""" | |
| ) | |
| with gr.Row(): | |
| with gr.Column(): | |
| random_button = gr.Button("Generate random model") | |
| custom_model = gr.Dataframe( | |
| headers=["Thickness (km)", "Vs (km/s)"], | |
| value=DEFAULT_TABLE, | |
| row_count=(len(DEFAULT_TABLE), "dynamic"), | |
| col_count=(2, "fixed"), | |
| datatype=["float", "float"], | |
| label="Custom layered model", | |
| ) | |
| vpvs_input = gr.Slider( | |
| minimum=1.5, | |
| maximum=2.2, | |
| value=1.75, | |
| step=0.01, | |
| label="Vp/Vs ratio", | |
| ) | |
| with gr.Column(): | |
| p_min_input = gr.Number(value=1.0, label="Minimum period (s)") | |
| p_max_input = gr.Number(value=30.0, label="Maximum period (s)") | |
| wave_type_input = gr.Radio( | |
| choices=list(WAVE_TYPE_OPTIONS.keys()), | |
| value="Rayleigh waves (iwave=2)", | |
| label="Wave type", | |
| ) | |
| mode_input = gr.Slider( | |
| minimum=1, | |
| maximum=3, | |
| value=1, | |
| step=1, | |
| label="Mode number", | |
| ) | |
| group_mode_input = gr.Radio( | |
| choices=list(GROUP_VELOCITY_OPTIONS.keys()), | |
| value="Phase velocity only (igr=0)", | |
| label="Velocity output", | |
| ) | |
| run_button = gr.Button("Run simulation", variant="primary") | |
| with gr.Row(): | |
| model_plot_output = gr.Plot(label="Shear-wave velocity profile") | |
| plot_output = gr.Plot(label="Dispersion curve") | |
| with gr.Row(): | |
| table_output = gr.Dataframe( | |
| headers=["Period (s)", "Phase velocity (km/s)"], | |
| datatype="float", | |
| col_count=(2, "fixed"), | |
| row_count=(DISPERSION_SAMPLES, "dynamic"), | |
| label="Sampled dispersion values", | |
| ) | |
| summary_output = gr.Markdown() | |
| demo.load( | |
| fn=lambda: _make_model_plot(DEFAULT_THICKNESS, DEFAULT_VS), | |
| inputs=None, | |
| outputs=model_plot_output, | |
| ) | |
| custom_model.change( | |
| fn=on_layer_table_change, | |
| inputs=custom_model, | |
| outputs=[custom_model, model_plot_output], | |
| trigger_mode="always_last", | |
| queue=False, | |
| ) | |
| random_button.click( | |
| fn=on_random_model_click, | |
| inputs=None, | |
| outputs=[custom_model, model_plot_output], | |
| ) | |
| run_button.click( | |
| fn=run_simulation, | |
| inputs=[ | |
| custom_model, | |
| vpvs_input, | |
| p_min_input, | |
| p_max_input, | |
| wave_type_input, | |
| mode_input, | |
| group_mode_input, | |
| ], | |
| outputs=[model_plot_output, plot_output, table_output, summary_output], | |
| ) | |
| else: # pragma: no cover - allows importing without gradio installed | |
| demo = None | |
| if __name__ == "__main__": | |
| if demo is None: | |
| raise ImportError("gradio must be installed to launch the interface.") | |
| demo.launch() | |