# Spaces: Running
"""
MineWatchAI - AI-Powered Mining Rehabilitation Monitoring.

Main Streamlit application.

AI-powered environmental intelligence for measurable,
audit-ready mining rehabilitation.
"""
| import streamlit as st | |
| import json | |
| import gc | |
| from datetime import datetime, timedelta | |
| from pathlib import Path | |
# Streamlit only accepts set_page_config as the first Streamlit command of the
# script, so the page settings are applied before anything else is rendered.
_PAGE_SETTINGS = {
    "page_title": "MineWatchAI - Green Tech",
    "page_icon": "🤖",
    "layout": "wide",
    "initial_sidebar_state": "expanded",
}
st.set_page_config(**_PAGE_SETTINGS)

# Import the mapping libraries under a spinner so a loading message is shown
# while they are being imported.
with st.spinner("Loading MineWatchAI..."):
    from streamlit_folium import st_folium
    import folium
def load_css():
    """Inject the app's custom stylesheet into the page, if one is present.

    Looks for ``assets/style.css`` next to this file and, when it exists,
    emits its contents inside a ``<style>`` tag via ``st.markdown``. Does
    nothing when the file is missing.
    """
    css_path = Path(__file__).parent / "assets" / "style.css"
    if not css_path.exists():
        return
    # Read as UTF-8 explicitly: the platform default encoding differs between
    # hosts and would garble non-ASCII characters in the stylesheet.
    stylesheet = css_path.read_text(encoding="utf-8")
    st.markdown(f"<style>{stylesheet}</style>", unsafe_allow_html=True)
def load_examples():
    """Load the example-mine catalogue from ``data/examples.json``.

    Returns:
        The parsed JSON document when the file exists next to this module.
        Otherwise a fallback dict with an empty ``"examples"`` list and a
        ``"default_location"`` entry (center ``[-28.0, 121.0]``, zoom ``6``).
    """
    json_path = Path(__file__).parent / "data" / "examples.json"
    if not json_path.exists():
        return {"examples": [], "default_location": {"center": [-28.0, 121.0], "zoom": 6}}
    # Open as UTF-8 explicitly so site names and descriptions with non-ASCII
    # characters load the same way regardless of the platform default encoding.
    with json_path.open(encoding="utf-8") as f:
        return json.load(f)
def get_bbox_from_example(example: dict) -> tuple:
    """Return the bounding box of an example mine.

    Args:
        example: Example-mine record. ``example["geometry"]["coordinates"]``
            is used as the box when it holds exactly four values; otherwise
            the box is built around ``example["center"]`` (``[lat, lon]``).

    Returns:
        A 4-tuple. In the fallback case it is
        ``(lon - 0.05, lat - 0.05, lon + 0.05, lat + 0.05)``, using
        ``[-28.0, 121.0]`` as the center when none is given.
    """
    # ``or`` rather than a ``get`` default: a key that is present but null in
    # the JSON would otherwise come back as None and break the lookups below.
    geometry = example.get("geometry") or {}
    coords = geometry.get("coordinates") or []
    if len(coords) == 4:
        return tuple(coords)

    lat, lon = example.get("center") or [-28.0, 121.0]
    return (lon - 0.05, lat - 0.05, lon + 0.05, lat + 0.05)
def create_simple_map(center, zoom=10, bbox=None):
    """Build a Folium map with a satellite layer and an optional area outline.

    Args:
        center: Map centre, passed to ``folium.Map`` as ``location``.
        zoom: Initial zoom level.
        bbox: Optional ``(min_lon, min_lat, max_lon, max_lat)`` tuple. When
            given, the rectangle is drawn as a green polygon with an
            "Analysis Area" popup.

    Returns:
        The configured ``folium.Map``.
    """
    base_map = folium.Map(location=center, zoom_start=zoom)

    # Esri World Imagery, registered as a base layer rather than an overlay.
    satellite_layer = folium.TileLayer(
        tiles='https://server.arcgisonline.com/ArcGIS/rest/services/World_Imagery/MapServer/tile/{z}/{y}/{x}',
        attr='Esri',
        name='Satellite',
        overlay=False,
    )
    satellite_layer.add_to(base_map)

    if bbox is not None:
        west, south, east, north = bbox
        # Corners are listed as [lat, lon] pairs; the first corner is repeated
        # at the end to close the ring.
        ring = [
            [south, west],
            [south, east],
            [north, east],
            [north, west],
            [south, west],
        ]
        outline_style = {
            'color': '#1B5E20',
            'weight': 3,
            'fill': True,
            'fillColor': '#2E7D32',
            'fillOpacity': 0.2,
            'popup': 'Analysis Area',
        }
        folium.Polygon(locations=ring, **outline_style).add_to(base_map)

    folium.LayerControl().add_to(base_map)
    return base_map
def _clamp_land_cover_year(date_str, first_year=2017, last_year=2023):
    """Return the year of a ``YYYY-...`` date string, limited to the land cover data range."""
    return max(first_year, min(last_year, int(date_str[:4])))


def run_analysis(bbox, mine_info, date_before, date_after):
    """Run the vegetation, terrain and land cover analysis for one site.

    Progress is reported through a Streamlit progress bar. On success the
    results are stored in ``st.session_state.analysis_results``. On failure
    the progress bar is removed and an error message is shown instead.

    Args:
        bbox: Bounding box, handed unchanged to the ``src.analysis`` functions.
        mine_info: Record of the selected mine; stored alongside the results.
        date_before: Start date as a string whose first four characters are
            the year (``main`` passes ``str(date)``).
        date_after: End date, same format as ``date_before``.

    Returns:
        True when the analysis completed and the results were stored, False
        when an exception was caught and reported to the user.
    """
    # Import heavy modules only when needed
    from src.stac_utils import get_bbox_center
    from src.analysis import (
        analyze_vegetation_change,
        analyze_terrain,
        analyze_land_cover,
        calculate_reference_ndvi,
        calculate_rehab_score,
        calculate_comprehensive_rehab_score,
        generate_interpretation
    )

    progress = st.progress(0, text="Initializing analysis...")
    try:
        progress.progress(10, text="Searching for satellite imagery...")
        # Vegetation change analysis with all indices
        results = analyze_vegetation_change(
            bbox,
            date_before,
            date_after,
            window_days=15,
            cloud_threshold=30
        )

        progress.progress(50, text="Analyzing terrain...")
        # Terrain analysis, given the post-period bare soil index when available
        bsi_after = results.get('indices_after', {}).get('bsi')
        terrain_results = analyze_terrain(bbox, bsi=bsi_after)
        terrain_stats = terrain_results.get('stats', {})

        progress.progress(70, text="Analyzing land cover...")
        # Land cover analysis; years are clamped to the available data range
        # (2017-2023)
        year_before = _clamp_land_cover_year(date_before)
        year_after = _clamp_land_cover_year(date_after)
        land_cover_results = analyze_land_cover(bbox, year_before, year_after)
        land_cover_stats = land_cover_results.get('stats', {})

        progress.progress(85, text="Calculating rehabilitation metrics...")
        reference_ndvi = calculate_reference_ndvi(
            bbox,
            date_after,
            window_days=15,
            buffer_deg=0.01
        )
        # The comprehensive score gets 0.5 in place of a non-positive reference
        rehab_scores = calculate_comprehensive_rehab_score(
            results['stats'],
            terrain_stats=terrain_stats,
            land_cover_stats=land_cover_stats,
            reference_ndvi=reference_ndvi if reference_ndvi > 0 else 0.5
        )
        # Legacy score for backwards compatibility
        site_ndvi = results['stats']['ndvi_after_mean']
        rehab_score = calculate_rehab_score(site_ndvi, reference_ndvi)

        progress.progress(95, text="Generating interpretation...")
        interpretation = generate_interpretation(
            results['stats'],
            rehab_score,
            terrain_stats=terrain_stats,
            land_cover_stats=land_cover_stats
        )
        center = get_bbox_center(bbox)

        progress.progress(100, text="Analysis complete!")
        # Keep only the entries the results view reads. The NDVI rasters are
        # stored too, because display_results draws its maps from them.
        st.session_state.analysis_results = {
            'ndvi_before': results['ndvi_before'],
            'ndvi_after': results['ndvi_after'],
            'ndvi_change': results['ndvi_change'],
            'indices_after': results.get('indices_after', {}),
            'index_changes': results.get('index_changes', {}),
            'stats': results['stats'],
            'date_before': results['date_before'],
            'date_after': results['date_after'],
            'bbox': results['bbox'],
            'terrain_results': terrain_results,
            'terrain_stats': terrain_stats,
            'land_cover_results': land_cover_results,
            'land_cover_stats': land_cover_stats,
            'rehab_score': rehab_score,
            'rehab_scores': rehab_scores,
            'reference_ndvi': reference_ndvi,
            'interpretation': interpretation,
            'mine_info': mine_info,
            'center': center
        }

        # Drop the local reference to the full result dict and collect garbage
        del results
        gc.collect()
        return True
    except ValueError as e:
        # Remove the bar so a stale stage label is not left next to the error
        progress.empty()
        st.error(f"Analysis Error: {str(e)}")
        return False
    except Exception as e:
        progress.empty()
        st.error(f"Unexpected error: {str(e)}")
        st.info("Try adjusting the date range or selecting a different site.")
        return False
def _show_result_map(fmap, caption):
    """Draw one Folium map at the results-page size with a caption below it."""
    st_folium(fmap, width=None, height=450, returned_objects=[])
    st.caption(caption)


def _render_map_tabs(results, zoom):
    """Render the map tabs: vegetation change, multi-index, terrain, land cover."""
    from src.visualization import (
        create_comparison_map,
        create_multi_index_map,
        create_terrain_map,
        create_land_cover_map
    )

    bbox = results['bbox']
    center = results['center']
    terrain_results = results.get('terrain_results', {})
    land_cover_results = results.get('land_cover_results', {})
    land_cover_stats = results.get('land_cover_stats', {})

    st.markdown("### Maps")
    change_tab, index_tab, terrain_tab, cover_tab = st.tabs([
        "Vegetation Change", "Multi-Index", "Terrain", "Land Cover"
    ])

    with change_tab:
        with st.spinner("Rendering vegetation change map..."):
            comparison_map = create_comparison_map(
                bbox=bbox,
                ndvi_before=results['ndvi_before'],
                ndvi_after=results['ndvi_after'],
                ndvi_change=results['ndvi_change'],
                center_coords=center,
                zoom=zoom
            )
            _show_result_map(
                comparison_map,
                "Green = improvement, Red = decline. Use layer control to toggle views."
            )

    with index_tab:
        with st.spinner("Rendering multi-index map..."):
            indices_after = results.get('indices_after', {})
            if indices_after:
                multi_map = create_multi_index_map(
                    bbox=bbox,
                    indices_after=indices_after,
                    index_changes=results.get('index_changes', {}),
                    center_coords=center,
                    zoom=zoom
                )
                _show_result_map(
                    multi_map,
                    "Toggle layers to view different indices: NDVI, SAVI, EVI, BSI, NDWI, NDMI"
                )
            else:
                st.info("Multi-index data not available")

    with terrain_tab:
        if terrain_results and 'slope' in terrain_results:
            with st.spinner("Rendering terrain map..."):
                terrain_map = create_terrain_map(
                    bbox=bbox,
                    slope=terrain_results['slope'],
                    aspect=terrain_results.get('aspect'),
                    erosion_risk=terrain_results.get('erosion_risk'),
                    center_coords=center,
                    zoom=zoom
                )
                _show_result_map(terrain_map, "Slope analysis from Copernicus DEM GLO-30")
        else:
            st.info("Terrain data not available for this location")

    with cover_tab:
        if land_cover_results and 'lulc_after' in land_cover_results:
            with st.spinner("Rendering land cover map..."):
                lulc_map = create_land_cover_map(
                    bbox=bbox,
                    lulc=land_cover_results['lulc_after'],
                    center_coords=center,
                    zoom=zoom,
                    year=land_cover_stats.get('year_after', 2023)
                )
                _show_result_map(lulc_map, "IO-LULC land cover classification")
        else:
            st.info("Land cover data not available for this location")


def _render_summary_tab(stats, rehab_score, interpretation, terrain_stats, land_cover_stats):
    """Render the headline statistics, the interpretation text and summary charts."""
    from src.visualization import (
        create_comprehensive_stats_display,
        create_area_breakdown_chart,
        create_vegetation_health_chart,
        create_environmental_indicators_chart
    )

    create_comprehensive_stats_display(
        stats, rehab_score,
        terrain_stats=terrain_stats,
        land_cover_stats=land_cover_stats
    )
    st.markdown("---")
    st.markdown("### Interpretation")
    st.info(interpretation)

    left, right = st.columns(2)
    with left:
        st.plotly_chart(create_area_breakdown_chart(stats), use_container_width=True)
    with right:
        st.plotly_chart(create_vegetation_health_chart(stats), use_container_width=True)

    # Environmental indicators radar chart
    st.plotly_chart(create_environmental_indicators_chart(stats), use_container_width=True)


def _render_indices_tab(stats):
    """Render the multi-index explanation, chart and detailed statistics table."""
    from src.visualization import create_multi_index_chart, create_statistics_table

    st.markdown("### Multi-Index Analysis")
    st.markdown("""
    Multiple vegetation and soil indices provide a comprehensive view:
    - **NDVI**: Overall vegetation health
    - **SAVI**: Better for sparse vegetation (soil-adjusted)
    - **EVI**: Better for dense vegetation
    - **NDWI**: Water presence
    - **NDMI**: Vegetation moisture content
    - **BSI**: Bare soil extent
    """)
    st.plotly_chart(create_multi_index_chart(stats), use_container_width=True)

    st.markdown("### Detailed Statistics")
    create_statistics_table(stats)


def _render_terrain_land_cover_tab(terrain_stats, land_cover_stats):
    """Render terrain metrics and land cover change metrics side by side."""
    from src.visualization import create_terrain_stats_chart, create_land_cover_chart

    left, right = st.columns(2)
    with left:
        st.markdown("### Terrain Analysis")
        if terrain_stats:
            st.plotly_chart(create_terrain_stats_chart(terrain_stats), use_container_width=True)
            st.markdown("**Terrain Metrics:**")
            st.write(f"- Mean Slope: {terrain_stats.get('slope_mean', 0):.1f}°")
            st.write(f"- Max Slope: {terrain_stats.get('slope_max', 0):.1f}°")
            st.write(f"- Elevation Range: {terrain_stats.get('elevation_min', 0):.0f}m - {terrain_stats.get('elevation_max', 0):.0f}m")
            if 'percent_high_erosion_risk' in terrain_stats:
                st.write(f"- High Erosion Risk: {terrain_stats['percent_high_erosion_risk']:.1f}%")
        else:
            st.info("Terrain analysis not available")

    with right:
        st.markdown("### Land Cover Change")
        if land_cover_stats and 'class_changes' in land_cover_stats:
            st.plotly_chart(create_land_cover_chart(land_cover_stats), use_container_width=True)
            st.markdown("**Land Cover Metrics:**")
            st.write(f"- Vegetation Cover: {land_cover_stats.get('vegetation_cover_after', 0):.1f}%")
            st.write(f"- Vegetation Change: {land_cover_stats.get('vegetation_cover_change', 0):+.1f}%")
            st.write(f"- Bare Ground: {land_cover_stats.get('bare_ground_after', 0):.1f}%")
            st.write(f"- Bare Ground Change: {land_cover_stats.get('bare_ground_change', 0):+.1f}%")
        else:
            st.info("Land cover analysis not available")


def _render_time_series_tab(results):
    """Render the on-demand monthly NDVI time series."""
    from src.visualization import create_time_series_chart

    st.markdown("### Time Series")
    st.info("Click button below to load historical NDVI data (may take a few minutes)")
    # Loaded only on request; the info text above warns it can take minutes
    if st.button("Load Time Series"):
        from src.analysis import get_monthly_ndvi_timeseries

        start = datetime.strptime(results['date_before'], '%Y-%m-%d')
        end = datetime.strptime(results['date_after'], '%Y-%m-%d')
        with st.spinner("Loading time series..."):
            try:
                timeseries = get_monthly_ndvi_timeseries(
                    results['bbox'],
                    start.year,
                    end.year
                )
                if timeseries:
                    st.plotly_chart(create_time_series_chart(timeseries), use_container_width=True)
                else:
                    st.warning("No time series data available.")
            except Exception as e:
                st.error(f"Error: {e}")


def _render_export_tab(results, stats, rehab_score, interpretation, mine_name, tenement_id):
    """Render the PDF and CSV download buttons."""
    from src.report import generate_pdf_report, stats_to_csv

    # str() guards against a tenement id stored as a number in the examples file
    file_stem = f"rehabwatch_{str(tenement_id).replace(' ', '_')}"

    st.markdown("### Export Results")
    left, right = st.columns(2)
    with left:
        st.markdown("#### PDF Report")
        pdf_bytes = generate_pdf_report(
            tenement_id=tenement_id,
            stats=stats,
            rehab_score=rehab_score,
            interpretation=interpretation,
            date_before=results['date_before'],
            date_after=results['date_after'],
            mine_name=mine_name
        )
        st.download_button(
            "Download PDF",
            data=pdf_bytes,
            file_name=f"{file_stem}.pdf",
            mime="application/pdf",
            use_container_width=True
        )

    with right:
        st.markdown("#### CSV Data")
        csv_data = stats_to_csv(
            stats=stats,
            tenement_id=tenement_id,
            rehab_score=rehab_score,
            date_before=results['date_before'],
            date_after=results['date_after'],
            mine_name=mine_name
        )
        st.download_button(
            "Download CSV",
            data=csv_data,
            file_name=f"{file_stem}.csv",
            mime="text/csv",
            use_container_width=True
        )


def display_results():
    """Display the analysis stored in ``st.session_state.analysis_results``.

    Renders, in order: a header with the mine name, tenement and period; a
    "New Analysis" button that clears the stored results and reruns the app;
    the map tabs; and the Summary / All Indices / Terrain & Land Cover /
    Time Series / Export tabs. Visualization and report modules are imported
    inside the helpers so they load only when results are shown.
    """
    results = st.session_state.analysis_results
    stats = results['stats']
    rehab_score = results['rehab_score']
    interpretation = results['interpretation']
    # ``or {}`` also covers a mine_info that was stored as None
    mine_info = results.get('mine_info') or {}
    terrain_stats = results.get('terrain_stats', {})
    land_cover_stats = results.get('land_cover_stats', {})

    mine_name = mine_info.get('name', 'Selected Site')
    tenement_id = mine_info.get('tenement_id', 'N/A')

    st.markdown(f"### Analysis Results: {mine_name}")
    st.markdown(f"*Tenement: {tenement_id} | Period: {results['date_before']} to {results['date_after']}*")

    if st.button("🔄 New Analysis", type="secondary"):
        st.session_state.analysis_results = None
        st.rerun()

    st.markdown("---")
    _render_map_tabs(results, zoom=mine_info.get('zoom', 13))
    st.markdown("---")

    summary_tab, indices_tab, terrain_tab, series_tab, export_tab = st.tabs([
        "Summary", "All Indices", "Terrain & Land Cover", "Time Series", "Export"
    ])
    with summary_tab:
        _render_summary_tab(stats, rehab_score, interpretation, terrain_stats, land_cover_stats)
    with indices_tab:
        _render_indices_tab(stats)
    with terrain_tab:
        _render_terrain_land_cover_tab(terrain_stats, land_cover_stats)
    with series_tab:
        _render_time_series_tab(results)
    with export_tab:
        _render_export_tab(results, stats, rehab_score, interpretation, mine_name, tenement_id)
def _scale_bbox(bbox, scale):
    """Scale a ``(min_lon, min_lat, max_lon, max_lat)`` box about its centre."""
    min_lon, min_lat, max_lon, max_lat = bbox
    center_lon = (min_lon + max_lon) / 2
    center_lat = (min_lat + max_lat) / 2
    half_width = (max_lon - min_lon) / 2 * scale
    half_height = (max_lat - min_lat) / 2 * scale
    return (
        center_lon - half_width,
        center_lat - half_height,
        center_lon + half_width,
        center_lat + half_height
    )


def _render_sidebar(examples):
    """Render the sidebar controls and return the chosen ``(date_before, date_after)``.

    Updates ``selected_mine``, ``selected_bbox``, ``date_before``,
    ``date_after`` and ``analyzing`` in ``st.session_state`` as a side effect.
    """
    placeholder = "Select a mine..."

    with st.sidebar:
        st.markdown("## 🌱 Analysis Hub")
        st.markdown("**MineWatchAI**")
        st.markdown("---")

        st.markdown("### Select Mining Site")
        example_names = [placeholder] + [e["name"] for e in examples]
        selected_name = st.selectbox("Choose a mine:", example_names)

        example = None
        if selected_name != placeholder:
            # First match wins if two examples share a name
            example = next((e for e in examples if e["name"] == selected_name), None)

        if example is None:
            # Going back to the placeholder must drop the previous selection;
            # otherwise the old mine stays on the map and Analyze stays enabled.
            st.session_state.selected_mine = None
            st.session_state.selected_bbox = None
        else:
            st.session_state.selected_mine = example
            base_bbox = get_bbox_from_example(example)

            st.markdown("#### Adjust Analysis Area")
            area_scale = st.slider(
                "Area Size",
                min_value=0.5,
                max_value=2.0,
                value=1.0,
                step=0.1,
                help="Adjust the analysis area: <1.0 = smaller, >1.0 = larger"
            )
            st.session_state.selected_bbox = _scale_bbox(base_bbox, area_scale)
            st.info(f"📌 {example['description']}")

        st.markdown("---")
        st.markdown("### Analysis Period")

        today = datetime.now().date()
        # Quick-select buttons: label -> length of the period in days
        presets = (("1Y", 365), ("2Y", 730), ("5Y", 1825))
        for column, (label, days) in zip(st.columns(3), presets):
            with column:
                if st.button(label, use_container_width=True):
                    st.session_state.date_before = today - timedelta(days=days)
                    st.session_state.date_after = today

        default_before = st.session_state.get("date_before", today - timedelta(days=730))
        default_after = st.session_state.get("date_after", today)
        date_before = st.date_input("Start Date", value=default_before, max_value=today)
        date_after = st.date_input("End Date", value=default_after, max_value=today)

        dates_invalid = date_after <= date_before
        if dates_invalid:
            st.error("End date must be after start date")

        st.markdown("---")
        analyze_disabled = st.session_state.selected_bbox is None or dates_invalid
        if st.button("🤖 AI-Analyze", type="primary", use_container_width=True, disabled=analyze_disabled):
            st.session_state.analyzing = True
        if analyze_disabled:
            st.caption("Select a mine and valid dates")

        st.markdown("---")
        with st.expander("ℹ️ About"):
            # Blank lines around '---' keep it a horizontal rule; directly
            # under a paragraph Markdown would turn that paragraph into a heading.
            st.markdown("""
            **MineWatchAI** - AI-driven green technology
            for sustainable mining rehabilitation.

            ---

            Developed by [Ashkan Taghipour](https://ashkantaghipour.github.io/)

            ⚠️ *This is a research startup project currently
            under active development.*
            """)

    return date_before, date_after


def _render_site_picker(examples, default_location):
    """Render the landing view shown while no analysis results exist."""
    st.markdown("### Select a Mining Site")

    if st.session_state.selected_bbox is not None:
        mine = st.session_state.selected_mine
        center = mine.get("center", default_location["center"])
        zoom = mine.get("zoom", 12)
        st.markdown(f"**Selected:** {mine['name']} ({mine['tenement_id']})")
        site_map = create_simple_map(tuple(center), zoom, st.session_state.selected_bbox)
        st_folium(site_map, width=None, height=400, returned_objects=[])
        return

    st.info("👆 Select a mining site from the sidebar")
    overview_map = create_simple_map(tuple(default_location["center"]), default_location["zoom"])
    st_folium(overview_map, width=None, height=400, returned_objects=[])

    # NOTE(review): the site list is shown only while nothing is selected;
    # this nesting was reconstructed from a flattened listing - confirm.
    # st.columns rejects a count of zero, which is what an empty examples
    # list (the load_examples fallback) would produce, so skip the section.
    if examples:
        st.markdown("### Available Sites")
        for column, ex in zip(st.columns(len(examples)), examples):
            with column:
                st.markdown(f"**{ex['name']}**")
                st.caption(ex['tenement_id'])


def main():
    """Main application function.

    Loads the stylesheet and example mines, initialises the session state,
    renders the sidebar, runs an analysis when one was requested, and then
    shows either the stored results or the site-selection landing view,
    followed by the page footer.
    """
    load_css()
    examples_data = load_examples()
    examples = examples_data.get("examples", [])
    default_location = examples_data.get("default_location", {"center": [-28.0, 121.0], "zoom": 6})

    # Initialize session state
    initial_state = (
        ("analysis_results", None),
        ("selected_bbox", None),
        ("selected_mine", None),
        ("analyzing", False),
    )
    for key, initial in initial_state:
        if key not in st.session_state:
            st.session_state[key] = initial

    date_before, date_after = _render_sidebar(examples)

    # Main area
    st.markdown("# 🤖 MineWatchAI")
    st.markdown("*AI-powered environmental intelligence for measurable, audit-ready mining rehabilitation*")

    # Handle analysis: the flag is reset first so a rerun does not repeat it
    if st.session_state.analyzing:
        st.session_state.analyzing = False
        success = run_analysis(
            st.session_state.selected_bbox,
            st.session_state.selected_mine,
            str(date_before),
            str(date_after)
        )
        if success:
            st.rerun()

    # Display results or the landing view
    if st.session_state.analysis_results is not None:
        display_results()
    else:
        _render_site_picker(examples, default_location)

    # Footer
    st.markdown("---")
    st.caption("MineWatchAI - AI-driven green technology for sustainable mining rehabilitation.")


if __name__ == "__main__":
    main()