import base64
import os
import re
import tempfile
from datetime import datetime, timedelta, timezone

import folium
import gradio as gr
import pandas as pd
import requests
from branca.colormap import linear
from folium.plugins import MarkerCluster

# ------- Optional dependency detection (prettier tables; works without it) -------
try:
    import tabulate as _tabulate  # noqa: F401

    HAS_TABULATE = True
except Exception:
    HAS_TABULATE = False

# -----------------------------
# Taipei time zone (UTC+8)
# -----------------------------
TAIPEI_TZ = timezone(timedelta(hours=8))


def _fmt(dt: datetime) -> str:
    """Format *dt* as ``YYYY-MM-DDTHH:MM:SS`` (the UTC offset is not emitted)."""
    return dt.strftime("%Y-%m-%dT%H:%M:%S")


def set_time_range(hours=None, days=None):
    """Return ``(timeFrom, timeTo)`` strings for a window ending now (Taipei time).

    Args:
        hours: Window length in hours. Takes precedence over ``days``.
        days: Window length in days, used only when ``hours`` is None.

    Returns:
        A ``(time_from, time_to)`` tuple of strings formatted by ``_fmt``.
        When both arguments are None the window is the last 3 days.
    """
    now = datetime.now(TAIPEI_TZ)
    if hours is not None:
        span = timedelta(hours=hours)
    elif days is not None:
        span = timedelta(days=days)
    else:
        span = timedelta(days=3)
    return _fmt(now - span), _fmt(now)


# -----------------------------
# CWA API call
# -----------------------------
API_URL = "https://opendata.cwa.gov.tw/api/v1/rest/datastore/E-A0015-001"


def fetch_reports(time_from, time_to):
    """Fetch earthquake reports between *time_from* and *time_to* from ``API_URL``.

    Args:
        time_from: Start of the window, sent as the ``timeFrom`` query parameter.
        time_to: End of the window, sent as the ``timeTo`` query parameter.

    Returns:
        The decoded JSON body of the response.

    Raises:
        RuntimeError: If the ``CWA_API_KEY`` environment variable is unset/blank.
        requests.HTTPError: If the server answers with an error status.
    """
    api_key = os.getenv("CWA_API_KEY", "").strip()
    if not api_key:
        raise RuntimeError("請在環境變數設定 CWA_API_KEY")
    # NOTE(review): the key travels in the query string, so the text of any
    # requests exception that quotes the URL will contain it -- avoid echoing
    # raw exception messages to end users.
    params = {"Authorization": api_key, "timeFrom": time_from, "timeTo": time_to}
    response = requests.get(API_URL, params=params, timeout=30)
    response.raise_for_status()
    return response.json()


# -----------------------------
# JSON parsing
# -----------------------------
_NUMBER_RE = re.compile(r"[-+]?\d+(?:\.\d+)?")

# Column order of the frame built by parse_ea0015.
_QUAKE_COLUMNS = [
    "OriginTime",
    "Lat",
    "Lon",
    "Depth_km",
    "Magnitude",
    "Location",
    "ReportURL",
]


def _to_float(x):
    """Extract the first number from *x* as a float; return None on failure.

    Numbers are converted directly; anything else is stringified and the first
    ``[+-]digits[.digits]`` run (e.g. the ``"12.3"`` in ``"12.3 km"``) is used.
    ``None``, booleans and text without digits yield None.
    """
    # bool is a subclass of int; True/False are never a meaningful measurement.
    if x is None or isinstance(x, bool):
        return None
    if isinstance(x, (int, float)):
        return float(x)
    match = _NUMBER_RE.search(str(x))
    return float(match.group()) if match else None


def _first_present(mapping, *keys):
    """Return the first value stored under *keys* that is actually present.

    "Present" means not None and not an empty string/dict/list. Unlike an
    ``a or b`` chain this keeps legitimate falsy numbers such as ``0``.
    Returns None when *mapping* is not a dict or nothing is present.
    """
    if not isinstance(mapping, dict):
        return None
    for key in keys:
        value = mapping.get(key)
        if value is None:
            continue
        if isinstance(value, (str, dict, list)) and not value:
            continue
        return value
    return None


def _as_dict(value):
    """Return *value* if it is a dict, otherwise an empty dict."""
    return value if isinstance(value, dict) else {}


def _extract_magnitude(info):
    """Return the raw magnitude value found in the earthquake-info dict *info*."""
    field = _first_present(
        info, "Magnitude", "magnitude", "EarthquakeMagnitude", "earthquakeMagnitude"
    )
    if isinstance(field, dict):
        raw = _first_present(
            field,
            "MagnitudeValue",
            "magnitudeValue",
            "Value",
            "value",
            "Magnitude",
            "magnitude",
        )
    else:
        # The field may hold the number (or a numeric string) directly.
        raw = field
    if raw is None:
        raw = _first_present(info, "MagnitudeValue", "magnitudeValue")
    return raw


def parse_ea0015(obj):
    """Parse a CWA E-A0015-001 response into a DataFrame.

    Several spellings of each key are accepted (see the lookups below).
    Entries that are not dicts are skipped.

    Args:
        obj: Decoded JSON response (expected to be a dict).

    Returns:
        A DataFrame with the columns OriginTime, Lat, Lon, Depth_km, Magnitude,
        Location and ReportURL, sorted by OriginTime descending with
        unparseable times last. The columns exist even when there are no rows.
    """
    records = _as_dict(_first_present(obj, "records", "Records"))
    quakes = _first_present(records, "earthquake", "Earthquake")
    if not isinstance(quakes, list):
        quakes = []

    rows = []
    for quake in quakes:
        if not isinstance(quake, dict):
            continue
        info = _as_dict(_first_present(quake, "EarthquakeInfo", "earthquakeInfo"))
        epicenter = _as_dict(_first_present(info, "Epicenter", "epicenter"))

        origin = _first_present(info, "OriginTime", "originTime")
        if origin is None:
            origin = _first_present(quake, "OriginTime", "originTime")

        lat_raw = _first_present(
            epicenter,
            "EpicenterLat",
            "epicenterLat",
            "EpicenterLatitude",
            "epicenterLatitude",
            "Lat",
            "lat",
        )
        lon_raw = _first_present(
            epicenter,
            "EpicenterLon",
            "epicenterLon",
            "EpicenterLongitude",
            "epicenterLongitude",
            "Lon",
            "lon",
        )
        depth_raw = _first_present(
            info,
            "Depth",
            "depth",
            "FocalDepth",
            "focalDepth",
            "FocalDepthKm",
            "focalDepthKm",
        )

        rows.append(
            {
                "OriginTime": origin,
                "Lat": _to_float(lat_raw),
                "Lon": _to_float(lon_raw),
                "Depth_km": _to_float(depth_raw),
                "Magnitude": _to_float(_extract_magnitude(info)),
                "Location": _first_present(epicenter, "Location", "location"),
                "ReportURL": _first_present(quake, "Web", "ReportURL", "reportURL"),
            }
        )

    df = pd.DataFrame(rows, columns=_QUAKE_COLUMNS)
    df["OriginTime"] = pd.to_datetime(df["OriginTime"], errors="coerce")
    return df.sort_values(
        "OriginTime", ascending=False, na_position="last"
    ).reset_index(drop=True)


# -----------------------------
# Table output
# -----------------------------
def _format_taipei(series):
    """Render a datetime Series as ``YYYY-MM-DD HH:MM:SS <tz>`` in Taipei time.

    Naive values are localized to ``TAIPEI_TZ``; aware values are converted.
    If the Series cannot be handled as datetimes it is returned as plain
    strings instead (deliberate best effort: display must not fail).
    """
    try:
        if series.dt.tz is None:
            local = series.dt.tz_localize(TAIPEI_TZ)
        else:
            local = series.dt.tz_convert(TAIPEI_TZ)
        return local.dt.strftime("%Y-%m-%d %H:%M:%S %Z")
    except Exception:
        return series.astype(str)


def _md_cell(value) -> str:
    """Return *value* as text that is safe inside one Markdown table cell."""
    if pd.api.types.is_scalar(value) and pd.isna(value):
        return ""
    text = str(value)
    # A raw pipe would start a new column and a newline would end the row.
    return text.replace("|", "\\|").replace("\r\n", " ").replace("\n", " ")


def _to_simple_md_table(df: pd.DataFrame) -> str:
    """Render *df* as a pipe-delimited Markdown table without third-party help.

    Missing values become empty cells; pipes and newlines in cells are escaped.
    """
    cols = list(df.columns)
    header = "|" + "|".join(_md_cell(col) for col in cols) + "|\n"
    sep = "|" + "|".join(["---"] * len(cols)) + "|\n"
    rows = [
        "|" + "|".join(_md_cell(value) for value in record) + "|"
        for record in df.itertuples(index=False, name=None)
    ]
    return header + sep + "\n".join(rows)


def df_to_markdown(df, top_n=100):
    """Return a Markdown summary (count line plus table) of the first rows of *df*.

    Args:
        df: Frame such as the one produced by ``parse_ea0015``.
        top_n: Maximum number of rows to show.

    Returns:
        ``"(查無資料)"`` for an empty frame; otherwise a count line followed by a
        table rendered by ``DataFrame.to_markdown`` when tabulate is installed,
        or by ``_to_simple_md_table`` when it is not.
    """
    if df.empty:
        return "(查無資料)"
    wanted = ["OriginTime", "Magnitude", "Depth_km", "Lat", "Lon", "Location", "ReportURL"]
    cols = [c for c in wanted if c in df.columns]
    slim = df[cols].head(top_n).copy()
    if "OriginTime" in slim.columns:
        slim["OriginTime"] = _format_taipei(slim["OriginTime"])
    # Report the number of rows actually shown.
    header = f"共 {len(df)} 筆,顯示前 {len(slim)} 筆\n\n"
    if HAS_TABULATE:
        table = slim.to_markdown(index=False)
    else:
        table = _to_simple_md_table(slim.reset_index(drop=True))
    return header + table


# -----------------------------
# OSM map (Folium) output (embedded as a data-URL iframe)
# -----------------------------
# NOTE(review): this chunk ends in the middle of map_osm_html, inside a string
# literal. The visible fragment is preserved verbatim on the next line (commented
# out) and must be rejoined with the continuation that follows it:
# def map_osm_html(df: pd.DataFrame): if df.empty: return "