Spaces:
Sleeping
Sleeping
File size: 6,431 Bytes
7363269 071beff b0ea870 ce425c0 22f516b 89c2715 ce425c0 22f516b ce425c0 7363269 ce425c0 7363269 0acfd1d 7363269 0acfd1d 7363269 0acfd1d 7363269 5da9c98 7363269 ce425c0 2da8e5c ce425c0 7363269 22f516b ce425c0 2da8e5c ce425c0 b0ea870 ce425c0 b0ea870 ce425c0 2da8e5c 139ddee ce425c0 b0ea870 ce425c0 b0ea870 ce425c0 b0ea870 ce425c0 b0ea870 7363269 b0ea870 7363269 0acfd1d 5da9c98 ce425c0 7363269 5da9c98 7363269 b0ea870 7363269 b0ea870 7363269 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
# command_handler.py (Help message now includes English translations)
import pandas as pd
import re
from linebot.v3.messaging import TextMessage, ImageMessage
# 匯入所有服務函式
from cwa_service import fetch_cwa_alarm_list, fetch_significant_earthquakes, fetch_latest_significant_earthquake
from usgs_service import fetch_global_last24h_text, fetch_taiwan_df_this_year
from ai_service import generate_ai_text
from config import CURRENT_YEAR, MCP_SERVER_URL
def get_help_message() -> TextMessage:
"""
[核心修改]
產生並回傳包含所有指令的說明文字,並附上英文翻譯。
"""
text = (
"📖 指令列表 (Command List)\n"
" (請直接輸入數字 / Please enter a number)\n\n"
"【地震資訊 (Earthquake Info)】\n"
"• 1 - 最新一筆顯著地震 (含圖)\n"
" (Latest Significant Earthquake w/ Map)\n"
"• 2 - 全球近24小時顯著地震 (USGS)\n"
" (Global Quakes in 24h - USGS)\n"
"• 3 - 今年台灣顯著地震列表 (USGS)\n"
" (Taiwan Quakes This Year - USGS)\n"
"• 4 - CWA 地震目錄查詢 (外部連結)\n"
" (CWA Catalog Search - External Link)\n"
"• 5 - CWA 最新地震預警\n"
" (Latest CWA Earthquake Alerts)\n"
"• 6 - CWA 最近7天顯著有感地震\n"
" (Significant Quakes in 7 Days - CWA)\n\n"
"【AI 與工具 (AI & Tools)】\n"
"• 7 <問題> - 與 AI 助理對話\n"
" (Chat with AI Assistant)\n"
" (e.g., 7 2023年臺灣最大地震)\n\n"
"【基本指令 (Basic Commands)】\n"
"• 8 - 關於此機器人 (About this Bot)\n"
"• 9 - 顯示此說明 (Show this Help Message)"
)
return TextMessage(text=text)
def get_info_message() -> TextMessage:
"""產生並回傳關於此機器人的資訊。"""
text = (
"🤖 關於我\n\n"
"我是一個多功能助理機器人,提供地震查詢與 AI 對話功能。\n\n"
"• 版本: 5.3 (Gemini Command-Only Edition)\n"
"• 資料來源: CWA, USGS, Google Gemini\n"
"• 開發者: dayichen"
)
return TextMessage(text=text)
def get_taiwan_earthquake_list() -> TextMessage:
"""從 USGS 取得今年台灣的地震列表並格式化回覆。"""
result = fetch_taiwan_df_this_year()
if isinstance(result, pd.DataFrame):
count = len(result)
lines = [f"🇹🇼 今年 ({CURRENT_YEAR} 年) 台灣區域顯著地震 (M≥5.0),共 {count} 筆:", "-" * 20]
for _, row in result.head(15).iterrows():
t = row["time_utc"].strftime("%Y-%m-%d %H:%M")
lines.append(
f"規模: {row['magnitude']:.1f} | 日期時間: {t} (UTC)\n"
f"地點: {row['place']}\n"
f"報告連結: {row.get('url', '無')}"
)
if count > 15: lines.append(f"... (還有 {count-15} 筆資料)")
reply_text = "\n\n".join(lines)
else: reply_text = result
return TextMessage(text=reply_text)
def get_latest_earthquake_reply() -> list:
"""取得 CWA 最新一筆地震,並組合文字與圖片訊息。"""
try:
latest_eq = fetch_latest_significant_earthquake()
if not latest_eq: return [TextMessage(text="✅ 近期無顯著有感地震報告。")]
mag_str = f"{latest_eq['Magnitude']:.1f}" if latest_eq.get('Magnitude') is not None else "—"
depth_str = f"{latest_eq['Depth']:.0f}" if latest_eq.get('Depth') is not None else "—"
text_message_content = (
f"🚨 CWA 最新顯著有感地震\n"
f"----------------------------------\n"
f"時間: {latest_eq.get('TimeStr', '—')}\n"
f"地點: {latest_eq.get('Location', '—')}\n"
f"規模: M{mag_str} | 深度: {depth_str} km\n"
f"報告: {latest_eq.get('URL', '無')}"
)
reply_messages = [TextMessage(text=text_message_content)]
if latest_eq.get("ImageURL"):
image_url = latest_eq["ImageURL"]
reply_messages.append(ImageMessage(original_content_url=image_url, preview_image_url=image_url))
return reply_messages
except Exception as e: return [TextMessage(text=f"❌ 查詢最新地震失敗:{e}")]
def preprocess_ai_prompt(prompt: str) -> str:
"""預處理使用者發給 AI 的問題。"""
year_match = re.search(r"(\d{4})年", prompt)
if year_match and "最大地震" in prompt:
year = year_match.group(1)
new_prompt = f"請幫我呼叫地震搜尋工具,找出{year}年1月1日至{year}年12月31日之間,規模最大的地震。"
print(f"--- AI Prompt Rewritten ---\nOriginal: {prompt}\nNew: {new_prompt}\n---------------------------")
return new_prompt
return prompt
def process_message(user_message_raw: str, request_base_url: str) -> list:
"""處理使用者輸入的主函式。"""
user_message = (user_message_raw or "").strip()
if not user_message:
return [] # 如果是空訊息,不回應
parts = user_message.split(' ', 1)
command_key = parts[0]
arg = parts[1].strip() if len(parts) > 1 else ""
# 處理固定的數字指令
if command_key == '1': return get_latest_earthquake_reply()
if command_key == '2': return [TextMessage(text=fetch_global_last24h_text())]
if command_key == '3': return [get_taiwan_earthquake_list()]
if command_key == '4': return [TextMessage(text=f"🗺️ 外部地震查詢服務\n\n請點擊以下連結:\n{MCP_SERVER_URL}")]
if command_key == '5': return [TextMessage(text=fetch_cwa_alarm_list(limit=5))]
if command_key == '6': return [TextMessage(text=fetch_significant_earthquakes(limit=5))]
if command_key == '8': return [get_info_message()]
if command_key == '9': return [get_help_message()]
# 只有當指令是 '7' 時,才啟用 AI
if command_key == '7':
if not arg:
return [TextMessage(text="請輸入問題,例如:7 台灣最高的山是哪座?")]
# 將問題進行預處理,然後交給 AI
processed_prompt = preprocess_ai_prompt(arg)
return [TextMessage(text=generate_ai_text(processed_prompt))]
# 對於所有其他無法識別的訊息 (一般對話),回傳說明指令
return [get_help_message()]
|