# NOTE(review): This entire module is commented out; nothing below is executed.
# The layout (one statement per comment line) was restored from a copy whose
# line breaks had been collapsed, which left stray uncommented fragments that
# made the file unparseable. Line breaks follow the original "# " separators;
# indentation/nesting was reconstructed and should be confirmed against version
# control before this code is ever re-enabled.
#
# NOTE(review): things worth checking if this code is revived:
#   - search_route_ids_one_way_by_origin_dest_code raises
#     "Error fetching seats data" although it requests routes (looks copy-pasted).
#   - seats_trip checks `params is None`, but params is always a dict there.
#   - is_valid_dropoff compares names case-sensitively, while is_valid_pickup
#     lower-cases both sides.
#   - check_exist_user_info always returns True and get_user_info returns
#     hard-coded data; the real API calls are commented out inside them.
#   - several handlers `print(e)` and re-raise a bare Exception without chaining.
#   - the parameter name `orign_ids` and the method names containing `provine`
#     look like typos; callers may depend on them, so verify before renaming.

# from bisect import bisect_left
# from datetime import datetime, timedelta

# from fastapi import logger

# from app.ner.services.ner import NER
# from common.external.external_api import api
# from core.conf import settings


# class DialogService:
#     @staticmethod
#     def to_datetime_from_Dialogflow(time: dict):
#         date_time = datetime(int(time["year"]), int(time["month"]), int(time["day"]))
#         return date_time

#     @staticmethod
#     def process_dates_to_timestamp(
#         from_time: datetime = None, to_time: datetime = None
#     ):
#         if to_time is None and from_time is not None:
#             to_time = from_time.replace(hour=23, minute=59, second=59)
#         if from_time is None:
#             today = datetime.today().date()
#             from_time = datetime.combine(today, datetime.min.time())
#             to_time = datetime.combine(today, datetime.max.time()) - timedelta(
#                 microseconds=1
#             )
#         return int(from_time.timestamp()) * 1000, int(to_time.timestamp()) * 1000

#     def get_param_from_dialogflow(self, body: any):
#         session_info = body.get("sessionInfo", {})
#         parameters = (
#             session_info.get("parameters", {})
#             if isinstance(session_info.get("parameters"), dict)
#             else {}
#         )
#         raw_date = parameters.get("date")
#         if raw_date is not None:
#             raw_date = self.to_datetime_from_Dialogflow(raw_date)
#         raw_departure_city = parameters.get("departure_city")
#         raw_destination_city = parameters.get("destination_city")
#         raw_ticket_number = parameters.get("ticket_number")
#         raw_time_of_day = parameters.get("time_of_day")
#         return (
#             raw_departure_city,
#             raw_destination_city,
#             raw_ticket_number,
#             raw_date,
#             raw_time_of_day,
#         )

#     # List of route_ids from API v1
#     @staticmethod
#     async def search_route_ids_one_way_by_metadata(
#         origin_code: str = None,
#         from_id: int = None,
#         orign_ids: int = None,
#         dest_code: str = None,
#         to_id: str = None,
#         dest_ids: str = None,
#     ):
#         params = {
#             k: v
#             for k, v in {
#                 "OriginCode": origin_code,
#                 "FromId": from_id,
#                 "OriginIds": orign_ids,
#                 "DestCode": dest_code,
#                 "ToId": to_id,
#                 "DestIds": dest_ids,
#             }.items()
#             if v is not None
#         }
#         response = await api.get(f"/metadata/office/routes", params=params)
#         route_ids = []
#         if isinstance(response, list):
#             route_ids = [route.get("routeId", None) for route in response]
#             return route_ids
#         return []

#     # List of route_ids from v2, used when the route API v1 has no data
#     @staticmethod
#     async def search_route_ids_one_way_by_origin_dest_code(origin: str, dest: str):
#         try:
#             route_ids = []
#             response = await api.get(f"/booking/api/v2/routes/from/{origin}/to/{dest}")
#             if response.get("Status") == 200:
#                 if response.get("Data"):
#                     data = response.get("Data")
#                     route_ids = [route.get("Id", None) for route in data]
#             return route_ids
#         except Exception as e:
#             print(e)
#             raise Exception("Error fetching seats data")

#     async def search_all_route_ids(
#         self,
#         origin_code: str = None,
#         from_id: int = None,
#         orign_ids: int = None,
#         dest_code: str = None,
#         to_id: str = None,
#         dest_ids: str = None,
#     ):
#         route_ids = await self.search_route_ids_one_way_by_metadata(
#             origin_code, from_id, orign_ids, dest_code, to_id, dest_ids
#         )
#         if len(route_ids) == 0:
#             route_ids = await self.search_route_ids_one_way_by_origin_dest_code(
#                 origin_code, dest_code
#             )
#         return route_ids

#     # Seat list
#     @staticmethod
#     async def seats_trip(
#         route_id: int, trip_id: int, departure_date: str, departure_time: str, kind: str
#     ):
#         try:
#             params = {
#                 k: v
#                 for k, v in {
#                     "departureDate": departure_date,
#                     "departureTime": departure_time,
#                     "kind": kind,
#                 }.items()
#                 if v is not None
#             }
#             if params is None:
#                 return []
#             response = await api.get(
#                 api_base=settings.API_BASE_URL_VATO,
#                 endpoint=f"/seats/{route_id}/{trip_id}",
#                 params=params,
#             )
#             if not response.get("data"):
#                 return []
#             seats = response["data"]
#             return seats
#         except Exception as e:
#             print(e)
#             raise Exception("Error fetching seats data")

#     # Call the API to get the list of trips
#     @staticmethod
#     async def search_trip(
#         from_time: int, to_time: int, route_ids: list[int], ticket_count: int = 1
#     ):
#         try:
#             if len(route_ids) == 0:
#                 return []
#             ###
#             payload = {
#                 k: v
#                 for k, v in {
#                     "channel": "web_client",
#                     "size": 300,
#                     "only_online_trip": True,
#                     "from_time": from_time,
#                     "to_time": to_time,
#                     "route_ids": route_ids,
#                     "ticket_count": ticket_count,
#                     "sort_by": ["price", "departure_time"],
#                 }.items()
#                 if v is not None
#             }
#             data = []
#             response = await api.post("/search/trips", payload=payload)
#             if response.get("status") == 200:
#                 if response.get("data"):
#                     if response["data"].get("total") > 0:
#                         data = response["data"]["items"]
#             return data
#         except Exception as e:
#             print(e)
#             raise Exception("Error fetching trip data")

#     # Trip matching the given time and the pickup/drop-off offices
#     @staticmethod
#     def get_trip_by_time_and_office_id(
#         trips: list, time: str, origin_id: int, dest_id: int
#     ) -> dict:
#         if time is None or origin_id is None or dest_id is None:
#             return {}
#         for trip in trips:
#             if (
#                 trip
#                 and trip["raw_departure_time"] == time
#                 and trip["route"]["origin_hub_office_id"] == origin_id
#                 and trip["route"]["dest_hub_office_id"] == dest_id
#             ):
#                 return trip
#         return {}

#     # List of trips that can be selected for the given offices
#     @staticmethod
#     def get_all_trip_by_office(
#         trips: list, origin_id: int = None, dest_id: int = None
#     ) -> list:
#         if origin_id is None and dest_id is None:
#             return trips
#         result = []
#         for trip in trips:
#             pickup_points = trip.get("pickup_points", [])
#             pickup_point_ids = {p["OfficeId"] for p in pickup_points if "OfficeId" in p}
#             origin_match = (
#                 (origin_id is None)
#                 or (trip["route"]["origin_hub_office_id"] == origin_id)
#                 or (origin_id in pickup_point_ids)
#             )
#             dest_match = (
#                 (dest_id is None)
#                 or (trip["route"]["dest_hub_office_id"] == dest_id)
#                 or (dest_id in pickup_point_ids)
#             )
#             if origin_match and dest_match:
#                 result.append(trip)
#         return result if result else trips

#     # List of 4 trips around the specified time
#     @staticmethod
#     def get_surrounding_trip(trips: list, time: str, num_trip: int = 4) -> list:
#         if not time or not trips or num_trip <= 0:
#             return []
#         time_trips = [trip["raw_departure_time"] for trip in trips]
#         index = bisect_left(time_trips, time)
#         half = num_trip // 2
#         start = max(0, index - half)
#         end = min(len(trips), index + half + (num_trip % 2))
#         missing = num_trip - (end - start)
#         if missing > 0:
#             extra_start = min(missing, start)
#             start -= extra_start
#             missing -= extra_start
#         if missing > 0:
#             extra_end = min(missing, len(trips) - end)
#             end += extra_end
#         return trips[start:end]

#     async def is_valid_select_seat(
#         self,
#         seat: str,
#         route_id: int,
#         trip_id: int,
#         departure_date: str,
#         departure_time: str,
#         kind: str,
#     ):
#         if seat is None:
#             return False
#         seats = await self.seats_trip(
#             route_id, trip_id, departure_date, departure_time, kind
#         )
#         for seat_data in seats:
#             if (
#                 seat_data["chair"].lower() == seat.lower()
#                 and seat_data["bookStatus"] == 0
#             ):
#                 return True
#         return False

#     async def search_trip_by_id(
#         self,
#         trip_id: int,
#         from_time: int,
#         to_time: int,
#         route_ids: list[int],
#         ticket_count: int = 1,
#     ):
#         trip = await self.search_trip(from_time, to_time, route_ids, ticket_count)
#         for item in trip:
#             if trip_id == item["id"]:
#                 return item
#         return None

#     @staticmethod
#     def get_trip_by_id(trip_id: int, trips: list):
#         for trip in trips:
#             if trip and trip["id"] == trip_id:
#                 return trip
#         return {}

#     @staticmethod
#     async def stops(route_id: int, way_id: int):
#         try:
#             if route_id is None or way_id is None:
#                 return []
#             params = {
#                 k: v
#                 for k, v in {
#                     "wayId": way_id,
#                 }.items()
#                 if v is not None
#             }
#             response = await api.get(
#                 api_base=settings.API_BASE_URL_VATO,
#                 endpoint=f"/stops/{route_id}",
#                 params=params,
#             )
#             if not response.get("data"):
#                 return []
#             data = response["data"]
#             return data
#         except Exception as e:
#             print(e)
#             raise Exception("Error fetching stops data")

#     async def pickup_list(self, route_id: int, way_id: int):
#         try:
#             data = await self.stops(route_id, way_id)
#             if not data:
#                 return []
#             pickup_list = []
#             for pickup in data:
#                 if pickup["type"] == 0 or pickup["type"] == -1:
#                     pickup_list.append(pickup)
#             return pickup_list
#         except Exception as e:
#             print(e)
#             raise Exception("Error fetching pickup list data")

#     async def is_valid_pickup(self, pickup: str, route_id: int, way_id: int):
#         if pickup is None:
#             return False
#         pickup_list = await self.pickup_list(route_id, way_id)
#         for pickup_data in pickup_list:
#             pickup_name: str = pickup_data["name"]
#             if pickup_name.lower() == pickup.lower():
#                 return True
#         return False

#     async def dropoff_list(self, route_id: int, way_id: int):
#         try:
#             data = await self.stops(route_id, way_id)
#             if not data:
#                 return []
#             dropoff_list = []
#             for dropoff in data:
#                 if dropoff["type"] == 1 or (
#                     dropoff["type"] == -1 and dropoff["presentBeforeMinutes"] >= 0
#                 ):
#                     dropoff_list.append(dropoff)
#             return dropoff_list
#         except Exception as e:
#             print(e)
#             raise Exception("Error fetching dropoff list data")

#     async def is_valid_dropoff(self, dropoff: str, route_id: int, way_id: int):
#         if dropoff is None:
#             return False
#         dropoff_list = await self.dropoff_list(route_id, way_id)
#         for dropoff_data in dropoff_list:
#             if dropoff_data["name"] == dropoff:
#                 return True
#         return False

#     @staticmethod
#     async def search_pickup_points(origin: str = None, dest: str = None) -> dict:
#         session_id = str(int(datetime.now().timestamp()))
#         params = {
#             k: v
#             for k, v in {
#                 "origin": origin,
#                 "dest": dest,
#                 "session_id": session_id,
#             }.items()
#             if v is not None
#         }
#         response = await api.get("/search/metadata/pickup-points", params=params)
#         if response.get("status") == 200:
#             data = response.get("data")
#             return data
#         return {}

#     async def find_id_office_by_name_office(self, office_name: str):
#         data = await self.search_pickup_points(origin=office_name)
#         if data.get("origin"):
#             origins = data["origin"]
#             for origin in origins:
#                 if origin.get("group"):
#                     groups = origin["group"]
#                     for group in groups:
#                         if group.get("name"):
#                             name: str = group["name"]
#                             if name.lower() == office_name.lower():
#                                 office_id = group["officeId"]
#                                 office_id = (
#                                     office_id[0]
#                                     if isinstance(office_id, tuple)
#                                     else office_id
#                                 )
#                                 return office_id
#         return None

#     async def find_id_provine_by_name_office(self, office_name: str):
#         data = await self.search_pickup_points(origin=office_name)
#         if data.get("origin"):
#             origins = data["origin"]
#             for origin in origins:
#                 if origin.get("group"):
#                     groups = origin["group"]
#                     for group in groups:
#                         if group.get("name"):
#                             name: str = group["name"]
#                             if name.lower() == office_name.lower():
#                                 province_id = group["provinceId"]
#                                 return province_id
#         return None

#     async def find_all_origin_by_name_office(self, office_name: str):
#         data = await self.search_pickup_points(origin=office_name)
#         if data.get("origin"):
#             origins = data["origin"]
#             return origins
#         return None

#     async def find_id_and_code_provine_by_name_office(self, office_name: str):
#         data = await self.search_pickup_points(origin=office_name)
#         if data.get("origin"):
#             origins = data["origin"]
#             for origin in origins:
#                 if origin.get("group"):
#                     groups = origin["group"]
#                     for group in groups:
#                         if group.get("name"):
#                             name: str = group["name"]
#                             if name.lower() == office_name.lower():
#                                 province_id = group["provinceId"]
#                                 province_code = group["provinceCode"]
#                                 province_id = (
#                                     province_id[0]
#                                     if isinstance(province_id, tuple)
#                                     else province_id
#                                 )
#                                 return province_id, province_code
#         return None, None

#     async def get_origin_city_from_office(self, origin_office: str):
#         data = await self.search_pickup_points(origin=origin_office)
#         if data.get("origin"):
#             origins = data["origin"]
#             for origin in origins:
#                 if origin.get("group"):
#                     groups = origin["group"]
#                     for group in groups:
#                         if group.get("name"):
#                             name: str = group["name"]
#                             if name.lower() == origin_office.lower():
#                                 return group["provinceName"]
#         return None

#     async def get_destination_city_from_office(self, dest_office: str):
#         data = await self.search_pickup_points(dest=dest_office)
#         if data.get("dest"):
#             dests = data["dest"]
#             for dest in dests:
#                 if dest.get("group"):
#                     groups = dest["group"]
#                     for group in groups:
#                         if group.get("name"):
#                             name: str = group["name"]
#                             if name.lower() == dest_office.lower():
#                                 return group["provinceName"]
#         return None

#     async def check_exist_user_info(self, user_id: str = None):
#         try:
#             # response = await api.get(f'/user/{user_id}')
#             # if response.get("status") == 200:
#             #     return True
#             return True
#             # return False
#         except Exception as e:
#             logger.error(f"Error checking user info: {e}")
#             return False

#     async def get_user_info(self, user_id: str = None):
#         try:
#             # response = await api.get(f'/user/{user_id}')
#             # if response.get("status") == 200:
#             #     return response.get("data")
#             user_info = {
#                 "user_name": {"name": "Đại", "original": "Đại"},
#                 "phone_number": "0987654321",
#                 "email": "vdai234@gmail.com",
#             }
#             return user_info
#             # return None
#         except Exception as e:
#             logger.error(f"Error fetching user info: {e}")
#             return None

#     @staticmethod
#     async def extract_user_name(text: str, ner: NER):
#         if text is None:
#             return None
#         user_name_pred = await ner.predict(text=text, entity_tag="PERSON")
#         if user_name_pred:
#             user_name = user_name_pred[0]
#             if user_name:
#                 user_name_resp = {"name": user_name, "original": user_name}
#                 return user_name_resp
#         return None


# dialog_service: DialogService = DialogService()