LapStore commited on
Commit
722435e
·
1 Parent(s): 3eeeacd
Files changed (2) hide show
  1. app.py +3 -2
  2. mqtt_manager.py +36 -23
app.py CHANGED
@@ -18,8 +18,9 @@ from contextlib import asynccontextmanager
18
  @asynccontextmanager
19
  async def lifespan(app: FastAPI):
20
  # Starting code
21
- intialize_mqtt_brokers()
22
- start_mqtt_publishers()
 
23
 
24
  yield
25
 
 
18
  @asynccontextmanager
19
  async def lifespan(app: FastAPI):
20
  # Starting code
21
+ #intialize_mqtt_brokers()
22
+ #start_mqtt_publishers()
23
+ start_mqtt()
24
 
25
  yield
26
 
mqtt_manager.py CHANGED
@@ -1,13 +1,12 @@
1
-
2
- import paho.mqtt.client as mqtt
3
- import time
4
  import threading
5
- from dotenv import load_dotenv
6
  import os
7
- import state
8
  import paho.mqtt.client as mqtt
9
- import time
 
 
10
 
 
11
  load_dotenv(dotenv_path="keys.env")
12
 
13
  mqtt_broker = os.getenv("MQTT_BROKER")
@@ -15,46 +14,60 @@ mqtt_port = int(os.getenv("MQTT_PORT"))
15
  MQTT_PARTOPIC_state = os.getenv("MQTT_PARTOPIC_state")
16
  MQTT_SUBTOPIC_reply = os.getenv("MQTT_SUBTOPIC_reply")
17
  MQTT_SUBTOPIC_msg = os.getenv("MQTT_SUBTOPIC_msg")
18
- topic_rep = lambda x: MQTT_PARTOPIC_state+"/"+x +"/"+ MQTT_SUBTOPIC_reply
19
- topic_msg = lambda x:MQTT_PARTOPIC_state+"/"+x +"/"+ MQTT_SUBTOPIC_msg
20
 
 
 
21
 
 
 
 
 
 
 
 
22
 
23
 
24
- client = mqtt.Client()
 
 
 
25
 
26
- def intialize_mqtt_brokers():
27
  client.connect(mqtt_broker, mqtt_port)
28
- for tl_id in state.singlas_state.keys():
29
- client.subscribe(topic_rep(tl_id))
30
 
31
- def start_mqtt_publishers():
32
  for tl_id in state.singlas_state.keys():
33
- threading.Thread(target=mqtt_publisher_loop, args=(tl_id,), daemon=True).start()
34
-
35
-
36
- def get_State(tl_id):
37
- if tl_id in state.singlas_state:
38
- return state.singlas_state[tl_id]
39
- else:
40
- return "FREE"
41
 
 
 
 
42
 
 
43
  def mqtt_publisher_loop(tl_id):
44
- client.loop_start()
 
 
45
 
46
  last_state = None
47
  while True:
48
  current_state = get_State(tl_id)
49
  if current_state != last_state:
50
  topic = topic_msg(tl_id)
51
- client.publish(topic, current_state)
52
  print(f"📡 Published {current_state} to {topic}")
53
  last_state = current_state
54
  time.sleep(1)
55
 
56
 
 
 
 
 
57
 
 
 
 
58
 
59
 
60
  '''
 
 
 
 
1
  import threading
2
+ import time
3
  import os
 
4
  import paho.mqtt.client as mqtt
5
+ from dotenv import load_dotenv
6
+ import state # يحتوي على `singlas_state`
7
+
8
 
9
+ # ------------------ Load .env ------------------ #
10
  load_dotenv(dotenv_path="keys.env")
11
 
12
  mqtt_broker = os.getenv("MQTT_BROKER")
 
14
  MQTT_PARTOPIC_state = os.getenv("MQTT_PARTOPIC_state")
15
  MQTT_SUBTOPIC_reply = os.getenv("MQTT_SUBTOPIC_reply")
16
  MQTT_SUBTOPIC_msg = os.getenv("MQTT_SUBTOPIC_msg")
 
 
17
 
18
+ def topic_msg(tl_id):
19
+ return f"{MQTT_PARTOPIC_state}/{tl_id}/{MQTT_SUBTOPIC_msg}"
20
 
21
+ def topic_rep(tl_id):
22
+ return f"{MQTT_PARTOPIC_state}/{tl_id}/{MQTT_SUBTOPIC_reply}"
23
+
24
+
25
+ # ------------------ Get state ------------------ #
26
+ def get_State(tl_id):
27
+ return state.singlas_state.get(tl_id, "FREE")
28
 
29
 
30
+ # ------------------ Listener ------------------ #
31
+ def mqtt_listener():
32
+ def on_message(client, userdata, message):
33
+ print(f"📥 Reply Received on {message.topic}: {message.payload.decode()}")
34
 
35
+ client = mqtt.Client()
36
  client.connect(mqtt_broker, mqtt_port)
 
 
37
 
38
+ # اشترك في كل الردود
39
  for tl_id in state.singlas_state.keys():
40
+ client.subscribe(topic_rep(tl_id))
 
 
 
 
 
 
 
41
 
42
+ client.on_message = on_message
43
+ client.loop_forever()
44
+
45
 
46
+ # ------------------ Publisher ------------------ #
47
  def mqtt_publisher_loop(tl_id):
48
+ pub_client = mqtt.Client()
49
+ pub_client.connect(mqtt_broker, mqtt_port)
50
+ pub_client.loop_start()
51
 
52
  last_state = None
53
  while True:
54
  current_state = get_State(tl_id)
55
  if current_state != last_state:
56
  topic = topic_msg(tl_id)
57
+ pub_client.publish(topic, current_state)
58
  print(f"📡 Published {current_state} to {topic}")
59
  last_state = current_state
60
  time.sleep(1)
61
 
62
 
63
+ # ------------------ Startup ------------------ #
64
+ def start_mqtt():
65
+ # Start listener once
66
+ threading.Thread(target=mqtt_listener, daemon=True).start()
67
 
68
+ # Start one publisher per tl_id
69
+ for tl_id in state.singlas_state.keys():
70
+ threading.Thread(target=mqtt_publisher_loop, args=(tl_id,), daemon=True).start()
71
 
72
 
73
  '''