Spaces:
Sleeping
Sleeping
metadata
title: High-Performance MQTT HTTP Bridge
emoji: π
colorFrom: indigo
colorTo: blue
sdk: docker
pinned: false
license: mit
short_description: A high-concurrency, low-latency MQTT-HTTP bridge
High-Performance MQTT HTTP Bridge
A robust, high-speed conduit between industrial MQTT and the modern Web. Achieves ~70,000+ msg/s MQTT throughput with Cython optimizations and supports HTTP, SSE, WebSocket, and NDJSON streaming.
π Performance
Localhost (Verified 2026-03-26)
| Protocol | Send Throughput | Recv Throughput | Latency |
|---|---|---|---|
| MQTT (TCP) | ~70,000 msg/s | ~800 msg/s | ~1.5ms |
| MQTT (WebSocket) | ~65,000 msg/s | ~800 msg/s | ~1.7ms |
| HTTP (SSE/NDJSON) | ~10,000 msg/s | ~5,000 msg/s | ~8ms |
| WebSocket API | ~15,000 msg/s | ~20-60 msg/s | ~8ms |
Cloudflare/HuggingFace Space (Remote)
| Environment | Latency | Throughput |
|---|---|---|
| Cloudflare HTTP | ~43ms | ~1,000 msg/s |
| HuggingFace Space | ~100ms | ~500 msg/s |
Note: MQTT requires direct TCP access (port 1883) - not available over Cloudflare tunnels. Use WebSocket/MQTT or HTTP API instead.
π― Features
- MQTT Broker - Built-in high-performance MQTT broker (iotcore/Rust)
- HTTP REST API - Full CRUD operations for clients, topics, and messages
- Server-Sent Events (SSE) - Real-time message streaming with automatic batching
- NDJSON Streaming - Newline-delimited JSON for lower overhead than SSE
- WebSocket - Bidirectional communication for both MQTT proxy and HTTP API clients
- Fire-and-Forget Publish - Fastest publish mode via
/publish/fire - GZip Compression - Automatic compression for responses >256 bytes
- Cython Optimizations - Hot paths use orjson and compiled extensions
π¦ Quick Start
Install Dependencies
pip install -r requirements.txt
Build Cython Extensions (Optional but Recommended)
python pyx/setup.py build_ext --inplace
Run Broker
python run_broker.py
Run Benchmarks
# Python benchmark
python benchmark.py --test all
# Node.js benchmark (requires Node.js)
node benchmark.js --test all
π API Reference
Connect Client
curl -X POST http://localhost:7860/clients/connect \
-H "Content-Type: application/json" \
-d '{"apiKey": "BROKER_APIKEY", "clientId": "my-client"}'
Subscribe to Topic
curl -X POST http://localhost:7860/clients/{client_id}/subscribe \
-H "Content-Type: application/json" \
-d '{"topic": "arena-ai/{session_id}/response", "qos": 0}'
Publish Message
curl -X POST http://localhost:7860/clients/{client_id}/publish \
-H "Content-Type: application/json" \
-d '{"topic": "arena-ai/{session_id}/request", "payload": "Hello", "qos": 0}'
Receive Messages (SSE)
curl -X POST http://localhost:7860/clients/{client_id}/messages \
-H "Accept: text/event-stream"
Receive Messages (NDJSON)
curl -X POST http://localhost:7860/clients/{client_id}/messages/stream \
-H "Accept: application/x-ndjson"
WebSocket (MQTT Proxy)
const mqtt = require('mqtt');
const client = mqtt.connect('ws://localhost:7860/ws', {
clientId: 'my-client',
protocolVersion: 4,
path: '/ws'
});
client.on('connect', () => {
client.subscribe('topic/#');
});
client.on('message', (topic, msg) => {
console.log(msg.toString());
});
WebSocket (HTTP API Client)
const WebSocket = require('ws');
const ws = new WebSocket('ws://localhost:7860/ws/{client_id}');
ws.onopen = () => console.log('Connected');
ws.onmessage = (event) => {
const messages = JSON.parse(event.data);
console.log(messages);
};
π Architecture
βββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββββ
β Clients β
β (Browser, IoT Devices, External Services) β
βββββββββββββββββββββββ¬ββββββββββββββββββββββββββββββββββββββ
β
ββββββββββββββΌβββββββββββββ
β β β
βΌ βΌ βΌ
βββββββββββ βββββββββββ βββββββββββ
β HTTP β β MQTT β βWebSocketβ
β API β β (TCP) β β Proxy β
ββββββ¬βββββ ββββββ¬βββββ ββββββ¬βββββ
β β β
ββββββββββββββΌβββββββββββββ
β
ββββββββββββββΌβββββββββββββ
β broker_app.py β
β (FastAPI + uvloop) β
β β
β βββββββββββββββββββ β
β β TopicMultiplexerβ β
β β (Message Fan-out)β β
β ββββββββββ¬βββββββββ β
β β β
βββββββββββββΌββββββββββββββ
β
βββββββββββββΌββββββββββββ
β iotcore β
β (Rust MQTT Broker) β
βββββββββββββββββββββββββ
βοΈ Environment Variables
| Variable | Default | Description |
|---|---|---|
BROKER_HOST |
127.0.0.1 | MQTT broker host |
BROKER_PORT |
1883 | MQTT broker port |
BROKER_APIKEY |
BROKER_APIKEY | API authentication |
MAX_QUEUED |
1000 | Max messages per topic buffer |
MSG_TTL |
300 | Message TTL in seconds |
STALE_SECONDS |
300 | Client stale timeout |
BROKER_LOG_LEVEL |
WARNING | Logging level |
π Cloudflare/HuggingFace Deployment
Performance Tips
- Use NDJSON Stream (
POST /clients/{id}/messages/stream) - lower overhead than SSE - Enable GZip - auto-enabled for responses >256 bytes
- Use batch publish -
POST /clients/{id}/publish/batchfor bulk sends - Use fire-and-forget -
POST /clients/{id}/publish/firefor fastest publish - Use WebSocket MQTT - Direct MQTT over WebSocket bypasses HTTP overhead
Known Limitations
- MQTT requires direct TCP access (port 1883) - not available over Cloudflare
- Use WebSocket/MQTT or HTTP API over cloudflared tunnel
- Cloudflare QUIC adds ~40ms latency
π Project Structure
mqtt-broker/
βββ broker_app.py # Main MQTT broker/HTTP bridge
βββ api_server.py # OpenAI-compatible MQTT proxy
βββ benchmark.py # Python performance test suite
βββ benchmark.js # Node.js performance test suite
βββ run_broker.py # Broker startup script
βββ pyx/ # Cython extension modules
βββ requirements.txt # Python dependencies
βββ package.json # Node.js dependencies
π License
MIT