Spaces:
Sleeping
Sleeping
Update fetch_cwa_alarm_list.py
Browse files- fetch_cwa_alarm_list.py +9 -23
fetch_cwa_alarm_list.py
CHANGED
|
@@ -6,12 +6,7 @@ from datetime import datetime, timedelta, timezone
|
|
| 6 |
CWA_ALARM_API = "https://app-2.cwa.gov.tw/api/v1/earthquake/alarm/list"
|
| 7 |
|
| 8 |
def _parse_cwa_time(s: str) -> tuple[str, str]:
|
| 9 |
-
"""
|
| 10 |
-
解析 CWA 的 originTime。
|
| 11 |
-
- 若帶有時區或 'Z',尊重其時區;輸出台灣時間與 UTC。
|
| 12 |
-
- 若無時區(常見 'YYYY-MM-DD HH:MM:SS'),視為台灣時間 (UTC+8)。
|
| 13 |
-
回傳: (tw_str, utc_str) -> "YYYY-MM-DD HH:MM"
|
| 14 |
-
"""
|
| 15 |
if not s:
|
| 16 |
return ("未知", "未知")
|
| 17 |
try:
|
|
@@ -27,10 +22,7 @@ def _parse_cwa_time(s: str) -> tuple[str, str]:
|
|
| 27 |
return (s, "未知")
|
| 28 |
|
| 29 |
def fetch_cwa_alarm_list(limit: int = 5) -> str:
|
| 30 |
-
"""
|
| 31 |
-
取得中央氣象署「地震預警」清單並格式化為文字(抓常見欄位;最多 limit 筆)。
|
| 32 |
-
顯示:事件ID、狀態/訊息型別#序號、震級、深度、震中座標、台灣時間與UTC、預警地區。
|
| 33 |
-
"""
|
| 34 |
try:
|
| 35 |
r = requests.get(CWA_ALARM_API, timeout=10)
|
| 36 |
r.raise_for_status()
|
|
@@ -38,7 +30,6 @@ def fetch_cwa_alarm_list(limit: int = 5) -> str:
|
|
| 38 |
except Exception as e:
|
| 39 |
return f"❌ 地震預警查詢失敗:{e}"
|
| 40 |
|
| 41 |
-
# 取出清單
|
| 42 |
items = None
|
| 43 |
if isinstance(payload, dict):
|
| 44 |
items = payload.get("data") or payload.get("records") or payload.get("list") or payload.get("items")
|
|
@@ -47,7 +38,7 @@ def fetch_cwa_alarm_list(limit: int = 5) -> str:
|
|
| 47 |
if not items:
|
| 48 |
return "✅ 目前沒有地震預警。"
|
| 49 |
|
| 50 |
-
#
|
| 51 |
def _key(it):
|
| 52 |
s = it.get("originTime") or ""
|
| 53 |
try:
|
|
@@ -64,22 +55,18 @@ def fetch_cwa_alarm_list(limit: int = 5) -> str:
|
|
| 64 |
except Exception:
|
| 65 |
pass
|
| 66 |
|
| 67 |
-
|
| 68 |
-
|
| 69 |
-
|
| 70 |
-
if
|
| 71 |
-
break
|
| 72 |
|
|
|
|
|
|
|
| 73 |
identifier = it.get("identifier") or it.get("eventId") or it.get("id") or "—"
|
| 74 |
status = it.get("status") or "—"
|
| 75 |
msg_type = it.get("msgType") or "—"
|
| 76 |
msg_no = it.get("msgNo") or it.get("msgSeq") or "—"
|
| 77 |
|
| 78 |
-
def _num(x):
|
| 79 |
-
xs = str(x)
|
| 80 |
-
ok = xs.replace(".", "", 1).replace("-", "", 1).isdigit()
|
| 81 |
-
return float(xs) if ok else None
|
| 82 |
-
|
| 83 |
mag = _num(it.get("magnitudeValue") or it.get("magnitude") or it.get("ml") or it.get("mw"))
|
| 84 |
mag_str = f"{mag:.1f}" if mag is not None else "—"
|
| 85 |
|
|
@@ -110,7 +97,6 @@ def fetch_cwa_alarm_list(limit: int = 5) -> str:
|
|
| 110 |
f"預警地區: {areas_txt}"
|
| 111 |
)
|
| 112 |
lines.append("")
|
| 113 |
-
count += 1
|
| 114 |
|
| 115 |
if len(items) > limit:
|
| 116 |
lines.append(f"... 另有 {len(items) - limit} 筆。")
|
|
|
|
| 6 |
CWA_ALARM_API = "https://app-2.cwa.gov.tw/api/v1/earthquake/alarm/list"
|
| 7 |
|
| 8 |
def _parse_cwa_time(s: str) -> tuple[str, str]:
|
| 9 |
+
"""回傳 (台灣時間, UTC);若字串無時區,預設視為台灣時間。"""
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 10 |
if not s:
|
| 11 |
return ("未知", "未知")
|
| 12 |
try:
|
|
|
|
| 22 |
return (s, "未知")
|
| 23 |
|
| 24 |
def fetch_cwa_alarm_list(limit: int = 5) -> str:
|
| 25 |
+
"""抓 CWA 地震預警並格式化輸出。"""
|
|
|
|
|
|
|
|
|
|
| 26 |
try:
|
| 27 |
r = requests.get(CWA_ALARM_API, timeout=10)
|
| 28 |
r.raise_for_status()
|
|
|
|
| 30 |
except Exception as e:
|
| 31 |
return f"❌ 地震預警查詢失敗:{e}"
|
| 32 |
|
|
|
|
| 33 |
items = None
|
| 34 |
if isinstance(payload, dict):
|
| 35 |
items = payload.get("data") or payload.get("records") or payload.get("list") or payload.get("items")
|
|
|
|
| 38 |
if not items:
|
| 39 |
return "✅ 目前沒有地震預警。"
|
| 40 |
|
| 41 |
+
# 由新到舊
|
| 42 |
def _key(it):
|
| 43 |
s = it.get("originTime") or ""
|
| 44 |
try:
|
|
|
|
| 55 |
except Exception:
|
| 56 |
pass
|
| 57 |
|
| 58 |
+
def _num(x):
|
| 59 |
+
xs = str(x)
|
| 60 |
+
ok = xs.replace(".", "", 1).replace("-", "", 1).isdigit()
|
| 61 |
+
return float(xs) if ok else None
|
|
|
|
| 62 |
|
| 63 |
+
lines = ["🚨 地震預警(最新):", "-" * 20]
|
| 64 |
+
for idx, it in enumerate(items[:limit]):
|
| 65 |
identifier = it.get("identifier") or it.get("eventId") or it.get("id") or "—"
|
| 66 |
status = it.get("status") or "—"
|
| 67 |
msg_type = it.get("msgType") or "—"
|
| 68 |
msg_no = it.get("msgNo") or it.get("msgSeq") or "—"
|
| 69 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 70 |
mag = _num(it.get("magnitudeValue") or it.get("magnitude") or it.get("ml") or it.get("mw"))
|
| 71 |
mag_str = f"{mag:.1f}" if mag is not None else "—"
|
| 72 |
|
|
|
|
| 97 |
f"預警地區: {areas_txt}"
|
| 98 |
)
|
| 99 |
lines.append("")
|
|
|
|
| 100 |
|
| 101 |
if len(items) > limit:
|
| 102 |
lines.append(f"... 另有 {len(items) - limit} 筆。")
|