import json
import random
import time
from datetime import datetime, timedelta
from queue import Empty, Queue

import gradio as gr
import numpy as np
import paho.mqtt.client as mqtt
import pandas as pd
import plotly.graph_objects as go
|
|
| |
# Broker endpoint used by both the ping (sender) and pong (receiver) clients.
MQTT_HOST = "broker.hivemq.com"
MQTT_PORT = 1883

# Client handles and the ping session tag; all start unset.
session_id = None
mqtt_ping_client = None
mqtt_pong_client = None

# Hand-off queues between the MQTT message callbacks and the UI handlers:
# responses received by ping, and commands waiting to be processed by pong.
command_queue = Queue()
response_queue = Queue()

# Simulated device state shown in the UI.
device_state = dict(
    rgb=dict(r=0, g=0, b=0),
    temperature=25.0,
    rpm=0,
)

# Recorded samples used for the trend charts.
history_data = {series: [] for series in ("rgb", "temperature", "timestamps")}
|
|
| |
def on_ping_connect(client, userdata, flags, rc):
    """Connect callback for the ping client.

    Logs the result code, then subscribes to this session's response topic
    if a session id has been set.
    """
    print(f"Ping connected with result code {rc}")
    if not session_id:
        return
    response_topic = f"pong/{session_id}/response"
    client.subscribe(response_topic)
|
|
def on_pong_connect(client, userdata, flags, rc):
    """Connect callback for the pong client.

    Logs the result code and subscribes to the shared command topic.
    """
    command_topic = "ping/command"
    print(f"Pong connected with result code {rc}")
    client.subscribe(command_topic)
|
|
def on_ping_message(client, userdata, msg):
    """Message callback for the ping client.

    Decodes the payload as JSON and places the result on ``response_queue``.
    Any failure is printed rather than raised.
    """
    try:
        raw_text = msg.payload.decode()
        response = json.loads(raw_text)
        response_queue.put(response)
        print(f"Ping received: {response}")
    except Exception as err:
        print(f"Ping error: {err}")
|
|
def on_pong_message(client, userdata, msg):
    """Message callback for the pong client.

    Decodes the payload as JSON and places the result on ``command_queue``.
    Any failure is printed rather than raised.
    """
    try:
        raw_text = msg.payload.decode()
        command = json.loads(raw_text)
        command_queue.put(command)
        print(f"Pong received: {command}")
    except Exception as err:
        print(f"Pong error: {err}")
|
|
| |
def initialize_ping():
    """Create, connect and start the ping (sender) MQTT client.

    A new session id of the form ``ping_<unix seconds>`` is generated for
    each successful initialization. The module-level client and session id
    are only replaced once the broker connection has been made, so a failed
    attempt leaves any previously working client untouched.

    Returns:
        A status string: ``"Ping initialized: <session id>"`` on success or
        ``"Ping initialization failed: <reason>"`` when the broker cannot be
        reached.
    """
    global mqtt_ping_client, session_id

    new_session_id = f"ping_{int(time.time())}"
    client = mqtt.Client()
    client.on_connect = on_ping_connect
    client.on_message = on_ping_message
    try:
        client.connect(MQTT_HOST, MQTT_PORT, 60)
    except OSError as exc:
        # DNS failure, refused connection, timeout, ...: report it in the UI
        # instead of publishing a half-initialized client.
        return f"Ping initialization failed: {exc}"

    # Re-initializing must not leak the previous client's network thread.
    previous_client = mqtt_ping_client
    if previous_client is not None:
        previous_client.loop_stop()
        previous_client.disconnect()

    # The session id must be in place before the loop starts, because the
    # connect callback reads it to build the subscription topic.
    session_id = new_session_id
    mqtt_ping_client = client
    client.loop_start()
    return f"Ping initialized: {session_id}"
|
|
def send_command(command_type, data=None):
    """Publish a command of ``command_type`` on the ``ping/command`` topic.

    The message carries the command type, ``data`` (an empty dict when
    ``data`` is falsy), the current session id and the send time.

    Returns:
        ``"Sent <command_type>"``, or a prompt to initialize ping first when
        no ping client exists.
    """
    if mqtt_ping_client:
        message = json.dumps({
            "type": command_type,
            "data": data or {},
            "session_id": session_id,
            "timestamp": time.time(),
        })
        mqtt_ping_client.publish("ping/command", message)
        return f"Sent {command_type}"
    return "Please initialize ping first"
|
|
def send_rgb(r, g, b):
    """Publish an ``RGB Command`` carrying the three channel values.

    Besides publishing on ``ping/command``, the same payload is also placed
    directly on ``command_queue``.

    Returns:
        A confirmation string with the sent values, or a prompt to
        initialize ping first when no ping client exists.
    """
    if mqtt_ping_client:
        rgb_payload = {
            "type": "RGB Command",
            "data": {"r": r, "g": g, "b": b},
            "session_id": session_id,
            "timestamp": time.time(),
        }
        encoded = json.dumps(rgb_payload)
        mqtt_ping_client.publish("ping/command", encoded)
        command_queue.put(rgb_payload)
        return f"Sent RGB Command: R={r}, G={g}, B={b}"
    return "Please initialize ping first"
|
|
def send_weight_request(rpm):
    """Send a ``Weight Data`` command that sets the RPM and requests a weight."""
    weight_request = {"set_rpm": rpm, "request_weight": True}
    return send_command("Weight Data", weight_request)
|
|
| |
def initialize_pong():
    """Create, connect and start the pong (receiver) MQTT client.

    The module-level client is only replaced once the broker connection has
    been made, so a failed attempt leaves any previously working client
    untouched.

    Returns:
        ``"Pong started"`` on success or ``"Pong start failed: <reason>"``
        when the broker cannot be reached.
    """
    global mqtt_pong_client

    client = mqtt.Client()
    client.on_connect = on_pong_connect
    client.on_message = on_pong_message
    try:
        client.connect(MQTT_HOST, MQTT_PORT, 60)
    except OSError as exc:
        # DNS failure, refused connection, timeout, ...: report it in the UI
        # instead of publishing a half-initialized client.
        return f"Pong start failed: {exc}"

    # Restarting must not leak the previous client's network thread, which
    # would otherwise keep feeding duplicate commands into the queue.
    previous_client = mqtt_pong_client
    if previous_client is not None:
        previous_client.loop_stop()
        previous_client.disconnect()

    mqtt_pong_client = client
    client.loop_start()
    return "Pong started"
|
|
def process_command(command):
    """Apply one received command to the simulated device and build a reply.

    Supported command types are ``"RGB Command"``, ``"Temperature Reading"``
    and ``"Weight Data"``; anything else yields an error reply. Commands are
    decoded from a broker topic, so the input is validated instead of being
    trusted: a malformed command produces an error reply rather than an
    exception.

    Args:
        command: Decoded JSON command. Expected to be a dict with ``type``,
            ``data``, ``session_id`` and ``timestamp`` keys.

    Returns:
        The reply as a JSON string indented by two spaces. When a pong client
        exists and the command carries a usable session id, the reply is also
        published on ``pong/<session_id>/response``.
    """
    global device_state

    if not isinstance(command, dict):
        # json.loads can yield lists, strings or numbers; none of those can
        # be addressed back to a sender, so just report the problem.
        malformed = {
            "type": None,
            "data": {"error": "Malformed command"},
            "timestamp": time.time(),
            "session_id": None,
        }
        return json.dumps(malformed, indent=2)

    command_type = command.get("type")
    data = command.get("data")
    if not isinstance(data, dict):
        data = {}
    reply_session = command.get("session_id")
    try:
        timestamp = datetime.fromtimestamp(command.get("timestamp", time.time()))
    except (TypeError, ValueError, OverflowError, OSError):
        # Unusable timestamp: fall back to the time of processing.
        timestamp = datetime.now()

    if command_type == "RGB Command":
        try:
            rgb = {channel: int(data.get(channel, 0)) for channel in ("r", "g", "b")}
        except (TypeError, ValueError, OverflowError):
            response_data = {"error": "Invalid RGB values"}
        else:
            # Rebind instead of mutating so earlier snapshots stay unchanged.
            device_state = {**device_state, "rgb": rgb}
            print(f"Processing RGB command: {data}")
            response_data = {
                "current_state": "applied",
                "power_consumption": random.uniform(0.1, 1.0),
                "applied_values": device_state["rgb"],
            }
            # The trend chart tracks the mean of the three channels.
            record_data("rgb", sum(rgb.values()) / 3, timestamp)
    elif command_type == "Temperature Reading":
        device_state["temperature"] += random.uniform(-0.5, 0.5)
        response_data = {
            "current_temperature": device_state["temperature"],
            "humidity": random.uniform(40, 60),
            "pressure": random.uniform(980, 1020),
        }
        record_data("temperature", device_state["temperature"], timestamp)
    elif command_type == "Weight Data":
        if "set_rpm" in data:
            device_state["rpm"] = data["set_rpm"]
        response_data = {
            "calibrated_weight": random.uniform(95, 105),
            "current_rpm": device_state["rpm"],
            "stability": random.uniform(0.98, 1.02),
        }
    else:
        response_data = {"error": "Unknown command type"}

    response = {
        "type": command_type,
        "data": response_data,
        "timestamp": time.time(),
        "session_id": reply_session,
    }
    encoded = json.dumps(response)

    # Without a session id there is nobody to answer, and MQTT wildcards are
    # not allowed in a publish topic.
    can_reply = reply_session is not None and not any(
        wildcard in str(reply_session) for wildcard in "+#"
    )
    if mqtt_pong_client and can_reply:
        mqtt_pong_client.publish(f"pong/{reply_session}/response", encoded)
    return json.dumps(response, indent=2)
|
|
def check_ping_responses():
    """Drain the ping response queue and format its contents.

    Returns:
        Every queued response pretty-printed as JSON and joined by newlines,
        or ``"No new responses"`` when the queue held nothing.
    """
    responses = []
    while True:
        # empty() followed by get_nowait() is a check-then-act race when
        # another handler drains the queue concurrently; just try the get.
        try:
            response = response_queue.get_nowait()
        except Empty:
            break
        responses.append(json.dumps(response, indent=2))
    return "\n".join(responses) if responses else "No new responses"
|
|
def check_pong_commands():
    """Drain the command queue, processing each command in arrival order.

    Returns:
        The replies produced by ``process_command`` joined by newlines, or
        ``"No new commands"`` when the queue held nothing.
    """
    responses = []
    while True:
        # empty() followed by get_nowait() is a check-then-act race when
        # another handler drains the queue concurrently; just try the get.
        try:
            command = command_queue.get_nowait()
        except Empty:
            break
        responses.append(process_command(command))
    return "\n".join(responses) if responses else "No new commands"
|
|
| |
def stop_mqtt():
    """Stop and disconnect both MQTT clients, then forget them.

    Clearing the module-level handles makes later send attempts report that
    ping must be initialized again instead of publishing on a disconnected
    client. Calling this when nothing is running is a no-op.

    Returns:
        The status string ``"Both Ping and Pong clients stopped"``.
    """
    global mqtt_ping_client, mqtt_pong_client

    for client in (mqtt_ping_client, mqtt_pong_client):
        if client:
            client.loop_stop()
            client.disconnect()

    mqtt_ping_client = None
    mqtt_pong_client = None
    return "Both Ping and Pong clients stopped"
|
|
| |
def update_rgb_preview(r, g, b):
    """Store the slider RGB values in the device state and refresh the view.

    The state dict is replaced by a new one rather than modified in place.
    Returns a Gradio update carrying the new state.
    """
    global device_state
    channels = {"r": int(r), "g": int(g), "b": int(b)}
    device_state = dict(device_state, rgb=channels)
    return gr.update(value=device_state)
|
|
def update_rpm_preview(rpm):
    """Store the slider RPM value in the device state and refresh the view.

    The state dict is replaced by a new one rather than modified in place.
    Returns a Gradio update carrying the new state.
    """
    global device_state
    device_state = dict(device_state, rpm=int(rpm))
    return gr.update(value=device_state)
|
|
def update_temperature_preview(temperature):
    """Store the slider temperature in the device state and refresh the view.

    The state dict is replaced by a new one rather than modified in place.
    Returns a Gradio update carrying the new state.
    """
    global device_state
    device_state = dict(device_state, temperature=float(temperature))
    return gr.update(value=device_state)
|
|
| |
# Number of most recent samples kept per series.
_HISTORY_LIMIT = 100
# Number of future points extrapolated by the linear fit.
_FORECAST_STEPS = 5


def _timestamps_key(data_type):
    """Return the ``history_data`` key holding the timestamps of one series."""
    return f"{data_type}_timestamps"


def record_data(data_type, value, timestamp):
    """Append one sample to the history of ``data_type``.

    Each series keeps its own timestamp list (stored under
    ``"<data_type>_timestamps"``) so that values and timestamps always have
    the same length, even when several series are recorded interleaved. Both
    lists are trimmed together to the most recent samples.

    Args:
        data_type: Series name, e.g. ``"rgb"`` or ``"temperature"``.
        value: Numeric sample value.
        timestamp: ``datetime`` at which the sample was taken.
    """
    values = history_data.setdefault(data_type, [])
    stamps = history_data.setdefault(_timestamps_key(data_type), [])
    values.append(value)
    stamps.append(timestamp)

    # Slice deletion is a no-op while the lists are within the limit.
    del values[:-_HISTORY_LIMIT]
    del stamps[:-_HISTORY_LIMIT]


def generate_prediction(data_type):
    """Fit a straight line to a series and extrapolate it.

    Args:
        data_type: Series name, e.g. ``"rgb"`` or ``"temperature"``.

    Returns:
        ``None`` when fewer than two samples exist. Otherwise a dict with
        ``"historical"`` (the recorded values), ``"predicted"`` (the next
        extrapolated values as a list of floats) and ``"timestamps"`` (the
        timestamps belonging to the recorded values).
    """
    values = history_data.get(data_type, [])
    if len(values) < 2:
        return None

    # Least-squares line over the sample index, not over wall-clock time.
    sample_index = np.arange(len(values))
    trend = np.poly1d(np.polyfit(sample_index, np.asarray(values, dtype=float), 1))
    future_index = np.arange(len(values), len(values) + _FORECAST_STEPS)

    return {
        "historical": list(values),
        "predicted": trend(future_index).tolist(),
        "timestamps": list(history_data.get(_timestamps_key(data_type), [])),
    }


def plot_data_with_prediction(data_type):
    """Build a Plotly figure of a series together with its linear forecast.

    Args:
        data_type: Series name, e.g. ``"rgb"`` or ``"temperature"``.

    Returns:
        A ``plotly.graph_objects.Figure`` with a "Historical" and a dashed
        "Predicted" trace, or ``None`` when fewer than two samples exist.
    """
    prediction = generate_prediction(data_type)
    if prediction is None:
        return None

    stamps = prediction["timestamps"]
    fig = go.Figure()

    fig.add_trace(go.Scatter(
        x=[stamp.strftime('%H:%M:%S') for stamp in stamps],
        y=prediction["historical"],
        mode='lines+markers',
        name='Historical'
    ))

    # Forecast points are labelled one minute apart after the last sample,
    # independent of the actual sampling interval.
    last_stamp = stamps[-1]
    future_times = [
        (last_stamp + timedelta(minutes=step)).strftime('%H:%M:%S')
        for step in range(1, _FORECAST_STEPS + 1)
    ]
    fig.add_trace(go.Scatter(
        x=future_times,
        y=prediction["predicted"],
        mode='lines',
        line=dict(dash='dash'),
        name='Predicted'
    ))

    fig.update_layout(
        title=f"{data_type.upper()} Trend Analysis",
        xaxis_title="Time",
        yaxis_title="Value",
        showlegend=True
    )

    return fig
|
|
| |
def refresh_all():
    """Process pending commands and return the refreshed UI values.

    Returns a two-item list: the command log text and a Gradio update
    carrying the current device state.
    """
    command_log = check_pong_commands()
    print(f"Current device state: {device_state}")
    state_update = gr.update(value=device_state)
    return [command_log, state_update]
|
|
| |
# Gradio UI: a two-column page with the ping (sender) controls on the left and
# the pong (receiver) monitor on the right, followed by the event wiring.
with gr.Blocks(title="MQTT Ping-Pong System", theme=gr.themes.Base(
    # Custom green palette used as the theme's primary hue.
    primary_hue=gr.themes.colors.Color(
        c50="#edf2f0",
        c100="#dbE5E1",
        c200="#B8CCC6",
        c300="#96B3AB",
        c400="#7FA595",
        c500="#5C8072",
        c600="#4A665B",
        c700="#374D45",
        c800="#25332E",
        c900="#121917",
        c950="#080C0B",
    )
)) as demo:
    gr.Markdown("# 🏓 MQTT Ping-Pong Communication System <span class='pong-emoji'>🏓</span>")

    # Page-wide CSS injected as raw HTML; the class names are referenced by
    # elem_classes on the two columns and by the pong-emoji spans.
    gr.HTML("""
    <style>
        /* Basic styles */
        .ping-panel {
            background-color: #5C8072 !important;
            border-radius: 8px;
            padding: 20px;
        }
        .pong-panel {
            background-color: #7FA595 !important;
            border-radius: 8px;
            padding: 20px;
        }

        /* Font settings */
        * {
            font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto,
                "Helvetica Neue", Arial, sans-serif;
        }

        /* Title styles */
        h1, h2, h3, .header {
            font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto,
                "Helvetica Neue", Arial, sans-serif;
            font-weight: 600;
            color: #2c3e50;
        }

        /* Button styles */
        .gr-button {
            margin: 10px 0;
            font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto,
                "Helvetica Neue", Arial, sans-serif;
            font-weight: 500;
        }

        /* Group styles */
        .gr-group {
            background-color: rgba(255, 255, 255, 0.1);
            border-radius: 8px;
            padding: 15px;
            margin: 10px 0;
        }

        /* Label styles */
        label {
            font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto,
                "Helvetica Neue", Arial, sans-serif;
            font-weight: 500;
        }

        /* Pong emoji flip - applied to all elements with pong-emoji class */
        .pong-emoji {
            display: inline-block;
            transform: scaleX(-1);
            margin-left: 5px; /* Add a little space */
        }

        /* Step title styles */
        .step-header {
            font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto,
                "Helvetica Neue", Arial, sans-serif;
            font-weight: 600;
            color: #2c3e50;
            margin: 15px 0 10px 0;
        }
    </style>
    """)

    with gr.Row():
        # Left column: ping (sender) controls.
        with gr.Column(scale=1, variant="panel", elem_classes=["ping-panel"]):
            gr.Markdown("### 🏓 Ping Control (Sender)")
            with gr.Group():
                gr.Markdown("**Step 1: Initialize Connection**")
                ping_init_btn = gr.Button("Initialize Ping", variant="primary", size="lg")
                ping_status = gr.Textbox(label="Connection Status", lines=2)

            # One tab per command type.
            with gr.Tabs():
                with gr.TabItem("RGB Control"):
                    with gr.Group():
                        gr.Markdown("**Step 2: Configure RGB Values**")
                        r = gr.Slider(0, 255, 128, label="Red Value", interactive=True)
                        g = gr.Slider(0, 255, 128, label="Green Value", interactive=True)
                        b = gr.Slider(0, 255, 128, label="Blue Value", interactive=True)
                        gr.Markdown("**Step 3: Send Command**")
                        send_rgb_btn = gr.Button("Send RGB Command", variant="secondary", size="lg")
                        rgb_status = gr.Textbox(label="RGB Status", lines=2)

                with gr.TabItem("Weight Control"):
                    with gr.Group():
                        gr.Markdown("**Step 2: Set RPM Value**")
                        rpm = gr.Slider(0, 5000, 1000, label="RPM Setting", interactive=True)
                        gr.Markdown("**Step 3: Send Request**")
                        send_weight_btn = gr.Button("Send Weight Request", variant="secondary", size="lg")
                        weight_status = gr.Textbox(label="Weight Status", lines=2)

                with gr.TabItem("Temperature Control"):
                    with gr.Group():
                        gr.Markdown("**Step 2: Set Temperature Value**")
                        temperature = gr.Slider(0, 50, 25, label="Temperature (°C)", interactive=True)
                        gr.Markdown("**Step 3: Send Request**")
                        send_temp_btn = gr.Button("Send Temperature Request", variant="secondary", size="lg")
                        temp_status = gr.Textbox(label="Temperature Status", lines=2)

            with gr.Group():
                gr.Markdown("**Step 4: Check Responses**")
                check_ping_btn = gr.Button("Check Responses", variant="secondary", size="lg")
                ping_responses = gr.Textbox(
                    label="Response Log",
                    lines=10,
                    show_copy_button=True
                )

        # Right column: pong (receiver) monitor.
        with gr.Column(scale=1, variant="panel", elem_classes=["pong-panel"]):
            gr.Markdown("### Pong Monitor (Receiver) <span class='pong-emoji'>🏓</span>")
            with gr.Group():
                gr.Markdown("**Step 1: Start System**")
                with gr.Row():
                    pong_init_btn = gr.Button("Start Pong", variant="primary", size="lg")
                    pong_stop_btn = gr.Button("Stop Pong", variant="secondary", size="lg")
                pong_status = gr.Textbox(label="System Status", lines=2)

            with gr.Group():
                gr.Markdown("**Step 2: Monitor Device Status**")
                refresh_btn = gr.Button("🔄 Refresh All", variant="primary", size="lg")
                # Starts out showing the module-level device_state.
                device_info = gr.JSON(
                    label="Current Device State",
                    value=device_state,
                    show_label=True
                )

            with gr.Group():
                gr.Markdown("**Step 3: Check Incoming Commands**")
                check_pong_btn = gr.Button("Check Commands", variant="secondary", size="lg")
                pong_commands = gr.Textbox(
                    label="Command Log",
                    lines=10,
                    show_copy_button=True
                )

            with gr.Group():
                gr.Markdown("**Step 4: Data Analysis (Optional)**")
                with gr.Row():
                    analyze_rgb_btn = gr.Button("Analyze RGB Trend", variant="secondary", size="lg")
                    analyze_temp_btn = gr.Button("Analyze Temperature Trend", variant="secondary", size="lg")

                plot_output = gr.Plot(
                    label="Trend Analysis",
                    show_label=True
                )

    # Trend buttons render the chosen series into the shared plot.
    analyze_rgb_btn.click(
        lambda: plot_data_with_prediction("rgb"),
        outputs=plot_output
    )
    analyze_temp_btn.click(
        lambda: plot_data_with_prediction("temperature"),
        outputs=plot_output
    )

    # Connection lifecycle; "Stop Pong" calls stop_mqtt, which stops both clients.
    ping_init_btn.click(initialize_ping, outputs=ping_status)
    pong_init_btn.click(initialize_pong, outputs=pong_status)
    pong_stop_btn.click(stop_mqtt, outputs=pong_status)

    # Send RGB, then process queued commands, then show queued responses.
    send_rgb_btn.click(
        send_rgb,
        [r, g, b],
        rgb_status
    ).then(
        check_pong_commands,
        outputs=[pong_commands]
    ).then(
        check_ping_responses,
        outputs=[ping_responses]
    )

    # Send the weight request, then refresh the command log and device state.
    send_weight_btn.click(
        send_weight_request,
        rpm,
        weight_status
    ).then(
        refresh_all,
        outputs=[pong_commands, device_info]
    )

    # Manual refresh of the command log and device state.
    refresh_btn.click(
        refresh_all,
        outputs=[pong_commands, device_info]
    )

    check_ping_btn.click(
        check_ping_responses,
        outputs=ping_responses
    )
    check_pong_btn.click(check_pong_commands, outputs=pong_commands)

    # Any RGB slider change writes all three values into the device state view.
    r.change(update_rgb_preview, inputs=[r, g, b], outputs=device_info)
    g.change(update_rgb_preview, inputs=[r, g, b], outputs=device_info)
    b.change(update_rgb_preview, inputs=[r, g, b], outputs=device_info)

    # RPM slider change updates the device state view.
    rpm.change(update_rpm_preview, inputs=rpm, outputs=device_info)

    # Temperature slider change updates the device state view.
    temperature.change(update_temperature_preview, inputs=temperature, outputs=device_info)

    # Send a temperature request, then process commands, then show responses.
    send_temp_btn.click(
        lambda temp: send_command("Temperature Reading", {"temperature": temp}),
        inputs=[temperature],
        outputs=temp_status
    ).then(
        check_pong_commands,
        outputs=[pong_commands]
    ).then(
        check_ping_responses,
        outputs=[ping_responses]
    )

    # Page-load hook; the callback does nothing.
    demo.load(lambda: None)


# Launch the app only when this file is run as a script.
if __name__ == "__main__":
    demo.launch()