Spaces:
Sleeping
Sleeping
Update cwa_service.py
Browse files- cwa_service.py +12 -9
cwa_service.py
CHANGED
|
@@ -28,7 +28,7 @@ def _parse_cwa_time(s: str) -> tuple[str, str]:
|
|
| 28 |
|
| 29 |
# --- 地震預警 (CWA_ALARM_API) ---
|
| 30 |
def fetch_cwa_alarm_list(limit: int = 5) -> str:
|
| 31 |
-
|
| 32 |
try:
|
| 33 |
r = requests.get(CWA_ALARM_API, timeout=10)
|
| 34 |
r.raise_for_status()
|
|
@@ -60,7 +60,7 @@ def fetch_cwa_alarm_list(limit: int = 5) -> str:
|
|
| 60 |
depth_str = f"{depth:.0f}" if depth is not None else "—"
|
| 61 |
lines.append(
|
| 62 |
f"事件: {identifier} | 類型: {msg_type}#{msg_no}\n"
|
| 63 |
-
f"
|
| 64 |
f"時間: {tw_str}(台灣)\n"
|
| 65 |
f"預警地區: {areas}"
|
| 66 |
)
|
|
@@ -68,16 +68,17 @@ def fetch_cwa_alarm_list(limit: int = 5) -> str:
|
|
| 68 |
|
| 69 |
# --- 顯著有感地震 (E-A0015-001) ---
|
| 70 |
def _parse_significant_earthquakes(obj: dict) -> pd.DataFrame:
|
| 71 |
-
"""[修改]
|
| 72 |
-
|
|
|
|
| 73 |
rows = []
|
| 74 |
for q in quakes:
|
| 75 |
-
#
|
| 76 |
ei = q.get("EarthquakeInfo") or q.get("earthquakeInfo") or {}
|
| 77 |
epic = ei.get("Epicenter") or ei.get("epicenter") or {}
|
| 78 |
-
mag_info = ei.get("Magnitude") or ei.get("magnitude") or {}
|
| 79 |
-
|
| 80 |
-
#
|
| 81 |
depth_raw = ei.get("FocalDepth") or ei.get("depth") or ei.get("Depth")
|
| 82 |
mag_raw = mag_info.get("MagnitudeValue") or mag_info.get("magnitudeValue") or mag_info.get("Value") or mag_info.get("value")
|
| 83 |
|
|
@@ -95,6 +96,7 @@ def _parse_significant_earthquakes(obj: dict) -> pd.DataFrame:
|
|
| 95 |
if not df.empty and "Time" in df.columns:
|
| 96 |
time_series = pd.to_datetime(df["Time"], errors="coerce")
|
| 97 |
if pd.api.types.is_datetime64_any_dtype(time_series):
|
|
|
|
| 98 |
df["Time"] = time_series.dt.tz_localize("UTC").dt.tz_convert(TAIPEI_TZ)
|
| 99 |
return df
|
| 100 |
|
|
@@ -118,9 +120,10 @@ def fetch_significant_earthquakes(days: int = 7, limit: int = 5) -> str:
|
|
| 118 |
lines.append(
|
| 119 |
f"時間: {row['Time'].strftime('%Y-%m-%d %H:%M') if pd.notna(row['Time']) else '—'}\n"
|
| 120 |
f"地點: {row['Location'] or '—'}\n"
|
| 121 |
-
f"規模: M{mag_str} | 深度: {depth_str} km\n"
|
| 122 |
f"報告: {row['URL'] or '無'}"
|
| 123 |
)
|
| 124 |
return "\n\n".join(lines)
|
| 125 |
except Exception as e:
|
| 126 |
return f"❌ 顯著地震查詢失敗:{e}"
|
|
|
|
|
|
| 28 |
|
| 29 |
# --- 地震預警 (CWA_ALARM_API) ---
|
| 30 |
def fetch_cwa_alarm_list(limit: int = 5) -> str:
|
| 31 |
+
"""抓 CWA 地震預警並格式化輸出。"""
|
| 32 |
try:
|
| 33 |
r = requests.get(CWA_ALARM_API, timeout=10)
|
| 34 |
r.raise_for_status()
|
|
|
|
| 60 |
depth_str = f"{depth:.0f}" if depth is not None else "—"
|
| 61 |
lines.append(
|
| 62 |
f"事件: {identifier} | 類型: {msg_type}#{msg_no}\n"
|
| 63 |
+
f"規模/深度: M{mag_str} / {depth_str} km\n"
|
| 64 |
f"時間: {tw_str}(台灣)\n"
|
| 65 |
f"預警地區: {areas}"
|
| 66 |
)
|
|
|
|
| 68 |
|
| 69 |
# --- 顯著有感地震 (E-A0015-001) ---
|
| 70 |
def _parse_significant_earthquakes(obj: dict) -> pd.DataFrame:
|
| 71 |
+
"""[修改] 採用參考程式碼中最穩健的解析邏輯,應對 CWA 不穩定的資料格式"""
|
| 72 |
+
records = obj.get("records") or obj.get("Records") or {}
|
| 73 |
+
quakes = records.get("earthquake") or records.get("Earthquake") or []
|
| 74 |
rows = []
|
| 75 |
for q in quakes:
|
| 76 |
+
# 兼容各種大小寫和可能的父節點名稱
|
| 77 |
ei = q.get("EarthquakeInfo") or q.get("earthquakeInfo") or {}
|
| 78 |
epic = ei.get("Epicenter") or ei.get("epicenter") or {}
|
| 79 |
+
mag_info = ei.get("Magnitude") or ei.get("magnitude") or ei.get("EarthquakeMagnitude") or {}
|
| 80 |
+
|
| 81 |
+
# 嘗試用所有已知的欄位名稱去取得資料
|
| 82 |
depth_raw = ei.get("FocalDepth") or ei.get("depth") or ei.get("Depth")
|
| 83 |
mag_raw = mag_info.get("MagnitudeValue") or mag_info.get("magnitudeValue") or mag_info.get("Value") or mag_info.get("value")
|
| 84 |
|
|
|
|
| 96 |
if not df.empty and "Time" in df.columns:
|
| 97 |
time_series = pd.to_datetime(df["Time"], errors="coerce")
|
| 98 |
if pd.api.types.is_datetime64_any_dtype(time_series):
|
| 99 |
+
# 假設 API 回傳的時間是 UTC 標準時間
|
| 100 |
df["Time"] = time_series.dt.tz_localize("UTC").dt.tz_convert(TAIPEI_TZ)
|
| 101 |
return df
|
| 102 |
|
|
|
|
| 120 |
lines.append(
|
| 121 |
f"時間: {row['Time'].strftime('%Y-%m-%d %H:%M') if pd.notna(row['Time']) else '—'}\n"
|
| 122 |
f"地點: {row['Location'] or '—'}\n"
|
| 123 |
+
f"規模: M{mag_str} | 深度: {depth_str} km\n"
|
| 124 |
f"報告: {row['URL'] or '無'}"
|
| 125 |
)
|
| 126 |
return "\n\n".join(lines)
|
| 127 |
except Exception as e:
|
| 128 |
return f"❌ 顯著地震查詢失敗:{e}"
|
| 129 |
+
|