cwadayi commited on
Commit
9824e85
·
verified ·
1 Parent(s): 647c085

fetch_cwa_alarm_list.py

Browse files
Files changed (1) hide show
  1. fetch_cwa_alarm_list.py +0 -104
fetch_cwa_alarm_list.py DELETED
@@ -1,104 +0,0 @@
1
- # -*- coding: utf-8 -*-
2
- from __future__ import annotations
3
- import requests
4
- from datetime import datetime, timedelta, timezone
5
-
6
- CWA_ALARM_API = "https://app-2.cwa.gov.tw/api/v1/earthquake/alarm/list"
7
-
8
- def _parse_cwa_time(s: str) -> tuple[str, str]:
9
- """回傳 (台灣時間, UTC);若字串無時區,預設視為台灣時間。"""
10
- if not s:
11
- return ("未知", "未知")
12
- try:
13
- if "T" in s or s.endswith("Z") or "+" in s:
14
- dt = datetime.fromisoformat(s.replace("Z", "+00:00"))
15
- else:
16
- dt = datetime.strptime(s, "%Y-%m-%d %H:%M:%S")
17
- dt = dt.replace(tzinfo=timezone(timedelta(hours=8)))
18
- tw = dt.astimezone(timezone(timedelta(hours=8))).strftime("%Y-%m-%d %H:%M")
19
- utc = dt.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M")
20
- return (tw, utc)
21
- except Exception:
22
- return (s, "未知")
23
-
24
- def fetch_cwa_alarm_list(limit: int = 5) -> str:
25
- """抓 CWA 地震預警並格式化輸出。"""
26
- try:
27
- r = requests.get(CWA_ALARM_API, timeout=10)
28
- r.raise_for_status()
29
- payload = r.json()
30
- except Exception as e:
31
- return f"❌ 地震預警查詢失敗:{e}"
32
-
33
- items = None
34
- if isinstance(payload, dict):
35
- items = payload.get("data") or payload.get("records") or payload.get("list") or payload.get("items")
36
- if items is None and isinstance(payload, list):
37
- items = payload
38
- if not items:
39
- return "✅ 目前沒有地震預警。"
40
-
41
- # 由新到舊
42
- def _key(it):
43
- s = it.get("originTime") or ""
44
- try:
45
- if "T" in s or s.endswith("Z") or "+" in s:
46
- dt = datetime.fromisoformat(s.replace("Z", "+00:00"))
47
- else:
48
- dt = datetime.strptime(s, "%Y-%m-%d %H:%M:%S").replace(tzinfo=timezone(timedelta(hours=8)))
49
- return dt.astimezone(timezone.utc)
50
- except Exception:
51
- return datetime.min.replace(tzinfo=timezone.utc)
52
-
53
- try:
54
- items = sorted(items, key=_key, reverse=True)
55
- except Exception:
56
- pass
57
-
58
- def _num(x):
59
- xs = str(x)
60
- ok = xs.replace(".", "", 1).replace("-", "", 1).isdigit()
61
- return float(xs) if ok else None
62
-
63
- lines = ["🚨 地震預警(最新):", "-" * 20]
64
- for idx, it in enumerate(items[:limit]):
65
- identifier = it.get("identifier") or it.get("eventId") or it.get("id") or "—"
66
- status = it.get("status") or "—"
67
- msg_type = it.get("msgType") or "—"
68
- msg_no = it.get("msgNo") or it.get("msgSeq") or "—"
69
-
70
- mag = _num(it.get("magnitudeValue") or it.get("magnitude") or it.get("ml") or it.get("mw"))
71
- mag_str = f"{mag:.1f}" if mag is not None else "—"
72
-
73
- depth = _num(it.get("depth"))
74
- depth_str = f"{depth:.0f}" if depth is not None else "—"
75
-
76
- lat = _num(it.get("epicenterLat") or it.get("latitude") or it.get("lat"))
77
- lon = _num(it.get("epicenterLon") or it.get("longitude") or it.get("lon"))
78
- lat_str = f"{lat:.2f}" if lat is not None else "—"
79
- lon_str = f"{lon:.2f}" if lon is not None else "—"
80
-
81
- origin = it.get("originTime") or ""
82
- tw_str, utc_str = _parse_cwa_time(origin)
83
-
84
- areas = it.get("locationDesc") or it.get("areas") or it.get("alertAreas")
85
- if isinstance(areas, list):
86
- areas_txt = "、".join(str(a) for a in areas if a)
87
- elif isinstance(areas, str):
88
- areas_txt = areas
89
- else:
90
- areas_txt = "—"
91
-
92
- lines.append(
93
- f"事件: {identifier} | 狀態: {status} | 類型: {msg_type}#{msg_no}\n"
94
- f"震級/深度: M{mag_str} / {depth_str} km\n"
95
- f"震中: lat {lat_str}, lon {lon_str}\n"
96
- f"時間: {tw_str}(台灣) / {utc_str}(UTC)\n"
97
- f"預警地區: {areas_txt}"
98
- )
99
- lines.append("")
100
-
101
- if len(items) > limit:
102
- lines.append(f"... 另有 {len(items) - limit} 筆。")
103
-
104
- return "\n".join(lines).strip()