# app.py
"""Single-file FastAPI service running a Monte Carlo runout simulation on a DEM.

``POST /run`` accepts a DEM raster plus simulation parameters and returns a zip
archive containing hit-frequency and peak-kinetic-energy rasters (GeoTIFF and
PNG), sampled particle paths (GeoJSON), a release-cell debug image and a
diagnostics text file.
"""
import logging
import math
import os
import re
import uuid
import zipfile
from pathlib import Path
from typing import Optional

import geopandas as gpd
import matplotlib.pyplot as plt
import numpy as np
import rasterio
from fastapi import FastAPI, File, Form, HTTPException, UploadFile
from fastapi.responses import FileResponse, RedirectResponse
from rasterio.transform import Affine
from scipy.ndimage import (
    binary_closing,
    binary_dilation,
    binary_fill_holes,
    gaussian_filter,
    generate_binary_structure,
    label,
    map_coordinates,
)
from shapely.geometry import LineString, mapping
from tqdm import tqdm

logger = logging.getLogger(__name__)

# gravity (m/s^2)
g = 9.81

app = FastAPI(title="Runout simulator single-file API")
BASE_WORKDIR = Path("/tmp/runout_jobs_single")
BASE_WORKDIR.mkdir(parents=True, exist_ok=True)


# ---------------- IO helpers ----------------
def read_dem(path):
    """Read band 1 of a raster as a float array.

    Cells covered by the dataset's nodata value / mask are returned as NaN so
    that they are not mistaken for real elevations by the min/max, gradient
    and neighbour searches further down the pipeline.

    Returns:
        Tuple ``(dem, transform, crs, meta)`` taken from the opened dataset.
    """
    with rasterio.open(path) as ds:
        band = ds.read(1, masked=True)
        dem = band.astype(float).filled(np.nan)
        meta = ds.meta.copy()
        transform = ds.transform
        crs = ds.crs
    return dem, transform, crs, meta


def save_geotiff(arr, meta, path):
    """Write ``arr`` as a single-band float32 GeoTIFF using a copy of ``meta``."""
    out_meta = meta.copy()
    out_meta.update(dtype="float32", count=1)
    os.makedirs(os.path.dirname(path) or ".", exist_ok=True)
    with rasterio.open(path, "w", **out_meta) as dst:
        dst.write(np.array(arr, dtype="float32"), 1)


def save_png(arr, out_path, cmap="inferno", smooth=1.0, log_scale=False,
             clip_percent=(0.1, 99.9)):
    """Render ``arr`` to a colour-mapped PNG.

    Args:
        arr: 2-D array-like of values; non-finite entries are treated as NaN.
        out_path: Destination file path; its parent directory is created.
        cmap: Matplotlib colormap name.
        smooth: Gaussian sigma applied before rendering; falsy or <= 0 disables it.
        log_scale: If true, render ``log1p(clip(a, 0) * 1e4)`` instead of ``a``.
        clip_percent: Lower/upper percentiles used as the colour range.
    """
    img = np.array(arr, dtype=float)
    img[~np.isfinite(img)] = np.nan
    if smooth and smooth > 0:
        img = gaussian_filter(img, sigma=smooth)

    finite_vals = img[~np.isnan(img)]
    if finite_vals.size == 0:
        # Nothing to draw: emit a small blank image rather than failing.
        img = np.zeros((10, 10))
        vmin, vmax = 0, 1
    else:
        if log_scale:
            img = np.log1p(np.clip(img, 0, None) * 1e4)
            finite_vals = img[~np.isnan(img)]
        vmin = float(np.percentile(finite_vals, clip_percent[0]))
        vmax = float(np.percentile(finite_vals, clip_percent[1]))
        if vmax <= vmin:
            # Keep a non-degenerate colour range for constant images.
            vmax = vmin + 1e-6

    os.makedirs(os.path.dirname(out_path) or ".", exist_ok=True)
    plt.imsave(out_path, img, cmap=cmap, vmin=vmin, vmax=vmax)


def save_samples_geojson(samples, transform, out_path, crs=None):
    """Write sampled particle paths as GeoJSON LineStrings.

    Each path is a sequence of ``(row, col)`` cells converted to world
    coordinates with ``rasterio.transform.xy``. Paths with fewer than two
    cells are skipped; if none remain, no file is written.
    """
    feats = []
    for i, path in enumerate(samples):
        if not path or len(path) < 2:
            continue
        coords = [rasterio.transform.xy(transform, int(r), int(c)) for (r, c) in path]
        feats.append({
            "type": "Feature",
            "geometry": mapping(LineString(coords)),
            "properties": {"id": i, "steps": len(coords)},
        })
    if not feats:
        return

    gdf = gpd.GeoDataFrame.from_features(feats)
    if crs is not None:
        try:
            gdf.set_crs(crs, inplace=True)
        except Exception:
            # Best effort: still write the geometries, just without a CRS.
            logger.warning("Could not attach CRS to sample paths", exc_info=True)
    os.makedirs(os.path.dirname(out_path) or ".", exist_ok=True)
    gdf.to_file(out_path, driver="GeoJSON")


# ---------------- interpolation & gradients ----------------
def interp_bilinear(arr, pts):
    """Sample ``arr`` at fractional index coordinates ``pts`` (order-1 spline).

    Out-of-range coordinates are clamped to the nearest edge value. A
    single-sample result is returned as a Python float, otherwise as an array.
    """
    res = map_coordinates(arr, pts, order=1, mode="nearest")
    if np.size(res) == 1:
        return float(res)
    return res


def dem_gradients(dem, cellsize):
    """Return ``(dz/dcol, dz/drow)`` per unit length, using ``cellsize`` spacing.

    The derivatives follow the array axes (increasing column / increasing
    row), not the world x / y axes.
    """
    d_row, d_col = np.gradient(dem, cellsize, cellsize)
    return d_col, d_row


def compute_slope_field(dx_field, dy_field):
    """Return the slope angle in degrees from the two gradient components."""
    grad_mag = np.sqrt(dx_field * dx_field + dy_field * dy_field)
    return np.degrees(np.arctan(grad_mag))


# ---------------- rim detection ----------------
def detect_rim_coords(dem, cellsize, pit_depth_frac=0.25, rim_buffer_m=6.0,
                      min_rim_samples=500):
    """Locate a band of cells surrounding the largest low-lying region.

    The "pit" is every finite cell at or below
    ``zmin + pit_depth_frac * (zmax - zmin)``, hole-filled, morphologically
    closed and reduced to its largest connected component. The rim is the
    band obtained by dilating the pit by ``rim_buffer_m`` (at least one cell)
    and removing the pit itself.

    Returns:
        ``(coords, depth)``: a list of ``(row, col)`` rim cells, subsampled
        with a regular stride when there are more than ``min_rim_samples``,
        and the total relief ``zmax - zmin``. For a flat DEM the centre cell
        is returned; for a DEM with no finite cell the list is empty and the
        depth is NaN.
    """
    finite = np.isfinite(dem)
    if not finite.any():
        return [], float("nan")

    zmin = float(dem[finite].min())
    zmax = float(dem[finite].max())
    depth = zmax - zmin
    if depth <= 0:
        h, w = dem.shape
        return [(h // 2, w // 2)], depth

    pit_thresh = zmin + pit_depth_frac * depth
    pit_mask = finite & (np.where(finite, dem, np.inf) <= pit_thresh)
    pit_mask = binary_fill_holes(pit_mask)
    pit_mask = binary_closing(pit_mask, structure=generate_binary_structure(2, 2),
                              iterations=2)

    lbls, nlab = label(pit_mask)
    if nlab > 1:
        # One pass over the label image instead of one mask per label.
        sizes = np.bincount(lbls.ravel())[1:]
        pit_mask = lbls == (int(np.argmax(sizes)) + 1)

    iters = max(1, int(round(rim_buffer_m / cellsize)))
    rim_band = binary_dilation(pit_mask, iterations=iters) & (~pit_mask)
    coords = [(int(r), int(c)) for r, c in np.argwhere(rim_band)]
    if len(coords) > min_rim_samples:
        stride = max(1, int(len(coords) / min_rim_samples))
        coords = coords[::stride]
    return coords, depth


# ---------------- steepest march ----------------
def march_steepest(dem, start_rc, cellsize, slope_deg, slope_thresh_deg,
                   min_drop_m, max_steps):
    """Walk downhill cell by cell until a sufficiently steep, low cell is found.

    At every step the unvisited 8-neighbour with the largest drop per unit
    distance is chosen (diagonal neighbours are sqrt(2) cells away). The walk
    stops successfully at the first cell whose slope is at least
    ``slope_thresh_deg`` and whose cumulative drop from the start is at least
    ``min_drop_m``.

    Args:
        cellsize: Accepted for interface compatibility. A uniform positive
            cell size scales every neighbour distance equally and therefore
            does not influence which neighbour is steepest.

    Returns:
        ``((row, col), cum_drop, steps)`` on success, or
        ``(None, 0.0, steps)`` when the walk hits a local minimum, a
        non-finite cell or ``max_steps``.
    """
    neighbours = [
        (dr, dc, math.hypot(dr, dc))
        for dr in (-1, 0, 1)
        for dc in (-1, 0, 1)
        if (dr, dc) != (0, 0)
    ]
    rows, cols = dem.shape
    r, c = int(start_rc[0]), int(start_rc[1])
    z0 = dem[r, c]
    visited = {(r, c)}
    steps = 0  # defined up front so max_steps == 0 returns cleanly

    for step in range(max_steps):
        steps = step + 1
        z = dem[r, c]
        if not np.isfinite(z):
            break

        best = None
        best_slope = 0.0
        for dr, dc, dist in neighbours:
            rr, cc = r + dr, c + dc
            if rr < 0 or rr >= rows or cc < 0 or cc >= cols:
                continue
            if (rr, cc) in visited:
                continue
            zn = dem[rr, cc]
            if not np.isfinite(zn):
                continue
            slope = (z - zn) / dist
            if slope > best_slope:
                best_slope = slope
                best = (rr, cc)
        if best is None:
            break  # local minimum: no strictly lower unvisited neighbour

        r, c = best
        visited.add(best)
        cum_drop = z0 - dem[r, c]
        if slope_deg[r, c] >= slope_thresh_deg and cum_drop >= min_drop_m:
            return (r, c), cum_drop, steps
    return None, 0.0, steps


# ---------------- particle integrator with wall handling ----------------
def _world_to_index(inv, x, y):
    """Map world ``(x, y)`` to fractional ``(row, col)`` measured from cell centres.

    The inverse affine yields pixel coordinates whose integer values lie on
    cell corners; array indexing and ``map_coordinates`` address cell
    centres, hence the half-cell shift.
    """
    col, row = inv * (x, y)
    return row - 0.5, col - 0.5


def particle_run(dem, dx_field, dy_field, cellsize, start_xy_m, initial_speed,
                 mu, step_loss, transform, xi=1000.0, wall_slope_thresh_deg=60.0,
                 wall_restitution=0.2, wall_deflect_frac=0.85, stop_on_wall_ke=5.0,
                 dt_max=0.2, dt_min=0.005, max_steps=20000, v_threshold=0.2,
                 rng=None):
    """Integrate one particle over the terrain and record the cells it visits.

    The particle moves in world coordinates under a gravity drive along the
    terrain gradient, a Voellmy-type resistance (``mu * g + v^2 / xi``), a
    quadratic drag term and a random per-step velocity loss. When the next
    position lies on terrain at least ``wall_slope_thresh_deg`` steep, the
    particle either stops (kinetic energy <= ``stop_on_wall_ke``) or is
    deflected along the wall.

    Args:
        dem: Elevation array; only its shape is used here.
        dx_field, dy_field: Gradients along the column / row axes, as
            returned by ``dem_gradients``.
        cellsize: Cell size, used to choose the adaptive time step.
        start_xy_m: World ``(x, y)`` start position.
        initial_speed: Initial speed, directed down the local gradient.
        mu: Friction coefficient of the resistance term.
        step_loss: Mean fractional velocity loss per step (std is 20% of it,
            result clipped to [0, 0.6]).
        transform: Affine transform of the raster. Only the signs of its
            ``a`` and ``e`` terms are used to orient the gradients, i.e. an
            unrotated raster is assumed.
        rng: Optional ``numpy.random.Generator`` for the per-step loss; a
            fresh unseeded generator is created when omitted.

    Returns:
        ``(visited, ke_cells)``: visited ``(row, col)`` cells in first-visit
        order without duplicates, and a dict mapping each visited cell to the
        largest per-unit-mass kinetic energy recorded there.
    """
    if rng is None:
        rng = np.random.default_rng()

    rows, cols = dem.shape
    inv = ~transform
    # Array-axis gradients -> world-axis gradients. On a north-up raster
    # (transform.e < 0) increasing row means decreasing y, so the row
    # gradient must be negated before it is used as dz/dy.
    sign_x = -1.0 if transform.a < 0 else 1.0
    sign_y = -1.0 if transform.e < 0 else 1.0

    visited = []
    ke_cells = {}

    def gradient_at(row, col):
        """World-axis terrain gradient at fractional array indices."""
        pts = np.array([[row], [col]])
        return (sign_x * float(interp_bilinear(dx_field, pts)),
                sign_y * float(interp_bilinear(dy_field, pts)))

    def record(row, col, ke):
        """Record a visit at the nearest cell; return False if off-grid."""
        if not (math.isfinite(row) and math.isfinite(col)):
            return False
        r_idx = int(round(row))
        c_idx = int(round(col))
        if not (0 <= r_idx < rows and 0 <= c_idx < cols):
            return False
        visited.append((r_idx, c_idx))
        ke_cells[(r_idx, c_idx)] = max(ke_cells.get((r_idx, c_idx), 0.0), ke)
        return True

    pos_x, pos_y = start_xy_m

    # Initial velocity: initial_speed along the local downslope direction.
    row0, col0 = _world_to_index(inv, pos_x, pos_y)
    gx0, gy0 = gradient_at(row0, col0)
    n0 = math.hypot(gx0, gy0)
    if math.isfinite(n0) and n0 > 1e-9:
        vx = initial_speed * (-gx0 / n0)
        vy = initial_speed * (-gy0 / n0)
    else:
        vx = vy = 0.0

    for _ in range(max_steps):
        row_cur, col_cur = _world_to_index(inv, pos_x, pos_y)
        if not (0 <= col_cur < cols - 1 and 0 <= row_cur < rows - 1):
            break  # left the interpolable domain (also catches NaN positions)

        gz_x, gz_y = gradient_at(row_cur, col_cur)
        if not (math.isfinite(gz_x) and math.isfinite(gz_y)):
            break  # gradient undefined here (adjacent to NaN elevations)

        a_drive_x = -g * gz_x
        a_drive_y = -g * gz_y

        vmag = math.hypot(vx, vy)
        if vmag > 1e-9:
            a_voellmy_mag = mu * g + (vmag * vmag) / max(1e-12, xi)
            a_voellmy_x = -a_voellmy_mag * vx / vmag
            a_voellmy_y = -a_voellmy_mag * vy / vmag
        else:
            a_voellmy_x = a_voellmy_y = 0.0

        c_drag = 0.03
        ax = a_drive_x + a_voellmy_x - c_drag * vx * vmag
        ay = a_drive_y + a_voellmy_y - c_drag * vy * vmag

        # Adaptive step: roughly 0.4 cells of travel, bounded by dt_min/dt_max.
        dt = min(dt_max, max(dt_min, 0.4 * (cellsize / (vmag + 1e-6))))
        new_pos_x = pos_x + (vx + 0.5 * ax * dt) * dt
        new_pos_y = pos_y + (vy + 0.5 * ay * dt) * dt
        new_vx = vx + ax * dt
        new_vy = vy + ay * dt

        # wall check at next location
        row_next, col_next = _world_to_index(inv, new_pos_x, new_pos_y)
        wall_encounter = False
        gz_x_n = gz_y_n = 0.0
        n_hat = None
        if 0 <= col_next < cols and 0 <= row_next < rows:
            gz_x_n, gz_y_n = gradient_at(row_next, col_next)
            grad_mag_n = math.hypot(gz_x_n, gz_y_n)
            if (math.isfinite(grad_mag_n)
                    and math.degrees(math.atan(grad_mag_n)) >= wall_slope_thresh_deg):
                wall_encounter = True
                n_hat = np.array([gz_x_n, gz_y_n]) / max(1e-12, grad_mag_n)

        if wall_encounter:
            ke = 0.5 * (vmag * vmag)
            if ke <= stop_on_wall_ke:
                # Too slow to deflect: the particle comes to rest here.
                record(row_cur, col_cur, ke)
                break

            v_vec = np.array([new_vx, new_vy])
            tangent = np.array([-gz_y_n, gz_x_n])
            tnorm = np.linalg.norm(tangent)
            if tnorm < 1e-12:
                # Degenerate tangent: reflect about the normal with damping.
                nv = np.dot(v_vec, n_hat)
                v_out = (v_vec - (1.0 + wall_restitution) * nv * n_hat) * 0.9
            else:
                t_hat = tangent / tnorm
                v_tang = np.dot(v_vec, t_hat) * t_hat
                v_norm = v_vec - v_tang
                v_out = wall_deflect_frac * v_tang - wall_restitution * v_norm
            new_vx, new_vy = float(v_out[0]), float(v_out[1])

            # Shortened step from the current position along the new heading.
            new_pos_x = pos_x + new_vx * dt * 0.6
            new_pos_y = pos_y + new_vy * dt * 0.6
            vmag_after = math.hypot(new_vx, new_vy)
            if vmag_after < v_threshold:
                record(row_cur, col_cur, 0.5 * vmag_after * vmag_after)
                break

        pos_x, pos_y = new_pos_x, new_pos_y
        vx, vy = new_vx, new_vy

        # Random per-step loss, drawn from the caller's generator so that a
        # seeded Monte Carlo run is reproducible.
        loss = float(np.clip(rng.normal(loc=step_loss, scale=abs(step_loss) * 0.2),
                             0.0, 0.6))
        vx *= (1.0 - loss)
        vy *= (1.0 - loss)
        vmag = math.hypot(vx, vy)

        row_new, col_new = _world_to_index(inv, pos_x, pos_y)
        if not record(row_new, col_new, 0.5 * (vmag * vmag)):
            break
        if vmag < v_threshold:
            break

    # De-duplicate while preserving first-visit order.
    return list(dict.fromkeys(visited)), ke_cells


# ---------------- Monte Carlo ----------------
def run_mc(dem, transform, release_cells, cellsize, trials, mu_mean, mu_std, xi,
           step_loss, min_drop, force_drop, sample_paths, seed,
           wall_slope_thresh_deg, wall_restitution, wall_deflect_frac,
           stop_on_wall_ke):
    """Run ``trials`` particle simulations from randomly chosen release cells.

    All randomness (release cell choice, friction coefficient and per-step
    loss) is drawn from one generator seeded with ``seed``.

    Returns:
        ``(heat, impact, sample_paths_out)``: per-cell visit frequency
        (visits / trials), per-cell maximum kinetic energy, and the paths of
        the first ``sample_paths`` trials.

    Raises:
        RuntimeError: If ``release_cells`` is empty.
    """
    rng = np.random.default_rng(seed)
    dx_field, dy_field = dem_gradients(dem, cellsize)
    heat = np.zeros_like(dem, dtype=float)
    impact = np.zeros_like(dem, dtype=float)
    sample_paths_out = []

    n_releases = len(release_cells)
    if n_releases == 0:
        raise RuntimeError("No release cells provided.")

    # The initial speed depends only on the DEM and the arguments, so it is
    # computed once rather than once per trial.
    if force_drop is not None:
        drop_m = float(force_drop)
    else:
        drop_m = max(min_drop, 0.02 * (np.nanmax(dem) - np.nanmin(dem)))
    initial_speed = math.sqrt(2.0 * g * drop_m)

    for _ in tqdm(range(trials), desc="Trials"):
        r, c = release_cells[rng.integers(0, n_releases)]
        x0, y0 = rasterio.transform.xy(transform, int(r), int(c))
        mu = float(max(0.0, rng.normal(mu_mean, mu_std)))
        path, ke_map = particle_run(
            dem, dx_field, dy_field, cellsize, (x0, y0), initial_speed, mu,
            step_loss, transform, xi=xi,
            wall_slope_thresh_deg=wall_slope_thresh_deg,
            wall_restitution=wall_restitution,
            wall_deflect_frac=wall_deflect_frac,
            stop_on_wall_ke=stop_on_wall_ke,
            rng=rng,
        )
        for (rr, cc) in path:
            heat[rr, cc] += 1.0
            if (rr, cc) in ke_map:
                impact[rr, cc] = max(impact[rr, cc], ke_map[(rr, cc)])
        if len(sample_paths_out) < sample_paths:
            sample_paths_out.append(path)

    heat /= float(max(1, trials))
    return heat, impact, sample_paths_out


# ---------------- utils ----------------
def print_stats(name, arr):
    """Return a one-line summary (min, p1, p50, p99, max, mean) of finite values."""
    a = np.array(arr, dtype=float)
    a[~np.isfinite(a)] = np.nan
    non = a[~np.isnan(a)]
    if non.size == 0:
        return f"[STAT] {name}: no finite values"
    return f"[STAT] {name}: min={np.nanmin(non):.6g}, p1={np.nanpercentile(non,1):.6g}, p50={np.nanpercentile(non,50):.6g}, p99={np.nanpercentile(non,99):.6g}, max={np.nanmax(non):.6g}, mean={np.nanmean(non):.6g}"


def _safe_prefix(out_prefix, fallback):
    """Reduce a client-supplied prefix to a filename-safe token.

    Anything other than letters, digits, ``.``, ``_`` and ``-`` is replaced,
    so the result can never contain a path separator or be ``..``.
    """
    if not out_prefix:
        return fallback
    cleaned = re.sub(r"[^A-Za-z0-9._-]+", "_", out_prefix).strip("._")
    return cleaned[:64] or fallback


# ---------------- API endpoint ----------------
@app.post("/run")
async def run_endpoint(
    dem: UploadFile = File(...),
    trials: int = Form(500),
    release_density: int = Form(1),
    pit_depth_frac_release: float = Form(0.25),
    slope_thresh_deg: float = Form(5.0),
    mu_mean: float = Form(0.05),
    mu_std: float = Form(0.01),
    xi: float = Form(1000.0),
    step_loss: float = Form(0.001),
    sample_paths: int = Form(50),
    wall_slope_thresh_deg: float = Form(60.0),
    wall_restitution: float = Form(0.2),
    wall_deflect_frac: float = Form(0.85),
    stop_on_wall_ke: float = Form(5.0),
    seed: int = Form(0),
    out_prefix: Optional[str] = Form(None),
):
    """Run the full pipeline on an uploaded DEM and return a zip of outputs.

    Steps: store the upload in a per-job directory, read the DEM, detect rim
    cells, march downhill from each rim cell to find release cells, run the
    Monte Carlo simulation, write rasters / PNGs / GeoJSON / diagnostics and
    zip whatever was produced.

    Raises:
        HTTPException: 400 for an unreadable or unusable DEM or when no
            release cell is found; 500 when the simulation or output writing
            fails.

    NOTE: the simulation is synchronous CPU-bound work executed inside this
    coroutine, so the event loop is blocked for the duration of a request.
    """
    job_id = uuid.uuid4().hex[:12]
    job_dir = BASE_WORKDIR / job_id
    job_dir.mkdir(parents=True, exist_ok=True)
    dem_path = job_dir / "input_dem.tif"
    dem_path.write_bytes(await dem.read())

    # read dem
    try:
        dem_arr, transform, crs, meta = read_dem(str(dem_path))
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Failed reading DEM: {e}") from e

    cellsize = abs(transform.a)
    if not cellsize > 0:
        raise HTTPException(status_code=400, detail="DEM has a zero or invalid pixel size.")
    if not np.isfinite(dem_arr).any():
        raise HTTPException(status_code=400, detail="DEM contains no valid elevation values.")

    # detect rim
    rim_coords, pit_depth = detect_rim_coords(
        dem_arr, cellsize, pit_depth_frac=pit_depth_frac_release, rim_buffer_m=6.0)
    dx_field, dy_field = dem_gradients(dem_arr, cellsize)
    slope_field = compute_slope_field(dx_field, dy_field)

    # march steepest to find release targets
    release_cells = []
    for rc in rim_coords:
        tgt, _cum_drop, _steps = march_steepest(
            dem_arr, rc, cellsize, slope_field, slope_thresh_deg, 2.0, 400)
        if tgt is not None:
            release_cells.append(tgt)

    # NOTE: repeating the whole list leaves the uniform draw in run_mc
    # unchanged, so release_density currently has no effect on sampling.
    seeds = release_cells * max(1, int(release_density))
    if len(seeds) == 0:
        raise HTTPException(status_code=400, detail="No release seeds found.")

    # debug rim png (best effort)
    debug_rim_png = job_dir / f"{job_id}_rim_debug.png"
    try:
        dmmin = float(np.nanmin(dem_arr))
        dmmax = float(np.nanmax(dem_arr))
        norm = np.nan_to_num((dem_arr - dmmin) / (dmmax - dmmin + 1e-9), nan=0.0)
        overlay = np.repeat(norm[..., None] * 0.35, 3, axis=2)
        for (r, c) in release_cells:
            if 0 <= r < overlay.shape[0] and 0 <= c < overlay.shape[1]:
                overlay[r, c] = [1.0, 0.0, 0.0]
        plt.imsave(str(debug_rim_png), np.clip(overlay, 0, 1))
    except Exception:
        logger.warning("Could not write rim debug image for job %s", job_id,
                       exc_info=True)

    # run Monte Carlo
    try:
        heat, impact, sampled_paths = run_mc(
            dem_arr, transform, seeds, cellsize, int(trials), float(mu_mean),
            float(mu_std), float(xi), float(step_loss), float(2.0), None,
            int(sample_paths), int(seed), float(wall_slope_thresh_deg),
            float(wall_restitution), float(wall_deflect_frac),
            float(stop_on_wall_ke))
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Simulation error: {e}") from e

    # outputs; the prefix is client input and must not escape job_dir
    prefix = _safe_prefix(out_prefix, job_id)
    out_heat_png = job_dir / f"{prefix}_heat.png"
    out_impact_png = job_dir / f"{prefix}_impact.png"
    out_heat_tif = job_dir / f"{prefix}_heat.tif"
    out_impact_tif = job_dir / f"{prefix}_impact.tif"
    out_samples_geojson = job_dir / f"{prefix}_samples.geojson"
    try:
        save_geotiff(heat, meta, str(out_heat_tif))
        save_geotiff(impact, meta, str(out_impact_tif))
        save_png(heat, str(out_heat_png), cmap="inferno", smooth=1.0, log_scale=False)
        save_png(impact, str(out_impact_png), cmap="magma", smooth=1.0, log_scale=True)
        save_samples_geojson(sampled_paths, transform, str(out_samples_geojson), crs=crs)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to write outputs: {e}") from e

    # diagnostics text
    diag_txt = job_dir / "diagnostics.txt"
    with diag_txt.open("w") as f:
        f.write(f"Rim pixels: {len(rim_coords)} pit_depth: {pit_depth}\n")
        f.write(print_stats("heat", heat) + "\n")
        f.write(print_stats("impact", impact) + "\n")
        try:
            r, c = (int(v) for v in np.unravel_index(np.nanargmax(impact), impact.shape))
            x, y = rasterio.transform.xy(transform, r, c)
            f.write(f"Max impact at row {r} col {c} -> x={x:.3f}, y={y:.3f}, KE={impact[r,c]:.6g}\n")
        except ValueError:
            # nanargmax raises on an all-NaN array; the line is optional.
            logger.warning("Could not locate max impact for job %s", job_id,
                           exc_info=True)

    # zip outputs
    zip_path = job_dir / f"{job_id}_outputs.zip"
    with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        for p in [out_heat_png, out_impact_png, out_heat_tif, out_impact_tif,
                  out_samples_geojson, debug_rim_png, diag_txt]:
            if p.exists():
                zf.write(p, arcname=p.name)

    return FileResponse(path=str(zip_path), filename=zip_path.name,
                        media_type="application/zip")


@app.get("/health")
def health():
    """Liveness probe."""
    return {"status": "ok"}


@app.get("/")
def root():
    """Redirect the bare root URL to the interactive API docs."""
    return RedirectResponse(url="/docs")