LINE-ROBOT / command_handler.py
cwadayi's picture
Update command_handler.py
118b8c9 verified
raw
history blame
4.31 kB
# command_handler.py
import pandas as pd
from linebot.v3.messaging import TextMessage, ImageMessage
# 匯入服務函式
from cwa_service import fetch_cwa_alarm_list, fetch_significant_earthquakes, fetch_latest_significant_earthquake # [新增]
from usgs_service import fetch_global_last24h_text, fetch_taiwan_df_this_year
from plotting_service import create_and_save_map
from ai_service import generate_ai_text
from config import CURRENT_YEAR, HF_SPACE_URL
def get_help_message() -> TextMessage:
"""回傳包含所有可用指令的說明訊息。"""
text = (
"📖 指令列表\n\n"
"【地震資訊】\n"
"• /latest - 查詢最新一筆顯著地震(含圖)\n" # [新增]
"• /global - 查詢全球近24小時顯著地震\n"
"• /taiwan - 顯示今年台灣顯著地震列表\n"
"• /map - 取得台灣顯著地震地圖連結\n"
"• /alert - 獲取 CWA 最新地震預警\n"
"• /significant - CWA 最新顯著有感地震\n\n"
"【AI 與工具】\n"
"• /ai <問題> - 與 AI 助理對話\n\n"
"【基本指令】\n"
"• /info - 關於此機器人\n"
"• /help - 顯示此說明"
)
return TextMessage(text=text)
# ... (get_info_message, get_taiwan_earthquake_list, get_taiwan_earthquake_map 函式不變) ...
# --- [新功能] 處理 /latest 指令的回覆 ---
def get_latest_earthquake_reply() -> list:
"""獲取最新地震資訊並組合成 LINE 訊息"""
try:
latest_eq = fetch_latest_significant_earthquake()
if not latest_eq:
return [TextMessage(text="✅ 近期無顯著有感地震報告。")]
# 組合文字訊息
mag_str = f"{latest_eq['Magnitude']:.1f}" if latest_eq.get('Magnitude') is not None else "—"
depth_str = f"{latest_eq['Depth']:.0f}" if latest_eq.get('Depth') is not None else "—"
text_message_content = (
f"🚨 CWA 最新顯著有感地震\n"
f"----------------------------------\n"
f"時間: {latest_eq.get('TimeStr', '—')}\n"
f"地點: {latest_eq.get('Location', '—')}\n"
f"規模: M{mag_str} | 深度: {depth_str} km\n"
f"報告: {latest_eq.get('URL', '無')}"
)
reply_messages = [TextMessage(text=text_message_content)]
# 如果有圖片網址,則新增圖片訊息
if latest_eq.get("ImageURL"):
image_url = latest_eq["ImageURL"]
reply_messages.append(
ImageMessage(original_content_url=image_url, preview_image_url=image_url)
)
return reply_messages
except Exception as e:
return [TextMessage(text=f"❌ 查詢最新地震失敗:{e}")]
def process_message(user_message_raw: str, request_base_url: str) -> list:
"""處理使用者的文字訊息並回傳一個包含回覆訊息的列表。"""
user_message = (user_message_raw or "").strip()
# ... (舊指令相容性轉換不變) ...
if user_message.startswith('/'):
parts = user_message.split(' ', 1)
command = parts[0].lower()
arg = parts[1] if len(parts) > 1 else ""
if command == '/help':
return [get_help_message()]
elif command == '/info':
return [get_info_message()]
elif command == '/latest': # [新增]
return get_latest_earthquake_reply()
elif command == '/global':
return [TextMessage(text=fetch_global_last24h_text())]
elif command == '/taiwan':
return [get_taiwan_earthquake_list()]
elif command == '/map':
return get_taiwan_earthquake_map(request_base_url)
elif command == '/alert':
return [TextMessage(text=fetch_cwa_alarm_list(limit=5))]
elif command == '/significant':
return [TextMessage(text=fetch_significant_earthquakes(limit=5))]
elif command == '/ai':
if not arg:
return [TextMessage(text="請輸入問題,例如:/ai 台灣最高的山是哪座?")]
return [TextMessage(text=generate_ai_text(arg))]
# 若非指令,則預設交給 AI 處理 (作為備援)
return [TextMessage(text=generate_ai_text(user_message))]