# Captured from a Hugging Face Space page (Space status at capture time: Sleeping).
# app.py (Folium + free tiles / file-based delivery / colour-coded by 発電設備区分 / CF is displayed as "発電出力(kW)")
# pip install folium gradio pandas numpy requests openpyxl
| import os | |
| import re | |
| import time | |
| import tempfile | |
| import requests | |
| import pandas as pd | |
| import numpy as np | |
| import gradio as gr | |
| # ---------------------------- | |
| # 設定 | |
| # ---------------------------- | |
# User-Agent sent to the GSI geocoder; overridable via the environment.
# Including a contact address is recommended.
GSI_USER_AGENT = os.getenv(
    "GSI_USER_AGENT",
    "jp-gsi-geocoding-demo (contact: your_email@example.com)",
)

# HTTP timeout (seconds) for a single geocoding request.
GSI_TIMEOUT_SEC = float(os.getenv("GSI_TIMEOUT_SEC", "10"))

# Pause (seconds) inserted after each geocoding request; 0 disables the pause.
GEOCODE_DELAY_SEC = float(os.getenv("GSI_RATE_LIMIT_SEC", "0.0"))

# Address-search endpoint of the Geospatial Information Authority of Japan (GSI).
GSI_GEOCODE_URL = "https://msearch.gsi.go.jp/address-search/AddressSearch"

# Location of the CSV geocode cache; the directory is created at import time.
CACHE_DIR = "data/cache"
os.makedirs(CACHE_DIR, exist_ok=True)
CACHE_PATH = os.path.join(CACHE_DIR, "geocode_cache.csv")
| # ---------------------------- | |
| # キャッシュ | |
| # ---------------------------- | |
def load_cache():
    """Load the geocode cache CSV from CACHE_PATH.

    Returns:
        A DataFrame read from the cache file with ``CF``, ``lat`` and ``lon``
        coerced to numeric. If the file does not exist, cannot be read, or
        lacks any of the columns ``address_input``, ``lat``, ``lon``, ``CF``,
        an empty DataFrame with exactly those four columns is returned.
    """
    expected = ["address_input", "lat", "lon", "CF"]
    if os.path.exists(CACHE_PATH):
        try:
            cached = pd.read_csv(CACHE_PATH)
            if set(expected).issubset(cached.columns):
                for numeric_col in ("CF", "lat", "lon"):
                    cached[numeric_col] = pd.to_numeric(cached[numeric_col], errors="coerce")
                return cached
        except Exception:
            # An unreadable cache is treated the same as a missing one.
            pass
    return pd.DataFrame(columns=expected)
def save_cache(df_cache):
    """Write ``df_cache`` to CACHE_PATH as CSV (no index column).

    Persisting the cache is best-effort: any error raised while writing is
    swallowed and the function returns ``None`` either way.
    """
    try:
        df_cache.to_csv(CACHE_PATH, index=False)
    except Exception:
        # Failing to persist the cache must not abort the caller.
        return
| # ---------------------------- | |
| # 国土地理院 ジオコーダ | |
| # ---------------------------- | |
def make_gsi_session() -> requests.Session:
    """Create a ``requests.Session`` whose User-Agent header is GSI_USER_AGENT."""
    session = requests.Session()
    session.headers["User-Agent"] = GSI_USER_AGENT
    return session
def gsi_geocode_once(address: str, session: requests.Session) -> tuple[float, float]:
    """Call the GSI address-search API once and return ``(lat, lon)``.

    The API returns coordinates as ``[lon, lat]``, so the order is swapped
    before returning. Any failure (blank / "nan" / "none" address, non-OK
    HTTP status, unexpected payload, or any raised exception) yields
    ``(nan, nan)``.
    """
    not_found = (np.nan, np.nan)
    try:
        if not address:
            return not_found
        normalized = str(address).strip()
        if normalized == "" or normalized.lower() in ("nan", "none"):
            return not_found

        response = session.get(GSI_GEOCODE_URL, params={"q": address}, timeout=GSI_TIMEOUT_SEC)
        if not response.ok:
            return not_found

        payload = response.json()
        if not isinstance(payload, list) or len(payload) == 0:
            return not_found

        # Only the first hit is used.
        geometry = payload[0].get("geometry") or {}
        coords = geometry.get("coordinates") or []
        if not isinstance(coords, (list, tuple)) or len(coords) < 2:
            return not_found

        longitude, latitude = float(coords[0]), float(coords[1])
        return (latitude, longitude)
    except Exception:
        return not_found
def geocode_with_cache(addresses, CFs, use_internet=True):
    """Geocode addresses, consulting the CSV cache before the GSI API.

    Args:
        addresses: Iterable of address values; ``None`` / float NaN become "",
            everything else is stringified and stripped.
        CFs: Iterable of values paired with ``addresses``; each is coerced to
            a number (unparseable values become NaN).
        use_internet: When false, only cached coordinates are used and cache
            misses yield NaN coordinates.

    Returns:
        A DataFrame with columns ``address_input``, ``CF``, ``lat``, ``lon``
        (all but the first numeric), one row per input pair. Empty input
        yields an empty DataFrame with those columns.

    Each distinct address is sent to the API at most once per call, and the
    cache file is rewritten once at the end (also when the loop is
    interrupted by an exception) instead of after every lookup.
    """
    result_columns = ["address_input", "CF", "lat", "lon"]
    cache_columns = ["address_input", "lat", "lon", "CF"]

    cache = load_cache()
    # address -> (lat, lon); kept up to date with lookups made during this call.
    known = {
        addr: (lat, lon)
        for addr, lat, lon in zip(cache["address_input"], cache["lat"], cache["lon"])
    }
    fresh = {}  # address -> cache row produced by an API lookup in this call
    session = make_gsi_session() if use_internet else None
    results = []

    try:
        for a, cf in zip(addresses, CFs):
            a = "" if (a is None or (isinstance(a, float) and np.isnan(a))) else str(a).strip()
            cf_num = pd.to_numeric(cf, errors="coerce")

            lat, lon = known.get(a, (np.nan, np.nan))
            if pd.isna(lat) or pd.isna(lon):
                if use_internet and a not in fresh:
                    lat, lon = gsi_geocode_once(a, session)
                    if GEOCODE_DELAY_SEC > 0:
                        time.sleep(GEOCODE_DELAY_SEC)
                    fresh[a] = {"address_input": a, "lat": lat, "lon": lon, "CF": cf_num}
                    known[a] = (lat, lon)
                else:
                    # Offline cache miss, or an address that already failed in this batch.
                    lat, lon = np.nan, np.nan
            else:
                lat, lon = float(lat), float(lon)

            results.append({"address_input": a, "CF": cf_num, "lat": lat, "lon": lon})
    finally:
        if fresh:
            # Replace any stale rows for the addresses looked up in this call.
            kept = cache[~cache["address_input"].isin(list(fresh))]
            added = pd.DataFrame(list(fresh.values()), columns=cache_columns)
            save_cache(added if kept.empty else pd.concat([kept, added], ignore_index=True))

    # Explicit columns keep the frame well-formed when there are no results.
    df = pd.DataFrame(results, columns=result_columns)
    for numeric_col in ("lat", "lon", "CF"):
        df[numeric_col] = pd.to_numeric(df[numeric_col], errors="coerce")
    return df
| # ---------------------------- | |
| # Folium 地図生成(無料タイル + 区分色分け) | |
| # ---------------------------- | |
| import folium | |
| from branca.element import Element | |
# Base-map choices: display name -> XYZ tile URL template
# ({z}/{x}/{y} placeholders). The keys are also used as the UI dropdown choices.
TILE_CATALOG = {
    "GSI 標準地図": "https://cyberjapandata.gsi.go.jp/xyz/std/{z}/{x}/{y}.png",
    "GSI 淡色地図": "https://cyberjapandata.gsi.go.jp/xyz/pale/{z}/{x}/{y}.png",
    "GSI 写真(シームレス)": "https://cyberjapandata.gsi.go.jp/xyz/seamlessphoto/{z}/{x}/{y}.jpg",
    "OpenStreetMap": "https://tile.openstreetmap.org/{z}/{x}/{y}.png",
}
# Fixed palette (Tableau 10 plus a few extra colours).
_BASE_PALETTE = [
    "#1f77b4", "#ff7f0e", "#2ca02c", "#d62728", "#9467bd",
    "#8c564b", "#e377c2", "#7f7f7f", "#bcbd22", "#17becf",
    "#393b79", "#637939", "#8c6d31", "#843c39", "#7b4173",
]


def _build_category_palette(labels: pd.Series) -> dict:
    """Map each distinct category label to a colour string.

    Args:
        labels: Category values (a Series or any iterable of scalars).

    Returns:
        A dict ``{label: colour}`` in order of first appearance. Missing
        values (None, NaN, pd.NA) and the literal strings "nan" / "None" are
        all folded into the single label "その他/不明". The first
        ``len(_BASE_PALETTE)`` labels take the fixed palette; later ones get
        a generated ``hsl(...)`` colour.
    """
    unknown = "その他/不明"
    # Missing values are detected before stringifying: once a value has been
    # turned into text (e.g. "<NA>") it can no longer be recognised as missing.
    normalized = [
        unknown if (pd.isna(value) or str(value) in ("nan", "None")) else str(value)
        for value in pd.Series(labels, dtype="object")
    ]

    mapping = {}
    for position, label in enumerate(dict.fromkeys(normalized)):
        if position < len(_BASE_PALETTE):
            mapping[label] = _BASE_PALETTE[position]
        else:
            hue = (position * 37) % 360
            mapping[label] = f"hsl({hue},70%,45%)"
    return mapping
def _build_legend_html(title: str, mapping: dict) -> str:
    """Build the HTML for a fixed-position colour legend.

    Args:
        title: Legend title, shown after the "凡例:" prefix.
        mapping: ``{label: css_colour}``; one swatch row is emitted per entry,
            in the dict's iteration order.

    Returns:
        An HTML ``<div>`` snippet. Title, labels and colours are HTML-escaped
        because labels originate from user-supplied spreadsheet cells.
    """
    from html import escape  # local import: only this helper needs it

    items = "".join(
        f"<div style='display:flex;align-items:center;margin-bottom:4px;'>"
        f"<span style='display:inline-block;width:12px;height:12px;background:{escape(str(col))};"
        f"border:1px solid #555;margin-right:6px;'></span>{escape(str(lab))}</div>"
        for lab, col in mapping.items()
    )
    return (
        "<div style='position:fixed;bottom:20px;left:20px;z-index:9999;"
        "background:#fff;padding:10px 12px;border:1px solid #ccc;border-radius:6px;"
        "box-shadow:0 2px 8px rgba(0,0,0,0.15);font-size:12px;'>"
        f"<div style='font-weight:600;margin-bottom:6px;'>凡例:{escape(str(title))}</div>"
        f"{items}</div>"
    )
def _build_folium_map_html(df_points: pd.DataFrame, base_name: str, cat_col: str) -> str:
    """Render the geocoded points as a standalone Folium (Leaflet) HTML page.

    Args:
        df_points: Must contain ``lat`` and ``lon``; ``address_input``, ``CF``
            and the category column are used when present. Rows without both
            coordinates are not drawn.
        base_name: Key of TILE_CATALOG to show when the map opens. An unknown
            name falls back to the first catalog entry.
        cat_col: Name of the category column used for marker colour. If it is
            empty or not a column of ``df_points``, every point is labelled
            "その他/不明" and the legend title becomes "(区分なし)".

    Returns:
        The rendered HTML document as a string.
    """
    from html import escape  # local import: popup text comes from spreadsheet cells

    unknown = "その他/不明"

    # Category labels, normalised once so palette, legend and markers agree.
    if cat_col and (cat_col in df_points.columns):
        cats = (
            df_points[cat_col]
            .astype("string")
            .fillna(unknown)
            .replace({"nan": unknown, "None": unknown})
        )
    else:
        cat_col = "(区分なし)"
        cats = pd.Series([unknown] * len(df_points), index=df_points.index)
    palette = _build_category_palette(cats)

    # A boolean mask (rather than index labels) keeps rows and labels aligned
    # even if the index contains duplicates.
    has_coords = df_points[["lat", "lon"]].notna().all(axis=1)
    df_valid = df_points[has_coords].copy()
    valid_cats = cats[has_coords.to_numpy()]

    # Map centre: median of the points, or a country-wide view when there are none.
    if df_valid.empty:
        center_lat, center_lon, zoom = 35.0, 135.0, 4
    else:
        center_lat = float(df_valid["lat"].median())
        center_lon = float(df_valid["lon"].median())
        zoom = 6

    m = folium.Map(location=[center_lat, center_lon], zoom_start=zoom, control_scale=True, tiles=None)

    # The requested base layer is added first AND flagged with show=True:
    # depending on the folium version, the initially visible base layer is
    # either the first one added or the one whose ``show`` flag is set.
    selected = base_name if base_name in TILE_CATALOG else next(iter(TILE_CATALOG))
    ordered_names = [selected] + [name for name in TILE_CATALOG if name != selected]
    for name in ordered_names:
        folium.TileLayer(
            tiles=TILE_CATALOG[name],
            name=name,
            attr=f"© {name}",
            overlay=False,
            control=True,
            max_zoom=20,
            show=(name == selected),
        ).add_to(m)

    # Marker radius: CF clipped at 0 and min-max scaled into [3, 15];
    # 6 when a point's CF is missing or no CF is available at all.
    if "CF" in df_valid.columns and df_valid["CF"].notna().any():
        cf = df_valid["CF"].clip(lower=0)
        cf_norm = (cf - cf.min()) / (cf.max() - cf.min() + 1e-9)
        sizes = (cf_norm * 12 + 3).fillna(6).tolist()
    else:
        sizes = [6] * len(df_valid)

    n_valid = len(df_valid)
    addr_values = df_valid["address_input"].tolist() if "address_input" in df_valid.columns else [""] * n_valid
    cf_values = df_valid["CF"].tolist() if "CF" in df_valid.columns else [np.nan] * n_valid

    # One circle marker per point, coloured by category.
    for lat, lon, addr, cfv, catv, radius in zip(
        df_valid["lat"], df_valid["lon"], addr_values, cf_values, valid_cats, sizes
    ):
        catv = str(catv)
        col = palette.get(catv, "#666666")
        popup_html = (
            f"<b>住所:</b> {escape(str(addr))}<br>"
            f"<b>発電出力(kW):</b> {'' if pd.isna(cfv) else escape(str(cfv))}<br>"
            f"<b>{escape(str(cat_col))}:</b> {escape(catv)}"
        )
        folium.CircleMarker(
            location=(float(lat), float(lon)),
            radius=float(radius),
            weight=1,
            color=col,
            fill=True,
            fill_opacity=0.85,
            fill_color=col,
            popup=folium.Popup(popup_html, max_width=320),
        ).add_to(m)

    # Legend overlay and layer switcher.
    legend_html = _build_legend_html(cat_col, palette)
    m.get_root().html.add_child(Element(legend_html))
    folium.LayerControl(position="topright").add_to(m)
    return m.get_root().render()
def _rewrite_leaflet_cdn(html_text: str, host: str) -> str:
    """Point the generated page's CDN URLs at another host.

    ``integrity="..."`` and ``crossorigin="..."`` attributes are always
    removed, so rewritten URLs cannot fail a subresource-integrity check.

    Args:
        html_text: HTML produced by Folium.
        host: "cdnjs" or "unpkg" to rewrite jsDelivr URLs; "jsdelivr" or any
            other value leaves the URLs untouched.

    Returns:
        The rewritten HTML text.
    """
    rewritten = html_text
    for attr_pattern in (r'\sintegrity="[^"]+"', r'\scrossorigin="[^"]+"'):
        rewritten = re.sub(attr_pattern, "", rewritten)

    if host == "cdnjs":
        # cdnjs serves Leaflet without the "/dist" path segment.
        substitutions = (
            ("https://cdn.jsdelivr.net/npm/leaflet@", "https://cdnjs.cloudflare.com/ajax/libs/leaflet/"),
            ("/dist/leaflet.css", "/leaflet.css"),
            ("/dist/leaflet.js", "/leaflet.js"),
        )
    elif host == "unpkg":
        substitutions = (("https://cdn.jsdelivr.net/npm/", "https://unpkg.com/"),)
    else:
        substitutions = ()

    for old, new in substitutions:
        rewritten = rewritten.replace(old, new)
    return rewritten
def _save_map_html_file(html_text: str) -> str:
    """Write the map HTML to a new temporary ``.html`` file (UTF-8).

    Returns:
        The path of the created file, suitable for a Gradio File output.
        The file is not deleted by this function.
    """
    descriptor, out_path = tempfile.mkstemp(suffix=".html")
    # Wrap the descriptor returned by mkstemp; closing the wrapper closes it.
    with os.fdopen(descriptor, "w", encoding="utf-8") as out:
        out.write(html_text)
    return out_path
| # ---------------------------- | |
| # 実行パイプライン | |
| # ---------------------------- | |
def _parse_indexer(x):
    """Return ``int(x)`` when the conversion succeeds, otherwise ``x`` unchanged.

    Lets a column be given either as a 0-based position ("3") or by name.
    """
    try:
        as_position = int(x)
    except Exception:
        return x
    return as_position
def _excel_path_of(excel_file):
    """Return the filesystem path of an uploaded file value, or None if absent."""
    if excel_file is None:
        return None
    if isinstance(excel_file, (str, os.PathLike)):
        # Some Gradio versions hand the upload over as a plain path.
        return os.fspath(excel_file) or None
    return getattr(excel_file, "name", None) or None


def _select_column(df, indexer):
    """Return ``(series, label)`` for a positional (int) or by-name column indexer."""
    if isinstance(indexer, int):
        return df.iloc[:, indexer], df.columns[indexer]
    return df[indexer], indexer


def run(excel_file, sheet_name, header_row, address_col, power_col, category_col, use_inet, base_name, leaflet_cdn):
    """Read the Excel sheet, geocode its addresses and build the map file.

    Args:
        excel_file: Uploaded file: a path string / PathLike, or an object
            exposing the path as ``.name``.
        sheet_name: Sheet to read.
        header_row: 0-based header row (converted with ``int``).
        address_col: Address column, by name or 0-based position (int).
        power_col: Power-output column, by name or 0-based position (int).
        category_col: Optional category column (name or position) used for
            marker colour; ``None`` / "" means "no category".
        use_inet: Whether the geocoding API may be queried.
        base_name: Base-map name passed to the map builder.
        leaflet_cdn: CDN host name passed to the CDN rewriter.

    Returns:
        ``(message, table, info, map_file_path)``. On failure ``message``
        describes the problem, ``info`` is "" and ``map_file_path`` is None.
    """
    default_cat_label = "発電設備区分"
    base_cols = ["address_input", "発電出力(kW)", "lat", "lon"]
    placeholder_label = category_col if (isinstance(category_col, str) and category_col) else default_cat_label

    def _failure(message):
        # Same 4-tuple shape as the success path, with an empty table.
        return (message, pd.DataFrame(columns=base_cols + [placeholder_label]), "", None)

    # Load the Excel sheet.
    excel_path = _excel_path_of(excel_file)
    if excel_path is None:
        return _failure("Excelファイルを指定してください。")
    try:
        df = pd.read_excel(excel_path, sheet_name=sheet_name, header=int(header_row))
    except Exception as e:
        return _failure(f"Excel の読み込みに失敗しました: {e}")

    # Required columns (by position or by name); report a bad reference
    # instead of raising.
    try:
        addr_series, _ = _select_column(df, address_col)
        cf_series, _ = _select_column(df, power_col)
    except (KeyError, IndexError, TypeError) as e:
        return _failure(f"列の指定に失敗しました: {e}")

    # Optional category column. Position 0 is a valid choice, so test for
    # "not given" explicitly rather than by truthiness.
    cat_series = pd.Series([np.nan] * len(df))
    cat_label = default_cat_label
    if category_col is not None and category_col != "":
        try:
            cat_series, found_label = _select_column(df, category_col)
            cat_label = str(found_label)
        except (KeyError, IndexError, TypeError):
            # Category is optional: fall back to "no category".
            cat_series = pd.Series([np.nan] * len(df))
            cat_label = default_cat_label

    addresses = addr_series.astype(str).tolist()
    cfs = cf_series.tolist()

    # Geocode (the internal column name stays "CF").
    geo_df = geocode_with_cache(addresses, cfs, use_internet=bool(use_inet))

    # The map builder reads the internal "CF" column; the table shown to the
    # user gets the display name instead.
    table_df = geo_df.copy()
    table_df[cat_label] = cat_series.values
    display_df = table_df.rename(columns={"CF": "発電出力(kW)"})
    disp_cols = base_cols + [cat_label]

    # Build map HTML -> rewrite CDN URLs -> save to a real file.
    try:
        html_text = _build_folium_map_html(table_df, base_name=base_name, cat_col=cat_label)
        html_text = _rewrite_leaflet_cdn(html_text, host=leaflet_cdn)
        map_file_path = _save_map_html_file(html_text)
        msg = (
            "✅ 地図HTMLを生成しました。下の **地図HTMLファイル** をクリックして新規タブで開いてください。\n"
            "色=「発電設備区分」、サイズ=「発電出力(kW)」です。"
        )
        info = f"ポイント数(有効座標): {int(display_df[['lat','lon']].dropna().shape[0])} / {len(display_df)}"
        return (msg, display_df[disp_cols], info, map_file_path)
    except Exception as e:
        return (f"地図描画に失敗しました: {e}", display_df[disp_cols], "", None)
| # ---------------------------- | |
| # Gradio UI | |
| # ---------------------------- | |
# Gradio UI: inputs for the Excel file and column mapping, a button that runs
# the pipeline, and outputs for the message, result table, summary and map file.
with gr.Blocks(title="Excel住所 → Folium(無料タイル・File配信・区分色分け)") as demo:
    gr.Markdown(
        "## Excelの住所を国土地理院APIでジオコーディング → Folium(Leaflet)で地図表示(無料タイル・Mapbox不要)\n"
        "- 地図は **実ファイル(.html)** として配信します(CSPが厳しい環境でもOK)。\n"
        "- **色=発電設備区分、サイズ=発電出力(kW)**。タイル=地理院/OSM、CDNは必要に応じて切替できます。"
    )
    with gr.Row():
        xlsx_in = gr.File(label="Excelファイル(住所付き)", file_count="single", file_types=[".xlsx", ".xls"])
    with gr.Row():
        sheet = gr.Textbox(label="シート名", value="認定設備")
        header_row = gr.Number(label="ヘッダー行番号(0始まり)", value=2, precision=0)
    with gr.Row():
        address_col = gr.Textbox(label="住所列(列名 or 0始まり列番号)", value="発電設備の所在地")
        power_col = gr.Textbox(label="発電出力(kW)の列(列名 or 0始まり列番号)", value="発電出力(kW)")
        category_col = gr.Textbox(label="区分列(色分け:列名 or 0始まり列番号)", value="発電設備区分")
    with gr.Row():
        use_inet = gr.Checkbox(label="国土地理院APIに問い合わせ(オフでキャッシュのみ使用)", value=True)
        base_name = gr.Dropdown(choices=list(TILE_CATALOG.keys()), value="GSI 標準地図", label="ベースマップ")
        leaflet_cdn = gr.Dropdown(
            choices=["jsdelivr", "cdnjs", "unpkg"], value="jsdelivr",
            label="Leaflet CDN(遮断時に切替)"
        )
    run_btn = gr.Button("描画")
    out_html = gr.HTML(label="案内メッセージ")
    out_table = gr.Dataframe(label="ジオコーディング結果(住所・発電出力(kW)・緯度・経度・区分)", wrap=True)
    out_info = gr.Textbox(label="メタ情報", lines=2)
    out_file = gr.File(label="地図HTMLファイル(クリックで開く/ダウンロード)")

    def app_run(xls, s, h, a, p, c, inet, base, cdn):
        """Adapt raw widget values to ``run``: column fields become int positions when numeric."""
        # The header row is passed through unconverted: ``run`` converts it
        # inside its own error handling, so a cleared field produces a
        # message instead of an unhandled exception here.
        return run(
            xls, s, h, _parse_indexer(a), _parse_indexer(p), _parse_indexer(c), inet, base, cdn
        )

    run_btn.click(
        fn=app_run,
        inputs=[xlsx_in, sheet, header_row, address_col, power_col, category_col, use_inet, base_name, leaflet_cdn],
        outputs=[out_html, out_table, out_info, out_file],
    )

if __name__ == "__main__":
    demo.launch()