MorphGuard / plugins /dronekit_plugin.py
juanquy's picture
Initial clean commit of modular MorphGuard
2978bba
Raw
History Blame Contribute Delete
1.52 kB
"""
DroneKit/MAVLink integration plugin for MorphGuard.
"""
import os
from dronekit import connect
from pymavlink import mavutil
class DroneKitClient:
"""Client to send MorphGuard results via MAVLink to a Pix4/PX4 or ArduPilot autopilot."""
def __init__(self, conn_str: str = None):
self.conn_str = conn_str or os.getenv('DRONEKIT_CONNECT', 'udp:127.0.0.1:14550')
try:
# Connect to vehicle without blocking on parameters
self.vehicle = connect(self.conn_str, wait_ready=False)
except Exception as e:
raise RuntimeError(f"Failed to connect to vehicle at {self.conn_str}: {e}")
def send_status_text(self, severity: int, text: str):
"""Send a STATUSTEXT MAVLink message."""
msg = self.vehicle.message_factory.statustext_encode(
severity, text.encode('utf-8')
)
self.vehicle.send_mavlink(msg)
self.vehicle.flush()
def send_detection(self, is_morphed: bool, confidence: float):
"""Broadcast detection result."""
severity = mavutil.mavlink.MAV_SEVERITY_ALERT if is_morphed else mavutil.mavlink.MAV_SEVERITY_INFO
text = f"MorphGuard: morphed={is_morphed}, confidence={confidence:.2f}"
self.send_status_text(severity, text)
def send_demorph(self, success: bool):
"""Broadcast demorphing completion."""
severity = mavutil.mavlink.MAV_SEVERITY_INFO
text = f"Demorph completed: success={success}"
self.send_status_text(severity, text)