# MeghanaK25's picture
# Update app.py
# fb7ce9d verified
import gradio as gr
import pandas as pd
import numpy as np
import joblib
import pickle
from typing import Tuple, Dict, Any
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
from huggingface_hub import hf_hub_download
import pickle
MODEL_REPO = "MeghanaK25/ais-isolation-forest"  # replace with your repo name
MODEL_FILENAME = "isolationforest_model.pkl"

try:
    # Download the pickled model bundle from the Hugging Face Hub.
    model_path = hf_hub_download(repo_id=MODEL_REPO, filename=MODEL_FILENAME)
    # NOTE(review): pickle.load executes arbitrary code from the downloaded
    # artifact — only load from repos you control/trust.
    with open(model_path, "rb") as f:
        model_data = pickle.load(f)
    model = model_data["model"]
    scaler = model_data["scaler"]
    model_name = model_data.get("model_name", "Isolation Forest")
    print(f"βœ… Loaded {model_name} model successfully from Hugging Face Hub")
except Exception as e:
    # Fall back to demo mode; predict_anomaly() checks `model is None`.
    print(f"❌ Could not load model from Hub: {e}")
    model = None
    scaler = None
    model_name = None  # keep the name defined so later reads never NameError
def _numeric(value, default, positive_only=False):
    """Coerce an AIS field to float, substituting *default* when missing.

    "Missing" means None, empty string, or NaN (NaN comes through the batch
    CSV path via pandas).  With positive_only=True a non-positive value also
    counts as missing, matching the AIS convention of reporting unknown
    vessel dimensions as 0.
    """
    if value is None or value == "":
        return default
    value = float(value)
    if value != value:  # NaN check without importing math
        return default
    if positive_only and value <= 0:
        return default
    return value


def preprocess_single_vessel(mmsi, ship_type, nav_status, sog, cog, heading, width, length, draught):
    """Build the 12-feature dict the anomaly model expects for one vessel.

    Fixes the original truthiness checks (`float(x) if x else default`) that
    wrongly treated legitimate zero values — heading 0 (due north), COG 0,
    SOG 0 — as missing data.

    Args:
        mmsi: vessel identifier (unused in the features, kept for interface).
        ship_type, nav_status: categorical strings, encoded via fixed maps.
        sog, cog, heading: kinematics; heading falls back to COG if missing.
        width, length, draught: static dimensions; missing/zero values fall
            back to fleet medians.

    Returns:
        dict of raw and derived features keyed by feature name.
    """
    features = {}

    # Kinematics: 0 is a valid reading, so only None/""/NaN use the default.
    features['sog'] = _numeric(sog, 0.0)
    features['cog'] = _numeric(cog, 0.0)
    features['heading'] = _numeric(heading, features['cog'])  # use COG if heading missing

    # Static dimensions: AIS encodes "unknown" as 0, so non-positive values
    # fall back to the training-set medians.
    features['width'] = _numeric(width, 17.0, positive_only=True)      # median width
    features['length'] = _numeric(length, 115.0, positive_only=True)   # median length
    features['draught'] = _numeric(draught, 6.3, positive_only=True)   # median draught

    # Encode categorical features (simplified mapping); unknown values map to
    # the "Unknown value" / "Cargo" buckets respectively, as before.
    nav_status_mapping = {
        'Under way using engine': 0,
        'At anchor': 1,
        'Moored': 2,
        'Constrained by her draught': 3,
        'Unknown value': 4
    }
    ship_type_mapping = {
        'Cargo': 0,
        'Tanker': 1,
        'Fishing': 2,
        'Passenger': 3,
        'Tug': 4,
        'Military': 5,
        'Pleasure': 6,
        'Sailing': 7
    }
    features['navigationalstatus_encoded'] = nav_status_mapping.get(nav_status, 4)
    features['shiptype_encoded'] = ship_type_mapping.get(ship_type, 0)

    # Derived feature: speed bucket (stationary / slow / cruising / fast).
    if features['sog'] <= 0.5:
        features['speed_category'] = 0
    elif features['sog'] <= 3:
        features['speed_category'] = 1
    elif features['sog'] <= 15:
        features['speed_category'] = 2
    else:
        features['speed_category'] = 3

    # Derived feature: size bucket by overall length.
    if features['length'] <= 50:
        features['size_category'] = 0
    elif features['length'] <= 150:
        features['size_category'] = 1
    elif features['length'] <= 300:
        features['size_category'] = 2
    else:
        features['size_category'] = 3

    # Smallest angular difference between course-over-ground and heading
    # (wraps around 360, so the result is always in [0, 180]).
    course_diff = abs(features['cog'] - features['heading'])
    features['course_diff'] = min(course_diff, 360 - course_diff) if course_diff > 180 else course_diff

    # Length-to-beam ratio; epsilon guards against division by zero.
    features['aspect_ratio'] = features['length'] / (features['width'] + 0.001)
    return features
def predict_anomaly(mmsi, ship_type, nav_status, sog, cog, heading, width, length, draught):
    """Score one vessel with the globally-loaded Isolation Forest model.

    Returns a 4-tuple: (headline result, anomaly score, risk level, detailed
    explanation).  Lower scores are more anomalous; prediction -1 = anomaly.
    """
    # Demo mode: the model failed to load at startup.
    if model is None:
        return "❌ Model not loaded - Demo mode", 0.0, "Normal", "Model not available"

    try:
        feats = preprocess_single_vessel(
            mmsi, ship_type, nav_status, sog, cog, heading, width, length, draught
        )

        # Assemble the 12 features in the exact order used during training.
        ordered_keys = (
            'sog', 'cog', 'heading', 'width', 'length', 'draught',
            'navigationalstatus_encoded', 'shiptype_encoded',
            'speed_category', 'size_category', 'course_diff', 'aspect_ratio',
        )
        vector = np.array([[feats[key] for key in ordered_keys]])

        scaled = scaler.transform(vector)
        label = model.predict(scaled)[0]            # -1 = anomaly, 1 = normal
        score = model.decision_function(scaled)[0]  # lower = more anomalous

        flagged = label == -1
        # Risk ladder: a strongly negative score dominates the plain flag.
        if score < -0.1:
            risk = "🚨 HIGH RISK"
        elif flagged:
            risk = "⚠️ ANOMALOUS"
        else:
            risk = "βœ… NORMAL"

        details = generate_explanation(feats, flagged, score)
        headline = "🚨 ANOMALOUS VESSEL DETECTED" if flagged else "βœ… Normal vessel behavior"
        return headline, float(score), risk, details
    except Exception as e:
        return f"❌ Error: {str(e)}", 0.0, "Error", "Could not process input"
def generate_explanation(features: Dict, is_anomaly: bool, score: float) -> str:
    """Generate a human-readable explanation of an anomaly prediction.

    Args:
        features: feature dict from preprocess_single_vessel (reads 'sog',
            'length', 'draught', 'course_diff').
        is_anomaly: True when the model flagged the vessel.
        score: Isolation Forest decision score (lower = more anomalous).

    Returns:
        " | "-joined summary of notable speed / size / draught / course
        observations plus the risk assessment.
    """
    explanations = []

    # Speed analysis.
    speed = features['sog']
    if speed == 0:
        explanations.append(f"πŸ›‘ Vessel is stationary (0 knots)")
    elif speed < 3:
        explanations.append(f"🐌 Very low speed ({speed:.1f} knots)")
    elif speed > 20:
        explanations.append(f"πŸš€ High speed ({speed:.1f} knots)")
    else:
        explanations.append(f"⚑ Speed: {speed:.1f} knots")

    # Size analysis — only extreme lengths are worth mentioning.
    length = features['length']
    if length > 300:
        explanations.append(f"πŸ—οΈ Very large vessel ({length:.0f}m long)")
    elif length < 20:
        explanations.append(f"🚀 Small vessel ({length:.0f}m long)")

    # Draught analysis (proxy for loading state).
    draught = features['draught']
    if draught > 10:
        explanations.append(f"βš“ Deep draught ({draught:.1f}m) - heavily loaded")
    elif draught < 2:
        explanations.append(f"πŸͺΆ Shallow draught ({draught:.1f}m) - lightly loaded")

    # Course analysis: large COG/heading mismatch can indicate drift.
    course_diff = features['course_diff']
    if course_diff > 45:
        explanations.append(f"🧭 Large course/heading difference ({course_diff:.1f}°)")

    # Risk assessment.
    if is_anomaly:
        explanations.append(f"⚠️ Anomaly score: {score:.3f} (threshold: ~0.000)")
        explanations.append("πŸ” Recommend: Monitor vessel closely or investigate with SAR imagery")
    else:
        explanations.append(f"βœ… Normal behavior (score: {score:.3f})")
    return " | ".join(explanations)
def create_demo_data():
    """Return sample vessel rows for the demo tab.

    Each row matches the single-vessel input order:
    [mmsi, ship_type, nav_status, sog, cog, heading, width, length, draught].

    Bug fix: the original used leading-zero integer literals (045, 090, 095),
    which are a SyntaxError in Python 3.
    """
    demo_vessels = [
        # Normal cargo ship
        [219123456, "Cargo", "Under way using engine", 12.5, 45, 45, 25, 180, 8.5],
        # Suspicious stationary tanker
        [477307700, "Tanker", "At anchor", 0.1, 180, 135, 60, 333, 15.2],
        # High-speed fishing vessel (suspicious)
        [219999999, "Fishing", "Engaged in fishing", 25.0, 90, 95, 8, 45, 3.2],
        # Large cargo ship with course deviation
        [311234567, "Cargo", "Under way using engine", 15.2, 270, 315, 32, 350, 12.1]
    ]
    return demo_vessels
def batch_analysis(file):
    """Analyze multiple vessels from an uploaded CSV.

    Args:
        file: Gradio file wrapper exposing a `.name` path, or None.

    Returns:
        (status message, plotly scatter figure or None).
    """
    if file is None:
        return "Please upload a CSV file", None
    try:
        df = pd.read_csv(file.name)

        # Validate the schema up front before doing any per-row work.
        required_cols = ['mmsi', 'shiptype', 'navigationalstatus', 'sog', 'cog',
                         'heading', 'width', 'length', 'draught']
        missing_cols = [col for col in required_cols if col not in df.columns]
        if missing_cols:
            return f"Missing columns: {missing_cols}", None

        # Score each vessel; cap at 100 rows to keep the demo responsive.
        results = []
        for _, row in df.head(100).iterrows():
            try:
                _, score, risk, explanation = predict_anomaly(
                    row['mmsi'], row['shiptype'], row['navigationalstatus'],
                    row['sog'], row['cog'], row['heading'],
                    row['width'], row['length'], row['draught']
                )
                results.append({
                    'MMSI': row['mmsi'],
                    'Ship_Type': row['shiptype'],
                    'Risk_Level': risk,
                    'Anomaly_Score': score,
                    'Speed': row['sog'],
                    'Length': row['length']
                })
            except Exception:
                # Skip malformed rows but keep processing the rest.  (The
                # original bare `except:` also swallowed KeyboardInterrupt.)
                continue

        # Guard: plotting an empty DataFrame is useless and can error out.
        if not results:
            return "No vessels could be analyzed from this file", None

        result_df = pd.DataFrame(results)
        fig = px.scatter(result_df,
                         x='Length', y='Speed',
                         color='Risk_Level',
                         hover_data=['MMSI', 'Ship_Type', 'Anomaly_Score'],
                         title='Vessel Anomaly Analysis',
                         color_discrete_map={
                             'βœ… NORMAL': 'green',
                             '⚠️ ANOMALOUS': 'orange',
                             '🚨 HIGH RISK': 'red'
                         })
        return f"Analyzed {len(results)} vessels", fig
    except Exception as e:
        return f"Error processing file: {str(e)}", None
# Create Gradio interface: four tabs (single vessel, batch CSV, demos, about)
# plus the event wiring, all inside one Blocks context.
with gr.Blocks(title="🚒 AIS Maritime Anomaly Detection", theme=gr.themes.Ocean()) as app:
    # Header / overview copy shown above all tabs.
    gr.Markdown("""
    # 🚒 AIS Maritime Anomaly Detection
    **Detect suspicious vessel behavior for oil spill prevention and maritime safety**
    This system uses advanced machine learning to identify anomalous vessel patterns that could indicate:
    - πŸ›’οΈ Illegal oil discharge or transfer operations
    - 🚨 Vessels in distress or behaving suspiciously
    - βš“ Ships loitering in restricted areas
    - πŸ” Navigation patterns requiring investigation
    *Models trained on 358K+ AIS records with 99.6% performance score*
    """)

    # Tab 1: interactive scoring of a single vessel.
    with gr.Tab("πŸ” Single Vessel Analysis"):
        gr.Markdown("### Analyze individual vessel behavior")
        with gr.Row():
            with gr.Column():
                # Identity and categorical inputs.
                mmsi_input = gr.Number(label="MMSI (Ship ID)", value=219861000)
                ship_type_input = gr.Dropdown(
                    choices=["Cargo", "Tanker", "Fishing", "Passenger", "Tug", "Military", "Pleasure", "Sailing"],
                    label="Ship Type", value="Cargo"
                )
                nav_status_input = gr.Dropdown(
                    choices=["Under way using engine", "At anchor", "Moored", "Constrained by her draught", "Unknown value"],
                    label="Navigation Status", value="Under way using engine"
                )
            with gr.Column():
                # Kinematic inputs.
                sog_input = gr.Number(label="Speed Over Ground (knots)", value=12.5)
                cog_input = gr.Number(label="Course Over Ground (degrees)", value=45)
                heading_input = gr.Number(label="Heading (degrees)", value=45)
            with gr.Column():
                # Static vessel dimensions.
                width_input = gr.Number(label="Width (meters)", value=25)
                length_input = gr.Number(label="Length (meters)", value=180)
                draught_input = gr.Number(label="Draught (meters)", value=8.5)
        analyze_btn = gr.Button("πŸ” Analyze Vessel", variant="primary")
        with gr.Row():
            with gr.Column():
                result_output = gr.Textbox(label="Analysis Result", interactive=False)
                risk_output = gr.Textbox(label="Risk Level", interactive=False)
            with gr.Column():
                score_output = gr.Number(label="Anomaly Score", interactive=False)
                explanation_output = gr.Textbox(label="Detailed Explanation", interactive=False, lines=3)

    # Tab 2: CSV upload for fleet-wide scoring.
    with gr.Tab("πŸ“Š Batch Analysis"):
        gr.Markdown("### Upload CSV file for bulk analysis")
        gr.Markdown("*CSV should contain columns: mmsi, shiptype, navigationalstatus, sog, cog, heading, width, length, draught*")
        file_input = gr.File(label="Upload AIS Data CSV", file_types=[".csv"])
        batch_btn = gr.Button("πŸ“Š Analyze Fleet", variant="primary")
        batch_result = gr.Textbox(label="Batch Results", interactive=False)
        batch_plot = gr.Plot(label="Anomaly Visualization")

    # Tab 3: one-click example vessels that fill the single-vessel form.
    with gr.Tab("πŸ“‹ Demo Examples"):
        gr.Markdown("### Try these example vessels:")
        demo_data = create_demo_data()
        for i, vessel_data in enumerate(demo_data):
            mmsi, ship_type, nav_status, sog, cog, heading, width, length, draught = vessel_data
            with gr.Row():
                gr.Markdown(f"**Example {i+1}:** {ship_type} - MMSI {mmsi}")
                demo_btn = gr.Button(f"Load Example {i+1}")
            # `data=vessel_data` binds the current row as a default argument,
            # avoiding Python's late-binding-closure pitfall inside this loop.
            def load_demo(data=vessel_data):
                return data
            demo_btn.click(
                fn=load_demo,
                outputs=[mmsi_input, ship_type_input, nav_status_input, sog_input,
                         cog_input, heading_input, width_input, length_input, draught_input]
            )

    # Tab 4: static model documentation.
    with gr.Tab("ℹ️ About"):
        gr.Markdown("""
        ### Model Information
        - **Algorithm**: Isolation Forest (Recommended)
        - **Training Data**: 358,351 AIS records from Denmark Maritime Authority
        - **Performance**: 100% contamination accuracy, 99.6% overall score
        - **Features**: 12 engineered features from AIS data
        - **Use Case**: Oil spill detection, maritime safety monitoring
        ### Key Features Analyzed:
        1. **Speed patterns** - Unusual fast/slow movement
        2. **Vessel dimensions** - Size vs. behavior correlation
        3. **Course deviations** - Navigation inconsistencies
        4. **Operational status** - Anchoring/mooring patterns
        5. **Draught analysis** - Loading state indicators
        ### Integration:
        This model serves as the AIS anomaly detection component in oil spill monitoring pipelines,
        triggering Sentinel-1 SAR analysis for suspicious vessel locations.
        **🌍 Environmental Impact**: Protecting marine ecosystems through early detection of potential pollution events.
        """)

    # Event handlers (registered inside the Blocks context).
    analyze_btn.click(
        fn=predict_anomaly,
        inputs=[mmsi_input, ship_type_input, nav_status_input, sog_input,
                cog_input, heading_input, width_input, length_input, draught_input],
        outputs=[result_output, score_output, risk_output, explanation_output]
    )
    batch_btn.click(
        fn=batch_analysis,
        inputs=[file_input],
        outputs=[batch_result, batch_plot]
    )

# Launch the app
if __name__ == "__main__":
    # NOTE(review): share=True requests a public tunnel link; it is ignored
    # when the app runs on Hugging Face Spaces — confirm intended deployment.
    app.launch(share=True)