Spaces:
Sleeping
Sleeping
File size: 7,277 Bytes
df37f6e |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 |
import logging
from app.dto.trip import TripDTO
from common.external.external_api import api
from datetime import datetime
from core.conf import settings
logger = logging.getLogger(__name__)
class FutabusClient:
    """Async client for the Futabus / VATO HTTP endpoints used by the app.

    Every method is a static coroutine that issues a single request through
    the shared ``api`` helper. Exceptions raised while calling the API are
    logged and swallowed; the method then returns an empty container so that
    callers never have to handle request failures themselves.
    """

    @staticmethod
    async def get_pickup_points(
        origin: str | None = None, dest: str | None = None
    ) -> dict:
        """Fetch pickup/drop-off point metadata.

        Args:
            origin: Name of the departure office. Left out of the query
                string when ``None``.
            dest: Name of the destination office. Left out of the query
                string when ``None``.

        Returns:
            The ``data`` member of the response when its ``status`` is 200.
            An empty dict when the status differs, when ``data`` is missing
            or null, or when the request raises.
        """
        params = {
            "origin": origin,
            "dest": dest,
            # Current Unix time in whole seconds, sent as a string.
            "session_id": str(int(datetime.now().timestamp())),
        }
        try:
            response = await api.get(
                "/search/metadata/pickup-points",
                params={k: v for k, v in params.items() if v is not None},
            )
            if response.get("status") == 200:
                logger.debug(f"Fetched pickup points for: origin={origin}, dest={dest}")
                # ``.get("data", {})`` would still hand back None for an
                # explicit null, so normalise to the declared dict type.
                return response.get("data") or {}
            logger.warning(
                f"Unexpected response status: {response.get('status')} - origin={origin}, dest={dest}"
            )
            return {}
        except Exception as e:
            logger.exception(
                f"[FutabusClient] Error fetching pickup points for origin={origin}, dest={dest}: {e}"
            )
            return {}

    @staticmethod
    async def get_route_ids_by_metadata(
        origin_code: str | None = None,
        from_id: int | None = None,
        origin_ids: int | list[int] | None = None,
        dest_code: str | None = None,
        to_id: int | None = None,
        dest_ids: int | list[int] | None = None,
    ) -> list[int]:
        """Look up route ids via ``/metadata/office/routes``.

        Each argument that is not ``None`` is forwarded as a query parameter
        (``OriginCode``, ``FromId``, ``OriginIds``, ``DestCode``, ``ToId``,
        ``DestIds``), identifying the origin/destination by office or
        province id/code.

        Returns:
            The ``routeId`` of every item in the response that has one. An
            empty list when the response is not a list or the request raises.
        """
        candidates = {
            "OriginCode": origin_code,
            "FromId": from_id,
            "OriginIds": origin_ids,
            "DestCode": dest_code,
            "ToId": to_id,
            "DestIds": dest_ids,
        }
        params = {k: v for k, v in candidates.items() if v is not None}
        try:
            response = await api.get("/metadata/office/routes", params=params)
            # This endpoint is read as a bare list rather than an envelope
            # with status/data; anything else is treated as "no routes".
            if isinstance(response, list):
                return [item.get("routeId") for item in response if "routeId" in item]
        except Exception:
            logger.exception("Error fetching route ids by metadata")
        return []

    @staticmethod
    async def get_route_ids_by_code(origin_code: str, dest_code: str) -> list:
        """Look up route ids via ``/booking/api/v2/routes/from/{origin}/to/{dest}``.

        Intended as a fallback for when the metadata lookup has no routes.

        Args:
            origin_code: Code interpolated into the ``from`` path segment.
            dest_code: Code interpolated into the ``to`` path segment.

        Returns:
            The ``Id`` of every item in the response's ``Data`` list, skipping
            items whose ``Id`` is missing or null. An empty list when
            ``Status`` is not 200, ``Data`` is empty, or the request raises.
        """
        try:
            endpoint = f"/booking/api/v2/routes/from/{origin_code}/to/{dest_code}"
            response = await api.get(endpoint)
            # Note the capitalised keys: this endpoint is read with
            # "Status"/"Data"/"Id", unlike the lowercase keys used elsewhere.
            if response.get("Status") == 200 and response.get("Data"):
                route_ids = (item.get("Id") for item in response["Data"])
                # Drop missing ids so callers never forward None as a route id.
                return [route_id for route_id in route_ids if route_id is not None]
            return []
        except Exception:
            logger.exception("Error fetching route ids by province codes")
            return []

    @staticmethod
    async def search_trips(
        from_time: int, to_time: int, route_ids: list, ticket_count: int = 1
    ) -> list[dict]:
        """Search trips for the given routes inside a time window.

        Args:
            from_time: Start of the window (timestamp in milliseconds).
            to_time: End of the window (timestamp in milliseconds).
            route_ids: Route ids to filter by. When empty, no request is made.
            ticket_count: Number of tickets to book; coerced with ``int()``.

        Returns:
            The ``items`` of the response when its ``status`` is 200 and
            ``total`` is positive; otherwise an empty list (also when the
            request raises).
        """
        if not route_ids:
            return []
        payload = {
            "channel": "web_client",
            "size": 300,
            "only_online_trip": True,
            "from_time": from_time,
            "to_time": to_time,
            "route_ids": route_ids,
            "ticket_count": int(ticket_count),
            "sort_by": ["price", "departure_time"],
        }
        logger.info(f"Searching trips with payload: {payload}")
        try:
            response = await api.post("/search/trips", payload=payload)
            status = response.get("status")
            if status != 200:
                logger.warning(
                    f"Unexpected response status: {status} - No trips found"
                )
                return []
            data = response.get("data") or {}
            if data.get("total", 0) > 0:
                return data.get("items") or []
            # A 200 with no matches is a normal outcome, not an unexpected
            # status, so it is reported separately and at a lower level.
            logger.info("No trips found for the requested routes and time window")
            return []
        except Exception:
            logger.exception("Failed to fetch trips from external API")
            return []

    @staticmethod
    async def get_seat_trip(trip: TripDTO) -> list[dict]:
        """Fetch the seats of a trip.

        Args:
            trip: Trip to look up. Its ``route_id`` and ``id`` form the
                endpoint path; ``raw_departure_date``, ``raw_departure_time``
                and ``seat_type_name`` are sent as ``departureDate``,
                ``departureTime`` and ``kind`` when they are not ``None``.

        Returns:
            The ``data`` member of the response, or an empty list when it is
            missing or null, or when the request raises.
        """
        candidates = {
            "departureDate": trip.raw_departure_date,
            "departureTime": trip.raw_departure_time,
            "kind": trip.seat_type_name,
        }
        params = {k: v for k, v in candidates.items() if v is not None}
        logger.info(f"Parameters for fetching seats: {params}")
        try:
            rs = await api.get(
                # NOTE(review): an earlier hard-coded base was
                # "https://api-busline.vato.vn/api/buslines/futa/booking" -
                # presumably the value of this setting; confirm.
                api_base=settings.API_BASE_URL_VATO,
                endpoint=f"/seats/{trip.route_id}/{trip.id}",
                params=params,
            )
            return rs.get("data") or []
        except Exception as e:
            logger.exception(f"Error fetching seats for trip {trip.id}: {e}")
            return []

    @staticmethod
    async def get_stop_by_route_and_way(route_id: int, way_id: int) -> list:
        """Fetch the stops of a route for a given way.

        Args:
            route_id: Route id, used as the endpoint path segment.
            way_id: Way (itinerary) id, sent as the ``wayId`` query parameter
                unless it is ``None``.

        Returns:
            The ``data`` member of the response, or an empty list when it is
            missing or null, or when the request raises.
        """
        try:
            params = {} if way_id is None else {"wayId": way_id}
            rs = await api.get(
                api_base=settings.API_BASE_URL_VATO,
                endpoint=f"/stops/{route_id}",
                params=params,
            )
            return rs.get("data") or []
        except Exception as e:
            logger.exception(
                f"Error fetching stops for route {route_id}, way {way_id}: {e}"
            )
            return []
|