LINE-ROBOT / command_handler.py
cwadayi's picture
Update command_handler.py
54641dd verified
raw
history blame
5.07 kB
# command_handler.py
import pandas as pd
from linebot.v3.messaging import TextMessage, ImageMessage
# 匯入服務函式
from cwa_service import fetch_cwa_alarm_list, fetch_significant_earthquakes
from usgs_service import fetch_global_last24h_text, fetch_taiwan_df_this_year
from plotting_service import create_and_save_map
from ai_service import generate_ai_text
from weather_service import fetch_forecast_by_location
from config import CURRENT_YEAR, HF_SPACE_URL
def get_help_message() -> TextMessage:
"""回傳包含所有可用指令的說明訊息。"""
text = (
"📖 指令列表\n\n"
"【地震資訊】\n"
"• /global - 查詢全球近24小時顯著地震\n"
"• /taiwan - 顯示今年台灣顯著地震列表\n"
"• /map - 繪製台灣顯著地震地圖\n"
"• /alert - 獲取 CWA 最新地震預警\n"
"• /significant - CWA 最新顯著有感地震\n\n"
"【AI 與工具】\n"
"• /ai <問題> - 與 AI 助理對話\n"
"• /weather <地點> - 查詢天氣預報\n\n"
"【基本指令】\n"
"• /info - 關於此機器人\n"
"• /help - 顯示此說明"
)
return TextMessage(text=text)
def get_info_message() -> TextMessage:
"""回傳機器人資訊。"""
text = (
"🤖 關於我\n\n"
"我是一個多功能助理機器人,提供地震查詢、天氣資訊與 AI 對話功能。\n\n"
"• 版本: 2.0\n"
"• 資料來源: CWA, USGS, Hugging Face\n"
"• 開發者: dayichen"
)
return TextMessage(text=text)
def get_taiwan_earthquake_list() -> TextMessage:
"""回傳近期的台灣地震文字列表。"""
result = fetch_taiwan_df_this_year()
if isinstance(result, pd.DataFrame):
count = len(result)
lines = [f"🇹🇼 今年 ({CURRENT_YEAR} 年) 台灣區域顯著地震 (M≥5.0),共 {count} 筆:", "-" * 20]
for _, row in result.head(15).iterrows():
t = row["time_utc"].strftime("%Y-%m-%d %H:%M")
lines.append(f"震級: {row['magnitude']:.1f} | 日期時間: {t} (UTC)\n地點: {row['place']}")
if count > 15:
lines.append(f"... (還有 {count - 15} 筆,請用 /map 查看全部)")
reply_text = "\n\n".join(lines)
else:
reply_text = result
return TextMessage(text=reply_text)
def get_taiwan_earthquake_map(base_url: str) -> list:
"""產生並回傳台灣地震地圖的訊息。"""
result = fetch_taiwan_df_this_year()
if isinstance(result, pd.DataFrame):
filename = create_and_save_map(result)
image_url = f"{(HF_SPACE_URL or base_url)}/static/{filename}"
return [
TextMessage(text="🗺️ 已為您繪製今年台灣區域 M≥5.0 地震分佈圖(UTC)。"),
ImageMessage(original_content_url=image_url, preview_image_url=image_url),
]
else:
return [TextMessage(text=result)]
def process_message(user_message_raw: str, request_base_url: str) -> list:
"""處理使用者的文字訊息並回傳一個包含回覆訊息的列表。"""
# [修正] 加上這一行來定義 user_message 變數
user_message = (user_message_raw or "").strip()
# 將舊指令對應到新指令,提供向下相容
if user_message.lower() in ["地震", "quake"]:
user_message = "/global"
elif user_message.lower() in ["台灣地震", "臺灣地震"]:
user_message = "/taiwan"
elif user_message.lower() in ["台灣地震畫圖", "臺灣地震畫圖"]:
user_message = "/map"
elif user_message.lower() == "地震預警":
user_message = "/alert"
if user_message.startswith('/'):
parts = user_message.split(' ', 1)
command = parts[0].lower()
arg = parts[1] if len(parts) > 1 else ""
if command == '/help':
return [get_help_message()]
elif command == '/info':
return [get_info_message()]
elif command == '/global':
return [TextMessage(text=fetch_global_last24h_text())]
elif command == '/taiwan':
return [get_taiwan_earthquake_list()]
elif command == '/map':
return get_taiwan_earthquake_map(request_base_url)
elif command == '/alert':
return [TextMessage(text=fetch_cwa_alarm_list(limit=5))]
elif command == '/significant':
return [TextMessage(text=fetch_significant_earthquakes(limit=5))]
elif command == '/weather':
if not arg:
return [TextMessage(text="請輸入地點,例如:/weather 台北")]
return [TextMessage(text=fetch_forecast_by_location(arg))]
elif command == '/ai':
if not arg:
return [TextMessage(text="請輸入問題,例如:/ai 台灣最高的山是哪座?")]
return [TextMessage(text=generate_ai_text(arg))]
# 若非指令,則預設交給 AI 處理 (作為備援)
return [TextMessage(text=generate_ai_text(user_message))]