Spaces:
Sleeping
Sleeping
| import os | |
| import numpy as np | |
| import matplotlib.pyplot as plt | |
| import gradio as gr | |
| from landlab import RasterModelGrid, imshow_grid | |
| from landlab.components import ( | |
| FlowAccumulator, | |
| FastscapeEroder, | |
| LinearDiffuser | |
| ) | |
| from landlab.io import read_esri_ascii, write_esri_ascii | |
| import matplotlib | |
| matplotlib.use('Agg') # Use non-interactive backend for Matplotlib | |
| import time | |
| import glob | |
| from mpl_toolkits.mplot3d import Axes3D | |
| from scipy.ndimage import gaussian_filter | |
| # Set up paths and directories | |
| RESULTS_DIR = "results" | |
| TEMP_DIR = "temp" | |
| os.makedirs(RESULTS_DIR, exist_ok=True) | |
| os.makedirs(TEMP_DIR, exist_ok=True) | |
| # Global variable to store the path to the uploaded DEM file | |
| saved_dem_path = None | |
| # Set up a basic grid | |
| def setup_basic_grid(grid_type, rows=50, cols=50, dx=10.0): | |
| """Create a grid with different initial topographies.""" | |
| mg = RasterModelGrid((rows, cols), dx) | |
| # Create node coordinates for the grid | |
| x, y = np.meshgrid(np.linspace(0, cols-1, cols), np.linspace(0, rows-1, rows)) | |
| center_x, center_y = cols / 2, rows / 2 | |
| # Define elevation based on grid type | |
| if grid_type == "dome": | |
| # Dome-shaped topography | |
| distance = np.sqrt((x - center_x)**2 + (y - center_y)**2) | |
| elevation = 1000 - distance * 10 | |
| elif grid_type == "valley": | |
| # Valley with ridges | |
| elevation = 1000 + 500 * np.sin(x / cols * np.pi * 2) + 500 * np.cos(y / rows * np.pi * 2) | |
| elif grid_type == "plateau": | |
| # Plateau with escarpment | |
| distance = np.sqrt((x - center_x)**2 + (y - center_y)**2) | |
| elevation = 1000 * (distance < (cols/3)) + 500 * ((distance >= (cols/3)) & (distance < (cols/2))) + 200 * (distance >= (cols/2)) | |
| elif grid_type == "random": | |
| # Random terrain | |
| elevation = 1000 + 200 * np.random.rand(rows, cols) | |
| # Smooth the random terrain | |
| elevation = gaussian_filter(elevation, sigma=2) | |
| elif grid_type == "mine_pit": | |
| # Mining pit with surrounding area | |
| distance = np.sqrt((x - center_x)**2 + (y - center_y)**2) | |
| elevation = 1000 - 800 * np.exp(-distance**2 / (cols/4)**2) | |
| else: # Default is flat with noise | |
| elevation = 1000 + 50 * np.random.rand(rows, cols) | |
| # Add the topographic elevation field | |
| mg.add_field("topographic__elevation", elevation.flatten(), at="node") | |
| return mg | |
| # Process the uploaded DEM | |
| def process_dem(dem_file): | |
| """Process an uploaded DEM file.""" | |
| # With type="filepath", dem_file is directly the path to the uploaded file, or None | |
| if dem_file is None: | |
| print("No DEM file provided, using default dome") | |
| # Return a default grid if no file is provided | |
| return setup_basic_grid("dome") | |
| print(f"Processing DEM file: {dem_file} (type: {type(dem_file)})") | |
| try: | |
| # Check if file exists | |
| if not os.path.exists(dem_file): | |
| print(f"File does not exist at path: {dem_file}") | |
| return setup_basic_grid("dome") | |
| # Try to read the file directly to diagnose format issues | |
| try: | |
| with open(dem_file, 'r') as f: | |
| header_lines = [] | |
| for i in range(6): # ESRI ASCII files should have 6 header lines | |
| try: | |
| line = next(f).strip() | |
| header_lines.append(line) | |
| print(f"Header line {i+1}: {line}") | |
| except StopIteration: | |
| print(f"File too short, only {i} header lines") | |
| break | |
| # Check if the header looks correct | |
| has_valid_header = ( | |
| len(header_lines) >= 6 and | |
| header_lines[0].lower().startswith('ncols') and | |
| header_lines[1].lower().startswith('nrows') and | |
| header_lines[2].lower().startswith('xll') and | |
| header_lines[3].lower().startswith('yll') and | |
| header_lines[4].lower().startswith('cell') and | |
| header_lines[5].lower().startswith('nodata') | |
| ) | |
| if has_valid_header: | |
| print("File has valid ESRI ASCII header format") | |
| else: | |
| print("File doesn't have valid ESRI ASCII header format") | |
| # Read a few data lines | |
| data_lines = [] | |
| for i in range(4): # Read 4 data lines | |
| try: | |
| line = next(f).strip() | |
| data_lines.append(line) | |
| print(f"Data line {i+1}: {line}") | |
| except StopIteration: | |
| break | |
| except Exception as file_read_e: | |
| print(f"Warning: Could not read the file directly: {file_read_e}") | |
| has_valid_header = False | |
| # Read the DEM file with landlab | |
| try: | |
| print(f"Attempting to read file with landlab.io.read_esri_ascii: {dem_file}") | |
| try: | |
| (grid, elevation) = read_esri_ascii(dem_file) | |
| print(f"Successfully read DEM file with grid shape: {grid.shape}") | |
| return grid | |
| except Exception as e: | |
| print(f"Error with read_esri_ascii: {e}") | |
| # If there's no valid header or read_esri_ascii failed, try manual parsing | |
| if not has_valid_header: | |
| raise ValueError("Invalid ESRI ASCII header") | |
| # Try a different approach if the header is invalid but seems to be in the right format | |
| try: | |
| print("Trying to parse ASCII grid manually...") | |
| header = {} | |
| # Parse header from our previously read header lines | |
| if len(header_lines) >= 6: | |
| try: | |
| header['ncols'] = int(header_lines[0].split()[1]) | |
| header['nrows'] = int(header_lines[1].split()[1]) | |
| header['xllcorner'] = float(header_lines[2].split()[1]) | |
| header['yllcorner'] = float(header_lines[3].split()[1]) | |
| header['cellsize'] = float(header_lines[4].split()[1]) | |
| header['nodata_value'] = float(header_lines[5].split()[1]) | |
| except (IndexError, ValueError) as parse_error: | |
| print(f"Error parsing header lines: {parse_error}") | |
| raise ValueError("Invalid header values") | |
| else: | |
| # If we don't have header lines, try to reopen the file | |
| with open(dem_file, 'r') as f: | |
| header['ncols'] = int(f.readline().split()[1]) | |
| header['nrows'] = int(f.readline().split()[1]) | |
| header['xllcorner'] = float(f.readline().split()[1]) | |
| header['yllcorner'] = float(f.readline().split()[1]) | |
| header['cellsize'] = float(f.readline().split()[1]) | |
| header['nodata_value'] = float(f.readline().split()[1]) | |
| # Validate the header values make sense | |
| if header['ncols'] <= 0 or header['nrows'] <= 0 or header['cellsize'] <= 0: | |
| raise ValueError("Invalid header values (negative or zero dimensions)") | |
| if header['ncols'] > 10000 or header['nrows'] > 10000: | |
| raise ValueError("Grid too large, dimensions exceed 10000") | |
| print(f"Header parsed: ncols={header['ncols']}, nrows={header['nrows']}, cellsize={header['cellsize']}") | |
| # Create empty grid | |
| mg = RasterModelGrid((header['nrows'], header['ncols']), header['cellsize']) | |
| # Read elevation data | |
| with open(dem_file, 'r') as f: | |
| # Skip header lines | |
| for i in range(6): | |
| f.readline() | |
| # Read all data lines | |
| data = [] | |
| for line in f: | |
| data.extend([float(val) for val in line.split()]) | |
| # Check if we have the right amount of data | |
| expected_data_points = header['nrows'] * header['ncols'] | |
| if len(data) != expected_data_points: | |
| print(f"Warning: Data length mismatch. Expected {expected_data_points}, got {len(data)}") | |
| # Fill or truncate if needed | |
| if len(data) < expected_data_points: | |
| nodata = header['nodata_value'] | |
| data.extend([nodata] * (expected_data_points - len(data))) | |
| print(f"Extended data with {expected_data_points - len(data)} nodata values") | |
| else: | |
| data = data[:expected_data_points] | |
| print(f"Truncated data to {expected_data_points} values") | |
| # Set elevation field | |
| mg.add_field("topographic__elevation", np.array(data), at="node") | |
| print(f"Successfully parsed ASCII grid manually. Grid shape: {mg.shape}") | |
| return mg | |
| except Exception as manual_e: | |
| import traceback | |
| print(f"Error with manual parsing: {manual_e}") | |
| print(traceback.format_exc()) | |
| print("Falling back to default grid") | |
| return setup_basic_grid("dome") | |
| except Exception as inner_e: | |
| import traceback | |
| print(f"Error reading DEM with read_esri_ascii: {inner_e}") | |
| print(traceback.format_exc()) | |
| # Return a default grid if there's an error | |
| return setup_basic_grid("dome") | |
| except Exception as e: | |
| import traceback | |
| print(f"Error processing DEM file: {e}") | |
| print(traceback.format_exc()) | |
| # Return a default grid if there's an error | |
| return setup_basic_grid("dome") | |
| # Run the landscape evolution simulation | |
| def run_simulation(grid, params): | |
| """Run a landscape evolution simulation with the given parameters.""" | |
| # Extract parameters | |
| n_steps = params.get("n_steps", 1000) | |
| rainfall_rate = params.get("rainfall_rate", 0.001) | |
| k_sp = params.get("k_sp", 0.0001) | |
| diffusion_rate = params.get("diffusion_rate", 0.01) | |
| # Instantiate components | |
| flow_router = FlowAccumulator( | |
| grid, | |
| flow_director='D8', | |
| depression_finder='DepressionFinderAndRouter' | |
| ) | |
| # Stream power erosion | |
| stream_power = FastscapeEroder( | |
| grid, | |
| K_sp=k_sp, | |
| m_sp=0.5, | |
| n_sp=1.0 | |
| ) | |
| # Linear diffusion (hillslope processes) | |
| diffusion = LinearDiffuser( | |
| grid, | |
| linear_diffusivity=diffusion_rate | |
| ) | |
| # Save initial state | |
| initial_elevation = grid.at_node['topographic__elevation'].copy() | |
| # Store states for animation | |
| states = [] | |
| if params.get("save_steps", False): | |
| # Save the initial state | |
| snapshot = { | |
| 'step': 0, | |
| 'elevation': grid.at_node['topographic__elevation'].copy() | |
| } | |
| states.append(snapshot) | |
| # Run the simulation for a specified number of time steps | |
| timestep = rainfall_rate # Using rainfall rate as the time step | |
| # Optionally save intermediate steps for animation | |
| steps_to_save = min(20, n_steps) # Save at most 20 steps for animation | |
| save_interval = max(1, n_steps // steps_to_save) | |
| # Progress tracking | |
| progress_interval = max(1, n_steps // 10) | |
| for i in range(n_steps): | |
| # Run components for one time step | |
| flow_router.run_one_step() | |
| stream_power.run_one_step(dt=timestep) | |
| diffusion.run_one_step(dt=timestep) | |
| # Save intermediate states for animation if requested | |
| if params.get("save_steps", False) and i % save_interval == 0: | |
| snapshot = { | |
| 'step': i + 1, | |
| 'elevation': grid.at_node['topographic__elevation'].copy() | |
| } | |
| states.append(snapshot) | |
| # Print progress | |
| if (i + 1) % progress_interval == 0: | |
| print(f"Progress: {i + 1}/{n_steps} steps completed") | |
| # Calculate the difference (erosion and deposition) | |
| final_elevation = grid.at_node['topographic__elevation'] | |
| difference = final_elevation - initial_elevation | |
| grid.add_field("topographic__change", difference, at="node") | |
| # Calculate some statistics | |
| total_erosion = abs(sum(difference[difference < 0])) | |
| total_deposition = sum(difference[difference > 0]) | |
| max_erosion = abs(min(difference)) | |
| max_deposition = max(difference) | |
| stats = { | |
| 'total_erosion': total_erosion, | |
| 'total_deposition': total_deposition, | |
| 'max_erosion': max_erosion, | |
| 'max_deposition': max_deposition, | |
| 'net_change': total_deposition - total_erosion | |
| } | |
| return grid, stats, states | |
| # Visualize the topography | |
| def visualize_topography(grid, filename=None, title="Topographic Elevation", colormap="terrain"): | |
| """Create a visualization of the topography.""" | |
| plt.figure(figsize=(10, 8)) | |
| # Plot topography | |
| imshow_grid(grid, 'topographic__elevation', cmap=colormap, grid_units=('m', 'm')) | |
| plt.colorbar(label='Elevation (m)') | |
| plt.title(title) | |
| if filename: | |
| plt.savefig(filename, dpi=300, bbox_inches='tight') | |
| plt.close() | |
| return filename | |
| else: | |
| plt.close() | |
| return plt | |
| # Visualize the elevation change | |
| def visualize_elevation_change(grid, filename=None): | |
| """Create a visualization of the elevation change (erosion and deposition).""" | |
| plt.figure(figsize=(10, 8)) | |
| # Get the elevation change | |
| difference = grid.at_node['topographic__change'] | |
| # Set symmetric limits for the colormap | |
| vmax = max(abs(np.min(difference)), abs(np.max(difference))) | |
| vmin = -vmax | |
| # Plot the elevation change | |
| imshow_grid( | |
| grid, | |
| 'topographic__change', | |
| cmap='RdBu_r', # Red for erosion, blue for deposition | |
| grid_units=('m', 'm'), | |
| vmin=vmin, | |
| vmax=vmax | |
| ) | |
| plt.colorbar(label='Elevation Change (m)') | |
| plt.title('Erosion (red) and Deposition (blue)') | |
| if filename: | |
| plt.savefig(filename, dpi=300, bbox_inches='tight') | |
| plt.close() | |
| return filename | |
| else: | |
| plt.close() | |
| return plt | |
| # Create a 3D visualization | |
| def visualize_3d(grid, filename=None, title="3D Topography"): | |
| """Create a 3D visualization of the topography.""" | |
| fig = plt.figure(figsize=(12, 10)) | |
| ax = fig.add_subplot(111, projection='3d') | |
| # Get the shape of the grid | |
| nc = grid.number_of_node_columns | |
| nr = grid.number_of_node_rows | |
| # Reshape the elevation data | |
| elevation = grid.at_node['topographic__elevation'].reshape(nr, nc) | |
| # Create the meshgrid | |
| x, y = np.meshgrid(np.arange(nc), np.arange(nr)) | |
| # Create the 3D surface plot | |
| surf = ax.plot_surface( | |
| x, y, elevation, | |
| cmap='terrain', | |
| linewidth=0, | |
| antialiased=True, | |
| alpha=0.8 | |
| ) | |
| # Add a color bar | |
| fig.colorbar(surf, ax=ax, shrink=0.5, aspect=5, label='Elevation (m)') | |
| # Set labels and title | |
| ax.set_xlabel('X (grid cells)') | |
| ax.set_ylabel('Y (grid cells)') | |
| ax.set_zlabel('Elevation (m)') | |
| ax.set_title(title) | |
| # Adjust the viewing angle | |
| ax.view_init(elev=30, azim=225) | |
| if filename: | |
| plt.savefig(filename, dpi=300, bbox_inches='tight') | |
| plt.close() | |
| return filename | |
| else: | |
| plt.close() | |
| return plt | |
| # Create an animation of the evolution | |
| def create_animation(states, grid, filename=None): | |
| """Create an animation of the landscape evolution.""" | |
| from matplotlib.animation import FuncAnimation | |
| # Get the shape of the grid | |
| nc = grid.number_of_node_columns | |
| nr = grid.number_of_node_rows | |
| fig, ax = plt.subplots(figsize=(10, 8)) | |
| # Set up the initial plot | |
| initial_elevation = states[0]['elevation'].reshape(nr, nc) | |
| img = ax.imshow(initial_elevation, cmap='terrain', aspect='equal') | |
| plt.colorbar(img, ax=ax, label='Elevation (m)') | |
| title = ax.set_title(f"Step: {states[0]['step']}") | |
| # Animation update function | |
| def update(frame): | |
| # Update the image data | |
| elevation = states[frame]['elevation'].reshape(nr, nc) | |
| img.set_array(elevation) | |
| title.set_text(f"Step: {states[frame]['step']}") | |
| return img, title | |
| # Create the animation | |
| ani = FuncAnimation( | |
| fig, | |
| update, | |
| frames=range(len(states)), | |
| interval=200, # milliseconds between frames | |
| blit=True | |
| ) | |
| if filename: | |
| # Save the animation | |
| ani.save(filename, writer='pillow', fps=5) | |
| plt.close() | |
| return filename | |
| else: | |
| plt.close() | |
| return ani | |
| # Create a flow accumulation visualization | |
| def visualize_flow_accumulation(grid, filename=None): | |
| """Create a visualization of flow accumulation.""" | |
| # Make sure we have flow accumulation data | |
| if 'flow__accumulation' not in grid.at_node: | |
| # Run the flow router to get accumulation | |
| flow_router = FlowAccumulator( | |
| grid, | |
| flow_director='D8', | |
| depression_finder='DepressionFinderAndRouter' | |
| ) | |
| flow_router.run_one_step() | |
| plt.figure(figsize=(10, 8)) | |
| # Log-transform the accumulation data for better visualization | |
| log_acc = np.log10(grid.at_node['flow__accumulation'] + 1) | |
| grid.add_field("log_flow__accumulation", log_acc, at="node") | |
| # Plot the flow accumulation | |
| imshow_grid( | |
| grid, | |
| 'log_flow__accumulation', | |
| cmap='Blues', | |
| grid_units=('m', 'm') | |
| ) | |
| plt.colorbar(label='Log10 Flow Accumulation') | |
| plt.title('Flow Accumulation (log scale)') | |
| if filename: | |
| plt.savefig(filename, dpi=300, bbox_inches='tight') | |
| plt.close() | |
| return filename | |
| else: | |
| plt.close() | |
| return plt | |
| # Generate a summary report | |
| def generate_report(grid, stats, params, initial_topo_file, final_topo_file, diff_topo_file, flow_acc_file, topo_3d_file): | |
| """Generate a summary HTML report.""" | |
| report_file = os.path.join(RESULTS_DIR, "summary_report.html") | |
| # Create a simple HTML report | |
| html = f""" | |
| <!DOCTYPE html> | |
| <html> | |
| <head> | |
| <title>GeoReclaimAI - Simulation Report</title> | |
| <style> | |
| body {{ font-family: Arial, sans-serif; line-height: 1.6; padding: 20px; max-width: 1200px; margin: 0 auto; }} | |
| h1, h2, h3 {{ color: #333; }} | |
| .container {{ display: flex; flex-wrap: wrap; justify-content: space-between; }} | |
| .image-container {{ width: 48%; margin-bottom: 20px; }} | |
| img {{ max-width: 100%; border: 1px solid #ddd; }} | |
| table {{ border-collapse: collapse; width: 100%; margin: 20px 0; }} | |
| th, td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }} | |
| th {{ background-color: #f2f2f2; }} | |
| .stats {{ margin: 20px 0; padding: 15px; background-color: #f9f9f9; border-radius: 5px; }} | |
| </style> | |
| </head> | |
| <body> | |
| <h1>GeoReclaimAI - Landscape Evolution Simulation Report</h1> | |
| <h2>Simulation Parameters</h2> | |
| <table> | |
| <tr><th>Parameter</th><th>Value</th></tr> | |
| <tr><td>Simulation Steps</td><td>{params.get('n_steps', 1000)}</td></tr> | |
| <tr><td>Rainfall Rate</td><td>{params.get('rainfall_rate', 0.001)}</td></tr> | |
| <tr><td>Erosion Rate (K)</td><td>{params.get('k_sp', 0.0001)}</td></tr> | |
| <tr><td>Hillslope Diffusion Rate</td><td>{params.get('diffusion_rate', 0.01)}</td></tr> | |
| </table> | |
| <h2>Erosion and Deposition Statistics</h2> | |
| <div class="stats"> | |
| <p><strong>Total Erosion Volume:</strong> {stats['total_erosion']:.2f} m³</p> | |
| <p><strong>Total Deposition Volume:</strong> {stats['total_deposition']:.2f} m³</p> | |
| <p><strong>Maximum Erosion Depth:</strong> {stats['max_erosion']:.2f} m</p> | |
| <p><strong>Maximum Deposition Height:</strong> {stats['max_deposition']:.2f} m</p> | |
| <p><strong>Net Volume Change:</strong> {stats['net_change']:.2f} m³</p> | |
| </div> | |
| <h2>Visualizations</h2> | |
| <div class="container"> | |
| <div class="image-container"> | |
| <h3>Initial Topography</h3> | |
| <img src="{os.path.basename(initial_topo_file)}" alt="Initial Topography"> | |
| </div> | |
| <div class="image-container"> | |
| <h3>Final Topography</h3> | |
| <img src="{os.path.basename(final_topo_file)}" alt="Final Topography"> | |
| </div> | |
| <div class="image-container"> | |
| <h3>Erosion and Deposition</h3> | |
| <img src="{os.path.basename(diff_topo_file)}" alt="Erosion and Deposition"> | |
| </div> | |
| <div class="image-container"> | |
| <h3>Flow Accumulation</h3> | |
| <img src="{os.path.basename(flow_acc_file)}" alt="Flow Accumulation"> | |
| </div> | |
| <div class="image-container"> | |
| <h3>3D Topography</h3> | |
| <img src="{os.path.basename(topo_3d_file)}" alt="3D Topography"> | |
| </div> | |
| </div> | |
| <h2>Recommendations</h2> | |
| <p>Based on the simulation results, the following recommendations can be made:</p> | |
| <ul> | |
| <li>Areas with high erosion rates should be stabilized with vegetation or erosion control structures.</li> | |
| <li>Deposition areas may require drainage systems to prevent sediment accumulation.</li> | |
| <li>High flow accumulation areas are prone to channel formation and should be reinforced.</li> | |
| </ul> | |
| <p><em>Report generated on {time.strftime('%Y-%m-%d %H:%M:%S')}</em></p> | |
| </body> | |
| </html> | |
| """ | |
| with open(report_file, "w") as f: | |
| f.write(html) | |
| return report_file | |
| # Main function to run the landscape evolution simulation | |
| def run_landscape_evolution( | |
| dem_file=None, | |
| grid_type="dome", | |
| grid_rows=50, | |
| grid_cols=50, | |
| grid_resolution=10.0, | |
| simulation_steps=1000, | |
| rainfall_rate=0.001, | |
| erosion_rate=0.0001, | |
| diffusion_rate=0.01, | |
| save_animation=False | |
| ): | |
| # Clear previous results | |
| for f in glob.glob(os.path.join(RESULTS_DIR, "*")): | |
| try: | |
| os.remove(f) | |
| except: | |
| pass | |
| # Process input grid | |
| if dem_file is not None: | |
| grid = process_dem(dem_file) | |
| else: | |
| grid = setup_basic_grid(grid_type, grid_rows, grid_cols, grid_resolution) | |
| # Set up parameters | |
| params = { | |
| "n_steps": simulation_steps, | |
| "rainfall_rate": rainfall_rate, | |
| "k_sp": erosion_rate, | |
| "diffusion_rate": diffusion_rate, | |
| "save_steps": save_animation | |
| } | |
| # Save initial state | |
| initial_topo_file = os.path.join(RESULTS_DIR, "initial_topography.png") | |
| visualize_topography(grid, initial_topo_file, title="Initial Topography") | |
| # Run simulation | |
| evolved_grid, stats, states = run_simulation(grid, params) | |
| # Save final state | |
| final_topo_file = os.path.join(RESULTS_DIR, "final_topography.png") | |
| visualize_topography(evolved_grid, final_topo_file, title="Final Topography") | |
| # Visualize the elevation change | |
| diff_topo_file = os.path.join(RESULTS_DIR, "elevation_change.png") | |
| visualize_elevation_change(evolved_grid, diff_topo_file) | |
| # Visualize flow accumulation | |
| flow_acc_file = os.path.join(RESULTS_DIR, "flow_accumulation.png") | |
| visualize_flow_accumulation(evolved_grid, flow_acc_file) | |
| # Create 3D visualization | |
| topo_3d_file = os.path.join(RESULTS_DIR, "topography_3d.png") | |
| visualize_3d(evolved_grid, topo_3d_file, title="Final 3D Topography") | |
| # Create animation if requested | |
| animation_file = None | |
| if save_animation and states: | |
| animation_file = os.path.join(RESULTS_DIR, "evolution_animation.gif") | |
| create_animation(states, evolved_grid, animation_file) | |
| # Export the final DEM | |
| final_dem_file = os.path.join(RESULTS_DIR, "final_dem.asc") | |
| write_esri_ascii(final_dem_file, evolved_grid) | |
| # Generate report | |
| report_file = generate_report( | |
| evolved_grid, | |
| stats, | |
| params, | |
| initial_topo_file, | |
| final_topo_file, | |
| diff_topo_file, | |
| flow_acc_file, | |
| topo_3d_file | |
| ) | |
| return ( | |
| initial_topo_file, | |
| final_topo_file, | |
| diff_topo_file, | |
| flow_acc_file, | |
| topo_3d_file, | |
| animation_file if animation_file else "", | |
| report_file, | |
| final_dem_file | |
| ) | |
| # Set up the Gradio interface | |
| demo = gr.Blocks(title="GeoReclaimAI - Landscape Evolution Model") | |
| with demo: | |
| gr.Markdown("# GeoReclaimAI - Landscape Evolution Model") | |
| gr.Markdown("A tool for simulating erosion patterns on reclamation designs. Upload a DEM file or use the built-in terrain generators.") | |
| with gr.Tabs(): | |
| with gr.TabItem("Simulation Setup"): | |
| with gr.Row(): | |
| with gr.Column(): | |
| # Updated File component for better handling in Gradio 5.24.0 | |
| gr.Markdown(""" | |
| ### Step 1: Upload your DEM file | |
| 1. Click "Browse" to select your ESRI ASCII (.asc) file | |
| 2. After uploading, click "Process Uploaded File" to prepare the file for simulation | |
| 3. Once the file is processed, click "Run Simulation" to start the landscape evolution model | |
| """) | |
| dem_file = gr.File( | |
| label="Upload DEM File (Optional - ESRI ASCII format)", | |
| file_types=[".asc", ".txt"], | |
| type="filepath", | |
| file_count="single", | |
| interactive=True | |
| ) | |
| # Add a file upload status indicator | |
| file_status = gr.Markdown("No file uploaded yet") | |
| # Add a button to explicitly save the uploaded file | |
| upload_button = gr.Button("Process Uploaded File", variant="secondary", size="lg") | |
| # Event handler for file upload | |
| def process_uploaded_file(file_path): | |
| global saved_dem_path | |
| if not file_path: | |
| return "No file selected. Please upload a file first." | |
| if not os.path.exists(file_path): | |
| return f"Error: File not found at {file_path}" | |
| try: | |
| # Check if file appears to be an ESRI ASCII Grid | |
| file_size = os.path.getsize(file_path) | |
| if file_size == 0: | |
| return "Error: File is empty" | |
| # Check first few lines to validate format | |
| with open(file_path, 'r') as f: | |
| try: | |
| # Check header lines | |
| header_valid = True | |
| expected_headers = ["ncols", "nrows", "xll", "yll", "cell", "nodata"] | |
| for i, expected in enumerate(expected_headers): | |
| try: | |
| line = f.readline().strip().lower() | |
| if not line.startswith(expected): | |
| header_valid = False | |
| return f"Error: Invalid ESRI ASCII file format. Expected '{expected}' in line {i+1}, but got: '{line}'" | |
| except Exception: | |
| header_valid = False | |
| return f"Error: File too short, missing required header line {i+1}" | |
| # Try to parse a few data lines | |
| data_lines_valid = True | |
| for i in range(3): # Check first 3 data lines | |
| try: | |
| line = f.readline().strip() | |
| # Check if line contains valid numbers | |
| try: | |
| [float(val) for val in line.split()] | |
| except ValueError: | |
| data_lines_valid = False | |
| return f"Error: Line {i+7} contains non-numeric data: '{line}'" | |
| except Exception: | |
| if i == 0: # Must have at least one data line | |
| return "Error: File contains header but no data lines" | |
| break | |
| except UnicodeDecodeError: | |
| return "Error: File is not a valid text file. Please upload an ASCII file." | |
| except Exception as e: | |
| print(f"Warning during file validation: {str(e)}") | |
| # Copy file to temp directory | |
| filename = os.path.basename(file_path) | |
| temp_path = os.path.join(TEMP_DIR, filename) | |
| # Make sure TEMP_DIR exists | |
| os.makedirs(TEMP_DIR, exist_ok=True) | |
| # Simple file copy | |
| with open(file_path, 'rb') as src, open(temp_path, 'wb') as dst: | |
| dst.write(src.read()) | |
| # Verify file was copied | |
| if not os.path.exists(temp_path): | |
| return "Error: Failed to save file" | |
| # Store path globally | |
| saved_dem_path = temp_path | |
| file_size_mb = file_size / (1024*1024) | |
| return f"File processed and ready: {filename} ({file_size_mb:.2f} MB)" | |
| except Exception as e: | |
| import traceback | |
| print(f"Error processing file: {str(e)}") | |
| print(traceback.format_exc()) | |
| return f"Error processing file: {str(e)}" | |
| # Connect file upload button | |
| upload_button.click( | |
| fn=process_uploaded_file, | |
| inputs=[dem_file], | |
| outputs=[file_status] | |
| ) | |
| with gr.Group(): | |
| gr.Markdown("### Terrain Generator (if no DEM is provided)") | |
| grid_type = gr.Radio( | |
| choices=["dome", "valley", "plateau", "random", "mine_pit", "flat"], | |
| value="dome", | |
| label="Terrain Type" | |
| ) | |
| grid_rows = gr.Slider(minimum=20, maximum=200, value=50, step=10, label="Grid Rows") | |
| grid_cols = gr.Slider(minimum=20, maximum=200, value=50, step=10, label="Grid Columns") | |
| grid_resolution = gr.Slider(minimum=1.0, maximum=100.0, value=10.0, step=1.0, label="Grid Resolution (m)") | |
| with gr.Column(): | |
| with gr.Group(): | |
| gr.Markdown("### Simulation Parameters") | |
| simulation_steps = gr.Slider(minimum=100, maximum=10000, value=1000, step=100, label="Simulation Steps") | |
| rainfall_rate = gr.Slider(minimum=0.0001, maximum=0.01, value=0.001, step=0.0001, label="Rainfall Rate") | |
| erosion_rate = gr.Slider(minimum=0.00001, maximum=0.001, value=0.0001, step=0.00001, label="Erosion Rate (K)") | |
| diffusion_rate = gr.Slider(minimum=0.001, maximum=0.1, value=0.01, step=0.001, label="Hillslope Diffusion Rate") | |
| save_animation = gr.Checkbox(label="Generate Animation (slower)", value=False) | |
| # Split into two buttons for clarity | |
| with gr.Row(): | |
| gr.Markdown("### Step 2: Run Simulation") | |
| with gr.Row(): | |
| run_btn = gr.Button("Run Simulation", variant="primary", size="lg") | |
| debug_btn = gr.Button("Debug File Upload", variant="secondary") | |
| progress_text = gr.Markdown("Simulation status will appear here.") | |
| gr.Markdown("Note: Simulations may take several minutes to complete, especially for larger grids or more steps.") | |
| results_tab = gr.TabItem("Results") | |
| with results_tab: | |
| with gr.Row(): | |
| with gr.Column(): | |
| gr.Markdown("### Initial Topography") | |
| initial_topo = gr.Image(label="Initial Topography") | |
| with gr.Column(): | |
| gr.Markdown("### Final Topography") | |
| final_topo = gr.Image(label="Final Topography") | |
| with gr.Row(): | |
| with gr.Column(): | |
| gr.Markdown("### Erosion and Deposition") | |
| diff_topo = gr.Image(label="Erosion (red) and Deposition (blue)") | |
| with gr.Column(): | |
| gr.Markdown("### Flow Accumulation") | |
| flow_acc = gr.Image(label="Flow Accumulation") | |
| with gr.Row(): | |
| with gr.Column(): | |
| gr.Markdown("### 3D Visualization") | |
| topo_3d = gr.Image(label="3D Topography") | |
| with gr.Column(): | |
| gr.Markdown("### Animation") | |
| evolution_animation = gr.Image(label="Evolution Animation") | |
| with gr.Row(): | |
| with gr.Column(): | |
| gr.Markdown("### Summary Report") | |
| report = gr.File(label="Download Summary Report") | |
| with gr.Column(): | |
| gr.Markdown("### Output Files") | |
| final_dem = gr.File(label="Download Final DEM (ESRI ASCII)") | |
| with gr.TabItem("Help"): | |
| gr.Markdown(""" | |
| # GeoReclaimAI - Help Guide | |
| ## Overview | |
| GeoReclaimAI is a tool for simulating landscape evolution and erosion patterns on reclamation designs. It uses the Landlab framework to model how terrain changes over time due to rainfall, erosion, and sediment transport. | |
| ## Input Options | |
| ### DEM File | |
| You can upload a Digital Elevation Model (DEM) file in ESRI ASCII format. This should be a grid of elevation values with header information about the grid dimensions and cell size. | |
| ### Terrain Generator | |
| If you don't have a DEM file, you can use the built-in terrain generator to create a simple terrain: | |
| - **Dome**: A dome-shaped hill | |
| - **Valley**: A terrain with ridges and valleys | |
| - **Plateau**: A flat-topped plateau with escarpments | |
| - **Random**: A randomly generated terrain with some smoothing | |
| - **Mine Pit**: A terrain with a central depression, like an open-pit mine | |
| - **Flat**: A flat surface with slight random variations | |
| You can also set the grid dimensions and resolution. | |
| ## Simulation Parameters | |
| - **Simulation Steps**: Number of time steps to run the simulation. More steps mean more evolution but take longer to compute. | |
| - **Rainfall Rate**: Controls the intensity of rainfall and thus the rate of erosion. | |
| - **Erosion Rate (K)**: The erodibility coefficient. Higher values mean faster erosion. | |
| - **Hillslope Diffusion Rate**: Controls the rate of hillslope processes like soil creep. Higher values mean faster smoothing of the landscape. | |
| - **Generate Animation**: If checked, the tool will create an animation of the landscape evolution process. This makes the simulation slower. | |
| ## Outputs | |
| ### Visualizations | |
| - **Initial Topography**: The starting terrain | |
| - **Final Topography**: The terrain after simulation | |
| - **Erosion and Deposition**: Red areas show erosion, blue areas show deposition | |
| - **Flow Accumulation**: Shows where water flows and accumulates | |
| - **3D Visualization**: A 3D view of the final terrain | |
| - **Animation**: If requested, an animation of the landscape evolution process | |
| ### Files | |
| - **Summary Report**: An HTML report with simulation results and statistics | |
| - **Final DEM**: The final terrain as an ESRI ASCII file for use in GIS software | |
| ## Tips for Effective Use | |
| 1. Start with a simple terrain and fewer simulation steps to get quick results. | |
| 2. Adjust parameters gradually to see their effects. | |
| 3. For reclamation design, focus on areas with high erosion rates. | |
| 4. Use the flow accumulation output to identify potential drainage issues. | |
| 5. The animation can help understand the evolution process but takes longer to generate. | |
| 6. Export the final DEM for further analysis in GIS software. | |
| """) | |
| # Update the event handler for Gradio 5.24.0 | |
| def run_sim_with_progress(): | |
| progress_text.update("Starting simulation...") | |
| global saved_dem_path | |
| try: | |
| # First check if we have a saved DEM path from the upload button | |
| if saved_dem_path is not None and os.path.exists(saved_dem_path): | |
| print(f"Using previously processed DEM file: {saved_dem_path}") | |
| dem_data = saved_dem_path | |
| # Log file size and check if it's readable | |
| file_size = os.path.getsize(dem_data) | |
| print(f"File size: {file_size} bytes ({file_size / (1024*1024):.2f} MB)") | |
| # Attempt to read the file | |
| try: | |
| with open(dem_data, 'r') as f: | |
| for i, line in enumerate(f): | |
| if i >= 10: | |
| break | |
| print(f"Line {i+1}: {line.strip()}") | |
| except Exception as read_error: | |
| print(f"Warning - File read error (but continuing anyway): {str(read_error)}") | |
| progress_text.update("Processing DEM file...") | |
| else: | |
| # Fallback to the file component value | |
| dem_data = dem_file.value | |
| print(f"DEM file value type: {type(dem_data)}") | |
| print(f"DEM file value content: {dem_data}") | |
| # With type="filepath", dem_data is the direct path to the file or None | |
| if dem_data is not None and dem_data: | |
| print(f"Using uploaded DEM file: {dem_data}") | |
| # Verify the file exists | |
| if os.path.exists(dem_data): | |
| print(f"File exists at path: {dem_data}") | |
| # Copy the file to TEMP_DIR to ensure we have access to it | |
| try: | |
| filename = os.path.basename(dem_data) | |
| temp_path = os.path.join(TEMP_DIR, filename) | |
| with open(dem_data, 'rb') as src, open(temp_path, 'wb') as dst: | |
| dst.write(src.read()) | |
| dem_data = temp_path | |
| saved_dem_path = temp_path | |
| print(f"Copied file to: {temp_path}") | |
| except Exception as copy_error: | |
| print(f"Warning - Copy error (but continuing anyway): {str(copy_error)}") | |
| else: | |
| print(f"File does not exist at path: {dem_data}") | |
| error_msg = "Error: Uploaded file not found" | |
| progress_text.update(error_msg) | |
| return [None] * 8 + [error_msg] | |
| else: | |
| print("No file provided directly. Checking temp directory for ASC files...") | |
| # Check if there are any ASC files in the temp directory that might be the uploaded file | |
| temp_files = os.listdir(TEMP_DIR) | |
| asc_files = [f for f in temp_files if f.endswith('.asc') or f.endswith('.txt')] | |
| if asc_files: | |
| # Use the first ASC file found | |
| dem_data = os.path.join(TEMP_DIR, asc_files[0]) | |
| print(f"Found ASC file in temp directory: {dem_data}") | |
| saved_dem_path = dem_data | |
| else: | |
| print("No ASC files found in temp directory, using terrain generator") | |
| dem_data = None | |
| # Run the landscape evolution simulation | |
| progress_msg = "Starting simulation. This may take several minutes..." | |
| progress_text.update(progress_msg) | |
| try: | |
| outputs = run_landscape_evolution( | |
| dem_file=dem_data, | |
| grid_type=grid_type.value, | |
| grid_rows=grid_rows.value, | |
| grid_cols=grid_cols.value, | |
| grid_resolution=grid_resolution.value, | |
| simulation_steps=simulation_steps.value, | |
| rainfall_rate=rainfall_rate.value, | |
| erosion_rate=erosion_rate.value, | |
| diffusion_rate=diffusion_rate.value, | |
| save_animation=save_animation.value | |
| ) | |
| success_msg = "Simulation completed successfully!" | |
| progress_text.update(success_msg) | |
| # Return the visualization outputs plus the status message | |
| return list(outputs) + [success_msg] | |
| except Exception as sim_error: | |
| import traceback | |
| error_msg = f"Error in simulation: {str(sim_error)}" | |
| print(error_msg) | |
| print(traceback.format_exc()) | |
| progress_text.update(error_msg) | |
| return [None] * 8 + [error_msg] | |
| except Exception as e: | |
| import traceback | |
| error_msg = f"Error setting up simulation: {str(e)}" | |
| print(error_msg) | |
| print(traceback.format_exc()) | |
| progress_text.update(error_msg) | |
| return [None] * 8 + [error_msg] | |
| # Connect the run button to the simulation function using Gradio 5.24.0 syntax | |
| run_btn.click( | |
| fn=run_sim_with_progress, | |
| inputs=None, | |
| outputs=[ | |
| initial_topo, | |
| final_topo, | |
| diff_topo, | |
| flow_acc, | |
| topo_3d, | |
| evolution_animation, | |
| report, | |
| final_dem, | |
| progress_text | |
| ] | |
| ) | |
| # Debug function to check file upload status | |
| def debug_file_upload(): | |
| file_value = dem_file.value | |
| debug_info = [ | |
| f"File Value Type: {type(file_value)}", | |
| f"File Value: {file_value}", | |
| ] | |
| # Check internal component state | |
| debug_info.append(f"File Component State: {dem_file.__dict__.get('value', 'Not Available')}") | |
| if file_value is not None and file_value: | |
| # With type="filepath", file_value is directly the file path | |
| debug_info.append(f"File path: {file_value}") | |
| # Check if file exists | |
| if os.path.exists(file_value): | |
| debug_info.append(f"File exists: Yes") | |
| file_size = os.path.getsize(file_value) | |
| debug_info.append(f"File size: {file_size} bytes ({file_size / (1024*1024):.2f} MB)") | |
| # Try to read the first few lines to verify content | |
| try: | |
| with open(file_value, 'r') as f: | |
| first_lines = [next(f) for _ in range(10) if _ < 10] | |
| debug_info.append("First 10 lines of file:") | |
| for i, line in enumerate(first_lines): | |
| debug_info.append(f" Line {i+1}: {line.strip()}") | |
| except Exception as e: | |
| debug_info.append(f"Error reading file: {str(e)}") | |
| else: | |
| debug_info.append(f"File exists: No") | |
| # List files in temp directory | |
| debug_info.append("Files in temp directory:") | |
| for f in os.listdir(TEMP_DIR): | |
| f_path = os.path.join(TEMP_DIR, f) | |
| f_size = os.path.getsize(f_path) | |
| debug_info.append(f" {f} ({f_size / (1024*1024):.2f} MB)") | |
| else: | |
| debug_info.append("No file uploaded or file value is None") | |
| # Check if there are any files in the temp directory | |
| temp_files = os.listdir(TEMP_DIR) | |
| if temp_files: | |
| debug_info.append("Files in temp directory that might be the uploaded file:") | |
| for f in temp_files: | |
| f_path = os.path.join(TEMP_DIR, f) | |
| if f.endswith('.asc') or f.endswith('.txt'): | |
| f_size = os.path.getsize(f_path) | |
| debug_info.append(f" {f} ({f_size / (1024*1024):.2f} MB)") | |
| # Try to read the first few lines | |
| try: | |
| with open(f_path, 'r') as file: | |
| first_lines = [next(file) for _ in range(5) if _ < 5] | |
| debug_info.append(" First 5 lines:") | |
| for i, line in enumerate(first_lines): | |
| debug_info.append(f" Line {i+1}: {line.strip()}") | |
| except Exception as e: | |
| debug_info.append(f" Error reading file: {str(e)}") | |
| return "\n".join(debug_info) | |
| # Connect the debug button | |
| debug_btn.click( | |
| fn=debug_file_upload, | |
| inputs=None, | |
| outputs=progress_text | |
| ) | |
| # Launch the interface | |
| if __name__ == "__main__": | |
| demo.launch() | |