Spaces:
Running
Running
| """ | |
| DroneKit/MAVLink integration plugin for MorphGuard. | |
| """ | |
| import os | |
| from dronekit import connect | |
| from pymavlink import mavutil | |
| class DroneKitClient: | |
| """Client to send MorphGuard results via MAVLink to a Pix4/PX4 or ArduPilot autopilot.""" | |
| def __init__(self, conn_str: str = None): | |
| self.conn_str = conn_str or os.getenv('DRONEKIT_CONNECT', 'udp:127.0.0.1:14550') | |
| try: | |
| # Connect to vehicle without blocking on parameters | |
| self.vehicle = connect(self.conn_str, wait_ready=False) | |
| except Exception as e: | |
| raise RuntimeError(f"Failed to connect to vehicle at {self.conn_str}: {e}") | |
| def send_status_text(self, severity: int, text: str): | |
| """Send a STATUSTEXT MAVLink message.""" | |
| msg = self.vehicle.message_factory.statustext_encode( | |
| severity, text.encode('utf-8') | |
| ) | |
| self.vehicle.send_mavlink(msg) | |
| self.vehicle.flush() | |
| def send_detection(self, is_morphed: bool, confidence: float): | |
| """Broadcast detection result.""" | |
| severity = mavutil.mavlink.MAV_SEVERITY_ALERT if is_morphed else mavutil.mavlink.MAV_SEVERITY_INFO | |
| text = f"MorphGuard: morphed={is_morphed}, confidence={confidence:.2f}" | |
| self.send_status_text(severity, text) | |
| def send_demorph(self, success: bool): | |
| """Broadcast demorphing completion.""" | |
| severity = mavutil.mavlink.MAV_SEVERITY_INFO | |
| text = f"Demorph completed: success={success}" | |
| self.send_status_text(severity, text) |