""" MineWatchAI - AI-Powered Mining Rehabilitation Monitoring Main Streamlit Application AI-powered environmental intelligence for measurable, audit-ready mining rehabilitation. """ import streamlit as st import json import gc from datetime import datetime, timedelta from pathlib import Path # Page configuration - MUST be first Streamlit command st.set_page_config( page_title="MineWatchAI - Green Tech", page_icon="🤖", layout="wide", initial_sidebar_state="expanded" ) # Show loading message while imports happen with st.spinner("Loading MineWatchAI..."): # Lazy imports for faster initial load from streamlit_folium import st_folium import folium def load_css(): """Load custom CSS styling.""" css_path = Path(__file__).parent / "assets" / "style.css" if css_path.exists(): with open(css_path) as f: st.markdown(f"", unsafe_allow_html=True) @st.cache_data def load_examples(): """Load example mines data from JSON file.""" json_path = Path(__file__).parent / "data" / "examples.json" if json_path.exists(): with open(json_path) as f: return json.load(f) return {"examples": [], "default_location": {"center": [-28.0, 121.0], "zoom": 6}} def get_bbox_from_example(example: dict) -> tuple: """Get bounding box from example mine data.""" geom = example.get("geometry", {}) coords = geom.get("coordinates", []) if len(coords) == 4: return tuple(coords) else: center = example.get("center", [-28.0, 121.0]) lat, lon = center return (lon - 0.05, lat - 0.05, lon + 0.05, lat + 0.05) def create_simple_map(center, zoom=10, bbox=None): """Create a simple Folium map.""" m = folium.Map(location=center, zoom_start=zoom) # Add satellite imagery folium.TileLayer( tiles='https://server.arcgisonline.com/ArcGIS/rest/services/World_Imagery/MapServer/tile/{z}/{y}/{x}', attr='Esri', name='Satellite', overlay=False ).add_to(m) if bbox is not None: min_lon, min_lat, max_lon, max_lat = bbox boundary_coords = [ [min_lat, min_lon], [min_lat, max_lon], [max_lat, max_lon], [max_lat, min_lon], [min_lat, min_lon] ] folium.Polygon( locations=boundary_coords, color='#1B5E20', weight=3, fill=True, fillColor='#2E7D32', fillOpacity=0.2, popup='Analysis Area' ).add_to(m) folium.LayerControl().add_to(m) return m def run_analysis(bbox, mine_info, date_before, date_after): """Run the comprehensive vegetation change analysis.""" # Import heavy modules only when needed from src.stac_utils import get_bbox_center from src.analysis import ( analyze_vegetation_change, analyze_terrain, analyze_land_cover, calculate_reference_ndvi, calculate_rehab_score, calculate_comprehensive_rehab_score, generate_interpretation ) progress = st.progress(0, text="Initializing analysis...") try: progress.progress(10, text="Searching for satellite imagery...") # Run vegetation change analysis with all indices results = analyze_vegetation_change( bbox, date_before, date_after, window_days=15, cloud_threshold=30 ) progress.progress(50, text="Analyzing terrain...") # Run terrain analysis bsi_after = results.get('indices_after', {}).get('bsi') terrain_results = analyze_terrain(bbox, bsi=bsi_after) terrain_stats = terrain_results.get('stats', {}) progress.progress(70, text="Analyzing land cover...") # Run land cover analysis year_before = int(date_before[:4]) year_after = int(date_after[:4]) # Clamp years to available data range (2017-2023) year_before = max(2017, min(2023, year_before)) year_after = max(2017, min(2023, year_after)) land_cover_results = analyze_land_cover(bbox, year_before, year_after) land_cover_stats = land_cover_results.get('stats', {}) progress.progress(85, text="Calculating rehabilitation metrics...") # Calculate reference NDVI reference_ndvi = calculate_reference_ndvi( bbox, date_after, window_days=15, buffer_deg=0.01 ) # Calculate comprehensive rehabilitation scores rehab_scores = calculate_comprehensive_rehab_score( results['stats'], terrain_stats=terrain_stats, land_cover_stats=land_cover_stats, reference_ndvi=reference_ndvi if reference_ndvi > 0 else 0.5 ) # Legacy score for backwards compatibility site_ndvi = results['stats']['ndvi_after_mean'] rehab_score = calculate_rehab_score(site_ndvi, reference_ndvi) progress.progress(95, text="Generating interpretation...") # Generate comprehensive interpretation interpretation = generate_interpretation( results['stats'], rehab_score, terrain_stats=terrain_stats, land_cover_stats=land_cover_stats ) # Get center coordinates center = get_bbox_center(bbox) progress.progress(100, text="Analysis complete!") # Store only necessary results (exclude large arrays to save memory) st.session_state.analysis_results = { 'ndvi_before': results['ndvi_before'], 'ndvi_after': results['ndvi_after'], 'ndvi_change': results['ndvi_change'], 'indices_after': results.get('indices_after', {}), 'index_changes': results.get('index_changes', {}), 'stats': results['stats'], 'date_before': results['date_before'], 'date_after': results['date_after'], 'bbox': results['bbox'], 'terrain_results': terrain_results, 'terrain_stats': terrain_stats, 'land_cover_results': land_cover_results, 'land_cover_stats': land_cover_stats, 'rehab_score': rehab_score, 'rehab_scores': rehab_scores, 'reference_ndvi': reference_ndvi, 'interpretation': interpretation, 'mine_info': mine_info, 'center': center } # Clean up memory del results gc.collect() return True except ValueError as e: st.error(f"Analysis Error: {str(e)}") return False except Exception as e: st.error(f"Unexpected error: {str(e)}") st.info("Try adjusting the date range or selecting a different site.") return False def display_results(): """Display comprehensive analysis results.""" # Import visualization only when showing results from src.visualization import ( create_comparison_map, create_multi_index_map, create_terrain_map, create_land_cover_map, create_comprehensive_stats_display, create_area_breakdown_chart, create_ndvi_comparison_chart, create_multi_index_chart, create_terrain_stats_chart, create_land_cover_chart, create_vegetation_health_chart, create_environmental_indicators_chart, create_statistics_table, create_time_series_chart ) from src.report import generate_pdf_report, stats_to_csv results = st.session_state.analysis_results stats = results['stats'] rehab_score = results['rehab_score'] rehab_scores = results.get('rehab_scores', {}) interpretation = results['interpretation'] mine_info = results.get('mine_info', {}) terrain_stats = results.get('terrain_stats', {}) land_cover_stats = results.get('land_cover_stats', {}) terrain_results = results.get('terrain_results', {}) land_cover_results = results.get('land_cover_results', {}) mine_name = mine_info.get('name', 'Selected Site') tenement_id = mine_info.get('tenement_id', 'N/A') st.markdown(f"### Analysis Results: {mine_name}") st.markdown(f"*Tenement: {tenement_id} | Period: {results['date_before']} to {results['date_after']}*") if st.button("🔄 New Analysis", type="secondary"): st.session_state.analysis_results = None st.rerun() st.markdown("---") # Map section with tabs for different views st.markdown("### Maps") map_tab1, map_tab2, map_tab3, map_tab4 = st.tabs([ "Vegetation Change", "Multi-Index", "Terrain", "Land Cover" ]) with map_tab1: with st.spinner("Rendering vegetation change map..."): comparison_map = create_comparison_map( bbox=results['bbox'], ndvi_before=results['ndvi_before'], ndvi_after=results['ndvi_after'], ndvi_change=results['ndvi_change'], center_coords=results['center'], zoom=mine_info.get('zoom', 13) ) st_folium(comparison_map, width=None, height=450, returned_objects=[]) st.caption("Green = improvement, Red = decline. Use layer control to toggle views.") with map_tab2: with st.spinner("Rendering multi-index map..."): indices_after = results.get('indices_after', {}) index_changes = results.get('index_changes', {}) if indices_after: multi_map = create_multi_index_map( bbox=results['bbox'], indices_after=indices_after, index_changes=index_changes, center_coords=results['center'], zoom=mine_info.get('zoom', 13) ) st_folium(multi_map, width=None, height=450, returned_objects=[]) st.caption("Toggle layers to view different indices: NDVI, SAVI, EVI, BSI, NDWI, NDMI") else: st.info("Multi-index data not available") with map_tab3: if terrain_results and 'slope' in terrain_results: with st.spinner("Rendering terrain map..."): terrain_map = create_terrain_map( bbox=results['bbox'], slope=terrain_results['slope'], aspect=terrain_results.get('aspect'), erosion_risk=terrain_results.get('erosion_risk'), center_coords=results['center'], zoom=mine_info.get('zoom', 13) ) st_folium(terrain_map, width=None, height=450, returned_objects=[]) st.caption("Slope analysis from Copernicus DEM GLO-30") else: st.info("Terrain data not available for this location") with map_tab4: if land_cover_results and 'lulc_after' in land_cover_results: with st.spinner("Rendering land cover map..."): lulc_map = create_land_cover_map( bbox=results['bbox'], lulc=land_cover_results['lulc_after'], center_coords=results['center'], zoom=mine_info.get('zoom', 13), year=land_cover_stats.get('year_after', 2023) ) st_folium(lulc_map, width=None, height=450, returned_objects=[]) st.caption("IO-LULC land cover classification") else: st.info("Land cover data not available for this location") st.markdown("---") # Main analysis tabs tab1, tab2, tab3, tab4, tab5 = st.tabs([ "Summary", "All Indices", "Terrain & Land Cover", "Time Series", "Export" ]) with tab1: # Use comprehensive stats display create_comprehensive_stats_display( stats, rehab_score, terrain_stats=terrain_stats, land_cover_stats=land_cover_stats ) st.markdown("---") st.markdown("### Interpretation") st.info(interpretation) col1, col2 = st.columns(2) with col1: st.plotly_chart(create_area_breakdown_chart(stats), use_container_width=True) with col2: st.plotly_chart(create_vegetation_health_chart(stats), use_container_width=True) # Environmental indicators radar chart st.plotly_chart(create_environmental_indicators_chart(stats), use_container_width=True) with tab2: st.markdown("### Multi-Index Analysis") st.markdown(""" Multiple vegetation and soil indices provide a comprehensive view: - **NDVI**: Overall vegetation health - **SAVI**: Better for sparse vegetation (soil-adjusted) - **EVI**: Better for dense vegetation - **NDWI**: Water presence - **NDMI**: Vegetation moisture content - **BSI**: Bare soil extent """) st.plotly_chart(create_multi_index_chart(stats), use_container_width=True) # Detailed stats table st.markdown("### Detailed Statistics") create_statistics_table(stats) with tab3: col1, col2 = st.columns(2) with col1: st.markdown("### Terrain Analysis") if terrain_stats: st.plotly_chart(create_terrain_stats_chart(terrain_stats), use_container_width=True) st.markdown("**Terrain Metrics:**") st.write(f"- Mean Slope: {terrain_stats.get('slope_mean', 0):.1f}°") st.write(f"- Max Slope: {terrain_stats.get('slope_max', 0):.1f}°") st.write(f"- Elevation Range: {terrain_stats.get('elevation_min', 0):.0f}m - {terrain_stats.get('elevation_max', 0):.0f}m") if 'percent_high_erosion_risk' in terrain_stats: st.write(f"- High Erosion Risk: {terrain_stats['percent_high_erosion_risk']:.1f}%") else: st.info("Terrain analysis not available") with col2: st.markdown("### Land Cover Change") if land_cover_stats and 'class_changes' in land_cover_stats: st.plotly_chart(create_land_cover_chart(land_cover_stats), use_container_width=True) st.markdown("**Land Cover Metrics:**") st.write(f"- Vegetation Cover: {land_cover_stats.get('vegetation_cover_after', 0):.1f}%") st.write(f"- Vegetation Change: {land_cover_stats.get('vegetation_cover_change', 0):+.1f}%") st.write(f"- Bare Ground: {land_cover_stats.get('bare_ground_after', 0):.1f}%") st.write(f"- Bare Ground Change: {land_cover_stats.get('bare_ground_change', 0):+.1f}%") else: st.info("Land cover analysis not available") with tab4: st.markdown("### Time Series") st.info("Click button below to load historical NDVI data (may take a few minutes)") if st.button("Load Time Series"): from src.analysis import get_monthly_ndvi_timeseries date_before = datetime.strptime(results['date_before'], '%Y-%m-%d') date_after = datetime.strptime(results['date_after'], '%Y-%m-%d') with st.spinner("Loading time series..."): try: timeseries = get_monthly_ndvi_timeseries( results['bbox'], date_before.year, date_after.year ) if timeseries: fig = create_time_series_chart(timeseries) st.plotly_chart(fig, use_container_width=True) else: st.warning("No time series data available.") except Exception as e: st.error(f"Error: {e}") with tab5: st.markdown("### Export Results") col1, col2 = st.columns(2) with col1: st.markdown("#### PDF Report") pdf_bytes = generate_pdf_report( tenement_id=tenement_id, stats=stats, rehab_score=rehab_score, interpretation=interpretation, date_before=results['date_before'], date_after=results['date_after'], mine_name=mine_name ) st.download_button( "Download PDF", data=pdf_bytes, file_name=f"rehabwatch_{tenement_id.replace(' ', '_')}.pdf", mime="application/pdf", use_container_width=True ) with col2: st.markdown("#### CSV Data") csv_data = stats_to_csv( stats=stats, tenement_id=tenement_id, rehab_score=rehab_score, date_before=results['date_before'], date_after=results['date_after'], mine_name=mine_name ) st.download_button( "Download CSV", data=csv_data, file_name=f"rehabwatch_{tenement_id.replace(' ', '_')}.csv", mime="text/csv", use_container_width=True ) def main(): """Main application function.""" load_css() examples_data = load_examples() examples = examples_data.get("examples", []) default_location = examples_data.get("default_location", {"center": [-28.0, 121.0], "zoom": 6}) # Initialize session state if "analysis_results" not in st.session_state: st.session_state.analysis_results = None if "selected_bbox" not in st.session_state: st.session_state.selected_bbox = None if "selected_mine" not in st.session_state: st.session_state.selected_mine = None if "analyzing" not in st.session_state: st.session_state.analyzing = False # Sidebar with st.sidebar: st.markdown("## 🌱 Analysis Hub") st.markdown("**MineWatchAI**") st.markdown("---") st.markdown("### Select Mining Site") example_names = ["Select a mine..."] + [e["name"] for e in examples] selected_name = st.selectbox("Choose a mine:", example_names) if selected_name != "Select a mine...": for example in examples: if example["name"] == selected_name: st.session_state.selected_mine = example base_bbox = get_bbox_from_example(example) # Area size adjustment st.markdown("#### Adjust Analysis Area") area_scale = st.slider( "Area Size", min_value=0.5, max_value=2.0, value=1.0, step=0.1, help="Adjust the analysis area: <1.0 = smaller, >1.0 = larger" ) # Apply scaling to bbox min_lon, min_lat, max_lon, max_lat = base_bbox center_lon = (min_lon + max_lon) / 2 center_lat = (min_lat + max_lat) / 2 half_width = (max_lon - min_lon) / 2 * area_scale half_height = (max_lat - min_lat) / 2 * area_scale st.session_state.selected_bbox = ( center_lon - half_width, center_lat - half_height, center_lon + half_width, center_lat + half_height ) st.info(f"📌 {example['description']}") break st.markdown("---") st.markdown("### Analysis Period") col1, col2, col3 = st.columns(3) today = datetime.now().date() with col1: if st.button("1Y", use_container_width=True): st.session_state.date_before = today - timedelta(days=365) st.session_state.date_after = today with col2: if st.button("2Y", use_container_width=True): st.session_state.date_before = today - timedelta(days=730) st.session_state.date_after = today with col3: if st.button("5Y", use_container_width=True): st.session_state.date_before = today - timedelta(days=1825) st.session_state.date_after = today default_before = st.session_state.get("date_before", today - timedelta(days=730)) default_after = st.session_state.get("date_after", today) date_before = st.date_input("Start Date", value=default_before, max_value=today) date_after = st.date_input("End Date", value=default_after, max_value=today) if date_after <= date_before: st.error("End date must be after start date") st.markdown("---") analyze_disabled = st.session_state.selected_bbox is None or date_after <= date_before if st.button("🤖 AI-Analyze", type="primary", use_container_width=True, disabled=analyze_disabled): st.session_state.analyzing = True if analyze_disabled: st.caption("Select a mine and valid dates") st.markdown("---") with st.expander("â„šī¸ About"): st.markdown(""" **MineWatchAI** - AI-driven green technology for sustainable mining rehabilitation. --- Developed by [Ashkan Taghipour](https://ashkantaghipour.github.io/) âš ī¸ *This is a research startup project currently under active development.* """) # Main area st.markdown("# 🤖 MineWatchAI") st.markdown("*AI-powered environmental intelligence for measurable, audit-ready mining rehabilitation*") # Handle analysis if st.session_state.analyzing: st.session_state.analyzing = False success = run_analysis( st.session_state.selected_bbox, st.session_state.selected_mine, str(date_before), str(date_after) ) if success: st.rerun() # Display results or initial map if st.session_state.analysis_results is not None: display_results() else: st.markdown("### Select a Mining Site") if st.session_state.selected_bbox is not None: mine = st.session_state.selected_mine center = mine.get("center", default_location["center"]) zoom = mine.get("zoom", 12) st.markdown(f"**Selected:** {mine['name']} ({mine['tenement_id']})") m = create_simple_map(tuple(center), zoom, st.session_state.selected_bbox) st_folium(m, width=None, height=400, returned_objects=[]) else: st.info("👆 Select a mining site from the sidebar") m = create_simple_map(tuple(default_location["center"]), default_location["zoom"]) st_folium(m, width=None, height=400, returned_objects=[]) st.markdown("### Available Sites") cols = st.columns(len(examples)) for i, ex in enumerate(examples): with cols[i]: st.markdown(f"**{ex['name']}**") st.caption(ex['tenement_id']) # Footer st.markdown("---") st.caption("MineWatchAI - AI-driven green technology for sustainable mining rehabilitation.") if __name__ == "__main__": main()