# Provenance (hosting-page header captured with the file): cwadayi - "Update app.py", commit e0a2af0 (verified)
import os
import re
import tempfile
from datetime import datetime, timedelta, timezone
import base64
import requests
import pandas as pd
import gradio as gr
import folium
from folium.plugins import MarkerCluster
from branca.colormap import linear
# ------- Optional dependency probe (prettier tables; the app still runs without it) -------
try:
    import tabulate as _tabulate  # noqa: F401
except Exception:
    # Any failure while importing means the tabulate-backed table path is unavailable.
    HAS_TABULATE = False
else:
    HAS_TABULATE = True
# -----------------------------
# 台北時區 (UTC+8)
# -----------------------------
# Fixed UTC+8 offset used wherever this module needs "Taipei time".
_TAIPEI_UTC_OFFSET = timedelta(hours=8)
TAIPEI_TZ = timezone(_TAIPEI_UTC_OFFSET)
def _fmt(dt: datetime) -> str:
return dt.strftime("%Y-%m-%dT%H:%M:%S")
def set_time_range(hours=None, days=None):
    """Return ``(timeFrom, timeTo)`` strings for a window that ends now (Taipei time).

    ``hours`` takes precedence over ``days``; when both are ``None`` the
    window covers the last 3 days. Both strings are formatted by ``_fmt``.
    """
    if hours is not None:
        span = timedelta(hours=hours)
    elif days is not None:
        span = timedelta(days=days)
    else:
        span = timedelta(days=3)
    end = datetime.now(TAIPEI_TZ)
    return _fmt(end - span), _fmt(end)
# -----------------------------
# 呼叫 CWA API
# -----------------------------
API_URL = "https://opendata.cwa.gov.tw/api/v1/rest/datastore/E-A0015-001"


def fetch_reports(time_from, time_to):
    """Fetch the raw E-A0015-001 JSON payload for a time window.

    Args:
        time_from: Value sent as the ``timeFrom`` query parameter.
        time_to: Value sent as the ``timeTo`` query parameter.

    Returns:
        The decoded JSON body of the response.

    Raises:
        RuntimeError: If the ``CWA_API_KEY`` environment variable is unset or blank.
        requests.HTTPError: If the server answers with an error status.
    """
    token = os.getenv("CWA_API_KEY", "").strip()
    if token == "":
        raise RuntimeError("請在環境變數設定 CWA_API_KEY")
    response = requests.get(
        API_URL,
        params={"Authorization": token, "timeFrom": time_from, "timeTo": time_to},
        timeout=30,
    )
    response.raise_for_status()
    return response.json()
# -----------------------------
# 解析 JSON
# -----------------------------
def _to_float(x):
"""將字串(含單位)抽出第一個數字成 float;失敗回 None。"""
if x is None:
return None
if isinstance(x, (int, float)):
return float(x)
s = str(x).strip()
if s == "":
return None
m = re.search(r"[-+]?\d+(?:\.\d+)?", s)
return float(m.group()) if m else None
def _as_mapping(value):
    """Return ``value`` when it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _first_present(mapping, *keys):
    """Return the first usable value among ``keys`` in ``mapping``, else ``None``.

    A value is usable when it is truthy or is a real number (so a numeric
    ``0`` is kept, unlike a plain ``a or b`` chain). ``None``, empty strings
    and empty containers are treated as missing.
    """
    for key in keys:
        value = mapping.get(key)
        is_number = isinstance(value, (int, float)) and not isinstance(value, bool)
        if value or is_number:
            return value
    return None


def parse_ea0015(obj):
    """Parse a CWA E-A0015-001 payload into a DataFrame.

    Several spellings of each key are tried, so the parser tolerates casing
    and naming variants of the payload. Malformed parts (a non-dict payload,
    a non-list earthquake collection, non-dict list entries) are skipped
    instead of raising.

    Args:
        obj: Decoded JSON payload (normally a dict).

    Returns:
        A DataFrame with the columns ``OriginTime``, ``Lat``, ``Lon``,
        ``Depth_km``, ``Magnitude``, ``Location``, ``ReportURL`` (always
        present, even when there are no rows), sorted by ``OriginTime``
        descending with unparseable times last.
    """
    columns = ["OriginTime", "Lat", "Lon", "Depth_km", "Magnitude", "Location", "ReportURL"]

    records = _as_mapping(_first_present(_as_mapping(obj), "records", "Records"))
    quakes = _first_present(records, "earthquake", "Earthquake")
    if not isinstance(quakes, list):
        quakes = []

    rows = []
    for q in quakes:
        if not isinstance(q, dict):
            continue  # skip malformed entries rather than abort the whole parse
        ei = _as_mapping(_first_present(q, "EarthquakeInfo", "earthquakeInfo"))
        epic = _as_mapping(_first_present(ei, "Epicenter", "epicenter"))

        origin = _first_present(ei, "OriginTime", "originTime")
        if origin is None:
            origin = _first_present(q, "OriginTime", "originTime")

        lat_raw = _first_present(
            epic,
            "EpicenterLat", "epicenterLat",
            "EpicenterLatitude", "epicenterLatitude",
            "Lat", "lat",
        )
        lon_raw = _first_present(
            epic,
            "EpicenterLon", "epicenterLon",
            "EpicenterLongitude", "epicenterLongitude",
            "Lon", "lon",
        )
        depth_raw = _first_present(
            ei,
            "Depth", "depth",
            "FocalDepth", "focalDepth",
            "FocalDepthKm", "focalDepthKm",
        )

        # The magnitude node may be a nested object or, in some variants, the
        # value itself; calling .get() on a scalar would raise.
        mag_node = _first_present(
            ei,
            "Magnitude", "magnitude",
            "EarthquakeMagnitude", "earthquakeMagnitude",
        )
        if isinstance(mag_node, dict):
            mag_raw = _first_present(
                mag_node,
                "MagnitudeValue", "magnitudeValue",
                "Value", "value",
                "Magnitude", "magnitude",
            )
        else:
            mag_raw = mag_node
        if mag_raw is None:
            mag_raw = _first_present(ei, "MagnitudeValue", "magnitudeValue")

        rows.append({
            "OriginTime": origin,
            "Lat": _to_float(lat_raw),
            "Lon": _to_float(lon_raw),
            "Depth_km": _to_float(depth_raw),
            "Magnitude": _to_float(mag_raw),
            "Location": _first_present(epic, "Location", "location"),
            "ReportURL": _first_present(q, "Web", "ReportURL", "reportURL"),
        })

    # Explicit columns keep the schema stable even when no rows were parsed.
    df = pd.DataFrame(rows, columns=columns)
    if not df.empty:
        df["OriginTime"] = pd.to_datetime(df["OriginTime"], errors="coerce")
        df = df.sort_values("OriginTime", ascending=False, na_position="last").reset_index(drop=True)
    return df
# -----------------------------
# 表格輸出
# -----------------------------
def _format_taipei(series):
try:
if series.dt.tz is None:
s = series.dt.tz_localize(TAIPEI_TZ)
else:
s = series.dt.tz_convert(TAIPEI_TZ)
return s.dt.strftime("%Y-%m-%d %H:%M:%S %Z")
except Exception:
return series.astype(str)
def _to_simple_md_table(df: pd.DataFrame) -> str:
cols = list(df.columns)
header = "|" + "|".join(cols) + "|\n"
sep = "|" + "|".join(["---"] * len(cols)) + "|\n"
rows = []
for _, r in df.iterrows():
cells = []
for c in cols:
v = r.get(c, "")
cells.append("" if pd.isna(v) else str(v))
rows.append("|" + "|".join(cells) + "|")
return header + sep + "\n".join(rows)
def df_to_markdown(df, top_n=100):
    """Build the Markdown summary shown in the results pane.

    Args:
        df: Parsed earthquake frame.
        top_n: Maximum number of rows to include in the table.

    Returns:
        A fixed placeholder string when ``df`` is empty; otherwise a count
        line followed by a Markdown table of the first ``top_n`` rows,
        restricted to the known report columns that are present. The table is
        produced by ``DataFrame.to_markdown`` when ``HAS_TABULATE`` is true
        and by ``_to_simple_md_table`` otherwise.
    """
    if df.empty:
        return "(查無資料)"

    wanted = ("OriginTime", "Magnitude", "Depth_km", "Lat", "Lon", "Location", "ReportURL")
    present = [name for name in wanted if name in df.columns]
    slim = df[present].head(top_n).copy()
    if "OriginTime" in present:
        slim["OriginTime"] = _format_taipei(slim["OriginTime"])

    shown = min(len(slim), top_n)
    summary = f"共 {len(df)} 筆,顯示前 {shown} 筆\n\n"
    if HAS_TABULATE:
        return summary + slim.to_markdown(index=False)
    return summary + _to_simple_md_table(slim.reset_index(drop=True))
# -----------------------------
# OSM 地圖(Folium)輸出(以 data URL iframe 嵌入)
# -----------------------------
def map_osm_html(df: pd.DataFrame):
    """Render earthquakes on an OpenStreetMap (Folium) map embedded in an ``<iframe>``.

    Rows without ``Lat``/``Lon`` are dropped. Every remaining event becomes a
    clustered circle marker whose radius grows with magnitude and whose fill
    colour encodes depth on a viridis scale. Events with no usable depth get a
    neutral grey fill and a popup depth of "N/A" instead of being drawn as
    0 km. All report-supplied text placed in the popup is HTML-escaped, and
    the report link is only emitted for ``http``/``https`` URLs.

    Args:
        df: Frame with ``Lat``, ``Lon``, ``Magnitude``, ``Depth_km`` and
            ``OriginTime`` columns; ``Location`` and ``ReportURL`` are
            optional.

    Returns:
        A small placeholder ``<div>`` when there is nothing to plot, otherwise
        an ``<iframe>`` whose ``src`` is a base64 ``data:`` URL holding the
        rendered map page.
    """
    # Function-scope import: the file does not import the `html` module at top level.
    from html import escape

    unknown_depth_fill = "#9e9e9e"

    def _clean(value) -> str:
        """Return ``value`` as a string, mapping None/NaN/NaT to ''."""
        if value is None or (pd.api.types.is_scalar(value) and pd.isna(value)):
            return ""
        return str(value)

    if df.empty:
        return "<div style='padding:8px'>(查無資料)</div>"
    d = df.dropna(subset=["Lat", "Lon"]).copy()
    if d.empty:
        return "<div style='padding:8px'>(無經緯度可繪製)</div>"

    # Coerce to numbers; a missing magnitude is drawn at the minimum marker size.
    d["Magnitude"] = pd.to_numeric(d["Magnitude"], errors="coerce").fillna(0).clip(lower=0)
    d["Depth_km"] = pd.to_numeric(d["Depth_km"], errors="coerce")

    # Map centre / base tiles.
    center = [d["Lat"].mean(), d["Lon"].mean()]
    m = folium.Map(location=center, zoom_start=6, tiles="OpenStreetMap", control_scale=True)

    # Depth colour bar. The range is computed from known depths only, so an
    # all-missing column cannot produce a NaN-bounded (unusable) colour scale.
    known_depths = d["Depth_km"].dropna()
    if known_depths.empty:
        depth_min, depth_max = 0.0, 1.0
    else:
        depth_min, depth_max = float(known_depths.min()), float(known_depths.max())
        if depth_min == depth_max:
            # Widen a degenerate range so the scale has a non-zero span.
            depth_min, depth_max = max(0.0, depth_min - 1), depth_max + 1
    cmap = linear.viridis.scale(depth_min, depth_max)
    cmap.caption = "Depth (km)"
    cmap.add_to(m)

    cluster = MarkerCluster().add_to(m)

    # One circle marker per event.
    for _, r in d.iterrows():
        lat, lon = float(r["Lat"]), float(r["Lon"])
        mag = float(r["Magnitude"]) if pd.notna(r["Magnitude"]) else 0.0
        size = 4 + 2.5 * max(0.0, mag)  # pixel radius scales with magnitude

        if pd.notna(r["Depth_km"]):
            depth = float(r["Depth_km"])
            fill = cmap(depth)
            depth_text = f"{depth:.1f} km"
        else:
            fill = unknown_depth_fill
            depth_text = "N/A"

        # Only link out to web URLs; anything else (empty, javascript:, ...) becomes "#".
        url = _clean(r.get("ReportURL", "")).strip()
        href = url if url.lower().startswith(("http://", "https://")) else "#"

        popup_html = "\n".join([
            f"<b>OriginTime</b>: {escape(_clean(r['OriginTime']))}<br>",
            f"<b>Magnitude</b>: {mag:.1f}<br>",
            f"<b>Depth</b>: {depth_text}<br>",
            f"<b>Location</b>: {escape(_clean(r.get('Location', '')))}<br>",
            f'<a href="{escape(href, quote=True)}" target="_blank" rel="noopener noreferrer">CWA 報告</a>',
        ])
        folium.CircleMarker(
            location=[lat, lon],
            radius=size,
            color="#000000",
            weight=1,
            fill=True,
            fill_color=fill,
            fill_opacity=0.85,
            popup=folium.Popup(popup_html, max_width=320),
        ).add_to(cluster)

    # Zoom so that every plotted event is visible.
    m.fit_bounds(
        [[d["Lat"].min(), d["Lon"].min()], [d["Lat"].max(), d["Lon"].max()]],
        padding=(20, 20),
    )

    # Embed as a data URL so HTML sanitising does not strip the map's <script> tags.
    page_html = m.get_root().render()
    b64 = base64.b64encode(page_html.encode("utf-8")).decode("ascii")
    return f'<iframe src="data:text/html;base64,{b64}" style="width:100%;height:520px;border:none;"></iframe>'
# -----------------------------
# 主流程
# -----------------------------
def query_and_render(time_from, time_to, sort_order):
    """Run one query and build the three UI outputs.

    Args:
        time_from: Start of the window, passed to ``fetch_reports``.
        time_to: End of the window, passed to ``fetch_reports``.
        sort_order: Label chosen in the sort dropdown; only the oldest-first
            label triggers a re-sort, since parsed data arrives newest-first.

    Returns:
        A ``(markdown, map_html, csv_path)`` tuple. ``csv_path`` is the path
        of a freshly written CSV file, or ``None`` when there is no data or an
        error occurred. Exceptions are not propagated: they are reported in
        the Markdown slot so the UI always receives three values.
    """
    try:
        raw = fetch_reports(time_from, time_to)
        df = parse_ea0015(raw)
        if df.empty:
            return "(查無資料)", "<div style='padding:8px'>(查無資料)</div>", None
        if sort_order == "OriginTime (舊→新)":
            df = df.sort_values("OriginTime", ascending=True, na_position="last").reset_index(drop=True)
        md = df_to_markdown(df)
        map_html = map_osm_html(df)
        # Reserve a unique path and close the handle right away: leaving it
        # open leaks a file descriptor per query and, on platforms that lock
        # open files, blocks the write below. delete=False keeps the file on
        # disk so the download button can serve it by path.
        with tempfile.NamedTemporaryFile(
            delete=False, suffix=".csv", prefix="CWA_E-A0015-001_"
        ) as tmp:
            csv_path = tmp.name
        df.to_csv(csv_path, index=False, encoding="utf-8-sig")
        return md, map_html, csv_path
    except Exception as e:
        # Top-level UI boundary: surface the failure to the user instead of crashing.
        return f"錯誤:{e}", "<div style='padding:8px'>(無法繪圖)</div>", None
# -----------------------------
# 介面
# -----------------------------
def _preset_range(**window):
    """Build a zero-argument click handler that refills the time boxes for ``window``."""
    return lambda: set_time_range(**window)


default_from, default_to = set_time_range(days=3)

# NOTE(review): the container nesting below is inferred from the order of the
# calls (text boxes, preset-button row, sort and query controls in the column;
# outputs beneath it) - confirm against the deployed layout.
with gr.Blocks(fill_height=True) as demo:
    gr.Markdown("## CWA 顯著有感地震報告 (E-A0015-001)\n預設查詢最近 3 天(台北時間)")

    with gr.Column():
        time_from = gr.Textbox(label="timeFrom yyyy-MM-ddTHH:mm:ss", value=default_from)
        time_to = gr.Textbox(label="timeTo yyyy-MM-ddTHH:mm:ss", value=default_to)
        with gr.Row():
            btn_12h = gr.Button("最近 12 小時")
            btn_24h = gr.Button("最近 24 小時")
            btn_3d = gr.Button("最近 3 天")
            btn_5d = gr.Button("最近 5 天")
        sort_dd = gr.Dropdown(
            choices=["OriginTime (新→舊)", "OriginTime (舊→新)"],
            value="OriginTime (新→舊)",
            label="排序",
        )
        run_btn = gr.Button("查詢", variant="primary")

    table_out = gr.Markdown("(尚未查詢)")
    map_out = gr.HTML()  # no sanitize_html argument, to stay compatible with older Gradio
    dl_btn = gr.DownloadButton(label="下載 CSV")

    # Preset shortcuts: each button overwrites both time boxes with a fresh window.
    for preset_button, window in (
        (btn_12h, {"hours": 12}),
        (btn_24h, {"hours": 24}),
        (btn_3d, {"days": 3}),
        (btn_5d, {"days": 5}),
    ):
        preset_button.click(_preset_range(**window), outputs=[time_from, time_to])

    # Run the query and fill the table, the map and the CSV download.
    run_btn.click(
        query_and_render,
        inputs=[time_from, time_to, sort_dd],
        outputs=[table_out, map_out, dl_btn],
    )

if __name__ == "__main__":
    demo.launch()