| import gradio as gr |
| import pandas as pd |
| import numpy as np |
| import joblib |
| import pickle |
| from typing import Tuple, Dict, Any |
| import plotly.graph_objects as go |
| import plotly.express as px |
| from plotly.subplots import make_subplots |
|
|
| from huggingface_hub import hf_hub_download |
| import pickle |
|
|
# Hugging Face Hub location of the trained Isolation Forest bundle.
MODEL_REPO = "MeghanaK25/ais-isolation-forest"
MODEL_FILENAME = "isolationforest_model.pkl"


# Download and unpack the model bundle at import time. On any failure the app
# degrades to "demo mode": model/scaler stay None and predict_anomaly checks
# for that before scoring.
try:
    model_path = hf_hub_download(repo_id=MODEL_REPO, filename=MODEL_FILENAME)

    # NOTE(review): pickle.load executes arbitrary code from the downloaded
    # file; acceptable only because MODEL_REPO is a trusted repository.
    with open(model_path, "rb") as f:
        model_data = pickle.load(f)

    # The pickle holds a dict with the fitted estimator and its scaler.
    model = model_data["model"]
    scaler = model_data["scaler"]
    model_name = model_data.get("model_name", "Isolation Forest")

    # BUG FIX: the status strings were mojibake-corrupted (the garbled emoji
    # even split the f-string across a line); restored readable messages.
    print(f"✅ Loaded {model_name} model successfully from Hugging Face Hub")
except Exception as e:
    print(f"❌ Could not load model from Hub: {e}")
    model = None
    scaler = None
|
|
def preprocess_single_vessel(mmsi, ship_type, nav_status, sog, cog, heading, width, length, draught):
    """Build the 12-feature dict the anomaly model was trained on.

    Missing numeric values fall back to fleet-typical defaults; a missing
    heading falls back to the course over ground.

    Args:
        mmsi: vessel identifier (kept for the caller; not itself a feature).
        ship_type: ship-type label (e.g. "Cargo"); unknown labels encode as 0.
        nav_status: navigational-status label; unknown labels encode as 4.
        sog: speed over ground in knots.
        cog: course over ground in degrees.
        heading: true heading in degrees.
        width, length, draught: vessel dimensions in meters.

    Returns:
        dict with keys: sog, cog, heading, width, length, draught,
        navigationalstatus_encoded, shiptype_encoded, speed_category,
        size_category, course_diff, aspect_ratio.
    """
    features = {}

    # Raw numeric features, with defaults for missing input.
    features['sog'] = float(sog) if sog else 0.0
    features['cog'] = float(cog) if cog else 0.0
    # BUG FIX: `float(heading) if heading else ...` treated a valid heading of
    # 0 degrees (due north) as missing and silently substituted the course;
    # only fall back when the value is genuinely absent.
    features['heading'] = float(heading) if heading not in (None, "") else features['cog']
    features['width'] = float(width) if width else 17.0
    features['length'] = float(length) if length else 115.0
    features['draught'] = float(draught) if draught else 6.3

    # Categorical encodings — must match the label encoding used at training
    # time (see model card / training pipeline).
    nav_status_mapping = {
        'Under way using engine': 0,
        'At anchor': 1,
        'Moored': 2,
        'Constrained by her draught': 3,
        'Unknown value': 4
    }

    ship_type_mapping = {
        'Cargo': 0,
        'Tanker': 1,
        'Fishing': 2,
        'Passenger': 3,
        'Tug': 4,
        'Military': 5,
        'Pleasure': 6,
        'Sailing': 7
    }

    features['navigationalstatus_encoded'] = nav_status_mapping.get(nav_status, 4)
    features['shiptype_encoded'] = ship_type_mapping.get(ship_type, 0)

    # Discretized speed: 0 stationary, 1 slow, 2 cruising, 3 fast.
    if features['sog'] <= 0.5:
        features['speed_category'] = 0
    elif features['sog'] <= 3:
        features['speed_category'] = 1
    elif features['sog'] <= 15:
        features['speed_category'] = 2
    else:
        features['speed_category'] = 3

    # Discretized size by length: 0 small ... 3 very large.
    if features['length'] <= 50:
        features['size_category'] = 0
    elif features['length'] <= 150:
        features['size_category'] = 1
    elif features['length'] <= 300:
        features['size_category'] = 2
    else:
        features['size_category'] = 3

    # Smallest angular difference between course and heading (wrap at 180°).
    course_diff = abs(features['cog'] - features['heading'])
    features['course_diff'] = min(course_diff, 360 - course_diff) if course_diff > 180 else course_diff

    # Length/width ratio; small epsilon guards against division by zero.
    features['aspect_ratio'] = features['length'] / (features['width'] + 0.001)

    return features
|
|
def predict_anomaly(mmsi, ship_type, nav_status, sog, cog, heading, width, length, draught):
    """Score a single vessel with the loaded Isolation Forest model.

    Returns:
        Tuple of (result message, anomaly score, risk level, explanation).
        Lower (more negative) decision_function scores mean more anomalous;
        model.predict() == -1 flags an anomaly.
    """
    # Demo mode: the module-level model failed to load.
    if model is None:
        return "❌ Model not loaded - Demo mode", 0.0, "Normal", "Model not available"

    try:
        features = preprocess_single_vessel(mmsi, ship_type, nav_status, sog, cog, heading, width, length, draught)

        # Feature order must match the training pipeline exactly.
        feature_vector = np.array([[
            features['sog'],
            features['cog'],
            features['heading'],
            features['width'],
            features['length'],
            features['draught'],
            features['navigationalstatus_encoded'],
            features['shiptype_encoded'],
            features['speed_category'],
            features['size_category'],
            features['course_diff'],
            features['aspect_ratio']
        ]])

        feature_vector_scaled = scaler.transform(feature_vector)

        prediction = model.predict(feature_vector_scaled)[0]
        anomaly_score = model.decision_function(feature_vector_scaled)[0]

        is_anomaly = prediction == -1
        # BUG FIX: the risk-level strings were mojibake-corrupted (the garbled
        # emoji split the literal across source lines); restored clean labels.
        # These labels must stay in sync with batch_analysis's color map.
        if anomaly_score < -0.1:
            risk_level = "🚨 HIGH RISK"
        elif is_anomaly:
            risk_level = "⚠️ ANOMALOUS"
        else:
            risk_level = "✅ NORMAL"

        explanation = generate_explanation(features, is_anomaly, anomaly_score)

        result = "🚨 ANOMALOUS VESSEL DETECTED" if is_anomaly else "✅ Normal vessel behavior"

        return result, float(anomaly_score), risk_level, explanation

    except Exception as e:
        return f"❌ Error: {str(e)}", 0.0, "Error", "Could not process input"
|
|
def generate_explanation(features: Dict, is_anomaly: bool, score: float) -> str:
    """Generate a human-readable, ' | '-joined explanation of a prediction.

    Args:
        features: engineered feature dict from preprocess_single_vessel
            (reads sog, length, width, draught, course_diff).
        is_anomaly: True when the model flagged the vessel.
        score: decision_function score (lower = more anomalous).

    Returns:
        Single string of observations separated by ' | '.
    """
    # BUG FIX: the message strings were mojibake-corrupted (one garbled emoji
    # even split a literal across source lines); restored readable text.
    explanations = []

    # Speed commentary.
    speed = features['sog']
    if speed == 0:
        explanations.append("🛑 Vessel is stationary (0 knots)")
    elif speed < 3:
        explanations.append(f"🐌 Very low speed ({speed:.1f} knots)")
    elif speed > 20:
        explanations.append(f"🚀 High speed ({speed:.1f} knots)")
    else:
        explanations.append(f"⚡ Speed: {speed:.1f} knots")

    # Size commentary (only for unusually large/small vessels).
    length = features['length']
    width = features['width']
    if length > 300:
        explanations.append(f"🚢 Very large vessel ({length:.0f}m long)")
    elif length < 20:
        explanations.append(f"🚤 Small vessel ({length:.0f}m long)")

    # Draught as a loading-state indicator.
    draught = features['draught']
    if draught > 10:
        explanations.append(f"⚓ Deep draught ({draught:.1f}m) - heavily loaded")
    elif draught < 2:
        explanations.append(f"🪶 Shallow draught ({draught:.1f}m) - lightly loaded")

    # Course/heading disagreement suggests drifting or unusual maneuvering.
    course_diff = features['course_diff']
    if course_diff > 45:
        explanations.append(f"🧭 Large course/heading difference ({course_diff:.1f}°)")

    # Final verdict with the raw score and a follow-up recommendation.
    if is_anomaly:
        explanations.append(f"⚠️ Anomaly score: {score:.3f} (threshold: ~0.000)")
        explanations.append("🔍 Recommend: Monitor vessel closely or investigate with SAR imagery")
    else:
        explanations.append(f"✅ Normal behavior (score: {score:.3f})")

    return " | ".join(explanations)
|
|
def create_demo_data():
    """Return sample vessel rows for the demo tab.

    Each row is [mmsi, ship_type, nav_status, sog, cog, heading, width,
    length, draught], matching the single-vessel input order.
    """
    # BUG FIX: the course/heading values were written with leading zeros
    # (045, 090, 095) — a SyntaxError in Python 3 (and 09x is invalid even
    # as octal). Rewritten as plain decimal integers.
    demo_vessels = [
        # Typical cargo vessel under way.
        [219123456, "Cargo", "Under way using engine", 12.5, 45, 45, 25, 180, 8.5],
        # Large tanker at anchor; heading differs from course while anchored.
        [477307700, "Tanker", "At anchor", 0.1, 180, 135, 60, 333, 15.2],
        # Small fishing vessel at an implausibly high speed.
        # NOTE(review): "Engaged in fishing" is not among the UI dropdown's
        # nav-status choices, so it encodes to the "unknown" bucket (4).
        [219999999, "Fishing", "Engaged in fishing", 25.0, 90, 95, 8, 45, 3.2],
        # Cargo vessel with a large course/heading mismatch.
        [311234567, "Cargo", "Under way using engine", 15.2, 270, 315, 32, 350, 12.1]
    ]

    return demo_vessels
|
|
def batch_analysis(file):
    """Analyze up to 100 vessels from an uploaded CSV.

    Args:
        file: Gradio file object exposing a `.name` path, or None.

    Returns:
        Tuple of (status message, plotly scatter figure or None).
    """
    if file is None:
        return "Please upload a CSV file", None

    try:
        df = pd.read_csv(file.name)

        # Validate the expected AIS schema before scoring anything.
        required_cols = ['mmsi', 'shiptype', 'navigationalstatus', 'sog', 'cog', 'heading', 'width', 'length', 'draught']
        missing_cols = [col for col in required_cols if col not in df.columns]

        if missing_cols:
            return f"Missing columns: {missing_cols}", None

        # Score the first 100 rows; rows that fail preprocessing are skipped.
        results = []
        for _, row in df.head(100).iterrows():
            try:
                _, score, risk, explanation = predict_anomaly(
                    row['mmsi'], row['shiptype'], row['navigationalstatus'],
                    row['sog'], row['cog'], row['heading'],
                    row['width'], row['length'], row['draught']
                )

                results.append({
                    'MMSI': row['mmsi'],
                    'Ship_Type': row['shiptype'],
                    'Risk_Level': risk,
                    'Anomaly_Score': score,
                    'Speed': row['sog'],
                    'Length': row['length']
                })
            except Exception:
                # BUG FIX: was a bare `except:`, which also swallowed
                # SystemExit/KeyboardInterrupt. Best-effort row skipping is
                # intentional, but only for ordinary errors.
                continue

        # BUG FIX: an all-failed file produced an empty DataFrame and
        # px.scatter then raised on the missing columns; report instead.
        if not results:
            return "No vessels could be analyzed from this file", None

        result_df = pd.DataFrame(results)

        # Risk-level keys must match the labels predict_anomaly emits.
        fig = px.scatter(result_df,
                        x='Length', y='Speed',
                        color='Risk_Level',
                        hover_data=['MMSI', 'Ship_Type', 'Anomaly_Score'],
                        title='Vessel Anomaly Analysis',
                        color_discrete_map={
                            '✅ NORMAL': 'green',
                            '⚠️ ANOMALOUS': 'orange',
                            '🚨 HIGH RISK': 'red'
                        })

        return f"Analyzed {len(results)} vessels", fig

    except Exception as e:
        return f"Error processing file: {str(e)}", None
|
|
| |
# Gradio UI: four tabs (single-vessel analysis, batch CSV analysis, demo
# examples, about), with the button-to-function event wiring at the bottom
# of the `with` block. NOTE(review): several label strings below are
# mojibake-garbled emoji carried over from the original source.
with gr.Blocks(title="π’ AIS Maritime Anomaly Detection", theme=gr.themes.Ocean()) as app:

    # App header / intro copy.
    gr.Markdown("""
    # π’ AIS Maritime Anomaly Detection

    **Detect suspicious vessel behavior for oil spill prevention and maritime safety**

    This system uses advanced machine learning to identify anomalous vessel patterns that could indicate:
    - π’οΈ Illegal oil discharge or transfer operations
    - π¨ Vessels in distress or behaving suspiciously
    - β Ships loitering in restricted areas
    - π Navigation patterns requiring investigation

    *Models trained on 358K+ AIS records with 99.6% performance score*
    """)

    with gr.Tab("π Single Vessel Analysis"):
        gr.Markdown("### Analyze individual vessel behavior")

        with gr.Row():
            # Column 1: identity and categorical inputs.
            with gr.Column():
                mmsi_input = gr.Number(label="MMSI (Ship ID)", value=219861000)
                ship_type_input = gr.Dropdown(
                    choices=["Cargo", "Tanker", "Fishing", "Passenger", "Tug", "Military", "Pleasure", "Sailing"],
                    label="Ship Type", value="Cargo"
                )
                nav_status_input = gr.Dropdown(
                    choices=["Under way using engine", "At anchor", "Moored", "Constrained by her draught", "Unknown value"],
                    label="Navigation Status", value="Under way using engine"
                )

            # Column 2: kinematic inputs.
            with gr.Column():
                sog_input = gr.Number(label="Speed Over Ground (knots)", value=12.5)
                cog_input = gr.Number(label="Course Over Ground (degrees)", value=45)
                heading_input = gr.Number(label="Heading (degrees)", value=45)

            # Column 3: physical dimensions.
            with gr.Column():
                width_input = gr.Number(label="Width (meters)", value=25)
                length_input = gr.Number(label="Length (meters)", value=180)
                draught_input = gr.Number(label="Draught (meters)", value=8.5)

        analyze_btn = gr.Button("π Analyze Vessel", variant="primary")

        # Output widgets; wired to predict_anomaly's 4-tuple below.
        with gr.Row():
            with gr.Column():
                result_output = gr.Textbox(label="Analysis Result", interactive=False)
                risk_output = gr.Textbox(label="Risk Level", interactive=False)
            with gr.Column():
                score_output = gr.Number(label="Anomaly Score", interactive=False)
                explanation_output = gr.Textbox(label="Detailed Explanation", interactive=False, lines=3)

    with gr.Tab("π Batch Analysis"):
        gr.Markdown("### Upload CSV file for bulk analysis")
        gr.Markdown("*CSV should contain columns: mmsi, shiptype, navigationalstatus, sog, cog, heading, width, length, draught*")

        file_input = gr.File(label="Upload AIS Data CSV", file_types=[".csv"])
        batch_btn = gr.Button("π Analyze Fleet", variant="primary")

        batch_result = gr.Textbox(label="Batch Results", interactive=False)
        batch_plot = gr.Plot(label="Anomaly Visualization")

    with gr.Tab("π Demo Examples"):
        gr.Markdown("### Try these example vessels:")

        demo_data = create_demo_data()

        # One row + "Load Example" button per demo vessel. Each button's click
        # is wired inside the loop, so every button works even though the name
        # demo_btn is rebound on each iteration.
        for i, vessel_data in enumerate(demo_data):
            mmsi, ship_type, nav_status, sog, cog, heading, width, length, draught = vessel_data

            with gr.Row():
                gr.Markdown(f"**Example {i+1}:** {ship_type} - MMSI {mmsi}")
                demo_btn = gr.Button(f"Load Example {i+1}")

            # The default argument binds this iteration's row, avoiding the
            # classic late-binding-closure bug; the returned 9-element list
            # populates the 9 single-vessel inputs.
            def load_demo(data=vessel_data):
                return data

            demo_btn.click(
                fn=load_demo,
                outputs=[mmsi_input, ship_type_input, nav_status_input, sog_input,
                        cog_input, heading_input, width_input, length_input, draught_input]
            )

    with gr.Tab("βΉοΈ About"):
        gr.Markdown("""
        ### Model Information

        - **Algorithm**: Isolation Forest (Recommended)
        - **Training Data**: 358,351 AIS records from Denmark Maritime Authority
        - **Performance**: 100% contamination accuracy, 99.6% overall score
        - **Features**: 12 engineered features from AIS data
        - **Use Case**: Oil spill detection, maritime safety monitoring

        ### Key Features Analyzed:
        1. **Speed patterns** - Unusual fast/slow movement
        2. **Vessel dimensions** - Size vs. behavior correlation
        3. **Course deviations** - Navigation inconsistencies
        4. **Operational status** - Anchoring/mooring patterns
        5. **Draught analysis** - Loading state indicators

        ### Integration:
        This model serves as the AIS anomaly detection component in oil spill monitoring pipelines,
        triggering Sentinel-1 SAR analysis for suspicious vessel locations.

        **π Environmental Impact**: Protecting marine ecosystems through early detection of potential pollution events.
        """)

    # Event wiring: single-vessel analysis. Output order matches
    # predict_anomaly's return tuple (result, score, risk, explanation).
    analyze_btn.click(
        fn=predict_anomaly,
        inputs=[mmsi_input, ship_type_input, nav_status_input, sog_input,
                cog_input, heading_input, width_input, length_input, draught_input],
        outputs=[result_output, score_output, risk_output, explanation_output]
    )

    # Event wiring: batch CSV analysis.
    batch_btn.click(
        fn=batch_analysis,
        inputs=[file_input],
        outputs=[batch_result, batch_plot]
    )
|
|
| |
# Script entry point: start the Gradio server. share=True additionally
# requests a public share link (Gradio tunnel) besides the local URL.
if __name__ == "__main__":
    app.launch(share=True)
|
|