# mqtt_as.py Asynchronous version of umqtt.robust # (C) Copyright Peter Hinch 2017-2023. # Released under the MIT licence. # Pyboard D support added also RP2/default # Various improvements contributed by Kevin Köck. import gc import usocket as socket import ustruct as struct gc.collect() import uasyncio as asyncio from ubinascii import hexlify gc.collect() from uerrno import EINPROGRESS, ETIMEDOUT from utime import ticks_diff, ticks_ms gc.collect() import network from machine import unique_id from micropython import const gc.collect() from sys import platform VERSION = (0, 7, 1) # Default short delay for good SynCom throughput (avoid sleep(0) with SynCom). _DEFAULT_MS = const(20) _SOCKET_POLL_DELAY = const(5) # 100ms added greatly to publish latency # Legitimate errors while waiting on a socket. See uasyncio __init__.py open_connection(). ESP32 = platform == "esp32" RP2 = platform == "rp2" if ESP32: # https://forum.micropython.org/viewtopic.php?f=16&t=3608&p=20942#p20942 BUSY_ERRORS = [EINPROGRESS, ETIMEDOUT, 118, 119] # Add in weird ESP32 errors elif RP2: BUSY_ERRORS = [EINPROGRESS, ETIMEDOUT, -110] else: BUSY_ERRORS = [EINPROGRESS, ETIMEDOUT] ESP8266 = platform == "esp8266" PYBOARD = platform == "pyboard" # Default "do little" coro for optional user replacement async def eliza(*_): # e.g. via set_wifi_handler(coro): see test program await asyncio.sleep_ms(_DEFAULT_MS) class MsgQueue: def __init__(self, size): self._q = [0 for _ in range(max(size, 4))] self._size = size self._wi = 0 self._ri = 0 self._evt = asyncio.Event() self.discards = 0 def put(self, *v): self._q[self._wi] = v self._evt.set() self._wi = (self._wi + 1) % self._size if self._wi == self._ri: # Would indicate empty self._ri = (self._ri + 1) % self._size # Discard a message self.discards += 1 def __aiter__(self): return self async def __anext__(self): if self._ri == self._wi: # Empty self._evt.clear() await self._evt.wait() r = self._q[self._ri] self._ri = (self._ri + 1) % self._size return r config = { "client_id": hexlify(unique_id()), "server": None, "port": 0, "user": "", "password": "", "keepalive": 60, "ping_interval": 0, "ssl": False, "ssl_params": {}, "response_time": 10, "clean_init": True, "clean": True, "max_repubs": 4, "will": None, "subs_cb": lambda *_: None, "wifi_coro": eliza, "connect_coro": eliza, "ssid": None, "wifi_pw": None, "queue_len": 0, "gateway": False, } class MQTTException(Exception): pass def pid_gen(): pid = 0 while True: pid = pid + 1 if pid < 65535 else 1 yield pid def qos_check(qos): if not (qos == 0 or qos == 1): raise ValueError("Only qos 0 and 1 are supported.") # MQTT_base class. Handles MQTT protocol on the basis of a good connection. # Exceptions from connectivity failures are handled by MQTTClient subclass. class MQTT_base: REPUB_COUNT = 0 # TEST DEBUG = False def __init__(self, config): self._events = config["queue_len"] > 0 # MQTT config self._client_id = config["client_id"] self._user = config["user"] self._pswd = config["password"] self._keepalive = config["keepalive"] if self._keepalive >= 65536: raise ValueError("invalid keepalive time") self._response_time = ( config["response_time"] * 1000 ) # Repub if no PUBACK received (ms). self._max_repubs = config["max_repubs"] self._clean_init = config[ "clean_init" ] # clean_session state on first connection self._clean = config["clean"] # clean_session state on reconnect will = config["will"] if will is None: self._lw_topic = False else: self._set_last_will(*will) # WiFi config self._ssid = config["ssid"] # Required for ESP32 / Pyboard D. Optional ESP8266 self._wifi_pw = config["wifi_pw"] self._ssl = config["ssl"] self._ssl_params = config["ssl_params"] # Callbacks and coros if self._events: self.up = asyncio.Event() self.down = asyncio.Event() self.queue = MsgQueue(config["queue_len"]) else: # Callbacks self._cb = config["subs_cb"] self._wifi_handler = config["wifi_coro"] self._connect_handler = config["connect_coro"] # Network self.port = config["port"] if self.port == 0: self.port = 8883 if self._ssl else 1883 self.server = config["server"] if self.server is None: raise ValueError("no server specified.") self._sock = None self._sta_if = network.WLAN(network.STA_IF) self._sta_if.active(True) if config["gateway"]: # Called from gateway (hence ESP32). import aioespnow # Set up ESPNOW while not (sta := self._sta_if).active(): time.sleep(0.1) sta.config(pm=sta.PM_NONE) # No power management sta.active(True) self._espnow = ( aioespnow.AIOESPNow() ) # Returns AIOESPNow enhanced with async support self._espnow.active(True) self.newpid = pid_gen() self.rcv_pids = set() # PUBACK and SUBACK pids awaiting ACK response self.last_rx = ticks_ms() # Time of last communication from broker self.lock = asyncio.Lock() def _set_last_will(self, topic, msg, retain=False, qos=0): qos_check(qos) if not topic: raise ValueError("Empty topic.") self._lw_topic = topic self._lw_msg = msg self._lw_qos = qos self._lw_retain = retain def dprint(self, msg, *args): if self.DEBUG: print(msg % args) def _timeout(self, t): return ticks_diff(ticks_ms(), t) > self._response_time async def _as_read(self, n, sock=None): # OSError caught by superclass if sock is None: sock = self._sock # Declare a byte array of size n. That space is needed anyway, better # to just 'allocate' it in one go instead of appending to an # existing object, this prevents reallocation and fragmentation. data = bytearray(n) buffer = memoryview(data) size = 0 t = ticks_ms() while size < n: if self._timeout(t) or not self.isconnected(): raise OSError(-1, "Timeout on socket read") try: msg_size = sock.readinto(buffer[size:], n - size) except OSError as e: # ESP32 issues weird 119 errors here msg_size = None if e.args[0] not in BUSY_ERRORS: raise if msg_size == 0: # Connection closed by host raise OSError(-1, "Connection closed by host") if msg_size is not None: # data received size += msg_size t = ticks_ms() self.last_rx = ticks_ms() await asyncio.sleep_ms(_SOCKET_POLL_DELAY) return data async def _as_write(self, bytes_wr, length=0, sock=None): if sock is None: sock = self._sock # Wrap bytes in memoryview to avoid copying during slicing bytes_wr = memoryview(bytes_wr) if length: bytes_wr = bytes_wr[:length] t = ticks_ms() while bytes_wr: if self._timeout(t) or not self.isconnected(): raise OSError(-1, "Timeout on socket write") try: n = sock.write(bytes_wr) except OSError as e: # ESP32 issues weird 119 errors here n = 0 if e.args[0] not in BUSY_ERRORS: raise if n: t = ticks_ms() bytes_wr = bytes_wr[n:] await asyncio.sleep_ms(_SOCKET_POLL_DELAY) async def _send_str(self, s): await self._as_write(struct.pack("!H", len(s))) await self._as_write(s) async def _recv_len(self): n = 0 sh = 0 while 1: res = await self._as_read(1) b = res[0] n |= (b & 0x7F) << sh if not b & 0x80: return n sh += 7 async def _connect(self, clean): self._sock = socket.socket() self._sock.setblocking(False) try: self._sock.connect(self._addr) except OSError as e: if e.args[0] not in BUSY_ERRORS: raise await asyncio.sleep_ms(_DEFAULT_MS) self.dprint("Connecting to broker.") if self._ssl: import ssl self._sock = ssl.wrap_socket(self._sock, **self._ssl_params) premsg = bytearray(b"\x10\0\0\0\0\0") msg = bytearray(b"\x04MQTT\x04\0\0\0") # Protocol 3.1.1 sz = 10 + 2 + len(self._client_id) msg[6] = clean << 1 if self._user: sz += 2 + len(self._user) + 2 + len(self._pswd) msg[6] |= 0xC0 if self._keepalive: msg[7] |= self._keepalive >> 8 msg[8] |= self._keepalive & 0x00FF if self._lw_topic: sz += 2 + len(self._lw_topic) + 2 + len(self._lw_msg) msg[6] |= 0x4 | (self._lw_qos & 0x1) << 3 | (self._lw_qos & 0x2) << 3 msg[6] |= self._lw_retain << 5 i = 1 while sz > 0x7F: premsg[i] = (sz & 0x7F) | 0x80 sz >>= 7 i += 1 premsg[i] = sz await self._as_write(premsg, i + 2) await self._as_write(msg) await self._send_str(self._client_id) if self._lw_topic: await self._send_str(self._lw_topic) await self._send_str(self._lw_msg) if self._user: await self._send_str(self._user) await self._send_str(self._pswd) # Await CONNACK # read causes ECONNABORTED if broker is out; triggers a reconnect. resp = await self._as_read(4) self.dprint("Connected to broker.") # Got CONNACK if ( resp[3] != 0 or resp[0] != 0x20 or resp[1] != 0x02 ): # Bad CONNACK e.g. authentication fail. raise OSError( -1, f"Connect fail: 0x{(resp[0] << 8) + resp[1]:04x} {resp[3]} (README 7)", ) async def _ping(self): async with self.lock: await self._as_write(b"\xc0\0") # Check internet connectivity by sending DNS lookup to Google's 8.8.8.8 async def wan_ok( self, packet=b"$\x1a\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x03www\x06google\x03com\x00\x00\x01\x00\x01", ): if not self.isconnected(): # WiFi is down return False length = 32 # DNS query and response packet size s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.setblocking(False) s.connect(("8.8.8.8", 53)) await asyncio.sleep(1) try: await self._as_write(packet, sock=s) await asyncio.sleep(2) res = await self._as_read(length, s) if len(res) == length: return True # DNS response size OK except OSError: # Timeout on read: no connectivity. return False finally: s.close() return False async def broker_up(self): # Test broker connectivity if not self.isconnected(): return False tlast = self.last_rx if ticks_diff(ticks_ms(), tlast) < 1000: return True try: await self._ping() except OSError: return False t = ticks_ms() while not self._timeout(t): await asyncio.sleep_ms(100) if ticks_diff(self.last_rx, tlast) > 0: # Response received return True return False async def disconnect(self): if self._sock is not None: await self._kill_tasks(False) # Keep socket open try: async with self.lock: self._sock.write(b"\xe0\0") # Close broker connection await asyncio.sleep_ms(100) except OSError: pass self._close() self._has_connected = False def _close(self): if self._sock is not None: self._sock.close() def close( self, ): # API. See https://github.com/peterhinch/micropython-mqtt/issues/60 self._close() try: self._sta_if.disconnect() # Disconnect Wi-Fi to avoid errors except OSError: self.dprint("Wi-Fi not started, unable to disconnect interface") self._sta_if.active(False) async def _await_pid(self, pid): t = ticks_ms() while pid in self.rcv_pids: # local copy if self._timeout(t) or not self.isconnected(): break # Must repub or bail out await asyncio.sleep_ms(100) else: return True # PID received. All done. return False # qos == 1: coro blocks until wait_msg gets correct PID. # If WiFi fails completely subclass re-publishes with new PID. async def publish(self, topic, msg, retain, qos): pid = next(self.newpid) if qos: self.rcv_pids.add(pid) async with self.lock: await self._publish(topic, msg, retain, qos, 0, pid) if qos == 0: return count = 0 while 1: # Await PUBACK, republish on timeout if await self._await_pid(pid): return # No match if count >= self._max_repubs or not self.isconnected(): raise OSError(-1) # Subclass to re-publish with new PID async with self.lock: await self._publish(topic, msg, retain, qos, dup=1, pid=pid) # Add pid count += 1 self.REPUB_COUNT += 1 async def _publish(self, topic, msg, retain, qos, dup, pid): pkt = bytearray(b"\x30\0\0\0") pkt[0] |= qos << 1 | retain | dup << 3 sz = 2 + len(topic) + len(msg) if qos > 0: sz += 2 if sz >= 2097152: raise MQTTException("Strings too long.") i = 1 while sz > 0x7F: pkt[i] = (sz & 0x7F) | 0x80 sz >>= 7 i += 1 pkt[i] = sz await self._as_write(pkt, i + 1) await self._send_str(topic) if qos > 0: struct.pack_into("!H", pkt, 0, pid) await self._as_write(pkt, 2) await self._as_write(msg) # Can raise OSError if WiFi fails. Subclass traps. async def subscribe(self, topic, qos): pkt = bytearray(b"\x82\0\0\0") pid = next(self.newpid) self.rcv_pids.add(pid) struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic) + 1, pid) async with self.lock: await self._as_write(pkt) await self._send_str(topic) await self._as_write(qos.to_bytes(1, "little")) if not await self._await_pid(pid): raise OSError(-1) # Can raise OSError if WiFi fails. Subclass traps. async def unsubscribe(self, topic): pkt = bytearray(b"\xa2\0\0\0") pid = next(self.newpid) self.rcv_pids.add(pid) struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic), pid) async with self.lock: await self._as_write(pkt) await self._send_str(topic) if not await self._await_pid(pid): raise OSError(-1) # Wait for a single incoming MQTT message and process it. # Subscribed messages are delivered to a callback previously # set by .setup() method. Other (internal) MQTT # messages processed internally. # Immediate return if no data available. Called from ._handle_msg(). async def wait_msg(self): try: res = self._sock.read(1) # Throws OSError on WiFi fail except OSError as e: if e.args[0] in BUSY_ERRORS: # Needed by RP2 await asyncio.sleep_ms(0) return raise if res is None: return if res == b"": raise OSError(-1, "Empty response") if res == b"\xd0": # PINGRESP await self._as_read(1) # Update .last_rx time return op = res[0] if op == 0x40: # PUBACK: save pid sz = await self._as_read(1) if sz != b"\x02": raise OSError(-1, "Invalid PUBACK packet") rcv_pid = await self._as_read(2) pid = rcv_pid[0] << 8 | rcv_pid[1] if pid in self.rcv_pids: self.rcv_pids.discard(pid) else: raise OSError(-1, "Invalid pid in PUBACK packet") if op == 0x90: # SUBACK resp = await self._as_read(4) if resp[3] == 0x80: raise OSError(-1, "Invalid SUBACK packet") pid = resp[2] | (resp[1] << 8) if pid in self.rcv_pids: self.rcv_pids.discard(pid) else: raise OSError(-1, "Invalid pid in SUBACK packet") if op == 0xB0: # UNSUBACK resp = await self._as_read(3) pid = resp[2] | (resp[1] << 8) if pid in self.rcv_pids: self.rcv_pids.discard(pid) else: raise OSError(-1) if op & 0xF0 != 0x30: return sz = await self._recv_len() topic_len = await self._as_read(2) topic_len = (topic_len[0] << 8) | topic_len[1] topic = await self._as_read(topic_len) sz -= topic_len + 2 if op & 6: pid = await self._as_read(2) pid = pid[0] << 8 | pid[1] sz -= 2 msg = await self._as_read(sz) retained = op & 0x01 if self._events: self.queue.put(topic, msg, bool(retained)) else: self._cb(topic, msg, bool(retained)) if op & 6 == 2: # qos 1 pkt = bytearray(b"\x40\x02\0\0") # Send PUBACK struct.pack_into("!H", pkt, 2, pid) await self._as_write(pkt) elif op & 6 == 4: # qos 2 not supported raise OSError(-1, "QoS 2 not supported") # MQTTClient class. Handles issues relating to connectivity. class MQTTClient(MQTT_base): def __init__(self, config): super().__init__(config) self._isconnected = False # Current connection state keepalive = 1000 * self._keepalive # ms self._ping_interval = keepalive // 4 if keepalive else 20000 p_i = ( config["ping_interval"] * 1000 ) # Can specify shorter e.g. for subscribe-only if p_i and p_i < self._ping_interval: self._ping_interval = p_i self._in_connect = False self._has_connected = False # Define 'Clean Session' value to use. self._tasks = [] if ESP8266: import esp esp.sleep_type( 0 ) # Improve connection integrity at cost of power consumption. async def wifi_connect(self, quick=False): s = self._sta_if if ESP8266: if s.isconnected(): # 1st attempt, already connected. return s.active(True) s.connect() # ESP8266 remembers connection. for _ in range(60): if ( s.status() != network.STAT_CONNECTING ): # Break out on fail or success. Check once per sec. break await asyncio.sleep(1) if ( s.status() == network.STAT_CONNECTING ): # might hang forever awaiting dhcp lease renewal or something else s.disconnect() await asyncio.sleep(1) if ( not s.isconnected() and self._ssid is not None and self._wifi_pw is not None ): s.connect(self._ssid, self._wifi_pw) while ( s.status() == network.STAT_CONNECTING ): # Break out on fail or success. Check once per sec. await asyncio.sleep(1) else: s.active(True) if RP2: # Disable auto-sleep. # https://datasheets.raspberrypi.com/picow/connecting-to-the-internet-with-pico-w.pdf # para 3.6.3 s.config(pm=0xA11140) s.connect(self._ssid, self._wifi_pw) for _ in range(60): # Break out on fail or success. Check once per sec. await asyncio.sleep(1) # Loop while connecting or no IP if s.isconnected(): break if ESP32: if s.status() != network.STAT_CONNECTING: # 1001 break elif PYBOARD: # No symbolic constants in network if not 1 <= s.status() <= 2: break elif RP2: # 1 is STAT_CONNECTING. 2 reported by user (No IP?) if not 1 <= s.status() <= 2: break else: # Timeout: still in connecting state s.disconnect() await asyncio.sleep(1) if not s.isconnected(): # Timed out raise OSError("Wi-Fi connect timed out") if not quick: # Skip on first connection only if power saving # Ensure connection stays up for a few secs. self.dprint("Checking WiFi integrity.") for _ in range(5): if not s.isconnected(): raise OSError("Connection Unstable") # in 1st 5 secs await asyncio.sleep(1) self.dprint("Got reliable connection") async def connect( self, *, quick=False ): # Quick initial connect option for battery apps if not self._has_connected: await self.wifi_connect(quick) # On 1st call, caller handles error # Note this blocks if DNS lookup occurs. Do it once to prevent # blocking during later internet outage: self._addr = socket.getaddrinfo(self.server, self.port)[0][-1] self._in_connect = True # Disable low level ._isconnected check try: if not self._has_connected and self._clean_init and not self._clean: # Power up. Clear previous session data but subsequently save it. # Issue #40 await self._connect(True) # Connect with clean session try: async with self.lock: self._sock.write( b"\xe0\0" ) # Force disconnect but keep socket open except OSError: pass self.dprint("Waiting for disconnect") await asyncio.sleep(2) # Wait for broker to disconnect self.dprint("About to reconnect with unclean session.") await self._connect(self._clean) except Exception: self._close() self._in_connect = False # Caller may run .isconnected() raise self.rcv_pids.clear() # If we get here without error broker/LAN must be up. self._isconnected = True self._in_connect = False # Low level code can now check connectivity. if not self._events: asyncio.create_task(self._wifi_handler(True)) # User handler. if not self._has_connected: self._has_connected = True # Use normal clean flag on reconnect. asyncio.create_task(self._keep_connected()) # Runs forever unless user issues .disconnect() asyncio.create_task(self._handle_msg()) # Task quits on connection fail. self._tasks.append(asyncio.create_task(self._keep_alive())) if self.DEBUG: self._tasks.append(asyncio.create_task(self._memory())) if self._events: self.up.set() # Connectivity is up else: asyncio.create_task(self._connect_handler(self)) # User handler. # Launched by .connect(). Runs until connectivity fails. Checks for and # handles incoming messages. async def _handle_msg(self): try: while self.isconnected(): async with self.lock: await self.wait_msg() # Immediate return if no message await asyncio.sleep_ms(_DEFAULT_MS) # Let other tasks get lock except OSError: pass self._reconnect() # Broker or WiFi fail. # Keep broker alive MQTT spec 3.1.2.10 Keep Alive. # Runs until ping failure or no response in keepalive period. async def _keep_alive(self): while self.isconnected(): pings_due = ticks_diff(ticks_ms(), self.last_rx) // self._ping_interval if pings_due >= 4: self.dprint("Reconnect: broker fail.") break await asyncio.sleep_ms(self._ping_interval) try: await self._ping() except OSError: break self._reconnect() # Broker or WiFi fail. async def _kill_tasks(self, kill_skt): # Cancel running tasks for task in self._tasks: task.cancel() self._tasks.clear() await asyncio.sleep_ms(0) # Ensure cancellation complete if kill_skt: # Close socket self._close() # DEBUG: show RAM messages. async def _memory(self): while True: await asyncio.sleep(20) gc.collect() self.dprint("RAM free %d alloc %d", gc.mem_free(), gc.mem_alloc()) def isconnected(self): if self._in_connect: # Disable low-level check during .connect() return True if self._isconnected and not self._sta_if.isconnected(): # It's going down. self._reconnect() return self._isconnected def _reconnect(self): # Schedule a reconnection if not underway. if self._isconnected: self._isconnected = False asyncio.create_task(self._kill_tasks(True)) # Shut down tasks and socket if self._events: # Signal an outage self.down.set() else: asyncio.create_task(self._wifi_handler(False)) # User handler. # Await broker connection. async def _connection(self): while not self._isconnected: await asyncio.sleep(1) # Scheduled on 1st successful connection. Runs forever maintaining wifi and # broker connection. Must handle conditions at edge of WiFi range. async def _keep_connected(self): while self._has_connected: if self.isconnected(): # Pause for 1 second await asyncio.sleep(1) gc.collect() else: # Link is down, socket is closed, tasks are killed try: self._sta_if.disconnect() except OSError: self.dprint("Wi-Fi not started, unable to disconnect interface") await asyncio.sleep(1) try: await self.wifi_connect() except OSError: continue if ( not self._has_connected ): # User has issued the terminal .disconnect() self.dprint("Disconnected, exiting _keep_connected") break try: await self.connect() # Now has set ._isconnected and scheduled _connect_handler(). self.dprint("Reconnect OK!") except OSError as e: self.dprint("Error in reconnect. %s", e) # Can get ECONNABORTED or -1. The latter signifies no or bad CONNACK received. self._close() # Disconnect and try again. self._in_connect = False self._isconnected = False self.dprint("Disconnected, exited _keep_connected") async def subscribe(self, topic, qos=0): qos_check(qos) while 1: await self._connection() try: return await super().subscribe(topic, qos) except OSError: pass self._reconnect() # Broker or WiFi fail. async def unsubscribe(self, topic): while 1: await self._connection() try: return await super().unsubscribe(topic) except OSError: pass self._reconnect() # Broker or WiFi fail. async def publish(self, topic, msg, retain=False, qos=0): qos_check(qos) while 1: await self._connection() try: return await super().publish(topic, msg, retain, qos) except OSError: pass self._reconnect() # Broker or WiFi fail.