# command_handler.py import pandas as pd from linebot.v3.messaging import TextMessage, ImageMessage # 匯入服務函式 from cwa_service import fetch_cwa_alarm_list from usgs_service import fetch_global_last24h_text, fetch_taiwan_df_this_year from plotting_service import create_and_save_map from ai_service import generate_ai_text from config import CURRENT_YEAR, HF_SPACE_URL def get_help_message() -> TextMessage: """回傳包含所有可用指令的說明訊息。""" text = ( "📖 指令\n\n" "• /help\n" "• 地震 / quake(全球近24小時,含日期時間)\n" "• 臺灣地震 / 台灣地震(今年台灣區域清單)\n" "• 臺灣地震畫圖 / 台灣地震畫圖(今年台灣區域分佈圖)\n" "• 地震預警(CWA 最新 5 筆)\n" "• ai 你的問題(AI 對話)\n" "• 你好" ) return TextMessage(text=text) def get_taiwan_earthquake_list() -> TextMessage: """回傳近期的台灣地震文字列表。""" result = fetch_taiwan_df_this_year() if isinstance(result, pd.DataFrame): count = len(result) lines = [f"🇹🇼 今年 ({CURRENT_YEAR} 年) 台灣區域顯著地震 (M≥5.0),共 {count} 筆:", "-" * 20] for _, row in result.head(15).iterrows(): t = row["time_utc"].strftime("%Y-%m-%d %H:%M") lines.append(f"震級: {row['magnitude']:.1f} | 日期時間: {t} (UTC)\n地點: {row['place']}") if count > 15: lines.append(f"... (還有 {count - 15} 筆,可用「臺灣地震畫圖」查看全部)") reply_text = "\n\n".join(lines) else: reply_text = result return TextMessage(text=reply_text) def get_taiwan_earthquake_map(base_url: str) -> list: """產生並回傳台灣地震地圖的訊息。""" result = fetch_taiwan_df_this_year() if isinstance(result, pd.DataFrame): filename = create_and_save_map(result) # 如果 HF_SPACE_URL 存在就使用它,否則使用請求的 base_url image_url = f"{(HF_SPACE_URL or base_url)}/static/{filename}" return [ TextMessage(text="🗺️ 已為您繪製今年台灣區域 M≥5.0 地震分佈圖(UTC)。"), ImageMessage(original_content_url=image_url, preview_image_url=image_url), ] else: return [TextMessage(text=result)] def process_message(user_message_raw: str, request_base_url: str) -> list: """處理使用者的文字訊息並回傳一個包含回覆訊息的列表。""" user_message = (user_message_raw or "").strip().lower() if "地震預警" in user_message: reply_text = fetch_cwa_alarm_list(limit=5) return [TextMessage(text=reply_text)] if "臺灣地震畫圖" in user_message or "台灣地震畫圖" in user_message: return get_taiwan_earthquake_map(request_base_url) if "臺灣地震" in user_message or "台灣地震" in user_message: return [get_taiwan_earthquake_list()] if user_message == "/help": return [get_help_message()] if "地震" in user_message or "quake" in user_message: reply_text = fetch_global_last24h_text() return [TextMessage(text=reply_text)] if user_message.startswith("ai ") or user_message.startswith("ai:") or user_message.startswith("ai:"): prompt = user_message_raw[2:].lstrip(" ::").strip() or "請簡要介紹你的功能。" ai_text = generate_ai_text(prompt) return [TextMessage(text=ai_text)] if "你好" in user_message or "hi" in user_message: return [TextMessage(text="👋 你好!輸入 /help 查看指令。")] # 對於所有其他訊息,使用 AI 作為備援回覆 fallback_text = generate_ai_text(user_message_raw) return [TextMessage(text=fallback_text)]