import json
import logging
import math
import threading

import paho.mqtt.client as mqtt
from paho.mqtt.enums import CallbackAPIVersion

import config
|
|
| |
def _cfg(name, default):
    """Return ``config.<name>`` or *default* when the attribute is not set."""
    return getattr(config, name, default)


# Broker endpoint; the port is coerced to int whatever type config supplies.
MQTT_BROKER = _cfg("MQTT_BROKER", "test.mosquitto.org")
MQTT_PORT = int(_cfg("MQTT_PORT", 1883))

# Topic subscribed to for incoming gas data, and topic door commands are
# published to.
MQTT_TOPIC_GAS_IN = _cfg("MQTT_TOPIC_GAS_IN", "balenkano_tuan_gas_data")
MQTT_TOPIC_DOOR_CMD = _cfg("MQTT_TOPIC_DOOR_CMD", "balenkano_tuan_door_2025")
|
|
_logger = logging.getLogger(__name__)

# Upper bound, in seconds, on how long a one-shot publish waits for the
# message to be handed to the network before giving up.
_PUBLISH_TIMEOUT_S = 5.0


class MQTTService:
    """Thin wrapper around a paho-mqtt client.

    Incoming messages on ``MQTT_TOPIC_GAS_IN`` are normalized to a JSON
    string and handed to ``on_gas_message``; commands are published to
    ``MQTT_TOPIC_DOOR_CMD`` via :meth:`publish_cmd`.

    All operations are best-effort: failures are logged, never raised to
    the caller.
    """

    def __init__(self, on_gas_message):
        """Create the client and register its callbacks.

        Args:
            on_gas_message: Callable taking one JSON string argument, or a
                falsy value to discard incoming messages.
        """
        self.on_gas_message = on_gas_message
        self.client = mqtt.Client(CallbackAPIVersion.VERSION2)
        self.client.on_connect = self._on_connect
        self.client.on_message = self._on_message

    def _on_connect(self, client, userdata, flags, reason_code, properties=None):
        """Subscribe to the gas topic each time a connection is established.

        Subscribing here rather than once in :meth:`start` means the
        subscription is renewed after the network loop reconnects.
        """
        if getattr(reason_code, "is_failure", False):
            _logger.error("MQTT broker refused the connection: %s", reason_code)
            return
        client.subscribe(MQTT_TOPIC_GAS_IN)

    @staticmethod
    def _normalize_payload(raw):
        """Convert a raw MQTT payload into the JSON string to forward.

        Returns:
            The decoded text unchanged when it is a JSON object;
            ``'{"gas": <value>}'`` when it is a bare finite number;
            ``None`` when the payload is empty or unusable.
        """
        try:
            text = raw.decode(errors="ignore")
        except Exception:
            # Payload is not bytes-like; treat it as empty, as before.
            return None
        if not text:
            return None

        try:
            parsed = json.loads(text)
        except (ValueError, RecursionError):
            parsed = None
        if isinstance(parsed, dict):
            return text

        try:
            value = float(text)
        except ValueError:
            return None
        # NaN/Infinity would serialize to tokens that are not valid JSON.
        if not math.isfinite(value):
            return None
        return json.dumps({"gas": value})

    def _on_message(self, client, userdata, message):
        """Forward one received message to ``on_gas_message``.

        Unusable payloads are dropped. Exceptions raised by the callback
        are logged and suppressed so they do not reach the paho caller.
        """
        callback = self.on_gas_message
        if not callback:
            return

        forwarded = self._normalize_payload(message.payload)
        if forwarded is None:
            _logger.debug("Ignoring unusable MQTT payload: %r", message.payload)
            return

        try:
            callback(forwarded)
        except Exception:
            _logger.exception("on_gas_message callback failed")

    def start(self):
        """Connect to the broker and start the background network loop.

        Connection failures are logged and suppressed; in that case the
        loop is not started.
        """
        try:
            self.client.connect(MQTT_BROKER, MQTT_PORT)
            self.client.loop_start()
        except Exception:
            _logger.exception(
                "Could not connect to MQTT broker %s:%s", MQTT_BROKER, MQTT_PORT
            )

    def publish_cmd(self, cmd: str):
        """Publish *cmd* to ``MQTT_TOPIC_DOOR_CMD`` without blocking the caller.

        The publish runs on a daemon thread using a short-lived client of
        its own. Failures are logged and suppressed.
        """

        def _publish():
            try:
                publisher = mqtt.Client(CallbackAPIVersion.VERSION2)
                publisher.connect(MQTT_BROKER, MQTT_PORT)
                try:
                    # A running network loop is what actually sends the
                    # queued packet; wait (bounded) until it has gone out.
                    publisher.loop_start()
                    info = publisher.publish(MQTT_TOPIC_DOOR_CMD, cmd)
                    info.wait_for_publish(timeout=_PUBLISH_TIMEOUT_S)
                    if not info.is_published():
                        _logger.warning(
                            "Door command %r not sent within %.1fs",
                            cmd,
                            _PUBLISH_TIMEOUT_S,
                        )
                finally:
                    publisher.disconnect()
                    publisher.loop_stop()
            except Exception:
                _logger.exception("Failed to publish door command %r", cmd)

        threading.Thread(target=_publish, daemon=True).start()