Spaces:
Sleeping
Sleeping
File size: 4,305 Bytes
071beff 240ea75 118b8c9 071beff 417ae3a 118b8c9 417ae3a 5a97175 417ae3a 118b8c9 417ae3a 8ac5ed8 417ae3a 071beff 118b8c9 240ea75 118b8c9 240ea75 118b8c9 240ea75 118b8c9 071beff 417ae3a 118b8c9 417ae3a 240ea75 118b8c9 240ea75 417ae3a 071beff 118b8c9 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 |
# command_handler.py
import pandas as pd
from linebot.v3.messaging import TextMessage, ImageMessage
# 匯入服務函式
from cwa_service import fetch_cwa_alarm_list, fetch_significant_earthquakes, fetch_latest_significant_earthquake # [新增]
from usgs_service import fetch_global_last24h_text, fetch_taiwan_df_this_year
from plotting_service import create_and_save_map
from ai_service import generate_ai_text
from config import CURRENT_YEAR, HF_SPACE_URL
def get_help_message() -> TextMessage:
"""回傳包含所有可用指令的說明訊息。"""
text = (
"📖 指令列表\n\n"
"【地震資訊】\n"
"• /latest - 查詢最新一筆顯著地震(含圖)\n" # [新增]
"• /global - 查詢全球近24小時顯著地震\n"
"• /taiwan - 顯示今年台灣顯著地震列表\n"
"• /map - 取得台灣顯著地震地圖連結\n"
"• /alert - 獲取 CWA 最新地震預警\n"
"• /significant - CWA 最新顯著有感地震\n\n"
"【AI 與工具】\n"
"• /ai <問題> - 與 AI 助理對話\n\n"
"【基本指令】\n"
"• /info - 關於此機器人\n"
"• /help - 顯示此說明"
)
return TextMessage(text=text)
# ... (get_info_message, get_taiwan_earthquake_list, get_taiwan_earthquake_map 函式不變) ...
# --- [新功能] 處理 /latest 指令的回覆 ---
def get_latest_earthquake_reply() -> list:
"""獲取最新地震資訊並組合成 LINE 訊息"""
try:
latest_eq = fetch_latest_significant_earthquake()
if not latest_eq:
return [TextMessage(text="✅ 近期無顯著有感地震報告。")]
# 組合文字訊息
mag_str = f"{latest_eq['Magnitude']:.1f}" if latest_eq.get('Magnitude') is not None else "—"
depth_str = f"{latest_eq['Depth']:.0f}" if latest_eq.get('Depth') is not None else "—"
text_message_content = (
f"🚨 CWA 最新顯著有感地震\n"
f"----------------------------------\n"
f"時間: {latest_eq.get('TimeStr', '—')}\n"
f"地點: {latest_eq.get('Location', '—')}\n"
f"規模: M{mag_str} | 深度: {depth_str} km\n"
f"報告: {latest_eq.get('URL', '無')}"
)
reply_messages = [TextMessage(text=text_message_content)]
# 如果有圖片網址,則新增圖片訊息
if latest_eq.get("ImageURL"):
image_url = latest_eq["ImageURL"]
reply_messages.append(
ImageMessage(original_content_url=image_url, preview_image_url=image_url)
)
return reply_messages
except Exception as e:
return [TextMessage(text=f"❌ 查詢最新地震失敗:{e}")]
def process_message(user_message_raw: str, request_base_url: str) -> list:
"""處理使用者的文字訊息並回傳一個包含回覆訊息的列表。"""
user_message = (user_message_raw or "").strip()
# ... (舊指令相容性轉換不變) ...
if user_message.startswith('/'):
parts = user_message.split(' ', 1)
command = parts[0].lower()
arg = parts[1] if len(parts) > 1 else ""
if command == '/help':
return [get_help_message()]
elif command == '/info':
return [get_info_message()]
elif command == '/latest': # [新增]
return get_latest_earthquake_reply()
elif command == '/global':
return [TextMessage(text=fetch_global_last24h_text())]
elif command == '/taiwan':
return [get_taiwan_earthquake_list()]
elif command == '/map':
return get_taiwan_earthquake_map(request_base_url)
elif command == '/alert':
return [TextMessage(text=fetch_cwa_alarm_list(limit=5))]
elif command == '/significant':
return [TextMessage(text=fetch_significant_earthquakes(limit=5))]
elif command == '/ai':
if not arg:
return [TextMessage(text="請輸入問題,例如:/ai 台灣最高的山是哪座?")]
return [TextMessage(text=generate_ai_text(arg))]
# 若非指令,則預設交給 AI 處理 (作為備援)
return [TextMessage(text=generate_ai_text(user_message))]
|