dialogflowAPI / app /client /futabus_client.py
OnlyBiggg
refactor code
df37f6e
import logging
from app.dto.trip import TripDTO
from common.external.external_api import api
from datetime import datetime
from core.conf import settings
logger = logging.getLogger(__name__)
class FutabusClient:
@staticmethod
async def get_pickup_points(
origin: str | None = None, dest: str | None = None
) -> dict:
"""
Gọi API để lấy metadata điểm đón/trả.
Args:
origin (str): tên văn phòng đi
dest (str): tên văn phòng đến
Returns:
dict: dữ liệu metadata chứa các điểm đón/trả
"""
params = {
"origin": origin,
"dest": dest,
"session_id": str(int(datetime.now().timestamp())),
}
try:
response = await api.get(
"/search/metadata/pickup-points",
params={k: v for k, v in params.items() if v is not None},
)
if response.get("status") == 200:
logger.debug(f"Fetched pickup points for: origin={origin}, dest={dest}")
return response.get("data", {})
logger.warning(
f"Unexpected response status: {response.get('status')} - origin={origin}, dest={dest}"
)
return {}
except Exception as e:
logger.exception(
f"[FutabusClient] Error fetching pickup points for origin={origin}, dest={dest}: {e}"
)
return {}
@staticmethod
async def get_route_ids_by_metadata(
origin_code: str | None = None,
from_id: int | None = None,
origin_ids: int | list[int] | None = None,
dest_code: str | None = None,
to_id: int | None = None,
dest_ids: int | list[int] | None = None,
) -> list[int]:
"""
Gọi API `/metadata/office/routes` để lấy routeId theo ID/code văn phòng hoặc tỉnh.
"""
params = {
k: v
for k, v in {
"OriginCode": origin_code,
"FromId": from_id,
"OriginIds": origin_ids,
"DestCode": dest_code,
"ToId": to_id,
"DestIds": dest_ids,
}.items()
if v is not None
}
try:
response = await api.get("/metadata/office/routes", params=params)
if isinstance(response, list):
return [item.get("routeId") for item in response if "routeId" in item]
except Exception as e:
logger.exception("Error fetching route ids by metadata")
return []
@staticmethod
async def get_route_ids_by_code(origin_code: str, dest_code: str) -> list:
"""
Gọi API `/booking/api/v2/routes/from/{origin}/to/{dest}` để lấy danh sách route khi metadata không có.
"""
try:
endpoint = f"/booking/api/v2/routes/from/{origin_code}/to/{dest_code}"
response = await api.get(endpoint)
if response.get("Status") == 200 and response.get("Data"):
data = response.get("Data", [])
route_ids = [item.get("Id", None) for item in data]
return route_ids
return []
except Exception as e:
logger.exception("Error fetching route ids by province codes")
return []
@staticmethod
async def search_trips(
from_time: int, to_time: int, route_ids: list, ticket_count: int = 1
) -> list[dict[str, any]]:
"""
Gọi external API để tìm chuyến xe theo danh sách route_id và thời gian.
Args:
from_time (int): Mốc thời gian bắt đầu (timestamp in ms)
to_time (int): Mốc thời gian kết thúc (timestamp in ms)
route_ids (list[int]): Danh sách route ID để lọc chuyến
ticket_count (int): Số lượng vé cần đặt (default = 1)
Returns:
list[dict[str, any]]: Danh sách chuyến xe phù hợp
"""
if not route_ids:
return []
payload = {
"channel": "web_client",
"size": 300,
"only_online_trip": True,
"from_time": from_time,
"to_time": to_time,
"route_ids": route_ids,
"ticket_count": int(ticket_count),
"sort_by": ["price", "departure_time"],
}
logger.info(f"Searching trips with payload: {payload}")
try:
response = await api.post("/search/trips", payload=payload)
if response.get("status") == 200:
data = response.get("data", {})
if data.get("total", 0) > 0:
return data.get("items", [])
logger.warning(
f"Unexpected response status: {response.get('status')} - No trips found"
)
return []
except Exception as e:
logger.exception("Failed to fetch trips from external API")
return []
@staticmethod
async def get_seat_trip(trip: TripDTO) -> list[dict]:
"""
Gọi API để lấy thông tin ghế của chuyến xe.
Args:
trip (TripDTO): Thông tin chuyến xe cần lấy ghế.
Returns:
list[dict]: Danh sách ghế của chuyến xe.
"""
params = {
k: v
for k, v in {
"departureDate": trip.raw_departure_date,
"departureTime": trip.raw_departure_time,
"kind": trip.seat_type_name,
}.items()
if v is not None
}
logger.info(f"Parameters for fetching seats: {params}")
logger.debug(f"Parameters for fetching seats: {params}")
try:
rs = await api.get(
api_base=settings.API_BASE_URL_VATO,
# api_base="https://api-busline.vato.vn/api/buslines/futa/booking",
endpoint=f"/seats/{trip.route_id}/{trip.id}",
params=params,
)
return rs.get("data", [])
except Exception as e:
logger.exception(f"Error fetching seats for trip {trip.id}: {e}")
return []
@staticmethod
async def get_stop_by_route_and_way(route_id: int, way_id: int) -> list:
"""
Lấy danh sách các điểm dừng của chuyến xe theo route_id và way_id.
Args:
route_id (int): ID của tuyến xe.
way_id (int): ID của lộ trình.
Returns:
list[dict]: Danh sách các điểm dừng.
"""
try:
params = {
k: v
for k, v in {
"wayId": way_id,
}.items()
if v is not None
}
rs = await api.get(
api_base=settings.API_BASE_URL_VATO,
endpoint=f"/stops/{route_id}",
params=params,
)
return rs.get("data", [])
except Exception as e:
logger.exception(
f"Error fetching stops for route {route_id}, way {way_id}: {e}"
)
return []