import os import numpy as np import matplotlib.pyplot as plt import gradio as gr from landlab import RasterModelGrid, imshow_grid from landlab.components import ( FlowAccumulator, FastscapeEroder, LinearDiffuser ) from landlab.io import read_esri_ascii, write_esri_ascii import matplotlib matplotlib.use('Agg') # Use non-interactive backend for Matplotlib import time import glob from mpl_toolkits.mplot3d import Axes3D from scipy.ndimage import gaussian_filter # Set up paths and directories RESULTS_DIR = "results" TEMP_DIR = "temp" os.makedirs(RESULTS_DIR, exist_ok=True) os.makedirs(TEMP_DIR, exist_ok=True) # Global variable to store the path to the uploaded DEM file saved_dem_path = None # Set up a basic grid def setup_basic_grid(grid_type, rows=50, cols=50, dx=10.0): """Create a grid with different initial topographies.""" mg = RasterModelGrid((rows, cols), dx) # Create node coordinates for the grid x, y = np.meshgrid(np.linspace(0, cols-1, cols), np.linspace(0, rows-1, rows)) center_x, center_y = cols / 2, rows / 2 # Define elevation based on grid type if grid_type == "dome": # Dome-shaped topography distance = np.sqrt((x - center_x)**2 + (y - center_y)**2) elevation = 1000 - distance * 10 elif grid_type == "valley": # Valley with ridges elevation = 1000 + 500 * np.sin(x / cols * np.pi * 2) + 500 * np.cos(y / rows * np.pi * 2) elif grid_type == "plateau": # Plateau with escarpment distance = np.sqrt((x - center_x)**2 + (y - center_y)**2) elevation = 1000 * (distance < (cols/3)) + 500 * ((distance >= (cols/3)) & (distance < (cols/2))) + 200 * (distance >= (cols/2)) elif grid_type == "random": # Random terrain elevation = 1000 + 200 * np.random.rand(rows, cols) # Smooth the random terrain elevation = gaussian_filter(elevation, sigma=2) elif grid_type == "mine_pit": # Mining pit with surrounding area distance = np.sqrt((x - center_x)**2 + (y - center_y)**2) elevation = 1000 - 800 * np.exp(-distance**2 / (cols/4)**2) else: # Default is flat with noise elevation = 1000 + 50 * np.random.rand(rows, cols) # Add the topographic elevation field mg.add_field("topographic__elevation", elevation.flatten(), at="node") return mg # Process the uploaded DEM def process_dem(dem_file): """Process an uploaded DEM file.""" # With type="filepath", dem_file is directly the path to the uploaded file, or None if dem_file is None: print("No DEM file provided, using default dome") # Return a default grid if no file is provided return setup_basic_grid("dome") print(f"Processing DEM file: {dem_file} (type: {type(dem_file)})") try: # Check if file exists if not os.path.exists(dem_file): print(f"File does not exist at path: {dem_file}") return setup_basic_grid("dome") # Try to read the file directly to diagnose format issues try: with open(dem_file, 'r') as f: header_lines = [] for i in range(6): # ESRI ASCII files should have 6 header lines try: line = next(f).strip() header_lines.append(line) print(f"Header line {i+1}: {line}") except StopIteration: print(f"File too short, only {i} header lines") break # Check if the header looks correct has_valid_header = ( len(header_lines) >= 6 and header_lines[0].lower().startswith('ncols') and header_lines[1].lower().startswith('nrows') and header_lines[2].lower().startswith('xll') and header_lines[3].lower().startswith('yll') and header_lines[4].lower().startswith('cell') and header_lines[5].lower().startswith('nodata') ) if has_valid_header: print("File has valid ESRI ASCII header format") else: print("File doesn't have valid ESRI ASCII header format") # Read a few data lines data_lines = [] for i in range(4): # Read 4 data lines try: line = next(f).strip() data_lines.append(line) print(f"Data line {i+1}: {line}") except StopIteration: break except Exception as file_read_e: print(f"Warning: Could not read the file directly: {file_read_e}") has_valid_header = False # Read the DEM file with landlab try: print(f"Attempting to read file with landlab.io.read_esri_ascii: {dem_file}") try: (grid, elevation) = read_esri_ascii(dem_file) print(f"Successfully read DEM file with grid shape: {grid.shape}") return grid except Exception as e: print(f"Error with read_esri_ascii: {e}") # If there's no valid header or read_esri_ascii failed, try manual parsing if not has_valid_header: raise ValueError("Invalid ESRI ASCII header") # Try a different approach if the header is invalid but seems to be in the right format try: print("Trying to parse ASCII grid manually...") header = {} # Parse header from our previously read header lines if len(header_lines) >= 6: try: header['ncols'] = int(header_lines[0].split()[1]) header['nrows'] = int(header_lines[1].split()[1]) header['xllcorner'] = float(header_lines[2].split()[1]) header['yllcorner'] = float(header_lines[3].split()[1]) header['cellsize'] = float(header_lines[4].split()[1]) header['nodata_value'] = float(header_lines[5].split()[1]) except (IndexError, ValueError) as parse_error: print(f"Error parsing header lines: {parse_error}") raise ValueError("Invalid header values") else: # If we don't have header lines, try to reopen the file with open(dem_file, 'r') as f: header['ncols'] = int(f.readline().split()[1]) header['nrows'] = int(f.readline().split()[1]) header['xllcorner'] = float(f.readline().split()[1]) header['yllcorner'] = float(f.readline().split()[1]) header['cellsize'] = float(f.readline().split()[1]) header['nodata_value'] = float(f.readline().split()[1]) # Validate the header values make sense if header['ncols'] <= 0 or header['nrows'] <= 0 or header['cellsize'] <= 0: raise ValueError("Invalid header values (negative or zero dimensions)") if header['ncols'] > 10000 or header['nrows'] > 10000: raise ValueError("Grid too large, dimensions exceed 10000") print(f"Header parsed: ncols={header['ncols']}, nrows={header['nrows']}, cellsize={header['cellsize']}") # Create empty grid mg = RasterModelGrid((header['nrows'], header['ncols']), header['cellsize']) # Read elevation data with open(dem_file, 'r') as f: # Skip header lines for i in range(6): f.readline() # Read all data lines data = [] for line in f: data.extend([float(val) for val in line.split()]) # Check if we have the right amount of data expected_data_points = header['nrows'] * header['ncols'] if len(data) != expected_data_points: print(f"Warning: Data length mismatch. Expected {expected_data_points}, got {len(data)}") # Fill or truncate if needed if len(data) < expected_data_points: nodata = header['nodata_value'] data.extend([nodata] * (expected_data_points - len(data))) print(f"Extended data with {expected_data_points - len(data)} nodata values") else: data = data[:expected_data_points] print(f"Truncated data to {expected_data_points} values") # Set elevation field mg.add_field("topographic__elevation", np.array(data), at="node") print(f"Successfully parsed ASCII grid manually. Grid shape: {mg.shape}") return mg except Exception as manual_e: import traceback print(f"Error with manual parsing: {manual_e}") print(traceback.format_exc()) print("Falling back to default grid") return setup_basic_grid("dome") except Exception as inner_e: import traceback print(f"Error reading DEM with read_esri_ascii: {inner_e}") print(traceback.format_exc()) # Return a default grid if there's an error return setup_basic_grid("dome") except Exception as e: import traceback print(f"Error processing DEM file: {e}") print(traceback.format_exc()) # Return a default grid if there's an error return setup_basic_grid("dome") # Run the landscape evolution simulation def run_simulation(grid, params): """Run a landscape evolution simulation with the given parameters.""" # Extract parameters n_steps = params.get("n_steps", 1000) rainfall_rate = params.get("rainfall_rate", 0.001) k_sp = params.get("k_sp", 0.0001) diffusion_rate = params.get("diffusion_rate", 0.01) # Instantiate components flow_router = FlowAccumulator( grid, flow_director='D8', depression_finder='DepressionFinderAndRouter' ) # Stream power erosion stream_power = FastscapeEroder( grid, K_sp=k_sp, m_sp=0.5, n_sp=1.0 ) # Linear diffusion (hillslope processes) diffusion = LinearDiffuser( grid, linear_diffusivity=diffusion_rate ) # Save initial state initial_elevation = grid.at_node['topographic__elevation'].copy() # Store states for animation states = [] if params.get("save_steps", False): # Save the initial state snapshot = { 'step': 0, 'elevation': grid.at_node['topographic__elevation'].copy() } states.append(snapshot) # Run the simulation for a specified number of time steps timestep = rainfall_rate # Using rainfall rate as the time step # Optionally save intermediate steps for animation steps_to_save = min(20, n_steps) # Save at most 20 steps for animation save_interval = max(1, n_steps // steps_to_save) # Progress tracking progress_interval = max(1, n_steps // 10) for i in range(n_steps): # Run components for one time step flow_router.run_one_step() stream_power.run_one_step(dt=timestep) diffusion.run_one_step(dt=timestep) # Save intermediate states for animation if requested if params.get("save_steps", False) and i % save_interval == 0: snapshot = { 'step': i + 1, 'elevation': grid.at_node['topographic__elevation'].copy() } states.append(snapshot) # Print progress if (i + 1) % progress_interval == 0: print(f"Progress: {i + 1}/{n_steps} steps completed") # Calculate the difference (erosion and deposition) final_elevation = grid.at_node['topographic__elevation'] difference = final_elevation - initial_elevation grid.add_field("topographic__change", difference, at="node") # Calculate some statistics total_erosion = abs(sum(difference[difference < 0])) total_deposition = sum(difference[difference > 0]) max_erosion = abs(min(difference)) max_deposition = max(difference) stats = { 'total_erosion': total_erosion, 'total_deposition': total_deposition, 'max_erosion': max_erosion, 'max_deposition': max_deposition, 'net_change': total_deposition - total_erosion } return grid, stats, states # Visualize the topography def visualize_topography(grid, filename=None, title="Topographic Elevation", colormap="terrain"): """Create a visualization of the topography.""" plt.figure(figsize=(10, 8)) # Plot topography imshow_grid(grid, 'topographic__elevation', cmap=colormap, grid_units=('m', 'm')) plt.colorbar(label='Elevation (m)') plt.title(title) if filename: plt.savefig(filename, dpi=300, bbox_inches='tight') plt.close() return filename else: plt.close() return plt # Visualize the elevation change def visualize_elevation_change(grid, filename=None): """Create a visualization of the elevation change (erosion and deposition).""" plt.figure(figsize=(10, 8)) # Get the elevation change difference = grid.at_node['topographic__change'] # Set symmetric limits for the colormap vmax = max(abs(np.min(difference)), abs(np.max(difference))) vmin = -vmax # Plot the elevation change imshow_grid( grid, 'topographic__change', cmap='RdBu_r', # Red for erosion, blue for deposition grid_units=('m', 'm'), vmin=vmin, vmax=vmax ) plt.colorbar(label='Elevation Change (m)') plt.title('Erosion (red) and Deposition (blue)') if filename: plt.savefig(filename, dpi=300, bbox_inches='tight') plt.close() return filename else: plt.close() return plt # Create a 3D visualization def visualize_3d(grid, filename=None, title="3D Topography"): """Create a 3D visualization of the topography.""" fig = plt.figure(figsize=(12, 10)) ax = fig.add_subplot(111, projection='3d') # Get the shape of the grid nc = grid.number_of_node_columns nr = grid.number_of_node_rows # Reshape the elevation data elevation = grid.at_node['topographic__elevation'].reshape(nr, nc) # Create the meshgrid x, y = np.meshgrid(np.arange(nc), np.arange(nr)) # Create the 3D surface plot surf = ax.plot_surface( x, y, elevation, cmap='terrain', linewidth=0, antialiased=True, alpha=0.8 ) # Add a color bar fig.colorbar(surf, ax=ax, shrink=0.5, aspect=5, label='Elevation (m)') # Set labels and title ax.set_xlabel('X (grid cells)') ax.set_ylabel('Y (grid cells)') ax.set_zlabel('Elevation (m)') ax.set_title(title) # Adjust the viewing angle ax.view_init(elev=30, azim=225) if filename: plt.savefig(filename, dpi=300, bbox_inches='tight') plt.close() return filename else: plt.close() return plt # Create an animation of the evolution def create_animation(states, grid, filename=None): """Create an animation of the landscape evolution.""" from matplotlib.animation import FuncAnimation # Get the shape of the grid nc = grid.number_of_node_columns nr = grid.number_of_node_rows fig, ax = plt.subplots(figsize=(10, 8)) # Set up the initial plot initial_elevation = states[0]['elevation'].reshape(nr, nc) img = ax.imshow(initial_elevation, cmap='terrain', aspect='equal') plt.colorbar(img, ax=ax, label='Elevation (m)') title = ax.set_title(f"Step: {states[0]['step']}") # Animation update function def update(frame): # Update the image data elevation = states[frame]['elevation'].reshape(nr, nc) img.set_array(elevation) title.set_text(f"Step: {states[frame]['step']}") return img, title # Create the animation ani = FuncAnimation( fig, update, frames=range(len(states)), interval=200, # milliseconds between frames blit=True ) if filename: # Save the animation ani.save(filename, writer='pillow', fps=5) plt.close() return filename else: plt.close() return ani # Create a flow accumulation visualization def visualize_flow_accumulation(grid, filename=None): """Create a visualization of flow accumulation.""" # Make sure we have flow accumulation data if 'flow__accumulation' not in grid.at_node: # Run the flow router to get accumulation flow_router = FlowAccumulator( grid, flow_director='D8', depression_finder='DepressionFinderAndRouter' ) flow_router.run_one_step() plt.figure(figsize=(10, 8)) # Log-transform the accumulation data for better visualization log_acc = np.log10(grid.at_node['flow__accumulation'] + 1) grid.add_field("log_flow__accumulation", log_acc, at="node") # Plot the flow accumulation imshow_grid( grid, 'log_flow__accumulation', cmap='Blues', grid_units=('m', 'm') ) plt.colorbar(label='Log10 Flow Accumulation') plt.title('Flow Accumulation (log scale)') if filename: plt.savefig(filename, dpi=300, bbox_inches='tight') plt.close() return filename else: plt.close() return plt # Generate a summary report def generate_report(grid, stats, params, initial_topo_file, final_topo_file, diff_topo_file, flow_acc_file, topo_3d_file): """Generate a summary HTML report.""" report_file = os.path.join(RESULTS_DIR, "summary_report.html") # Create a simple HTML report html = f"""
| Parameter | Value |
|---|---|
| Simulation Steps | {params.get('n_steps', 1000)} |
| Rainfall Rate | {params.get('rainfall_rate', 0.001)} |
| Erosion Rate (K) | {params.get('k_sp', 0.0001)} |
| Hillslope Diffusion Rate | {params.get('diffusion_rate', 0.01)} |
Total Erosion Volume: {stats['total_erosion']:.2f} m³
Total Deposition Volume: {stats['total_deposition']:.2f} m³
Maximum Erosion Depth: {stats['max_erosion']:.2f} m
Maximum Deposition Height: {stats['max_deposition']:.2f} m
Net Volume Change: {stats['net_change']:.2f} m³
Based on the simulation results, the following recommendations can be made:
Report generated on {time.strftime('%Y-%m-%d %H:%M:%S')}
""" with open(report_file, "w") as f: f.write(html) return report_file # Main function to run the landscape evolution simulation def run_landscape_evolution( dem_file=None, grid_type="dome", grid_rows=50, grid_cols=50, grid_resolution=10.0, simulation_steps=1000, rainfall_rate=0.001, erosion_rate=0.0001, diffusion_rate=0.01, save_animation=False ): # Clear previous results for f in glob.glob(os.path.join(RESULTS_DIR, "*")): try: os.remove(f) except: pass # Process input grid if dem_file is not None: grid = process_dem(dem_file) else: grid = setup_basic_grid(grid_type, grid_rows, grid_cols, grid_resolution) # Set up parameters params = { "n_steps": simulation_steps, "rainfall_rate": rainfall_rate, "k_sp": erosion_rate, "diffusion_rate": diffusion_rate, "save_steps": save_animation } # Save initial state initial_topo_file = os.path.join(RESULTS_DIR, "initial_topography.png") visualize_topography(grid, initial_topo_file, title="Initial Topography") # Run simulation evolved_grid, stats, states = run_simulation(grid, params) # Save final state final_topo_file = os.path.join(RESULTS_DIR, "final_topography.png") visualize_topography(evolved_grid, final_topo_file, title="Final Topography") # Visualize the elevation change diff_topo_file = os.path.join(RESULTS_DIR, "elevation_change.png") visualize_elevation_change(evolved_grid, diff_topo_file) # Visualize flow accumulation flow_acc_file = os.path.join(RESULTS_DIR, "flow_accumulation.png") visualize_flow_accumulation(evolved_grid, flow_acc_file) # Create 3D visualization topo_3d_file = os.path.join(RESULTS_DIR, "topography_3d.png") visualize_3d(evolved_grid, topo_3d_file, title="Final 3D Topography") # Create animation if requested animation_file = None if save_animation and states: animation_file = os.path.join(RESULTS_DIR, "evolution_animation.gif") create_animation(states, evolved_grid, animation_file) # Export the final DEM final_dem_file = os.path.join(RESULTS_DIR, "final_dem.asc") write_esri_ascii(final_dem_file, evolved_grid) # Generate report report_file = generate_report( evolved_grid, stats, params, initial_topo_file, final_topo_file, diff_topo_file, flow_acc_file, topo_3d_file ) return ( initial_topo_file, final_topo_file, diff_topo_file, flow_acc_file, topo_3d_file, animation_file if animation_file else "", report_file, final_dem_file ) # Set up the Gradio interface demo = gr.Blocks(title="GeoReclaimAI - Landscape Evolution Model") with demo: gr.Markdown("# GeoReclaimAI - Landscape Evolution Model") gr.Markdown("A tool for simulating erosion patterns on reclamation designs. Upload a DEM file or use the built-in terrain generators.") with gr.Tabs(): with gr.TabItem("Simulation Setup"): with gr.Row(): with gr.Column(): # Updated File component for better handling in Gradio 5.24.0 gr.Markdown(""" ### Step 1: Upload your DEM file 1. Click "Browse" to select your ESRI ASCII (.asc) file 2. After uploading, click "Process Uploaded File" to prepare the file for simulation 3. Once the file is processed, click "Run Simulation" to start the landscape evolution model """) dem_file = gr.File( label="Upload DEM File (Optional - ESRI ASCII format)", file_types=[".asc", ".txt"], type="filepath", file_count="single", interactive=True ) # Add a file upload status indicator file_status = gr.Markdown("No file uploaded yet") # Add a button to explicitly save the uploaded file upload_button = gr.Button("Process Uploaded File", variant="secondary", size="lg") # Event handler for file upload def process_uploaded_file(file_path): global saved_dem_path if not file_path: return "No file selected. Please upload a file first." if not os.path.exists(file_path): return f"Error: File not found at {file_path}" try: # Check if file appears to be an ESRI ASCII Grid file_size = os.path.getsize(file_path) if file_size == 0: return "Error: File is empty" # Check first few lines to validate format with open(file_path, 'r') as f: try: # Check header lines header_valid = True expected_headers = ["ncols", "nrows", "xll", "yll", "cell", "nodata"] for i, expected in enumerate(expected_headers): try: line = f.readline().strip().lower() if not line.startswith(expected): header_valid = False return f"Error: Invalid ESRI ASCII file format. Expected '{expected}' in line {i+1}, but got: '{line}'" except Exception: header_valid = False return f"Error: File too short, missing required header line {i+1}" # Try to parse a few data lines data_lines_valid = True for i in range(3): # Check first 3 data lines try: line = f.readline().strip() # Check if line contains valid numbers try: [float(val) for val in line.split()] except ValueError: data_lines_valid = False return f"Error: Line {i+7} contains non-numeric data: '{line}'" except Exception: if i == 0: # Must have at least one data line return "Error: File contains header but no data lines" break except UnicodeDecodeError: return "Error: File is not a valid text file. Please upload an ASCII file." except Exception as e: print(f"Warning during file validation: {str(e)}") # Copy file to temp directory filename = os.path.basename(file_path) temp_path = os.path.join(TEMP_DIR, filename) # Make sure TEMP_DIR exists os.makedirs(TEMP_DIR, exist_ok=True) # Simple file copy with open(file_path, 'rb') as src, open(temp_path, 'wb') as dst: dst.write(src.read()) # Verify file was copied if not os.path.exists(temp_path): return "Error: Failed to save file" # Store path globally saved_dem_path = temp_path file_size_mb = file_size / (1024*1024) return f"File processed and ready: {filename} ({file_size_mb:.2f} MB)" except Exception as e: import traceback print(f"Error processing file: {str(e)}") print(traceback.format_exc()) return f"Error processing file: {str(e)}" # Connect file upload button upload_button.click( fn=process_uploaded_file, inputs=[dem_file], outputs=[file_status] ) with gr.Group(): gr.Markdown("### Terrain Generator (if no DEM is provided)") grid_type = gr.Radio( choices=["dome", "valley", "plateau", "random", "mine_pit", "flat"], value="dome", label="Terrain Type" ) grid_rows = gr.Slider(minimum=20, maximum=200, value=50, step=10, label="Grid Rows") grid_cols = gr.Slider(minimum=20, maximum=200, value=50, step=10, label="Grid Columns") grid_resolution = gr.Slider(minimum=1.0, maximum=100.0, value=10.0, step=1.0, label="Grid Resolution (m)") with gr.Column(): with gr.Group(): gr.Markdown("### Simulation Parameters") simulation_steps = gr.Slider(minimum=100, maximum=10000, value=1000, step=100, label="Simulation Steps") rainfall_rate = gr.Slider(minimum=0.0001, maximum=0.01, value=0.001, step=0.0001, label="Rainfall Rate") erosion_rate = gr.Slider(minimum=0.00001, maximum=0.001, value=0.0001, step=0.00001, label="Erosion Rate (K)") diffusion_rate = gr.Slider(minimum=0.001, maximum=0.1, value=0.01, step=0.001, label="Hillslope Diffusion Rate") save_animation = gr.Checkbox(label="Generate Animation (slower)", value=False) # Split into two buttons for clarity with gr.Row(): gr.Markdown("### Step 2: Run Simulation") with gr.Row(): run_btn = gr.Button("Run Simulation", variant="primary", size="lg") debug_btn = gr.Button("Debug File Upload", variant="secondary") progress_text = gr.Markdown("Simulation status will appear here.") gr.Markdown("Note: Simulations may take several minutes to complete, especially for larger grids or more steps.") results_tab = gr.TabItem("Results") with results_tab: with gr.Row(): with gr.Column(): gr.Markdown("### Initial Topography") initial_topo = gr.Image(label="Initial Topography") with gr.Column(): gr.Markdown("### Final Topography") final_topo = gr.Image(label="Final Topography") with gr.Row(): with gr.Column(): gr.Markdown("### Erosion and Deposition") diff_topo = gr.Image(label="Erosion (red) and Deposition (blue)") with gr.Column(): gr.Markdown("### Flow Accumulation") flow_acc = gr.Image(label="Flow Accumulation") with gr.Row(): with gr.Column(): gr.Markdown("### 3D Visualization") topo_3d = gr.Image(label="3D Topography") with gr.Column(): gr.Markdown("### Animation") evolution_animation = gr.Image(label="Evolution Animation") with gr.Row(): with gr.Column(): gr.Markdown("### Summary Report") report = gr.File(label="Download Summary Report") with gr.Column(): gr.Markdown("### Output Files") final_dem = gr.File(label="Download Final DEM (ESRI ASCII)") with gr.TabItem("Help"): gr.Markdown(""" # GeoReclaimAI - Help Guide ## Overview GeoReclaimAI is a tool for simulating landscape evolution and erosion patterns on reclamation designs. It uses the Landlab framework to model how terrain changes over time due to rainfall, erosion, and sediment transport. ## Input Options ### DEM File You can upload a Digital Elevation Model (DEM) file in ESRI ASCII format. This should be a grid of elevation values with header information about the grid dimensions and cell size. ### Terrain Generator If you don't have a DEM file, you can use the built-in terrain generator to create a simple terrain: - **Dome**: A dome-shaped hill - **Valley**: A terrain with ridges and valleys - **Plateau**: A flat-topped plateau with escarpments - **Random**: A randomly generated terrain with some smoothing - **Mine Pit**: A terrain with a central depression, like an open-pit mine - **Flat**: A flat surface with slight random variations You can also set the grid dimensions and resolution. ## Simulation Parameters - **Simulation Steps**: Number of time steps to run the simulation. More steps mean more evolution but take longer to compute. - **Rainfall Rate**: Controls the intensity of rainfall and thus the rate of erosion. - **Erosion Rate (K)**: The erodibility coefficient. Higher values mean faster erosion. - **Hillslope Diffusion Rate**: Controls the rate of hillslope processes like soil creep. Higher values mean faster smoothing of the landscape. - **Generate Animation**: If checked, the tool will create an animation of the landscape evolution process. This makes the simulation slower. ## Outputs ### Visualizations - **Initial Topography**: The starting terrain - **Final Topography**: The terrain after simulation - **Erosion and Deposition**: Red areas show erosion, blue areas show deposition - **Flow Accumulation**: Shows where water flows and accumulates - **3D Visualization**: A 3D view of the final terrain - **Animation**: If requested, an animation of the landscape evolution process ### Files - **Summary Report**: An HTML report with simulation results and statistics - **Final DEM**: The final terrain as an ESRI ASCII file for use in GIS software ## Tips for Effective Use 1. Start with a simple terrain and fewer simulation steps to get quick results. 2. Adjust parameters gradually to see their effects. 3. For reclamation design, focus on areas with high erosion rates. 4. Use the flow accumulation output to identify potential drainage issues. 5. The animation can help understand the evolution process but takes longer to generate. 6. Export the final DEM for further analysis in GIS software. """) # Update the event handler for Gradio 5.24.0 def run_sim_with_progress(): progress_text.update("Starting simulation...") global saved_dem_path try: # First check if we have a saved DEM path from the upload button if saved_dem_path is not None and os.path.exists(saved_dem_path): print(f"Using previously processed DEM file: {saved_dem_path}") dem_data = saved_dem_path # Log file size and check if it's readable file_size = os.path.getsize(dem_data) print(f"File size: {file_size} bytes ({file_size / (1024*1024):.2f} MB)") # Attempt to read the file try: with open(dem_data, 'r') as f: for i, line in enumerate(f): if i >= 10: break print(f"Line {i+1}: {line.strip()}") except Exception as read_error: print(f"Warning - File read error (but continuing anyway): {str(read_error)}") progress_text.update("Processing DEM file...") else: # Fallback to the file component value dem_data = dem_file.value print(f"DEM file value type: {type(dem_data)}") print(f"DEM file value content: {dem_data}") # With type="filepath", dem_data is the direct path to the file or None if dem_data is not None and dem_data: print(f"Using uploaded DEM file: {dem_data}") # Verify the file exists if os.path.exists(dem_data): print(f"File exists at path: {dem_data}") # Copy the file to TEMP_DIR to ensure we have access to it try: filename = os.path.basename(dem_data) temp_path = os.path.join(TEMP_DIR, filename) with open(dem_data, 'rb') as src, open(temp_path, 'wb') as dst: dst.write(src.read()) dem_data = temp_path saved_dem_path = temp_path print(f"Copied file to: {temp_path}") except Exception as copy_error: print(f"Warning - Copy error (but continuing anyway): {str(copy_error)}") else: print(f"File does not exist at path: {dem_data}") error_msg = "Error: Uploaded file not found" progress_text.update(error_msg) return [None] * 8 + [error_msg] else: print("No file provided directly. Checking temp directory for ASC files...") # Check if there are any ASC files in the temp directory that might be the uploaded file temp_files = os.listdir(TEMP_DIR) asc_files = [f for f in temp_files if f.endswith('.asc') or f.endswith('.txt')] if asc_files: # Use the first ASC file found dem_data = os.path.join(TEMP_DIR, asc_files[0]) print(f"Found ASC file in temp directory: {dem_data}") saved_dem_path = dem_data else: print("No ASC files found in temp directory, using terrain generator") dem_data = None # Run the landscape evolution simulation progress_msg = "Starting simulation. This may take several minutes..." progress_text.update(progress_msg) try: outputs = run_landscape_evolution( dem_file=dem_data, grid_type=grid_type.value, grid_rows=grid_rows.value, grid_cols=grid_cols.value, grid_resolution=grid_resolution.value, simulation_steps=simulation_steps.value, rainfall_rate=rainfall_rate.value, erosion_rate=erosion_rate.value, diffusion_rate=diffusion_rate.value, save_animation=save_animation.value ) success_msg = "Simulation completed successfully!" progress_text.update(success_msg) # Return the visualization outputs plus the status message return list(outputs) + [success_msg] except Exception as sim_error: import traceback error_msg = f"Error in simulation: {str(sim_error)}" print(error_msg) print(traceback.format_exc()) progress_text.update(error_msg) return [None] * 8 + [error_msg] except Exception as e: import traceback error_msg = f"Error setting up simulation: {str(e)}" print(error_msg) print(traceback.format_exc()) progress_text.update(error_msg) return [None] * 8 + [error_msg] # Connect the run button to the simulation function using Gradio 5.24.0 syntax run_btn.click( fn=run_sim_with_progress, inputs=None, outputs=[ initial_topo, final_topo, diff_topo, flow_acc, topo_3d, evolution_animation, report, final_dem, progress_text ] ) # Debug function to check file upload status def debug_file_upload(): file_value = dem_file.value debug_info = [ f"File Value Type: {type(file_value)}", f"File Value: {file_value}", ] # Check internal component state debug_info.append(f"File Component State: {dem_file.__dict__.get('value', 'Not Available')}") if file_value is not None and file_value: # With type="filepath", file_value is directly the file path debug_info.append(f"File path: {file_value}") # Check if file exists if os.path.exists(file_value): debug_info.append(f"File exists: Yes") file_size = os.path.getsize(file_value) debug_info.append(f"File size: {file_size} bytes ({file_size / (1024*1024):.2f} MB)") # Try to read the first few lines to verify content try: with open(file_value, 'r') as f: first_lines = [next(f) for _ in range(10) if _ < 10] debug_info.append("First 10 lines of file:") for i, line in enumerate(first_lines): debug_info.append(f" Line {i+1}: {line.strip()}") except Exception as e: debug_info.append(f"Error reading file: {str(e)}") else: debug_info.append(f"File exists: No") # List files in temp directory debug_info.append("Files in temp directory:") for f in os.listdir(TEMP_DIR): f_path = os.path.join(TEMP_DIR, f) f_size = os.path.getsize(f_path) debug_info.append(f" {f} ({f_size / (1024*1024):.2f} MB)") else: debug_info.append("No file uploaded or file value is None") # Check if there are any files in the temp directory temp_files = os.listdir(TEMP_DIR) if temp_files: debug_info.append("Files in temp directory that might be the uploaded file:") for f in temp_files: f_path = os.path.join(TEMP_DIR, f) if f.endswith('.asc') or f.endswith('.txt'): f_size = os.path.getsize(f_path) debug_info.append(f" {f} ({f_size / (1024*1024):.2f} MB)") # Try to read the first few lines try: with open(f_path, 'r') as file: first_lines = [next(file) for _ in range(5) if _ < 5] debug_info.append(" First 5 lines:") for i, line in enumerate(first_lines): debug_info.append(f" Line {i+1}: {line.strip()}") except Exception as e: debug_info.append(f" Error reading file: {str(e)}") return "\n".join(debug_info) # Connect the debug button debug_btn.click( fn=debug_file_upload, inputs=None, outputs=progress_text ) # Launch the interface if __name__ == "__main__": demo.launch()