import streamlit as st
import requests
from datetime import datetime, timedelta
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import numpy as np
import os
import folium
from streamlit_folium import st_folium
import json
import time
from geopy.geocoders import Nominatim
from geopy.distance import geodesic
import warnings
# Silence noisy third-party warnings (pandas/plotly deprecations) so they
# don't clutter the Streamlit UI.
warnings.filterwarnings('ignore')
# Secure API key handling
def get_groq_api_key():
    """Return the GROQ API key from Streamlit secrets or the environment.

    Lookup order:
      1. ``st.secrets["GROQ_API_KEY"]`` (secrets.toml / hosted secrets)
      2. ``GROQ_API_KEY`` environment variable

    Returns:
        The key string, or ``None`` (after showing setup instructions in
        the UI) when it is not configured anywhere.
    """
    try:
        return st.secrets["GROQ_API_KEY"]
    except Exception:
        # Was a bare `except:`, which also swallows SystemExit and
        # KeyboardInterrupt; st.secrets raises KeyError or
        # FileNotFoundError depending on how the app is deployed.
        api_key = os.getenv("GROQ_API_KEY")
        if not api_key:
            st.error("π GROQ API key not found. Please configure it in Streamlit secrets or environment variables.")
            st.info("""
**To configure the API key:**
1. **For Hugging Face Spaces**: Add `GROQ_API_KEY` in your Space settings under 'Repository secrets'
2. **For local development**: Set environment variable `GROQ_API_KEY=your_key_here`
3. **For Streamlit Cloud**: Add to secrets.toml file
""")
            return None
        return api_key
# Hex marker colours keyed by the qualitative risk labels produced by
# calculate_risk_level() — one entry per severity tier.
MAGNITUDE_COLORS = {
    'Low': '#00ff00',       # Green
    'Moderate': '#ffff00',  # Yellow
    'High': '#ff8000',      # Orange
    'Severe': '#ff0000',    # Red
    'Extreme': '#800000'    # Dark Red
}
# Event-count and peak-magnitude thresholds per severity tier; consumed by
# calculate_overall_risk() to award risk-score points.
RISK_THRESHOLDS = {
    'low': {'count': 5, 'max_magnitude': 3.0},
    'moderate': {'count': 10, 'max_magnitude': 4.5},
    'high': {'count': 20, 'max_magnitude': 5.5},
    'severe': {'count': 30, 'max_magnitude': 6.5},
    'extreme': {'count': 50, 'max_magnitude': 7.0}
}
# Public-guidance message for each overall risk level; keys match the
# lowercase levels returned by calculate_overall_risk().
EMERGENCY_PROTOCOLS = {
    'low': "Monitor situation. No immediate action required.",
    'moderate': "Stay alert. Review emergency plans.",
    'high': "Prepare emergency kit. Stay informed.",
    'severe': "Follow evacuation orders if issued. Seek shelter.",
    'extreme': "IMMEDIATE EVACUATION. Follow emergency services."
}
def get_groq_summary(prompt, context=""):
    """Query the Groq LLM and return its text response.

    The prompt is optionally prefixed with *context*. Failures (missing
    key, missing library, API errors) are reported as human-readable
    strings rather than raised, so callers can display them directly.
    """
    api_key = get_groq_api_key()
    if not api_key:
        return "AI Analysis unavailable - API key not configured"
    try:
        # Deferred import so the app still loads when groq isn't installed.
        from groq import Groq

        full_prompt = f"{context}\n\n{prompt}" if context else prompt
        completion = Groq(api_key=api_key).chat.completions.create(
            model="llama-3.3-70b-versatile",
            messages=[
                {"role": "system", "content": "You are an expert seismologist, emergency response specialist, and public safety advisor. Provide detailed, accurate, and actionable information."},
                {"role": "user", "content": full_prompt},
            ],
            max_tokens=2048,
            temperature=0.7,
            top_p=0.9,
            presence_penalty=0.1,
            frequency_penalty=0.1,
        )
        return completion.choices[0].message.content
    except ImportError:
        return "AI Analysis unavailable - Groq library not installed"
    except Exception as e:
        return f"AI Analysis Error: {str(e)}"
def fetch_earthquakes(min_magnitude=2.5, hours=24, region_bbox=None, detailed=True):
    """Fetch and normalise recent earthquake data from the USGS FDSN API.

    Args:
        min_magnitude: Minimum magnitude requested from the service.
        hours: Look-back window ending at the current UTC time.
        region_bbox: Optional (min_lon, min_lat, max_lon, max_lat) filter.
        detailed: When True request up to 500 events, otherwise 200.

    Returns:
        A DataFrame with one row per event plus derived columns
        (risk_level, time_ago, magnitude_category, depth_category,
        hour_of_day, day_of_week); an empty DataFrame on any error.
    """
    try:
        endtime = datetime.utcnow()
        starttime = endtime - timedelta(hours=hours)
        url = "https://earthquake.usgs.gov/fdsnws/event/1/query"
        params = {
            "format": "geojson",
            "starttime": starttime.strftime('%Y-%m-%dT%H:%M:%S'),
            "endtime": endtime.strftime('%Y-%m-%dT%H:%M:%S'),
            "minmagnitude": min_magnitude,
            "orderby": "time",
            "limit": 500 if detailed else 200
        }
        if region_bbox:
            # bbox layout is (min_lon, min_lat, max_lon, max_lat).
            params.update({
                "minlatitude": region_bbox[1],
                "maxlatitude": region_bbox[3],
                "minlongitude": region_bbox[0],
                "maxlongitude": region_bbox[2],
            })
        response = requests.get(url, params=params, timeout=30)
        response.raise_for_status()
        features = response.json().get('features', [])
        earthquakes = []
        for f in features:
            prop = f['properties']
            geom = f['geometry']
            magnitude = prop.get('mag')
            # BUGFIX: USGS occasionally reports events with a null
            # magnitude; previously this crashed calculate_risk_level()
            # with a None >= float comparison. Skip such events.
            if magnitude is None:
                continue
            earthquake = {
                'time': datetime.utcfromtimestamp(prop['time'] / 1000),  # epoch ms -> UTC
                'place': prop['place'],
                'magnitude': magnitude,
                'longitude': geom['coordinates'][0],
                'latitude': geom['coordinates'][1],
                'depth': geom['coordinates'][2],
                'url': prop['url'],
                'type': prop.get('type', 'earthquake'),
                'status': prop.get('status', 'automatic'),
                'tsunami': prop.get('tsunami', 0),
                'felt': prop.get('felt', 0),
                'cdi': prop.get('cdi', 0),
                'mmi': prop.get('mmi', 0),
                'alert': prop.get('alert', ''),
                'sig': prop.get('sig', 0)
            }
            earthquake['risk_level'] = calculate_risk_level(magnitude)
            earthquake['time_ago'] = calculate_time_ago(earthquake['time'])
            earthquakes.append(earthquake)
        df = pd.DataFrame(earthquakes)
        if not df.empty:
            # Derived columns consumed by the charting/pattern helpers.
            df['magnitude_category'] = df['magnitude'].apply(categorize_magnitude)
            df['depth_category'] = df['depth'].apply(categorize_depth)
            df['hour_of_day'] = df['time'].dt.hour
            df['day_of_week'] = df['time'].dt.day_name()
        return df
    except requests.exceptions.RequestException as e:
        st.error(f"Network error: {e}")
        return pd.DataFrame()
    except Exception as e:
        st.error(f"Data processing error: {e}")
        return pd.DataFrame()
def calculate_risk_level(magnitude):
    """Map a magnitude to a qualitative risk label ('Low'..'Extreme')."""
    # (lower bound, label) pairs, checked from most to least severe.
    tiers = (
        (7.0, 'Extreme'),
        (6.0, 'Severe'),
        (5.0, 'High'),
        (4.0, 'Moderate'),
    )
    for floor, label in tiers:
        if magnitude >= floor:
            return label
    return 'Low'
def categorize_magnitude(magnitude):
    """Bucket a magnitude into a labelled range for charting/analysis.

    Returns one of: 'Major (≥7.0)', 'Strong (6.0-6.9)',
    'Moderate (5.0-5.9)', 'Light (4.0-4.9)', 'Minor (<4.0)'.
    """
    if magnitude >= 7.0:
        # BUGFIX: label previously contained mojibake 'β₯' where the
        # '≥' symbol was intended (cf. the '<' in 'Minor (<4.0)').
        return 'Major (≥7.0)'
    elif magnitude >= 6.0:
        return 'Strong (6.0-6.9)'
    elif magnitude >= 5.0:
        return 'Moderate (5.0-5.9)'
    elif magnitude >= 4.0:
        return 'Light (4.0-4.9)'
    else:
        return 'Minor (<4.0)'
def categorize_depth(depth):
    """Bucket a hypocentre depth in km into shallow/intermediate/deep."""
    # Check from deepest to shallowest with guard clauses.
    if depth >= 300:
        return 'Deep (>300km)'
    if depth >= 70:
        return 'Intermediate (70-300km)'
    return 'Shallow (<70km)'
def calculate_time_ago(time):
    """Render how long ago *time* (a naive UTC datetime) occurred.

    Returns e.g. "3 day(s) ago", "5 hour(s) ago", "10 minute(s) ago",
    or "Just now" for anything under a minute.
    """
    delta = datetime.utcnow() - time
    if delta.days > 0:
        return f"{delta.days} day(s) ago"
    # .seconds is the remainder within the current day (always < 86400).
    secs = delta.seconds
    if secs >= 3600:
        return f"{secs // 3600} hour(s) ago"
    if secs >= 60:
        return f"{secs // 60} minute(s) ago"
    return "Just now"
def analyze_seismic_patterns(df):
    """Summarise temporal, magnitude, depth, risk and spatial patterns.

    Args:
        df: Earthquake DataFrame (as produced by fetch_earthquakes()).

    Returns:
        Dict containing any of: 'hourly_distribution',
        'daily_distribution', 'magnitude_stats', 'depth_stats',
        'risk_distribution', 'geographic_center'. Empty dict for empty
        input or when analysis fails.
    """
    if df.empty:
        return {}
    analysis = {}
    try:
        # df is guaranteed non-empty past the early return above, so the
        # original per-section `len(df) > 0` guards were redundant and
        # have been removed.
        analysis['hourly_distribution'] = df['hour_of_day'].value_counts().sort_index()
        analysis['daily_distribution'] = df['day_of_week'].value_counts()
        if 'magnitude' in df.columns:
            analysis['magnitude_stats'] = {
                'mean': df['magnitude'].mean(),
                'median': df['magnitude'].median(),
                'std': df['magnitude'].std(),
                'max': df['magnitude'].max(),
                'min': df['magnitude'].min()
            }
        if 'depth' in df.columns:
            analysis['depth_stats'] = {
                'mean': df['depth'].mean(),
                'median': df['depth'].median(),
                'std': df['depth'].std()
            }
        if 'risk_level' in df.columns:
            analysis['risk_distribution'] = df['risk_level'].value_counts()
        # A geographic centroid only makes sense with >1 located event.
        if len(df) > 1 and 'latitude' in df.columns and 'longitude' in df.columns:
            analysis['geographic_center'] = {
                'lat': df['latitude'].mean(),
                'lon': df['longitude'].mean()
            }
    except Exception as e:
        # Surface the problem in the UI but keep the app running.
        st.warning(f"Error in pattern analysis: {str(e)}")
        return {}
    return analysis
def calculate_overall_risk(df):
    """Combine event count and peak magnitude into an overall risk level.

    Returns a (level, description) tuple where level is one of
    'low'/'moderate'/'high'/'severe'/'extreme' and description carries
    the numeric score out of 80.
    """
    if df.empty:
        return 'low', "No recent seismic activity"

    event_count = len(df)
    max_magnitude = df['magnitude'].max()

    def _tier_points(value, key):
        # Award points for the highest RISK_THRESHOLDS tier the value
        # reaches; both ladders use the same 40/30/20/10 point scale.
        for points, tier in ((40, 'extreme'), (30, 'severe'),
                             (20, 'high'), (10, 'moderate')):
            if value >= RISK_THRESHOLDS[tier][key]:
                return points
        return 0

    risk_score = _tier_points(event_count, 'count') + _tier_points(max_magnitude, 'max_magnitude')

    # Translate the combined score (max 80) into a level.
    if risk_score >= 60:
        risk_level = 'extreme'
    elif risk_score >= 40:
        risk_level = 'severe'
    elif risk_score >= 25:
        risk_level = 'high'
    elif risk_score >= 10:
        risk_level = 'moderate'
    else:
        risk_level = 'low'
    return risk_level, f"Risk Score: {risk_score}/80"
def create_advanced_map(df, region_bbox=None):
    """Build a folium map with one magnitude-styled circle per quake.

    Returns None for an empty DataFrame; otherwise a folium.Map centred
    on the mean epicentre, optionally outlining *region_bbox* in blue.
    """
    if df.empty:
        return None

    quake_map = folium.Map(
        location=[df['latitude'].mean(), df['longitude'].mean()],
        zoom_start=6,
        tiles='OpenStreetMap'
    )

    # (minimum magnitude, colour, radius), strongest tier first; events
    # below 4.0 fall through to the green default.
    style_tiers = ((6.0, 'red', 15), (5.0, 'orange', 12), (4.0, 'yellow', 10))
    for _, quake in df.iterrows():
        color, radius = 'green', 8
        for floor, tier_color, tier_radius in style_tiers:
            if quake['magnitude'] >= floor:
                color, radius = tier_color, tier_radius
                break
        popup_content = f"""
Magnitude {quake['magnitude']}
Location: {quake['place']}
Time: {quake['time'].strftime('%Y-%m-%d %H:%M:%S')}
Depth: {quake['depth']:.1f} km
USGS Details
"""
        folium.CircleMarker(
            location=[quake['latitude'], quake['longitude']],
            radius=radius,
            popup=popup_content,
            color=color,
            fill=True,
            fillOpacity=0.7
        ).add_to(quake_map)

    if region_bbox:
        # bbox is (min_lon, min_lat, max_lon, max_lat); folium wants
        # [[south, west], [north, east]].
        folium.Rectangle(
            bounds=[[region_bbox[1], region_bbox[0]], [region_bbox[3], region_bbox[2]]],
            color='blue',
            weight=2,
            fillOpacity=0.1
        ).add_to(quake_map)
    return quake_map
def create_comprehensive_charts(df, analysis):
    """Build the dashboard's Plotly figures from quake data and analysis.

    Args:
        df: Earthquake DataFrame from fetch_earthquakes().
        analysis: Pattern-summary dict from analyze_seismic_patterns().

    Returns:
        List of Plotly figures (empty list when df is empty).
    """
    if df.empty:
        return []
    # df is non-empty past this point, so the original per-chart
    # `len(df) > 0` guards were redundant and have been removed.
    charts = []

    # 1. Magnitude over time, with a linear trend when >= 2 points exist.
    fig1 = go.Figure()
    fig1.add_trace(go.Scatter(
        x=df['time'], y=df['magnitude'],
        mode='markers',
        marker=dict(
            size=df['magnitude'] * 2,
            color=df['magnitude'],
            colorscale='Reds',
            showscale=True
        ),
        name='Earthquakes'
    ))
    if len(df) >= 2:
        try:
            # Fit magnitude against event index (rows are time-ordered).
            z = np.polyfit(range(len(df)), df['magnitude'], 1)
            p = np.poly1d(z)
            fig1.add_trace(go.Scatter(
                x=df['time'], y=p(range(len(df))),
                mode='lines',
                name='Trend',
                line=dict(color='blue', dash='dash')
            ))
        except (np.linalg.LinAlgError, ValueError) as e:
            # Fitting can fail on degenerate data; show the scatter alone.
            st.warning(f"Trend analysis unavailable: {str(e)}")
    fig1.update_layout(
        title='Earthquake Magnitude Over Time with Trend',
        xaxis_title='Time',
        yaxis_title='Magnitude',
        height=400
    )
    charts.append(fig1)

    # 2. Magnitude distribution histogram.
    fig2 = px.histogram(
        df, x='magnitude', nbins=min(20, len(df)),  # never more bins than rows
        title='Magnitude Distribution',
        labels={'magnitude': 'Magnitude', 'count': 'Frequency'}
    )
    fig2.update_layout(height=400)
    charts.append(fig2)

    # 3. Depth vs magnitude scatter.
    fig3 = px.scatter(
        df, x='depth', y='magnitude', color='magnitude',
        title='Depth vs Magnitude Relationship',
        labels={'depth': 'Depth (km)', 'magnitude': 'Magnitude'}
    )
    fig3.update_layout(height=400)
    charts.append(fig3)

    # 4. Hourly activity bar chart (only when the analysis produced one).
    if 'hourly_distribution' in analysis and len(analysis['hourly_distribution']) > 0:
        fig4 = px.bar(
            x=analysis['hourly_distribution'].index,
            y=analysis['hourly_distribution'].values,
            title='Earthquake Activity by Hour of Day',
            labels={'x': 'Hour', 'y': 'Count'}
        )
        fig4.update_layout(height=400)
        charts.append(fig4)

    # 5. Risk-level pie chart (only when the analysis produced one).
    if 'risk_distribution' in analysis and len(analysis['risk_distribution']) > 0:
        fig5 = px.pie(
            values=analysis['risk_distribution'].values,
            names=analysis['risk_distribution'].index,
            title='Risk Level Distribution'
        )
        fig5.update_layout(height=400)
        charts.append(fig5)

    return charts
def main():
st.set_page_config(
page_title="π QuakeGuard AI",
page_icon="π",
layout="wide",
initial_sidebar_state="expanded"
)
st.markdown("""
""", unsafe_allow_html=True)
st.markdown('
Risk Score: 0/80
Emergency Protocol: Monitor situation. No immediate action required.
Risk Score: {risk_score}
Emergency Protocol: {EMERGENCY_PROTOCOLS[risk_level]}