LINE-ROBOT / command_handler.py
cwadayi's picture
Update command_handler.py
7363269 verified
raw
history blame
6.43 kB
# command_handler.py (Help message now includes English translations)
import pandas as pd
import re
from linebot.v3.messaging import TextMessage, ImageMessage
# 匯入所有服務函式
from cwa_service import fetch_cwa_alarm_list, fetch_significant_earthquakes, fetch_latest_significant_earthquake
from usgs_service import fetch_global_last24h_text, fetch_taiwan_df_this_year
from ai_service import generate_ai_text
from config import CURRENT_YEAR, MCP_SERVER_URL
def get_help_message() -> TextMessage:
"""
[核心修改]
產生並回傳包含所有指令的說明文字,並附上英文翻譯。
"""
text = (
"📖 指令列表 (Command List)\n"
" (請直接輸入數字 / Please enter a number)\n\n"
"【地震資訊 (Earthquake Info)】\n"
"• 1 - 最新一筆顯著地震 (含圖)\n"
" (Latest Significant Earthquake w/ Map)\n"
"• 2 - 全球近24小時顯著地震 (USGS)\n"
" (Global Quakes in 24h - USGS)\n"
"• 3 - 今年台灣顯著地震列表 (USGS)\n"
" (Taiwan Quakes This Year - USGS)\n"
"• 4 - CWA 地震目錄查詢 (外部連結)\n"
" (CWA Catalog Search - External Link)\n"
"• 5 - CWA 最新地震預警\n"
" (Latest CWA Earthquake Alerts)\n"
"• 6 - CWA 最近7天顯著有感地震\n"
" (Significant Quakes in 7 Days - CWA)\n\n"
"【AI 與工具 (AI & Tools)】\n"
"• 7 <問題> - 與 AI 助理對話\n"
" (Chat with AI Assistant)\n"
" (e.g., 7 2023年臺灣最大地震)\n\n"
"【基本指令 (Basic Commands)】\n"
"• 8 - 關於此機器人 (About this Bot)\n"
"• 9 - 顯示此說明 (Show this Help Message)"
)
return TextMessage(text=text)
def get_info_message() -> TextMessage:
"""產生並回傳關於此機器人的資訊。"""
text = (
"🤖 關於我\n\n"
"我是一個多功能助理機器人,提供地震查詢與 AI 對話功能。\n\n"
"• 版本: 5.3 (Gemini Command-Only Edition)\n"
"• 資料來源: CWA, USGS, Google Gemini\n"
"• 開發者: dayichen"
)
return TextMessage(text=text)
def get_taiwan_earthquake_list() -> TextMessage:
"""從 USGS 取得今年台灣的地震列表並格式化回覆。"""
result = fetch_taiwan_df_this_year()
if isinstance(result, pd.DataFrame):
count = len(result)
lines = [f"🇹🇼 今年 ({CURRENT_YEAR} 年) 台灣區域顯著地震 (M≥5.0),共 {count} 筆:", "-" * 20]
for _, row in result.head(15).iterrows():
t = row["time_utc"].strftime("%Y-%m-%d %H:%M")
lines.append(
f"規模: {row['magnitude']:.1f} | 日期時間: {t} (UTC)\n"
f"地點: {row['place']}\n"
f"報告連結: {row.get('url', '無')}"
)
if count > 15: lines.append(f"... (還有 {count-15} 筆資料)")
reply_text = "\n\n".join(lines)
else: reply_text = result
return TextMessage(text=reply_text)
def get_latest_earthquake_reply() -> list:
"""取得 CWA 最新一筆地震,並組合文字與圖片訊息。"""
try:
latest_eq = fetch_latest_significant_earthquake()
if not latest_eq: return [TextMessage(text="✅ 近期無顯著有感地震報告。")]
mag_str = f"{latest_eq['Magnitude']:.1f}" if latest_eq.get('Magnitude') is not None else "—"
depth_str = f"{latest_eq['Depth']:.0f}" if latest_eq.get('Depth') is not None else "—"
text_message_content = (
f"🚨 CWA 最新顯著有感地震\n"
f"----------------------------------\n"
f"時間: {latest_eq.get('TimeStr', '—')}\n"
f"地點: {latest_eq.get('Location', '—')}\n"
f"規模: M{mag_str} | 深度: {depth_str} km\n"
f"報告: {latest_eq.get('URL', '無')}"
)
reply_messages = [TextMessage(text=text_message_content)]
if latest_eq.get("ImageURL"):
image_url = latest_eq["ImageURL"]
reply_messages.append(ImageMessage(original_content_url=image_url, preview_image_url=image_url))
return reply_messages
except Exception as e: return [TextMessage(text=f"❌ 查詢最新地震失敗:{e}")]
def preprocess_ai_prompt(prompt: str) -> str:
"""預處理使用者發給 AI 的問題。"""
year_match = re.search(r"(\d{4})年", prompt)
if year_match and "最大地震" in prompt:
year = year_match.group(1)
new_prompt = f"請幫我呼叫地震搜尋工具,找出{year}年1月1日至{year}年12月31日之間,規模最大的地震。"
print(f"--- AI Prompt Rewritten ---\nOriginal: {prompt}\nNew: {new_prompt}\n---------------------------")
return new_prompt
return prompt
def process_message(user_message_raw: str, request_base_url: str) -> list:
"""處理使用者輸入的主函式。"""
user_message = (user_message_raw or "").strip()
if not user_message:
return [] # 如果是空訊息,不回應
parts = user_message.split(' ', 1)
command_key = parts[0]
arg = parts[1].strip() if len(parts) > 1 else ""
# 處理固定的數字指令
if command_key == '1': return get_latest_earthquake_reply()
if command_key == '2': return [TextMessage(text=fetch_global_last24h_text())]
if command_key == '3': return [get_taiwan_earthquake_list()]
if command_key == '4': return [TextMessage(text=f"🗺️ 外部地震查詢服務\n\n請點擊以下連結:\n{MCP_SERVER_URL}")]
if command_key == '5': return [TextMessage(text=fetch_cwa_alarm_list(limit=5))]
if command_key == '6': return [TextMessage(text=fetch_significant_earthquakes(limit=5))]
if command_key == '8': return [get_info_message()]
if command_key == '9': return [get_help_message()]
# 只有當指令是 '7' 時,才啟用 AI
if command_key == '7':
if not arg:
return [TextMessage(text="請輸入問題,例如:7 台灣最高的山是哪座?")]
# 將問題進行預處理,然後交給 AI
processed_prompt = preprocess_ai_prompt(arg)
return [TextMessage(text=generate_ai_text(processed_prompt))]
# 對於所有其他無法識別的訊息 (一般對話),回傳說明指令
return [get_help_message()]