# app/services/brigade_simulator.py - create this as a new file
"""
Brigade Simulator - simulates the movement of brigades.
Updates coordinates in real time (from point A to point B).
"""
import random
import math
import asyncio
import logging
from typing import Dict, Tuple, Optional
from datetime import datetime
logger = logging.getLogger(__name__)
# Bounding box around Tashkent (decimal degrees) inside which random
# destination points are drawn.
TASHKENT_BOUNDS = {
    "lat_min": 41.20,  # southern edge
    "lat_max": 41.35,  # northern edge
    "lon_min": 69.10,  # western edge
    "lon_max": 69.35,  # eastern edge
}
def haversine_distance(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """
    Return the great-circle distance between two points, in kilometres.

    Uses the haversine formula on a sphere of radius 6371 km.

    Args:
        lat1: Latitude of the first point, in decimal degrees.
        lon1: Longitude of the first point, in decimal degrees.
        lat2: Latitude of the second point, in decimal degrees.
        lon2: Longitude of the second point, in decimal degrees.

    Returns:
        float: Distance in kilometres.
    """
    R = 6371  # Earth radius (km)
    lat1_rad = math.radians(lat1)
    lat2_rad = math.radians(lat2)
    delta_lat = math.radians(lat2 - lat1)
    delta_lon = math.radians(lon2 - lon1)
    a = (
        math.sin(delta_lat / 2) ** 2
        + math.cos(lat1_rad) * math.cos(lat2_rad) * math.sin(delta_lon / 2) ** 2
    )
    # Floating-point rounding can push `a` marginally outside [0, 1] for
    # (near-)antipodal points, which would make sqrt(1 - a) raise ValueError.
    a = min(1.0, max(0.0, a))
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    return R * c
def calculate_new_position(
    current_lat: float,
    current_lon: float,
    target_lat: float,
    target_lon: float,
    speed_kmh: float = 60,
    time_step_seconds: int = 3
) -> Tuple[float, float]:
    """
    Advance one time step from the current point toward the target.

    The step is a linear interpolation of latitude and longitude between
    the two points (not a bearing-based move along the great circle).

    Args:
        current_lat: Current latitude.
        current_lon: Current longitude.
        target_lat: Target latitude.
        target_lon: Target longitude.
        speed_kmh: Speed (km/h).
        time_step_seconds: Length of the time step (seconds).

    Returns:
        Tuple[float, float]: New (lat, lon). This is the target itself when
        the target is closer than 50 m or reachable within this step, and
        the unchanged current position when the step length is not positive.
    """
    distance = haversine_distance(current_lat, current_lon, target_lat, target_lon)

    # Closer than 50 m counts as arrived: snap to the target.
    if distance < 0.05:
        return target_lat, target_lon

    # Distance covered during this step (km).
    distance_step = (speed_kmh / 3600) * time_step_seconds

    # A non-positive speed or time step must not push the point away from
    # the target (a negative fraction would do that); hold position instead.
    if distance_step <= 0:
        return current_lat, current_lon

    # The target is reachable within this single step.
    if distance_step >= distance:
        return target_lat, target_lon

    # Share of the remaining path covered in this step.
    fraction = distance_step / distance
    new_lat = current_lat + (target_lat - current_lat) * fraction
    new_lon = current_lon + (target_lon - current_lon) * fraction
    return new_lat, new_lon
def generate_random_target() -> Tuple[float, float]:
    """
    Pick a random destination point inside TASHKENT_BOUNDS.

    Latitude and longitude are each drawn independently with
    ``random.uniform`` between the corresponding min/max bounds.

    Returns:
        Tuple[float, float]: (lat, lon)
    """
    bounds = TASHKENT_BOUNDS
    # Latitude is drawn first, then longitude, so the RNG call order is fixed.
    random_lat = random.uniform(bounds["lat_min"], bounds["lat_max"])
    random_lon = random.uniform(bounds["lon_min"], bounds["lon_max"])
    return random_lat, random_lon
class BrigadeSimulator:
    """Brigade simulator: periodically moves busy brigades toward a target."""

    def __init__(self, db):
        """
        Create a stopped simulator.

        Args:
            db: Storage object; its ``update_brigade(brigade_id, fields)``
                method is called to persist new coordinates and targets.
        """
        self.db = db
        self.is_running = False
        self.update_interval = 3  # seconds between update cycles

    async def start(self):
        """Run the update loop until ``stop()`` clears ``is_running``."""
        self.is_running = True
        logger.info("π Brigade simulator ishga tushdi")
        while self.is_running:
            try:
                await self.update_all_brigades()
                await asyncio.sleep(self.update_interval)
            except Exception as e:
                # Top-level loop boundary: keep the simulator alive, but log
                # the traceback so the failure can actually be diagnosed.
                logger.exception(f"β Brigade simulator xatolik: {e}")
                await asyncio.sleep(5)

    def stop(self):
        """Ask the loop in ``start()`` to exit after its current iteration."""
        self.is_running = False
        logger.info("π Brigade simulator to'xtatildi")

    async def update_all_brigades(self):
        """Move every brigade whose ``current_status`` is ``"busy"``."""
        # Imported at call time rather than at module import time.
        from app.services.brigade_matcher import load_brigades

        brigades = load_brigades()
        for brigade in brigades:
            if brigade.get("current_status") == "busy":
                self.update_brigade_position(brigade)

    @staticmethod
    def _value_or(value, fallback):
        """Return ``value`` unless it is None, in which case return ``fallback``."""
        return fallback if value is None else value

    def update_brigade_position(self, brigade: Dict):
        """
        Advance one brigade by a single step and persist the result.

        Args:
            brigade: Brigade record. Keys read here: ``brigade_id``,
                ``current_lat``/``current_lon`` (falling back to
                ``base_lat``/``base_lon``), ``target_lat``/``target_lon``,
                ``speed_kmh`` and ``name``.
        """
        brigade_id = brigade.get("brigade_id")

        # dict.get(key, default) only falls back when the key is absent, so a
        # stored None would otherwise reach the distance maths and raise.
        current_lat = self._value_or(brigade.get("current_lat"), brigade.get("base_lat"))
        current_lon = self._value_or(brigade.get("current_lon"), brigade.get("base_lon"))
        if current_lat is None or current_lon is None:
            # Without any known position there is nothing to move; skip this
            # brigade instead of aborting the whole update cycle.
            logger.warning(
                "Brigade %s has no current or base position; skipping update",
                brigade_id,
            )
            return

        target_lat = brigade.get("target_lat")
        target_lon = brigade.get("target_lon")

        # No target yet (missing, None or zero): pick a new one and persist it.
        if not target_lat or not target_lon:
            target_lat, target_lon = generate_random_target()
            self.db.update_brigade(brigade_id, {
                "target_lat": target_lat,
                "target_lon": target_lon
            })

        # Same None handling as above: fall back to 60 km/h.
        speed = self._value_or(brigade.get("speed_kmh"), 60)
        new_lat, new_lon = calculate_new_position(
            current_lat, current_lon,
            target_lat, target_lon,
            speed, self.update_interval
        )

        # Has the brigade arrived (closer than 50 m to the target)?
        distance = haversine_distance(new_lat, new_lon, target_lat, target_lon)
        if distance < 0.05:
            # Arrived: store the position together with a fresh target.
            new_target_lat, new_target_lon = generate_random_target()
            self.db.update_brigade(brigade_id, {
                "current_lat": new_lat,
                "current_lon": new_lon,
                "target_lat": new_target_lat,
                "target_lon": new_target_lon
            })
            logger.info(f"π {brigade.get('name')} maqsadga yetdi, yangi yo'nalish")
        else:
            # Still en route: store only the new position.
            self.db.update_brigade(brigade_id, {
                "current_lat": new_lat,
                "current_lon": new_lon
            })
# Module-level simulator instance; stays None until get_simulator() is called.
simulator_instance: Optional[BrigadeSimulator] = None


def get_simulator(db):
    """
    Return the shared BrigadeSimulator, creating it on first use.

    Args:
        db: Passed to BrigadeSimulator only when the instance is first
            created; on later calls the existing instance is returned and
            this argument is not used.

    Returns:
        BrigadeSimulator: The module-level simulator instance.
    """
    global simulator_instance
    if simulator_instance is not None:
        return simulator_instance
    simulator_instance = BrigadeSimulator(db)
    return simulator_instance