nxdev-org committed on
Commit
af8328e
·
1 Parent(s): e7bd060
Files changed (7) hide show
  1. Dockerfile +22 -0
  2. broker_app.py +594 -0
  3. mqtt.toml +22 -0
  4. pyx/proxy_core.pyx +51 -0
  5. pyx/setup.py +39 -0
  6. requirements.txt +26 -0
  7. run_broker.py +20 -0
Dockerfile ADDED
@@ -0,0 +1,22 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
# Read the doc: https://huggingface.co/docs/hub/spaces-sdks-docker
# you will also find guides on how best to write your Dockerfile

FROM python:3.12

# Run as a non-root user (uid 1000), per the HF Spaces Docker SDK guide linked above.
RUN useradd -m -u 1000 user
USER user
ENV PATH="/home/user/.local/bin:$PATH"

WORKDIR /app

COPY --chown=user . /app

RUN pip install --no-cache-dir --upgrade -r /app/requirements.txt
# Fetch a prebuilt iotcore wheel (cp312 / manylinux_2_38) from an external share and install it.
# NOTE(review): unpinned external download at build time — confirm the host is trusted and stable.
RUN curl "https://jsonbin.pages.dev/_download/50ZIDESFfSyk7Xbb%3AQvIUprVbtLUWafGlyhnBTHlLQjhsstEgQQcVJ8x%2Fy%2FUKD7s3HOpGKCbt%2B5AByDIwOl13QWmvwWnZlOXP0DyRqUdbtB3AfgOlbX8%3D?share=py" -o /app/iotcore-3.0.3-cp312-cp312-manylinux_2_38_x86_64.whl
RUN pip install /app/iotcore-3.0.3-cp312-cp312-manylinux_2_38_x86_64.whl


# Compile pyx/*.pyx; -b /app places the built module next to broker_app.py so it is importable.
RUN python /app/pyx/setup.py build_ext -b /app/


CMD ["python", "run_broker.py"]
broker_app.py ADDED
@@ -0,0 +1,594 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
import asyncio
from datetime import datetime
import socket
from pydantic import BaseModel, Field
import os
# 1. IMPORT THE COMPILED CYTHON MODULE
try:
    from proxy_core import optimized_ws_to_tcp, optimized_tcp_to_ws
    print("🚀 Running with CYTHON optimizations")
except ImportError:
    print("⚠️ Cython module not found. Run 'python setup.py build_ext --inplace'")
    # Fallback to Python definitions if compilation failed
    # NOTE(review): these stubs return immediately, so /mqtt silently forwards
    # nothing when the Cython build is missing — confirm that is acceptable.
    async def optimized_ws_to_tcp(ws, writer): pass
    async def optimized_tcp_to_ws(reader, ws): pass

from fastapi.middleware.cors import CORSMiddleware
import uuid
from concurrent_collections import ConcurrentDictionary

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from contextlib import asynccontextmanager

# Assuming 'iotcore' is your custom library or wrapper around an MQTT broker
from iotcore import IotCore

# --- CRITICAL: Install and use uvloop for high performance ---
# pip install uvloop
try:
    import uvloop
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
    print("🚀 High-Performance uvloop enabled")
except ImportError:
    print("⚠️ uvloop not found. Falling back to standard asyncio (Slower)")




# --- MOCKING IotCore for the fix demonstration (Uncomment your import above) ---
# class IotCore:
#     def background_loop_forever(self): pass
#     def subscribe(self, topic, cb): pass
#     def publish(self, topic, payload): pass
#     def accept(self, topic):
#         def decorator(func): return func
#         return decorator
# Embedded broker instance shared by the whole module.
iot = IotCore()
# ---------------------------------------------------------------------------
# import logging
# logging.basicConfig(level=logging.INFO) # This will show Broker logs too

# TCP listener of the embedded broker (matches [v4.1].listen in mqtt.toml).
BROKER_PORT = 1883
BROKER_HOST = '127.0.0.1'

# NOTE(review): when the env var is unset this falls back to the literal
# string "BROKER_APIKEY" — a well-known default anyone can send. Confirm the
# env var is always set in deployment.
BROKER_APIKEY = os.environ.get("BROKER_APIKEY","BROKER_APIKEY")

CHUNK_SIZE = 65536 # 64KB Read Buffer
BUFFER_SIZE = 65536  # TCP->WS read size used by tcp_to_ws()
# Benchmark counters: a progress line is printed every MESSAGE_LOG_COUNT messages.
MESSAGE_LOG_COUNT = 10000
MESSAGE_COUNT=0
MESSAGE_START_TIME = None

def on_message(msg):
    """Default subscription callback: log every message received on the topic."""
    line = f"📩 Recieve [{msg}]"
    print(line)
def on_message_benchmark(msg):
    """Benchmark subscriber callback: count messages and report throughput.

    Updates module globals MESSAGE_COUNT / MESSAGE_START_TIME and prints a
    throughput line every MESSAGE_LOG_COUNT messages.

    Fixes vs. original:
    - Removed the unused `COUNT = int(msg.split()[-1])`, which raised
      ValueError (killing the broker callback) for any payload that did not
      end in an integer.
    - Guarded the messages/second division against a zero-length window.
    """
    global MESSAGE_COUNT, MESSAGE_START_TIME
    if MESSAGE_START_TIME is None:
        MESSAGE_START_TIME = datetime.now()
    MESSAGE_COUNT += 1

    # Print only every MESSAGE_LOG_COUNT-th message to save CPU.
    if MESSAGE_COUNT % MESSAGE_LOG_COUNT == 0:
        print(f"📩 Recieve [{msg}]")
        elapsed = (datetime.now() - MESSAGE_START_TIME).total_seconds()
        mps = MESSAGE_COUNT / elapsed if elapsed > 0 else float("inf")
        print(f"📊 Processed {MESSAGE_COUNT} messages in {elapsed:.2f} sec, {mps:.2f} m/sec")
@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan hook: boot the embedded MQTT broker and default subscription.

    On startup: launches the iotcore broker loop, probes its TCP listener once
    (non-fatal), then (re)installs the logging callback on the "iot" topic.
    On shutdown: logs and exits.
    """
    print("🚀 Starting IoT Broker...")
    iot.background_loop_forever()

    # Quick connectivity check — failure is non-fatal (broker may still be starting),
    # but it should be visible, not silently swallowed.
    try:
        _, w = await asyncio.open_connection(BROKER_HOST, BROKER_PORT)
        w.close()
        await w.wait_closed()
        print("✅ Internal Broker UP")
    except Exception as e:
        # Was a bare `except: pass`, which also hid CancelledError/KeyboardInterrupt.
        print(f"⚠️ Internal Broker probe failed: {e}")

    # Drop any stale subscription before installing the logging callback.
    iot.unsubscribe("iot")
    iot.subscribe("iot", on_message)
    yield
    print("🛑 Broker stopping...")

# FastAPI application; `lifespan` boots the embedded MQTT broker on startup.
app = FastAPI(lifespan=lifespan)
# Wide-open CORS (any origin/method/header).
# NOTE(review): combined with the fallback API key default above, this leaves the
# REST bridge broadly reachable — confirm that is intended for this deployment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

async def ws_to_tcp(ws: WebSocket, writer: asyncio.StreamWriter):
    """Pump bytes from the client WebSocket into the broker's TCP socket.

    Frames under 100 bytes (typically MQTT ACK/PINGREQ control packets) are
    flushed immediately to keep latency low; larger payload frames are batched
    up to a 512KB threshold to keep throughput high on fast links.
    """
    FLUSH_THRESHOLD = 524288  # 512KB: flushing too often is CPU-bound on localhost
    buffered = 0

    try:
        async for frame in ws.iter_bytes():
            if not frame:
                continue
            writer.write(frame)
            size = len(frame)
            buffered += size

            # Tiny control frames -> drain now (latency); big frames -> batch (bandwidth).
            if size < 100 or buffered > FLUSH_THRESHOLD:
                await writer.drain()
                buffered = 0

        if buffered:
            await writer.drain()

    except (WebSocketDisconnect, ConnectionResetError):
        pass
    except Exception as e:
        print(f"⚠️ WS->TCP Error: {e}")
async def tcp_to_ws(reader: asyncio.StreamReader, ws: WebSocket):
    """Pump bytes from the broker's TCP socket to the client WebSocket in 64KB chunks."""
    try:
        # An empty read means the broker closed the connection (EOF).
        while chunk := await reader.read(BUFFER_SIZE):
            await ws.send_bytes(chunk)
    except (RuntimeError, ConnectionResetError):
        # WebSocket already closed by the peer.
        pass
    except Exception as e:
        print(f"⚠️ TCP->WS Error: {e}")
@app.websocket("/mqtt")
async def mqtt_websocket_proxy_opt(client_ws: WebSocket):
    """WebSocket <-> TCP MQTT proxy using the Cython-optimized pump coroutines.

    Connects to the internal broker, negotiates the MQTT WebSocket subprotocol
    for browser clients, runs one pump task per direction, and tears everything
    down as soon as either side finishes.

    Fix vs. original: the surviving pump task is now awaited after cancel(),
    preventing "Task was destroyed but it is pending" leaks.
    """
    # 1. Connect to the internal broker first; refuse the WebSocket if it is down.
    try:
        reader, writer = await asyncio.open_connection(BROKER_HOST, BROKER_PORT)
        sock = writer.get_extra_info('socket')
        if sock:
            # Disable Nagle's algorithm: MQTT control packets are tiny and latency-sensitive.
            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    except Exception as e:
        print(f"❌ Internal Broker Down: {e}")
        await client_ws.close(code=1011)
        return

    # 2. Negotiate subprotocol: browsers may send "mqtt", "mqttv3.1", a list, or nothing.
    requested_protocols = client_ws.scope.get('subprotocols', [])
    print(f"🔄 Client requested subprotocols: {requested_protocols}")

    if "mqtt" in requested_protocols:
        await client_ws.accept(subprotocol="mqtt")
    elif not requested_protocols:
        # Some clients request no subprotocol at all; accept anyway.
        await client_ws.accept()
    else:
        # Fallback: echo the first requested protocol to keep the connection alive.
        await client_ws.accept(subprotocol=requested_protocols[0])

    # 3. Run the Cython pumps; stop as soon as either direction completes.
    task_ws = asyncio.create_task(optimized_ws_to_tcp(client_ws, writer))
    task_tcp = asyncio.create_task(optimized_tcp_to_ws(reader, client_ws))

    await asyncio.wait([task_ws, task_tcp], return_when=asyncio.FIRST_COMPLETED)

    # 4. Cancel both AND await them — cancelling without awaiting leaks the task
    #    and makes asyncio log "Task was destroyed but it is pending".
    for task in (task_ws, task_tcp):
        task.cancel()
    await asyncio.gather(task_ws, task_tcp, return_exceptions=True)

    # 5. Close the TCP side so the broker releases the MQTT client id.
    writer.close()
    try:
        await writer.wait_closed()
    except Exception:
        pass
@app.websocket("/mqtt_normal")
async def mqtt_websocket_proxy(client_ws: WebSocket):
    """Pure-Python WebSocket <-> TCP MQTT proxy (fallback variant of /mqtt).

    Connects to the internal broker, negotiates the MQTT WebSocket subprotocol
    for browser clients, then pumps bytes in both directions until either side
    closes, cancelling the surviving pump and closing the TCP socket.
    """

    # 1. Connect to the internal broker before accepting the WebSocket.
    try:
        reader, writer = await asyncio.open_connection(BROKER_HOST, BROKER_PORT)
        sock = writer.get_extra_info('socket')
        if sock:
            # Disable Nagle's Algorithm (No Delay): MQTT control packets are tiny.
            sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
    except Exception as e:
        print(f"❌ Internal Broker Down: {e}")
        await client_ws.close(code=1011)
        return

    print("✅ Connected to Internal Broker")

    # Use the WebSocket handshake key as a per-connection identifier for the logs.
    client_id = client_ws.headers.get("sec-websocket-key","unknown")
    print(f"🔑 Client ID: {client_id}")

    # 2. Negotiate subprotocol: browsers may send "mqtt", "mqttv3.1", a list, or nothing.
    requested_protocols = client_ws.scope.get('subprotocols', [])
    print(f"🔄 Client requested subprotocols: {requested_protocols}")

    # If client asks for 'mqtt', give it 'mqtt'.
    if "mqtt" in requested_protocols:
        await client_ws.accept(subprotocol="mqtt")
    # If client asks for nothing (some default browser behaviors), accept anyway.
    elif not requested_protocols:
        await client_ws.accept()
    else:
        # Fallback: echo the first requested protocol to keep the connection alive.
        await client_ws.accept(subprotocol=requested_protocols[0])

    # 3. One task per direction.
    task_ws_to_tcp = asyncio.create_task(ws_to_tcp(client_ws, writer))
    task_tcp_to_ws = asyncio.create_task(tcp_to_ws(reader, client_ws))

    # 4. CRITICAL: Wait for FIRST_COMPLETED — if either side dies, the other
    # pump must be killed immediately rather than left half-open.
    done, pending = await asyncio.wait(
        [task_ws_to_tcp, task_tcp_to_ws],
        return_when=asyncio.FIRST_COMPLETED
    )

    # 5. Aggressive cleanup (fixes the "zombie" connection issue).
    for task in pending:
        task.cancel()

    # Close the TCP socket immediately so the broker releases the client id.
    writer.close()
    try:
        await writer.wait_closed()
    except:
        pass

    # print("🔌 Clean Disconnect")

275
+ # def main():
276
+ # import uvicorn
277
+ # # ws_ping_interval=None -> Disables sending Pings (Prevents 20s disconnect)
278
+ # # ws_ping_timeout=None -> Disables waiting for Pongs (Prevents timeouts)
279
+ # uvicorn.run(
280
+ # "broker:app",
281
+ # host="127.0.0.1",
282
+ # port=8000,
283
+ # loop="uvloop",
284
+ # ws_ping_interval=None,
285
+ # ws_ping_timeout=None,
286
+ # log_level="info"
287
+ # )
288
+
289
+ # if __name__ == "__main__":
290
+ # main()
291
+
292
+ # --------------------------------------------------------------------------- #
293
+ # Imports added for the bridge fix
294
+ # --------------------------------------------------------------------------- #
295
+ from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException
296
+ from fastapi.responses import JSONResponse, StreamingResponse
297
+ from pydantic import BaseModel, Field
298
+ from datetime import datetime
299
+ from concurrent_collections import ConcurrentDictionary
300
+ import uuid, asyncio, json
301
+
302
+ # --------------------------------------------------------------------------- #
303
+ # Pydantic bodies
304
+ # --------------------------------------------------------------------------- #
class ConnectBody(BaseModel):
    # Broker URL defaults to the embedded broker.
    # NOTE(review): brokerUrl/username/password/keepalive are never read by
    # /clients/connect in this file — appear informational only; confirm.
    brokerUrl: str | None = f"mqtt://{BROKER_HOST}:{BROKER_PORT}"
    username: str | None = None
    password: str | None = None
    keepalive: int = 60
    # Must match BROKER_APIKEY or /clients/connect responds 401.
    apiKey: str | None = None


class PublishBody(BaseModel):
    topic: str = "iot"
    payload: str = "test is send"
    # qos/retain are accepted but not forwarded by the current publish endpoint.
    qos: int = 0
    retain: bool = False


class SubscribeBody(BaseModel):
    topic: str = "iot"
    qos: int = 0
    stream: bool = False  # ← this controls /messages behavior (SSE vs poll)


class UnsubscribeBody(BaseModel):
    topic: str = "iot"


class PollBody(BaseModel):
    topic: str | None = None  # None → all topics
    limit: int = Field(default=50, ge=1, le=500)
333
+
334
+
335
+ # --------------------------------------------------------------------------- #
336
+ # State
337
+ # --------------------------------------------------------------------------- #
# Upper bound on buffered messages per (client, topic); oldest entries are evicted.
MAX_QUEUED_MESSAGES = 1000

# client_id -> {"created_at", "ping_at", "ping_timeout",
#               "messages": {topic: {"stream": bool, "queue": [msg dicts]}}}
CONNECTION_CLIENTS = ConcurrentDictionary({})
# NOTE(review): only ever pop()'d on disconnect in this file — looks unused; confirm.
SUBSCRIPTIONS = ConcurrentDictionary({})
342
+
343
+
344
+ # --------------------------------------------------------------------------- #
345
+ # Helpers
346
+ # --------------------------------------------------------------------------- #
def _require_client(client_id: str):
    """Abort with HTTP 404 when `client_id` is not registered in CONNECTION_CLIENTS."""
    client = CONNECTION_CLIENTS.get(client_id)
    if client is None:
        raise HTTPException(status_code=404, detail="Client not found")
350
+
351
+
def _mqtt_callback(client_id: str, topic: str, msg):
    """Thread-safe broker callback: append `msg` to the client's per-topic queue.

    Runs on the broker's thread, so it must never raise. Drops the message
    (with a log line) if the client or its topic subscription is gone. Queues
    are bounded at MAX_QUEUED_MESSAGES; the oldest entry is evicted first.

    Fix vs. original: log-message typo "invalid clinet" -> "invalid client".
    """
    print(f"✅ Callback: client={client_id}, topic={topic}, msg={msg}")
    try:
        if CONNECTION_CLIENTS.get(client_id) is None:
            print(f"⚠️ callback error (client={client_id}, topic={topic}): invalid client")
            return
        with CONNECTION_CLIENTS.get_locked(client_id) as v:
            store = v.get("messages", {}).get(topic)
            if store is None:
                print(f"⚠️ callback error (client={client_id}, topic={topic}): invalid store")
                return
            queue = store["queue"]
            if len(queue) >= MAX_QUEUED_MESSAGES:
                # Evict oldest; O(n) list shift but bounded by MAX_QUEUED_MESSAGES.
                queue.pop(0)
            queue.append({
                "topic": topic,
                "payload": msg,
                "timestamp": datetime.now().isoformat(),
            })
    except Exception as e:
        # Swallow everything: a crash here would take down the broker thread.
        print(f"⚠️ callback error (client={client_id}, topic={topic}): {e}")
374
+
375
+
def _is_stream_mode(client_id: str, topic: str | None) -> bool:
    """Return True if the polled scope should be served as an SSE stream.

    With a concrete `topic`: True only if that topic is subscribed with
    stream=True. With topic=None (all topics): True if ANY of this client's
    subscriptions is stream-enabled.
    """
    with CONNECTION_CLIENTS.get_locked(client_id) as v:
        messages = v.get("messages", {})
        if topic:
            store = messages.get(topic)
            return store is not None and store.get("stream", False)
        return any(s.get("stream", False) for s in messages.values())
384
+
385
+
386
+ # --------------------------------------------------------------------------- #
387
+ # POST /clients/connect
388
+ # --------------------------------------------------------------------------- #
@app.post("/clients/connect")
async def connect_client(body: ConnectBody):
    """Register a new REST bridge client and return its generated client_id.

    Requires body.apiKey to equal BROKER_APIKEY (401 otherwise).
    NOTE(review): BROKER_APIKEY defaults to the literal "BROKER_APIKEY" when
    the env var is unset — a guessable default; confirm the env var is set.
    """
    if body.apiKey != BROKER_APIKEY:
        print("Invalid API Key")
        raise HTTPException(status_code=401, detail="Invalid API Key")

    client_id = str(uuid.uuid4())
    stamp = datetime.now()

    # Seed per-client state; "messages" maps topic -> {"stream": bool, "queue": [...]}.
    CONNECTION_CLIENTS.assign_atomic(client_id, {
        "created_at": stamp,
        "ping_at": stamp,
        "ping_timeout": 0,
        "messages": {},
    })

    return {
        "success": True,
        "client_id": client_id,
        "created_at": stamp.isoformat(),
    }
410
+
411
+
412
+ # --------------------------------------------------------------------------- #
413
+ # GET /health
414
+ # --------------------------------------------------------------------------- #
@app.get("/health")
async def health():
    """Liveness endpoint: reports OK plus the number of registered bridge clients."""
    report = {"status": "ok"}
    report["activeClients"] = len(CONNECTION_CLIENTS)
    return report
418
+
419
+
420
+ # --------------------------------------------------------------------------- #
421
+ # POST /clients/{client_id}/ping
422
+ # --------------------------------------------------------------------------- #
@app.post("/clients/{client_id}/ping")
async def ping(client_id: str):
    """Heartbeat: record seconds since the previous ping and stamp the new one.

    Raises 404 (via _require_client) for unknown client ids.
    """
    _require_client(client_id)
    with CONNECTION_CLIENTS.get_locked(client_id) as v:
        now = datetime.now()
        # Gap since the last ping (0 on the first ping right after connect).
        v["ping_timeout"] = (now - v["ping_at"]).total_seconds()
        v["ping_at"] = now
    return {"success": True}
431
+
432
+
433
+ # --------------------------------------------------------------------------- #
434
+ # POST /clients/{client_id}/disconnect
435
+ # --------------------------------------------------------------------------- #
@app.post("/clients/{client_id}/disconnect")
async def disconnect_client(client_id: str):
    """Tear down a client: unsubscribe all of its topics and drop its state.

    Best-effort: broker-side unsubscribe failures are swallowed so the client
    entry is always removed. Raises 404 for unknown client ids.
    NOTE(review): iot.unsubscribe(topic) appears topic-global — if another
    client shares the topic this also silences it; confirm intended.
    """
    _require_client(client_id)
    try:
        with CONNECTION_CLIENTS.get_locked(client_id) as v:
            for topic in list(v.get("messages", {})):
                try:
                    iot.unsubscribe(topic)
                except Exception:
                    pass
    except Exception:
        pass
    CONNECTION_CLIENTS.pop(client_id, None)
    SUBSCRIPTIONS.pop(client_id, None)
    return {"success": True}
451
+
452
+
453
+ # --------------------------------------------------------------------------- #
454
+ # POST /clients/{client_id}/publish
455
+ # --------------------------------------------------------------------------- #
@app.post("/clients/{client_id}/publish")
async def publish(client_id: str, body: PublishBody):
    """Publish body.payload to body.topic via the embedded broker.

    Raises 404 for unknown clients. Note: body.qos and body.retain are not
    passed to iot.publish here.
    """
    _require_client(client_id)
    print(f"ℹ️ Publish {body}")
    iot.publish(body.topic, body.payload)
    return {"success": True}
462
+
463
+
464
+ # --------------------------------------------------------------------------- #
465
+ # POST /clients/{client_id}/subscribe
466
+ # --------------------------------------------------------------------------- #
@app.post("/clients/{client_id}/subscribe")
async def subscribe(client_id: str, body: SubscribeBody):
    """Subscribe the client to a topic, queueing incoming messages for /messages.

    Re-subscribing an existing topic only updates its stream flag (the broker
    subscription already exists). Raises 404 for unknown clients.
    """
    _require_client(client_id)
    topic = body.topic

    with CONNECTION_CLIENTS.get_locked(client_id) as v:
        messages = v.setdefault("messages", {})
        if topic in messages:
            # update stream flag even if already subscribed
            messages[topic]["stream"] = body.stream
            return {"success": True, "topic": topic, "stream": body.stream,
                    "note": "already subscribed, stream flag updated"}
        messages[topic] = {"stream": body.stream, "queue": []}

    # Broker-side subscribe happens outside the lock; lambda defaults (_cid/_t)
    # bind the current client/topic so each subscription routes to its own queue.
    iot.subscribe(
        topic,
        lambda msg, _cid=client_id, _t=topic: _mqtt_callback(_cid, _t, msg),
    )
    return {"success": True, "topic": topic, "stream": body.stream}
486
+
487
+
488
+ # --------------------------------------------------------------------------- #
489
+ # POST /clients/{client_id}/unsubscribe
490
+ # --------------------------------------------------------------------------- #
@app.post("/clients/{client_id}/unsubscribe")
async def unsubscribe(client_id: str, body: UnsubscribeBody):
    """Remove the client's subscription and discard its queued messages.

    404 if the client is unknown or is not subscribed to the topic.
    NOTE(review): iot.unsubscribe(topic) appears topic-global, not per-client.
    """
    _require_client(client_id)
    topic = body.topic

    with CONNECTION_CLIENTS.get_locked(client_id) as v:
        messages = v.get("messages", {})
        if topic not in messages:
            raise HTTPException(status_code=404,
                                detail=f"Not subscribed to '{topic}'")
        del messages[topic]

    iot.unsubscribe(topic)
    return {"success": True}
505
+
506
+
507
+ # --------------------------------------------------------------------------- #
508
+ # POST /clients/{client_id}/messages ← SINGLE ENDPOINT, DYNAMIC MODE
509
+ # --------------------------------------------------------------------------- #
@app.post("/clients/{client_id}/messages")
async def get_messages(client_id: str, body: PollBody):
    """
    • If the topic (or any topic) was subscribed with `stream: true`
      → returns an **SSE stream** that pushes messages in real-time.

    • Otherwise
      → returns a **JSON array** of drained messages (classic poll).
    """
    _require_client(client_id)

    # Mode is decided by the stream flag recorded at subscribe time.
    if _is_stream_mode(client_id, body.topic):
        return _stream_response(client_id, body.topic)
    else:
        return _poll_response(client_id, body.topic, body.limit)
526
+
527
+
528
+ # ── poll path ──────────────────────────────────────────────────────────────── #
def _poll_response(client_id: str, topic: str | None, limit: int):
    """Drain up to `limit` queued messages (destructive read) and return them.

    topic=None drains across all of the client's subscribed topics, in dict
    order, until `limit` is reached.
    """
    collected: list[dict] = []

    with CONNECTION_CLIENTS.get_locked(client_id) as v:
        messages = v.get("messages", {})
        topics = [topic] if topic else list(messages)

        for t in topics:
            store = messages.get(t)
            if store is None:
                # Explicitly requested topic that was never subscribed: skip, not 404.
                print(f"⚠️ topic:{t}, store is none ")
                continue
            queue: list = store["queue"]
            take = min(len(queue), limit - len(collected))
            if take <= 0:
                print(f"⚠️ topic:{t}, take is {take} ")
                continue
            # Remove the drained slice so each message is delivered exactly once.
            collected.extend(queue[:take])
            del queue[:take]

    return {
        "success": True,
        "mode": "poll",
        "count": len(collected),
        "messages": collected,
    }
556
+
557
+
558
+ # ── stream path (SSE) ─────────────────────────────────────────────────────── #
def _stream_response(client_id: str, topic: str | None):
    """Return an SSE StreamingResponse that drains stream-enabled queues forever.

    The generator repeatedly drains under the client lock, emits each message
    as a `data: <json>` event, and backs off 100ms when idle. It terminates
    when the client entry disappears (i.e. after /disconnect).
    """

    async def _event_generator():
        while True:
            # client gone → stop
            if CONNECTION_CLIENTS.get(client_id) is None:
                break

            batch: list[dict] = []
            with CONNECTION_CLIENTS.get_locked(client_id) as v:
                messages = v.get("messages", {})
                targets = [topic] if topic else list(messages)
                for t in targets:
                    store = messages.get(t)
                    if store is None:
                        continue
                    # only drain stream-enabled topics; polled topics keep their queue
                    if not store.get("stream", False):
                        continue
                    batch.extend(store["queue"])
                    store["queue"].clear()

            # Emit outside the lock so slow clients do not block the callback.
            for item in batch:
                yield f"data: {json.dumps(item)}\n\n"

            if not batch:
                await asyncio.sleep(0.1)  # 100ms idle backoff

    return StreamingResponse(
        _event_generator(),
        media_type="text/event-stream",
        headers={
            # Keep intermediaries (e.g. nginx) from buffering the event stream.
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",
        },
    )
mqtt.toml ADDED
@@ -0,0 +1,22 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ id = 0
2
+
3
+ [router]
4
+ id = 0
5
+ max_connections = 10010
6
+ max_outgoing_packet_count = 200
7
+ max_segment_size = 104857600
8
+ max_segment_count = 10
9
+
10
+ # Standard TCP Listener (We will proxy to this)
11
+ [v4.1]
12
+ name = "v4-1"
13
+ listen = "127.0.0.1:1883"
14
+ next_connection_delay_ms = 1
15
+ [v4.1.connections]
16
+ max_payload_size = 20480
17
+ dynamic_filters = true
18
+ connection_timeout_ms = 60000 # 60 seconds (60000 ms = 1 minute, not 2)
19
+ max_inflight_count = 1000 # Allow more unacknowledged messages
20
+
21
+ [console]
22
+ listen = "0.0.0.0:3030"
pyx/proxy_core.pyx ADDED
@@ -0,0 +1,51 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # proxy_core.pyx
2
+ # cython: language_level=3
3
+
4
+ import asyncio
5
+ from cpython.bytes cimport PyBytes_GET_SIZE
6
+
7
+ # We declare types for the loop variables to speed up math
async def optimized_ws_to_tcp(object ws, object writer):
    # Pump WebSocket frames into the TCP writer, with C-typed counters to
    # avoid Python-object overhead on the per-frame bookkeeping.
    cdef int pending_bytes = 0
    cdef int n = 0
    # 512KB batching threshold: on localhost, flushing too often (CPU-bound)
    # is worse than batching (memory-bound).
    # Fix vs. original: the initial 131072 (128KB) value was a dead store,
    # immediately overwritten by 524288 — initialize once.
    cdef int FLUSH_THRESHOLD = 524288
    cdef bytes data

    try:
        # async for is handled by Cython's coroutine support
        async for data in ws.iter_bytes():
            if data:
                writer.write(data)

                # C-level size fetch instead of a Python len() call
                n = PyBytes_GET_SIZE(data)
                pending_bytes += n

                # Tiny frames (MQTT ACK/PING, < 100 bytes) flush immediately
                # for latency; large payloads batch up to the threshold.
                if pending_bytes > FLUSH_THRESHOLD or n < 100:
                    await writer.drain()
                    pending_bytes = 0

        if pending_bytes > 0:
            await writer.drain()

    except Exception:
        # All teardown paths land here; the caller handles socket cleanup.
        pass

async def optimized_tcp_to_ws(object reader, object ws):
    # Pump broker TCP bytes to the WebSocket in 64KB chunks.
    cdef int BUFFER_SIZE = 65536
    cdef bytes data

    try:
        while True:
            # We cannot optimize the await call itself (it's I/O);
            # we only minimize the Python-level overhead around it.
            data = await reader.read(BUFFER_SIZE)
            if not data:
                break  # EOF: broker closed the connection
            await ws.send_bytes(data)
    except Exception:
        # Teardown path; the caller closes both ends.
        pass
pyx/setup.py ADDED
@@ -0,0 +1,39 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
# setup.py — compile every .pyx under this directory with Cython.
#
# Usage (as in the Dockerfile): python pyx/setup.py build_ext -b <output-dir>
#
# Fixes vs. original:
# - Removed `import numpy` / numpy include dirs: numpy is NOT listed in
#   requirements.txt, so the Docker build crashed at import time, and
#   proxy_core.pyx uses no numpy headers.
# - Removed duplicated setup/cythonize imports and the deprecated
#   `distutils.core` import (fragile on the Python 3.12 base image).
from pathlib import Path

from setuptools import setup
from Cython.Build import cythonize

pwd = Path(__file__).parent

# Path.walk() requires Python >= 3.12 (matches the Dockerfile base image).
for root, dirs, files in pwd.walk():
    for file in files:
        if file.endswith(".pyx"):
            print(f"Compiling Cython file: {root / file}")
            # One setup() call per .pyx preserves the original one-at-a-time behavior.
            setup(
                ext_modules=cythonize(str(root / file),
                                      build_dir=str(pwd / "build")),
            )
requirements.txt ADDED
@@ -0,0 +1,26 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ fastapi
2
+ uvicorn
3
+ uvloop
4
+ websockets
5
+
6
+ gmqtt
7
+ google-genai
8
+ python-dotenv
9
+ jinja2
10
+ PersistDict
11
+ python-multipart
12
+ python-jose[cryptography]
13
+ # passlib
14
+ sqlalchemy
15
+ # opencv-python
16
+ paho-mqtt
17
+ cython
18
+ setuptools
19
+
20
+ sqlitedict
21
+ concurrent_collections
22
+
23
+ # mqtt
24
+ # iotcore
25
+ # for pyo3
26
+ maturin
run_broker.py ADDED
@@ -0,0 +1,20 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
import uvicorn
import asyncio
# Prefer uvloop's faster event loop when available; fall back silently otherwise.
try:
    import uvloop
    asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
except ImportError:
    pass

if __name__ == "__main__":
    # ws_ping_interval=None -> Disables sending Pings (Prevents 20s disconnect)
    # ws_ping_timeout=None -> Disables waiting for Pongs (Prevents timeouts)
    # NOTE(review): loop="uvloop" is passed unconditionally even when the
    # import above failed; confirm uvloop is always installed in deployment.
    # NOTE(review): binding 127.0.0.1 inside a container makes port 7860
    # unreachable from outside the container — confirm this is intended.
    uvicorn.run(
        "broker_app:app",
        host="127.0.0.1",
        port=7860,
        loop="uvloop",
        ws_ping_interval=None,
        ws_ping_timeout=None,
        log_level="info"
    )