LINE-ROBOT / cwa_service.py
cwadayi's picture
Update cwa_service.py
fdfb166 verified
raw
history blame
9.31 kB
# cwa_service.py
# -*- coding: utf-8 -*-
from __future__ import annotations
import requests
import re
import pandas as pd
from datetime import datetime, timedelta, timezone
from config import CWA_API_KEY, CWA_ALARM_API, CWA_SIGNIFICANT_API, CWA_LOCAL_EQ_API
TAIPEI_TZ = timezone(timedelta(hours=8))
# --- Helper Functions ---
def _to_float(x):
if x is None: return None
s = str(x).strip()
m = re.search(r"[-+]?\d+(?:\.\d+)?", s)
return float(m.group()) if m else None
def _parse_cwa_time(s: str) -> tuple[str, str]:
if not s: return ("未知", "未知")
try:
dt = datetime.fromisoformat(s.replace("Z", "+00:00"))
tw = dt.astimezone(TAIPEI_TZ).strftime("%Y-%m-%d %H:%M")
utc = dt.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M")
return (tw, utc)
except Exception:
return (s, "未知")
def _normalize_cwa_area_name(area_name: str) -> str:
"""自動校正縣市名稱,以符合 CWA API 查詢需求"""
area_name = area_name.replace("台", "臺")
if area_name.endswith("市") or area_name.endswith("縣"):
return area_name
major_cities = "臺北,新北,基隆,桃園,新竹,臺中,嘉義,臺南,高雄".split(',')
if any(city in area_name for city in major_cities):
return f"{area_name}市"
else:
return f"{area_name}縣"
# --- 地震預警 (CWA_ALARM_API) ---
def fetch_cwa_alarm_list(limit: int = 5) -> str:
"""抓 CWA 地震預警並格式化輸出。"""
try:
r = requests.get(CWA_ALARM_API, timeout=10)
r.raise_for_status()
payload = r.json()
except Exception as e:
return f"❌ 地震預警查詢失敗:{e}"
items = payload.get("data", [])
if not items:
return "✅ 目前沒有地震預警。"
def _key(it):
try:
return datetime.fromisoformat(it.get("originTime", "").replace("Z", "+00:00"))
except:
return datetime.min.replace(tzinfo=timezone.utc)
items = sorted(items, key=_key, reverse=True)
lines = ["🚨 地震預警(最新):", "-" * 20]
for it in items[:limit]:
mag = _to_float(it.get("magnitudeValue"))
depth = _to_float(it.get("depth"))
tw_str, _ = _parse_cwa_time(it.get("originTime", ""))
identifier = str(it.get('identifier', '—')).replace('{', '{{').replace('}', '}}')
msg_type = str(it.get('msgType', '—')).replace('{', '{{').replace('}', '}}')
msg_no = str(it.get('msgNo', '—')).replace('{', '{{').replace('}', '}}')
areas = str(it.get('alertAreas') or '—').replace('{', '{{').replace('}', '}}')
mag_str = f"{mag:.1f}" if mag is not None else "—"
depth_str = f"{depth:.0f}" if depth is not None else "—"
lines.append(
f"事件: {identifier} | 類型: {msg_type}#{msg_no}\n"
f"規模/深度: M{mag_str} / {depth_str} km\n"
f"時間: {tw_str}(台灣)\n"
f"預警地區: {areas}"
)
return "\n\n".join(lines).strip()
# --- 顯著有感地震 (E-A0015-001) ---
def _parse_significant_earthquakes(obj: dict) -> pd.DataFrame:
"""最穩健的解析邏輯,供 /significant 和 /latest 共用"""
records = obj.get("records") or obj.get("Records") or {}
quakes = records.get("earthquake") or records.get("Earthquake") or []
rows = []
for q in quakes:
ei = q.get("EarthquakeInfo") or q.get("earthquakeInfo") or {}
epic = ei.get("Epicenter") or ei.get("epicenter") or {}
mag_info = ei.get("Magnitude") or ei.get("magnitude") or ei.get("EarthquakeMagnitude") or {}
depth_raw = ei.get("FocalDepth") or ei.get("depth") or ei.get("Depth")
mag_raw = mag_info.get("MagnitudeValue") or mag_info.get("magnitudeValue") or mag_info.get("Value") or mag_info.get("value")
rows.append({
"ID": q.get("EarthquakeNo"), "Time": ei.get("OriginTime"),
"Lat": _to_float(epic.get("EpicenterLatitude") or epic.get("epicenterLatitude")),
"Lon": _to_float(epic.get("EpicenterLongitude") or epic.get("epicenterLongitude")),
"Depth": _to_float(depth_raw), "Magnitude": _to_float(mag_raw),
"Location": epic.get("Location") or epic.get("location"),
"URL": q.get("Web") or q.get("ReportURL"),
})
df = pd.DataFrame(rows)
if not df.empty and "Time" in df.columns:
time_series = pd.to_datetime(df["Time"], errors="coerce")
if pd.api.types.is_datetime64_any_dtype(time_series):
df["Time"] = time_series.dt.tz_localize("UTC").dt.tz_convert(TAIPEI_TZ)
return df
def fetch_significant_earthquakes(days: int = 7, limit: int = 5) -> str:
if not CWA_API_KEY: return "❌ 顯著地震查詢失敗:管理者尚未設定 CWA_API_KEY。"
now = datetime.now(timezone.utc)
time_from = (now - timedelta(days=days)).strftime("%Y-%m-%d")
params = {"Authorization": CWA_API_KEY, "format": "JSON", "timeFrom": time_from}
try:
r = requests.get(CWA_SIGNIFICANT_API, params=params, timeout=15)
r.raise_for_status()
data = r.json()
df = _parse_significant_earthquakes(data)
if df.empty: return f"✅ 過去 {days} 天內沒有顯著有感地震報告。"
df = df.sort_values(by="Time", ascending=False).head(limit)
lines = [f"🚨 CWA 最新顯著有感地震 (近{days}天內):", "-" * 20]
for _, row in df.iterrows():
mag_str = f"{row['Magnitude']:.1f}" if pd.notna(row['Magnitude']) else "—"
depth_str = f"{row['Depth']:.0f}" if pd.notna(row['Depth']) else "—"
lines.append(
f"時間: {row['Time'].strftime('%Y-%m-%d %H:%M') if pd.notna(row['Time']) else '—'}\n"
f"地點: {row['Location'] or '—'}\n"
f"規模: M{mag_str} | 深度: {depth_str} km\n"
f"報告: {row['URL'] or '無'}"
)
return "\n\n".join(lines)
except Exception as e:
return f"❌ 顯著地震查詢失敗:{e}"
# --- 最新一筆顯著地震 ---
def fetch_latest_significant_earthquake() -> dict | None:
"""從 CWA 獲取最新一筆顯著地震,並重用現有的解析邏輯"""
if not CWA_API_KEY:
raise ValueError("錯誤:尚未設定 CWA_API_KEY Secret。")
now = datetime.now(timezone.utc)
time_from = (now - timedelta(days=2)).strftime("%Y-%m-%dT%H:%M:%S")
params = {"Authorization": CWA_API_KEY, "format": "JSON", "limit": 1}
r = requests.get(CWA_SIGNIFICANT_API, params=params, timeout=15)
r.raise_for_status()
data = r.json()
df = _parse_significant_earthquakes(data)
if df.empty:
return None
latest_eq_data = df.sort_values(by="Time", ascending=False).iloc[0].to_dict()
quakes = data.get("records", {}).get("Earthquake", [])
if quakes:
latest_eq_data["ImageURL"] = quakes[0].get("ReportImageURI")
if pd.notna(latest_eq_data.get("Time")):
latest_eq_data["TimeStr"] = latest_eq_data["Time"].strftime('%Y-%m-%d %H:%M')
return latest_eq_data
# --- 小區域有感地震 (E-A0016-001) ---
def fetch_local_earthquakes(area_name: str = "", limit: int = 5) -> str:
"""從 CWA 獲取小區域有感地震報告。如果 area_name 為空,則查詢全台灣。"""
if not CWA_API_KEY: return "❌ 查詢失敗:管理者尚未設定 CWA_API_KEY。"
params = {"Authorization": CWA_API_KEY, "format": "JSON", "limit": limit}
title = "🇹🇼 台灣近期小區域有感地震:"
if area_name:
normalized_area = _normalize_cwa_area_name(area_name)
params["AreaName"] = normalized_area
title = f"🚨 「{normalized_area}」近期小區域有感地震:"
try:
r = requests.get(CWA_LOCAL_EQ_API, params=params, timeout=15)
r.raise_for_status()
data = r.json()
earthquakes = data.get("records", {}).get("Earthquake", [])
if not earthquakes:
msg = f"✅ 在「{area_name}」" if area_name else "✅ 台灣"
return f"{msg}近期沒有小區域有感地震報告。"
lines = [title, "-" * 20]
for eq in earthquakes:
info = eq.get("earthquakeInfo", {})
epi = info.get("epicenter", {})
mag = info.get("magnitude", {})
depth = info.get("depth", {})
intensity_areas = eq.get("intensity", {}).get("shakingArea", [])
area_strs = [f"{area.get('areaDesc')} {area.get('areaIntensity')}級" for area in intensity_areas if area.get("areaIntensity") and float(area.get("areaIntensity", 0)) > 0]
intensity_str = "、".join(area_strs) if area_strs else "無具體震度回報"
lines.append(
f"報告: {eq.get('reportContent', '—')}\n"
f"時間: {info.get('originTime', '—')}\n"
f"地點: {epi.get('location', '—')}\n"
f"規模: M{mag.get('magnitudeValue', '—')} | 深度: {depth.get('value', '—')} km\n"
f"主要影響區域: {intensity_str}"
)
return "\n\n".join(lines)
except Exception as e:
return f"❌ 小區域地震查詢失敗:{e}"