GeoReclaimAI_v3 / app.py
kdub307's picture
Upload app.py
60efd26 verified
import os
import numpy as np
import matplotlib.pyplot as plt
import gradio as gr
from landlab import RasterModelGrid, imshow_grid
from landlab.components import (
FlowAccumulator,
FastscapeEroder,
LinearDiffuser
)
from landlab.io import read_esri_ascii, write_esri_ascii
import matplotlib
matplotlib.use('Agg') # Use non-interactive backend for Matplotlib
import time
import glob
from mpl_toolkits.mplot3d import Axes3D
from scipy.ndimage import gaussian_filter
# Set up paths and directories
RESULTS_DIR = "results"
TEMP_DIR = "temp"
os.makedirs(RESULTS_DIR, exist_ok=True)
os.makedirs(TEMP_DIR, exist_ok=True)
# Global variable to store the path to the uploaded DEM file
saved_dem_path = None
# Set up a basic grid
def setup_basic_grid(grid_type, rows=50, cols=50, dx=10.0):
"""Create a grid with different initial topographies."""
mg = RasterModelGrid((rows, cols), dx)
# Create node coordinates for the grid
x, y = np.meshgrid(np.linspace(0, cols-1, cols), np.linspace(0, rows-1, rows))
center_x, center_y = cols / 2, rows / 2
# Define elevation based on grid type
if grid_type == "dome":
# Dome-shaped topography
distance = np.sqrt((x - center_x)**2 + (y - center_y)**2)
elevation = 1000 - distance * 10
elif grid_type == "valley":
# Valley with ridges
elevation = 1000 + 500 * np.sin(x / cols * np.pi * 2) + 500 * np.cos(y / rows * np.pi * 2)
elif grid_type == "plateau":
# Plateau with escarpment
distance = np.sqrt((x - center_x)**2 + (y - center_y)**2)
elevation = 1000 * (distance < (cols/3)) + 500 * ((distance >= (cols/3)) & (distance < (cols/2))) + 200 * (distance >= (cols/2))
elif grid_type == "random":
# Random terrain
elevation = 1000 + 200 * np.random.rand(rows, cols)
# Smooth the random terrain
elevation = gaussian_filter(elevation, sigma=2)
elif grid_type == "mine_pit":
# Mining pit with surrounding area
distance = np.sqrt((x - center_x)**2 + (y - center_y)**2)
elevation = 1000 - 800 * np.exp(-distance**2 / (cols/4)**2)
else: # Default is flat with noise
elevation = 1000 + 50 * np.random.rand(rows, cols)
# Add the topographic elevation field
mg.add_field("topographic__elevation", elevation.flatten(), at="node")
return mg
# Process the uploaded DEM
def process_dem(dem_file):
"""Process an uploaded DEM file."""
# With type="filepath", dem_file is directly the path to the uploaded file, or None
if dem_file is None:
print("No DEM file provided, using default dome")
# Return a default grid if no file is provided
return setup_basic_grid("dome")
print(f"Processing DEM file: {dem_file} (type: {type(dem_file)})")
try:
# Check if file exists
if not os.path.exists(dem_file):
print(f"File does not exist at path: {dem_file}")
return setup_basic_grid("dome")
# Try to read the file directly to diagnose format issues
try:
with open(dem_file, 'r') as f:
header_lines = []
for i in range(6): # ESRI ASCII files should have 6 header lines
try:
line = next(f).strip()
header_lines.append(line)
print(f"Header line {i+1}: {line}")
except StopIteration:
print(f"File too short, only {i} header lines")
break
# Check if the header looks correct
has_valid_header = (
len(header_lines) >= 6 and
header_lines[0].lower().startswith('ncols') and
header_lines[1].lower().startswith('nrows') and
header_lines[2].lower().startswith('xll') and
header_lines[3].lower().startswith('yll') and
header_lines[4].lower().startswith('cell') and
header_lines[5].lower().startswith('nodata')
)
if has_valid_header:
print("File has valid ESRI ASCII header format")
else:
print("File doesn't have valid ESRI ASCII header format")
# Read a few data lines
data_lines = []
for i in range(4): # Read 4 data lines
try:
line = next(f).strip()
data_lines.append(line)
print(f"Data line {i+1}: {line}")
except StopIteration:
break
except Exception as file_read_e:
print(f"Warning: Could not read the file directly: {file_read_e}")
has_valid_header = False
# Read the DEM file with landlab
try:
print(f"Attempting to read file with landlab.io.read_esri_ascii: {dem_file}")
try:
(grid, elevation) = read_esri_ascii(dem_file)
print(f"Successfully read DEM file with grid shape: {grid.shape}")
return grid
except Exception as e:
print(f"Error with read_esri_ascii: {e}")
# If there's no valid header or read_esri_ascii failed, try manual parsing
if not has_valid_header:
raise ValueError("Invalid ESRI ASCII header")
# Try a different approach if the header is invalid but seems to be in the right format
try:
print("Trying to parse ASCII grid manually...")
header = {}
# Parse header from our previously read header lines
if len(header_lines) >= 6:
try:
header['ncols'] = int(header_lines[0].split()[1])
header['nrows'] = int(header_lines[1].split()[1])
header['xllcorner'] = float(header_lines[2].split()[1])
header['yllcorner'] = float(header_lines[3].split()[1])
header['cellsize'] = float(header_lines[4].split()[1])
header['nodata_value'] = float(header_lines[5].split()[1])
except (IndexError, ValueError) as parse_error:
print(f"Error parsing header lines: {parse_error}")
raise ValueError("Invalid header values")
else:
# If we don't have header lines, try to reopen the file
with open(dem_file, 'r') as f:
header['ncols'] = int(f.readline().split()[1])
header['nrows'] = int(f.readline().split()[1])
header['xllcorner'] = float(f.readline().split()[1])
header['yllcorner'] = float(f.readline().split()[1])
header['cellsize'] = float(f.readline().split()[1])
header['nodata_value'] = float(f.readline().split()[1])
# Validate the header values make sense
if header['ncols'] <= 0 or header['nrows'] <= 0 or header['cellsize'] <= 0:
raise ValueError("Invalid header values (negative or zero dimensions)")
if header['ncols'] > 10000 or header['nrows'] > 10000:
raise ValueError("Grid too large, dimensions exceed 10000")
print(f"Header parsed: ncols={header['ncols']}, nrows={header['nrows']}, cellsize={header['cellsize']}")
# Create empty grid
mg = RasterModelGrid((header['nrows'], header['ncols']), header['cellsize'])
# Read elevation data
with open(dem_file, 'r') as f:
# Skip header lines
for i in range(6):
f.readline()
# Read all data lines
data = []
for line in f:
data.extend([float(val) for val in line.split()])
# Check if we have the right amount of data
expected_data_points = header['nrows'] * header['ncols']
if len(data) != expected_data_points:
print(f"Warning: Data length mismatch. Expected {expected_data_points}, got {len(data)}")
# Fill or truncate if needed
if len(data) < expected_data_points:
nodata = header['nodata_value']
data.extend([nodata] * (expected_data_points - len(data)))
print(f"Extended data with {expected_data_points - len(data)} nodata values")
else:
data = data[:expected_data_points]
print(f"Truncated data to {expected_data_points} values")
# Set elevation field
mg.add_field("topographic__elevation", np.array(data), at="node")
print(f"Successfully parsed ASCII grid manually. Grid shape: {mg.shape}")
return mg
except Exception as manual_e:
import traceback
print(f"Error with manual parsing: {manual_e}")
print(traceback.format_exc())
print("Falling back to default grid")
return setup_basic_grid("dome")
except Exception as inner_e:
import traceback
print(f"Error reading DEM with read_esri_ascii: {inner_e}")
print(traceback.format_exc())
# Return a default grid if there's an error
return setup_basic_grid("dome")
except Exception as e:
import traceback
print(f"Error processing DEM file: {e}")
print(traceback.format_exc())
# Return a default grid if there's an error
return setup_basic_grid("dome")
# Run the landscape evolution simulation
def run_simulation(grid, params):
"""Run a landscape evolution simulation with the given parameters."""
# Extract parameters
n_steps = params.get("n_steps", 1000)
rainfall_rate = params.get("rainfall_rate", 0.001)
k_sp = params.get("k_sp", 0.0001)
diffusion_rate = params.get("diffusion_rate", 0.01)
# Instantiate components
flow_router = FlowAccumulator(
grid,
flow_director='D8',
depression_finder='DepressionFinderAndRouter'
)
# Stream power erosion
stream_power = FastscapeEroder(
grid,
K_sp=k_sp,
m_sp=0.5,
n_sp=1.0
)
# Linear diffusion (hillslope processes)
diffusion = LinearDiffuser(
grid,
linear_diffusivity=diffusion_rate
)
# Save initial state
initial_elevation = grid.at_node['topographic__elevation'].copy()
# Store states for animation
states = []
if params.get("save_steps", False):
# Save the initial state
snapshot = {
'step': 0,
'elevation': grid.at_node['topographic__elevation'].copy()
}
states.append(snapshot)
# Run the simulation for a specified number of time steps
timestep = rainfall_rate # Using rainfall rate as the time step
# Optionally save intermediate steps for animation
steps_to_save = min(20, n_steps) # Save at most 20 steps for animation
save_interval = max(1, n_steps // steps_to_save)
# Progress tracking
progress_interval = max(1, n_steps // 10)
for i in range(n_steps):
# Run components for one time step
flow_router.run_one_step()
stream_power.run_one_step(dt=timestep)
diffusion.run_one_step(dt=timestep)
# Save intermediate states for animation if requested
if params.get("save_steps", False) and i % save_interval == 0:
snapshot = {
'step': i + 1,
'elevation': grid.at_node['topographic__elevation'].copy()
}
states.append(snapshot)
# Print progress
if (i + 1) % progress_interval == 0:
print(f"Progress: {i + 1}/{n_steps} steps completed")
# Calculate the difference (erosion and deposition)
final_elevation = grid.at_node['topographic__elevation']
difference = final_elevation - initial_elevation
grid.add_field("topographic__change", difference, at="node")
# Calculate some statistics
total_erosion = abs(sum(difference[difference < 0]))
total_deposition = sum(difference[difference > 0])
max_erosion = abs(min(difference))
max_deposition = max(difference)
stats = {
'total_erosion': total_erosion,
'total_deposition': total_deposition,
'max_erosion': max_erosion,
'max_deposition': max_deposition,
'net_change': total_deposition - total_erosion
}
return grid, stats, states
# Visualize the topography
def visualize_topography(grid, filename=None, title="Topographic Elevation", colormap="terrain"):
"""Create a visualization of the topography."""
plt.figure(figsize=(10, 8))
# Plot topography
imshow_grid(grid, 'topographic__elevation', cmap=colormap, grid_units=('m', 'm'))
plt.colorbar(label='Elevation (m)')
plt.title(title)
if filename:
plt.savefig(filename, dpi=300, bbox_inches='tight')
plt.close()
return filename
else:
plt.close()
return plt
# Visualize the elevation change
def visualize_elevation_change(grid, filename=None):
"""Create a visualization of the elevation change (erosion and deposition)."""
plt.figure(figsize=(10, 8))
# Get the elevation change
difference = grid.at_node['topographic__change']
# Set symmetric limits for the colormap
vmax = max(abs(np.min(difference)), abs(np.max(difference)))
vmin = -vmax
# Plot the elevation change
imshow_grid(
grid,
'topographic__change',
cmap='RdBu_r', # Red for erosion, blue for deposition
grid_units=('m', 'm'),
vmin=vmin,
vmax=vmax
)
plt.colorbar(label='Elevation Change (m)')
plt.title('Erosion (red) and Deposition (blue)')
if filename:
plt.savefig(filename, dpi=300, bbox_inches='tight')
plt.close()
return filename
else:
plt.close()
return plt
# Create a 3D visualization
def visualize_3d(grid, filename=None, title="3D Topography"):
"""Create a 3D visualization of the topography."""
fig = plt.figure(figsize=(12, 10))
ax = fig.add_subplot(111, projection='3d')
# Get the shape of the grid
nc = grid.number_of_node_columns
nr = grid.number_of_node_rows
# Reshape the elevation data
elevation = grid.at_node['topographic__elevation'].reshape(nr, nc)
# Create the meshgrid
x, y = np.meshgrid(np.arange(nc), np.arange(nr))
# Create the 3D surface plot
surf = ax.plot_surface(
x, y, elevation,
cmap='terrain',
linewidth=0,
antialiased=True,
alpha=0.8
)
# Add a color bar
fig.colorbar(surf, ax=ax, shrink=0.5, aspect=5, label='Elevation (m)')
# Set labels and title
ax.set_xlabel('X (grid cells)')
ax.set_ylabel('Y (grid cells)')
ax.set_zlabel('Elevation (m)')
ax.set_title(title)
# Adjust the viewing angle
ax.view_init(elev=30, azim=225)
if filename:
plt.savefig(filename, dpi=300, bbox_inches='tight')
plt.close()
return filename
else:
plt.close()
return plt
# Create an animation of the evolution
def create_animation(states, grid, filename=None):
"""Create an animation of the landscape evolution."""
from matplotlib.animation import FuncAnimation
# Get the shape of the grid
nc = grid.number_of_node_columns
nr = grid.number_of_node_rows
fig, ax = plt.subplots(figsize=(10, 8))
# Set up the initial plot
initial_elevation = states[0]['elevation'].reshape(nr, nc)
img = ax.imshow(initial_elevation, cmap='terrain', aspect='equal')
plt.colorbar(img, ax=ax, label='Elevation (m)')
title = ax.set_title(f"Step: {states[0]['step']}")
# Animation update function
def update(frame):
# Update the image data
elevation = states[frame]['elevation'].reshape(nr, nc)
img.set_array(elevation)
title.set_text(f"Step: {states[frame]['step']}")
return img, title
# Create the animation
ani = FuncAnimation(
fig,
update,
frames=range(len(states)),
interval=200, # milliseconds between frames
blit=True
)
if filename:
# Save the animation
ani.save(filename, writer='pillow', fps=5)
plt.close()
return filename
else:
plt.close()
return ani
# Create a flow accumulation visualization
def visualize_flow_accumulation(grid, filename=None):
"""Create a visualization of flow accumulation."""
# Make sure we have flow accumulation data
if 'flow__accumulation' not in grid.at_node:
# Run the flow router to get accumulation
flow_router = FlowAccumulator(
grid,
flow_director='D8',
depression_finder='DepressionFinderAndRouter'
)
flow_router.run_one_step()
plt.figure(figsize=(10, 8))
# Log-transform the accumulation data for better visualization
log_acc = np.log10(grid.at_node['flow__accumulation'] + 1)
grid.add_field("log_flow__accumulation", log_acc, at="node")
# Plot the flow accumulation
imshow_grid(
grid,
'log_flow__accumulation',
cmap='Blues',
grid_units=('m', 'm')
)
plt.colorbar(label='Log10 Flow Accumulation')
plt.title('Flow Accumulation (log scale)')
if filename:
plt.savefig(filename, dpi=300, bbox_inches='tight')
plt.close()
return filename
else:
plt.close()
return plt
# Generate a summary report
def generate_report(grid, stats, params, initial_topo_file, final_topo_file, diff_topo_file, flow_acc_file, topo_3d_file):
"""Generate a summary HTML report."""
report_file = os.path.join(RESULTS_DIR, "summary_report.html")
# Create a simple HTML report
html = f"""
<!DOCTYPE html>
<html>
<head>
<title>GeoReclaimAI - Simulation Report</title>
<style>
body {{ font-family: Arial, sans-serif; line-height: 1.6; padding: 20px; max-width: 1200px; margin: 0 auto; }}
h1, h2, h3 {{ color: #333; }}
.container {{ display: flex; flex-wrap: wrap; justify-content: space-between; }}
.image-container {{ width: 48%; margin-bottom: 20px; }}
img {{ max-width: 100%; border: 1px solid #ddd; }}
table {{ border-collapse: collapse; width: 100%; margin: 20px 0; }}
th, td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }}
th {{ background-color: #f2f2f2; }}
.stats {{ margin: 20px 0; padding: 15px; background-color: #f9f9f9; border-radius: 5px; }}
</style>
</head>
<body>
<h1>GeoReclaimAI - Landscape Evolution Simulation Report</h1>
<h2>Simulation Parameters</h2>
<table>
<tr><th>Parameter</th><th>Value</th></tr>
<tr><td>Simulation Steps</td><td>{params.get('n_steps', 1000)}</td></tr>
<tr><td>Rainfall Rate</td><td>{params.get('rainfall_rate', 0.001)}</td></tr>
<tr><td>Erosion Rate (K)</td><td>{params.get('k_sp', 0.0001)}</td></tr>
<tr><td>Hillslope Diffusion Rate</td><td>{params.get('diffusion_rate', 0.01)}</td></tr>
</table>
<h2>Erosion and Deposition Statistics</h2>
<div class="stats">
<p><strong>Total Erosion Volume:</strong> {stats['total_erosion']:.2f} m³</p>
<p><strong>Total Deposition Volume:</strong> {stats['total_deposition']:.2f} m³</p>
<p><strong>Maximum Erosion Depth:</strong> {stats['max_erosion']:.2f} m</p>
<p><strong>Maximum Deposition Height:</strong> {stats['max_deposition']:.2f} m</p>
<p><strong>Net Volume Change:</strong> {stats['net_change']:.2f} m³</p>
</div>
<h2>Visualizations</h2>
<div class="container">
<div class="image-container">
<h3>Initial Topography</h3>
<img src="{os.path.basename(initial_topo_file)}" alt="Initial Topography">
</div>
<div class="image-container">
<h3>Final Topography</h3>
<img src="{os.path.basename(final_topo_file)}" alt="Final Topography">
</div>
<div class="image-container">
<h3>Erosion and Deposition</h3>
<img src="{os.path.basename(diff_topo_file)}" alt="Erosion and Deposition">
</div>
<div class="image-container">
<h3>Flow Accumulation</h3>
<img src="{os.path.basename(flow_acc_file)}" alt="Flow Accumulation">
</div>
<div class="image-container">
<h3>3D Topography</h3>
<img src="{os.path.basename(topo_3d_file)}" alt="3D Topography">
</div>
</div>
<h2>Recommendations</h2>
<p>Based on the simulation results, the following recommendations can be made:</p>
<ul>
<li>Areas with high erosion rates should be stabilized with vegetation or erosion control structures.</li>
<li>Deposition areas may require drainage systems to prevent sediment accumulation.</li>
<li>High flow accumulation areas are prone to channel formation and should be reinforced.</li>
</ul>
<p><em>Report generated on {time.strftime('%Y-%m-%d %H:%M:%S')}</em></p>
</body>
</html>
"""
with open(report_file, "w") as f:
f.write(html)
return report_file
# Main function to run the landscape evolution simulation
def run_landscape_evolution(
dem_file=None,
grid_type="dome",
grid_rows=50,
grid_cols=50,
grid_resolution=10.0,
simulation_steps=1000,
rainfall_rate=0.001,
erosion_rate=0.0001,
diffusion_rate=0.01,
save_animation=False
):
# Clear previous results
for f in glob.glob(os.path.join(RESULTS_DIR, "*")):
try:
os.remove(f)
except:
pass
# Process input grid
if dem_file is not None:
grid = process_dem(dem_file)
else:
grid = setup_basic_grid(grid_type, grid_rows, grid_cols, grid_resolution)
# Set up parameters
params = {
"n_steps": simulation_steps,
"rainfall_rate": rainfall_rate,
"k_sp": erosion_rate,
"diffusion_rate": diffusion_rate,
"save_steps": save_animation
}
# Save initial state
initial_topo_file = os.path.join(RESULTS_DIR, "initial_topography.png")
visualize_topography(grid, initial_topo_file, title="Initial Topography")
# Run simulation
evolved_grid, stats, states = run_simulation(grid, params)
# Save final state
final_topo_file = os.path.join(RESULTS_DIR, "final_topography.png")
visualize_topography(evolved_grid, final_topo_file, title="Final Topography")
# Visualize the elevation change
diff_topo_file = os.path.join(RESULTS_DIR, "elevation_change.png")
visualize_elevation_change(evolved_grid, diff_topo_file)
# Visualize flow accumulation
flow_acc_file = os.path.join(RESULTS_DIR, "flow_accumulation.png")
visualize_flow_accumulation(evolved_grid, flow_acc_file)
# Create 3D visualization
topo_3d_file = os.path.join(RESULTS_DIR, "topography_3d.png")
visualize_3d(evolved_grid, topo_3d_file, title="Final 3D Topography")
# Create animation if requested
animation_file = None
if save_animation and states:
animation_file = os.path.join(RESULTS_DIR, "evolution_animation.gif")
create_animation(states, evolved_grid, animation_file)
# Export the final DEM
final_dem_file = os.path.join(RESULTS_DIR, "final_dem.asc")
write_esri_ascii(final_dem_file, evolved_grid)
# Generate report
report_file = generate_report(
evolved_grid,
stats,
params,
initial_topo_file,
final_topo_file,
diff_topo_file,
flow_acc_file,
topo_3d_file
)
return (
initial_topo_file,
final_topo_file,
diff_topo_file,
flow_acc_file,
topo_3d_file,
animation_file if animation_file else "",
report_file,
final_dem_file
)
# Set up the Gradio interface
demo = gr.Blocks(title="GeoReclaimAI - Landscape Evolution Model")
with demo:
gr.Markdown("# GeoReclaimAI - Landscape Evolution Model")
gr.Markdown("A tool for simulating erosion patterns on reclamation designs. Upload a DEM file or use the built-in terrain generators.")
with gr.Tabs():
with gr.TabItem("Simulation Setup"):
with gr.Row():
with gr.Column():
# Updated File component for better handling in Gradio 5.24.0
gr.Markdown("""
### Step 1: Upload your DEM file
1. Click "Browse" to select your ESRI ASCII (.asc) file
2. After uploading, click "Process Uploaded File" to prepare the file for simulation
3. Once the file is processed, click "Run Simulation" to start the landscape evolution model
""")
dem_file = gr.File(
label="Upload DEM File (Optional - ESRI ASCII format)",
file_types=[".asc", ".txt"],
type="filepath",
file_count="single",
interactive=True
)
# Add a file upload status indicator
file_status = gr.Markdown("No file uploaded yet")
# Add a button to explicitly save the uploaded file
upload_button = gr.Button("Process Uploaded File", variant="secondary", size="lg")
# Event handler for file upload
def process_uploaded_file(file_path):
global saved_dem_path
if not file_path:
return "No file selected. Please upload a file first."
if not os.path.exists(file_path):
return f"Error: File not found at {file_path}"
try:
# Check if file appears to be an ESRI ASCII Grid
file_size = os.path.getsize(file_path)
if file_size == 0:
return "Error: File is empty"
# Check first few lines to validate format
with open(file_path, 'r') as f:
try:
# Check header lines
header_valid = True
expected_headers = ["ncols", "nrows", "xll", "yll", "cell", "nodata"]
for i, expected in enumerate(expected_headers):
try:
line = f.readline().strip().lower()
if not line.startswith(expected):
header_valid = False
return f"Error: Invalid ESRI ASCII file format. Expected '{expected}' in line {i+1}, but got: '{line}'"
except Exception:
header_valid = False
return f"Error: File too short, missing required header line {i+1}"
# Try to parse a few data lines
data_lines_valid = True
for i in range(3): # Check first 3 data lines
try:
line = f.readline().strip()
# Check if line contains valid numbers
try:
[float(val) for val in line.split()]
except ValueError:
data_lines_valid = False
return f"Error: Line {i+7} contains non-numeric data: '{line}'"
except Exception:
if i == 0: # Must have at least one data line
return "Error: File contains header but no data lines"
break
except UnicodeDecodeError:
return "Error: File is not a valid text file. Please upload an ASCII file."
except Exception as e:
print(f"Warning during file validation: {str(e)}")
# Copy file to temp directory
filename = os.path.basename(file_path)
temp_path = os.path.join(TEMP_DIR, filename)
# Make sure TEMP_DIR exists
os.makedirs(TEMP_DIR, exist_ok=True)
# Simple file copy
with open(file_path, 'rb') as src, open(temp_path, 'wb') as dst:
dst.write(src.read())
# Verify file was copied
if not os.path.exists(temp_path):
return "Error: Failed to save file"
# Store path globally
saved_dem_path = temp_path
file_size_mb = file_size / (1024*1024)
return f"File processed and ready: {filename} ({file_size_mb:.2f} MB)"
except Exception as e:
import traceback
print(f"Error processing file: {str(e)}")
print(traceback.format_exc())
return f"Error processing file: {str(e)}"
# Connect file upload button
upload_button.click(
fn=process_uploaded_file,
inputs=[dem_file],
outputs=[file_status]
)
with gr.Group():
gr.Markdown("### Terrain Generator (if no DEM is provided)")
grid_type = gr.Radio(
choices=["dome", "valley", "plateau", "random", "mine_pit", "flat"],
value="dome",
label="Terrain Type"
)
grid_rows = gr.Slider(minimum=20, maximum=200, value=50, step=10, label="Grid Rows")
grid_cols = gr.Slider(minimum=20, maximum=200, value=50, step=10, label="Grid Columns")
grid_resolution = gr.Slider(minimum=1.0, maximum=100.0, value=10.0, step=1.0, label="Grid Resolution (m)")
with gr.Column():
with gr.Group():
gr.Markdown("### Simulation Parameters")
simulation_steps = gr.Slider(minimum=100, maximum=10000, value=1000, step=100, label="Simulation Steps")
rainfall_rate = gr.Slider(minimum=0.0001, maximum=0.01, value=0.001, step=0.0001, label="Rainfall Rate")
erosion_rate = gr.Slider(minimum=0.00001, maximum=0.001, value=0.0001, step=0.00001, label="Erosion Rate (K)")
diffusion_rate = gr.Slider(minimum=0.001, maximum=0.1, value=0.01, step=0.001, label="Hillslope Diffusion Rate")
save_animation = gr.Checkbox(label="Generate Animation (slower)", value=False)
# Split into two buttons for clarity
with gr.Row():
gr.Markdown("### Step 2: Run Simulation")
with gr.Row():
run_btn = gr.Button("Run Simulation", variant="primary", size="lg")
debug_btn = gr.Button("Debug File Upload", variant="secondary")
progress_text = gr.Markdown("Simulation status will appear here.")
gr.Markdown("Note: Simulations may take several minutes to complete, especially for larger grids or more steps.")
results_tab = gr.TabItem("Results")
with results_tab:
with gr.Row():
with gr.Column():
gr.Markdown("### Initial Topography")
initial_topo = gr.Image(label="Initial Topography")
with gr.Column():
gr.Markdown("### Final Topography")
final_topo = gr.Image(label="Final Topography")
with gr.Row():
with gr.Column():
gr.Markdown("### Erosion and Deposition")
diff_topo = gr.Image(label="Erosion (red) and Deposition (blue)")
with gr.Column():
gr.Markdown("### Flow Accumulation")
flow_acc = gr.Image(label="Flow Accumulation")
with gr.Row():
with gr.Column():
gr.Markdown("### 3D Visualization")
topo_3d = gr.Image(label="3D Topography")
with gr.Column():
gr.Markdown("### Animation")
evolution_animation = gr.Image(label="Evolution Animation")
with gr.Row():
with gr.Column():
gr.Markdown("### Summary Report")
report = gr.File(label="Download Summary Report")
with gr.Column():
gr.Markdown("### Output Files")
final_dem = gr.File(label="Download Final DEM (ESRI ASCII)")
with gr.TabItem("Help"):
gr.Markdown("""
# GeoReclaimAI - Help Guide
## Overview
GeoReclaimAI is a tool for simulating landscape evolution and erosion patterns on reclamation designs. It uses the Landlab framework to model how terrain changes over time due to rainfall, erosion, and sediment transport.
## Input Options
### DEM File
You can upload a Digital Elevation Model (DEM) file in ESRI ASCII format. This should be a grid of elevation values with header information about the grid dimensions and cell size.
### Terrain Generator
If you don't have a DEM file, you can use the built-in terrain generator to create a simple terrain:
- **Dome**: A dome-shaped hill
- **Valley**: A terrain with ridges and valleys
- **Plateau**: A flat-topped plateau with escarpments
- **Random**: A randomly generated terrain with some smoothing
- **Mine Pit**: A terrain with a central depression, like an open-pit mine
- **Flat**: A flat surface with slight random variations
You can also set the grid dimensions and resolution.
## Simulation Parameters
- **Simulation Steps**: Number of time steps to run the simulation. More steps mean more evolution but take longer to compute.
- **Rainfall Rate**: Controls the intensity of rainfall and thus the rate of erosion.
- **Erosion Rate (K)**: The erodibility coefficient. Higher values mean faster erosion.
- **Hillslope Diffusion Rate**: Controls the rate of hillslope processes like soil creep. Higher values mean faster smoothing of the landscape.
- **Generate Animation**: If checked, the tool will create an animation of the landscape evolution process. This makes the simulation slower.
## Outputs
### Visualizations
- **Initial Topography**: The starting terrain
- **Final Topography**: The terrain after simulation
- **Erosion and Deposition**: Red areas show erosion, blue areas show deposition
- **Flow Accumulation**: Shows where water flows and accumulates
- **3D Visualization**: A 3D view of the final terrain
- **Animation**: If requested, an animation of the landscape evolution process
### Files
- **Summary Report**: An HTML report with simulation results and statistics
- **Final DEM**: The final terrain as an ESRI ASCII file for use in GIS software
## Tips for Effective Use
1. Start with a simple terrain and fewer simulation steps to get quick results.
2. Adjust parameters gradually to see their effects.
3. For reclamation design, focus on areas with high erosion rates.
4. Use the flow accumulation output to identify potential drainage issues.
5. The animation can help understand the evolution process but takes longer to generate.
6. Export the final DEM for further analysis in GIS software.
""")
# Update the event handler for Gradio 5.24.0
def run_sim_with_progress():
progress_text.update("Starting simulation...")
global saved_dem_path
try:
# First check if we have a saved DEM path from the upload button
if saved_dem_path is not None and os.path.exists(saved_dem_path):
print(f"Using previously processed DEM file: {saved_dem_path}")
dem_data = saved_dem_path
# Log file size and check if it's readable
file_size = os.path.getsize(dem_data)
print(f"File size: {file_size} bytes ({file_size / (1024*1024):.2f} MB)")
# Attempt to read the file
try:
with open(dem_data, 'r') as f:
for i, line in enumerate(f):
if i >= 10:
break
print(f"Line {i+1}: {line.strip()}")
except Exception as read_error:
print(f"Warning - File read error (but continuing anyway): {str(read_error)}")
progress_text.update("Processing DEM file...")
else:
# Fallback to the file component value
dem_data = dem_file.value
print(f"DEM file value type: {type(dem_data)}")
print(f"DEM file value content: {dem_data}")
# With type="filepath", dem_data is the direct path to the file or None
if dem_data is not None and dem_data:
print(f"Using uploaded DEM file: {dem_data}")
# Verify the file exists
if os.path.exists(dem_data):
print(f"File exists at path: {dem_data}")
# Copy the file to TEMP_DIR to ensure we have access to it
try:
filename = os.path.basename(dem_data)
temp_path = os.path.join(TEMP_DIR, filename)
with open(dem_data, 'rb') as src, open(temp_path, 'wb') as dst:
dst.write(src.read())
dem_data = temp_path
saved_dem_path = temp_path
print(f"Copied file to: {temp_path}")
except Exception as copy_error:
print(f"Warning - Copy error (but continuing anyway): {str(copy_error)}")
else:
print(f"File does not exist at path: {dem_data}")
error_msg = "Error: Uploaded file not found"
progress_text.update(error_msg)
return [None] * 8 + [error_msg]
else:
print("No file provided directly. Checking temp directory for ASC files...")
# Check if there are any ASC files in the temp directory that might be the uploaded file
temp_files = os.listdir(TEMP_DIR)
asc_files = [f for f in temp_files if f.endswith('.asc') or f.endswith('.txt')]
if asc_files:
# Use the first ASC file found
dem_data = os.path.join(TEMP_DIR, asc_files[0])
print(f"Found ASC file in temp directory: {dem_data}")
saved_dem_path = dem_data
else:
print("No ASC files found in temp directory, using terrain generator")
dem_data = None
# Run the landscape evolution simulation
progress_msg = "Starting simulation. This may take several minutes..."
progress_text.update(progress_msg)
try:
outputs = run_landscape_evolution(
dem_file=dem_data,
grid_type=grid_type.value,
grid_rows=grid_rows.value,
grid_cols=grid_cols.value,
grid_resolution=grid_resolution.value,
simulation_steps=simulation_steps.value,
rainfall_rate=rainfall_rate.value,
erosion_rate=erosion_rate.value,
diffusion_rate=diffusion_rate.value,
save_animation=save_animation.value
)
success_msg = "Simulation completed successfully!"
progress_text.update(success_msg)
# Return the visualization outputs plus the status message
return list(outputs) + [success_msg]
except Exception as sim_error:
import traceback
error_msg = f"Error in simulation: {str(sim_error)}"
print(error_msg)
print(traceback.format_exc())
progress_text.update(error_msg)
return [None] * 8 + [error_msg]
except Exception as e:
import traceback
error_msg = f"Error setting up simulation: {str(e)}"
print(error_msg)
print(traceback.format_exc())
progress_text.update(error_msg)
return [None] * 8 + [error_msg]
# Connect the run button to the simulation function using Gradio 5.24.0 syntax
run_btn.click(
fn=run_sim_with_progress,
inputs=None,
outputs=[
initial_topo,
final_topo,
diff_topo,
flow_acc,
topo_3d,
evolution_animation,
report,
final_dem,
progress_text
]
)
# Debug function to check file upload status
def debug_file_upload():
file_value = dem_file.value
debug_info = [
f"File Value Type: {type(file_value)}",
f"File Value: {file_value}",
]
# Check internal component state
debug_info.append(f"File Component State: {dem_file.__dict__.get('value', 'Not Available')}")
if file_value is not None and file_value:
# With type="filepath", file_value is directly the file path
debug_info.append(f"File path: {file_value}")
# Check if file exists
if os.path.exists(file_value):
debug_info.append(f"File exists: Yes")
file_size = os.path.getsize(file_value)
debug_info.append(f"File size: {file_size} bytes ({file_size / (1024*1024):.2f} MB)")
# Try to read the first few lines to verify content
try:
with open(file_value, 'r') as f:
first_lines = [next(f) for _ in range(10) if _ < 10]
debug_info.append("First 10 lines of file:")
for i, line in enumerate(first_lines):
debug_info.append(f" Line {i+1}: {line.strip()}")
except Exception as e:
debug_info.append(f"Error reading file: {str(e)}")
else:
debug_info.append(f"File exists: No")
# List files in temp directory
debug_info.append("Files in temp directory:")
for f in os.listdir(TEMP_DIR):
f_path = os.path.join(TEMP_DIR, f)
f_size = os.path.getsize(f_path)
debug_info.append(f" {f} ({f_size / (1024*1024):.2f} MB)")
else:
debug_info.append("No file uploaded or file value is None")
# Check if there are any files in the temp directory
temp_files = os.listdir(TEMP_DIR)
if temp_files:
debug_info.append("Files in temp directory that might be the uploaded file:")
for f in temp_files:
f_path = os.path.join(TEMP_DIR, f)
if f.endswith('.asc') or f.endswith('.txt'):
f_size = os.path.getsize(f_path)
debug_info.append(f" {f} ({f_size / (1024*1024):.2f} MB)")
# Try to read the first few lines
try:
with open(f_path, 'r') as file:
first_lines = [next(file) for _ in range(5) if _ < 5]
debug_info.append(" First 5 lines:")
for i, line in enumerate(first_lines):
debug_info.append(f" Line {i+1}: {line.strip()}")
except Exception as e:
debug_info.append(f" Error reading file: {str(e)}")
return "\n".join(debug_info)
# Connect the debug button
debug_btn.click(
fn=debug_file_upload,
inputs=None,
outputs=progress_text
)
# Launch the interface
if __name__ == "__main__":
demo.launch()