AIOT / mqtt_client.py
balenkano's picture
Update mqtt_client.py
27f793a verified
import threading
import json
import paho.mqtt.client as mqtt
from paho.mqtt.enums import CallbackAPIVersion
import config # dùng getattr để lấy hằng số với giá trị mặc định
# Lấy cấu hình với fallback an toàn nếu thiếu trong config.py
MQTT_BROKER = getattr(config, "MQTT_BROKER", "test.mosquitto.org")
MQTT_PORT = int(getattr(config, "MQTT_PORT", 1883))
MQTT_TOPIC_GAS_IN = getattr(config, "MQTT_TOPIC_GAS_IN", "balenkano_tuan_gas_data")
MQTT_TOPIC_DOOR_CMD = getattr(config, "MQTT_TOPIC_DOOR_CMD", "balenkano_tuan_door_2025")
class MQTTService:
def __init__(self, on_gas_message):
# Callback nhận chuỗi JSON (hoặc sẽ được chuyển thành JSON nếu payload là số)
self.on_gas_message = on_gas_message
self.client = mqtt.Client(CallbackAPIVersion.VERSION2)
self.client.on_message = self._on_message
def _on_message(self, client, userdata, message):
# Robust: nhận JSON hoặc số, luôn gọi callback với chuỗi JSON
try:
payload = message.payload.decode(errors="ignore")
except Exception:
payload = ""
delivered = False
# Thử parse JSON trước (ESP32 thường gửi JSON đầy đủ)
if payload:
try:
m = json.loads(payload)
if isinstance(m, dict):
if self.on_gas_message:
# chuyển thẳng JSON đã nhận cho EnvAI
self.on_gas_message(payload)
delivered = True
except Exception:
pass
# Nếu không phải JSON, thử coi là số gas (payload đơn giản)
if not delivered:
try:
val = float(payload)
if self.on_gas_message:
# bọc thành JSON đơn giản cho EnvAI
self.on_gas_message(json.dumps({"gas": val}))
except Exception:
# payload không hợp lệ, bỏ qua để UI không crash
pass
def start(self):
try:
self.client.connect(MQTT_BROKER, MQTT_PORT)
self.client.subscribe(MQTT_TOPIC_GAS_IN)
self.client.loop_start()
except Exception:
# Spaces có thể chặn mạng; UI vẫn hoạt động
pass
def publish_cmd(self, cmd: str):
# Gửi OPEN/LOCK (và các lệnh khác nếu firmware hỗ trợ)
def _publish():
try:
c = mqtt.Client(CallbackAPIVersion.VERSION2)
c.connect(MQTT_BROKER, MQTT_PORT)
c.publish(MQTT_TOPIC_DOOR_CMD, cmd)
c.disconnect()
except Exception:
pass
threading.Thread(target=_publish, daemon=True).start()