Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import numpy as np | |
| import rasterio | |
| from rasterio.features import rasterize | |
| from shapely.geometry import shape | |
| from PIL import Image | |
| import gradio as gr | |
| from pf_engine import Priors, Targets, compute_slope_deg, pf_and_prescriptions | |
# Folder that receives the generated PNG artifacts; it lives under the
# working directory of the running process.
ART_DIR = os.path.join(os.getcwd(), "artifacts")
# exist_ok=True: an already-existing folder is not an error.
os.makedirs(ART_DIR, exist_ok=True)
def _normalize_to_png(arr: np.ndarray, clip_high_if_needed=True) -> Image.Image:
    """Convert a numeric raster into an 8-bit preview image.

    Args:
        arr: Input array. It is copied, never modified in place.
        clip_high_if_needed: When true and the largest non-NaN value exceeds
            1.5, the array is divided by the 99th percentile of its finite
            values (floored at 1e-6) before being scaled.

    Returns:
        ``Image.fromarray`` of the ``uint8`` array in which 0.0 maps to 0 and
        1.0 maps to 255. NaN becomes 0; anything below 0 or above 1
        (including +/-inf) saturates at 0 or 255.
    """
    a = arr.copy()
    finite = np.isfinite(a)
    # Checking finite.any() first avoids numpy's all-NaN warning from nanmax.
    if clip_high_if_needed and finite.any() and np.nanmax(a) > 1.5:
        hi = np.nanpercentile(a[finite], 99)
        a = a / max(hi, 1e-6)
    a = np.nan_to_num(a, nan=0.0)
    # Always clamp before the uint8 cast: an out-of-range value would
    # otherwise wrap around (e.g. 1.2 * 255 = 306 does not fit in 8 bits)
    # instead of saturating.
    a = np.clip(a, 0.0, 1.0)
    a = (a * 255).astype(np.uint8)
    return Image.fromarray(a)
def _mask_from_geojson(ds: rasterio.io.DatasetReader, gj_text: str | None) -> np.ndarray:
    """Return a boolean AOI mask with the same shape as the raster ``ds``.

    Accepted inputs are a GeoJSON FeatureCollection, a single Feature, a raw
    geometry object, or a bare coordinate ring such as ``[[x, y], ...]``.
    Geometry coordinates are rasterized directly with ``ds.transform``; no
    reprojection is performed here, so they must already be expressed in the
    raster's coordinate system.

    This is deliberately best-effort: empty text, unparsable JSON, an
    unrecognised structure, or input that yields no usable geometry all
    produce a full-extent (all ``True``) mask instead of raising.

    Args:
        ds: Open raster dataset; only ``height``, ``width`` and ``transform``
            are read.
        gj_text: GeoJSON text pasted by the user, or ``None``/blank.

    Returns:
        2-D ``bool`` array of shape ``(ds.height, ds.width)``; ``True`` marks
        cells touched by any geometry (``all_touched=True``).
    """
    def full_mask():
        return np.ones((ds.height, ds.width), dtype=bool)

    if not gj_text or not str(gj_text).strip():
        return full_mask()

    # Invalid JSON must not crash the job -> fall back to the full extent.
    try:
        gj = json.loads(gj_text)
    except Exception:
        return full_mask()

    try:
        if isinstance(gj, dict) and gj.get("type") == "FeatureCollection":
            # Skip features whose geometry is missing or null one by one, so a
            # single geometry-less feature does not discard the whole AOI.
            geoms = [
                shape(feature["geometry"])
                for feature in gj.get("features") or []
                if isinstance(feature, dict) and feature.get("geometry")
            ]
        elif isinstance(gj, dict) and gj.get("type") == "Feature":
            geoms = [shape(gj["geometry"])]
        elif isinstance(gj, dict) and "type" in gj:
            geoms = [shape(gj)]
        # Also accept a bare coordinate ring (list) like [[x, y], ...]
        elif isinstance(gj, (list, tuple)) and gj and isinstance(gj[0], (list, tuple)):
            from shapely.geometry import Polygon
            geoms = [Polygon(gj)]
        else:
            return full_mask()
        # Empty geometries cover no cells; drop them before rasterizing.
        geoms = [g for g in geoms if not g.is_empty]
    except Exception:
        # Malformed geometry payloads are treated like missing input.
        return full_mask()

    if not geoms:
        return full_mask()

    mask = rasterize(
        [(g, 1) for g in geoms],
        out_shape=(ds.height, ds.width),
        transform=ds.transform,
        fill=0,
        all_touched=True,
        dtype="uint8",
    ).astype(bool)
    return mask
def _open_geotiff_strict(path: str) -> rasterio.io.DatasetReader:
    """Open ``path`` with rasterio and require a GeoTIFF/COG with >= 1 band.

    Args:
        path: Filesystem path of the uploaded raster.

    Returns:
        The open dataset. The caller is responsible for closing it.

    Raises:
        ValueError: If rasterio cannot open the file, if the dataset driver
            is not ``GTiff``/``COG`` (compared case-insensitively), or if the
            dataset reports no bands. The dataset is closed before raising in
            the latter two cases.
    """
    try:
        ds = rasterio.open(path)
    except Exception as e:
        # Chain explicitly so the underlying rasterio error stays attached.
        raise ValueError(f"Could not open raster: {e}") from e

    # Accept common TIFF drivers
    driver = (getattr(ds, "driver", "") or "").lower()
    if driver not in {"gtiff", "cog"}:
        ds.close()
        raise ValueError(f"Unsupported raster driver '{driver}'. Please upload a GeoTIFF (.tif/.tiff).")

    # Basic sanity check: has at least one band
    if getattr(ds, "count", 0) < 1:
        ds.close()
        raise ValueError("Raster has no bands.")
    return ds
def run_pipeline(
    dem_path: str,
    pf_target,
    N, c_mu, c_sd, phi_mu, phi_sd, gamma_mu, gamma_sd, z_mu, z_sd, ru_mu, ru_sd,
    polygon_geojson_text
):
    """Run the Pf / prescription computation for one uploaded DEM.

    Steps: validate the inputs, read band 1 of the DEM, build the AOI mask,
    compute slope, call ``pf_and_prescriptions`` (with ``seed=42``), write
    three preview PNGs into ``ART_DIR`` and summarise the rasters.

    Args:
        dem_path: Path of the uploaded raster; falsy means nothing uploaded.
        pf_target: Target probability of failure (converted with ``float``).
        N: Sample count (converted with ``int``).
        c_mu, c_sd, phi_mu, phi_sd, gamma_mu, gamma_sd, z_mu, z_sd, ru_mu,
            ru_sd: Prior means / standard deviations (converted with
            ``float``) forwarded to ``Priors``.
        polygon_geojson_text: Optional AOI GeoJSON text.

    Returns:
        A 7-tuple ``(pf_image, delta_c_image, delta_phi_image, stats_json,
        pf_path, delta_c_path, delta_phi_path)``. On a user-input failure the
        images and paths are ``None`` and the fourth element carries a
        plain-text message instead of JSON.
    """
    def failure(message):
        # Same 7-slot shape as the success tuple, with only the text slot set.
        return None, None, None, message, None, None, None

    # Handle empty
    if not dem_path:
        return failure("Please upload a DEM GeoTIFF.")

    # A blank numeric field arrives as None; report it instead of raising.
    try:
        pri = Priors(
            N=int(N),
            c_mu=float(c_mu), c_sd=float(c_sd),
            phi_mu=float(phi_mu), phi_sd=float(phi_sd),
            gamma_mu=float(gamma_mu), gamma_sd=float(gamma_sd),
            z_mu=float(z_mu), z_sd=float(z_sd),
            ru_mu=float(ru_mu), ru_sd=float(ru_sd),
        )
        tgt = Targets(pf_target=float(pf_target))
    except (TypeError, ValueError) as e:
        return failure(f"Invalid numeric input: {e}")

    # Server-side validation of the upload (the UI accepts any file type).
    try:
        ds = _open_geotiff_strict(dem_path)
    except Exception as e:
        return failure(f"File is not a valid GeoTIFF: {e}")

    base = os.path.splitext(os.path.basename(dem_path))[0]
    try:
        # NOTE(review): band 1 is read without nodata masking -- confirm that
        # compute_slope_deg copes with nodata sentinel values.
        dem = ds.read(1).astype(np.float32)
        mask = _mask_from_geojson(ds, polygon_geojson_text)
    finally:
        ds.close()

    slope_deg = compute_slope_deg(dem)
    rasters = pf_and_prescriptions(slope_deg, pri, tgt, mask=mask, seed=42)

    # Save artifacts
    out_pf = os.path.join(ART_DIR, f"{base}_pf.png")
    out_dc = os.path.join(ART_DIR, f"{base}_delta_c.png")
    out_dp = os.path.join(ART_DIR, f"{base}_delta_phi.png")
    pf_png = _normalize_to_png(rasters["pf"], clip_high_if_needed=False)
    dc_png = _normalize_to_png(rasters["delta_c_kpa"], clip_high_if_needed=True)
    dp_png = _normalize_to_png(rasters["delta_phi_deg"], clip_high_if_needed=True)
    pf_png.save(out_pf)
    dc_png.save(out_dc)
    dp_png.save(out_dp)

    def nanstats(a):
        # Summary over finite cells only; all-None when there are none.
        v = a[np.isfinite(a)]
        if v.size == 0:
            return {"mean": None, "p90": None, "max": None}
        return {"mean": float(v.mean()), "p90": float(np.quantile(v, 0.9)), "max": float(v.max())}

    stats = {
        "pf": nanstats(rasters["pf"]),
        "delta_c_kpa": nanstats(rasters["delta_c_kpa"]),
        "delta_phi_deg": nanstats(rasters["delta_phi_deg"]),
        "target_pf": tgt.pf_target,
        "priors": {
            "N": pri.N, "c_mu": pri.c_mu, "c_sd": pri.c_sd,
            "phi_mu": pri.phi_mu, "phi_sd": pri.phi_sd,
            "gamma_mu": pri.gamma_mu, "gamma_sd": pri.gamma_sd,
            "z_mu": pri.z_mu, "z_sd": pri.z_sd,
            "ru_mu": pri.ru_mu, "ru_sd": pri.ru_sd,
        }
    }

    # Return the in-memory images rather than re-opening the files just
    # written: Image.open would leave a file handle open for each preview.
    return (
        pf_png,
        dc_png,
        dp_png,
        json.dumps(stats, indent=2),
        out_pf, out_dc, out_dp
    )
# NOTE(review): the original indentation was lost, so the nesting below
# (priors inside the accordion, Run button in the left column, download
# widgets in the right column) is reconstructed from syntax -- confirm
# against the deployed layout.
with gr.Blocks(title="Probabilistic Rockfall — Gradio") as demo:
    gr.Markdown("# Probabilistic Rockfall (Infinite Slope, Monte Carlo)")
    gr.Markdown(
        "Upload a **DEM GeoTIFF**, optionally paste a **GeoJSON** AOI, set priors / target Pf, "
        "then compute Pf and minimal Δc (kPa) / Δφ (deg) prescriptions."
    )

    with gr.Row():
        # Left column: inputs.
        with gr.Column(scale=1):
            # No upload-time extension filter; run_pipeline validates the
            # file on the server instead.
            dem_file = gr.File(
                label="DEM GeoTIFF",
                file_types=None,
                type="filepath",
                file_count="single",
            )
            polygon_geojson_text = gr.Textbox(
                label="AOI GeoJSON (optional)",
                placeholder='{"type":"Feature","geometry":{"type":"Polygon","coordinates":[[...]...]}}',
                lines=6,
            )
            pf_target = gr.Slider(
                minimum=0.0, maximum=0.5, value=0.05, step=0.005, label="Target Pf"
            )
            with gr.Accordion("Priors (advanced)", open=False):
                N = gr.Slider(
                    minimum=100, maximum=10000, value=2000, step=100, label="N (samples)"
                )
                c_mu = gr.Number(label="c' mean (kPa)", value=20.0)
                c_sd = gr.Number(label="c' sd", value=5.0)
                phi_mu = gr.Number(label="φ mean (deg)", value=30.0)
                phi_sd = gr.Number(label="φ sd", value=5.0)
                gamma_mu = gr.Number(label="γ mean (kN/m³)", value=19.0)
                gamma_sd = gr.Number(label="γ sd", value=1.0)
                z_mu = gr.Number(label="Failure depth mean z (m)", value=5.0)
                z_sd = gr.Number(label="z sd", value=1.0)
                ru_mu = gr.Number(label="ru mean (0–1)", value=0.2)
                ru_sd = gr.Number(label="ru sd", value=0.05)
            run_btn = gr.Button("Run", variant="primary")

        # Right column: previews, statistics and downloads.
        with gr.Column(scale=1):
            pf_img = gr.Image(label="Pf (0–1), normalized preview", interactive=False)
            dc_img = gr.Image(label="Δc (kPa), normalized preview", interactive=False)
            dp_img = gr.Image(label="Δφ (deg), normalized preview", interactive=False)
            stats_json = gr.Code(label="Stats & Params (JSON)", language="json")
            gr.Markdown("### Download Artifacts")
            out_pf = gr.File(label="pf.png")
            out_dc = gr.File(label="delta_c.png")
            out_dp = gr.File(label="delta_phi.png")

    # Order must match run_pipeline's positional parameters ...
    _pipeline_inputs = [
        dem_file, pf_target,
        N, c_mu, c_sd, phi_mu, phi_sd, gamma_mu, gamma_sd, z_mu, z_sd, ru_mu, ru_sd,
        polygon_geojson_text,
    ]
    # ... and the order of its 7-element return tuple.
    _pipeline_outputs = [pf_img, dc_img, dp_img, stats_json, out_pf, out_dc, out_dp]
    run_btn.click(fn=run_pipeline, inputs=_pipeline_inputs, outputs=_pipeline_outputs)

# Start the app only when this file is executed as a script.
if __name__ == "__main__":
    demo.launch()