# cwa_service.py # -*- coding: utf-8 -*- from __future__ import annotations import requests import re import pandas as pd from datetime import datetime, timedelta, timezone from config import CWA_API_KEY, CWA_ALARM_API, CWA_SIGNIFICANT_API, CWA_LOCAL_EQ_API TAIPEI_TZ = timezone(timedelta(hours=8)) # --- Helper Functions --- def _to_float(x): if x is None: return None s = str(x).strip() m = re.search(r"[-+]?\d+(?:\.\d+)?", s) return float(m.group()) if m else None def _parse_cwa_time(s: str) -> tuple[str, str]: if not s: return ("未知", "未知") try: dt = datetime.fromisoformat(s.replace("Z", "+00:00")) tw = dt.astimezone(TAIPEI_TZ).strftime("%Y-%m-%d %H:%M") utc = dt.astimezone(timezone.utc).strftime("%Y-%m-%d %H:%M") return (tw, utc) except Exception: return (s, "未知") def _normalize_cwa_area_name(area_name: str) -> str: """自動校正縣市名稱,以符合 CWA API 查詢需求""" area_name = area_name.replace("台", "臺") if area_name.endswith("市") or area_name.endswith("縣"): return area_name major_cities = "臺北,新北,基隆,桃園,新竹,臺中,嘉義,臺南,高雄".split(',') if any(city in area_name for city in major_cities): return f"{area_name}市" else: return f"{area_name}縣" # --- 地震預警 (CWA_ALARM_API) --- def fetch_cwa_alarm_list(limit: int = 5) -> str: """抓 CWA 地震預警並格式化輸出。""" try: r = requests.get(CWA_ALARM_API, timeout=10) r.raise_for_status() payload = r.json() except Exception as e: return f"❌ 地震預警查詢失敗:{e}" items = payload.get("data", []) if not items: return "✅ 目前沒有地震預警。" def _key(it): try: return datetime.fromisoformat(it.get("originTime", "").replace("Z", "+00:00")) except: return datetime.min.replace(tzinfo=timezone.utc) items = sorted(items, key=_key, reverse=True) lines = ["🚨 地震預警(最新):", "-" * 20] for it in items[:limit]: mag = _to_float(it.get("magnitudeValue")) depth = _to_float(it.get("depth")) tw_str, _ = _parse_cwa_time(it.get("originTime", "")) identifier = str(it.get('identifier', '—')).replace('{', '{{').replace('}', '}}') msg_type = str(it.get('msgType', '—')).replace('{', '{{').replace('}', '}}') msg_no = str(it.get('msgNo', '—')).replace('{', '{{').replace('}', '}}') areas = str(it.get('alertAreas') or '—').replace('{', '{{').replace('}', '}}') mag_str = f"{mag:.1f}" if mag is not None else "—" depth_str = f"{depth:.0f}" if depth is not None else "—" lines.append( f"事件: {identifier} | 類型: {msg_type}#{msg_no}\n" f"規模/深度: M{mag_str} / {depth_str} km\n" f"時間: {tw_str}(台灣)\n" f"預警地區: {areas}" ) return "\n\n".join(lines).strip() # --- 顯著有感地震 (E-A0015-001) --- def _parse_significant_earthquakes(obj: dict) -> pd.DataFrame: """最穩健的解析邏輯,供 /significant 和 /latest 共用""" records = obj.get("records") or obj.get("Records") or {} quakes = records.get("earthquake") or records.get("Earthquake") or [] rows = [] for q in quakes: ei = q.get("EarthquakeInfo") or q.get("earthquakeInfo") or {} epic = ei.get("Epicenter") or ei.get("epicenter") or {} mag_info = ei.get("Magnitude") or ei.get("magnitude") or ei.get("EarthquakeMagnitude") or {} depth_raw = ei.get("FocalDepth") or ei.get("depth") or ei.get("Depth") mag_raw = mag_info.get("MagnitudeValue") or mag_info.get("magnitudeValue") or mag_info.get("Value") or mag_info.get("value") rows.append({ "ID": q.get("EarthquakeNo"), "Time": ei.get("OriginTime"), "Lat": _to_float(epic.get("EpicenterLatitude") or epic.get("epicenterLatitude")), "Lon": _to_float(epic.get("EpicenterLongitude") or epic.get("epicenterLongitude")), "Depth": _to_float(depth_raw), "Magnitude": _to_float(mag_raw), "Location": epic.get("Location") or epic.get("location"), "URL": q.get("Web") or q.get("ReportURL"), }) df = pd.DataFrame(rows) if not df.empty and "Time" in df.columns: time_series = pd.to_datetime(df["Time"], errors="coerce") if pd.api.types.is_datetime64_any_dtype(time_series): df["Time"] = time_series.dt.tz_localize("UTC").dt.tz_convert(TAIPEI_TZ) return df def fetch_significant_earthquakes(days: int = 7, limit: int = 5) -> str: if not CWA_API_KEY: return "❌ 顯著地震查詢失敗:管理者尚未設定 CWA_API_KEY。" now = datetime.now(timezone.utc) time_from = (now - timedelta(days=days)).strftime("%Y-%m-%d") params = {"Authorization": CWA_API_KEY, "format": "JSON", "timeFrom": time_from} try: r = requests.get(CWA_SIGNIFICANT_API, params=params, timeout=15) r.raise_for_status() data = r.json() df = _parse_significant_earthquakes(data) if df.empty: return f"✅ 過去 {days} 天內沒有顯著有感地震報告。" df = df.sort_values(by="Time", ascending=False).head(limit) lines = [f"🚨 CWA 最新顯著有感地震 (近{days}天內):", "-" * 20] for _, row in df.iterrows(): mag_str = f"{row['Magnitude']:.1f}" if pd.notna(row['Magnitude']) else "—" depth_str = f"{row['Depth']:.0f}" if pd.notna(row['Depth']) else "—" lines.append( f"時間: {row['Time'].strftime('%Y-%m-%d %H:%M') if pd.notna(row['Time']) else '—'}\n" f"地點: {row['Location'] or '—'}\n" f"規模: M{mag_str} | 深度: {depth_str} km\n" f"報告: {row['URL'] or '無'}" ) return "\n\n".join(lines) except Exception as e: return f"❌ 顯著地震查詢失敗:{e}" # --- 最新一筆顯著地震 --- def fetch_latest_significant_earthquake() -> dict | None: """從 CWA 獲取最新一筆顯著地震,並重用現有的解析邏輯""" if not CWA_API_KEY: raise ValueError("錯誤:尚未設定 CWA_API_KEY Secret。") now = datetime.now(timezone.utc) time_from = (now - timedelta(days=2)).strftime("%Y-%m-%dT%H:%M:%S") params = {"Authorization": CWA_API_KEY, "format": "JSON", "limit": 1} r = requests.get(CWA_SIGNIFICANT_API, params=params, timeout=15) r.raise_for_status() data = r.json() df = _parse_significant_earthquakes(data) if df.empty: return None latest_eq_data = df.sort_values(by="Time", ascending=False).iloc[0].to_dict() quakes = data.get("records", {}).get("Earthquake", []) if quakes: latest_eq_data["ImageURL"] = quakes[0].get("ReportImageURI") if pd.notna(latest_eq_data.get("Time")): latest_eq_data["TimeStr"] = latest_eq_data["Time"].strftime('%Y-%m-%d %H:%M') return latest_eq_data # --- 小區域有感地震 (E-A0016-001) --- def fetch_local_earthquakes(area_name: str = "", limit: int = 5) -> str: """從 CWA 獲取小區域有感地震報告。如果 area_name 為空,則查詢全台灣。""" if not CWA_API_KEY: return "❌ 查詢失敗:管理者尚未設定 CWA_API_KEY。" params = {"Authorization": CWA_API_KEY, "format": "JSON", "limit": limit} title = "🇹🇼 台灣近期小區域有感地震:" if area_name: normalized_area = _normalize_cwa_area_name(area_name) params["AreaName"] = normalized_area title = f"🚨 「{normalized_area}」近期小區域有感地震:" try: r = requests.get(CWA_LOCAL_EQ_API, params=params, timeout=15) r.raise_for_status() data = r.json() earthquakes = data.get("records", {}).get("Earthquake", []) if not earthquakes: msg = f"✅ 在「{area_name}」" if area_name else "✅ 台灣" return f"{msg}近期沒有小區域有感地震報告。" lines = [title, "-" * 20] for eq in earthquakes: info = eq.get("earthquakeInfo", {}) epi = info.get("epicenter", {}) mag = info.get("magnitude", {}) depth = info.get("depth", {}) intensity_areas = eq.get("intensity", {}).get("shakingArea", []) area_strs = [f"{area.get('areaDesc')} {area.get('areaIntensity')}級" for area in intensity_areas if area.get("areaIntensity") and float(area.get("areaIntensity", 0)) > 0] intensity_str = "、".join(area_strs) if area_strs else "無具體震度回報" lines.append( f"報告: {eq.get('reportContent', '—')}\n" f"時間: {info.get('originTime', '—')}\n" f"地點: {epi.get('location', '—')}\n" f"規模: M{mag.get('magnitudeValue', '—')} | 深度: {depth.get('value', '—')} km\n" f"主要影響區域: {intensity_str}" ) return "\n\n".join(lines) except Exception as e: return f"❌ 小區域地震查詢失敗:{e}"