# SPDX-License-Identifier: Apache-2.0 # © 2026 Lutar, Stephen P. — SZL Holdings · ORCID 0009-0001-0110-4173 · Doctrine v11 """ Killinchu protocol decoders — REAL parsers, NO MOCKS. remote_id_decode(hex) — OpenDroneID / ASTM F3411 25-byte message byte parser (Basic ID, Location/Vector, Self-ID, System, Operator ID). adsb_decode(hex|pair) — ADS-B Mode-S 1090ES via pyModeS v3 (icao/typecode/callsign/ altitude/velocity; even+odd pair → full lat/lon via CPR). mavlink_parse(hex) — MAVLink v1/v2 frame parse via pymavlink (common dialect). Honesty (Doctrine v11): malformed/short input returns an explicit error object, never a silent pass. ADS-B/Remote-ID are unauthenticated broadcast — decoded fields are *claims*. """ from __future__ import annotations import struct from typing import Any # --------------------------------------------------------------------------- # OpenDroneID / ASTM F3411 — Remote ID broadcast message decoder (real bytes) # Reference: opendroneid/opendroneid-core-c + ASTM F3411-22a. # Each message = 25 bytes. Byte0 = (type<<4)|version. # --------------------------------------------------------------------------- _RID_MSG_TYPES = { 0x0: "Basic ID", 0x1: "Location/Vector", 0x2: "Authentication", 0x3: "Self-ID", 0x4: "System", 0x5: "Operator ID", 0xF: "Message Pack", } _RID_ID_TYPES = {0: "None", 1: "Serial Number (ANSI/CTA-2063-A)", 2: "CAA Registration ID", 3: "UTM (UUID)", 4: "Specific Session ID"} _RID_UA_TYPES = {0: "None", 1: "Aeroplane/Fixed-wing", 2: "Helicopter/Multirotor", 3: "Gyroplane", 4: "Hybrid Lift", 5: "Ornithopter", 6: "Glider", 7: "Kite", 8: "Free Balloon", 9: "Captive Balloon", 10: "Airship", 11: "Free Fall/Parachute", 12: "Rocket", 13: "Tethered Powered Aircraft", 14: "Ground Obstacle", 15: "Other"} _RID_STATUS = {0: "Undeclared", 1: "Ground", 2: "Airborne", 3: "Emergency", 4: "RID System Failure"} def _hex_to_bytes(s: str) -> bytes: s = s.strip().replace(" ", "").replace(":", "").replace("0x", "") if len(s) % 2 != 0: raise ValueError(f"odd-length hex string ({len(s)} chars)") return bytes.fromhex(s) def _ascii_field(b: bytes) -> str: return b.split(b"\x00")[0].decode("ascii", errors="replace").strip() def remote_id_decode(hexstr: str) -> dict[str, Any]: """Decode a single 25-byte OpenDroneID broadcast frame. Returns honest error on bad input.""" try: raw = _hex_to_bytes(hexstr) except ValueError as e: return {"ok": False, "error": f"invalid hex: {e}", "protocol": "OpenDroneID/ASTM F3411"} if len(raw) < 1: return {"ok": False, "error": "empty message", "protocol": "OpenDroneID/ASTM F3411"} # Many capture tools prepend a 1-byte "ADD counter / message size" or app header. # ASTM frame proper is 25 bytes; accept 25 (frame) or 24 (header+body alt) gracefully. if len(raw) not in (24, 25): return { "ok": False, "error": f"unexpected length {len(raw)} bytes — ASTM F3411 message is 25 bytes " f"(1 header + 24 body). Provide a single 25-byte frame.", "protocol": "OpenDroneID/ASTM F3411", "hint": "Example Basic-ID frame: 0012 + 20-byte serial + pad (50 hex chars).", } header = raw[0] mtype = (header >> 4) & 0xF version = header & 0xF body = raw[1:] out: dict[str, Any] = { "ok": True, "protocol": "OpenDroneID / ASTM F3411-22a", "message_type_code": mtype, "message_type": _RID_MSG_TYPES.get(mtype, f"Reserved(0x{mtype:X})"), "protocol_version": version, "length_bytes": len(raw), "honesty": "Broadcast Remote ID is unauthenticated and spoofable — decoded fields are claims.", } if mtype == 0x0: # Basic ID id_type = (body[0] >> 4) & 0xF ua_type = body[0] & 0xF uas_id = _ascii_field(body[1:21]) out["fields"] = { "id_type_code": id_type, "id_type": _RID_ID_TYPES.get(id_type, "Reserved"), "ua_type_code": ua_type, "ua_type": _RID_UA_TYPES.get(ua_type, "Reserved"), "uas_id": uas_id, } elif mtype == 0x1: # Location/Vector status_byte = body[0] status = (status_byte >> 4) & 0xF height_type = (status_byte >> 2) & 0x1 ew_dir = (status_byte >> 1) & 0x1 # E/W direction segment for track speed_mult = status_byte & 0x1 track_raw = body[1] track = track_raw + (180 if ew_dir else 0) speed_raw = body[2] speed = speed_raw * 0.25 if speed_mult == 0 else (speed_raw * 0.75 + 255 * 0.25) vspeed = struct.unpack(" dict[str, Any]: """Decode ADS-B. `msg` may be a single 28-hex string, or a {even,odd} pair / list for CPR position. Uses pyModeS v3 decode().""" import pyModeS from pyModeS.util import icao as _icao, crc as _crc, df as _df pair = None single = None if isinstance(msg, (list, tuple)): pair = [str(m).strip() for m in msg if str(m).strip()] elif isinstance(msg, dict): e, o = msg.get("even"), msg.get("odd") if e and o: pair = [str(e).strip(), str(o).strip()] else: single = str(msg.get("msg") or msg.get("hex") or "").strip() else: single = str(msg).strip() def _meta(m: str) -> dict[str, Any]: m = m.replace(" ", "").upper() if len(m) not in (14, 28): raise ValueError(f"Mode-S message must be 14 or 28 hex chars, got {len(m)}") return {"icao": _icao(m), "downlink_format": _df(m), "crc": _crc(m), "crc_valid": _crc(m) == 0} try: if pair and len(pair) >= 2: decoded = pyModeS.decode(pair[:2]) merged: dict[str, Any] = {} for d in decoded: merged.update({k: v for k, v in d.items() if v is not None}) return { "ok": True, "protocol": "ADS-B Mode-S 1090ES (CPR pair)", "even": _meta(pair[0]), "odd": _meta(pair[1]), "decoded": merged, "latitude": merged.get("latitude"), "longitude": merged.get("longitude"), "altitude_ft": merged.get("altitude"), "honesty": "ADS-B is unauthenticated broadcast — position is a self-reported claim.", } else: m = (single or "").upper().replace(" ", "") meta = _meta(m) decoded = pyModeS.decode(m) return { "ok": True, "protocol": "ADS-B Mode-S 1090ES (single frame)", "meta": meta, "decoded": decoded, "note": "Airborne-position frames (TC 9-18) need an even+odd pair for full lat/lon. " "Submit {even, odd} to resolve global position.", "honesty": "ADS-B is unauthenticated broadcast — fields are self-reported claims.", } except Exception as e: return {"ok": False, "protocol": "ADS-B Mode-S 1090ES", "error": f"{type(e).__name__}: {e}", "hint": "Provide a 28-hex DF17 extended squitter, or {even, odd} for position."} # --------------------------------------------------------------------------- # MAVLink v1/v2 — pymavlink (real frame parse) # --------------------------------------------------------------------------- def mavlink_parse(hexstr: str) -> dict[str, Any]: """Parse one or more concatenated MAVLink frames from hex. Real pymavlink decode.""" try: raw = _hex_to_bytes(hexstr) except ValueError as e: return {"ok": False, "protocol": "MAVLink", "error": f"invalid hex: {e}"} if not raw: return {"ok": False, "protocol": "MAVLink", "error": "empty frame"} magic = raw[0] version = 2 if magic == 0xFD else 1 if magic == 0xFE else None if version is None: return {"ok": False, "protocol": "MAVLink", "error": f"first byte 0x{magic:02X} is not a MAVLink start marker " f"(v1=0xFE, v2=0xFD)"} import io from pymavlink.dialects.v20 import common as mavlink2 mav = mavlink2.MAVLink(io.BytesIO()) mav.robust_parsing = True msgs = [] try: parsed = mav.parse_buffer(bytearray(raw)) or [] for m in parsed: d = m.to_dict() msgs.append({ "type": m.get_type(), "msg_id": m.get_msgId(), "src_system": m.get_srcSystem(), "src_component": m.get_srcComponent(), "fields": {k: v for k, v in d.items() if k != "mavpackettype"}, }) except Exception as e: return {"ok": False, "protocol": f"MAVLink v{version}", "error": f"{type(e).__name__}: {e}", "start_marker": f"0x{magic:02X}"} if not msgs: return {"ok": False, "protocol": f"MAVLink v{version}", "error": "no complete MAVLink message decoded (truncated frame or bad CRC)", "start_marker": f"0x{magic:02X}", "bytes": len(raw)} return {"ok": True, "protocol": f"MAVLink v{version}", "frame_count": len(msgs), "messages": msgs, "honesty": "MAVLink is typically unencrypted on civilian builds; frames are claims."}