dialogflowAPI / app /services /dialog_service.py
OnlyBiggg
refactor code
df37f6e
# from bisect import bisect_left
# from datetime import datetime, timedelta
# from fastapi import logger
# from app.ner.services.ner import NER
# from common.external.external_api import api
# from core.conf import settings
# class DialogService:
# @staticmethod
# def to_datetime_from_Dialogflow(time: dict):
# date_time = datetime(int(time["year"]), int(time["month"]), int(time["day"]))
# return date_time
# @staticmethod
# def process_dates_to_timestamp(
# from_time: datetime = None, to_time: datetime = None
# ):
# if to_time is None and from_time is not None:
# to_time = from_time.replace(hour=23, minute=59, second=59)
# if from_time is None:
# today = datetime.today().date()
# from_time = datetime.combine(today, datetime.min.time())
# to_time = datetime.combine(today, datetime.max.time()) - timedelta(
# microseconds=1
# )
# return int(from_time.timestamp()) * 1000, int(to_time.timestamp()) * 1000
# def get_param_from_dialogflow(self, body: any):
# session_info = body.get("sessionInfo", {})
# parameters = (
# session_info.get("parameters", {})
# if isinstance(session_info.get("parameters"), dict)
# else {}
# )
# raw_date = parameters.get("date")
# if raw_date is not None:
# raw_date = self.to_datetime_from_Dialogflow(raw_date)
# raw_departure_city = parameters.get("departure_city")
# raw_destination_city = parameters.get("destination_city")
# raw_ticket_number = parameters.get("ticket_number")
# raw_time_of_day = parameters.get("time_of_day")
# return (
# raw_departure_city,
# raw_destination_city,
# raw_ticket_number,
# raw_date,
# raw_time_of_day,
# )
# # List of route_ids from the v1 API
# @staticmethod
# async def search_route_ids_one_way_by_metadata(
# origin_code: str = None,
# from_id: int = None,
# orign_ids: int = None,
# dest_code: str = None,
# to_id: str = None,
# dest_ids: str = None,
# ):
# params = {
# k: v
# for k, v in {
# "OriginCode": origin_code,
# "FromId": from_id,
# "OriginIds": orign_ids,
# "DestCode": dest_code,
# "ToId": to_id,
# "DestIds": dest_ids,
# }.items()
# if v is not None
# }
# response = await api.get(f"/metadata/office/routes", params=params)
# route_ids = []
# if isinstance(response, list):
# route_ids = [route.get("routeId", None) for route in response]
# return route_ids
# return []
# # List of route_ids from the v2 API, used when the v1 route API returns no data
# @staticmethod
# async def search_route_ids_one_way_by_origin_dest_code(origin: str, dest: str):
# try:
# route_ids = []
# response = await api.get(f"/booking/api/v2/routes/from/{origin}/to/{dest}")
# if response.get("Status") == 200:
# if response.get("Data"):
# data = response.get("Data")
# route_ids = [route.get("Id", None) for route in data]
# return route_ids
# except Exception as e:
# print(e)
# raise Exception("Error fetching seats data")
# async def search_all_route_ids(
# self,
# origin_code: str = None,
# from_id: int = None,
# orign_ids: int = None,
# dest_code: str = None,
# to_id: str = None,
# dest_ids: str = None,
# ):
# route_ids = await self.search_route_ids_one_way_by_metadata(
# origin_code, from_id, orign_ids, dest_code, to_id, dest_ids
# )
# if len(route_ids) == 0:
# route_ids = await self.search_route_ids_one_way_by_origin_dest_code(
# origin_code, dest_code
# )
# return route_ids
# # List of seats
# @staticmethod
# async def seats_trip(
# route_id: int, trip_id: int, departure_date: str, departure_time: str, kind: str
# ):
# try:
# params = {
# k: v
# for k, v in {
# "departureDate": departure_date,
# "departureTime": departure_time,
# "kind": kind,
# }.items()
# if v is not None
# }
# if params is None:
# return []
# response = await api.get(
# api_base=settings.API_BASE_URL_VATO,
# endpoint=f"/seats/{route_id}/{trip_id}",
# params=params,
# )
# if not response.get("data"):
# return []
# seats = response["data"]
# return seats
# except Exception as e:
# print(e)
# raise Exception("Error fetching seats data")
# # Call the API to get the list of trips
# @staticmethod
# async def search_trip(
# from_time: int, to_time: int, route_ids: list[int], ticket_count: int = 1
# ):
# try:
# if len(route_ids) == 0:
# return []
# ###
# payload = {
# k: v
# for k, v in {
# "channel": "web_client",
# "size": 300,
# "only_online_trip": True,
# "from_time": from_time,
# "to_time": to_time,
# "route_ids": route_ids,
# "ticket_count": ticket_count,
# "sort_by": ["price", "departure_time"],
# }.items()
# if v is not None
# }
# data = []
# response = await api.post("/search/trips", payload=payload)
# if response.get("status") == 200:
# if response.get("data"):
# if response["data"].get("total") > 0:
# data = response["data"]["items"]
# return data
# except Exception as e:
# print(e)
# raise Exception("Error fetching trip data")
# # Trip matching the given time and the pickup/drop-off offices
# @staticmethod
# def get_trip_by_time_and_office_id(
# trips: list, time: str, origin_id: int, dest_id: int
# ) -> dict:
# if time is None or origin_id is None or dest_id is None:
# return {}
# for trip in trips:
# if (
# trip
# and trip["raw_departure_time"] == time
# and trip["route"]["origin_hub_office_id"] == origin_id
# and trip["route"]["dest_hub_office_id"] == dest_id
# ):
# return trip
# return {}
# # List of trips that can be selected for the given offices
# @staticmethod
# def get_all_trip_by_office(
# trips: list, origin_id: int = None, dest_id: int = None
# ) -> list:
# if origin_id is None and dest_id is None:
# return trips
# result = []
# for trip in trips:
# pickup_points = trip.get("pickup_points", [])
# pickup_point_ids = {p["OfficeId"] for p in pickup_points if "OfficeId" in p}
# origin_match = (
# (origin_id is None)
# or (trip["route"]["origin_hub_office_id"] == origin_id)
# or (origin_id in pickup_point_ids)
# )
# dest_match = (
# (dest_id is None)
# or (trip["route"]["dest_hub_office_id"] == dest_id)
# or (dest_id in pickup_point_ids)
# )
# if origin_match and dest_match:
# result.append(trip)
# return result if result else trips
# # List of the 4 trips surrounding the specified time
# @staticmethod
# def get_surrounding_trip(trips: list, time: str, num_trip: int = 4) -> list:
# if not time or not trips or num_trip <= 0:
# return []
# time_trips = [trip["raw_departure_time"] for trip in trips]
# index = bisect_left(time_trips, time)
# half = num_trip // 2
# start = max(0, index - half)
# end = min(len(trips), index + half + (num_trip % 2))
# missing = num_trip - (end - start)
# if missing > 0:
# extra_start = min(missing, start)
# start -= extra_start
# missing -= extra_start
# if missing > 0:
# extra_end = min(missing, len(trips) - end)
# end += extra_end
# return trips[start:end]
# async def is_valid_select_seat(
# self,
# seat: str,
# route_id: int,
# trip_id: int,
# departure_date: str,
# departure_time: str,
# kind: str,
# ):
# if seat is None:
# return False
# seats = await self.seats_trip(
# route_id, trip_id, departure_date, departure_time, kind
# )
# for seat_data in seats:
# if (
# seat_data["chair"].lower() == seat.lower()
# and seat_data["bookStatus"] == 0
# ):
# return True
# return False
# async def search_trip_by_id(
# self,
# trip_id: int,
# from_time: int,
# to_time: int,
# route_ids: list[int],
# ticket_count: int = 1,
# ):
# trip = await self.search_trip(from_time, to_time, route_ids, ticket_count)
# for item in trip:
# if trip_id == item["id"]:
# return item
# return None
# @staticmethod
# def get_trip_by_id(trip_id: int, trips: list):
# for trip in trips:
# if trip and trip["id"] == trip_id:
# return trip
# return {}
# @staticmethod
# async def stops(route_id: int, way_id: int):
# try:
# if route_id is None or way_id is None:
# return []
# params = {
# k: v
# for k, v in {
# "wayId": way_id,
# }.items()
# if v is not None
# }
# response = await api.get(
# api_base=settings.API_BASE_URL_VATO,
# endpoint=f"/stops/{route_id}",
# params=params,
# )
# if not response.get("data"):
# return []
# data = response["data"]
# return data
# except Exception as e:
# print(e)
# raise Exception("Error fetching stops data")
# async def pickup_list(self, route_id: int, way_id: int):
# try:
# data = await self.stops(route_id, way_id)
# if not data:
# return []
# pickup_list = []
# for pickup in data:
# if pickup["type"] == 0 or pickup["type"] == -1:
# pickup_list.append(pickup)
# return pickup_list
# except Exception as e:
# print(e)
# raise Exception("Error fetching pickup list data")
# async def is_valid_pickup(self, pickup: str, route_id: int, way_id: int):
# if pickup is None:
# return False
# pickup_list = await self.pickup_list(route_id, way_id)
# for pickup_data in pickup_list:
# pickup_name: str = pickup_data["name"]
# if pickup_name.lower() == pickup.lower():
# return True
# return False
# async def dropoff_list(self, route_id: int, way_id: int):
# try:
# data = await self.stops(route_id, way_id)
# if not data:
# return []
# dropoff_list = []
# for dropoff in data:
# if dropoff["type"] == 1 or (
# dropoff["type"] == -1 and dropoff["presentBeforeMinutes"] >= 0
# ):
# dropoff_list.append(dropoff)
# return dropoff_list
# except Exception as e:
# print(e)
# raise Exception("Error fetching dropoff list data")
# async def is_valid_dropoff(self, dropoff: str, route_id: int, way_id: int):
# if dropoff is None:
# return False
# dropoff_list = await self.dropoff_list(route_id, way_id)
# for dropoff_data in dropoff_list:
# if dropoff_data["name"] == dropoff:
# return True
# return False
# @staticmethod
# async def search_pickup_points(origin: str = None, dest: str = None) -> dict:
# session_id = str(int(datetime.now().timestamp()))
# params = {
# k: v
# for k, v in {
# "origin": origin,
# "dest": dest,
# "session_id": session_id,
# }.items()
# if v is not None
# }
# response = await api.get("/search/metadata/pickup-points", params=params)
# if response.get("status") == 200:
# data = response.get("data")
# return data
# return {}
# async def find_id_office_by_name_office(self, office_name: str):
# data = await self.search_pickup_points(origin=office_name)
# if data.get("origin"):
# origins = data["origin"]
# for origin in origins:
# if origin.get("group"):
# groups = origin["group"]
# for group in groups:
# if group.get("name"):
# name: str = group["name"]
# if name.lower() == office_name.lower():
# office_id = group["officeId"]
# office_id = (
# office_id[0]
# if isinstance(office_id, tuple)
# else office_id
# )
# return office_id
# return None
# async def find_id_provine_by_name_office(self, office_name: str):
# data = await self.search_pickup_points(origin=office_name)
# if data.get("origin"):
# origins = data["origin"]
# for origin in origins:
# if origin.get("group"):
# groups = origin["group"]
# for group in groups:
# if group.get("name"):
# name: str = group["name"]
# if name.lower() == office_name.lower():
# province_id = group["provinceId"]
# return province_id
# return None
# async def find_all_origin_by_name_office(self, office_name: str):
# data = await self.search_pickup_points(origin=office_name)
# if data.get("origin"):
# origins = data["origin"]
# return origins
# return None
# async def find_id_and_code_provine_by_name_office(self, office_name: str):
# data = await self.search_pickup_points(origin=office_name)
# if data.get("origin"):
# origins = data["origin"]
# for origin in origins:
# if origin.get("group"):
# groups = origin["group"]
# for group in groups:
# if group.get("name"):
# name: str = group["name"]
# if name.lower() == office_name.lower():
# province_id = group["provinceId"]
# province_code = group["provinceCode"]
# province_id = (
# province_id[0]
# if isinstance(province_id, tuple)
# else province_id
# )
# return province_id, province_code
# return None, None
# async def get_origin_city_from_office(self, origin_office: str):
# data = await self.search_pickup_points(origin=origin_office)
# if data.get("origin"):
# origins = data["origin"]
# for origin in origins:
# if origin.get("group"):
# groups = origin["group"]
# for group in groups:
# if group.get("name"):
# name: str = group["name"]
# if name.lower() == origin_office.lower():
# return group["provinceName"]
# return None
# async def get_destination_city_from_office(self, dest_office: str):
# data = await self.search_pickup_points(dest=dest_office)
# if data.get("dest"):
# dests = data["dest"]
# for dest in dests:
# if dest.get("group"):
# groups = dest["group"]
# for group in groups:
# if group.get("name"):
# name: str = group["name"]
# if name.lower() == dest_office.lower():
# return group["provinceName"]
# return None
# async def check_exist_user_info(self, user_id: str = None):
# try:
# # response = await api.get(f'/user/{user_id}')
# # if response.get("status") == 200:
# # return True
# return True
# # return False
# except Exception as e:
# logger.error(f"Error checking user info: {e}")
# return False
# async def get_user_info(self, user_id: str = None):
# try:
# # response = await api.get(f'/user/{user_id}')
# # if response.get("status") == 200:
# # return response.get("data")
# user_info = {
# "user_name": {"name": "Đại", "original": "Đại"},
# "phone_number": "0987654321",
# "email": "vdai234@gmail.com",
# }
# return user_info
# # return None
# except Exception as e:
# logger.error(f"Error fetching user info: {e}")
# return None
# @staticmethod
# async def extract_user_name(text: str, ner: NER):
# if text is None:
# return None
# user_name_pred = await ner.predict(text=text, entity_tag="PERSON")
# if user_name_pred:
# user_name = user_name_pred[0]
# if user_name:
# user_name_resp = {"name": user_name, "original": user_name}
# return user_name_resp
# return None
# dialog_service: DialogService = DialogService()