cwadayi commited on
Commit
9fb8db5
·
verified ·
1 Parent(s): 54641dd

Update cwa_service.py

Browse files
Files changed (1) hide show
  1. cwa_service.py +47 -7
cwa_service.py CHANGED
@@ -9,21 +9,62 @@ from config import CWA_API_KEY, CWA_ALARM_API, CWA_SIGNIFICANT_API
9
 
10
  TAIPEI_TZ = timezone(timedelta(hours=8))
11
 
12
- # --- Helper Function from Gradio Script ---
13
  def _to_float(x):
14
  if x is None: return None
15
  s = str(x).strip()
16
  m = re.search(r"[-+]?\d+(?:\.\d+)?", s)
17
  return float(m.group()) if m else None
18
 
19
- # --- 地震預警 (舊功能) ---
 
 
 
 
 
 
 
 
 
 
 
20
  def fetch_cwa_alarm_list(limit: int = 5) -> str:
21
- # ... (此函式內容不變) ...
22
- return "\n".join(lines).strip()
 
 
 
 
 
23
 
24
- # --- [新功能] 顯著有感地震 (E-A0015-001) ---
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
25
  def _parse_significant_earthquakes(obj: dict) -> pd.DataFrame:
26
- """移植自 Gradio 程式碼,用於解析 E-A0015-001 的 JSON 資料"""
27
  quakes = obj.get("records", {}).get("Earthquake", [])
28
  rows = []
29
  for q in quakes:
@@ -81,4 +122,3 @@ def fetch_significant_earthquakes(days: int = 7, limit: int = 5) -> str:
81
 
82
  except Exception as e:
83
  return f"❌ 顯著地震查詢失敗:{e}"
84
-
 
9
 
10
  TAIPEI_TZ = timezone(timedelta(hours=8))
11
 
12
+ # --- Helper Functions ---
13
  def _to_float(x):
14
  if x is None: return None
15
  s = str(x).strip()
16
  m = re.search(r"[-+]?\d+(?:\.\d+)?", s)
17
  return float(m.group()) if m else None
18
 
19
+ def _parse_cwa_time(s: str) -> tuple[str, str]:
20
+ """回傳 (台灣時間, UTC);若字串無時區,預設視為台灣時間。"""
21
+ if not s: return ("未知", "未知")
22
+ try:
23
+ dt = datetime.fromisoformat(s.replace("Z", "+00:00"))
24
+ tw = dt.astimezone(TAIPEI_TZ).strftime("%Y-%m-%d %H:%M")
25
+ utc = dt.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M")
26
+ return (tw, utc)
27
+ except Exception:
28
+ return (s, "未知")
29
+
30
+ # --- 地震預警 (CWA_ALARM_API) ---
31
  def fetch_cwa_alarm_list(limit: int = 5) -> str:
32
+ """抓 CWA 地震預警並格式化輸出。"""
33
+ try:
34
+ r = requests.get(CWA_ALARM_API, timeout=10)
35
+ r.raise_for_status()
36
+ payload = r.json()
37
+ except Exception as e:
38
+ return f"❌ 地震預警查詢失敗:{e}"
39
 
40
+ items = payload.get("data", [])
41
+ if not items:
42
+ return "✅ 目前沒有地震預警。"
43
+
44
+ def _key(it):
45
+ try:
46
+ return datetime.fromisoformat(it.get("originTime", "").replace("Z", "+00:00"))
47
+ except:
48
+ return datetime.min.replace(tzinfo=timezone.utc)
49
+
50
+ items = sorted(items, key=_key, reverse=True)
51
+
52
+ lines = ["🚨 地震預警(最新):", "-" * 20]
53
+ for it in items[:limit]:
54
+ mag = _to_float(it.get("magnitudeValue"))
55
+ depth = _to_float(it.get("depth"))
56
+ tw_str, utc_str = _parse_cwa_time(it.get("originTime", ""))
57
+ lines.append(
58
+ f"事件: {it.get('identifier', '—')} | 類型: {it.get('msgType', '—')}#{it.get('msgNo', '—')}\n"
59
+ f"震級/深度: M{mag:.1f} / {depth:.0f} km\n"
60
+ f"時間: {tw_str}(台灣)\n"
61
+ f"預警地區: {it.get('alertAreas') or '—'}"
62
+ )
63
+ return "\n\n".join(lines).strip()
64
+
65
+ # --- 顯著有感地震 (E-A0015-001) ---
66
  def _parse_significant_earthquakes(obj: dict) -> pd.DataFrame:
67
+ """解析 E-A0015-001 的 JSON 資料"""
68
  quakes = obj.get("records", {}).get("Earthquake", [])
69
  rows = []
70
  for q in quakes:
 
122
 
123
  except Exception as e:
124
  return f"❌ 顯著地震查詢失敗:{e}"