# Hosting-platform snapshot header (commit 6777160, "fix: variable name"),
# kept as a comment so this file remains valid Python.
"""
FastAPI Earthquake Prediction System
Uses real-time USGS earthquake data to compute features and make predictions
Complete feature pipeline with correct transformations
"""
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, Field, ConfigDict
from typing import Optional, Dict, List, Any
import joblib
import catboost
import pandas as pd
import numpy as np
import requests
from datetime import datetime, timedelta
import logging
import math
import geopandas as gpd
from shapely.geometry import Point
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Initialize FastAPI
app = FastAPI(
    title="Earthquake Prediction API",
    description="Real-time earthquake prediction using USGS data and machine learning",
    version="1.0.0"
)

# Add CORS middleware
# NOTE(review): allow_origins=["*"] together with allow_credentials=True is
# maximally permissive (and browsers reject credentialed wildcard requests);
# confirm this is intended for the deployment target.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# ============================================================================
# Global Variables
# ============================================================================
# The four model/transformer globals are populated by the startup hook
# (load_models) and read by the /predict, / and /health endpoints.
occurrence_transformer = None
occurrence_model = None
severity_transformer = None
severity_model = None

# External service endpoints.
USGS_API_BASE = "https://earthquake.usgs.gov/fdsnws/event/1/query"
ELEVATION_API = "https://api.open-elevation.com/api/v1/lookup"
DEFAULT_RADIUS_KM = 100  # Default radius for USGS data fetch
# ============================================================================
# Pydantic Models
# ============================================================================
class PredictionRequest(BaseModel):
    """Input payload for /predict: a geographic point and an ISO-8601 timestamp."""
    model_config = ConfigDict(arbitrary_types_allowed=True)
    lat: float = Field(..., ge=-90, le=90, description="lat (-90 to 90)")
    lon: float = Field(..., ge=-180, le=180, description="lon (-180 to 180)")
    # Kept as a string and parsed with datetime.fromisoformat in the endpoint.
    time: str = Field(..., description="Prediction time in ISO format (e.g., '2025-10-22T14:00:00')")
class PredictionResponse(BaseModel):
    """Full /predict payload: echoed inputs, each intermediate feature set,
    both model outputs, and a risk summary."""
    model_config = ConfigDict(arbitrary_types_allowed=True)
    # Echo of the requested lat/lon/time.
    location: Dict[str, Any]
    # Every raw feature computed from USGS / elevation / tectonic data.
    all_features: Dict[str, Any]
    # The 13 features fed to the PowerTransformer.
    features_for_transformation: Dict[str, float]
    # The 16 features the classifiers actually consume.
    selected_features: Dict[str, float]
    occurrence_prediction: Dict[str, Any]  # Allows float for confidence
    severity_prediction: Optional[Dict[str, Any]]
    risk_assessment: Dict[str, Any]
    data_quality: Dict[str, Any]
    timestamp: str
# ============================================================================
# Startup Event - Load Models
# ============================================================================
@app.on_event("startup")
async def load_models():
"""Load all models and transformers on startup"""
global occurrence_transformer, occurrence_model
global severity_transformer, severity_model
try:
logger.info("Loading transformers...")
occurrence_transformer = joblib.load('occurence_transformer.joblib')
severity_transformer = joblib.load('severity_transformer.joblib')
logger.info("Loading occurrence model...")
occurrence_model = catboost.CatBoostClassifier()
occurrence_model.load_model('occurence_model.cbm')
logger.info("Loading severity model...")
severity_model = catboost.CatBoostClassifier()
severity_model.load_model('severity_model.cbm')
logger.info("All models loaded successfully!")
logger.info(f"Transformer expects: {list(occurrence_transformer.feature_names_in_)}")
except Exception as e:
logger.error(f"Error loading models: {e}")
raise
# ============================================================================
# USGS Data Fetching Functions
# ============================================================================
def fetch_usgs_earthquakes(
    lat: float,
    lon: float,
    radius_km: float,
    start_time: datetime,
    end_time: datetime,
    min_magnitude: float = 0.0
) -> List[Dict]:
    """
    Fetch earthquake data from USGS API.

    Args:
        lat, lon: query center in decimal degrees.
        radius_km: search radius around the center, in kilometres.
        start_time, end_time: time window, formatted as naive ISO strings.
        min_magnitude: lower magnitude cutoff passed to the API.

    Returns:
        List of event dicts (magnitude, lat, lon, depth, time, place),
        or an empty list on any request failure (best-effort fetch).
    """
    params = {
        'format': 'geojson',
        'lat': lat,
        'lon': lon,
        'maxradiuskm': radius_km,
        'starttime': start_time.strftime('%Y-%m-%dT%H:%M:%S'),
        'endtime': end_time.strftime('%Y-%m-%dT%H:%M:%S'),
        'minmagnitude': min_magnitude,
        'orderby': 'time'
    }
    try:
        logger.info(f"Fetching earthquakes from USGS API...")
        logger.info(f" Location: ({lat}, {lon})")
        logger.info(f" Radius: {radius_km} km")
        logger.info(f" Time range: {start_time} to {end_time}")
        response = requests.get(USGS_API_BASE, params=params, timeout=30)
        response.raise_for_status()
        data = response.json()
        earthquakes = []
        if 'features' in data:
            for feature in data['features']:
                props = feature['properties']
                coords = feature['geometry']['coordinates']
                # Bug fix: USGS can report null 'mag' and null depth.  The old
                # props.get('mag', 0) only covered a *missing* key, so a null
                # value came through as None and crashed the downstream
                # mean/max/energy aggregations.  Coerce None -> 0 explicitly.
                mag = props.get('mag')
                depth = coords[2]
                earthquakes.append({
                    'magnitude': mag if mag is not None else 0,
                    'lat': coords[1],
                    'lon': coords[0],
                    'depth': depth if depth is not None else 0,
                    # 'time' is epoch milliseconds; fromtimestamp() yields
                    # *local* time.  NOTE(review): this is later compared with
                    # naive request timestamps -- confirm timezone handling.
                    'time': datetime.fromtimestamp(props['time'] / 1000),
                    'place': props.get('place', 'Unknown')
                })
        logger.info(f" Found {len(earthquakes)} earthquakes")
        return earthquakes
    except requests.exceptions.RequestException as e:
        logger.error(f"Error fetching USGS data: {e}")
        return []
def get_elevation(lat: float, lon: float) -> float:
    """
    Get elevation for a location using Open-Elevation API.

    Args:
        lat, lon: location in decimal degrees.

    Returns:
        Elevation in metres, or 0.0 (sea level) when the API fails or
        returns no results.
    """
    try:
        params = {
            'locations': f"{lat},{lon}"
        }
        response = requests.get(ELEVATION_API, params=params, timeout=10)
        response.raise_for_status()
        data = response.json()
        if 'results' in data and len(data['results']) > 0:
            elevation = data['results'][0]['elevation']
            logger.info(f"Elevation: {elevation}m")
            return float(elevation)
    except Exception as e:
        logger.warning(f"Could not fetch elevation: {e}")
    # Bug fix: previously a successful response with an empty/missing
    # 'results' list could fall through without a return, handing None to
    # callers that compare `elevation < 0`.  Always fall back to 0.0.
    return 0.0
# ============================================================================
# Tectonic and Geological Functions
# ============================================================================
# PB2002 plate-boundary "steps" shapefile, loaded once at import time.
# The service refuses to start (raise below) if the file cannot be read.
BOUNDARIES_FILE = "tectonicplates-master/PB2002_steps.shp"
try:
    BOUNDARIES = gpd.read_file(BOUNDARIES_FILE)
    logger.info(f"Successfully loaded PB2002 steps with {len(BOUNDARIES)} records")
    logger.info(f"Available columns: {list(BOUNDARIES.columns)}")
    # Ensure STEPCLASS is correctly recognized (case-insensitive match);
    # determine_boundary_type() relies on the canonical 'StepClass' name.
    step_class_col = next((col for col in BOUNDARIES.columns if 'stepclass' in col.lower()), None)
    if step_class_col and step_class_col != 'StepClass':
        logger.info(f"Renaming {step_class_col} to StepClass")
        BOUNDARIES = BOUNDARIES.rename(columns={step_class_col: 'StepClass'})
except Exception as e:
    logger.error(f"Failed to load PB2002 steps: {e}")
    raise
def simplify_boundary_type(bt: str) -> int:
    """Collapse a PB2002 STEPCLASS code into one of four integer categories.

    0 = convergent (SUB/OCB/CCB), 1 = divergent (OSR/CRB),
    2 = transform (OTF/CTF), 3 = other/unrecognized.
    """
    if bt in ('SUB', 'OCB', 'CCB'):
        # Subduction zones plus oceanic/continental convergent boundaries.
        return 0
    if bt in ('OSR', 'CRB'):
        # Spreading ridges and continental rifts (divergent).
        return 1
    if bt in ('OTF', 'CTF'):
        # Oceanic and continental transform faults.
        return 2
    # Anything else (including unknown codes) falls in the "other" bucket.
    return 3
def determine_boundary_type(lat: float, lon: float, max_distance_km: float = 1000000000000.0) -> int:
    """
    Determine tectonic boundary type using PB2002 STEPCLASS and fallback to proximity.

    Returns encoded integer: 0=convergent, 1=divergent, 2=transform, 3=other

    Args:
        lat, lon: location in decimal degrees (validated below).
        max_distance_km: cutoff for accepting a boundary; the default is so
            large it is effectively "no limit".

    Raises:
        ValueError: if lat/lon are outside their valid ranges.
    """
    if not (-90 <= lat <= 90):
        raise ValueError(f"lat {lat} must be between -90 and 90 degrees")
    if not (-180 <= lon <= 180):
        raise ValueError(f"lon {lon} must be between -180 and 180 degrees")
    # Shapely points take (x=lon, y=lat).
    point = Point(lon, lat)
    min_distance = float('inf')
    closest_type = 3
    closest_code = None
    logger.info(f"Checking boundaries for location ({lat}, {lon})")
    if 'StepClass' in BOUNDARIES.columns:
        # Linear scan over every boundary segment; O(n) per request.
        for idx, row in BOUNDARIES.iterrows():
            # Planar distance in degrees scaled by ~111 km/degree -- a rough
            # approximation, not a geodesic distance (degrades away from the
            # equator and across the antimeridian).
            distance = row.geometry.distance(point) * 111  # Approximate km
            code = row.get('StepClass', None)
            if code is None:
                logger.warning(f"Empty StepClass at index {idx}")
                continue
            if distance <= max_distance_km and distance < min_distance:
                min_distance = distance
                closest_code = code
                closest_type = simplify_boundary_type(code)
        logger.info(f"PB2002 result: code={closest_code}, type={closest_type}, distance={min_distance:.2f} km")
    else:
        logger.warning("No StepClass column found, using fallback logic")
    # Fallback logic
    # Runs when the shapefile pass produced no recognized class (type still 3),
    # comparing against a short hardcoded list of well-known boundaries.
    # NOTE(review): min_distance carries over from the shapefile pass, so a
    # nearer type-3 shapefile hit can suppress a fallback match -- confirm
    # that is intended.
    if closest_type == 3:
        logger.info("Using fallback logic for boundary type based on proximity")
        known_boundaries = [
            (36.0, -121.0, 2, "San Andreas Fault"),  # Transform
            (38.0, 142.0, 0, "Japan Trench"),  # Convergent
            (-15.0, -75.0, 0, "Peru-Chile Trench"),  # Convergent
            (37.0, 29.0, 2, "North Anatolian Fault"),  # Transform
            (28.0, 85.0, 0, "Himalayan Front"),  # Convergent
            (-41.0, 174.0, 2, "Alpine Fault"),  # Transform
            (61.0, -147.0, 0, "Alaska"),  # Convergent
            (19.0, -155.0, 1, "Hawaii")  # Divergent
        ]
        for boundary_lat, boundary_lon, boundary_type, name in known_boundaries:
            distance = haversine_distance(lat, lon, boundary_lat, boundary_lon)
            logger.info(f" {name}: {distance:.2f} km, type={boundary_type}")
            if distance <= max_distance_km and distance < min_distance:
                min_distance = distance
                closest_type = boundary_type
                closest_code = f"Fallback_{name}"
        logger.info(f"Fallback result: type={closest_type}, code={closest_code}, distance={min_distance:.2f} km")
    return closest_type
def determine_crust_type(elevation: float) -> int:
    """Classify crust from elevation: 0 = oceanic (below sea level),
    1 = continental (at or above sea level)."""
    return int(elevation >= 0)
# ============================================================================
# Feature Engineering Functions
# ============================================================================
def calculate_seismic_energy(magnitude: float) -> float:
    """Seismic energy from magnitude via the Gutenberg-Richter relation:
    E = 10 ** (1.5 * M + 4.8)."""
    exponent = 1.5 * magnitude + 4.8
    return 10 ** exponent
def haversine_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance in kilometres between two (lat, lon) points,
    using the haversine formula on a 6371 km spherical Earth."""
    earth_radius_km = 6371
    phi1 = math.radians(lat1)
    phi2 = math.radians(lat2)
    delta_phi = math.radians(lat2 - lat1)
    delta_lambda = math.radians(lon2 - lon1)
    # Haversine of the central angle.
    half_chord = (math.sin(delta_phi / 2) ** 2 +
                  math.cos(phi1) * math.cos(phi2) * math.sin(delta_lambda / 2) ** 2)
    central_angle = 2 * math.atan2(math.sqrt(half_chord), math.sqrt(1 - half_chord))
    return earth_radius_km * central_angle
def estimate_distance_to_boundary(lat: float, lon: float) -> float:
    """Distance in km from (lat, lon) to the closest of a fixed list of
    well-known active seismic zones (coarse proxy for plate-boundary distance)."""
    active_zones = [
        (36.0, -121.0),  # San Andreas Fault
        (38.0, 142.0),  # Japan Trench
        (-15.0, -75.0),  # Peru-Chile Trench
        (37.0, 29.0),  # North Anatolian Fault
        (28.0, 85.0),  # Himalayan Front
        (-41.0, 174.0),  # Alpine Fault
        (61.0, -147.0),  # Alaska
        (19.0, -155.0),  # Hawaii
    ]
    # The list is non-empty, so min() over the generator is always defined.
    nearest = min(
        haversine_distance(lat, lon, zone_lat, zone_lon)
        for zone_lat, zone_lon in active_zones
    )
    logger.info(f"Estimated distance to nearest boundary: {nearest:.2f} km")
    return nearest
def _window_stats(earthquakes: List[Dict], suffix: str) -> Dict[str, Any]:
    """Aggregate one look-back window into four features:
    count_prev_<suffix>, meanmag_prev_<suffix>, maxmag_prev_<suffix>, and
    log_energy_prev_<suffix> (log10 of summed Gutenberg-Richter energy).
    An empty window yields zeros for the magnitude/energy features."""
    stats = {f'count_prev_{suffix}': len(earthquakes)}
    if earthquakes:
        magnitudes = [eq['magnitude'] for eq in earthquakes]
        stats[f'meanmag_prev_{suffix}'] = np.mean(magnitudes)
        stats[f'maxmag_prev_{suffix}'] = np.max(magnitudes)
        total_energy = sum(calculate_seismic_energy(m) for m in magnitudes)
        # Guard log10 against a non-positive sum (defensive; energy is
        # positive for any finite magnitude).
        stats[f'log_energy_prev_{suffix}'] = np.log10(total_energy) if total_energy > 0 else 0
    else:
        stats[f'meanmag_prev_{suffix}'] = 0.0
        stats[f'maxmag_prev_{suffix}'] = 0.0
        stats[f'log_energy_prev_{suffix}'] = 0.0
    return stats


def compute_all_features(
    lat: float,
    lon: float,
    prediction_time: datetime
) -> tuple:
    """
    Compute ALL features in the pipeline.

    Refactored: the four copy-pasted per-window aggregation blocks
    (1d/7d/30d/90d) are collapsed into _window_stats(); key order and
    values are unchanged.

    Returns a 5-tuple:
        all_features: every raw feature (window stats, recency, rate change,
            tectonic/geographic features, month).
        transformation_features: the 13 inputs for the PowerTransformer.
        month_sin, month_cos: cyclic encoding of the prediction month.
        data_info: per-window event counts plus the latest event (if any).
    """
    logger.info("=" * 80)
    logger.info("STEP 1: Fetching historical earthquake data from USGS")
    logger.info("=" * 80)
    # One USGS query per look-back window (days -> event list).
    windows = {
        days: fetch_usgs_earthquakes(
            lat, lon, DEFAULT_RADIUS_KM,
            prediction_time - timedelta(days=days), prediction_time
        )
        for days in (1, 7, 30, 90)
    }
    earthquakes_7d = windows[7]
    logger.info("=" * 80)
    logger.info("STEP 2: Computing ALL features")
    logger.info("=" * 80)
    all_features = {}
    for days in (1, 7, 30, 90):
        all_features.update(_window_stats(windows[days], f'{days}d'))
    # Recency: days since the newest event in the 7-day window; defaults to
    # the full window length (7.0) when the window is empty.
    if earthquakes_7d:
        latest_earthquake = max(earthquakes_7d, key=lambda x: x['time'])
        days_since = (prediction_time - latest_earthquake['time']).total_seconds() / 86400
        all_features['days_since_last_event'] = days_since
    else:
        all_features['days_since_last_event'] = 7.0
    # Relative change of the short-term daily event rate vs the 30-day baseline.
    rate_7d = all_features['count_prev_7d'] / 7.0
    rate_30d = all_features['count_prev_30d'] / 30.0
    if rate_30d > 0:
        all_features['rate_change_7d_vs_30d'] = (rate_7d - rate_30d) / rate_30d
    else:
        all_features['rate_change_7d_vs_30d'] = 0.0
    # Tectonic / geographic context.
    elevation = get_elevation(lat, lon)
    all_features['dist_to_boundary_km'] = estimate_distance_to_boundary(lat, lon)
    all_features['boundary_type'] = determine_boundary_type(lat, lon)
    all_features['crust_type'] = determine_crust_type(elevation)
    all_features['elevation_m'] = elevation
    all_features['month'] = prediction_time.month
    logger.info("All features computed:")
    for key, value in all_features.items():
        logger.info(f" {key}: {value}")
    logger.info("=" * 80)
    logger.info("STEP 3: Extracting 13 features for transformation")
    logger.info("=" * 80)
    # Subset consumed by the fitted PowerTransformer (applied in /predict).
    transformation_features = {
        'count_prev_1d': all_features['count_prev_1d'],
        'meanmag_prev_1d': all_features['meanmag_prev_1d'],
        'maxmag_prev_1d': all_features['maxmag_prev_1d'],
        'log_energy_prev_1d': all_features['log_energy_prev_1d'],
        'count_prev_7d': all_features['count_prev_7d'],
        'meanmag_prev_7d': all_features['meanmag_prev_7d'],
        'maxmag_prev_7d': all_features['maxmag_prev_7d'],
        'log_energy_prev_7d': all_features['log_energy_prev_7d'],
        'count_prev_30d': all_features['count_prev_30d'],
        'count_prev_90d': all_features['count_prev_90d'],
        'days_since_last_event': all_features['days_since_last_event'],
        'rate_change_7d_vs_30d': all_features['rate_change_7d_vs_30d'],
        'dist_to_boundary_km': all_features['dist_to_boundary_km']
    }
    logger.info("Features for transformation:")
    for key, value in transformation_features.items():
        logger.info(f" {key}: {value}")
    logger.info("=" * 80)
    logger.info("STEP 4: Computing cyclic month features")
    logger.info("=" * 80)
    # Cyclic encoding so December and January are adjacent in feature space.
    month_sin = np.sin(2 * np.pi * prediction_time.month / 12)
    month_cos = np.cos(2 * np.pi * prediction_time.month / 12)
    logger.info(f" month: {prediction_time.month}")
    logger.info(f" month_sin: {month_sin}")
    logger.info(f" month_cos: {month_cos}")
    data_info = {
        'earthquakes_1d': len(windows[1]),
        'earthquakes_7d': len(windows[7]),
        'earthquakes_30d': len(windows[30]),
        'earthquakes_90d': len(windows[90]),
        # NOTE(review): relies on USGS orderby=time returning newest-first so
        # index 0 is the most recent event -- confirm; the recency feature
        # above deliberately uses max() and does not depend on ordering.
        'latest_earthquake': earthquakes_7d[0] if earthquakes_7d else None
    }
    return all_features, transformation_features, month_sin, month_cos, data_info
# ============================================================================
# API Endpoints
# ============================================================================
@app.get("/")
async def root():
"""Health check endpoint"""
return {
"status": "online",
"service": "Earthquake Prediction API",
"version": "1.0.0",
"models_loaded": all([
occurrence_transformer is not None,
occurrence_model is not None,
severity_transformer is not None,
severity_model is not None
])
}
@app.post("/predict", response_model=PredictionResponse)
async def predict_earthquake(request: PredictionRequest):
"""
Predict earthquake occurrence and severity for a given location and time
"""
try:
logger.info("=" * 80)
logger.info(f"NEW PREDICTION REQUEST")
logger.info(f"Location: ({request.lat}, {request.lon})")
logger.info(f"Time: {request.time}")
logger.info("=" * 80)
# Parse prediction time
try:
prediction_time = datetime.fromisoformat(request.time)
except ValueError:
raise HTTPException(status_code=400,
detail="Invalid time format. Use ISO format (e.g., '2025-10-22T14:00:00')")
all_features, transformation_features, month_sin, month_cos, data_info = compute_all_features(
request.lat,
request.lon,
prediction_time
)
logger.info("=" * 80)
logger.info("STEP 5: Applying PowerTransformer to 13 features")
logger.info("=" * 80)
transformer_feature_names = occurrence_transformer.feature_names_in_
df_for_transform = pd.DataFrame([transformation_features])[transformer_feature_names]
logger.info(f"DataFrame shape: {df_for_transform.shape}")
logger.info(f"Columns: {list(df_for_transform.columns)}")
transformed_features = occurrence_transformer.transform(df_for_transform)
logger.info(f"βœ“ Transformation successful")
logger.info(f" Transformed shape: {transformed_features.shape}")
logger.info(f" Sample values: {transformed_features[0][:5]}")
transformed_dict = {}
for i, feature_name in enumerate(transformer_feature_names):
transformed_dict[feature_name] = transformed_features[0][i]
logger.info("=" * 80)
logger.info("STEP 6: Building 16 selected features for model")
logger.info("=" * 80)
selected_features = {
'meanmag_prev_1d': transformed_dict['meanmag_prev_1d'],
'maxmag_prev_1d': transformed_dict['maxmag_prev_1d'],
'meanmag_prev_7d': transformed_dict['meanmag_prev_7d'],
'log_energy_prev_7d': transformed_dict['log_energy_prev_7d'],
'meanmag_prev_30d': all_features['meanmag_prev_30d'],
'log_energy_prev_30d': all_features['log_energy_prev_30d'],
'meanmag_prev_90d': all_features['meanmag_prev_90d'],
'log_energy_prev_90d': all_features['log_energy_prev_90d'],
'days_since_last_event': transformed_dict['days_since_last_event'],
'rate_change_7d_vs_30d': transformed_dict['rate_change_7d_vs_30d'],
'dist_to_boundary_km': transformed_dict['dist_to_boundary_km'],
'elevation_m': all_features['elevation_m'],
'boundary_type': all_features['boundary_type'],
'crust_type': all_features['crust_type'],
'month_sin': month_sin,
'month_cos': month_cos
}
logger.info("Selected features (in order):")
for key, value in selected_features.items():
logger.info(f" {key}: {value}")
logger.info("=" * 80)
logger.info("STEP 7: Creating final DataFrame for model")
logger.info("=" * 80)
final_df = pd.DataFrame([selected_features])
logger.info(f"Final DataFrame shape: {final_df.shape}")
logger.info(f"Final columns: {list(final_df.columns)}")
logger.info("=" * 80)
logger.info("STEP 8: Making predictions")
logger.info("=" * 80)
occurrence_pred = occurrence_model.predict(final_df)[0]
occurrence_prob = occurrence_model.predict_proba(final_df)[0]
will_occur = int(occurrence_pred) # 0 for not occurred, 1 for occurred
confidence = float(occurrence_prob[1]) # Keep as float for accuracy
logger.info(f"βœ“ Occurrence prediction: {will_occur}")
logger.info(f" Confidence: {confidence:.2%}")
logger.info(f" Probabilities: [No EQ: {occurrence_prob[0]:.4f}, EQ: {occurrence_prob[1]:.4f}]")
severity_result = None
if will_occur:
logger.info("Predicting severity...")
severity_pred = severity_model.predict(final_df)[0]
severity_prob = severity_model.predict_proba(final_df)[0]
severity_class = int(severity_pred) # 0 for medium, 1 for high
severity_result = {
"severity_class": severity_class,
"confidence": round(float(severity_prob[severity_pred]), 4)
}
logger.info(f"βœ“ Severity: {severity_class}")
logger.info(f" Confidence: {severity_result['confidence']:.2%}")
if will_occur and severity_result:
if severity_result['severity_class'] == 1:
risk_level = "HIGH"
recommendation = "Immediate evacuation and emergency preparedness"
else:
risk_level = "MODERATE"
recommendation = "Stay alert and prepare emergency supplies"
else:
risk_level = "VERY LOW"
recommendation = "No significant seismic activity expected"
severity_result = {
"severity_class": 0,
"confidence": round(float(1-confidence), 4)
}
logger.info(f"βœ“ Risk Level: {risk_level}")
logger.info("=" * 80)
response = PredictionResponse(
location={
"lat": request.lat,
"lon": request.lon,
"time": request.time
},
all_features=all_features,
features_for_transformation=transformation_features,
selected_features=selected_features,
occurrence_prediction={
"will_occur": will_occur,
"confidence": confidence # Float value
},
severity_prediction=severity_result,
risk_assessment={
"risk_level": risk_level,
"recommendation": recommendation
},
data_quality={
"earthquakes_analyzed": {
"last_1_day": data_info['earthquakes_1d'],
"last_7_days": data_info['earthquakes_7d'],
"last_30_days": data_info['earthquakes_30d'],
"last_90_days": data_info['earthquakes_90d']
},
"latest_earthquake": data_info['latest_earthquake']['place'] if data_info[
'latest_earthquake'] else "None in past 7 days",
"data_source": "USGS Earthquake Catalog",
"boundary_type": all_features['boundary_type'],
"crust_type": all_features['crust_type'],
"elevation_m": all_features['elevation_m']
},
timestamp=datetime.utcnow().isoformat()
)
logger.info("βœ“ Prediction completed successfully!")
logger.info("=" * 80)
return response
except Exception as e:
logger.error(f"βœ— Prediction error: {e}", exc_info=True)
raise HTTPException(status_code=500, detail=str(e))
@app.api_route("/health", methods=["GET", "HEAD"])
async def health_check():
"""Detailed health check"""
return {
"status": "healthy",
"models": {
"occurrence_transformer": occurrence_transformer is not None,
"occurrence_model": occurrence_model is not None,
"severity_transformer": severity_transformer is not None,
"severity_model": severity_model is not None
},
"external_services": {
"usgs_api": "operational",
"elevation_api": "operational"
},
"timestamp": datetime.utcnow().isoformat()
}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=7860)