Spaces:
Runtime error
Runtime error
| # app.py | |
| import os, uuid, zipfile, math | |
| from pathlib import Path | |
| from typing import Optional | |
| from fastapi import FastAPI, UploadFile, File, Form, HTTPException | |
| from fastapi.responses import FileResponse | |
| import numpy as np | |
| import rasterio | |
| from rasterio.transform import Affine | |
| from scipy.ndimage import gaussian_filter, map_coordinates, binary_fill_holes, binary_closing, binary_dilation, generate_binary_structure | |
| from scipy.ndimage import label | |
| import matplotlib.pyplot as plt | |
| from tqdm import tqdm | |
| from shapely.geometry import LineString, mapping | |
| import geopandas as gpd | |
| from fastapi.responses import RedirectResponse | |
| # gravity | |
| g = 9.81 | |
| app = FastAPI(title="Runout simulator single-file API") | |
| BASE_WORKDIR = Path("/tmp/runout_jobs_single") | |
| BASE_WORKDIR.mkdir(parents=True, exist_ok=True) | |
| # ---------------- IO helpers ---------------- | |
| def read_dem(path): | |
| with rasterio.open(path) as ds: | |
| dem = ds.read(1).astype(float) | |
| meta = ds.meta.copy() | |
| transform = ds.transform | |
| crs = ds.crs | |
| return dem, transform, crs, meta | |
| def save_geotiff(arr, meta, path): | |
| meta2 = meta.copy() | |
| meta2.update(dtype="float32", count=1) | |
| os.makedirs(os.path.dirname(path) or ".", exist_ok=True) | |
| with rasterio.open(path, "w", **meta2) as dst: | |
| dst.write(np.array(arr, dtype='float32'), 1) | |
| def save_png(arr, out_path, cmap="inferno", smooth=1.0, log_scale=False, clip_percent=(0.1,99.9)): | |
| a = np.array(arr, dtype=float) | |
| a[~np.isfinite(a)] = np.nan | |
| if smooth and smooth > 0: | |
| a = gaussian_filter(a, sigma=smooth) | |
| vals = a[~np.isnan(a)] | |
| if vals.size == 0: | |
| a = np.zeros((10,10)) | |
| vmin, vmax = 0,1 | |
| else: | |
| if log_scale: | |
| a = np.log1p(np.clip(a,0,None)*1e4) | |
| vals = a[~np.isnan(a)] | |
| vmin = float(np.percentile(vals, clip_percent[0])) | |
| vmax = float(np.percentile(vals, clip_percent[1])) | |
| if vmax <= vmin: | |
| vmax = vmin + 1e-6 | |
| os.makedirs(os.path.dirname(out_path) or ".", exist_ok=True) | |
| plt.imsave(out_path, a, cmap=cmap, vmin=vmin, vmax=vmax) | |
| def save_samples_geojson(samples, transform, out_path, crs=None): | |
| feats=[] | |
| for i,path in enumerate(samples): | |
| if not path or len(path)<2: continue | |
| coords=[rasterio.transform.xy(transform,int(r),int(c)) for (r,c) in path] | |
| feats.append({"type":"Feature","geometry":mapping(LineString(coords)),"properties":{"id":i,"steps":len(coords)}}) | |
| if not feats: | |
| return | |
| gdf = gpd.GeoDataFrame.from_features(feats) | |
| if crs is not None: | |
| try: | |
| gdf.set_crs(crs, inplace=True) | |
| except Exception: | |
| pass | |
| os.makedirs(os.path.dirname(out_path) or ".", exist_ok=True) | |
| gdf.to_file(out_path, driver="GeoJSON") | |
| # ---------------- interpolation & gradients ---------------- | |
| def interp_bilinear(arr, pts): | |
| res = map_coordinates(arr, pts, order=1, mode='nearest') | |
| if np.size(res) == 1: | |
| return float(res) | |
| return res | |
| def dem_gradients(dem, cellsize): | |
| # return dz/dx, dz/dy (rise per meter) | |
| dy, dx = np.gradient(dem, cellsize, cellsize) | |
| return dx, dy | |
| def compute_slope_field(dx_field, dy_field): | |
| grad_mag = np.sqrt(dx_field*dx_field + dy_field*dy_field) | |
| slope_deg = np.degrees(np.arctan(grad_mag)) | |
| return slope_deg | |
| # ---------------- rim detection ---------------- | |
| def detect_rim_coords(dem, cellsize, pit_depth_frac=0.25, rim_buffer_m=6.0, min_rim_samples=500): | |
| dem_valid = np.where(np.isfinite(dem), dem, np.nan) | |
| zmin = float(np.nanmin(dem_valid)); zmax = float(np.nanmax(dem_valid)) | |
| depth = zmax - zmin | |
| if depth <= 0: | |
| h,w = dem.shape | |
| return [(h//2,w//2)], depth | |
| pit_thresh = zmin + pit_depth_frac * depth | |
| pit_mask = (dem <= pit_thresh) & np.isfinite(dem) | |
| pit_mask = binary_fill_holes(pit_mask) | |
| pit_mask = binary_closing(pit_mask, structure=generate_binary_structure(2,2), iterations=2) | |
| lbls, nlab = label(pit_mask) | |
| if nlab > 1: | |
| counts = [(lbls==i).sum() for i in range(1, nlab+1)] | |
| largest = int(np.argmax(counts) + 1) | |
| pit_mask = (lbls == largest) | |
| iters = max(1, int(round(rim_buffer_m / cellsize))) | |
| rim_band = binary_dilation(pit_mask, iterations=iters) & (~pit_mask) | |
| coords = list(zip(*np.where(rim_band))) | |
| if len(coords) > min_rim_samples: | |
| step = max(1, int(len(coords)/min_rim_samples)) | |
| coords = coords[::step] | |
| return coords, depth | |
| # ---------------- steepest march ---------------- | |
| def march_steepest(dem, start_rc, cellsize, slope_deg, slope_thresh_deg, min_drop_m, max_steps): | |
| offs = [(-1,-1),(-1,0),(-1,1),(0,-1),(0,1),(1,-1),(1,0),(1,1)] | |
| rows, cols = dem.shape | |
| r, c = int(start_rc[0]), int(start_rc[1]) | |
| z0 = dem[r,c] | |
| visited = set(); visited.add((r,c)) | |
| for step in range(max_steps): | |
| best = None; best_dz = 0.0 | |
| z = dem[r,c] | |
| if not np.isfinite(z): | |
| break | |
| for (dr,dc) in offs: | |
| rr,cc = r+dr, c+dc | |
| if rr<0 or rr>=rows or cc<0 or cc>=cols: continue | |
| if (rr,cc) in visited: continue | |
| zn = dem[rr,cc] | |
| if not np.isfinite(zn): continue | |
| dz = z - zn | |
| if dz > best_dz: | |
| best_dz = dz; best = (rr,cc) | |
| if best is None or best_dz <= 0: | |
| break | |
| r,c = best; visited.add((r,c)) | |
| cum_drop = z0 - dem[r,c] | |
| if slope_deg[r,c] >= slope_thresh_deg and cum_drop >= min_drop_m: | |
| return (r,c), cum_drop, step+1 | |
| return None, 0.0, step+1 | |
| # ---------------- particle integrator with wall handling ---------------- | |
| def particle_run(dem, dx_field, dy_field, cellsize, start_xy_m, initial_speed, mu, step_loss, | |
| transform, xi=1000.0, wall_slope_thresh_deg=60.0, wall_restitution=0.2, | |
| wall_deflect_frac=0.85, stop_on_wall_ke=5.0, | |
| dt_max=0.2, dt_min=0.005, max_steps=20000, v_threshold=0.2): | |
| x_m, y_m = start_xy_m | |
| rows, cols = dem.shape | |
| inv = ~transform | |
| # initial gradient / direction | |
| col0, row0 = inv * (x_m, y_m) | |
| pts0 = np.array([[row0],[col0]]) | |
| try: | |
| gx0 = float(interp_bilinear(dx_field, pts0)) | |
| gy0 = float(interp_bilinear(dy_field, pts0)) | |
| except Exception: | |
| gx0 = 0.0; gy0 = 0.0 | |
| downslope = np.array([-gx0, -gy0]); n0 = np.linalg.norm(downslope) | |
| dir_x, dir_y = (downslope / n0) if n0 > 1e-9 else (0.0, 0.0) | |
| vx = initial_speed * dir_x | |
| vy = initial_speed * dir_y | |
| pos_x = x_m; pos_y = y_m | |
| visited = [] | |
| ke_cells = {} | |
| for step in range(max_steps): | |
| col_cur, row_cur = inv * (pos_x, pos_y) | |
| if col_cur < 0 or col_cur >= cols-1 or row_cur < 0 or row_cur >= rows-1: | |
| break | |
| pts = np.array([[row_cur],[col_cur]]) | |
| try: | |
| gz_x = float(interp_bilinear(dx_field, pts)) | |
| gz_y = float(interp_bilinear(dy_field, pts)) | |
| except Exception: | |
| gz_x = 0.0; gz_y = 0.0 | |
| grad_mag = math.hypot(gz_x, gz_y) | |
| local_slope_deg = math.degrees(math.atan(grad_mag)) | |
| a_drive_x = -g * gz_x | |
| a_drive_y = -g * gz_y | |
| vmag = math.hypot(vx, vy) | |
| if vmag > 1e-9: | |
| ux = vx / vmag; uy = vy / vmag | |
| a_voellmy_mag = mu * g + (vmag * vmag) / max(1e-12, xi) | |
| a_voellmy_x = -a_voellmy_mag * ux | |
| a_voellmy_y = -a_voellmy_mag * uy | |
| else: | |
| a_voellmy_x = a_voellmy_y = 0.0 | |
| c_drag = 0.03 | |
| a_drag_x = -c_drag * vx * vmag | |
| a_drag_y = -c_drag * vy * vmag | |
| ax = a_drive_x + a_voellmy_x + a_drag_x | |
| ay = a_drive_y + a_voellmy_y + a_drag_y | |
| dt = min(dt_max, max(dt_min, 0.4 * (cellsize / (vmag + 1e-6)))) | |
| vx_mid = vx + 0.5 * ax * dt | |
| vy_mid = vy + 0.5 * ay * dt | |
| new_pos_x = pos_x + vx_mid * dt | |
| new_pos_y = pos_y + vy_mid * dt | |
| new_vx = vx + ax * dt | |
| new_vy = vy + ay * dt | |
| # wall check at next location | |
| col_next, row_next = inv * (new_pos_x, new_pos_y) | |
| wall_encounter = False | |
| gz_x_n = gz_y_n = grad_mag_n = 0.0 | |
| if 0 <= col_next < cols and 0 <= row_next < rows: | |
| try: | |
| gz_x_n = float(interp_bilinear(dx_field, np.array([[row_next],[col_next]]))) | |
| gz_y_n = float(interp_bilinear(dy_field, np.array([[row_next],[col_next]]))) | |
| grad_mag_n = math.hypot(gz_x_n, gz_y_n) | |
| local_slope_deg_n = math.degrees(math.atan(grad_mag_n)) | |
| except Exception: | |
| local_slope_deg_n = 0.0 | |
| grad_mag_n = 0.0 | |
| if local_slope_deg_n >= wall_slope_thresh_deg: | |
| wall_encounter = True | |
| n_hat = np.array([gz_x_n, gz_y_n]) / max(1e-12, grad_mag_n) | |
| if wall_encounter: | |
| ke = 0.5 * (vmag * vmag) | |
| if ke <= stop_on_wall_ke: | |
| r_idx = int(round(row_cur)); c_idx = int(round(col_cur)) | |
| if 0 <= r_idx < rows and 0 <= c_idx < cols: | |
| visited.append((r_idx, c_idx)) | |
| ke_cells[(r_idx,c_idx)] = max(ke_cells.get((r_idx,c_idx), 0.0), ke) | |
| break | |
| t = np.array([-gz_y_n, gz_x_n]) | |
| tnorm = np.linalg.norm(t) | |
| if tnorm < 1e-12: | |
| v_vec = np.array([new_vx, new_vy]) | |
| nv = np.dot(v_vec, n_hat) | |
| v_reflect = v_vec - (1.0 + wall_restitution) * nv * n_hat | |
| v_reflect *= 0.9 | |
| new_vx, new_vy = float(v_reflect[0]), float(v_reflect[1]) | |
| else: | |
| t_hat = t / tnorm | |
| v_vec = np.array([new_vx, new_vy]) | |
| v_tang = np.dot(v_vec, t_hat) * t_hat | |
| v_norm = v_vec - v_tang | |
| new_v_vec = wall_deflect_frac * v_tang - wall_restitution * v_norm | |
| new_vx, new_vy = float(new_v_vec[0]), float(new_v_vec[1]) | |
| new_pos_x = pos_x + new_vx * dt * 0.6 | |
| new_pos_y = pos_y + new_vy * dt * 0.6 | |
| vmag_after = math.hypot(new_vx, new_vy) | |
| if vmag_after < v_threshold: | |
| r_idx = int(round(row_cur)); c_idx = int(round(col_cur)) | |
| if 0 <= r_idx < rows and 0 <= c_idx < cols: | |
| visited.append((r_idx, c_idx)) | |
| ke_cells[(r_idx,c_idx)] = max(ke_cells.get((r_idx,c_idx), 0.0), 0.5 * vmag_after * vmag_after) | |
| break | |
| pos_x, pos_y = new_pos_x, new_pos_y | |
| vx, vy = new_vx, new_vy | |
| else: | |
| pos_x, pos_y = new_pos_x, new_pos_y | |
| vx, vy = new_vx, new_vy | |
| loss = np.clip(np.random.normal(loc=step_loss, scale=step_loss*0.2), 0.0, 0.6) | |
| vx *= (1.0 - loss); vy *= (1.0 - loss) | |
| vmag = math.hypot(vx, vy) | |
| col_round, row_round = inv * (pos_x, pos_y) | |
| r_idx = int(round(row_round)); c_idx = int(round(col_round)) | |
| if r_idx < 0 or r_idx >= rows or c_idx < 0 or c_idx >= cols: | |
| break | |
| visited.append((r_idx, c_idx)) | |
| ke_local = 0.5 * (vmag * vmag) | |
| ke_cells[(r_idx, c_idx)] = max(ke_cells.get((r_idx, c_idx), 0.0), ke_local) | |
| if vmag < v_threshold: | |
| break | |
| unique_visited = [] | |
| seen = set() | |
| for rc in visited: | |
| if rc not in seen: | |
| unique_visited.append(rc); seen.add(rc) | |
| return unique_visited, ke_cells | |
| # ---------------- Monte Carlo ---------------- | |
| def run_mc(dem, transform, release_cells, cellsize, trials, mu_mean, mu_std, xi, step_loss, | |
| min_drop, force_drop, sample_paths, seed, wall_slope_thresh_deg, wall_restitution, wall_deflect_frac, stop_on_wall_ke): | |
| rng = np.random.default_rng(seed) | |
| dx_field, dy_field = dem_gradients(dem, cellsize) | |
| heat = np.zeros_like(dem, dtype=float) | |
| impact = np.zeros_like(dem, dtype=float) | |
| sample_paths_out = [] | |
| n_releases = len(release_cells) | |
| if n_releases == 0: | |
| raise RuntimeError("No release cells provided.") | |
| for t in tqdm(range(trials), desc="Trials"): | |
| r,c = release_cells[rng.integers(0, n_releases)] | |
| x0, y0 = rasterio.transform.xy(transform, int(r), int(c)) | |
| if force_drop is not None: | |
| drop_m = float(force_drop) | |
| else: | |
| drop_m = max(min_drop, 0.02 * (np.nanmax(dem) - np.nanmin(dem))) | |
| initial_speed = math.sqrt(2.0 * g * drop_m) | |
| mu = float(max(0.0, rng.normal(mu_mean, mu_std))) | |
| path, ke_map = particle_run(dem, dx_field, dy_field, cellsize, | |
| (x0, y0), initial_speed, mu, | |
| step_loss, transform, | |
| xi=xi, | |
| wall_slope_thresh_deg=wall_slope_thresh_deg, | |
| wall_restitution=wall_restitution, | |
| wall_deflect_frac=wall_deflect_frac, | |
| stop_on_wall_ke=stop_on_wall_ke) | |
| for (rr, cc) in path: | |
| heat[rr, cc] += 1.0 | |
| if (rr,cc) in ke_map: | |
| impact[rr, cc] = max(impact[rr, cc], ke_map[(rr,cc)]) | |
| if len(sample_paths_out) < sample_paths: | |
| sample_paths_out.append(path) | |
| heat /= float(max(1, trials)) | |
| return heat, impact, sample_paths_out | |
| # ---------------- utils ---------------- | |
| def print_stats(name, arr): | |
| a = np.array(arr, dtype=float) | |
| a[~np.isfinite(a)] = np.nan | |
| non = a[~np.isnan(a)] | |
| if non.size == 0: | |
| return f"[STAT] {name}: no finite values" | |
| return f"[STAT] {name}: min={np.nanmin(non):.6g}, p1={np.nanpercentile(non,1):.6g}, p50={np.nanpercentile(non,50):.6g}, p99={np.nanpercentile(non,99):.6g}, max={np.nanmax(non):.6g}, mean={np.nanmean(non):.6g}" | |
| # ---------------- API endpoint ---------------- | |
| async def run_endpoint( | |
| dem: UploadFile = File(...), | |
| trials: int = Form(500), | |
| release_density: int = Form(1), | |
| pit_depth_frac_release: float = Form(0.25), | |
| slope_thresh_deg: float = Form(5.0), | |
| mu_mean: float = Form(0.05), | |
| mu_std: float = Form(0.01), | |
| xi: float = Form(1000.0), | |
| step_loss: float = Form(0.001), | |
| sample_paths: int = Form(50), | |
| wall_slope_thresh_deg: float = Form(60.0), | |
| wall_restitution: float = Form(0.2), | |
| wall_deflect_frac: float = Form(0.85), | |
| stop_on_wall_ke: float = Form(5.0), | |
| seed: int = Form(0), | |
| out_prefix: Optional[str] = Form(None), | |
| ): | |
| job_id = uuid.uuid4().hex[:12] | |
| job_dir = BASE_WORKDIR / job_id | |
| job_dir.mkdir(parents=True, exist_ok=True) | |
| dem_path = job_dir / "input_dem.tif" | |
| with dem_path.open("wb") as f: | |
| f.write(await dem.read()) | |
| # read dem | |
| try: | |
| dem_arr, transform, crs, meta = read_dem(str(dem_path)) | |
| except Exception as e: | |
| raise HTTPException(status_code=400, detail=f"Failed reading DEM: {e}") | |
| cellsize = abs(transform.a) | |
| # detect rim | |
| rim_coords, pit_depth = detect_rim_coords(dem_arr, cellsize, pit_depth_frac=pit_depth_frac_release, rim_buffer_m=6.0) | |
| dx_field, dy_field = dem_gradients(dem_arr, cellsize) | |
| slope_field = compute_slope_field(dx_field, dy_field) | |
| # march steepest to find release targets | |
| release_cells=[] | |
| for rc in rim_coords: | |
| tgt, cum_drop, steps = march_steepest(dem_arr, rc, cellsize, slope_field, slope_thresh_deg, 2.0, 400) | |
| if tgt is not None: | |
| release_cells.append(tgt) | |
| seeds = release_cells * max(1, int(release_density)) | |
| if len(seeds) == 0: | |
| raise HTTPException(status_code=400, detail="No release seeds found.") | |
| # debug rim png | |
| debug_rim_png = job_dir / f"{job_id}_rim_debug.png" | |
| try: | |
| overlay = np.zeros((dem_arr.shape[0], dem_arr.shape[1], 3), dtype=float) | |
| dmmin = float(np.nanmin(dem_arr)); dmmax = float(np.nanmax(dem_arr)) | |
| norm = (dem_arr - dmmin) / (dmmax - dmmin + 1e-9) | |
| overlay[..., :] = np.expand_dims(norm,2) * 0.35 | |
| for (r,c) in release_cells: | |
| if 0 <= r < overlay.shape[0] and 0 <= c < overlay.shape[1]: | |
| overlay[r,c] = [1.0, 0.0, 0.0] | |
| plt.imsave(str(debug_rim_png), np.clip(overlay,0,1)) | |
| except Exception: | |
| pass | |
| # run Monte Carlo | |
| try: | |
| heat, impact, sample_paths = run_mc(dem_arr, transform, seeds, cellsize, | |
| int(trials), float(mu_mean), float(mu_std), float(xi), float(step_loss), | |
| float(2.0), None, int(sample_paths), int(seed), | |
| float(wall_slope_thresh_deg), float(wall_restitution), float(wall_deflect_frac), float(stop_on_wall_ke)) | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=f"Simulation error: {e}") | |
| # outputs | |
| prefix = out_prefix or job_id | |
| out_heat_png = job_dir / f"{prefix}_heat.png" | |
| out_impact_png = job_dir / f"{prefix}_impact.png" | |
| out_heat_tif = job_dir / f"{prefix}_heat.tif" | |
| out_impact_tif = job_dir / f"{prefix}_impact.tif" | |
| out_samples_geojson = job_dir / f"{prefix}_samples.geojson" | |
| try: | |
| save_geotiff(heat, meta, str(out_heat_tif)) | |
| save_geotiff(impact, meta, str(out_impact_tif)) | |
| save_png(heat, str(out_heat_png), cmap="inferno", smooth=1.0, log_scale=False) | |
| save_png(impact, str(out_impact_png), cmap="magma", smooth=1.0, log_scale=True) | |
| save_samples_geojson(sample_paths, transform, str(out_samples_geojson), crs=crs) | |
| except Exception as e: | |
| raise HTTPException(status_code=500, detail=f"Failed to write outputs: {e}") | |
| # diagnostics text | |
| diag_txt = job_dir / "diagnostics.txt" | |
| with diag_txt.open("w") as f: | |
| f.write(f"Rim pixels: {len(rim_coords)} pit_depth: {pit_depth}\n") | |
| f.write(print_stats("heat", heat) + "\n") | |
| f.write(print_stats("impact", impact) + "\n") | |
| try: | |
| idx = np.nanargmax(impact) | |
| r = int(idx) // impact.shape[1]; c = int(idx) % impact.shape[1] | |
| x,y = rasterio.transform.xy(transform, r, c) | |
| f.write(f"Max impact at row {r} col {c} -> x={x:.3f}, y={y:.3f}, KE={impact[r,c]:.6g}\n") | |
| except Exception: | |
| pass | |
| # zip outputs | |
| zip_path = job_dir / f"{job_id}_outputs.zip" | |
| with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as zf: | |
| for p in [out_heat_png, out_impact_png, out_heat_tif, out_impact_tif, out_samples_geojson, debug_rim_png, diag_txt]: | |
| if p.exists(): | |
| zf.write(p, arcname=p.name) | |
| return FileResponse(path=str(zip_path), filename=zip_path.name, media_type="application/zip") | |
| def health(): | |
| return {"status":"ok"} | |
| def root(): | |
| return RedirectResponse(url="/docs") | |