import json import time import random import pandas as pd import numpy as np import re #提取 POI 經緯度 extract_poi_coordinates = lambda poi_data: np.array([ [poi["location"]["latitude"], poi["location"]["longitude"]] for poi in poi_data if poi.get("location") and "latitude" in poi["location"] and "longitude" in poi["location"] ]) def filter_pois(poi_with_distances, parsed_targets): """執行一系列過濾操作""" #以目標過濾 filtered_target_pois = filter_by_target(poi_with_distances, parsed_targets) # 以距離 (distance) 過濾 filtered_distance_pois = filter_by_distance(filtered_target_pois, parsed_targets) # 以營業時間 (open) 過濾 filtered_open_pois = filter_by_open(filtered_distance_pois, parsed_targets) # 以評分 (rating) 過濾或排序 filtered_rating_pois = filter_or_sort_by_rating(filtered_open_pois, parsed_targets) # 過濾 POI 列表中的營業時間,僅保留今天的時間段。 top_filtered_pois = filter_today_opening_hours(filtered_rating_pois) return top_filtered_pois #過濾掉位於 user_location 西邊(longitude < user_lon)的 POIs,依據每個POI 的distance 排序後取前3。 def filter_and_get_top_pois(user_location, top_filtered_pois): _, user_lon = user_location # 過濾出在東邊的 POIs (longitude > user_lon) east_pois = [ poi for poi in top_filtered_pois if poi['location']['longitude'] > user_lon ] # 依據 POI 本身的 distance 屬性進行排序 east_pois.sort(key=lambda x: x['distance']) # 取前 3 個 return east_pois[:3] def filter_poi_list(data_list): needed_keys = [ "englishName", "types", "primaryType", "rating", "location", "formattedAddress", "userRatingCount", "editorialSummary" ] filtered = [] for poi in data_list: filtered_dict = {key: poi.get(key) for key in needed_keys} regular_hours = poi.get("regularOpeningHours") if regular_hours and "periods" in regular_hours: filtered_dict["regularOpeningHours"] = { "periods": regular_hours["periods"] } else: # 如果沒有 regularOpeningHours 或沒有 periods,就給個 None 或空字典 filtered_dict["regularOpeningHours"] = None filtered.append(filtered_dict) return filtered def get_coordinates(location_data, video_tps_value): """ 從 DataFrame 中檢索特定 video_tps 的經緯度座標。 """ matched_row = location_data[location_data['video_tps'] == video_tps_value] if matched_row.empty: # 如果沒有匹配的行,返回空列表 return [] # 提取經緯度 latitude = matched_row['latitude'].values[0] longitude = matched_row['longitude'].values[0] return (latitude, longitude) # 計算兩個地點之間的距離(Haversine formula)返回公尺 def calculate_distances(point, points_array): lat1, lon1 = point lat2 = points_array[:, 0] lon2 = points_array[:, 1] # 將經緯度轉換為弧度 lon1, lat1, lon2, lat2 = map(np.radians, [lon1, lat1, lon2, lat2]) # 計算經緯度的差值 dlon = lon2 - lon1 dlat = lat2 - lat1 # Haversine formula 計算大圓距離 a = np.sin(dlat / 2.0) ** 2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2.0) ** 2 c = 2 * np.arcsin(np.sqrt(a)) r = 6371000 # 地球半徑,單位:公尺 distances_in_meters = c * r # 以公尺計算距離 # 四捨五入到小數點後兩位 return np.round(distances_in_meters, 2) def add_distances_to_poi(poi_data, distances): # 新增 distance 欄位 poi_with_distances = [] for i, poi in enumerate(poi_data): # 複製 POI 資料,新增 distance 欄位 poi_with_distance = poi.copy() poi_with_distance['distance'] = distances[i] poi_with_distances.append(poi_with_distance) return poi_with_distances def parse_ner_targets(parsed_entities): """ 將 parsed_entities 中的 target 拆解為 primarytype 和 displayname, """ # 定義主要類別 primary_categories = {"restaurant", "shopping_mall", "hospital", "parking", "tourist_attraction", "convenience_store", "bank", "car_repair", "electric_vehicle_charging_station", "gas_station", "movie_theater"} parsed_result = { "primarytype": '', "displayname": '', "distance": parsed_entities.get("distance", []), "open": parsed_entities.get("open", []), "rank": parsed_entities.get("rank", []) } targets = parsed_entities.get("target", []) for target in targets: if target in primary_categories: parsed_result["primarytype"] = target else: parsed_result["displayname"] = target return parsed_result def filter_by_target(poi_with_distances, parsed_targets): """ 用 parsed_targets 的內容篩選 poi_with_distances。 1. 如果 parsed_targets['displayname'] 不為空字串 (''), 以正則匹配完整詞,不被其他字母數字相鄰 2. 若 parsed_targets['displayname'] 為空字串 (''), 則以 parsed_targets['primarytype'] 檢查 poi_with_distances['types'] 中是否包含 """ target_displayname = parsed_targets.get("displayname", "").strip() target_type = parsed_targets.get("primarytype", "").strip() if target_displayname: print('target_displayname') pattern = re.compile(rf'\b{re.escape(target_displayname.lower())}\b') filtered_pois = [ poi for poi in poi_with_distances if pattern.search(poi['englishName'].lower()) ] else: # displayname為空,就看types裡面是否含有target_type filtered_pois = [ poi for poi in poi_with_distances if target_type in poi.get("types", []) ] return filtered_pois # 以距離 (distance) 過濾 def filter_by_distance(poi_list, parsed_targets): """ 1) 先按照 distance 升冪(由近到遠)進行排序 2) 若 parsed_targets['distance'] = ['500'],則只保留 distance < 500 的項目 若無 distance 或 distance 為空,則返回原清單(但也還是已經排序好) """ # 先依距離由小到大排序 sorted_by_distance = sorted( poi_list, key=lambda x: x.get("distance", float("inf")) # 如果沒有 distance,當作無限大 ) # 若沒有傳入 distance 閾值,直接回傳「已排序」的結果 if not parsed_targets.get("distance"): return sorted_by_distance # 假設只取第一個 distance 閾值 distance_str = parsed_targets["distance"][0] distance_threshold = float(distance_str) # 再針對距離做篩選 filtered_result = [ poi for poi in sorted_by_distance if poi.get("distance") is not None and poi["distance"] < distance_threshold ] return filtered_result # 以評分 (rating) 過濾或排序 def filter_or_sort_by_rating(poi_list, parsed_targets): """ 若 parsed_targets['rank'] 為小數點字串 (例如 "4.0") => 過濾 rating >= 該值 若 parsed_targets['rank'] 為 "highest" => 依 rating 高到低排序 其他情況 (例如沒有 rank 或 rank 為空) 則不處理,直接返回原清單 """ # 如果沒有 rank 或 rank 為空 list,就直接回傳原清單 if not parsed_targets.get("rank"): return poi_list rank_value = parsed_targets["rank"][0] # 只取第一個 # Case 1:若為 "highest" => 依 rating 高到低排序 if rank_value.lower() == "highest": return sorted(poi_list, key=lambda x: x["rating"] if x["rating"] is not None else 0, reverse=True) # Case 2:否則視為數值字符串 (如 "4.5") rank_float = float(rank_value) # 可直接轉 float,因為不會有其他無法轉換的情況 return [ poi for poi in poi_list if poi.get("rating") and poi["rating"] >= rank_float ] # 以營業時間 (open) 過濾 def filter_by_open(poi_with_distances, parsed_targets): """ 不動原始 poi_with_distances,只回傳一個「新清單」。 在新清單的每個 POI dict 中,新增/更新 key: "isOpenNow" (True/False), 表示「在當下星期 + parsed_targets['open'] 時間下」是否營業。 - 若 parsed_targets['open'] 是空,則直接將 isOpenNow 設為 None (不判斷)。 - 其餘欄位皆照原資料保留。 """ # 如果沒有 open,則不判斷,整份清單都 isOpenNow = None if not parsed_targets.get("open"): new_list = [] for poi in poi_with_distances: poi_copy = poi.copy() # 淺拷貝,避免改到原資料 poi_copy["isOpenNow"] = None # 不判斷,固定 None new_list.append(poi_copy) return new_list # 取得使用者指定的小時,例如 15 / 21 target_hour = float(parsed_targets["open"][0]) # 假設只取第一個 # 系統當下是星期幾 (0=星期一, 6=星期日) current_weekday_py = time.localtime().tm_wday # 轉成 POI 的 day: (0=星期日, 1=星期一, ... 6=星期六) current_weekday_poi = (current_weekday_py + 1) % 7 new_list = [] for poi in poi_with_distances: poi_copy = poi.copy() # 建立副本,不動原資料 # 預設 isOpenNow = False poi_copy["isOpenNow"] = False regular_hours = poi_copy.get("regularOpeningHours") if regular_hours and "periods" in regular_hours: periods = regular_hours["periods"] # 檢查當天 (current_weekday_poi) 時段 for p in periods: if p["open"]["day"] == current_weekday_poi: open_hour = p["open"]["hour"] close_hour = p["close"]["hour"] # 分鐘先忽略,只看整點 if open_hour <= target_hour < close_hour: poi_copy["isOpenNow"] = True break new_list.append(poi_copy) return new_list def filter_today_opening_hours(poi_list): """ 過濾 POI 列表中的營業時間,僅保留今天的時間段。 """ # 取得今天的星期 (0 表示星期一, 6 表示星期日) today = time.localtime().tm_wday # 遍歷 POI 資料 for poi in poi_list: # 如果 regularOpeningHours 為 None,跳過處理 if poi.get('regularOpeningHours') is None or 'periods' not in poi['regularOpeningHours']: continue # 過濾只保留今天的營業時間 poi['regularOpeningHours']['periods'] = [ period for period in poi['regularOpeningHours']['periods'] if period['open']['day'] == today ] return poi_list def filter_pois_by_rating_count(pois, user_rating_threshold=1000): filtered_pois = [ poi for poi in pois if (poi.get('userRatingCount') or 0) >= user_rating_threshold ] return filtered_pois # 帶入pois 過濾poi type def filter_pois_by_type(pois, poi_type): """ Filters a list of POIs based on a specified type. """ return [poi for poi in pois if poi_type in poi.get("types", [])] def format_english_name(english_name): """ 接收 englishname 字串,並根據分隔符號處理字串。 """ # 使用正則表達式拆分字符串,根據 '|', '/', '\' 作為分隔符 parts = re.split(r'[||//\\\]', english_name) # 確保至少有兩個部分可以處理 if len(parts) > 1: main_part = parts[0].strip() # 取第一部分並去除前後空格 first_segment = parts[1].strip() # 取第二部分並去除前後空格 return f"{main_part} ({first_segment})" return english_name # 若無有效分隔符,則返回原始字符串 def truncate_and_fix_brackets(input_str, max_length=70): """ 簡化並修正括號的字串處理 1. 若字串超過指定長度(預設 70 字),則確保不在單字中間截斷,並適當延伸到完整單字結束後。 2. 截斷後,若發現未完整的括號(如 `(` `(` `{` `[`),則自動補齊對應的右括號。 """ # 若字串長度在允許範圍內,直接回傳 if len(input_str) <= max_length: return input_str # **步驟 1:確保不截斷單字** truncated_str = input_str[:max_length] # 初步截斷 remaining_str = input_str[max_length:] # 搜尋下一個可用的空格或標點,避免單字被切開 match = re.search(r'[\s,!)\]\},)]', remaining_str) # 尋找最近的空格或標點 if match: truncated_str += remaining_str[:match.start()] # 延伸至完整的單字結束後 else: truncated_str = input_str # 若沒有適合的分隔符,則不截斷 # **步驟 2:修正未關閉的括號** bracket_pairs = {'(': ')', '(': ')', '[': ']', '{': '}'} open_brackets = [] # 用來追蹤未關閉的括號 for char in truncated_str: if char in bracket_pairs: # 若遇到左括號,記錄對應的右括號 open_brackets.append(bracket_pairs[char]) elif char in bracket_pairs.values(): # 若遇到右括號,確認是否匹配 if open_brackets and open_brackets[-1] == char: open_brackets.pop() # 依照順序補齊缺少的右括號 for missing in reversed(open_brackets): truncated_str += missing return truncated_str def generate_tts_message(top_filtered_pois): """ 生成禮貌 自然語氣的TTS。輸出找到的店名,並隨機選擇一種禮貌模板。 """ num_places = len(top_filtered_pois) if num_places == 0: return "I'm sorry, I couldn't find any matching places nearby. Please try adjusting your preferences." # 五種模板 templates = [ "I found {num_places} places that might interest you nearby: {details} Have a wonderful day ahead!", "Here are {num_places} nearby places you might like: {details} Hope this helps you!", "I've identified {num_places} places nearby: {details} Wishing you a pleasant journey!", "There are {num_places} places nearby that you might enjoy: {details} Have a great time!", "Nearby, I found {num_places} places worth checking out: {details} Enjoy exploring!" ] # 選擇隨機模板 chosen_template = random.choice(templates) # 構建內容,直接列出店名 details = ", ".join( [f"***{format_english_name(truncate_and_fix_brackets(poi.get('englishName', 'Unknown Place')))}***" for poi in top_filtered_pois] ) + "." # 使用模板生成 final_message = chosen_template.format(num_places=num_places, details=details) return final_message def generate_display_message(top_filtered_pois): """ 生成自然語言的顯示消息,包含店名、評分、距離(整數公尺)、與英文簡介。 使用多個模板,隨機選擇一種生成消息。 """ num_places = len(top_filtered_pois) if num_places == 0: return "I'm sorry, I couldn't find any places that match your request in the nearby area. Please try expanding the search radius or adjusting your preferences." # 定義五種模板 templates = [ "I found {num_places} places nearby. \n{details} Enjoy exploring!", "Here are {num_places} nearby options: \n{details} Hope you find this helpful!", "There are {num_places} places worth checking out. \n{details} Have a great time!", "Nearby, I identified {num_places} options you might like. \n{details} Wishing you a pleasant experience!", "I discovered {num_places} places close by. \n{details} Enjoy your visit!" ] # 隨機選擇模板 chosen_template = random.choice(templates) # 構建詳細內容 details = "\n".join([ f"{idx}. {format_english_name(truncate_and_fix_brackets(poi.get('englishName', 'Unknown Place')))}:\n" f" - Rating: {poi.get('rating', 'No rating')}/5\n" f" - Distance: {int(poi.get('distance', 0))} meters\n" f" - Summary: {poi.get('editorialSummary', {}).get('englishSummary', 'No summary available.') if isinstance(poi.get('editorialSummary'), dict) else 'No summary available.'}" for idx, poi in enumerate(top_filtered_pois, start=1) ]) # 使用模板生成最終消息 final_message = chosen_template.format(num_places=num_places, details=details) return final_message