Spaces:
Sleeping
Sleeping
| import threading | |
| import time | |
| import os | |
| import paho.mqtt.client as mqtt | |
| from dotenv import load_dotenv | |
| import state # ูุญุชูู ุนูู `singlas_state` | |
| # ------------------ Load .env ------------------ # | |
| load_dotenv(dotenv_path="keys.env") | |
| mqtt_broker = os.getenv("MQTT_BROKER") | |
| mqtt_port = int(os.getenv("MQTT_PORT")) | |
| MQTT_PARTOPIC_state = os.getenv("MQTT_PARTOPIC_state") | |
| MQTT_SUBTOPIC_reply = os.getenv("MQTT_SUBTOPIC_reply") | |
| MQTT_SUBTOPIC_msg = os.getenv("MQTT_SUBTOPIC_msg") | |
| def topic_msg(tl_id): | |
| return f"{MQTT_PARTOPIC_state}/{tl_id}/{MQTT_SUBTOPIC_msg}" | |
| def topic_rep(tl_id): | |
| return f"{MQTT_PARTOPIC_state}/{tl_id}/{MQTT_SUBTOPIC_reply}" | |
| # ------------------ Get state ------------------ # | |
| def get_State(tl_id): | |
| return state.singlas_state.get(tl_id, "FREE") | |
| def checked_ql(ql): | |
| max_queue_length = 16 # 15 # this is the maximum queue length for the road | |
| return int(ql)<max_queue_length # for now, always return True, can be improved later to check if the queue length is valid or not | |
| # ------------------ Listener ------------------ # | |
| def mqtt_listener(): | |
| def on_message(client, userdata, message): | |
| msg = message.payload.decode() | |
| topic = message.topic | |
| tl_id = topic.split('/')[1] | |
| print(f"{state.request}-----{tl_id}") | |
| print(f"๐ฅ Reply Received on {topic}: {msg}") | |
| if (msg == 'AVBL'): | |
| state.request[tl_id]['accepted'] = True | |
| state.singlas_state[tl_id] = state.request[tl_id]['State'] | |
| next_request = 'DONE QL?' # ask for queue length if to open road in emergency | |
| client.publish(topic_msg(tl_id), next_request, qos=1) | |
| if (msg.startswith("QL")): | |
| ql = msg.split()[1] | |
| if checked_ql(ql): | |
| client.publish(topic_msg(tl_id), state.singlas_state[tl_id] +" "+str(state.request[tl_id]['Duration']), qos=1) | |
| #if(state.request[tl_id]['State']==state.status_emr): | |
| # client.publish(topic_msg(tl_id), state.status_emr +" "+str(state.request[tl_id]['Duration']), qos=1) | |
| client = mqtt.Client() | |
| client.connect(mqtt_broker, mqtt_port) | |
| # ุงุดุชุฑู ูู ูู ุงูุฑุฏูุฏ | |
| for tl_id in state.singlas_state.keys(): | |
| client.subscribe(topic_rep(tl_id)) | |
| print(f"โ Subscribed to {topic_rep(tl_id)}") | |
| client.on_message = on_message | |
| client.loop_forever() | |
| # ------------------ Publisher ------------------ # | |
| def mqtt_publisher_loop(tl_id): | |
| pub_client = mqtt.Client() | |
| pub_client.connect(mqtt_broker, mqtt_port) | |
| pub_client.loop_start() | |
| last_state = None | |
| while True: | |
| current_state = state.singlas_state[tl_id] | |
| if current_state != last_state: | |
| print(f"๐ก state {current_state} on {topic_msg(tl_id)} ,state") | |
| #if current_state != state.status_free: | |
| topic = topic_msg(tl_id) | |
| pub_client.publish(topic, current_state,qos=1) | |
| last_state = current_state | |
| time.sleep(.01) | |
| # ------------------ Startup ------------------ # | |
| def start_mqtt(): | |
| # Start listener once | |
| threading.Thread(target=mqtt_listener, daemon=True).start() | |
| # Start one publisher per tl_id | |
| for tl_id in state.singlas_state.keys(): | |
| threading.Thread(target=mqtt_publisher_loop, args=(tl_id,), daemon=True).start() | |
| ''' | |
| from fastapi import WebSocket, WebSocketDisconnect | |
| import asyncio | |
| @app.websocket("/ws/state") | |
| async def websocket_get_state(websocket: WebSocket): | |
| await websocket.accept() | |
| try: | |
| tl_id = await websocket.receive_text() | |
| print(f"โ Client subscribed to {tl_id}") | |
| last_state = None | |
| while True: | |
| current_state = get_State(tl_id) | |
| if current_state != last_state: | |
| await websocket.send_text(current_state) | |
| last_state = current_state | |
| await asyncio.sleep(1) # ูู ุซุงููุฉ ูุดูู ุงูุชุบููุฑ | |
| except WebSocketDisconnect: | |
| print("โ Client disconnected") | |
| @app.get("/test-db") | |
| def test_db(): | |
| try: | |
| connection = get_db() | |
| with connection.cursor() as cursor: | |
| cursor.execute("SELECT 1") | |
| result = cursor.fetchone() | |
| connection.close() | |
| return {"status": "connected", "result": result} | |
| except Exception as e: | |
| return {"status": "error", "detail": str(e)} | |
| ''' | |