cwadayi commited on
Commit
f08e090
·
verified ·
1 Parent(s): 4fe0fbd

Update cwa_service.py

Browse files
Files changed (1) hide show
  1. cwa_service.py +17 -16
cwa_service.py CHANGED
@@ -28,8 +28,7 @@ def _parse_cwa_time(s: str) -> tuple[str, str]:
28
 
29
  # --- 地震預警 (CWA_ALARM_API) ---
30
  def fetch_cwa_alarm_list(limit: int = 5) -> str:
31
- """抓 CWA 地震預警並格式化輸出。"""
32
- # ... (此函式內容不變) ...
33
  try:
34
  r = requests.get(CWA_ALARM_API, timeout=10)
35
  r.raise_for_status()
@@ -69,35 +68,37 @@ def fetch_cwa_alarm_list(limit: int = 5) -> str:
69
 
70
  # --- 顯著有感地震 (E-A0015-001) ---
71
  def _parse_significant_earthquakes(obj: dict) -> pd.DataFrame:
72
- """[修改] 採用更穩健的方式解析 E-A0015-001 的 JSON 資料"""
73
  quakes = obj.get("records", {}).get("Earthquake", [])
74
  rows = []
75
  for q in quakes:
76
- ei = q.get("EarthquakeInfo", {})
77
- epic = ei.get("Epicenter", {})
78
- mag_info = ei.get("Magnitude", {})
 
79
 
80
- # 嘗試用多種可能的欄位名稱去取得資料
81
- depth_raw = ei.get("FocalDepth") or ei.get("Depth")
82
- mag_raw = mag_info.get("MagnitudeValue") or mag_info.get("value") or mag_info.get("Magnitude")
83
 
84
  rows.append({
85
- "ID": q.get("EarthquakeNo"), "Time": ei.get("OriginTime"),
86
- "Lat": _to_float(epic.get("EpicenterLatitude")), "Lon": _to_float(epic.get("EpicenterLongitude")),
 
 
87
  "Depth": _to_float(depth_raw),
88
  "Magnitude": _to_float(mag_raw),
89
- "Location": epic.get("Location"), "URL": q.get("Web"),
 
90
  })
91
  df = pd.DataFrame(rows)
92
  if not df.empty and "Time" in df.columns:
93
  time_series = pd.to_datetime(df["Time"], errors="coerce")
94
  if pd.api.types.is_datetime64_any_dtype(time_series):
95
- # 假設 API 回傳的時間是 UTC 標準時間
96
  df["Time"] = time_series.dt.tz_localize("UTC").dt.tz_convert(TAIPEI_TZ)
97
  return df
98
 
99
  def fetch_significant_earthquakes(days: int = 7, limit: int = 5) -> str:
100
- # ... (此函式上半部分不變) ...
101
  if not CWA_API_KEY: return "❌ 顯著地震查詢失敗:管理者尚未設定 CWA_API_KEY。"
102
  now = datetime.now(timezone.utc)
103
  time_from = (now - timedelta(days=days)).strftime("%Y-%m-%d")
@@ -109,6 +110,7 @@ def fetch_significant_earthquakes(days: int = 7, limit: int = 5) -> str:
109
  df = _parse_significant_earthquakes(data)
110
  if df.empty: return f"✅ 過去 {days} 天內沒有顯著有感地震報告。"
111
  df = df.sort_values(by="Time", ascending=False).head(limit)
 
112
  lines = [f"🚨 CWA 最新顯著有感地震 (近{days}天內):", "-" * 20]
113
  for _, row in df.iterrows():
114
  mag_str = f"{row['Magnitude']:.1f}" if pd.notna(row['Magnitude']) else "—"
@@ -116,10 +118,9 @@ def fetch_significant_earthquakes(days: int = 7, limit: int = 5) -> str:
116
  lines.append(
117
  f"時間: {row['Time'].strftime('%Y-%m-%d %H:%M') if pd.notna(row['Time']) else '—'}\n"
118
  f"地點: {row['Location'] or '—'}\n"
119
- f"震級: M{mag_str} | 深度: {depth_str} km\n"
120
  f"報告: {row['URL'] or '無'}"
121
  )
122
  return "\n\n".join(lines)
123
  except Exception as e:
124
  return f"❌ 顯著地震查詢失敗:{e}"
125
-
 
28
 
29
  # --- 地震預警 (CWA_ALARM_API) ---
30
  def fetch_cwa_alarm_list(limit: int = 5) -> str:
31
+ # ... (此函式內容不變,保持原樣) ...
 
32
  try:
33
  r = requests.get(CWA_ALARM_API, timeout=10)
34
  r.raise_for_status()
 
68
 
69
  # --- 顯著有感地震 (E-A0015-001) ---
70
  def _parse_significant_earthquakes(obj: dict) -> pd.DataFrame:
71
+ """[修改] 採用最穩健的方式解析 E-A0015-001 的 JSON,應對格式不一致問題"""
72
  quakes = obj.get("records", {}).get("Earthquake", [])
73
  rows = []
74
  for q in quakes:
75
+ # 兼容大小寫和不同寫法
76
+ ei = q.get("EarthquakeInfo") or q.get("earthquakeInfo") or {}
77
+ epic = ei.get("Epicenter") or ei.get("epicenter") or {}
78
+ mag_info = ei.get("Magnitude") or ei.get("magnitude") or {}
79
 
80
+ # 嘗試用所有可能的欄位名稱去取得資料
81
+ depth_raw = ei.get("FocalDepth") or ei.get("depth") or ei.get("Depth")
82
+ mag_raw = mag_info.get("MagnitudeValue") or mag_info.get("magnitudeValue") or mag_info.get("Value") or mag_info.get("value")
83
 
84
  rows.append({
85
+ "ID": q.get("EarthquakeNo"),
86
+ "Time": ei.get("OriginTime"),
87
+ "Lat": _to_float(epic.get("EpicenterLatitude") or epic.get("epicenterLatitude")),
88
+ "Lon": _to_float(epic.get("EpicenterLongitude") or epic.get("epicenterLongitude")),
89
  "Depth": _to_float(depth_raw),
90
  "Magnitude": _to_float(mag_raw),
91
+ "Location": epic.get("Location") or epic.get("location"),
92
+ "URL": q.get("Web") or q.get("ReportURL"),
93
  })
94
  df = pd.DataFrame(rows)
95
  if not df.empty and "Time" in df.columns:
96
  time_series = pd.to_datetime(df["Time"], errors="coerce")
97
  if pd.api.types.is_datetime64_any_dtype(time_series):
 
98
  df["Time"] = time_series.dt.tz_localize("UTC").dt.tz_convert(TAIPEI_TZ)
99
  return df
100
 
101
  def fetch_significant_earthquakes(days: int = 7, limit: int = 5) -> str:
 
102
  if not CWA_API_KEY: return "❌ 顯著地震查詢失敗:管理者尚未設定 CWA_API_KEY。"
103
  now = datetime.now(timezone.utc)
104
  time_from = (now - timedelta(days=days)).strftime("%Y-%m-%d")
 
110
  df = _parse_significant_earthquakes(data)
111
  if df.empty: return f"✅ 過去 {days} 天內沒有顯著有感地震報告。"
112
  df = df.sort_values(by="Time", ascending=False).head(limit)
113
+
114
  lines = [f"🚨 CWA 最新顯著有感地震 (近{days}天內):", "-" * 20]
115
  for _, row in df.iterrows():
116
  mag_str = f"{row['Magnitude']:.1f}" if pd.notna(row['Magnitude']) else "—"
 
118
  lines.append(
119
  f"時間: {row['Time'].strftime('%Y-%m-%d %H:%M') if pd.notna(row['Time']) else '—'}\n"
120
  f"地點: {row['Location'] or '—'}\n"
121
+ f"規模: M{mag_str} | 深度: {depth_str} km\n" # [修改] 將 "震級" 改為 "規模"
122
  f"報告: {row['URL'] or '無'}"
123
  )
124
  return "\n\n".join(lines)
125
  except Exception as e:
126
  return f"❌ 顯著地震查詢失敗:{e}"