Spaces:
Sleeping
Sleeping
File size: 6,996 Bytes
5c41367 071beff b0ea870 f7db653 ce425c0 22f516b 89c2715 ce425c0 22f516b 22c6d2b ce425c0 f7db653 ce425c0 7363269 0acfd1d 7363269 0acfd1d 7363269 0acfd1d 7363269 5da9c98 7363269 6ddbfb0 7363269 bc5d366 22c6d2b bc5d366 ce425c0 2da8e5c ce425c0 5c41367 22f516b ce425c0 f7db653 b0ea870 ce425c0 b0ea870 ce425c0 139ddee ce425c0 b0ea870 ce425c0 f7db653 ce425c0 b0ea870 ce425c0 b0ea870 ce425c0 b0ea870 f7db653 b0ea870 5c41367 f7db653 5c41367 f7db653 5c41367 b0ea870 f7db653 0acfd1d 5da9c98 ce425c0 5da9c98 22c6d2b bc5d366 5da9c98 b0ea870 f7db653 7363269 b0ea870 5c41367 bc5d366 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 |
# command_handler.py (Silent on non-command messages)
import pandas as pd
import re
from datetime import datetime, timedelta
from linebot.v3.messaging import TextMessage, ImageMessage
# 匯入所有服務函式
from cwa_service import fetch_cwa_alarm_list, fetch_significant_earthquakes, fetch_latest_significant_earthquake
from usgs_service import fetch_global_last24h_text, fetch_taiwan_df_this_year
from ai_service import generate_ai_text
from config import CURRENT_YEAR, MCP_SERVER_URL
from pws_service import fetch_cwa_pws_earthquake_info
from news_service import fetch_today_news # 匯入新函式
def get_help_message() -> TextMessage:
"""產生並回傳包含所有指令的說明文字,並附上英文翻譯。"""
text = (
"📖 指令列表 (Command List)\n"
" (請直接輸入數字 / Please enter a number)\n\n"
"【地震資訊 (Earthquake Info)】\n"
"• 1 - 最新一筆顯著地震 (含圖)\n"
" (Latest Significant Earthquake w/ Map)\n"
"• 2 - 全球近24小時顯著地震 (USGS)\n"
" (Global Quakes in 24h - USGS)\n"
"• 3 - 今年台灣顯著地震列表 (USGS)\n"
" (Taiwan Quakes This Year - USGS)\n"
"• 4 - CWA 地震目錄查詢 (外部連結)\n"
" (CWA Catalog Search - External Link)\n"
"• 5 - CWA 最新地震預警\n"
" (Latest CWA Earthquake Alerts)\n"
"• 6 - CWA 最近7天顯著有感地震\n"
" (Significant Quakes in 7 Days - CWA)\n\n"
"【AI 與工具 (AI & Tools)】\n"
"• 7 <問題> - 與 AI 助理對話\n"
" (Chat with AI Assistant)\n"
" (e.g., 7 去年最大的地震)\n\n"
"【基本指令 (Basic Commands)】\n"
"• 8 - 關於此機器人 (About this Bot)\n"
"• 9 - 顯示此說明 (Show this Help Message)\n\n"
"【其他服務 (Other Services)】\n"
"• 10 - 查詢今日新聞\n"
" (Today's News)\n"
"• 11 - 查詢最近地震PWS訊息\n"
" (Recent PWS Earthquake Info)"
)
return TextMessage(text=text)
def get_info_message() -> TextMessage:
"""產生並回傳關於此機器人的資訊。"""
text = (
"🤖 關於我\n\n"
"我是一個多功能助理機器人,提供地震查詢與 AI 對話功能。\n\n"
"• 版本: 5.5 (Silent Mode Edition)\n"
"• 資料來源: CWA, USGS, Google Gemini\n"
"• 開發者: dayichen"
)
return TextMessage(text=text)
def get_taiwan_earthquake_list() -> TextMessage:
result = fetch_taiwan_df_this_year()
if isinstance(result, pd.DataFrame):
count = len(result)
lines = [f"🇹🇼 今年 ({CURRENT_YEAR} 年) 台灣區域顯著地震 (M≥5.0),共 {count} 筆:", "-" * 20]
for _, row in result.head(15).iterrows():
t = row["time_utc"].strftime("%Y-%m-%d %H:%M")
lines.append(f"規模: {row['magnitude']:.1f} | 日期時間: {t} (UTC)\n地點: {row['place']}\n報告連結: {row.get('url', '無')}")
if count > 15: lines.append(f"... (還有 {count-15} 筆資料)")
reply_text = "\n\n".join(lines)
else: reply_text = result
return TextMessage(text=reply_text)
def get_latest_earthquake_reply() -> list:
try:
latest_eq = fetch_latest_significant_earthquake()
if not latest_eq: return [TextMessage(text="✅ 近期無顯著有感地震報告。")]
mag_str = f"{latest_eq['Magnitude']:.1f}" if latest_eq.get('Magnitude') is not None else "—"
depth_str = f"{latest_eq['Depth']:.0f}" if latest_eq.get('Depth') is not None else "—"
text_message_content = (f"🚨 CWA 最新顯著有感地震\n----------------------------------\n時間: {latest_eq.get('TimeStr', '—')}\n地點: {latest_eq.get('Location', '—')}\n規模: M{mag_str} | 深度: {depth_str} km\n報告: {latest_eq.get('URL', '無')}")
reply_messages = [TextMessage(text=text_message_content)]
if latest_eq.get("ImageURL"):
image_url = latest_eq["ImageURL"]
reply_messages.append(ImageMessage(original_content_url=image_url, preview_image_url=image_url))
return reply_messages
except Exception as e: return [TextMessage(text=f"❌ 查詢最新地震失敗:{e}")]
def preprocess_ai_prompt(prompt: str) -> str:
today = datetime.now()
year_match = re.search(r"(\d{4})年", prompt)
time_keywords = { "昨天": (today - timedelta(days=1)).strftime("%Y-%m-%d"), "今天": today.strftime("%Y-%m-%d"), "今年": str(today.year), "去年": str(today.year - 1), }
if year_match: time_keywords[year_match.group(0)] = year_match.group(1)
if "地震" in prompt:
for keyword, date_val in time_keywords.items():
if keyword in prompt:
if len(date_val) == 10: start_date = end_date = date_val
else: start_date, end_date = f"{date_val}-01-01", f"{date_val}-12-31"
if "最大" in prompt: new_prompt = f"請幫我呼叫地震搜尋工具,找出從 {start_date} 到 {end_date} 之間,規模最大的地震是哪一個。"
else: new_prompt = f"請幫我呼叫地震搜尋工具,查詢從 {start_date} 到 {end_date} 之間發生的所有地震。"
print(f"--- AI Prompt Rewritten ---\nOriginal: {prompt}\nNew: {new_prompt}\n---------------------------")
return new_prompt
return prompt
def process_message(user_message_raw: str, request_base_url: str) -> list:
user_message = (user_message_raw or "").strip()
if not user_message: return []
parts = user_message.split(' ', 1)
command_key = parts[0]
arg = parts[1].strip() if len(parts) > 1 else ""
if command_key == '1': return get_latest_earthquake_reply()
if command_key == '2': return [TextMessage(text=fetch_global_last24h_text())]
if command_key == '3': return [get_taiwan_earthquake_list()]
if command_key == '4': return [TextMessage(text=f"🗺️ 外部地震查詢服務\n\n請點擊以下連結:\n{MCP_SERVER_URL}")]
if command_key == '5': return [TextMessage(text=fetch_cwa_alarm_list(limit=5))]
if command_key == '6': return [TextMessage(text=fetch_significant_earthquakes(limit=5))]
if command_key == '8': return [get_info_message()]
if command_key == '9': return [get_help_message()]
if command_key == '10': return [TextMessage(text=fetch_today_news())]
if command_key == '11': return [TextMessage(text=fetch_cwa_pws_earthquake_info())]
if command_key == '7':
if not arg: return [TextMessage(text="請輸入問題,例如:7 台灣最高的山是哪座?")]
processed_prompt = preprocess_ai_prompt(arg)
return [TextMessage(text=generate_ai_text(processed_prompt))]
# [核心修改]
# 對於所有其他無法識別的訊息 (一般對話),回傳一個空列表,代表「不回應」。
return [] |