""" DroneKit/MAVLink integration plugin for MorphGuard. """ import os from dronekit import connect from pymavlink import mavutil class DroneKitClient: """Client to send MorphGuard results via MAVLink to a Pix4/PX4 or ArduPilot autopilot.""" def __init__(self, conn_str: str = None): self.conn_str = conn_str or os.getenv('DRONEKIT_CONNECT', 'udp:127.0.0.1:14550') try: # Connect to vehicle without blocking on parameters self.vehicle = connect(self.conn_str, wait_ready=False) except Exception as e: raise RuntimeError(f"Failed to connect to vehicle at {self.conn_str}: {e}") def send_status_text(self, severity: int, text: str): """Send a STATUSTEXT MAVLink message.""" msg = self.vehicle.message_factory.statustext_encode( severity, text.encode('utf-8') ) self.vehicle.send_mavlink(msg) self.vehicle.flush() def send_detection(self, is_morphed: bool, confidence: float): """Broadcast detection result.""" severity = mavutil.mavlink.MAV_SEVERITY_ALERT if is_morphed else mavutil.mavlink.MAV_SEVERITY_INFO text = f"MorphGuard: morphed={is_morphed}, confidence={confidence:.2f}" self.send_status_text(severity, text) def send_demorph(self, success: bool): """Broadcast demorphing completion.""" severity = mavutil.mavlink.MAV_SEVERITY_INFO text = f"Demorph completed: success={success}" self.send_status_text(severity, text)