|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import gc |
|
|
|
|
|
import usocket as socket |
|
|
import ustruct as struct |
|
|
|
|
|
gc.collect() |
|
|
import uasyncio as asyncio |
|
|
from ubinascii import hexlify |
|
|
|
|
|
gc.collect() |
|
|
from uerrno import EINPROGRESS, ETIMEDOUT |
|
|
from utime import ticks_diff, ticks_ms |
|
|
|
|
|
gc.collect() |
|
|
import network |
|
|
from machine import unique_id |
|
|
from micropython import const |
|
|
|
|
|
gc.collect() |
|
|
from sys import platform |
|
|
|
|
|
VERSION = (0, 7, 1) |
|
|
|
|
|
|
|
|
_DEFAULT_MS = const(20) |
|
|
_SOCKET_POLL_DELAY = const(5) |
|
|
|
|
|
|
|
|
ESP32 = platform == "esp32" |
|
|
RP2 = platform == "rp2" |
|
|
if ESP32: |
|
|
|
|
|
BUSY_ERRORS = [EINPROGRESS, ETIMEDOUT, 118, 119] |
|
|
elif RP2: |
|
|
BUSY_ERRORS = [EINPROGRESS, ETIMEDOUT, -110] |
|
|
else: |
|
|
BUSY_ERRORS = [EINPROGRESS, ETIMEDOUT] |
|
|
|
|
|
ESP8266 = platform == "esp8266" |
|
|
PYBOARD = platform == "pyboard" |
|
|
|
|
|
|
|
|
|
|
|
async def eliza(*_): |
|
|
await asyncio.sleep_ms(_DEFAULT_MS) |
|
|
|
|
|
|
|
|
class MsgQueue: |
|
|
def __init__(self, size): |
|
|
self._q = [0 for _ in range(max(size, 4))] |
|
|
self._size = size |
|
|
self._wi = 0 |
|
|
self._ri = 0 |
|
|
self._evt = asyncio.Event() |
|
|
self.discards = 0 |
|
|
|
|
|
def put(self, *v): |
|
|
self._q[self._wi] = v |
|
|
self._evt.set() |
|
|
self._wi = (self._wi + 1) % self._size |
|
|
if self._wi == self._ri: |
|
|
self._ri = (self._ri + 1) % self._size |
|
|
self.discards += 1 |
|
|
|
|
|
def __aiter__(self): |
|
|
return self |
|
|
|
|
|
async def __anext__(self): |
|
|
if self._ri == self._wi: |
|
|
self._evt.clear() |
|
|
await self._evt.wait() |
|
|
r = self._q[self._ri] |
|
|
self._ri = (self._ri + 1) % self._size |
|
|
return r |
|
|
|
|
|
|
|
|
config = { |
|
|
"client_id": hexlify(unique_id()), |
|
|
"server": None, |
|
|
"port": 0, |
|
|
"user": "", |
|
|
"password": "", |
|
|
"keepalive": 60, |
|
|
"ping_interval": 0, |
|
|
"ssl": False, |
|
|
"ssl_params": {}, |
|
|
"response_time": 10, |
|
|
"clean_init": True, |
|
|
"clean": True, |
|
|
"max_repubs": 4, |
|
|
"will": None, |
|
|
"subs_cb": lambda *_: None, |
|
|
"wifi_coro": eliza, |
|
|
"connect_coro": eliza, |
|
|
"ssid": None, |
|
|
"wifi_pw": None, |
|
|
"queue_len": 0, |
|
|
"gateway": False, |
|
|
} |
|
|
|
|
|
|
|
|
class MQTTException(Exception): |
|
|
pass |
|
|
|
|
|
|
|
|
def pid_gen(): |
|
|
pid = 0 |
|
|
while True: |
|
|
pid = pid + 1 if pid < 65535 else 1 |
|
|
yield pid |
|
|
|
|
|
|
|
|
def qos_check(qos): |
|
|
if not (qos == 0 or qos == 1): |
|
|
raise ValueError("Only qos 0 and 1 are supported.") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class MQTT_base: |
|
|
REPUB_COUNT = 0 |
|
|
DEBUG = False |
|
|
|
|
|
def __init__(self, config): |
|
|
self._events = config["queue_len"] > 0 |
|
|
|
|
|
self._client_id = config["client_id"] |
|
|
self._user = config["user"] |
|
|
self._pswd = config["password"] |
|
|
self._keepalive = config["keepalive"] |
|
|
if self._keepalive >= 65536: |
|
|
raise ValueError("invalid keepalive time") |
|
|
self._response_time = ( |
|
|
config["response_time"] * 1000 |
|
|
) |
|
|
self._max_repubs = config["max_repubs"] |
|
|
self._clean_init = config[ |
|
|
"clean_init" |
|
|
] |
|
|
self._clean = config["clean"] |
|
|
will = config["will"] |
|
|
if will is None: |
|
|
self._lw_topic = False |
|
|
else: |
|
|
self._set_last_will(*will) |
|
|
|
|
|
self._ssid = config["ssid"] |
|
|
self._wifi_pw = config["wifi_pw"] |
|
|
self._ssl = config["ssl"] |
|
|
self._ssl_params = config["ssl_params"] |
|
|
|
|
|
if self._events: |
|
|
self.up = asyncio.Event() |
|
|
self.down = asyncio.Event() |
|
|
self.queue = MsgQueue(config["queue_len"]) |
|
|
else: |
|
|
self._cb = config["subs_cb"] |
|
|
self._wifi_handler = config["wifi_coro"] |
|
|
self._connect_handler = config["connect_coro"] |
|
|
|
|
|
self.port = config["port"] |
|
|
if self.port == 0: |
|
|
self.port = 8883 if self._ssl else 1883 |
|
|
self.server = config["server"] |
|
|
if self.server is None: |
|
|
raise ValueError("no server specified.") |
|
|
self._sock = None |
|
|
self._sta_if = network.WLAN(network.STA_IF) |
|
|
self._sta_if.active(True) |
|
|
if config["gateway"]: |
|
|
import aioespnow |
|
|
|
|
|
while not (sta := self._sta_if).active(): |
|
|
time.sleep(0.1) |
|
|
sta.config(pm=sta.PM_NONE) |
|
|
sta.active(True) |
|
|
self._espnow = ( |
|
|
aioespnow.AIOESPNow() |
|
|
) |
|
|
self._espnow.active(True) |
|
|
|
|
|
self.newpid = pid_gen() |
|
|
self.rcv_pids = set() |
|
|
self.last_rx = ticks_ms() |
|
|
self.lock = asyncio.Lock() |
|
|
|
|
|
def _set_last_will(self, topic, msg, retain=False, qos=0): |
|
|
qos_check(qos) |
|
|
if not topic: |
|
|
raise ValueError("Empty topic.") |
|
|
self._lw_topic = topic |
|
|
self._lw_msg = msg |
|
|
self._lw_qos = qos |
|
|
self._lw_retain = retain |
|
|
|
|
|
def dprint(self, msg, *args): |
|
|
if self.DEBUG: |
|
|
print(msg % args) |
|
|
|
|
|
def _timeout(self, t): |
|
|
return ticks_diff(ticks_ms(), t) > self._response_time |
|
|
|
|
|
async def _as_read(self, n, sock=None): |
|
|
if sock is None: |
|
|
sock = self._sock |
|
|
|
|
|
|
|
|
|
|
|
data = bytearray(n) |
|
|
buffer = memoryview(data) |
|
|
size = 0 |
|
|
t = ticks_ms() |
|
|
while size < n: |
|
|
if self._timeout(t) or not self.isconnected(): |
|
|
raise OSError(-1, "Timeout on socket read") |
|
|
try: |
|
|
msg_size = sock.readinto(buffer[size:], n - size) |
|
|
except OSError as e: |
|
|
msg_size = None |
|
|
if e.args[0] not in BUSY_ERRORS: |
|
|
raise |
|
|
if msg_size == 0: |
|
|
raise OSError(-1, "Connection closed by host") |
|
|
if msg_size is not None: |
|
|
size += msg_size |
|
|
t = ticks_ms() |
|
|
self.last_rx = ticks_ms() |
|
|
await asyncio.sleep_ms(_SOCKET_POLL_DELAY) |
|
|
return data |
|
|
|
|
|
async def _as_write(self, bytes_wr, length=0, sock=None): |
|
|
if sock is None: |
|
|
sock = self._sock |
|
|
|
|
|
|
|
|
bytes_wr = memoryview(bytes_wr) |
|
|
if length: |
|
|
bytes_wr = bytes_wr[:length] |
|
|
t = ticks_ms() |
|
|
while bytes_wr: |
|
|
if self._timeout(t) or not self.isconnected(): |
|
|
raise OSError(-1, "Timeout on socket write") |
|
|
try: |
|
|
n = sock.write(bytes_wr) |
|
|
except OSError as e: |
|
|
n = 0 |
|
|
if e.args[0] not in BUSY_ERRORS: |
|
|
raise |
|
|
if n: |
|
|
t = ticks_ms() |
|
|
bytes_wr = bytes_wr[n:] |
|
|
await asyncio.sleep_ms(_SOCKET_POLL_DELAY) |
|
|
|
|
|
async def _send_str(self, s): |
|
|
await self._as_write(struct.pack("!H", len(s))) |
|
|
await self._as_write(s) |
|
|
|
|
|
async def _recv_len(self): |
|
|
n = 0 |
|
|
sh = 0 |
|
|
while 1: |
|
|
res = await self._as_read(1) |
|
|
b = res[0] |
|
|
n |= (b & 0x7F) << sh |
|
|
if not b & 0x80: |
|
|
return n |
|
|
sh += 7 |
|
|
|
|
|
async def _connect(self, clean): |
|
|
self._sock = socket.socket() |
|
|
self._sock.setblocking(False) |
|
|
try: |
|
|
self._sock.connect(self._addr) |
|
|
except OSError as e: |
|
|
if e.args[0] not in BUSY_ERRORS: |
|
|
raise |
|
|
await asyncio.sleep_ms(_DEFAULT_MS) |
|
|
self.dprint("Connecting to broker.") |
|
|
if self._ssl: |
|
|
import ssl |
|
|
|
|
|
self._sock = ssl.wrap_socket(self._sock, **self._ssl_params) |
|
|
premsg = bytearray(b"\x10\0\0\0\0\0") |
|
|
msg = bytearray(b"\x04MQTT\x04\0\0\0") |
|
|
|
|
|
sz = 10 + 2 + len(self._client_id) |
|
|
msg[6] = clean << 1 |
|
|
if self._user: |
|
|
sz += 2 + len(self._user) + 2 + len(self._pswd) |
|
|
msg[6] |= 0xC0 |
|
|
if self._keepalive: |
|
|
msg[7] |= self._keepalive >> 8 |
|
|
msg[8] |= self._keepalive & 0x00FF |
|
|
if self._lw_topic: |
|
|
sz += 2 + len(self._lw_topic) + 2 + len(self._lw_msg) |
|
|
msg[6] |= 0x4 | (self._lw_qos & 0x1) << 3 | (self._lw_qos & 0x2) << 3 |
|
|
msg[6] |= self._lw_retain << 5 |
|
|
|
|
|
i = 1 |
|
|
while sz > 0x7F: |
|
|
premsg[i] = (sz & 0x7F) | 0x80 |
|
|
sz >>= 7 |
|
|
i += 1 |
|
|
premsg[i] = sz |
|
|
await self._as_write(premsg, i + 2) |
|
|
await self._as_write(msg) |
|
|
await self._send_str(self._client_id) |
|
|
if self._lw_topic: |
|
|
await self._send_str(self._lw_topic) |
|
|
await self._send_str(self._lw_msg) |
|
|
if self._user: |
|
|
await self._send_str(self._user) |
|
|
await self._send_str(self._pswd) |
|
|
|
|
|
|
|
|
resp = await self._as_read(4) |
|
|
self.dprint("Connected to broker.") |
|
|
if ( |
|
|
resp[3] != 0 or resp[0] != 0x20 or resp[1] != 0x02 |
|
|
): |
|
|
raise OSError( |
|
|
-1, |
|
|
f"Connect fail: 0x{(resp[0] << 8) + resp[1]:04x} {resp[3]} (README 7)", |
|
|
) |
|
|
|
|
|
async def _ping(self): |
|
|
async with self.lock: |
|
|
await self._as_write(b"\xc0\0") |
|
|
|
|
|
|
|
|
async def wan_ok( |
|
|
self, |
|
|
packet=b"$\x1a\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x03www\x06google\x03com\x00\x00\x01\x00\x01", |
|
|
): |
|
|
if not self.isconnected(): |
|
|
return False |
|
|
length = 32 |
|
|
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) |
|
|
s.setblocking(False) |
|
|
s.connect(("8.8.8.8", 53)) |
|
|
await asyncio.sleep(1) |
|
|
try: |
|
|
await self._as_write(packet, sock=s) |
|
|
await asyncio.sleep(2) |
|
|
res = await self._as_read(length, s) |
|
|
if len(res) == length: |
|
|
return True |
|
|
except OSError: |
|
|
return False |
|
|
finally: |
|
|
s.close() |
|
|
return False |
|
|
|
|
|
async def broker_up(self): |
|
|
if not self.isconnected(): |
|
|
return False |
|
|
tlast = self.last_rx |
|
|
if ticks_diff(ticks_ms(), tlast) < 1000: |
|
|
return True |
|
|
try: |
|
|
await self._ping() |
|
|
except OSError: |
|
|
return False |
|
|
t = ticks_ms() |
|
|
while not self._timeout(t): |
|
|
await asyncio.sleep_ms(100) |
|
|
if ticks_diff(self.last_rx, tlast) > 0: |
|
|
return True |
|
|
return False |
|
|
|
|
|
async def disconnect(self): |
|
|
if self._sock is not None: |
|
|
await self._kill_tasks(False) |
|
|
try: |
|
|
async with self.lock: |
|
|
self._sock.write(b"\xe0\0") |
|
|
await asyncio.sleep_ms(100) |
|
|
except OSError: |
|
|
pass |
|
|
self._close() |
|
|
self._has_connected = False |
|
|
|
|
|
def _close(self): |
|
|
if self._sock is not None: |
|
|
self._sock.close() |
|
|
|
|
|
def close( |
|
|
self, |
|
|
): |
|
|
self._close() |
|
|
try: |
|
|
self._sta_if.disconnect() |
|
|
except OSError: |
|
|
self.dprint("Wi-Fi not started, unable to disconnect interface") |
|
|
self._sta_if.active(False) |
|
|
|
|
|
async def _await_pid(self, pid): |
|
|
t = ticks_ms() |
|
|
while pid in self.rcv_pids: |
|
|
if self._timeout(t) or not self.isconnected(): |
|
|
break |
|
|
await asyncio.sleep_ms(100) |
|
|
else: |
|
|
return True |
|
|
return False |
|
|
|
|
|
|
|
|
|
|
|
async def publish(self, topic, msg, retain, qos): |
|
|
pid = next(self.newpid) |
|
|
if qos: |
|
|
self.rcv_pids.add(pid) |
|
|
async with self.lock: |
|
|
await self._publish(topic, msg, retain, qos, 0, pid) |
|
|
if qos == 0: |
|
|
return |
|
|
|
|
|
count = 0 |
|
|
while 1: |
|
|
if await self._await_pid(pid): |
|
|
return |
|
|
|
|
|
if count >= self._max_repubs or not self.isconnected(): |
|
|
raise OSError(-1) |
|
|
async with self.lock: |
|
|
await self._publish(topic, msg, retain, qos, dup=1, pid=pid) |
|
|
count += 1 |
|
|
self.REPUB_COUNT += 1 |
|
|
|
|
|
async def _publish(self, topic, msg, retain, qos, dup, pid): |
|
|
pkt = bytearray(b"\x30\0\0\0") |
|
|
pkt[0] |= qos << 1 | retain | dup << 3 |
|
|
sz = 2 + len(topic) + len(msg) |
|
|
if qos > 0: |
|
|
sz += 2 |
|
|
if sz >= 2097152: |
|
|
raise MQTTException("Strings too long.") |
|
|
i = 1 |
|
|
while sz > 0x7F: |
|
|
pkt[i] = (sz & 0x7F) | 0x80 |
|
|
sz >>= 7 |
|
|
i += 1 |
|
|
pkt[i] = sz |
|
|
await self._as_write(pkt, i + 1) |
|
|
await self._send_str(topic) |
|
|
if qos > 0: |
|
|
struct.pack_into("!H", pkt, 0, pid) |
|
|
await self._as_write(pkt, 2) |
|
|
await self._as_write(msg) |
|
|
|
|
|
|
|
|
async def subscribe(self, topic, qos): |
|
|
pkt = bytearray(b"\x82\0\0\0") |
|
|
pid = next(self.newpid) |
|
|
self.rcv_pids.add(pid) |
|
|
struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic) + 1, pid) |
|
|
async with self.lock: |
|
|
await self._as_write(pkt) |
|
|
await self._send_str(topic) |
|
|
await self._as_write(qos.to_bytes(1, "little")) |
|
|
|
|
|
if not await self._await_pid(pid): |
|
|
raise OSError(-1) |
|
|
|
|
|
|
|
|
async def unsubscribe(self, topic): |
|
|
pkt = bytearray(b"\xa2\0\0\0") |
|
|
pid = next(self.newpid) |
|
|
self.rcv_pids.add(pid) |
|
|
struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic), pid) |
|
|
async with self.lock: |
|
|
await self._as_write(pkt) |
|
|
await self._send_str(topic) |
|
|
|
|
|
if not await self._await_pid(pid): |
|
|
raise OSError(-1) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def wait_msg(self): |
|
|
try: |
|
|
res = self._sock.read(1) |
|
|
except OSError as e: |
|
|
if e.args[0] in BUSY_ERRORS: |
|
|
await asyncio.sleep_ms(0) |
|
|
return |
|
|
raise |
|
|
if res is None: |
|
|
return |
|
|
if res == b"": |
|
|
raise OSError(-1, "Empty response") |
|
|
|
|
|
if res == b"\xd0": |
|
|
await self._as_read(1) |
|
|
return |
|
|
op = res[0] |
|
|
|
|
|
if op == 0x40: |
|
|
sz = await self._as_read(1) |
|
|
if sz != b"\x02": |
|
|
raise OSError(-1, "Invalid PUBACK packet") |
|
|
rcv_pid = await self._as_read(2) |
|
|
pid = rcv_pid[0] << 8 | rcv_pid[1] |
|
|
if pid in self.rcv_pids: |
|
|
self.rcv_pids.discard(pid) |
|
|
else: |
|
|
raise OSError(-1, "Invalid pid in PUBACK packet") |
|
|
|
|
|
if op == 0x90: |
|
|
resp = await self._as_read(4) |
|
|
if resp[3] == 0x80: |
|
|
raise OSError(-1, "Invalid SUBACK packet") |
|
|
pid = resp[2] | (resp[1] << 8) |
|
|
if pid in self.rcv_pids: |
|
|
self.rcv_pids.discard(pid) |
|
|
else: |
|
|
raise OSError(-1, "Invalid pid in SUBACK packet") |
|
|
|
|
|
if op == 0xB0: |
|
|
resp = await self._as_read(3) |
|
|
pid = resp[2] | (resp[1] << 8) |
|
|
if pid in self.rcv_pids: |
|
|
self.rcv_pids.discard(pid) |
|
|
else: |
|
|
raise OSError(-1) |
|
|
|
|
|
if op & 0xF0 != 0x30: |
|
|
return |
|
|
sz = await self._recv_len() |
|
|
topic_len = await self._as_read(2) |
|
|
topic_len = (topic_len[0] << 8) | topic_len[1] |
|
|
topic = await self._as_read(topic_len) |
|
|
sz -= topic_len + 2 |
|
|
if op & 6: |
|
|
pid = await self._as_read(2) |
|
|
pid = pid[0] << 8 | pid[1] |
|
|
sz -= 2 |
|
|
msg = await self._as_read(sz) |
|
|
retained = op & 0x01 |
|
|
if self._events: |
|
|
self.queue.put(topic, msg, bool(retained)) |
|
|
else: |
|
|
self._cb(topic, msg, bool(retained)) |
|
|
if op & 6 == 2: |
|
|
pkt = bytearray(b"\x40\x02\0\0") |
|
|
struct.pack_into("!H", pkt, 2, pid) |
|
|
await self._as_write(pkt) |
|
|
elif op & 6 == 4: |
|
|
raise OSError(-1, "QoS 2 not supported") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class MQTTClient(MQTT_base): |
|
|
def __init__(self, config): |
|
|
super().__init__(config) |
|
|
self._isconnected = False |
|
|
keepalive = 1000 * self._keepalive |
|
|
self._ping_interval = keepalive // 4 if keepalive else 20000 |
|
|
p_i = ( |
|
|
config["ping_interval"] * 1000 |
|
|
) |
|
|
if p_i and p_i < self._ping_interval: |
|
|
self._ping_interval = p_i |
|
|
self._in_connect = False |
|
|
self._has_connected = False |
|
|
self._tasks = [] |
|
|
if ESP8266: |
|
|
import esp |
|
|
|
|
|
esp.sleep_type( |
|
|
0 |
|
|
) |
|
|
|
|
|
async def wifi_connect(self, quick=False): |
|
|
s = self._sta_if |
|
|
if ESP8266: |
|
|
if s.isconnected(): |
|
|
return |
|
|
s.active(True) |
|
|
s.connect() |
|
|
for _ in range(60): |
|
|
if ( |
|
|
s.status() != network.STAT_CONNECTING |
|
|
): |
|
|
break |
|
|
await asyncio.sleep(1) |
|
|
if ( |
|
|
s.status() == network.STAT_CONNECTING |
|
|
): |
|
|
s.disconnect() |
|
|
await asyncio.sleep(1) |
|
|
if ( |
|
|
not s.isconnected() |
|
|
and self._ssid is not None |
|
|
and self._wifi_pw is not None |
|
|
): |
|
|
s.connect(self._ssid, self._wifi_pw) |
|
|
while ( |
|
|
s.status() == network.STAT_CONNECTING |
|
|
): |
|
|
await asyncio.sleep(1) |
|
|
else: |
|
|
s.active(True) |
|
|
if RP2: |
|
|
|
|
|
|
|
|
s.config(pm=0xA11140) |
|
|
s.connect(self._ssid, self._wifi_pw) |
|
|
for _ in range(60): |
|
|
await asyncio.sleep(1) |
|
|
|
|
|
if s.isconnected(): |
|
|
break |
|
|
if ESP32: |
|
|
if s.status() != network.STAT_CONNECTING: |
|
|
break |
|
|
elif PYBOARD: |
|
|
if not 1 <= s.status() <= 2: |
|
|
break |
|
|
elif RP2: |
|
|
if not 1 <= s.status() <= 2: |
|
|
break |
|
|
else: |
|
|
s.disconnect() |
|
|
await asyncio.sleep(1) |
|
|
|
|
|
if not s.isconnected(): |
|
|
raise OSError("Wi-Fi connect timed out") |
|
|
if not quick: |
|
|
|
|
|
self.dprint("Checking WiFi integrity.") |
|
|
for _ in range(5): |
|
|
if not s.isconnected(): |
|
|
raise OSError("Connection Unstable") |
|
|
await asyncio.sleep(1) |
|
|
self.dprint("Got reliable connection") |
|
|
|
|
|
async def connect( |
|
|
self, *, quick=False |
|
|
): |
|
|
if not self._has_connected: |
|
|
await self.wifi_connect(quick) |
|
|
|
|
|
|
|
|
self._addr = socket.getaddrinfo(self.server, self.port)[0][-1] |
|
|
self._in_connect = True |
|
|
try: |
|
|
if not self._has_connected and self._clean_init and not self._clean: |
|
|
|
|
|
|
|
|
await self._connect(True) |
|
|
try: |
|
|
async with self.lock: |
|
|
self._sock.write( |
|
|
b"\xe0\0" |
|
|
) |
|
|
except OSError: |
|
|
pass |
|
|
self.dprint("Waiting for disconnect") |
|
|
await asyncio.sleep(2) |
|
|
self.dprint("About to reconnect with unclean session.") |
|
|
await self._connect(self._clean) |
|
|
except Exception: |
|
|
self._close() |
|
|
self._in_connect = False |
|
|
raise |
|
|
self.rcv_pids.clear() |
|
|
|
|
|
self._isconnected = True |
|
|
self._in_connect = False |
|
|
if not self._events: |
|
|
asyncio.create_task(self._wifi_handler(True)) |
|
|
if not self._has_connected: |
|
|
self._has_connected = True |
|
|
asyncio.create_task(self._keep_connected()) |
|
|
|
|
|
|
|
|
asyncio.create_task(self._handle_msg()) |
|
|
self._tasks.append(asyncio.create_task(self._keep_alive())) |
|
|
if self.DEBUG: |
|
|
self._tasks.append(asyncio.create_task(self._memory())) |
|
|
if self._events: |
|
|
self.up.set() |
|
|
else: |
|
|
asyncio.create_task(self._connect_handler(self)) |
|
|
|
|
|
|
|
|
|
|
|
async def _handle_msg(self): |
|
|
try: |
|
|
while self.isconnected(): |
|
|
async with self.lock: |
|
|
await self.wait_msg() |
|
|
await asyncio.sleep_ms(_DEFAULT_MS) |
|
|
|
|
|
except OSError: |
|
|
pass |
|
|
self._reconnect() |
|
|
|
|
|
|
|
|
|
|
|
async def _keep_alive(self): |
|
|
while self.isconnected(): |
|
|
pings_due = ticks_diff(ticks_ms(), self.last_rx) // self._ping_interval |
|
|
if pings_due >= 4: |
|
|
self.dprint("Reconnect: broker fail.") |
|
|
break |
|
|
await asyncio.sleep_ms(self._ping_interval) |
|
|
try: |
|
|
await self._ping() |
|
|
except OSError: |
|
|
break |
|
|
self._reconnect() |
|
|
|
|
|
async def _kill_tasks(self, kill_skt): |
|
|
for task in self._tasks: |
|
|
task.cancel() |
|
|
self._tasks.clear() |
|
|
await asyncio.sleep_ms(0) |
|
|
if kill_skt: |
|
|
self._close() |
|
|
|
|
|
|
|
|
async def _memory(self): |
|
|
while True: |
|
|
await asyncio.sleep(20) |
|
|
gc.collect() |
|
|
self.dprint("RAM free %d alloc %d", gc.mem_free(), gc.mem_alloc()) |
|
|
|
|
|
def isconnected(self): |
|
|
if self._in_connect: |
|
|
return True |
|
|
if self._isconnected and not self._sta_if.isconnected(): |
|
|
self._reconnect() |
|
|
return self._isconnected |
|
|
|
|
|
def _reconnect(self): |
|
|
if self._isconnected: |
|
|
self._isconnected = False |
|
|
asyncio.create_task(self._kill_tasks(True)) |
|
|
if self._events: |
|
|
self.down.set() |
|
|
else: |
|
|
asyncio.create_task(self._wifi_handler(False)) |
|
|
|
|
|
|
|
|
async def _connection(self): |
|
|
while not self._isconnected: |
|
|
await asyncio.sleep(1) |
|
|
|
|
|
|
|
|
|
|
|
async def _keep_connected(self): |
|
|
while self._has_connected: |
|
|
if self.isconnected(): |
|
|
await asyncio.sleep(1) |
|
|
gc.collect() |
|
|
else: |
|
|
try: |
|
|
self._sta_if.disconnect() |
|
|
except OSError: |
|
|
self.dprint("Wi-Fi not started, unable to disconnect interface") |
|
|
await asyncio.sleep(1) |
|
|
try: |
|
|
await self.wifi_connect() |
|
|
except OSError: |
|
|
continue |
|
|
if ( |
|
|
not self._has_connected |
|
|
): |
|
|
self.dprint("Disconnected, exiting _keep_connected") |
|
|
break |
|
|
try: |
|
|
await self.connect() |
|
|
|
|
|
self.dprint("Reconnect OK!") |
|
|
except OSError as e: |
|
|
self.dprint("Error in reconnect. %s", e) |
|
|
|
|
|
self._close() |
|
|
self._in_connect = False |
|
|
self._isconnected = False |
|
|
self.dprint("Disconnected, exited _keep_connected") |
|
|
|
|
|
async def subscribe(self, topic, qos=0): |
|
|
qos_check(qos) |
|
|
while 1: |
|
|
await self._connection() |
|
|
try: |
|
|
return await super().subscribe(topic, qos) |
|
|
except OSError: |
|
|
pass |
|
|
self._reconnect() |
|
|
|
|
|
async def unsubscribe(self, topic): |
|
|
while 1: |
|
|
await self._connection() |
|
|
try: |
|
|
return await super().unsubscribe(topic) |
|
|
except OSError: |
|
|
pass |
|
|
self._reconnect() |
|
|
|
|
|
async def publish(self, topic, msg, retain=False, qos=0): |
|
|
qos_check(qos) |
|
|
while 1: |
|
|
await self._connection() |
|
|
try: |
|
|
return await super().publish(topic, msg, retain, qos) |
|
|
except OSError: |
|
|
pass |
|
|
self._reconnect() |
|
|
|