# app.py # pip install keplergl pandas numpy geopandas shapely gradio requests openpyxl import os import io import time import json import tempfile import requests import pandas as pd import numpy as np import geopandas as gpd from shapely.geometry import Point import gradio as gr from keplergl import KeplerGl # ---------------------------- # 設定 # ---------------------------- GSI_USER_AGENT = os.environ.get( "GSI_USER_AGENT", "jp-gsi-geocoding-demo (contact: your_email@example.com)" # 連絡先付き推奨 ) GSI_TIMEOUT_SEC = float(os.environ.get("GSI_TIMEOUT_SEC", "10")) # ★ sleep最小(0秒) GEOCODE_DELAY_SEC = float(os.environ.get("GSI_RATE_LIMIT_SEC", "0.0")) GSI_GEOCODE_URL = "https://msearch.gsi.go.jp/address-search/AddressSearch" CACHE_DIR = "data/cache" os.makedirs(CACHE_DIR, exist_ok=True) CACHE_PATH = os.path.join(CACHE_DIR, "geocode_cache.csv") # ---------------------------- # キャッシュ # ---------------------------- def load_cache(): if os.path.exists(CACHE_PATH): try: df = pd.read_csv(CACHE_PATH) if set(["address_input", "lat", "lon", "CF"]).issubset(df.columns): return df except Exception: pass return pd.DataFrame(columns=["address_input", "lat", "lon", "CF"]) def save_cache(df_cache): try: df_cache.to_csv(CACHE_PATH, index=False) except Exception: pass # ---------------------------- # 国土地理院 ジオコーダ # ---------------------------- def make_gsi_session() -> requests.Session: s = requests.Session() s.headers.update({"User-Agent": GSI_USER_AGENT}) return s def gsi_geocode_once(address: str, session: requests.Session) -> tuple[float, float]: """ 国土地理院 住所検索APIを1回呼び出し、(lat, lon) を返す。失敗時は (nan, nan)。 APIは [lon, lat] を返すので順を入れ替える。 """ try: if not address or str(address).strip() == "" or str(address).strip().lower() in ("nan", "none"): return (np.nan, np.nan) resp = session.get(GSI_GEOCODE_URL, params={"q": address}, timeout=GSI_TIMEOUT_SEC) if not resp.ok: return (np.nan, np.nan) data = resp.json() if isinstance(data, list) and len(data) > 0: feat = data[0] coords = (feat.get("geometry") or {}).get("coordinates") or [] if isinstance(coords, (list, tuple)) and len(coords) >= 2: lon, lat = float(coords[0]), float(coords[1]) return (lat, lon) except Exception: pass return (np.nan, np.nan) def geocode_with_cache(addresses, CFs, use_internet=True): cache = load_cache() cache_map = {row["address_input"]: (row["lat"], row["lon"], row["CF"]) for _, row in cache.iterrows()} results = [] session = make_gsi_session() if use_internet else None for a, cf in zip(addresses, CFs): a = "" if (a is None or (isinstance(a, float) and np.isnan(a))) else str(a).strip() cf = "" if (cf is None or (isinstance(cf, float) and np.isnan(cf))) else str(cf) # cache hit if a in cache_map: lat, lon, _cached_cf = cache_map[a] if pd.notna(lat) and pd.notna(lon): results.append({"address_input": a, "CF": cf, "lat": lat, "lon": lon}) continue if not use_internet: results.append({"address_input": a, "CF": cf, "lat": np.nan, "lon": np.nan}) continue lat, lon = gsi_geocode_once(a, session) # ★ 最小スリープ(デフォルト0.0秒) if GEOCODE_DELAY_SEC > 0: time.sleep(GEOCODE_DELAY_SEC) # キャッシュ更新 cache = cache[cache["address_input"] != a] cache = pd.concat( [cache, pd.DataFrame([{"address_input": a, "lat": lat, "lon": lon, "CF": cf}])], ignore_index=True ) save_cache(cache) results.append({"address_input": a, "CF": cf, "lat": lat, "lon": lon}) return pd.DataFrame(results) # ---------------------------- # Kepler.gl HTML 生成(ポイントのみ) # ---------------------------- def make_kepler_html(df_points: pd.DataFrame, height: int = 640) -> str: """ df_points は 'lat','lon','address_input','CF' を含む DataFrame を想定。 ポイントレイヤのみを Kepler.gl で描画し、HTMLを文字列で返す。 """ df_valid = df_points.dropna(subset=["lat", "lon"]).copy() if df_valid.empty: # 空のKeplerでもHTMLは返す m = KeplerGl(height=height) return m._repr_html_() # ほどよい初期中心 center_lat = float(df_valid["lat"].median()) center_lon = float(df_valid["lon"].median()) # Kepler 設定(ポイントレイヤのみ) config = { "version": "v1", "config": { "visState": { "filters": [], "layers": [ { "id": "point_layer", "type": "point", "config": { "dataId": "points", "label": "Points", "color": [18, 147, 154], "columns": {"lat": "lat", "lng": "lon"}, "isVisible": True, "visConfig": { "radius": 10, # 基本半径 "opacity": 0.9, "outline": False } }, "visualChannels": { # CF列が数値ならサイズに反映(なければ自動で固定半径) "sizeField": {"name": "CF", "type": "real"} if pd.to_numeric(df_valid.get("CF", pd.Series([])), errors="coerce").notna().any() else None, "sizeScale": "sqrt", }, } ], "interactionConfig": { "tooltip": { "enabled": True, "fieldsToShow": { "points": [ {"name": "address_input", "format": None}, {"name": "CF", "format": None}, {"name": "lat", "format": None}, {"name": "lon", "format": None} ] }, "compareMode": False, "compareType": "absolute" } }, "layerBlending": "normal" }, "mapState": { "bearing": 0, "pitch": 0, "latitude": center_lat, "longitude": center_lon, "zoom": 6 }, "mapStyle": { "styleType": "light", "topLayerGroups": {}, "visibleLayerGroups": {"label": True, "road": True, "border": False, "building": False, "water": True, "land": True} } } } m = KeplerGl(height=height, config=config) # Kepler は DataFrame の列名で自動解釈(lat/lon) m.add_data(data=df_valid[["lat", "lon", "address_input", "CF"]], name="points") # Gradioへは _repr_html_ をそのまま返すのが簡単 try: return m._repr_html_() except Exception: # 万一ノートブック外で不安定な場合はHTMLファイルを生成して読み戻す with tempfile.NamedTemporaryFile(suffix=".html", delete=False) as f: tmp = f.name m.save_to_html(file_name=tmp, read_only=True) with open(tmp, "r", encoding="utf-8") as fh: html = fh.read() return html # ---------------------------- # 実行パイプライン(ポイントのみ) # ---------------------------- def _parse_indexer(x): try: return int(x) except Exception: return x def run(excel_file, sheet_name, header_row, address_col, power_col, use_inet): # Excel 読み込み if excel_file is None or not hasattr(excel_file, "name"): table_df = pd.DataFrame(columns=["address_input", "CF", "lat", "lon"]) return "", table_df, "Excelファイルを指定してください。" try: df = pd.read_excel(excel_file.name, sheet_name=sheet_name, header=int(header_row)) except Exception as e: empty_df = pd.DataFrame(columns=["address_input", "CF", "lat", "lon"]) return "", empty_df, f"Excel の読み込みに失敗しました: {e}" addr_series = df.iloc[:, address_col] if isinstance(address_col, int) else df[address_col] cf_series = df.iloc[:, power_col] if isinstance(power_col, int) else df[power_col] addresses = addr_series.astype(str).tolist() cfs = cf_series.tolist() geo_df = geocode_with_cache(addresses, cfs, use_internet=bool(use_inet)) table_df = geo_df[["address_input", "CF", "lat", "lon"]].copy() # GeoDataFrame も一応整備(未使用だが将来の拡張用) geometry = [ Point(lon, lat) if (pd.notna(lat) and pd.notna(lon)) else None for lat, lon in zip(geo_df["lat"], geo_df["lon"]) ] gdf_pts = gpd.GeoDataFrame(geo_df, geometry=geometry, crs="EPSG:4326") # Kepler.gl(ポイントのみ) try: html = make_kepler_html(table_df, height=640) except Exception as e: html = f"
Kepler.gl描画に失敗しました: {e}
" # 情報(地物数のみ) info = [] info.append(f"ポイント数(有効座標): {int(gdf_pts.geometry.notnull().sum())} / {len(gdf_pts)}") return html, table_df, "\n".join(info) # ---------------------------- # Gradio UI(ポイントのみ) # ---------------------------- with gr.Blocks(title="Excel住所 → Kepler.gl(ポイントのみ)") as demo: gr.Markdown("## Excelの住所を国土地理院APIでジオコーディング → Kepler.gl に **ポイントのみ** を描画") with gr.Row(): xlsx_in = gr.File(label="Excelファイル(住所付き)", file_count="single", file_types=[".xlsx", ".xls"]) with gr.Row(): sheet = gr.Textbox(label="シート名", value="認定設備") header_row = gr.Number(label="ヘッダー行番号(0始まり)", value=2, precision=0) with gr.Row(): address_col = gr.Textbox(label="住所列(列名 or 0始まり列番号)", value="発電設備の所在地") power_col = gr.Textbox(label="数値列(任意:列名 or 0始まり列番号)", value="発電出力(kW)") with gr.Row(): use_inet = gr.Checkbox(label="国土地理院APIに問い合わせ(オフでキャッシュのみ使用)", value=True) run_btn = gr.Button("描画") out_html = gr.HTML(label="インタラクティブ地図(Kepler.gl:ポイントのみ)") out_table = gr.Dataframe(label="ジオコーディング結果(住所・緯度・経度・CF)", wrap=True) out_info = gr.Textbox(label="メタ情報", lines=2) def _parse(x): try: return int(x) except Exception: return x def app_run(xls, s, h, a, p, inet): return run( xls, s, int(h), _parse(a), _parse(p), inet ) run_btn.click( fn=app_run, inputs=[xlsx_in, sheet, header_row, address_col, power_col, use_inet], outputs=[out_html, out_table, out_info], ) if __name__ == "__main__": demo.launch()