# ggg2 / poi_processing.py
# jva96160's picture
# Upload poi_processing.py
# be824ef verified
import json
import time
import random
import pandas as pd
import numpy as np
import re
# Extract the latitude/longitude of every POI that carries a usable location.
def extract_poi_coordinates(poi_data):
    """Collect POI coordinates into a two-column array.

    Args:
        poi_data: Iterable of POI dicts. Entries whose ``"location"`` is
            missing/falsy, or lacks ``"latitude"`` or ``"longitude"``, are
            skipped.

    Returns:
        ``numpy.ndarray`` with one ``[latitude, longitude]`` row per kept POI.
        When no POI qualifies the array has shape ``(0, 2)`` so that column
        slicing (``arr[:, 0]``) keeps working.
    """
    coordinates = [
        [poi["location"]["latitude"], poi["location"]["longitude"]]
        for poi in poi_data
        if poi.get("location")
        and "latitude" in poi["location"]
        and "longitude" in poi["location"]
    ]
    if not coordinates:
        # np.array([]) would be 1-D (shape (0,)); keep the 2-column layout.
        return np.empty((0, 2))
    return np.array(coordinates)
def filter_pois(poi_with_distances, parsed_targets):
    """Run the POI list through the whole filter pipeline.

    Args:
        poi_with_distances: POI dicts that already carry a ``distance`` value.
        parsed_targets: Parsed request, as produced by ``parse_ner_targets``.

    Returns:
        The list produced by the final stage of the pipeline.
    """
    # Stages that share the (pois, parsed_targets) signature, in order:
    # target match -> distance sort/limit -> open flag -> rating filter/sort.
    stages = (
        filter_by_target,
        filter_by_distance,
        filter_by_open,
        filter_or_sort_by_rating,
    )
    current = poi_with_distances
    for stage in stages:
        current = stage(current, parsed_targets)
    # Final step: keep only today's opening periods on each POI.
    return filter_today_opening_hours(current)
# Keep only the POIs east of the user (longitude > user_lon), order them by
# their own "distance" value and return the nearest few.
def filter_and_get_top_pois(user_location, top_filtered_pois, top_n=3):
    """Return the closest POIs that lie strictly east of the user.

    Args:
        user_location: ``(latitude, longitude)`` pair; only the longitude is
            used.
        top_filtered_pois: Iterable of POI dicts. POIs without a location or
            longitude are skipped instead of raising.
        top_n: Maximum number of POIs to return (defaults to 3).

    Returns:
        A new list of at most ``top_n`` POI dicts, nearest first. POIs with no
        ``distance`` sort last.
    """
    _, user_lon = user_location

    east_pois = []
    for poi in top_filtered_pois:
        # "location" may be absent or None on trimmed POI dicts.
        longitude = (poi.get('location') or {}).get('longitude')
        if longitude is not None and longitude > user_lon:
            east_pois.append(poi)

    # Missing/None distance is treated as infinitely far away.
    east_pois.sort(
        key=lambda poi: poi['distance'] if poi.get('distance') is not None else float('inf')
    )
    return east_pois[:top_n]
def filter_poi_list(data_list):
    """Project every POI onto the subset of fields used downstream.

    Args:
        data_list: Iterable of raw POI dicts.

    Returns:
        A list of new dicts. Each holds the wanted keys (``None`` where the
        source lacks one) plus ``"regularOpeningHours"``, which is either
        ``{"periods": ...}`` or ``None`` when the source has no periods.
    """
    wanted_fields = (
        "englishName",
        "types",
        "primaryType",
        "rating",
        "location",
        "formattedAddress",
        "userRatingCount",
        "editorialSummary",
    )
    slimmed = []
    for source in data_list:
        entry = {}
        for field in wanted_fields:
            entry[field] = source.get(field)

        hours = source.get("regularOpeningHours")
        has_periods = bool(hours) and "periods" in hours
        # Only the "periods" part of the opening hours is carried over.
        entry["regularOpeningHours"] = (
            {"periods": hours["periods"]} if has_periods else None
        )
        slimmed.append(entry)
    return slimmed
def get_coordinates(location_data, video_tps_value):
    """Look up the coordinates recorded for one ``video_tps`` value.

    Args:
        location_data: DataFrame with ``video_tps``, ``latitude`` and
            ``longitude`` columns.
        video_tps_value: Value to match against the ``video_tps`` column.

    Returns:
        ``(latitude, longitude)`` taken from the first matching row, or an
        empty list when no row matches.
    """
    hits = location_data.loc[location_data['video_tps'] == video_tps_value]
    if hits.empty:
        # No matching row: callers get an empty list.
        return []
    # First match wins when several rows share the same video_tps.
    return tuple(hits[column].values[0] for column in ('latitude', 'longitude'))
# Great-circle distance (haversine formula) from one point to many, in meters.
def calculate_distances(point, points_array):
    """Compute haversine distances from ``point`` to every row of ``points_array``.

    Args:
        point: ``(latitude, longitude)`` in degrees.
        points_array: Array-like of ``[latitude, longitude]`` rows in degrees.
            An empty input, or a flat sequence of lat/lon pairs, is reshaped
            to two columns.

    Returns:
        1-D ``numpy.ndarray`` of distances in meters, rounded to 2 decimals
        (empty when ``points_array`` is empty).
    """
    targets = np.asarray(points_array, dtype=float)
    if targets.ndim == 1:
        # Covers the empty (0,) case and a single flat [lat, lon] pair.
        targets = targets.reshape(-1, 2)

    # Degrees -> radians.
    lat1, lon1 = np.radians(point)
    lat2 = np.radians(targets[:, 0])
    lon2 = np.radians(targets[:, 1])

    dlat = lat2 - lat1
    dlon = lon2 - lon1

    # Haversine term; clip guards against rounding pushing it just past 1.0,
    # which would make arcsin return NaN for (near-)antipodal points.
    a = np.sin(dlat / 2.0) ** 2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2.0) ** 2
    a = np.clip(a, 0.0, 1.0)
    c = 2 * np.arcsin(np.sqrt(a))

    r = 6371000  # Earth radius in meters
    return np.round(c * r, 2)
def add_distances_to_poi(poi_data, distances):
    """Attach a ``distance`` value to a copy of every POI.

    Args:
        poi_data: Sequence of POI dicts; the originals are left untouched.
        distances: Indexable of distances, looked up by each POI's position
            in ``poi_data``.

    Returns:
        A new list of shallow POI copies, each with its ``distance`` set.
    """
    # Dict unpacking makes the shallow copy and sets/overrides "distance".
    return [
        {**poi, 'distance': distances[position]}
        for position, poi in enumerate(poi_data)
    ]
def parse_ner_targets(parsed_entities):
    """Split the ``target`` entities into a primary type and a display name.

    Args:
        parsed_entities: Dict that may hold ``target``, ``distance``, ``open``
            and ``rank`` lists.

    Returns:
        Dict with ``primarytype`` and ``displayname`` (empty string when not
        found) and the ``distance``/``open``/``rank`` lists passed through
        (empty list when absent). If several targets fall in the same slot,
        the last one wins.
    """
    # Targets that count as a primary category; anything else is a name.
    known_categories = {
        "restaurant", "shopping_mall", "hospital", "parking",
        "tourist_attraction", "convenience_store", "bank", "car_repair",
        "electric_vehicle_charging_station", "gas_station", "movie_theater",
    }
    result = {
        "primarytype": '',
        "displayname": '',
        "distance": parsed_entities.get("distance", []),
        "open": parsed_entities.get("open", []),
        "rank": parsed_entities.get("rank", []),
    }
    for entity in parsed_entities.get("target", []):
        slot = "primarytype" if entity in known_categories else "displayname"
        result[slot] = entity
    return result
def filter_by_target(poi_with_distances, parsed_targets):
    """Select the POIs that match the requested name or category.

    1. If ``parsed_targets['displayname']`` is non-empty, keep POIs whose
       ``englishName`` contains it as a whole term (case-insensitive, not
       directly adjacent to another word character).
    2. Otherwise keep POIs whose ``types`` contains
       ``parsed_targets['primarytype']``.

    POIs whose ``englishName`` or ``types`` is missing or ``None`` simply do
    not match.

    Args:
        poi_with_distances: Iterable of POI dicts.
        parsed_targets: Dict with ``displayname`` and ``primarytype`` entries.

    Returns:
        A new list with the matching POI dicts.
    """
    target_displayname = (parsed_targets.get("displayname") or "").strip()
    target_type = (parsed_targets.get("primarytype") or "").strip()

    if target_displayname:
        # Lookarounds rather than \b: \b needs a word character on one side,
        # so names that begin or end with punctuation (e.g. "C++") never matched.
        pattern = re.compile(
            rf'(?<!\w){re.escape(target_displayname.lower())}(?!\w)'
        )
        return [
            poi for poi in poi_with_distances
            if pattern.search((poi.get('englishName') or '').lower())
        ]

    # No display name: fall back to a category lookup in "types".
    return [
        poi for poi in poi_with_distances
        if target_type in (poi.get("types") or [])
    ]
# Sort by distance and optionally keep only POIs within a distance threshold.
def filter_by_distance(poi_list, parsed_targets):
    """Sort POIs nearest-first and apply the optional distance limit.

    1. Sort by ``distance`` ascending; a missing or ``None`` distance sorts
       last.
    2. If ``parsed_targets['distance']`` is e.g. ``['500']``, keep only POIs
       whose distance is known and ``< 500``. Only the first entry is used.
       Without a threshold the sorted list is returned as is.

    Args:
        poi_list: Iterable of POI dicts.
        parsed_targets: Dict whose optional ``distance`` entry is a list with
            the threshold as its first element.

    Returns:
        A new, sorted (and possibly filtered) list of the same POI dicts.

    Raises:
        ValueError: If the threshold cannot be converted to ``float``.
    """
    def sort_key(poi):
        # A stored None must not reach the comparison; treat it as "no distance".
        distance = poi.get("distance")
        return float("inf") if distance is None else distance

    sorted_by_distance = sorted(poi_list, key=sort_key)

    thresholds = parsed_targets.get("distance")
    if not thresholds:
        return sorted_by_distance

    distance_threshold = float(thresholds[0])
    return [
        poi for poi in sorted_by_distance
        if poi.get("distance") is not None and poi["distance"] < distance_threshold
    ]
# Filter or sort by rating.
def filter_or_sort_by_rating(poi_list, parsed_targets):
    """Apply the ``rank`` request: rating threshold or best-first ordering.

    - ``rank`` first entry is ``"highest"`` (case-insensitive): return the
      POIs sorted by rating, highest first; a missing or ``None`` rating
      counts as 0.
    - ``rank`` first entry is numeric (e.g. ``"4.0"``): keep POIs whose rating
      is present and ``>=`` that value.
    - No ``rank`` or an empty one: return ``poi_list`` unchanged.

    Args:
        poi_list: List of POI dicts.
        parsed_targets: Dict whose optional ``rank`` entry is a list; only its
            first element is used.

    Returns:
        ``poi_list`` itself when there is no rank, otherwise a new list.

    Raises:
        ValueError: If the rank is neither ``"highest"`` nor numeric.
    """
    ranks = parsed_targets.get("rank")
    if not ranks:
        return poi_list

    rank_value = ranks[0]

    # str() lets a numeric rank (e.g. 4.5) through without failing on .lower().
    if str(rank_value).strip().lower() == "highest":
        return sorted(
            poi_list,
            key=lambda poi: poi.get("rating") or 0,
            reverse=True,
        )

    rank_float = float(rank_value)
    return [
        poi for poi in poi_list
        if poi.get("rating") and poi["rating"] >= rank_float
    ]
# Flag POIs by opening hours.
def _period_covers_hour(period, weekday, hour):
    """Tell whether one opening period contains ``hour`` on ``weekday``.

    Days use the POI convention (0 = Sunday ... 6 = Saturday). Minutes are
    ignored; only whole hours are compared. Periods that close on a later day
    than they open (e.g. 18:00 -> 02:00) are handled by comparing positions on
    an hour-of-week axis.
    """
    hours_per_day = 24
    hours_per_week = 7 * hours_per_day

    if not 0 <= hour < hours_per_day:
        return False

    opening = period.get("open") or {}
    open_day = opening.get("day")
    if open_day is None:
        return False

    closing = period.get("close")
    if not closing:
        # NOTE(review): assumes Google-Places-style data, where a period
        # without "close" means the place never closes -- confirm for this feed.
        return True

    start = open_day * hours_per_day + opening.get("hour", 0)
    end = closing.get("day", open_day) * hours_per_day + closing.get("hour", 0)
    if closing.get("day", open_day) != open_day and end < start:
        # Period wraps around the end of the week (e.g. Saturday -> Sunday).
        end += hours_per_week

    moment = weekday * hours_per_day + hour
    # Second test catches the early-week tail of a period that wrapped.
    return start <= moment < end or start <= moment + hours_per_week < end


def filter_by_open(poi_with_distances, parsed_targets):
    """Return POI copies annotated with an ``isOpenNow`` flag.

    The input list and its dicts are not modified; each returned dict is a
    shallow copy with ``isOpenNow`` added or replaced:

    - ``None`` when ``parsed_targets['open']`` is missing or empty (no check).
    - ``True``/``False`` otherwise: whether the POI is open on the current
      local weekday at the requested hour (first entry of
      ``parsed_targets['open']``). POIs without opening periods get ``False``.

    No POI is removed; all other fields are kept as they are.

    Args:
        poi_with_distances: Iterable of POI dicts.
        parsed_targets: Dict whose optional ``open`` entry is a list with the
            requested hour (e.g. ``"15"``) as first element.

    Returns:
        A new list of annotated POI copies.

    Raises:
        ValueError: If the requested hour cannot be converted to ``float``.
    """
    requested = parsed_targets.get("open")
    if not requested:
        # Nothing to check against: mark every POI as "not evaluated".
        return [{**poi, "isOpenNow": None} for poi in poi_with_distances]

    target_hour = float(requested[0])

    # time.localtime(): 0 = Monday ... 6 = Sunday.
    # POI periods:      0 = Sunday ... 6 = Saturday.
    current_weekday_poi = (time.localtime().tm_wday + 1) % 7

    annotated = []
    for poi in poi_with_distances:
        poi_copy = poi.copy()  # shallow copy so the caller's dict is untouched
        periods = (poi_copy.get("regularOpeningHours") or {}).get("periods") or []
        poi_copy["isOpenNow"] = any(
            _period_covers_hour(period, current_weekday_poi, target_hour)
            for period in periods
        )
        annotated.append(poi_copy)
    return annotated
def filter_today_opening_hours(poi_list):
    """Keep only today's opening periods on every POI.

    POI periods number their days 0 = Sunday ... 6 = Saturday (the conversion
    ``filter_by_open`` applies), whereas ``time.localtime().tm_wday`` is
    0 = Monday ... 6 = Sunday, so the weekday is converted before comparing.

    The input dicts are not modified: a POI that has periods is replaced by a
    copy whose ``regularOpeningHours`` is a copy holding the trimmed
    ``periods``. POIs whose ``regularOpeningHours`` is ``None`` or has no
    ``periods`` are passed through unchanged.

    Args:
        poi_list: Iterable of POI dicts.

    Returns:
        A new list with one entry per input POI, in the same order.
    """
    # Convert Python's Monday-based weekday to the Sunday-based POI numbering.
    today = (time.localtime().tm_wday + 1) % 7

    trimmed = []
    for poi in poi_list:
        hours = poi.get('regularOpeningHours')
        if hours is None or 'periods' not in hours:
            trimmed.append(poi)
            continue

        todays_periods = [
            period for period in hours['periods']
            if (period.get('open') or {}).get('day') == today
        ]
        # Copy both levels: the nested dict is shared with the caller's data.
        trimmed.append(
            {**poi, 'regularOpeningHours': {**hours, 'periods': todays_periods}}
        )
    return trimmed
def filter_pois_by_rating_count(pois, user_rating_threshold=1000):
    """Keep the POIs that have at least ``user_rating_threshold`` user ratings.

    Args:
        pois: Iterable of POI dicts.
        user_rating_threshold: Minimum ``userRatingCount`` required.

    Returns:
        A new list with the qualifying POI dicts, in their original order.
    """
    popular = []
    for entry in pois:
        rating_count = entry.get('userRatingCount')
        if not rating_count:
            # Missing, None or 0 all count as zero ratings.
            rating_count = 0
        if rating_count >= user_rating_threshold:
            popular.append(entry)
    return popular
# Filter the given POIs by POI type.
def filter_pois_by_type(pois, poi_type):
    """Filter a list of POIs down to those of a given type.

    Args:
        pois: Iterable of POI dicts.
        poi_type: Type to look for in each POI's ``types``.

    Returns:
        A new list with the POIs whose ``types`` contains ``poi_type``. POIs
        whose ``types`` is missing or ``None`` are left out.
    """
    # "types" can be present but None (e.g. after filter_poi_list), so the
    # .get default alone is not enough.
    return [poi for poi in pois if poi_type in (poi.get("types") or [])]
def format_english_name(english_name):
    """Reformat a name with separators as ``"<first> (<second>)"``.

    The name is split on ``|``, ``/`` and ``\\`` and on their fullwidth forms
    (U+FF5C, U+FF0F, U+FF3C). The first two non-empty segments are combined;
    further segments are dropped.

    Args:
        english_name: Name string to format.

    Returns:
        ``"<first> (<second>)"`` when at least two non-empty segments exist,
        the single non-empty segment when only one remains, and the original
        string when it contains no separator at all.
    """
    # Fullwidth separators are spelled as escapes so the character class stays
    # valid and unambiguous regardless of how the file is normalized.
    parts = re.split(r'[|/\\\uFF5C\uFF0F\uFF3C]', english_name)
    if len(parts) == 1:
        return english_name  # no separator: return the input untouched

    segments = [part.strip() for part in parts if part.strip()]
    if len(segments) >= 2:
        return f"{segments[0]} ({segments[1]})"
    if segments:
        # Only one side of the separator had text: avoid emitting "Name ()".
        return segments[0]
    return english_name
def truncate_and_fix_brackets(input_str, max_length=70):
    """Shorten a string at a word boundary and close any bracket left open.

    1. Strings of at most ``max_length`` characters are returned unchanged.
    2. Longer strings are cut at ``max_length`` and then extended up to (not
       including) the next whitespace or one of ``, ! ) ] }`` (or the
       fullwidth comma/right parenthesis), so no word is cut in half. If no
       such character follows, the whole string is kept.
    3. Any ``(``, fullwidth ``(`` (U+FF08), ``[`` or ``{`` still open in the
       result gets its matching closing bracket appended, innermost first.

    Args:
        input_str: String to shorten.
        max_length: Length above which the string is shortened (default 70).

    Returns:
        The possibly shortened string with balanced closing brackets appended.
    """
    if len(input_str) <= max_length:
        return input_str

    # Step 1: cut, then extend to the end of the word that was cut into.
    truncated_str = input_str[:max_length]
    remaining_str = input_str[max_length:]
    boundary = re.search(r'[\s,!)\]}\uFF0C\uFF09]', remaining_str)
    if boundary:
        truncated_str += remaining_str[:boundary.start()]
    else:
        truncated_str = input_str  # no usable boundary: keep everything

    # Step 2: close brackets that were left open.
    bracket_pairs = {'(': ')', '\uFF08': '\uFF09', '[': ']', '{': '}'}
    closing_brackets = set(bracket_pairs.values())
    open_brackets = []  # closing brackets still owed, outermost first
    for char in truncated_str:
        if char in bracket_pairs:
            open_brackets.append(bracket_pairs[char])
        elif char in closing_brackets:
            # Only a closer matching the innermost opener cancels it.
            if open_brackets and open_brackets[-1] == char:
                open_brackets.pop()

    return truncated_str + ''.join(reversed(open_brackets))
def generate_tts_message(top_filtered_pois):
    """Build a polite, natural-sounding TTS sentence listing the found places.

    One of five templates is picked at random. Each place name is shortened
    with ``truncate_and_fix_brackets`` and reformatted with
    ``format_english_name``; a missing or ``None`` ``englishName`` becomes
    ``"Unknown Place"``.

    Args:
        top_filtered_pois: Sequence of POI dicts.

    Returns:
        The message string, or a fixed apology when the sequence is empty.
    """
    num_places = len(top_filtered_pois)

    if num_places == 0:
        return "I'm sorry, I couldn't find any matching places nearby. Please try adjusting your preferences."

    # Five interchangeable templates.
    templates = [
        "I found {num_places} places that might interest you nearby: {details} Have a wonderful day ahead!",
        "Here are {num_places} nearby places you might like: {details} Hope this helps you!",
        "I've identified {num_places} places nearby: {details} Wishing you a pleasant journey!",
        "There are {num_places} places nearby that you might enjoy: {details} Have a great time!",
        "Nearby, I found {num_places} places worth checking out: {details} Enjoy exploring!"
    ]
    chosen_template = random.choice(templates)

    # `or` (not a .get default) so a stored None also falls back to the placeholder.
    names = [
        format_english_name(
            truncate_and_fix_brackets(poi.get('englishName') or 'Unknown Place')
        )
        for poi in top_filtered_pois
    ]
    details = ", ".join(f"***{name}***" for name in names) + "."

    return chosen_template.format(num_places=num_places, details=details)
def _format_poi_entry(idx, poi):
    """Render one numbered entry: name, rating, distance and summary."""
    # `or` fallbacks also cover keys that are present but hold None.
    name = format_english_name(
        truncate_and_fix_brackets(poi.get('englishName') or 'Unknown Place')
    )

    rating = poi.get('rating')
    rating_text = f"{rating}/5" if rating is not None else "No rating"

    # Missing/None distance is shown as 0, matching the previous default.
    distance_meters = int(poi.get('distance') or 0)

    editorial = poi.get('editorialSummary')
    summary = editorial.get('englishSummary') if isinstance(editorial, dict) else None
    summary = summary or 'No summary available.'

    return (
        f"{idx}. {name}:\n"
        f" - Rating: {rating_text}\n"
        f" - Distance: {distance_meters} meters\n"
        f" - Summary: {summary}"
    )


def generate_display_message(top_filtered_pois):
    """Build the on-screen message describing the found places.

    Each place is listed with its name, rating, distance (whole meters) and
    English summary. One of five templates is picked at random.

    Args:
        top_filtered_pois: Sequence of POI dicts.

    Returns:
        The message string, or a fixed apology when the sequence is empty.
    """
    num_places = len(top_filtered_pois)

    if num_places == 0:
        return "I'm sorry, I couldn't find any places that match your request in the nearby area. Please try expanding the search radius or adjusting your preferences."

    # Five interchangeable templates.
    templates = [
        "I found {num_places} places nearby. \n{details} Enjoy exploring!",
        "Here are {num_places} nearby options: \n{details} Hope you find this helpful!",
        "There are {num_places} places worth checking out. \n{details} Have a great time!",
        "Nearby, I identified {num_places} options you might like. \n{details} Wishing you a pleasant experience!",
        "I discovered {num_places} places close by. \n{details} Enjoy your visit!"
    ]
    chosen_template = random.choice(templates)

    details = "\n".join(
        _format_poi_entry(idx, poi)
        for idx, poi in enumerate(top_filtered_pois, start=1)
    )

    return chosen_template.format(num_places=num_places, details=details)