Spaces:
Sleeping
Sleeping
| import requests | |
| from database_manager import * | |
| from dotenv import load_dotenv | |
| import os | |
| import state | |
| import asyncio | |
| import threading | |
| load_dotenv(dotenv_path="keys.env") | |
| overpass_url = os.getenv("URL_Traffic") | |
| import time | |
| def get_traffic_in_container(coordinates): | |
| (min_lat, max_lat, min_lon, max_lon) = get_rectangle_container(coordinates) | |
| query = f""" | |
| [out:json]; | |
| node["highway"="traffic_signals"]({min_lat},{min_lon},{max_lat},{max_lon}); | |
| out body; | |
| """ | |
| response = requests.get(overpass_url, params={'data': query}) | |
| data = response.json() | |
| return [(el['lat'], el['lon']) for el in data['elements']] | |
| def get_rectangle_container(coordinates): | |
| lats = [i for i, _ in coordinates] # lats as it from mobile app take lats first | |
| lons = [i for _, i in coordinates] | |
| min_lat, max_lat = min(lats), max(lats) | |
| min_lon, max_lon = min(lons), max(lons) | |
| return min_lat, max_lat, min_lon, max_lon | |
| async def wait_check_conection(tl_id,check_time): | |
| start = time.time() | |
| while time.time() - start < check_time: | |
| if state.singlas_state[tl_id] != state.status_chk: | |
| break | |
| await asyncio.sleep(0.2) | |
| async def open_signal(tl_id,state_,duration,delay): | |
| if (state.singlas_state[tl_id] != state.status_free): #exist old request | |
| request_type = "Accident to stop road " if (state.singlas_state[tl_id] == state.status_acc) else "Exist Emergency request" | |
| return "Request Refused , Exist old request : \n"+ request_type | |
| check_time = 4 | |
| #await asyncio.sleep(delay) | |
| state.singlas_state[tl_id] = state.status_chk | |
| print(state.request[tl_id]) | |
| await wait_check_conection(tl_id,check_time) | |
| print(state.request[tl_id]) | |
| if (state.request[tl_id]['accepted']== True): | |
| threading.Thread(target=handle_request_in_thread, args=(tl_id, state_, duration,delay)).start() | |
| return "Request Accepted" | |
| else: | |
| state.delete_last_request(tl_id) | |
| return "Request Refused , No Internet Connection or Very High Queue Length" | |
| def handle_request_in_thread(tl_id, state_, duration,delay): | |
| time.sleep(delay) | |
| state.set_request(tl_id, state_, duration) | |
| print("✅ Request Set:", state.request) | |
| time.sleep(duration) | |
| state.delete_last_request(tl_id) | |
| print("🧹 Request Deleted:", state.request) | |
| print("✅ Request Accepted") | |
| def check_signals(coords): | |
| connection = get_db() | |
| found_signals = [] | |
| try: | |
| with connection.cursor() as cursor: | |
| for lat, lon in coords: | |
| cursor.execute(""" | |
| SELECT tl_id_sumo FROM traffic_signals | |
| WHERE ABS(lat - %s) < 0.0001 AND ABS(lon - %s) < 0.0001 | |
| """, (lon,lat)) | |
| result = cursor.fetchone() | |
| if result: | |
| found_signals.append(result['tl_id_sumo']) | |
| finally: | |
| connection.close() | |
| return found_signals | |