| import gradio as gr | |
| import random | |
| import time | |
| def mqtt_simulator(topic, qos, message): | |
| time.sleep(0.5) | |
| msg_id = random.randint(1000, 9999) | |
| timestamp = time.strftime("%Y-%m-%d %H:%M:%S") | |
| status = random.choice(["Published", "Published", "Published", "Queued"]) | |
| result = f"""### MQTT Message {status} | |
| **Topic**: `{topic}` | |
| **QoS Level**: {qos} | |
| **Message ID**: {msg_id} | |
| **Timestamp**: {timestamp} | |
| **Message Content**: | |
| ``` | |
| {message} | |
| ``` | |
| π **Anktechsol** - IoT Solutions Expert | |
| [Learn more](https://anktechsol.com)""" | |
| return result | |
| with gr.Blocks(title="MQTT Protocol Demo") as demo: | |
| gr.Markdown("# π‘ MQTT Protocol Demonstration") | |
| gr.Markdown("Interactive MQTT messaging demo - **Anktechsol**") | |
| with gr.Row(): | |
| with gr.Column(): | |
| topic = gr.Textbox(label="Topic", value="iot/sensors/temperature", placeholder="Enter MQTT topic") | |
| qos = gr.Radio(["0 - At most once", "1 - At least once", "2 - Exactly once"], label="QoS Level", value="1 - At least once") | |
| message = gr.Textbox(label="Message Payload", value='{"temp":25.4,"humidity":60}', lines=3) | |
| btn = gr.Button("Publish Message") | |
| with gr.Column(): | |
| output = gr.Markdown() | |
| btn.click(mqtt_simulator, inputs=[topic, qos, message], outputs=output) | |
| gr.Markdown("""--- | |
| ### About MQTT & Anktechsol | |
| MQTT is a lightweight messaging protocol for IoT. Expert IoT consulting at [anktechsol.com](https://anktechsol.com)""") | |
| demo.launch() |