cwadayi commited on
Commit
97ce441
·
verified ·
1 Parent(s): a289bfe

Update command_handler.py

Browse files
Files changed (1) hide show
  1. command_handler.py +88 -37
command_handler.py CHANGED
@@ -3,7 +3,7 @@ import pandas as pd
3
  from linebot.v3.messaging import TextMessage, ImageMessage
4
 
5
  # 匯入服務函式
6
- from cwa_service import fetch_cwa_alarm_list, fetch_significant_earthquakes, fetch_latest_significant_earthquake # [新增]
7
  from usgs_service import fetch_global_last24h_text, fetch_taiwan_df_this_year
8
  from plotting_service import create_and_save_map
9
  from ai_service import generate_ai_text
@@ -14,7 +14,7 @@ def get_help_message() -> TextMessage:
14
  text = (
15
  "📖 指令列表\n\n"
16
  "【地震資訊】\n"
17
- "• /latest - 查詢最新一筆顯著地震(含圖)\n" # [新增]
18
  "• /global - 查詢全球近24小時顯著地震\n"
19
  "• /taiwan - 顯示今年台灣顯著地震列表\n"
20
  "• /map - 取得台灣顯著地震地圖連結\n"
@@ -28,48 +28,66 @@ def get_help_message() -> TextMessage:
28
  )
29
  return TextMessage(text=text)
30
 
31
- # ... (get_info_message, get_taiwan_earthquake_list, get_taiwan_earthquake_map 函式不變) ...
32
-
33
- # --- [新功能] 處理 /latest 指令的回覆 ---
34
- def get_latest_earthquake_reply() -> list:
35
- """獲取最新地震資訊並組合成 LINE 訊息"""
36
- try:
37
- latest_eq = fetch_latest_significant_earthquake()
38
- if not latest_eq:
39
- return [TextMessage(text="✅ 近期無顯著有感地震報告。")]
40
-
41
- # 組合文字訊息
42
- mag_str = f"{latest_eq['Magnitude']:.1f}" if latest_eq.get('Magnitude') is not None else "—"
43
- depth_str = f"{latest_eq['Depth']:.0f}" if latest_eq.get('Depth') is not None else "—"
44
-
45
- text_message_content = (
46
- f"🚨 CWA 最新顯著有感地震\n"
47
- f"----------------------------------\n"
48
- f"時間: {latest_eq.get('TimeStr', '—')}\n"
49
- f"地點: {latest_eq.get('Location', '—')}\n"
50
- f"規模: M{mag_str} | 深度: {depth_str} km\n"
51
- f"報告: {latest_eq.get('URL', '無')}"
52
- )
53
- reply_messages = [TextMessage(text=text_message_content)]
54
 
55
- # 如果有圖片網址,則新增圖片訊息
56
- if latest_eq.get("ImageURL"):
57
- image_url = latest_eq["ImageURL"]
58
- reply_messages.append(
59
- ImageMessage(original_content_url=image_url, preview_image_url=image_url)
 
 
 
 
 
 
 
 
60
  )
61
-
62
- return reply_messages
63
-
64
- except Exception as e:
65
- return [TextMessage(text=f"❌ 查詢最新地震失敗:{e}")]
 
66
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
67
 
68
  def process_message(user_message_raw: str, request_base_url: str) -> list:
69
  """處理使用者的文字訊息並回傳一個包含回覆訊息的列表。"""
70
  user_message = (user_message_raw or "").strip()
71
 
72
- # ... (舊指令相容性轉換不變) ...
 
 
 
 
 
 
 
 
73
 
74
  if user_message.startswith('/'):
75
  parts = user_message.split(' ', 1)
@@ -80,7 +98,7 @@ def process_message(user_message_raw: str, request_base_url: str) -> list:
80
  return [get_help_message()]
81
  elif command == '/info':
82
  return [get_info_message()]
83
- elif command == '/latest': # [新增]
84
  return get_latest_earthquake_reply()
85
  elif command == '/global':
86
  return [TextMessage(text=fetch_global_last24h_text())]
@@ -100,3 +118,36 @@ def process_message(user_message_raw: str, request_base_url: str) -> list:
100
  # 若非指令,則預設交給 AI 處理 (作為備援)
101
  return [TextMessage(text=generate_ai_text(user_message))]
102
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3
  from linebot.v3.messaging import TextMessage, ImageMessage
4
 
5
  # 匯入服務函式
6
+ from cwa_service import fetch_cwa_alarm_list, fetch_significant_earthquakes
7
  from usgs_service import fetch_global_last24h_text, fetch_taiwan_df_this_year
8
  from plotting_service import create_and_save_map
9
  from ai_service import generate_ai_text
 
14
  text = (
15
  "📖 指令列表\n\n"
16
  "【地震資訊】\n"
17
+ "• /latest - 查詢最新一筆顯著地震(含圖)\n"
18
  "• /global - 查詢全球近24小時顯著地震\n"
19
  "• /taiwan - 顯示今年台灣顯著地震列表\n"
20
  "• /map - 取得台灣顯著地震地圖連結\n"
 
28
  )
29
  return TextMessage(text=text)
30
 
31
+ def get_info_message() -> TextMessage:
32
+ """回傳機器人資訊。"""
33
+ text = (
34
+ "🤖 關於我\n\n"
35
+ "我是一個多功能助理機器人,提供地震查詢與 AI 對話功能。\n\n"
36
+ "• 版本: 2.5\n"
37
+ "• 資料來源: CWA, USGS, Hugging Face\n"
38
+ "• 開發者: dayichen"
39
+ )
40
+ return TextMessage(text=text)
 
 
 
 
 
 
 
 
 
 
 
 
 
41
 
42
+ def get_taiwan_earthquake_list() -> TextMessage:
43
+ """回傳近期的台灣地震文字列表。"""
44
+ result = fetch_taiwan_df_this_year()
45
+ if isinstance(result, pd.DataFrame):
46
+ count = len(result)
47
+ lines = [f"🇹🇼 今年 ({CURRENT_YEAR} 年) 台灣區域顯著地震 (M≥5.0),共 {count} 筆:", "-" * 20]
48
+ for _, row in result.head(15).iterrows():
49
+ t = row["time_utc"].strftime("%Y-%m-%d %H:%M")
50
+ lines.append(
51
+ # [修改] 將 "震級" 改為 "規模"
52
+ f"規模: {row['magnitude']:.1f} | 日期時間: {t} (UTC)\n"
53
+ f"地點: {row['place']}\n"
54
+ f"報告連結: {row.get('url', '無')}"
55
  )
56
+ if count > 15:
57
+ lines.append(f"... (還有 {count - 15} 筆,請用 /map 查看全部)")
58
+ reply_text = "\n\n".join(lines)
59
+ else:
60
+ reply_text = result
61
+ return TextMessage(text=reply_text)
62
 
63
+ def get_taiwan_earthquake_map(base_url: str) -> list:
64
+ """產生地震地圖,並回傳包含連結的文字訊息。"""
65
+ result = fetch_taiwan_df_this_year()
66
+ if isinstance(result, pd.DataFrame):
67
+ filename = create_and_save_map(result)
68
+ image_url = f"{(HF_SPACE_URL or base_url)}/static/{filename}"
69
+ reply_text = (
70
+ "🗺️ 已為您繪製地震分佈圖。\n\n"
71
+ "請點擊以下連結查看圖片:\n"
72
+ f"{image_url}"
73
+ )
74
+ return [TextMessage(text=reply_text)]
75
+ else:
76
+ return [TextMessage(text=result)]
77
 
78
  def process_message(user_message_raw: str, request_base_url: str) -> list:
79
  """處理使用者的文字訊息並回傳一個包含回覆訊息的列表。"""
80
  user_message = (user_message_raw or "").strip()
81
 
82
+ # 將舊指令對應到新指令,提供向下相容
83
+ if user_message.lower() in ["地震", "quake"]:
84
+ user_message = "/global"
85
+ elif user_message.lower() in ["台灣地震", "臺灣地震"]:
86
+ user_message = "/taiwan"
87
+ elif user_message.lower() in ["台灣地震畫圖", "臺灣地震畫圖"]:
88
+ user_message = "/map"
89
+ elif user_message.lower() == "地震預警":
90
+ user_message = "/alert"
91
 
92
  if user_message.startswith('/'):
93
  parts = user_message.split(' ', 1)
 
98
  return [get_help_message()]
99
  elif command == '/info':
100
  return [get_info_message()]
101
+ elif command == '/latest':
102
  return get_latest_earthquake_reply()
103
  elif command == '/global':
104
  return [TextMessage(text=fetch_global_last24h_text())]
 
118
  # 若非指令,則預設交給 AI 處理 (作為備援)
119
  return [TextMessage(text=generate_ai_text(user_message))]
120
 
121
+ # --- [新功能] 處理 /latest 指令的回覆 ---
122
+ def get_latest_earthquake_reply() -> list:
123
+ """獲取最新地震資訊並組合成 LINE 訊息"""
124
+ try:
125
+ latest_eq = fetch_latest_significant_earthquake()
126
+ if not latest_eq:
127
+ return [TextMessage(text="✅ 近期無顯著有感地震報告。")]
128
+
129
+ # 組合文字訊息
130
+ mag_str = f"{latest_eq['Magnitude']:.1f}" if latest_eq.get('Magnitude') is not None else "—"
131
+ depth_str = f"{latest_eq['Depth']:.0f}" if latest_eq.get('Depth') is not None else "—"
132
+
133
+ text_message_content = (
134
+ f"🚨 CWA 最新顯著有感地震\n"
135
+ f"----------------------------------\n"
136
+ f"時間: {latest_eq.get('TimeStr', '—')}\n"
137
+ f"地點: {latest_eq.get('Location', '—')}\n"
138
+ f"規模: M{mag_str} | 深度: {depth_str} km\n"
139
+ f"報告: {latest_eq.get('URL', '無')}"
140
+ )
141
+ reply_messages = [TextMessage(text=text_message_content)]
142
+
143
+ # 如果有圖片網址,則新增圖片訊息
144
+ if latest_eq.get("ImageURL"):
145
+ image_url = latest_eq["ImageURL"]
146
+ reply_messages.append(
147
+ ImageMessage(original_content_url=image_url, preview_image_url=image_url)
148
+ )
149
+
150
+ return reply_messages
151
+
152
+ except Exception as e:
153
+ return [TextMessage(text=f"❌ 查詢最新地震失敗:{e}")]