# File: fan-control/scripts/basic_send_receive.py
# Page metadata from the hosting site: sgbaird's picture / scripts / 8197dfa
import json
import os
import ssl
from queue import Queue

import paho.mqtt.client as mqtt
# Identifier of the target Pico W; it is embedded in both MQTT topic names
# below. Override with the PICO_ID environment variable, or edit the fallback.
PICO_ID = os.environ.get("PICO_ID", "test-fan")  # UPDATE THIS TO YOUR ID

# Topic the speed command is published to, and topic the RPM reading is
# received on.
command_topic = f"fan-control/picow/{PICO_ID}/speed"
sensor_data_topic = f"fan-control/picow/{PICO_ID}/rpm"

# Broker connection settings. Each value can be supplied through the
# environment variable of the same name so credentials need not live in the
# source; the literals are kept only as backward-compatible fallbacks.
HIVEMQ_USERNAME = os.environ.get("HIVEMQ_USERNAME", "sgbaird")
HIVEMQ_PASSWORD = os.environ.get("HIVEMQ_PASSWORD", "D.Pq5gYtejYbU#L")
HIVEMQ_HOST = os.environ.get(
    "HIVEMQ_HOST", "248cc294c37642359297f75b7b023374.s2.eu.hivemq.cloud"
)

# Decoded sensor messages are handed from the MQTT on_message callback to
# send_and_receive() through this queue.
sensor_data_queue: "Queue[dict]" = Queue()
def create_paho_client(tls=True):
    """Create an MQTT v5 paho client, optionally configured for TLS.

    Args:
        tls: When true, enable TLS on the client with
            ``ssl.PROTOCOL_TLS_CLIENT``. When false, the client is returned
            without any TLS configuration.

    Returns:
        The newly created ``mqtt.Client``; this function does not connect it.
    """
    client = mqtt.Client(protocol=mqtt.MQTTv5)
    if tls:
        # Take the constant from the standard-library ssl module directly
        # rather than through paho's module namespace (``mqtt.ssl``), which
        # only exists as a side effect of paho's own imports.
        client.tls_set(tls_version=ssl.PROTOCOL_TLS_CLIENT)
    return client
# Define setup separately since sensor_data is dynamic
def setup_paho_client(
    client, rpm_data_topic, hostname, username, password=None, port=8883
):
    """Attach callbacks to *client*, connect it, and start its network loop.

    Args:
        client: MQTT client to configure (as returned by
            ``create_paho_client``).
        rpm_data_topic: Topic subscribed to, at QoS 1, whenever a connection
            is established successfully.
        hostname: Broker host name.
        username: Broker user name.
        password: Broker password, or ``None``.
        port: Broker port.

    Returns:
        The same *client*, with its background loop started.
    """

    def on_message(client, userdata, msg):
        # Skip payloads that are not valid JSON instead of letting the
        # exception escape the callback. JSONDecodeError and
        # UnicodeDecodeError are both ValueError subclasses.
        try:
            decoded = json.loads(msg.payload)
        except ValueError as exc:
            print(f"Ignoring non-JSON payload on {msg.topic}: {exc}")
            return
        sensor_data_queue.put(decoded)

    def on_connect(client, userdata, flags, rc, properties=None):
        if rc != 0:
            # Non-zero result code: the connection was not established, so
            # report the failure and do not attempt to subscribe.
            print("Connection failed with result code " + str(rc))
            return
        client.subscribe(rpm_data_topic, qos=1)

    client.on_connect = on_connect
    client.on_message = on_message

    client.username_pw_set(username, password)
    client.connect(hostname, port)
    client.loop_start()  # Use a non-blocking loop

    return client
def send_and_receive(client, command_topic, msg, queue_timeout=60):
    """Publish *msg* as JSON and wait for the next queued sensor message.

    Args:
        client: Connected MQTT client whose on_message callback feeds
            ``sensor_data_queue``.
        command_topic: Topic the command is published to (QoS 2).
        msg: JSON-serialisable command payload.
        queue_timeout: Maximum number of seconds to wait for a sensor message.

    Returns:
        The first item taken from ``sensor_data_queue``.

    Raises:
        queue.Empty: If no sensor message arrives within *queue_timeout*
            seconds.
    """
    client.publish(command_topic, json.dumps(msg), qos=2)

    client.loop_start()
    try:
        return sensor_data_queue.get(True, queue_timeout)
    finally:
        # Stop the network loop on every exit path, including a timeout;
        # previously a queue.Empty left the loop running.
        client.loop_stop()
# Build a TLS-enabled client, then register callbacks, connect to the broker
# and start the background network loop.
client = setup_paho_client(
    create_paho_client(tls=True),
    sensor_data_topic,
    HIVEMQ_HOST,
    HIVEMQ_USERNAME,
    password=HIVEMQ_PASSWORD,
)

# Speed command to send; change the value as needed (0-100).
command_msg = dict(speed=50)

# Publish the command, wait up to 30 seconds for a reading, and show it.
sensor_data = send_and_receive(
    client,
    command_topic,
    command_msg,
    queue_timeout=30,
)
print(sensor_data)