Spaces:
Sleeping
Sleeping
| # command_handler.py | |
| import pandas as pd | |
| from linebot.v3.messaging import TextMessage, ImageMessage | |
| # 匯入服務函式 | |
| from cwa_service import fetch_cwa_alarm_list, fetch_significant_earthquakes, fetch_latest_significant_earthquake # [新增] | |
| from usgs_service import fetch_global_last24h_text, fetch_taiwan_df_this_year | |
| from plotting_service import create_and_save_map | |
| from ai_service import generate_ai_text | |
| from config import CURRENT_YEAR, HF_SPACE_URL | |
| def get_help_message() -> TextMessage: | |
| """回傳包含所有可用指令的說明訊息。""" | |
| text = ( | |
| "📖 指令列表\n\n" | |
| "【地震資訊】\n" | |
| "• /latest - 查詢最新一筆顯著地震(含圖)\n" # [新增] | |
| "• /global - 查詢全球近24小時顯著地震\n" | |
| "• /taiwan - 顯示今年台灣顯著地震列表\n" | |
| "• /map - 取得台灣顯著地震地圖連結\n" | |
| "• /alert - 獲取 CWA 最新地震預警\n" | |
| "• /significant - CWA 最新顯著有感地震\n\n" | |
| "【AI 與工具】\n" | |
| "• /ai <問題> - 與 AI 助理對話\n\n" | |
| "【基本指令】\n" | |
| "• /info - 關於此機器人\n" | |
| "• /help - 顯示此說明" | |
| ) | |
| return TextMessage(text=text) | |
| # ... (get_info_message, get_taiwan_earthquake_list, get_taiwan_earthquake_map 函式不變) ... | |
| # --- [新功能] 處理 /latest 指令的回覆 --- | |
| def get_latest_earthquake_reply() -> list: | |
| """獲取最新地震資訊並組合成 LINE 訊息""" | |
| try: | |
| latest_eq = fetch_latest_significant_earthquake() | |
| if not latest_eq: | |
| return [TextMessage(text="✅ 近期無顯著有感地震報告。")] | |
| # 組合文字訊息 | |
| mag_str = f"{latest_eq['Magnitude']:.1f}" if latest_eq.get('Magnitude') is not None else "—" | |
| depth_str = f"{latest_eq['Depth']:.0f}" if latest_eq.get('Depth') is not None else "—" | |
| text_message_content = ( | |
| f"🚨 CWA 最新顯著有感地震\n" | |
| f"----------------------------------\n" | |
| f"時間: {latest_eq.get('TimeStr', '—')}\n" | |
| f"地點: {latest_eq.get('Location', '—')}\n" | |
| f"規模: M{mag_str} | 深度: {depth_str} km\n" | |
| f"報告: {latest_eq.get('URL', '無')}" | |
| ) | |
| reply_messages = [TextMessage(text=text_message_content)] | |
| # 如果有圖片網址,則新增圖片訊息 | |
| if latest_eq.get("ImageURL"): | |
| image_url = latest_eq["ImageURL"] | |
| reply_messages.append( | |
| ImageMessage(original_content_url=image_url, preview_image_url=image_url) | |
| ) | |
| return reply_messages | |
| except Exception as e: | |
| return [TextMessage(text=f"❌ 查詢最新地震失敗:{e}")] | |
| def process_message(user_message_raw: str, request_base_url: str) -> list: | |
| """處理使用者的文字訊息並回傳一個包含回覆訊息的列表。""" | |
| user_message = (user_message_raw or "").strip() | |
| # ... (舊指令相容性轉換不變) ... | |
| if user_message.startswith('/'): | |
| parts = user_message.split(' ', 1) | |
| command = parts[0].lower() | |
| arg = parts[1] if len(parts) > 1 else "" | |
| if command == '/help': | |
| return [get_help_message()] | |
| elif command == '/info': | |
| return [get_info_message()] | |
| elif command == '/latest': # [新增] | |
| return get_latest_earthquake_reply() | |
| elif command == '/global': | |
| return [TextMessage(text=fetch_global_last24h_text())] | |
| elif command == '/taiwan': | |
| return [get_taiwan_earthquake_list()] | |
| elif command == '/map': | |
| return get_taiwan_earthquake_map(request_base_url) | |
| elif command == '/alert': | |
| return [TextMessage(text=fetch_cwa_alarm_list(limit=5))] | |
| elif command == '/significant': | |
| return [TextMessage(text=fetch_significant_earthquakes(limit=5))] | |
| elif command == '/ai': | |
| if not arg: | |
| return [TextMessage(text="請輸入問題,例如:/ai 台灣最高的山是哪座?")] | |
| return [TextMessage(text=generate_ai_text(arg))] | |
| # 若非指令,則預設交給 AI 處理 (作為備援) | |
| return [TextMessage(text=generate_ai_text(user_message))] | |