Spaces:
Sleeping
Sleeping
| import os | |
| import re | |
| import tempfile | |
| from datetime import datetime, timedelta, timezone | |
| import base64 | |
| import requests | |
| import pandas as pd | |
| import gradio as gr | |
| import folium | |
| from folium.plugins import MarkerCluster | |
| from branca.colormap import linear | |
| # ------- 可選依賴偵測(表格美化;沒裝也能跑) ------- | |
| try: | |
| import tabulate as _tabulate # noqa: F401 | |
| HAS_TABULATE = True | |
| except Exception: | |
| HAS_TABULATE = False | |
| # ----------------------------- | |
| # 台北時區 (UTC+8) | |
| # ----------------------------- | |
| TAIPEI_TZ = timezone(timedelta(hours=8)) | |
| def _fmt(dt: datetime) -> str: | |
| return dt.strftime("%Y-%m-%dT%H:%M:%S") | |
| def set_time_range(hours=None, days=None): | |
| """依台北時間回傳 (timeFrom, timeTo) ISO 字串""" | |
| now = datetime.now(TAIPEI_TZ) | |
| if hours is not None: | |
| t_from = now - timedelta(hours=hours) | |
| elif days is not None: | |
| t_from = now - timedelta(days=days) | |
| else: | |
| t_from = now - timedelta(days=3) | |
| return _fmt(t_from), _fmt(now) | |
| # ----------------------------- | |
| # 呼叫 CWA API | |
| # ----------------------------- | |
| API_URL = "https://opendata.cwa.gov.tw/api/v1/rest/datastore/E-A0015-001" | |
| def fetch_reports(time_from, time_to): | |
| api_key = os.getenv("CWA_API_KEY", "").strip() | |
| if not api_key: | |
| raise RuntimeError("請在環境變數設定 CWA_API_KEY") | |
| params = {"Authorization": api_key, "timeFrom": time_from, "timeTo": time_to} | |
| r = requests.get(API_URL, params=params, timeout=30) | |
| r.raise_for_status() | |
| return r.json() | |
| # ----------------------------- | |
| # 解析 JSON | |
| # ----------------------------- | |
| def _to_float(x): | |
| """將字串(含單位)抽出第一個數字成 float;失敗回 None。""" | |
| if x is None: | |
| return None | |
| if isinstance(x, (int, float)): | |
| return float(x) | |
| s = str(x).strip() | |
| if s == "": | |
| return None | |
| m = re.search(r"[-+]?\d+(?:\.\d+)?", s) | |
| return float(m.group()) if m else None | |
| def parse_ea0015(obj): | |
| """ | |
| 解析 CWA E-A0015-001 -> DataFrame 欄位: | |
| OriginTime, Magnitude, Depth_km, Lat, Lon, Location, ReportURL | |
| """ | |
| records = obj.get("records") or obj.get("Records") or {} | |
| quakes = records.get("earthquake") or records.get("Earthquake") or [] | |
| if not isinstance(quakes, list): | |
| quakes = [] | |
| rows = [] | |
| for q in quakes: | |
| ei = q.get("EarthquakeInfo") or q.get("earthquakeInfo") or {} | |
| epic = ei.get("Epicenter") or ei.get("epicenter") or {} | |
| mago = ( | |
| ei.get("Magnitude") or ei.get("magnitude") | |
| or ei.get("EarthquakeMagnitude") or ei.get("earthquakeMagnitude") | |
| or {} | |
| ) | |
| origin = ( | |
| ei.get("OriginTime") or ei.get("originTime") | |
| or q.get("OriginTime") or q.get("originTime") | |
| ) | |
| lat_raw = ( | |
| epic.get("EpicenterLat") or epic.get("epicenterLat") | |
| or epic.get("EpicenterLatitude") or epic.get("epicenterLatitude") | |
| or epic.get("Lat") or epic.get("lat") | |
| ) | |
| lon_raw = ( | |
| epic.get("EpicenterLon") or epic.get("epicenterLon") | |
| or epic.get("EpicenterLongitude") or epic.get("epicenterLongitude") | |
| or epic.get("Lon") or epic.get("lon") | |
| ) | |
| depth_raw = ( | |
| ei.get("Depth") or ei.get("depth") | |
| or ei.get("FocalDepth") or ei.get("focalDepth") | |
| or ei.get("FocalDepthKm") or ei.get("focalDepthKm") | |
| ) | |
| mag_raw = ( | |
| mago.get("MagnitudeValue") or mago.get("magnitudeValue") | |
| or mago.get("Value") or mago.get("value") | |
| or mago.get("Magnitude") or mago.get("magnitude") | |
| or ei.get("MagnitudeValue") or ei.get("magnitudeValue") | |
| ) | |
| loc = epic.get("Location") or epic.get("location") | |
| url = q.get("Web") or q.get("ReportURL") or q.get("reportURL") | |
| rows.append({ | |
| "OriginTime": origin, | |
| "Lat": _to_float(lat_raw), | |
| "Lon": _to_float(lon_raw), | |
| "Depth_km": _to_float(depth_raw), | |
| "Magnitude": _to_float(mag_raw), | |
| "Location": loc, | |
| "ReportURL": url, | |
| }) | |
| df = pd.DataFrame(rows) | |
| if not df.empty: | |
| df["OriginTime"] = pd.to_datetime(df["OriginTime"], errors="coerce") | |
| df = df.sort_values("OriginTime", ascending=False, na_position="last").reset_index(drop=True) | |
| return df | |
| # ----------------------------- | |
| # 表格輸出 | |
| # ----------------------------- | |
| def _format_taipei(series): | |
| try: | |
| if series.dt.tz is None: | |
| s = series.dt.tz_localize(TAIPEI_TZ) | |
| else: | |
| s = series.dt.tz_convert(TAIPEI_TZ) | |
| return s.dt.strftime("%Y-%m-%d %H:%M:%S %Z") | |
| except Exception: | |
| return series.astype(str) | |
| def _to_simple_md_table(df: pd.DataFrame) -> str: | |
| cols = list(df.columns) | |
| header = "|" + "|".join(cols) + "|\n" | |
| sep = "|" + "|".join(["---"] * len(cols)) + "|\n" | |
| rows = [] | |
| for _, r in df.iterrows(): | |
| cells = [] | |
| for c in cols: | |
| v = r.get(c, "") | |
| cells.append("" if pd.isna(v) else str(v)) | |
| rows.append("|" + "|".join(cells) + "|") | |
| return header + sep + "\n".join(rows) | |
| def df_to_markdown(df, top_n=100): | |
| if df.empty: | |
| return "(查無資料)" | |
| cols = ["OriginTime", "Magnitude", "Depth_km", "Lat", "Lon", "Location", "ReportURL"] | |
| cols = [c for c in cols if c in df.columns] | |
| slim = df[cols].head(top_n).copy() | |
| if "OriginTime" in slim.columns: | |
| slim["OriginTime"] = _format_taipei(slim["OriginTime"]) | |
| header = f"共 {len(df)} 筆,顯示前 {min(len(slim), top_n)} 筆\n\n" | |
| if HAS_TABULATE: | |
| table = slim.to_markdown(index=False) | |
| else: | |
| table = _to_simple_md_table(slim.reset_index(drop=True)) | |
| return header + table | |
| # ----------------------------- | |
| # OSM 地圖(Folium)輸出(以 data URL iframe 嵌入) | |
| # ----------------------------- | |
| def map_osm_html(df: pd.DataFrame): | |
| if df.empty: | |
| return "<div style='padding:8px'>(查無資料)</div>" | |
| d = df.dropna(subset=["Lat", "Lon"]).copy() | |
| if d.empty: | |
| return "<div style='padding:8px'>(無經緯度可繪製)</div>" | |
| # 數值化 | |
| d["Magnitude"] = pd.to_numeric(d["Magnitude"], errors="coerce").fillna(0).clip(lower=0) | |
| d["Depth_km"] = pd.to_numeric(d["Depth_km"], errors="coerce") | |
| # 地圖中心 / 底圖 | |
| center = [d["Lat"].mean(), d["Lon"].mean()] | |
| m = folium.Map(location=center, zoom_start=6, tiles="OpenStreetMap", control_scale=True) | |
| # 顏色條(深度)— 通用 viridis | |
| depth_min, depth_max = float(d["Depth_km"].min()), float(d["Depth_km"].max()) | |
| if depth_min == depth_max: | |
| depth_min, depth_max = max(0.0, depth_min - 1), depth_max + 1 | |
| cmap = linear.viridis.scale(depth_min, depth_max) | |
| cmap.caption = "Depth (km)" | |
| cmap.add_to(m) | |
| cluster = MarkerCluster().add_to(m) | |
| # 逐筆加入圓標 | |
| for _, r in d.iterrows(): | |
| lat, lon = float(r["Lat"]), float(r["Lon"]) | |
| mag = float(r["Magnitude"]) if pd.notna(r["Magnitude"]) else 0.0 | |
| depth = float(r["Depth_km"]) if pd.notna(r["Depth_km"]) else 0.0 | |
| size = 4 + 2.5 * max(0.0, mag) # 依規模調整像素半徑 | |
| color = cmap(depth) | |
| popup_html = f""" | |
| <b>OriginTime</b>: {r['OriginTime']}<br> | |
| <b>Magnitude</b>: {mag:.1f}<br> | |
| <b>Depth</b>: {depth:.1f} km<br> | |
| <b>Location</b>: {r.get('Location','') or ''}<br> | |
| <a href="{r.get('ReportURL','') or '#'}" target="_blank">CWA 報告</a> | |
| """ | |
| folium.CircleMarker( | |
| location=[lat, lon], | |
| radius=size, | |
| color="#000000", | |
| weight=1, | |
| fill=True, | |
| fill_color=color, | |
| fill_opacity=0.85, | |
| popup=folium.Popup(popup_html, max_width=320), | |
| ).add_to(cluster) | |
| # fit bounds | |
| m.fit_bounds([[d["Lat"].min(), d["Lon"].min()], [d["Lat"].max(), d["Lon"].max()]], padding=(20, 20)) | |
| # 以 data URL 方式嵌入,避免被 HTML 清洗移除 <script> | |
| html = m.get_root().render() | |
| b64 = base64.b64encode(html.encode("utf-8")).decode("ascii") | |
| return f'<iframe src="data:text/html;base64,{b64}" style="width:100%;height:520px;border:none;"></iframe>' | |
| # ----------------------------- | |
| # 主流程 | |
| # ----------------------------- | |
| def query_and_render(time_from, time_to, sort_order): | |
| try: | |
| raw = fetch_reports(time_from, time_to) | |
| df = parse_ea0015(raw) | |
| if df.empty: | |
| return "(查無資料)", "<div style='padding:8px'>(查無資料)</div>", None | |
| if sort_order == "OriginTime (舊→新)": | |
| df = df.sort_values("OriginTime", ascending=True, na_position="last").reset_index(drop=True) | |
| md = df_to_markdown(df) | |
| map_html = map_osm_html(df) | |
| # 寫檔並回傳「檔案路徑」給 DownloadButton | |
| tmp = tempfile.NamedTemporaryFile(delete=False, suffix=".csv", prefix="CWA_E-A0015-001_") | |
| df.to_csv(tmp.name, index=False, encoding="utf-8-sig") | |
| return md, map_html, tmp.name | |
| except Exception as e: | |
| return f"錯誤:{e}", "<div style='padding:8px'>(無法繪圖)</div>", None | |
| # ----------------------------- | |
| # 介面 | |
| # ----------------------------- | |
| default_from, default_to = set_time_range(days=3) | |
| with gr.Blocks(fill_height=True) as demo: | |
| gr.Markdown("## CWA 顯著有感地震報告 (E-A0015-001)\n預設查詢最近 3 天(台北時間)") | |
| with gr.Column(): | |
| time_from = gr.Textbox(label="timeFrom yyyy-MM-ddTHH:mm:ss", value=default_from) | |
| time_to = gr.Textbox(label="timeTo yyyy-MM-ddTHH:mm:ss", value=default_to) | |
| with gr.Row(): | |
| btn_12h = gr.Button("最近 12 小時") | |
| btn_24h = gr.Button("最近 24 小時") | |
| btn_3d = gr.Button("最近 3 天") | |
| btn_5d = gr.Button("最近 5 天") | |
| sort_dd = gr.Dropdown( | |
| choices=["OriginTime (新→舊)", "OriginTime (舊→新)"], | |
| value="OriginTime (新→舊)", | |
| label="排序", | |
| ) | |
| run_btn = gr.Button("查詢", variant="primary") | |
| table_out = gr.Markdown("(尚未查詢)") | |
| map_out = gr.HTML() # 不帶 sanitize_html 參數(舊版 Gradio 相容) | |
| dl_btn = gr.DownloadButton(label="下載 CSV") | |
| # 快速鍵 | |
| btn_12h.click(lambda: set_time_range(hours=12), outputs=[time_from, time_to]) | |
| btn_24h.click(lambda: set_time_range(hours=24), outputs=[time_from, time_to]) | |
| btn_3d.click(lambda: set_time_range(days=3), outputs=[time_from, time_to]) | |
| btn_5d.click(lambda: set_time_range(days=5), outputs=[time_from, time_to]) | |
| # 查詢 | |
| run_btn.click( | |
| query_and_render, | |
| inputs=[time_from, time_to, sort_dd], | |
| outputs=[table_out, map_out, dl_btn], | |
| ) | |
| if __name__ == "__main__": | |
| demo.launch() |