cwadayi commited on
Commit
9377e8f
·
verified ·
1 Parent(s): 66f1d75

Update cwa_service.py

Browse files
Files changed (1) hide show
  1. cwa_service.py +2 -65
cwa_service.py CHANGED
@@ -1,11 +1,11 @@
1
- # cwa_service.py
2
  # -*- coding: utf-8 -*-
3
  from __future__ import annotations
4
  import requests
5
  import re
6
  import pandas as pd
7
  from datetime import datetime, timedelta, timezone
8
- from config import CWA_API_KEY, CWA_ALARM_API, CWA_SIGNIFICANT_API, CWA_LOCAL_EQ_API
9
 
10
  TAIPEI_TZ = timezone(timedelta(hours=8))
11
 
@@ -26,18 +26,6 @@ def _parse_cwa_time(s: str) -> tuple[str, str]:
26
  except Exception:
27
  return (s, "未知")
28
 
29
- def _normalize_cwa_area_name(area_name: str) -> str:
30
- """自動校正縣市名稱,以符合 CWA API 查詢需求"""
31
- area_name = area_name.replace("台", "臺")
32
- if area_name.endswith("市") or area_name.endswith("縣"):
33
- return area_name
34
-
35
- major_cities = "臺北,新北,基隆,桃園,新竹,臺中,嘉義,臺南,高雄".split(',')
36
- if any(city in area_name for city in major_cities):
37
- return f"{area_name}市"
38
- else:
39
- return f"{area_name}縣"
40
-
41
  # --- 地震預警 (CWA_ALARM_API) ---
42
  def fetch_cwa_alarm_list(limit: int = 5) -> str:
43
  """抓 CWA 地震預警並格式化輸出。"""
@@ -80,7 +68,6 @@ def fetch_cwa_alarm_list(limit: int = 5) -> str:
80
 
81
  # --- 顯著有感地震 (E-A0015-001) ---
82
  def _parse_significant_earthquakes(obj: dict) -> pd.DataFrame:
83
- """最穩健的解析邏輯,供 /significant 和 /latest 共用"""
84
  records = obj.get("records") or obj.get("Records") or {}
85
  quakes = records.get("earthquake") or records.get("Earthquake") or []
86
  rows = []
@@ -88,10 +75,8 @@ def _parse_significant_earthquakes(obj: dict) -> pd.DataFrame:
88
  ei = q.get("EarthquakeInfo") or q.get("earthquakeInfo") or {}
89
  epic = ei.get("Epicenter") or ei.get("epicenter") or {}
90
  mag_info = ei.get("Magnitude") or ei.get("magnitude") or ei.get("EarthquakeMagnitude") or {}
91
-
92
  depth_raw = ei.get("FocalDepth") or ei.get("depth") or ei.get("Depth")
93
  mag_raw = mag_info.get("MagnitudeValue") or mag_info.get("magnitudeValue") or mag_info.get("Value") or mag_info.get("value")
94
-
95
  rows.append({
96
  "ID": q.get("EarthquakeNo"), "Time": ei.get("OriginTime"),
97
  "Lat": _to_float(epic.get("EpicenterLatitude") or epic.get("epicenterLatitude")),
@@ -119,7 +104,6 @@ def fetch_significant_earthquakes(days: int = 7, limit: int = 5) -> str:
119
  df = _parse_significant_earthquakes(data)
120
  if df.empty: return f"✅ 過去 {days} 天內沒有顯著有感地震報告。"
121
  df = df.sort_values(by="Time", ascending=False).head(limit)
122
-
123
  lines = [f"🚨 CWA 最新顯著有感地震 (近{days}天內):", "-" * 20]
124
  for _, row in df.iterrows():
125
  mag_str = f"{row['Magnitude']:.1f}" if pd.notna(row['Magnitude']) else "—"
@@ -147,7 +131,6 @@ def fetch_latest_significant_earthquake() -> dict | None:
147
  r = requests.get(CWA_SIGNIFICANT_API, params=params, timeout=15)
148
  r.raise_for_status()
149
  data = r.json()
150
-
151
  df = _parse_significant_earthquakes(data)
152
  if df.empty:
153
  return None
@@ -162,49 +145,3 @@ def fetch_latest_significant_earthquake() -> dict | None:
162
  latest_eq_data["TimeStr"] = latest_eq_data["Time"].strftime('%Y-%m-%d %H:%M')
163
 
164
  return latest_eq_data
165
-
166
- # --- 小區域有感地震 (E-A0016-001) ---
167
- def fetch_local_earthquakes(area_name: str = "", limit: int = 5) -> str:
168
- """從 CWA 獲取小區域有感地震報告。如果 area_name 為空,則查詢全台灣。"""
169
- if not CWA_API_KEY: return "❌ 查詢失敗:管理者尚未設定 CWA_API_KEY。"
170
-
171
- params = {"Authorization": CWA_API_KEY, "format": "JSON", "limit": limit}
172
- title = "🇹🇼 台灣近期小區域有感地震:"
173
-
174
- if area_name:
175
- normalized_area = _normalize_cwa_area_name(area_name)
176
- params["AreaName"] = normalized_area
177
- title = f"🚨 「{normalized_area}」近期小區域有感地震:"
178
-
179
- try:
180
- r = requests.get(CWA_LOCAL_EQ_API, params=params, timeout=15)
181
- r.raise_for_status()
182
- data = r.json()
183
-
184
- earthquakes = data.get("records", {}).get("Earthquake", [])
185
- if not earthquakes:
186
- msg = f"✅ 在「{area_name}」" if area_name else "✅ 台灣"
187
- return f"{msg}近期沒有小區域有感地震報告。"
188
-
189
- lines = [title, "-" * 20]
190
- for eq in earthquakes:
191
- info = eq.get("earthquakeInfo", {})
192
- epi = info.get("epicenter", {})
193
- mag = info.get("magnitude", {})
194
- depth = info.get("depth", {})
195
-
196
- intensity_areas = eq.get("intensity", {}).get("shakingArea", [])
197
- area_strs = [f"{area.get('areaDesc')} {area.get('areaIntensity')}級" for area in intensity_areas if area.get("areaIntensity") and float(area.get("areaIntensity", 0)) > 0]
198
- intensity_str = "、".join(area_strs) if area_strs else "無具體震度回報"
199
-
200
- lines.append(
201
- f"報告: {eq.get('reportContent', '—')}\n"
202
- f"時間: {info.get('originTime', '—')}\n"
203
- f"地點: {epi.get('location', '—')}\n"
204
- f"規模: M{mag.get('magnitudeValue', '—')} | 深度: {depth.get('value', '—')} km\n"
205
- f"主要影響區域: {intensity_str}"
206
- )
207
- return "\n\n".join(lines)
208
-
209
- except Exception as e:
210
- return f"❌ 小區域地震查詢失敗:{e}"
 
1
+ # cwa_service.py
2
  # -*- coding: utf-8 -*-
3
  from __future__ import annotations
4
  import requests
5
  import re
6
  import pandas as pd
7
  from datetime import datetime, timedelta, timezone
8
+ from config import CWA_API_KEY, CWA_ALARM_API, CWA_SIGNIFICANT_API
9
 
10
  TAIPEI_TZ = timezone(timedelta(hours=8))
11
 
 
26
  except Exception:
27
  return (s, "未知")
28
 
 
 
 
 
 
 
 
 
 
 
 
 
29
  # --- 地震預警 (CWA_ALARM_API) ---
30
  def fetch_cwa_alarm_list(limit: int = 5) -> str:
31
  """抓 CWA 地震預警並格式化輸出。"""
 
68
 
69
  # --- 顯著有感地震 (E-A0015-001) ---
70
  def _parse_significant_earthquakes(obj: dict) -> pd.DataFrame:
 
71
  records = obj.get("records") or obj.get("Records") or {}
72
  quakes = records.get("earthquake") or records.get("Earthquake") or []
73
  rows = []
 
75
  ei = q.get("EarthquakeInfo") or q.get("earthquakeInfo") or {}
76
  epic = ei.get("Epicenter") or ei.get("epicenter") or {}
77
  mag_info = ei.get("Magnitude") or ei.get("magnitude") or ei.get("EarthquakeMagnitude") or {}
 
78
  depth_raw = ei.get("FocalDepth") or ei.get("depth") or ei.get("Depth")
79
  mag_raw = mag_info.get("MagnitudeValue") or mag_info.get("magnitudeValue") or mag_info.get("Value") or mag_info.get("value")
 
80
  rows.append({
81
  "ID": q.get("EarthquakeNo"), "Time": ei.get("OriginTime"),
82
  "Lat": _to_float(epic.get("EpicenterLatitude") or epic.get("epicenterLatitude")),
 
104
  df = _parse_significant_earthquakes(data)
105
  if df.empty: return f"✅ 過去 {days} 天內沒有顯著有感地震報告。"
106
  df = df.sort_values(by="Time", ascending=False).head(limit)
 
107
  lines = [f"🚨 CWA 最新顯著有感地震 (近{days}天內):", "-" * 20]
108
  for _, row in df.iterrows():
109
  mag_str = f"{row['Magnitude']:.1f}" if pd.notna(row['Magnitude']) else "—"
 
131
  r = requests.get(CWA_SIGNIFICANT_API, params=params, timeout=15)
132
  r.raise_for_status()
133
  data = r.json()
 
134
  df = _parse_significant_earthquakes(data)
135
  if df.empty:
136
  return None
 
145
  latest_eq_data["TimeStr"] = latest_eq_data["Time"].strftime('%Y-%m-%d %H:%M')
146
 
147
  return latest_eq_data