mqtt-broker / README.md
nxdev-org's picture
fix README.md for hf
9644462
metadata
title: High-Performance MQTT HTTP Bridge
emoji: πŸš€
colorFrom: indigo
colorTo: blue
sdk: docker
pinned: false
license: mit
short_description: A high-concurrency, low-latency MQTT-HTTP bridge

High-Performance MQTT HTTP Bridge

A robust, high-speed conduit between industrial MQTT and the modern Web. Achieves ~70,000+ msg/s MQTT throughput with Cython optimizations and supports HTTP, SSE, WebSocket, and NDJSON streaming.

πŸš€ Performance

Localhost (Verified 2026-03-26)

Protocol Send Throughput Recv Throughput Latency
MQTT (TCP) ~70,000 msg/s ~800 msg/s ~1.5ms
MQTT (WebSocket) ~65,000 msg/s ~800 msg/s ~1.7ms
HTTP (SSE/NDJSON) ~10,000 msg/s ~5,000 msg/s ~8ms
WebSocket API ~15,000 msg/s ~20-60 msg/s ~8ms

Cloudflare/HuggingFace Space (Remote)

Environment Latency Throughput
Cloudflare HTTP ~43ms ~1,000 msg/s
HuggingFace Space ~100ms ~500 msg/s

Note: MQTT requires direct TCP access (port 1883) - not available over Cloudflare tunnels. Use WebSocket/MQTT or HTTP API instead.

🎯 Features

  • MQTT Broker - Built-in high-performance MQTT broker (iotcore/Rust)
  • HTTP REST API - Full CRUD operations for clients, topics, and messages
  • Server-Sent Events (SSE) - Real-time message streaming with automatic batching
  • NDJSON Streaming - Newline-delimited JSON for lower overhead than SSE
  • WebSocket - Bidirectional communication for both MQTT proxy and HTTP API clients
  • Fire-and-Forget Publish - Fastest publish mode via /publish/fire
  • GZip Compression - Automatic compression for responses >256 bytes
  • Cython Optimizations - Hot paths use orjson and compiled extensions

πŸ“¦ Quick Start

Install Dependencies

pip install -r requirements.txt

Build Cython Extensions (Optional but Recommended)

python pyx/setup.py build_ext --inplace

Run Broker

python run_broker.py

Run Benchmarks

# Python benchmark
python benchmark.py --test all

# Node.js benchmark (requires Node.js)
node benchmark.js --test all

πŸ”Œ API Reference

Connect Client

curl -X POST http://localhost:7860/clients/connect \
  -H "Content-Type: application/json" \
  -d '{"apiKey": "BROKER_APIKEY", "clientId": "my-client"}'

Subscribe to Topic

curl -X POST http://localhost:7860/clients/{client_id}/subscribe \
  -H "Content-Type: application/json" \
  -d '{"topic": "arena-ai/{session_id}/response", "qos": 0}'

Publish Message

curl -X POST http://localhost:7860/clients/{client_id}/publish \
  -H "Content-Type: application/json" \
  -d '{"topic": "arena-ai/{session_id}/request", "payload": "Hello", "qos": 0}'

Receive Messages (SSE)

curl -X POST http://localhost:7860/clients/{client_id}/messages \
  -H "Accept: text/event-stream"

Receive Messages (NDJSON)

curl -X POST http://localhost:7860/clients/{client_id}/messages/stream \
  -H "Accept: application/x-ndjson"

WebSocket (MQTT Proxy)

const mqtt = require('mqtt');
const client = mqtt.connect('ws://localhost:7860/ws', {
  clientId: 'my-client',
  protocolVersion: 4,
  path: '/ws'
});
client.on('connect', () => {
  client.subscribe('topic/#');
});
client.on('message', (topic, msg) => {
  console.log(msg.toString());
});

WebSocket (HTTP API Client)

const WebSocket = require('ws');
const ws = new WebSocket('ws://localhost:7860/ws/{client_id}');
ws.onopen = () => console.log('Connected');
ws.onmessage = (event) => {
  const messages = JSON.parse(event.data);
  console.log(messages);
};

πŸ— Architecture

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚                      Clients                                 β”‚
β”‚   (Browser, IoT Devices, External Services)                 β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                      β”‚
         β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
         β”‚            β”‚            β”‚
         β–Ό            β–Ό            β–Ό
    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”
    β”‚   HTTP  β”‚  β”‚  MQTT   β”‚  β”‚WebSocketβ”‚
    β”‚   API   β”‚  β”‚  (TCP)  β”‚  β”‚  Proxy  β”‚
    β””β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”˜
         β”‚            β”‚            β”‚
         β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                      β”‚
         β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
         β”‚     broker_app.py        β”‚
         β”‚   (FastAPI + uvloop)    β”‚
         β”‚                         β”‚
         β”‚  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”    β”‚
         β”‚  β”‚ TopicMultiplexerβ”‚    β”‚
         β”‚  β”‚ (Message Fan-out)β”‚    β”‚
         β”‚  β””β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”˜    β”‚
         β”‚           β”‚             β”‚
         β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                     β”‚
         β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β–Όβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
         β”‚      iotcore          β”‚
         β”‚   (Rust MQTT Broker)   β”‚
         β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

βš™οΈ Environment Variables

Variable Default Description
BROKER_HOST 127.0.0.1 MQTT broker host
BROKER_PORT 1883 MQTT broker port
BROKER_APIKEY BROKER_APIKEY API authentication
MAX_QUEUED 1000 Max messages per topic buffer
MSG_TTL 300 Message TTL in seconds
STALE_SECONDS 300 Client stale timeout
BROKER_LOG_LEVEL WARNING Logging level

🌐 Cloudflare/HuggingFace Deployment

Performance Tips

  1. Use NDJSON Stream (POST /clients/{id}/messages/stream) - lower overhead than SSE
  2. Enable GZip - auto-enabled for responses >256 bytes
  3. Use batch publish - POST /clients/{id}/publish/batch for bulk sends
  4. Use fire-and-forget - POST /clients/{id}/publish/fire for fastest publish
  5. Use WebSocket MQTT - Direct MQTT over WebSocket bypasses HTTP overhead

Known Limitations

  • MQTT requires direct TCP access (port 1883) - not available over Cloudflare
  • Use WebSocket/MQTT or HTTP API over cloudflared tunnel
  • Cloudflare QUIC adds ~40ms latency

πŸ“ Project Structure

mqtt-broker/
β”œβ”€β”€ broker_app.py       # Main MQTT broker/HTTP bridge
β”œβ”€β”€ api_server.py       # OpenAI-compatible MQTT proxy
β”œβ”€β”€ benchmark.py        # Python performance test suite
β”œβ”€β”€ benchmark.js        # Node.js performance test suite
β”œβ”€β”€ run_broker.py      # Broker startup script
β”œβ”€β”€ pyx/               # Cython extension modules
β”œβ”€β”€ requirements.txt   # Python dependencies
└── package.json      # Node.js dependencies

πŸ“œ License

MIT