MineWatchAI / app.py
Ashkan Taghipour (The University of Western Australia)
Initial commit
f5648f5
"""
MineWatchAI - AI-Powered Mining Rehabilitation Monitoring
Main Streamlit Application
AI-powered environmental intelligence for measurable,
audit-ready mining rehabilitation.
"""
import streamlit as st
import json
import gc
from datetime import datetime, timedelta
from pathlib import Path
# Page configuration - MUST be first Streamlit command
st.set_page_config(
page_title="MineWatchAI - Green Tech",
page_icon="🤖",
layout="wide",
initial_sidebar_state="expanded"
)
# Show loading message while imports happen
with st.spinner("Loading MineWatchAI..."):
# Lazy imports for faster initial load
from streamlit_folium import st_folium
import folium
def load_css():
"""Load custom CSS styling."""
css_path = Path(__file__).parent / "assets" / "style.css"
if css_path.exists():
with open(css_path) as f:
st.markdown(f"<style>{f.read()}</style>", unsafe_allow_html=True)
@st.cache_data
def load_examples():
"""Load example mines data from JSON file."""
json_path = Path(__file__).parent / "data" / "examples.json"
if json_path.exists():
with open(json_path) as f:
return json.load(f)
return {"examples": [], "default_location": {"center": [-28.0, 121.0], "zoom": 6}}
def get_bbox_from_example(example: dict) -> tuple:
"""Get bounding box from example mine data."""
geom = example.get("geometry", {})
coords = geom.get("coordinates", [])
if len(coords) == 4:
return tuple(coords)
else:
center = example.get("center", [-28.0, 121.0])
lat, lon = center
return (lon - 0.05, lat - 0.05, lon + 0.05, lat + 0.05)
def create_simple_map(center, zoom=10, bbox=None):
"""Create a simple Folium map."""
m = folium.Map(location=center, zoom_start=zoom)
# Add satellite imagery
folium.TileLayer(
tiles='https://server.arcgisonline.com/ArcGIS/rest/services/World_Imagery/MapServer/tile/{z}/{y}/{x}',
attr='Esri',
name='Satellite',
overlay=False
).add_to(m)
if bbox is not None:
min_lon, min_lat, max_lon, max_lat = bbox
boundary_coords = [
[min_lat, min_lon],
[min_lat, max_lon],
[max_lat, max_lon],
[max_lat, min_lon],
[min_lat, min_lon]
]
folium.Polygon(
locations=boundary_coords,
color='#1B5E20',
weight=3,
fill=True,
fillColor='#2E7D32',
fillOpacity=0.2,
popup='Analysis Area'
).add_to(m)
folium.LayerControl().add_to(m)
return m
def run_analysis(bbox, mine_info, date_before, date_after):
"""Run the comprehensive vegetation change analysis."""
# Import heavy modules only when needed
from src.stac_utils import get_bbox_center
from src.analysis import (
analyze_vegetation_change,
analyze_terrain,
analyze_land_cover,
calculate_reference_ndvi,
calculate_rehab_score,
calculate_comprehensive_rehab_score,
generate_interpretation
)
progress = st.progress(0, text="Initializing analysis...")
try:
progress.progress(10, text="Searching for satellite imagery...")
# Run vegetation change analysis with all indices
results = analyze_vegetation_change(
bbox,
date_before,
date_after,
window_days=15,
cloud_threshold=30
)
progress.progress(50, text="Analyzing terrain...")
# Run terrain analysis
bsi_after = results.get('indices_after', {}).get('bsi')
terrain_results = analyze_terrain(bbox, bsi=bsi_after)
terrain_stats = terrain_results.get('stats', {})
progress.progress(70, text="Analyzing land cover...")
# Run land cover analysis
year_before = int(date_before[:4])
year_after = int(date_after[:4])
# Clamp years to available data range (2017-2023)
year_before = max(2017, min(2023, year_before))
year_after = max(2017, min(2023, year_after))
land_cover_results = analyze_land_cover(bbox, year_before, year_after)
land_cover_stats = land_cover_results.get('stats', {})
progress.progress(85, text="Calculating rehabilitation metrics...")
# Calculate reference NDVI
reference_ndvi = calculate_reference_ndvi(
bbox,
date_after,
window_days=15,
buffer_deg=0.01
)
# Calculate comprehensive rehabilitation scores
rehab_scores = calculate_comprehensive_rehab_score(
results['stats'],
terrain_stats=terrain_stats,
land_cover_stats=land_cover_stats,
reference_ndvi=reference_ndvi if reference_ndvi > 0 else 0.5
)
# Legacy score for backwards compatibility
site_ndvi = results['stats']['ndvi_after_mean']
rehab_score = calculate_rehab_score(site_ndvi, reference_ndvi)
progress.progress(95, text="Generating interpretation...")
# Generate comprehensive interpretation
interpretation = generate_interpretation(
results['stats'],
rehab_score,
terrain_stats=terrain_stats,
land_cover_stats=land_cover_stats
)
# Get center coordinates
center = get_bbox_center(bbox)
progress.progress(100, text="Analysis complete!")
# Store only necessary results (exclude large arrays to save memory)
st.session_state.analysis_results = {
'ndvi_before': results['ndvi_before'],
'ndvi_after': results['ndvi_after'],
'ndvi_change': results['ndvi_change'],
'indices_after': results.get('indices_after', {}),
'index_changes': results.get('index_changes', {}),
'stats': results['stats'],
'date_before': results['date_before'],
'date_after': results['date_after'],
'bbox': results['bbox'],
'terrain_results': terrain_results,
'terrain_stats': terrain_stats,
'land_cover_results': land_cover_results,
'land_cover_stats': land_cover_stats,
'rehab_score': rehab_score,
'rehab_scores': rehab_scores,
'reference_ndvi': reference_ndvi,
'interpretation': interpretation,
'mine_info': mine_info,
'center': center
}
# Clean up memory
del results
gc.collect()
return True
except ValueError as e:
st.error(f"Analysis Error: {str(e)}")
return False
except Exception as e:
st.error(f"Unexpected error: {str(e)}")
st.info("Try adjusting the date range or selecting a different site.")
return False
def display_results():
"""Display comprehensive analysis results."""
# Import visualization only when showing results
from src.visualization import (
create_comparison_map,
create_multi_index_map,
create_terrain_map,
create_land_cover_map,
create_comprehensive_stats_display,
create_area_breakdown_chart,
create_ndvi_comparison_chart,
create_multi_index_chart,
create_terrain_stats_chart,
create_land_cover_chart,
create_vegetation_health_chart,
create_environmental_indicators_chart,
create_statistics_table,
create_time_series_chart
)
from src.report import generate_pdf_report, stats_to_csv
results = st.session_state.analysis_results
stats = results['stats']
rehab_score = results['rehab_score']
rehab_scores = results.get('rehab_scores', {})
interpretation = results['interpretation']
mine_info = results.get('mine_info', {})
terrain_stats = results.get('terrain_stats', {})
land_cover_stats = results.get('land_cover_stats', {})
terrain_results = results.get('terrain_results', {})
land_cover_results = results.get('land_cover_results', {})
mine_name = mine_info.get('name', 'Selected Site')
tenement_id = mine_info.get('tenement_id', 'N/A')
st.markdown(f"### Analysis Results: {mine_name}")
st.markdown(f"*Tenement: {tenement_id} | Period: {results['date_before']} to {results['date_after']}*")
if st.button("🔄 New Analysis", type="secondary"):
st.session_state.analysis_results = None
st.rerun()
st.markdown("---")
# Map section with tabs for different views
st.markdown("### Maps")
map_tab1, map_tab2, map_tab3, map_tab4 = st.tabs([
"Vegetation Change", "Multi-Index", "Terrain", "Land Cover"
])
with map_tab1:
with st.spinner("Rendering vegetation change map..."):
comparison_map = create_comparison_map(
bbox=results['bbox'],
ndvi_before=results['ndvi_before'],
ndvi_after=results['ndvi_after'],
ndvi_change=results['ndvi_change'],
center_coords=results['center'],
zoom=mine_info.get('zoom', 13)
)
st_folium(comparison_map, width=None, height=450, returned_objects=[])
st.caption("Green = improvement, Red = decline. Use layer control to toggle views.")
with map_tab2:
with st.spinner("Rendering multi-index map..."):
indices_after = results.get('indices_after', {})
index_changes = results.get('index_changes', {})
if indices_after:
multi_map = create_multi_index_map(
bbox=results['bbox'],
indices_after=indices_after,
index_changes=index_changes,
center_coords=results['center'],
zoom=mine_info.get('zoom', 13)
)
st_folium(multi_map, width=None, height=450, returned_objects=[])
st.caption("Toggle layers to view different indices: NDVI, SAVI, EVI, BSI, NDWI, NDMI")
else:
st.info("Multi-index data not available")
with map_tab3:
if terrain_results and 'slope' in terrain_results:
with st.spinner("Rendering terrain map..."):
terrain_map = create_terrain_map(
bbox=results['bbox'],
slope=terrain_results['slope'],
aspect=terrain_results.get('aspect'),
erosion_risk=terrain_results.get('erosion_risk'),
center_coords=results['center'],
zoom=mine_info.get('zoom', 13)
)
st_folium(terrain_map, width=None, height=450, returned_objects=[])
st.caption("Slope analysis from Copernicus DEM GLO-30")
else:
st.info("Terrain data not available for this location")
with map_tab4:
if land_cover_results and 'lulc_after' in land_cover_results:
with st.spinner("Rendering land cover map..."):
lulc_map = create_land_cover_map(
bbox=results['bbox'],
lulc=land_cover_results['lulc_after'],
center_coords=results['center'],
zoom=mine_info.get('zoom', 13),
year=land_cover_stats.get('year_after', 2023)
)
st_folium(lulc_map, width=None, height=450, returned_objects=[])
st.caption("IO-LULC land cover classification")
else:
st.info("Land cover data not available for this location")
st.markdown("---")
# Main analysis tabs
tab1, tab2, tab3, tab4, tab5 = st.tabs([
"Summary", "All Indices", "Terrain & Land Cover", "Time Series", "Export"
])
with tab1:
# Use comprehensive stats display
create_comprehensive_stats_display(
stats, rehab_score,
terrain_stats=terrain_stats,
land_cover_stats=land_cover_stats
)
st.markdown("---")
st.markdown("### Interpretation")
st.info(interpretation)
col1, col2 = st.columns(2)
with col1:
st.plotly_chart(create_area_breakdown_chart(stats), use_container_width=True)
with col2:
st.plotly_chart(create_vegetation_health_chart(stats), use_container_width=True)
# Environmental indicators radar chart
st.plotly_chart(create_environmental_indicators_chart(stats), use_container_width=True)
with tab2:
st.markdown("### Multi-Index Analysis")
st.markdown("""
Multiple vegetation and soil indices provide a comprehensive view:
- **NDVI**: Overall vegetation health
- **SAVI**: Better for sparse vegetation (soil-adjusted)
- **EVI**: Better for dense vegetation
- **NDWI**: Water presence
- **NDMI**: Vegetation moisture content
- **BSI**: Bare soil extent
""")
st.plotly_chart(create_multi_index_chart(stats), use_container_width=True)
# Detailed stats table
st.markdown("### Detailed Statistics")
create_statistics_table(stats)
with tab3:
col1, col2 = st.columns(2)
with col1:
st.markdown("### Terrain Analysis")
if terrain_stats:
st.plotly_chart(create_terrain_stats_chart(terrain_stats), use_container_width=True)
st.markdown("**Terrain Metrics:**")
st.write(f"- Mean Slope: {terrain_stats.get('slope_mean', 0):.1f}°")
st.write(f"- Max Slope: {terrain_stats.get('slope_max', 0):.1f}°")
st.write(f"- Elevation Range: {terrain_stats.get('elevation_min', 0):.0f}m - {terrain_stats.get('elevation_max', 0):.0f}m")
if 'percent_high_erosion_risk' in terrain_stats:
st.write(f"- High Erosion Risk: {terrain_stats['percent_high_erosion_risk']:.1f}%")
else:
st.info("Terrain analysis not available")
with col2:
st.markdown("### Land Cover Change")
if land_cover_stats and 'class_changes' in land_cover_stats:
st.plotly_chart(create_land_cover_chart(land_cover_stats), use_container_width=True)
st.markdown("**Land Cover Metrics:**")
st.write(f"- Vegetation Cover: {land_cover_stats.get('vegetation_cover_after', 0):.1f}%")
st.write(f"- Vegetation Change: {land_cover_stats.get('vegetation_cover_change', 0):+.1f}%")
st.write(f"- Bare Ground: {land_cover_stats.get('bare_ground_after', 0):.1f}%")
st.write(f"- Bare Ground Change: {land_cover_stats.get('bare_ground_change', 0):+.1f}%")
else:
st.info("Land cover analysis not available")
with tab4:
st.markdown("### Time Series")
st.info("Click button below to load historical NDVI data (may take a few minutes)")
if st.button("Load Time Series"):
from src.analysis import get_monthly_ndvi_timeseries
date_before = datetime.strptime(results['date_before'], '%Y-%m-%d')
date_after = datetime.strptime(results['date_after'], '%Y-%m-%d')
with st.spinner("Loading time series..."):
try:
timeseries = get_monthly_ndvi_timeseries(
results['bbox'],
date_before.year,
date_after.year
)
if timeseries:
fig = create_time_series_chart(timeseries)
st.plotly_chart(fig, use_container_width=True)
else:
st.warning("No time series data available.")
except Exception as e:
st.error(f"Error: {e}")
with tab5:
st.markdown("### Export Results")
col1, col2 = st.columns(2)
with col1:
st.markdown("#### PDF Report")
pdf_bytes = generate_pdf_report(
tenement_id=tenement_id,
stats=stats,
rehab_score=rehab_score,
interpretation=interpretation,
date_before=results['date_before'],
date_after=results['date_after'],
mine_name=mine_name
)
st.download_button(
"Download PDF",
data=pdf_bytes,
file_name=f"rehabwatch_{tenement_id.replace(' ', '_')}.pdf",
mime="application/pdf",
use_container_width=True
)
with col2:
st.markdown("#### CSV Data")
csv_data = stats_to_csv(
stats=stats,
tenement_id=tenement_id,
rehab_score=rehab_score,
date_before=results['date_before'],
date_after=results['date_after'],
mine_name=mine_name
)
st.download_button(
"Download CSV",
data=csv_data,
file_name=f"rehabwatch_{tenement_id.replace(' ', '_')}.csv",
mime="text/csv",
use_container_width=True
)
def main():
"""Main application function."""
load_css()
examples_data = load_examples()
examples = examples_data.get("examples", [])
default_location = examples_data.get("default_location", {"center": [-28.0, 121.0], "zoom": 6})
# Initialize session state
if "analysis_results" not in st.session_state:
st.session_state.analysis_results = None
if "selected_bbox" not in st.session_state:
st.session_state.selected_bbox = None
if "selected_mine" not in st.session_state:
st.session_state.selected_mine = None
if "analyzing" not in st.session_state:
st.session_state.analyzing = False
# Sidebar
with st.sidebar:
st.markdown("## 🌱 Analysis Hub")
st.markdown("**MineWatchAI**")
st.markdown("---")
st.markdown("### Select Mining Site")
example_names = ["Select a mine..."] + [e["name"] for e in examples]
selected_name = st.selectbox("Choose a mine:", example_names)
if selected_name != "Select a mine...":
for example in examples:
if example["name"] == selected_name:
st.session_state.selected_mine = example
base_bbox = get_bbox_from_example(example)
# Area size adjustment
st.markdown("#### Adjust Analysis Area")
area_scale = st.slider(
"Area Size",
min_value=0.5,
max_value=2.0,
value=1.0,
step=0.1,
help="Adjust the analysis area: <1.0 = smaller, >1.0 = larger"
)
# Apply scaling to bbox
min_lon, min_lat, max_lon, max_lat = base_bbox
center_lon = (min_lon + max_lon) / 2
center_lat = (min_lat + max_lat) / 2
half_width = (max_lon - min_lon) / 2 * area_scale
half_height = (max_lat - min_lat) / 2 * area_scale
st.session_state.selected_bbox = (
center_lon - half_width,
center_lat - half_height,
center_lon + half_width,
center_lat + half_height
)
st.info(f"📌 {example['description']}")
break
st.markdown("---")
st.markdown("### Analysis Period")
col1, col2, col3 = st.columns(3)
today = datetime.now().date()
with col1:
if st.button("1Y", use_container_width=True):
st.session_state.date_before = today - timedelta(days=365)
st.session_state.date_after = today
with col2:
if st.button("2Y", use_container_width=True):
st.session_state.date_before = today - timedelta(days=730)
st.session_state.date_after = today
with col3:
if st.button("5Y", use_container_width=True):
st.session_state.date_before = today - timedelta(days=1825)
st.session_state.date_after = today
default_before = st.session_state.get("date_before", today - timedelta(days=730))
default_after = st.session_state.get("date_after", today)
date_before = st.date_input("Start Date", value=default_before, max_value=today)
date_after = st.date_input("End Date", value=default_after, max_value=today)
if date_after <= date_before:
st.error("End date must be after start date")
st.markdown("---")
analyze_disabled = st.session_state.selected_bbox is None or date_after <= date_before
if st.button("🤖 AI-Analyze", type="primary", use_container_width=True, disabled=analyze_disabled):
st.session_state.analyzing = True
if analyze_disabled:
st.caption("Select a mine and valid dates")
st.markdown("---")
with st.expander("ℹ️ About"):
st.markdown("""
**MineWatchAI** - AI-driven green technology
for sustainable mining rehabilitation.
---
Developed by [Ashkan Taghipour](https://ashkantaghipour.github.io/)
⚠️ *This is a research startup project currently
under active development.*
""")
# Main area
st.markdown("# 🤖 MineWatchAI")
st.markdown("*AI-powered environmental intelligence for measurable, audit-ready mining rehabilitation*")
# Handle analysis
if st.session_state.analyzing:
st.session_state.analyzing = False
success = run_analysis(
st.session_state.selected_bbox,
st.session_state.selected_mine,
str(date_before),
str(date_after)
)
if success:
st.rerun()
# Display results or initial map
if st.session_state.analysis_results is not None:
display_results()
else:
st.markdown("### Select a Mining Site")
if st.session_state.selected_bbox is not None:
mine = st.session_state.selected_mine
center = mine.get("center", default_location["center"])
zoom = mine.get("zoom", 12)
st.markdown(f"**Selected:** {mine['name']} ({mine['tenement_id']})")
m = create_simple_map(tuple(center), zoom, st.session_state.selected_bbox)
st_folium(m, width=None, height=400, returned_objects=[])
else:
st.info("👆 Select a mining site from the sidebar")
m = create_simple_map(tuple(default_location["center"]), default_location["zoom"])
st_folium(m, width=None, height=400, returned_objects=[])
st.markdown("### Available Sites")
cols = st.columns(len(examples))
for i, ex in enumerate(examples):
with cols[i]:
st.markdown(f"**{ex['name']}**")
st.caption(ex['tenement_id'])
# Footer
st.markdown("---")
st.caption("MineWatchAI - AI-driven green technology for sustainable mining rehabilitation.")
if __name__ == "__main__":
main()