# simulator / app.py
# Source page metadata: rishab1090's picture; commit "Update app.py" (965c289, verified)
# app.py
import os, uuid, zipfile, math
from pathlib import Path
from typing import Optional
from fastapi import FastAPI, UploadFile, File, Form, HTTPException
from fastapi.responses import FileResponse
import numpy as np
import rasterio
from rasterio.transform import Affine
from scipy.ndimage import gaussian_filter, map_coordinates, binary_fill_holes, binary_closing, binary_dilation, generate_binary_structure
from scipy.ndimage import label
import matplotlib.pyplot as plt
from tqdm import tqdm
from shapely.geometry import LineString, mapping
import geopandas as gpd
from fastapi.responses import RedirectResponse
# Gravitational acceleration (m/s^2, assuming DEM elevations are in metres);
# used for the slope-driving and friction terms and for the release speed.
g = 9.81
# FastAPI application object; the route decorators in this file register on it.
app = FastAPI(title="Runout simulator single-file API")
# Root directory under which each request creates its own job sub-directory.
BASE_WORKDIR = Path("/tmp/runout_jobs_single")
# Created at import time so request handlers can assume it exists.
BASE_WORKDIR.mkdir(parents=True, exist_ok=True)
# ---------------- IO helpers ----------------
def read_dem(path):
    """Load band 1 of a raster together with its georeferencing.

    Args:
        path: Location of a raster file that ``rasterio.open`` can read.

    Returns:
        A ``(dem, transform, crs, meta)`` tuple: the first band cast to
        float, the dataset's affine transform, its CRS, and a copy of the
        dataset metadata dictionary.
    """
    with rasterio.open(path) as src:
        elevation = src.read(1).astype(float)
        profile = src.meta.copy()
        affine = src.transform
        crs = src.crs
    return elevation, affine, crs, profile
def save_geotiff(arr, meta, path):
    """Write ``arr`` as a single-band float32 raster at ``path``.

    Args:
        arr: 2-D array-like of values to store in band 1.
        meta: Raster metadata to reuse; it is copied, then ``dtype`` and
            ``count`` are overridden so the output is one float32 band.
        path: Destination file; its parent directory is created if missing.
    """
    profile = meta.copy()
    profile["dtype"] = "float32"
    profile["count"] = 1

    parent = os.path.dirname(path) or "."
    os.makedirs(parent, exist_ok=True)

    with rasterio.open(path, "w", **profile) as dst:
        band = np.array(arr, dtype='float32')
        dst.write(band, 1)
def save_png(arr, out_path, cmap="inferno", smooth=1.0, log_scale=False, clip_percent=(0.1,99.9)):
    """Render a 2-D array as a colour-mapped PNG.

    Args:
        arr: 2-D array-like of values. Non-finite entries are treated as
            missing (NaN).
        out_path: Destination PNG path; the parent directory is created.
        cmap: Matplotlib colormap name.
        smooth: Gaussian sigma (in pixels) applied before rendering. A falsy
            or non-positive value disables smoothing.
        log_scale: If true, render ``log1p(max(value, 0) * 1e4)`` instead of
            the raw values.
        clip_percent: ``(low, high)`` percentiles of the finite values used
            as the colour limits.

    If the array has no finite value at all, a 10x10 zero image is written.
    """
    a = np.array(arr, dtype=float)
    finite = np.isfinite(a)
    a[~finite] = np.nan

    if smooth and smooth > 0:
        if finite.all():
            a = gaussian_filter(a, sigma=smooth)
        elif finite.any():
            # A plain Gaussian filter spreads every NaN over the whole kernel
            # footprint. Smooth the zero-filled data and the validity mask
            # separately and divide, so missing cells neither grow nor bias
            # their neighbours; cells that were missing stay missing.
            weights = gaussian_filter(finite.astype(float), sigma=smooth)
            blurred = gaussian_filter(np.where(finite, a, 0.0), sigma=smooth)
            # The kernel centre weight is positive, so weights > 0 wherever
            # ``finite`` is true; use 1.0 elsewhere only to avoid 0/0.
            a = np.where(finite, blurred / np.where(finite, weights, 1.0), np.nan)

    vals = a[~np.isnan(a)]
    if vals.size == 0:
        # Nothing to draw: emit a small blank image rather than failing.
        a = np.zeros((10, 10))
        vmin, vmax = 0, 1
    else:
        if log_scale:
            a = np.log1p(np.clip(a, 0, None) * 1e4)
            vals = a[~np.isnan(a)]
        vmin = float(np.percentile(vals, clip_percent[0]))
        vmax = float(np.percentile(vals, clip_percent[1]))
        if vmax <= vmin:
            # Keep a strictly positive colour range for constant images.
            vmax = vmin + 1e-6

    os.makedirs(os.path.dirname(out_path) or ".", exist_ok=True)
    plt.imsave(out_path, a, cmap=cmap, vmin=vmin, vmax=vmax)
def save_samples_geojson(samples, transform, out_path, crs=None):
    """Export sampled particle tracks as GeoJSON line features.

    Args:
        samples: Iterable of tracks, each a sequence of ``(row, col)`` cells.
            Empty tracks and tracks with fewer than two cells are skipped.
        transform: Affine transform passed to ``rasterio.transform.xy`` to
            turn cell indices into coordinates.
        out_path: Destination file; its parent directory is created.
        crs: Optional CRS to attach to the output; failures to set it are
            ignored.

    Nothing is written when no track yields a feature.
    """
    features = []
    for track_id, track in enumerate(samples):
        if not track or len(track) < 2:
            continue
        xy = [rasterio.transform.xy(transform, int(row), int(col)) for (row, col) in track]
        line = LineString(xy)
        features.append({
            "type": "Feature",
            "geometry": mapping(line),
            "properties": {"id": track_id, "steps": len(xy)},
        })

    if not features:
        return

    frame = gpd.GeoDataFrame.from_features(features)
    if crs is not None:
        try:
            frame.set_crs(crs, inplace=True)
        except Exception:
            pass

    target_dir = os.path.dirname(out_path) or "."
    os.makedirs(target_dir, exist_ok=True)
    frame.to_file(out_path, driver="GeoJSON")
# ---------------- interpolation & gradients ----------------
def interp_bilinear(arr, pts):
    """Sample ``arr`` at fractional indices with linear interpolation.

    Args:
        arr: N-D array to sample.
        pts: Coordinate array as expected by
            ``scipy.ndimage.map_coordinates`` (first axis indexes the
            dimensions of ``arr``). Points outside the array take the value
            of the nearest edge element.

    Returns:
        A Python ``float`` when exactly one value was sampled, otherwise the
        array of sampled values.
    """
    res = np.asarray(map_coordinates(arr, pts, order=1, mode='nearest'))
    if res.size == 1:
        # Pull the element out explicitly: calling float() on an array with
        # ndim > 0 is deprecated in recent NumPy releases.
        return float(res.reshape(-1)[0])
    return res
def dem_gradients(dem, cellsize):
    """Finite-difference elevation gradients of a 2-D DEM.

    Both derivatives are taken in the raster frame with spacing
    ``cellsize``: the first returned array is the change in elevation per
    unit distance along increasing column index, the second along
    increasing row index.

    Returns:
        ``(dx, dy)`` arrays with the same shape as ``dem``.
    """
    along_rows, along_cols = np.gradient(dem, cellsize, cellsize)
    return along_cols, along_rows
def compute_slope_field(dx_field, dy_field):
    """Convert gradient components into a slope angle in degrees.

    Args:
        dx_field: Elevation gradient along one axis (rise per unit run).
        dy_field: Elevation gradient along the other axis.

    Returns:
        ``degrees(arctan(|gradient|))`` element-wise.
    """
    squared = dx_field * dx_field + dy_field * dy_field
    return np.degrees(np.arctan(np.sqrt(squared)))
# ---------------- rim detection ----------------
def detect_rim_coords(dem, cellsize, pit_depth_frac=0.25, rim_buffer_m=6.0, min_rim_samples=500):
    """Find cells in a band just outside the lowest part of the DEM.

    The "pit" is the set of finite cells whose elevation lies within the
    lowest ``pit_depth_frac`` of the DEM's elevation range; holes are
    filled, the mask is closed, and only the largest connected component
    is kept. The rim is the ring obtained by dilating that mask by about
    ``rim_buffer_m / cellsize`` cells (at least one) and removing the pit.

    Args:
        dem: 2-D elevation array; non-finite cells are ignored.
        cellsize: Cell spacing in the same units as ``rim_buffer_m``.
        pit_depth_frac: Fraction of the elevation range that defines the pit.
        rim_buffer_m: Width of the rim band.
        min_rim_samples: When more rim cells than this are found, they are
            thinned with a regular stride of ``len // min_rim_samples``.

    Returns:
        ``(coords, depth)`` where ``coords`` is a list of ``(row, col)``
        pairs and ``depth`` is the DEM elevation range. For a DEM with no
        relief the single centre cell is returned.
    """
    finite = np.isfinite(dem)
    elevation = np.where(finite, dem, np.nan)
    z_low = float(np.nanmin(elevation))
    z_high = float(np.nanmax(elevation))
    depth = z_high - z_low
    if depth <= 0:
        n_rows, n_cols = dem.shape
        return [(n_rows // 2, n_cols // 2)], depth

    threshold = z_low + pit_depth_frac * depth
    pit = finite & (dem <= threshold)
    pit = binary_fill_holes(pit)
    pit = binary_closing(pit, structure=generate_binary_structure(2, 2), iterations=2)

    labelled, n_components = label(pit)
    if n_components > 1:
        # Component sizes for labels 1..n (label 0 is background).
        sizes = np.bincount(labelled.ravel(), minlength=n_components + 1)[1:]
        pit = labelled == (int(np.argmax(sizes)) + 1)

    n_dilate = max(1, int(round(rim_buffer_m / cellsize)))
    ring = binary_dilation(pit, iterations=n_dilate) & (~pit)
    ring_rows, ring_cols = np.nonzero(ring)
    coords = list(zip(ring_rows, ring_cols))
    if len(coords) > min_rim_samples:
        stride = max(1, int(len(coords) / min_rim_samples))
        coords = coords[::stride]
    return coords, depth
# ---------------- steepest march ----------------
def march_steepest(dem, start_rc, cellsize, slope_deg, slope_thresh_deg, min_drop_m, max_steps):
    """Walk downhill from a start cell until a release target is found.

    From ``start_rc`` the walk repeatedly moves to the not-yet-visited
    8-neighbour with the largest elevation drop. It succeeds at the first
    cell reached whose slope is at least ``slope_thresh_deg`` and whose
    cumulative drop from the start is at least ``min_drop_m``.

    Args:
        dem: 2-D elevation array.
        start_rc: ``(row, col)`` of the starting cell.
        cellsize: Unused; kept so the signature stays stable for callers.
        slope_deg: 2-D slope array (degrees) aligned with ``dem``.
        slope_thresh_deg: Minimum slope required at the target cell.
        min_drop_m: Minimum cumulative elevation drop required.
        max_steps: Maximum number of moves to attempt.

    Returns:
        ``(target, cum_drop, steps)``. ``target`` is the ``(row, col)`` of
        the accepted cell, or ``None`` (with ``cum_drop`` 0.0) when the walk
        hits a non-finite cell, has no lower unvisited neighbour, or runs out
        of steps. ``steps`` is the number of loop iterations started, which
        is 0 when ``max_steps`` is not positive.
    """
    neighbours = ((-1, -1), (-1, 0), (-1, 1), (0, -1), (0, 1), (1, -1), (1, 0), (1, 1))
    rows, cols = dem.shape
    r, c = int(start_rc[0]), int(start_rc[1])
    z_start = dem[r, c]
    visited = {(r, c)}

    # Tracked separately from the loop variable so the final return is well
    # defined even when the loop body never runs (max_steps <= 0).
    steps_taken = 0
    for step in range(max_steps):
        steps_taken = step + 1
        z_here = dem[r, c]
        if not np.isfinite(z_here):
            break

        best = None
        best_dz = 0.0
        for dr, dc in neighbours:
            rr, cc = r + dr, c + dc
            if rr < 0 or rr >= rows or cc < 0 or cc >= cols:
                continue
            if (rr, cc) in visited:
                continue
            z_next = dem[rr, cc]
            if not np.isfinite(z_next):
                continue
            dz = z_here - z_next
            # Strictly greater: only genuinely lower neighbours qualify and
            # the first one found wins ties.
            if dz > best_dz:
                best_dz = dz
                best = (rr, cc)

        if best is None:
            break

        r, c = best
        visited.add(best)
        cum_drop = z_start - dem[r, c]
        if slope_deg[r, c] >= slope_thresh_deg and cum_drop >= min_drop_m:
            return (r, c), cum_drop, steps_taken

    return None, 0.0, steps_taken
# ---------------- particle integrator with wall handling ----------------
def particle_run(dem, dx_field, dy_field, cellsize, start_xy_m, initial_speed, mu, step_loss,
                 transform, xi=1000.0, wall_slope_thresh_deg=60.0, wall_restitution=0.2,
                 wall_deflect_frac=0.85, stop_on_wall_ke=5.0,
                 dt_max=0.2, dt_min=0.005, max_steps=20000, v_threshold=0.2, rng=None):
    """Integrate one particle over the DEM and report the cells it touched.

    The particle starts at ``start_xy_m`` moving at ``initial_speed`` down
    the local gradient. Each step applies slope-driven gravity, a resistance
    of magnitude ``mu * g + v**2 / xi`` opposing the velocity, a quadratic
    drag term, and a random multiplicative speed loss. When the next position
    lies on terrain steeper than ``wall_slope_thresh_deg`` the particle either
    stops (low kinetic energy) or is deflected along the wall.

    Args:
        dem: 2-D elevation array (only its shape is used here).
        dx_field, dy_field: Raster-frame gradients along columns and rows,
            as returned by ``dem_gradients``.
        cellsize: Cell spacing; used to size the adaptive time step.
        start_xy_m: ``(x, y)`` start position in the coordinates of
            ``transform``.
        initial_speed: Initial speed magnitude.
        mu: Friction coefficient of the resistance term.
        step_loss: Mean fractional speed loss applied every step; the actual
            loss is drawn from a normal distribution with standard deviation
            ``0.2 * |step_loss|`` and clipped to ``[0, 0.6]``.
        transform: Affine transform mapping ``(col, row)`` to ``(x, y)``.
            Assumed axis-aligned (no rotation); only the signs of its ``a``
            and ``e`` terms are used to orient the gradients.
        xi: Divisor of the velocity-squared resistance term.
        wall_slope_thresh_deg: Slope at or above which terrain is a wall.
        wall_restitution: Fraction of the wall-normal velocity kept
            (reversed) on deflection.
        wall_deflect_frac: Fraction of the wall-tangential velocity kept.
        stop_on_wall_ke: Kinetic energy per unit mass at or below which a
            wall contact stops the particle.
        dt_max, dt_min: Bounds of the adaptive time step.
        max_steps: Maximum number of integration steps.
        v_threshold: Speed below which the particle is considered stopped.
        rng: Optional ``numpy.random.Generator`` for the per-step loss. When
            omitted, NumPy's global random state is used.

    Returns:
        ``(cells, ke_cells)``: the visited ``(row, col)`` cells in first-visit
        order, and a dict mapping each of them to the largest ``0.5 * v**2``
        recorded there.
    """
    rows, cols = dem.shape
    inv = ~transform
    # The gradient fields are derivatives along increasing column/row index.
    # World x/y may run the other way (row index usually increases while y
    # decreases), so flip each component by the sign of its pixel size to
    # obtain dz/dx and dz/dy in the frame the particle moves in.
    sign_x = -1.0 if transform.a < 0 else 1.0
    sign_y = -1.0 if transform.e < 0 else 1.0
    sampler = np.random if rng is None else rng
    loss_scale = abs(step_loss) * 0.2
    c_drag = 0.03
    # Insertion order of this dict doubles as the first-visit order.
    ke_cells = {}

    def in_grid(col, row):
        # Also false for NaN/inf coordinates.
        return 0 <= col < cols and 0 <= row < rows

    def gradient_at(col, row):
        # ``inv`` yields edge-based pixel coordinates (cell i spans [i, i+1));
        # array element i sits at the cell centre i + 0.5, hence the shift.
        pts = np.array([[row - 0.5], [col - 0.5]])
        gx = float(interp_bilinear(dx_field, pts))
        gy = float(interp_bilinear(dy_field, pts))
        return sign_x * gx, sign_y * gy

    def record(col, row, ke):
        # Callers guarantee (col, row) is inside the grid, so floor() gives
        # the containing cell.
        cell = (int(math.floor(row)), int(math.floor(col)))
        ke_cells[cell] = max(ke_cells.get(cell, 0.0), ke)

    pos_x, pos_y = start_xy_m
    col0, row0 = inv * (pos_x, pos_y)
    gx0, gy0 = gradient_at(col0, row0)
    n0 = math.hypot(gx0, gy0)
    if math.isfinite(n0) and n0 > 1e-9:
        vx = -initial_speed * gx0 / n0
        vy = -initial_speed * gy0 / n0
    else:
        # Flat or undefined terrain: start at rest and let gravity act.
        vx = vy = 0.0

    for _ in range(max_steps):
        col_cur, row_cur = inv * (pos_x, pos_y)
        if not in_grid(col_cur, row_cur):
            break
        gz_x, gz_y = gradient_at(col_cur, row_cur)
        if not (math.isfinite(gz_x) and math.isfinite(gz_y)):
            # Ran onto missing terrain (NaN elevations); nothing to integrate.
            break

        vmag = math.hypot(vx, vy)
        ax = -g * gz_x
        ay = -g * gz_y
        if vmag > 1e-9:
            resist = mu * g + (vmag * vmag) / max(1e-12, xi)
            ax -= resist * vx / vmag
            ay -= resist * vy / vmag
        ax -= c_drag * vx * vmag
        ay -= c_drag * vy * vmag

        # Move at most ~0.4 cells per step, within [dt_min, dt_max].
        dt = min(dt_max, max(dt_min, 0.4 * (cellsize / (vmag + 1e-6))))
        new_pos_x = pos_x + (vx + 0.5 * ax * dt) * dt
        new_pos_y = pos_y + (vy + 0.5 * ay * dt) * dt
        new_vx = vx + ax * dt
        new_vy = vy + ay * dt

        # Wall check at the tentative next position (only if it is on-grid).
        col_next, row_next = inv * (new_pos_x, new_pos_y)
        hit_wall = False
        gnx = gny = gn_mag = 0.0
        if in_grid(col_next, row_next):
            gnx, gny = gradient_at(col_next, row_next)
            gn_mag = math.hypot(gnx, gny)
            hit_wall = (math.isfinite(gn_mag)
                        and math.degrees(math.atan(gn_mag)) >= wall_slope_thresh_deg)

        if hit_wall:
            ke = 0.5 * (vmag * vmag)
            if ke <= stop_on_wall_ke:
                record(col_cur, row_cur, ke)
                break
            if gn_mag < 1e-12:
                # No usable wall orientation: just damp the velocity.
                new_vx *= 0.9
                new_vy *= 0.9
            else:
                # Split velocity into components along the contour (tangent)
                # and along the gradient (normal); damp the former, reverse
                # and damp the latter.
                tx, ty = -gny / gn_mag, gnx / gn_mag
                v_t = new_vx * tx + new_vy * ty
                tang_x, tang_y = v_t * tx, v_t * ty
                norm_x, norm_y = new_vx - tang_x, new_vy - tang_y
                new_vx = wall_deflect_frac * tang_x - wall_restitution * norm_x
                new_vy = wall_deflect_frac * tang_y - wall_restitution * norm_y
            # Take a shortened step in the deflected direction.
            new_pos_x = pos_x + new_vx * dt * 0.6
            new_pos_y = pos_y + new_vy * dt * 0.6
            vmag_after = math.hypot(new_vx, new_vy)
            if vmag_after < v_threshold:
                record(col_cur, row_cur, 0.5 * vmag_after * vmag_after)
                break

        pos_x, pos_y = new_pos_x, new_pos_y
        vx, vy = new_vx, new_vy

        loss = float(np.clip(sampler.normal(loc=step_loss, scale=loss_scale), 0.0, 0.6))
        vx *= (1.0 - loss)
        vy *= (1.0 - loss)
        vmag = math.hypot(vx, vy)

        col_new, row_new = inv * (pos_x, pos_y)
        if not in_grid(col_new, row_new):
            break
        record(col_new, row_new, 0.5 * (vmag * vmag))
        if vmag < v_threshold:
            break

    return list(ke_cells), ke_cells
# ---------------- Monte Carlo ----------------
def run_mc(dem, transform, release_cells, cellsize, trials, mu_mean, mu_std, xi, step_loss,
           min_drop, force_drop, sample_paths, seed, wall_slope_thresh_deg, wall_restitution, wall_deflect_frac, stop_on_wall_ke):
    """Run repeated particle simulations and accumulate hit/impact rasters.

    Every trial picks a release cell uniformly at random, draws a friction
    coefficient from ``N(mu_mean, mu_std)`` (floored at 0) and runs
    ``particle_run``. All random draws, including the per-step losses inside
    ``particle_run``, come from one generator seeded with ``seed`` so a run
    is reproducible.

    Args:
        dem: 2-D elevation array.
        transform: Affine transform of the DEM.
        release_cells: Sequence of ``(row, col)`` release cells.
        cellsize: Cell spacing of the DEM.
        trials: Number of particles to simulate.
        mu_mean, mu_std: Mean and standard deviation of the friction draw.
        xi, step_loss: Passed through to ``particle_run``.
        min_drop: Lower bound of the free-fall height that sets the release
            speed; the height used is the larger of this and 2% of the DEM
            elevation range.
        force_drop: If not ``None``, use this free-fall height instead.
        sample_paths: Number of leading trial paths to keep and return.
        seed: Seed for ``numpy.random.default_rng``.
        wall_slope_thresh_deg, wall_restitution, wall_deflect_frac,
        stop_on_wall_ke: Passed through to ``particle_run``.

    Returns:
        ``(heat, impact, sample_paths_out)``: per-cell fraction of trials
        that visited the cell, per-cell maximum recorded ``0.5 * v**2``, and
        the retained paths.

    Raises:
        RuntimeError: If ``release_cells`` is empty.
    """
    rng = np.random.default_rng(seed)
    dx_field, dy_field = dem_gradients(dem, cellsize)
    heat = np.zeros_like(dem, dtype=float)
    impact = np.zeros_like(dem, dtype=float)
    sample_paths_out = []
    n_releases = len(release_cells)
    if n_releases == 0:
        raise RuntimeError("No release cells provided.")

    # The release speed does not depend on the trial, so compute it once
    # instead of rescanning the DEM on every iteration.
    if force_drop is not None:
        drop_m = float(force_drop)
    else:
        drop_m = max(min_drop, 0.02 * (np.nanmax(dem) - np.nanmin(dem)))
    # Free-fall speed after dropping drop_m; clamp so a negative height
    # cannot reach sqrt().
    initial_speed = math.sqrt(2.0 * g * max(0.0, drop_m))

    for _ in tqdm(range(trials), desc="Trials"):
        r, c = release_cells[rng.integers(0, n_releases)]
        x0, y0 = rasterio.transform.xy(transform, int(r), int(c))
        mu = float(max(0.0, rng.normal(mu_mean, mu_std)))
        path, ke_map = particle_run(dem, dx_field, dy_field, cellsize,
                                    (x0, y0), initial_speed, mu,
                                    step_loss, transform,
                                    xi=xi,
                                    wall_slope_thresh_deg=wall_slope_thresh_deg,
                                    wall_restitution=wall_restitution,
                                    wall_deflect_frac=wall_deflect_frac,
                                    stop_on_wall_ke=stop_on_wall_ke,
                                    rng=rng)
        for (rr, cc) in path:
            heat[rr, cc] += 1.0
            if (rr, cc) in ke_map:
                impact[rr, cc] = max(impact[rr, cc], ke_map[(rr, cc)])
        if len(sample_paths_out) < sample_paths:
            sample_paths_out.append(path)

    heat /= float(max(1, trials))
    return heat, impact, sample_paths_out
# ---------------- utils ----------------
def print_stats(name, arr):
    """Format summary statistics of the finite values in ``arr``.

    Despite the name, nothing is printed: the line is returned. It contains
    the minimum, 1st/50th/99th percentiles, maximum and mean, or a
    "no finite values" notice when there is nothing to summarise.
    """
    values = np.array(arr, dtype=float)
    finite = values[np.isfinite(values)]
    if finite.size == 0:
        return f"[STAT] {name}: no finite values"

    p1, p50, p99 = (np.nanpercentile(finite, q) for q in (1, 50, 99))
    lowest = np.nanmin(finite)
    highest = np.nanmax(finite)
    average = np.nanmean(finite)
    return f"[STAT] {name}: min={lowest:.6g}, p1={p1:.6g}, p50={p50:.6g}, p99={p99:.6g}, max={highest:.6g}, mean={average:.6g}"
# ---------------- API endpoint ----------------
@app.post("/run")
async def run_endpoint(
    dem: UploadFile = File(...),
    trials: int = Form(500),
    release_density: int = Form(1),
    pit_depth_frac_release: float = Form(0.25),
    slope_thresh_deg: float = Form(5.0),
    mu_mean: float = Form(0.05),
    mu_std: float = Form(0.01),
    xi: float = Form(1000.0),
    step_loss: float = Form(0.001),
    sample_paths: int = Form(50),
    wall_slope_thresh_deg: float = Form(60.0),
    wall_restitution: float = Form(0.2),
    wall_deflect_frac: float = Form(0.85),
    stop_on_wall_ke: float = Form(5.0),
    seed: int = Form(0),
    out_prefix: Optional[str] = Form(None),
):
    """Run a full simulation for an uploaded DEM and return a zip of outputs.

    The upload is stored in a fresh job directory, rim cells are detected,
    release cells are derived by marching downhill from the rim, the Monte
    Carlo simulation is run, and the heat/impact rasters, PNG previews,
    sample paths, a rim debug image and a diagnostics text file are zipped.

    ``out_prefix`` only names the output files; it is reduced to letters,
    digits, ``-``, ``_`` and ``.`` so it can never point outside the job
    directory. When nothing usable remains the job id is used.

    Raises:
        HTTPException: 400 if the DEM cannot be read, has a zero pixel
            width, or yields no release cells; 500 if the simulation or
            writing the outputs fails.
    """
    job_id = uuid.uuid4().hex[:12]
    job_dir = BASE_WORKDIR / job_id
    job_dir.mkdir(parents=True, exist_ok=True)
    # NOTE(review): job directories are never removed here, so they
    # accumulate under BASE_WORKDIR; confirm whether cleanup happens elsewhere.
    dem_path = job_dir / "input_dem.tif"
    with dem_path.open("wb") as f:
        f.write(await dem.read())

    # read dem
    try:
        dem_arr, transform, crs, meta = read_dem(str(dem_path))
    except Exception as e:
        raise HTTPException(status_code=400, detail=f"Failed reading DEM: {e}") from e
    cellsize = abs(transform.a)
    if cellsize <= 0:
        # Rim detection divides by the cell size.
        raise HTTPException(status_code=400, detail="DEM has a zero pixel width; cannot derive cell size.")

    # detect rim
    rim_coords, pit_depth = detect_rim_coords(dem_arr, cellsize, pit_depth_frac=pit_depth_frac_release, rim_buffer_m=6.0)
    dx_field, dy_field = dem_gradients(dem_arr, cellsize)
    slope_field = compute_slope_field(dx_field, dy_field)

    # march steepest to find release targets
    release_cells = []
    for rc in rim_coords:
        tgt, _cum_drop, _steps = march_steepest(dem_arr, rc, cellsize, slope_field, slope_thresh_deg, 2.0, 400)
        if tgt is not None:
            release_cells.append(tgt)
    # NOTE(review): run_mc samples release cells uniformly, so repeating the
    # list does not change the sampling distribution; it only grows the list.
    seeds = release_cells * max(1, int(release_density))
    if len(seeds) == 0:
        raise HTTPException(status_code=400, detail="No release seeds found.")

    # debug rim png (best effort: a failure here must not abort the job)
    debug_rim_png = job_dir / f"{job_id}_rim_debug.png"
    try:
        overlay = np.zeros((dem_arr.shape[0], dem_arr.shape[1], 3), dtype=float)
        dmmin = float(np.nanmin(dem_arr)); dmmax = float(np.nanmax(dem_arr))
        norm = (dem_arr - dmmin) / (dmmax - dmmin + 1e-9)
        overlay[..., :] = np.expand_dims(norm, 2) * 0.35
        for (r, c) in release_cells:
            if 0 <= r < overlay.shape[0] and 0 <= c < overlay.shape[1]:
                overlay[r, c] = [1.0, 0.0, 0.0]
        plt.imsave(str(debug_rim_png), np.clip(overlay, 0, 1))
    except Exception:
        # The zip step below simply skips the file when it does not exist.
        pass

    # run Monte Carlo
    # NOTE: this is CPU-bound synchronous work inside an async handler, so it
    # occupies the event loop for the duration of the simulation.
    try:
        heat, impact, sampled_paths = run_mc(dem_arr, transform, seeds, cellsize,
                                            int(trials), float(mu_mean), float(mu_std), float(xi), float(step_loss),
                                            float(2.0), None, int(sample_paths), int(seed),
                                            float(wall_slope_thresh_deg), float(wall_restitution), float(wall_deflect_frac), float(stop_on_wall_ke))
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Simulation error: {e}") from e

    # outputs
    # out_prefix is client-controlled and ends up in file paths: keep only
    # harmless characters so separators, absolute paths and ".." cannot
    # escape job_dir.
    cleaned = "".join(ch for ch in (out_prefix or "") if ch.isalnum() or ch in "-_.")
    prefix = cleaned.lstrip(".") or job_id
    out_heat_png = job_dir / f"{prefix}_heat.png"
    out_impact_png = job_dir / f"{prefix}_impact.png"
    out_heat_tif = job_dir / f"{prefix}_heat.tif"
    out_impact_tif = job_dir / f"{prefix}_impact.tif"
    out_samples_geojson = job_dir / f"{prefix}_samples.geojson"
    try:
        save_geotiff(heat, meta, str(out_heat_tif))
        save_geotiff(impact, meta, str(out_impact_tif))
        save_png(heat, str(out_heat_png), cmap="inferno", smooth=1.0, log_scale=False)
        save_png(impact, str(out_impact_png), cmap="magma", smooth=1.0, log_scale=True)
        save_samples_geojson(sampled_paths, transform, str(out_samples_geojson), crs=crs)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to write outputs: {e}") from e

    # diagnostics text
    diag_txt = job_dir / "diagnostics.txt"
    with diag_txt.open("w") as f:
        f.write(f"Rim pixels: {len(rim_coords)} pit_depth: {pit_depth}\n")
        f.write(print_stats("heat", heat) + "\n")
        f.write(print_stats("impact", impact) + "\n")
        try:
            r, c = (int(v) for v in np.unravel_index(np.nanargmax(impact), impact.shape))
            x, y = rasterio.transform.xy(transform, r, c)
            f.write(f"Max impact at row {r} col {c} -> x={x:.3f}, y={y:.3f}, KE={impact[r,c]:.6g}\n")
        except Exception as e:
            # Diagnostics are advisory; record why the line is missing
            # instead of dropping it silently.
            f.write(f"Max impact location unavailable: {e}\n")

    # zip outputs
    zip_path = job_dir / f"{job_id}_outputs.zip"
    with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
        for p in [out_heat_png, out_impact_png, out_heat_tif, out_impact_tif, out_samples_geojson, debug_rim_png, diag_txt]:
            if p.exists():
                zf.write(p, arcname=p.name)
    return FileResponse(path=str(zip_path), filename=zip_path.name, media_type="application/zip")
@app.get("/health")
def health():
    """Health-check route: always answers with a fixed ``ok`` status."""
    payload = {"status": "ok"}
    return payload
@app.get("/")
def root():
    """Redirect requests for the bare root URL to ``/docs``."""
    docs_url = "/docs"
    return RedirectResponse(url=docs_url)