cwadayi commited on
Commit
8041dc5
·
verified ·
1 Parent(s): 27b74a6

Update cwa_service.py

Browse files
Files changed (1) hide show
  1. cwa_service.py +12 -17
cwa_service.py CHANGED
@@ -17,7 +17,6 @@ def _to_float(x):
17
  return float(m.group()) if m else None
18
 
19
  def _parse_cwa_time(s: str) -> tuple[str, str]:
20
- """回傳 (台灣時間, UTC);若字串無時區,預設視為台灣時間。"""
21
  if not s: return ("未知", "未知")
22
  try:
23
  dt = datetime.fromisoformat(s.replace("Z", "+00:00"))
@@ -29,7 +28,6 @@ def _parse_cwa_time(s: str) -> tuple[str, str]:
29
 
30
  # --- 地震預警 (CWA_ALARM_API) ---
31
  def fetch_cwa_alarm_list(limit: int = 5) -> str:
32
- """抓 CWA 地震預警並格式化輸出。"""
33
  try:
34
  r = requests.get(CWA_ALARM_API, timeout=10)
35
  r.raise_for_status()
@@ -56,7 +54,7 @@ def fetch_cwa_alarm_list(limit: int = 5) -> str:
56
  tw_str, utc_str = _parse_cwa_time(it.get("originTime", ""))
57
  lines.append(
58
  f"事件: {it.get('identifier', '—')} | 類型: {it.get('msgType', '—')}#{it.get('msgNo', '—')}\n"
59
- f"震級/深度: M{mag:.1f} / {depth:.0f} km\n"
60
  f"時間: {tw_str}(台灣)\n"
61
  f"預警地區: {it.get('alertAreas') or '—'}"
62
  )
@@ -64,7 +62,6 @@ def fetch_cwa_alarm_list(limit: int = 5) -> str:
64
 
65
  # --- 顯著有感地震 (E-A0015-001) ---
66
  def _parse_significant_earthquakes(obj: dict) -> pd.DataFrame:
67
- """解析 E-A0015-001 的 JSON 資料"""
68
  quakes = obj.get("records", {}).get("Earthquake", [])
69
  rows = []
70
  for q in quakes:
@@ -84,28 +81,22 @@ def _parse_significant_earthquakes(obj: dict) -> pd.DataFrame:
84
  df = pd.DataFrame(rows)
85
  if not df.empty and "Time" in df.columns:
86
  time_series = pd.to_datetime(df["Time"], errors="coerce")
87
- # [修正] 先用 tz_localize('UTC') 賦予 UTC 時區,再用 tz_convert 轉換為台北時區
88
- df["Time"] = time_series.dt.tz_localize("UTC").dt.tz_convert(TAIPEI_TZ)
89
  return df
90
 
91
  def fetch_significant_earthquakes(days: int = 7, limit: int = 5) -> str:
92
- """從 CWA 獲取最新的顯著有感地震"""
93
  if not CWA_API_KEY:
94
  return "❌ 顯著地震查詢失敗:管理者尚未設定 CWA_API_KEY。"
95
 
96
  now = datetime.now(timezone.utc)
97
  time_from = (now - timedelta(days=days)).strftime("%Y-%m-%d")
98
-
99
- params = {
100
- "Authorization": CWA_API_KEY,
101
- "format": "JSON",
102
- "timeFrom": time_from,
103
- }
104
  try:
105
  r = requests.get(CWA_SIGNIFICANT_API, params=params, timeout=15)
106
  r.raise_for_status()
107
  data = r.json()
108
-
109
  df = _parse_significant_earthquakes(data)
110
  if df.empty:
111
  return f"✅ 過去 {days} 天內沒有顯著有感地震報告。"
@@ -114,10 +105,14 @@ def fetch_significant_earthquakes(days: int = 7, limit: int = 5) -> str:
114
 
115
  lines = [f"🚨 CWA 最新顯著有感地震 (近{days}天內):", "-" * 20]
116
  for _, row in df.iterrows():
 
 
 
 
117
  lines.append(
118
- f"時間: {row['Time'].strftime('%Y-%m-%d %H:%M')}\n"
119
- f"地點: {row['Location']}\n"
120
- f"震級: M{row['Magnitude']:.1f} | 深度: {row['Depth']:.0f} km\n"
121
  f"報告: {row['URL'] or '無'}"
122
  )
123
  return "\n\n".join(lines)
 
17
  return float(m.group()) if m else None
18
 
19
  def _parse_cwa_time(s: str) -> tuple[str, str]:
 
20
  if not s: return ("未知", "未知")
21
  try:
22
  dt = datetime.fromisoformat(s.replace("Z", "+00:00"))
 
28
 
29
  # --- 地震預警 (CWA_ALARM_API) ---
30
  def fetch_cwa_alarm_list(limit: int = 5) -> str:
 
31
  try:
32
  r = requests.get(CWA_ALARM_API, timeout=10)
33
  r.raise_for_status()
 
54
  tw_str, utc_str = _parse_cwa_time(it.get("originTime", ""))
55
  lines.append(
56
  f"事件: {it.get('identifier', '—')} | 類型: {it.get('msgType', '—')}#{it.get('msgNo', '—')}\n"
57
+ f"震級/深度: M{mag:.1f if mag is not None else '—'} / {depth:.0f if depth is not None else '—'} km\n"
58
  f"時間: {tw_str}(台灣)\n"
59
  f"預警地區: {it.get('alertAreas') or '—'}"
60
  )
 
62
 
63
  # --- 顯著有感地震 (E-A0015-001) ---
64
  def _parse_significant_earthquakes(obj: dict) -> pd.DataFrame:
 
65
  quakes = obj.get("records", {}).get("Earthquake", [])
66
  rows = []
67
  for q in quakes:
 
81
  df = pd.DataFrame(rows)
82
  if not df.empty and "Time" in df.columns:
83
  time_series = pd.to_datetime(df["Time"], errors="coerce")
84
+ if pd.api.types.is_datetime64_any_dtype(time_series):
85
+ df["Time"] = time_series.dt.tz_localize("UTC").dt.tz_convert(TAIPEI_TZ)
86
  return df
87
 
88
  def fetch_significant_earthquakes(days: int = 7, limit: int = 5) -> str:
 
89
  if not CWA_API_KEY:
90
  return "❌ 顯著地震查詢失敗:管理者尚未設定 CWA_API_KEY。"
91
 
92
  now = datetime.now(timezone.utc)
93
  time_from = (now - timedelta(days=days)).strftime("%Y-%m-%d")
94
+ params = {"Authorization": CWA_API_KEY, "format": "JSON", "timeFrom": time_from}
95
+
 
 
 
 
96
  try:
97
  r = requests.get(CWA_SIGNIFICANT_API, params=params, timeout=15)
98
  r.raise_for_status()
99
  data = r.json()
 
100
  df = _parse_significant_earthquakes(data)
101
  if df.empty:
102
  return f"✅ 過去 {days} 天內沒有顯著有感地震報告。"
 
105
 
106
  lines = [f"🚨 CWA 最新顯著有感地震 (近{days}天內):", "-" * 20]
107
  for _, row in df.iterrows():
108
+ # [修正] 在格式化前,先檢查值是否存在
109
+ mag_str = f"{row['Magnitude']:.1f}" if pd.notna(row['Magnitude']) else "—"
110
+ depth_str = f"{row['Depth']:.0f}" if pd.notna(row['Depth']) else "—"
111
+
112
  lines.append(
113
+ f"時間: {row['Time'].strftime('%Y-%m-%d %H:%M') if pd.notna(row['Time']) else '—'}\n"
114
+ f"地點: {row['Location'] or '—'}\n"
115
+ f"震級: M{mag_str} | 深度: {depth_str} km\n"
116
  f"報告: {row['URL'] or '無'}"
117
  )
118
  return "\n\n".join(lines)