Spaces:
Sleeping
Sleeping
| # app.py | |
| # pip install keplergl pandas numpy geopandas shapely gradio requests openpyxl | |
| import os | |
| import io | |
| import time | |
| import json | |
| import tempfile | |
| import requests | |
| import pandas as pd | |
| import numpy as np | |
| import geopandas as gpd | |
| from shapely.geometry import Point | |
| import gradio as gr | |
| from keplergl import KeplerGl | |
| # ---------------------------- | |
| # 設定 | |
| # ---------------------------- | |
| GSI_USER_AGENT = os.environ.get( | |
| "GSI_USER_AGENT", | |
| "jp-gsi-geocoding-demo (contact: your_email@example.com)" # 連絡先付き推奨 | |
| ) | |
| GSI_TIMEOUT_SEC = float(os.environ.get("GSI_TIMEOUT_SEC", "10")) | |
| # ★ sleep最小(0秒) | |
| GEOCODE_DELAY_SEC = float(os.environ.get("GSI_RATE_LIMIT_SEC", "0.0")) | |
| GSI_GEOCODE_URL = "https://msearch.gsi.go.jp/address-search/AddressSearch" | |
| CACHE_DIR = "data/cache" | |
| os.makedirs(CACHE_DIR, exist_ok=True) | |
| CACHE_PATH = os.path.join(CACHE_DIR, "geocode_cache.csv") | |
| # ---------------------------- | |
| # キャッシュ | |
| # ---------------------------- | |
| def load_cache(): | |
| if os.path.exists(CACHE_PATH): | |
| try: | |
| df = pd.read_csv(CACHE_PATH) | |
| if set(["address_input", "lat", "lon", "CF"]).issubset(df.columns): | |
| return df | |
| except Exception: | |
| pass | |
| return pd.DataFrame(columns=["address_input", "lat", "lon", "CF"]) | |
| def save_cache(df_cache): | |
| try: | |
| df_cache.to_csv(CACHE_PATH, index=False) | |
| except Exception: | |
| pass | |
| # ---------------------------- | |
| # 国土地理院 ジオコーダ | |
| # ---------------------------- | |
| def make_gsi_session() -> requests.Session: | |
| s = requests.Session() | |
| s.headers.update({"User-Agent": GSI_USER_AGENT}) | |
| return s | |
| def gsi_geocode_once(address: str, session: requests.Session) -> tuple[float, float]: | |
| """ | |
| 国土地理院 住所検索APIを1回呼び出し、(lat, lon) を返す。失敗時は (nan, nan)。 | |
| APIは [lon, lat] を返すので順を入れ替える。 | |
| """ | |
| try: | |
| if not address or str(address).strip() == "" or str(address).strip().lower() in ("nan", "none"): | |
| return (np.nan, np.nan) | |
| resp = session.get(GSI_GEOCODE_URL, params={"q": address}, timeout=GSI_TIMEOUT_SEC) | |
| if not resp.ok: | |
| return (np.nan, np.nan) | |
| data = resp.json() | |
| if isinstance(data, list) and len(data) > 0: | |
| feat = data[0] | |
| coords = (feat.get("geometry") or {}).get("coordinates") or [] | |
| if isinstance(coords, (list, tuple)) and len(coords) >= 2: | |
| lon, lat = float(coords[0]), float(coords[1]) | |
| return (lat, lon) | |
| except Exception: | |
| pass | |
| return (np.nan, np.nan) | |
| def geocode_with_cache(addresses, CFs, use_internet=True): | |
| cache = load_cache() | |
| cache_map = {row["address_input"]: (row["lat"], row["lon"], row["CF"]) for _, row in cache.iterrows()} | |
| results = [] | |
| session = make_gsi_session() if use_internet else None | |
| for a, cf in zip(addresses, CFs): | |
| a = "" if (a is None or (isinstance(a, float) and np.isnan(a))) else str(a).strip() | |
| cf = "" if (cf is None or (isinstance(cf, float) and np.isnan(cf))) else str(cf) | |
| # cache hit | |
| if a in cache_map: | |
| lat, lon, _cached_cf = cache_map[a] | |
| if pd.notna(lat) and pd.notna(lon): | |
| results.append({"address_input": a, "CF": cf, "lat": lat, "lon": lon}) | |
| continue | |
| if not use_internet: | |
| results.append({"address_input": a, "CF": cf, "lat": np.nan, "lon": np.nan}) | |
| continue | |
| lat, lon = gsi_geocode_once(a, session) | |
| # ★ 最小スリープ(デフォルト0.0秒) | |
| if GEOCODE_DELAY_SEC > 0: | |
| time.sleep(GEOCODE_DELAY_SEC) | |
| # キャッシュ更新 | |
| cache = cache[cache["address_input"] != a] | |
| cache = pd.concat( | |
| [cache, pd.DataFrame([{"address_input": a, "lat": lat, "lon": lon, "CF": cf}])], | |
| ignore_index=True | |
| ) | |
| save_cache(cache) | |
| results.append({"address_input": a, "CF": cf, "lat": lat, "lon": lon}) | |
| return pd.DataFrame(results) | |
| # ---------------------------- | |
| # Kepler.gl HTML 生成(ポイントのみ) | |
| # ---------------------------- | |
| def make_kepler_html(df_points: pd.DataFrame, height: int = 640) -> str: | |
| """ | |
| df_points は 'lat','lon','address_input','CF' を含む DataFrame を想定。 | |
| ポイントレイヤのみを Kepler.gl で描画し、HTMLを文字列で返す。 | |
| """ | |
| df_valid = df_points.dropna(subset=["lat", "lon"]).copy() | |
| if df_valid.empty: | |
| # 空のKeplerでもHTMLは返す | |
| m = KeplerGl(height=height) | |
| return m._repr_html_() | |
| # ほどよい初期中心 | |
| center_lat = float(df_valid["lat"].median()) | |
| center_lon = float(df_valid["lon"].median()) | |
| # Kepler 設定(ポイントレイヤのみ) | |
| config = { | |
| "version": "v1", | |
| "config": { | |
| "visState": { | |
| "filters": [], | |
| "layers": [ | |
| { | |
| "id": "point_layer", | |
| "type": "point", | |
| "config": { | |
| "dataId": "points", | |
| "label": "Points", | |
| "color": [18, 147, 154], | |
| "columns": {"lat": "lat", "lng": "lon"}, | |
| "isVisible": True, | |
| "visConfig": { | |
| "radius": 10, # 基本半径 | |
| "opacity": 0.9, | |
| "outline": False | |
| } | |
| }, | |
| "visualChannels": { | |
| # CF列が数値ならサイズに反映(なければ自動で固定半径) | |
| "sizeField": {"name": "CF", "type": "real"} if pd.to_numeric(df_valid.get("CF", pd.Series([])), errors="coerce").notna().any() else None, | |
| "sizeScale": "sqrt", | |
| }, | |
| } | |
| ], | |
| "interactionConfig": { | |
| "tooltip": { | |
| "enabled": True, | |
| "fieldsToShow": { | |
| "points": [ {"name": "address_input", "format": None}, | |
| {"name": "CF", "format": None}, | |
| {"name": "lat", "format": None}, | |
| {"name": "lon", "format": None} ] | |
| }, | |
| "compareMode": False, | |
| "compareType": "absolute" | |
| } | |
| }, | |
| "layerBlending": "normal" | |
| }, | |
| "mapState": { | |
| "bearing": 0, | |
| "pitch": 0, | |
| "latitude": center_lat, | |
| "longitude": center_lon, | |
| "zoom": 6 | |
| }, | |
| "mapStyle": { | |
| "styleType": "light", | |
| "topLayerGroups": {}, | |
| "visibleLayerGroups": {"label": True, "road": True, "border": False, "building": False, "water": True, "land": True} | |
| } | |
| } | |
| } | |
| m = KeplerGl(height=height, config=config) | |
| # Kepler は DataFrame の列名で自動解釈(lat/lon) | |
| m.add_data(data=df_valid[["lat", "lon", "address_input", "CF"]], name="points") | |
| # Gradioへは _repr_html_ をそのまま返すのが簡単 | |
| try: | |
| return m._repr_html_() | |
| except Exception: | |
| # 万一ノートブック外で不安定な場合はHTMLファイルを生成して読み戻す | |
| with tempfile.NamedTemporaryFile(suffix=".html", delete=False) as f: | |
| tmp = f.name | |
| m.save_to_html(file_name=tmp, read_only=True) | |
| with open(tmp, "r", encoding="utf-8") as fh: | |
| html = fh.read() | |
| return html | |
| # ---------------------------- | |
| # 実行パイプライン(ポイントのみ) | |
| # ---------------------------- | |
| def _parse_indexer(x): | |
| try: | |
| return int(x) | |
| except Exception: | |
| return x | |
| def run(excel_file, sheet_name, header_row, address_col, power_col, use_inet): | |
| # Excel 読み込み | |
| if excel_file is None or not hasattr(excel_file, "name"): | |
| table_df = pd.DataFrame(columns=["address_input", "CF", "lat", "lon"]) | |
| return "", table_df, "Excelファイルを指定してください。" | |
| try: | |
| df = pd.read_excel(excel_file.name, sheet_name=sheet_name, header=int(header_row)) | |
| except Exception as e: | |
| empty_df = pd.DataFrame(columns=["address_input", "CF", "lat", "lon"]) | |
| return "", empty_df, f"Excel の読み込みに失敗しました: {e}" | |
| addr_series = df.iloc[:, address_col] if isinstance(address_col, int) else df[address_col] | |
| cf_series = df.iloc[:, power_col] if isinstance(power_col, int) else df[power_col] | |
| addresses = addr_series.astype(str).tolist() | |
| cfs = cf_series.tolist() | |
| geo_df = geocode_with_cache(addresses, cfs, use_internet=bool(use_inet)) | |
| table_df = geo_df[["address_input", "CF", "lat", "lon"]].copy() | |
| # GeoDataFrame も一応整備(未使用だが将来の拡張用) | |
| geometry = [ | |
| Point(lon, lat) if (pd.notna(lat) and pd.notna(lon)) else None | |
| for lat, lon in zip(geo_df["lat"], geo_df["lon"]) | |
| ] | |
| gdf_pts = gpd.GeoDataFrame(geo_df, geometry=geometry, crs="EPSG:4326") | |
| # Kepler.gl(ポイントのみ) | |
| try: | |
| html = make_kepler_html(table_df, height=640) | |
| except Exception as e: | |
| html = f"<p>Kepler.gl描画に失敗しました: {e}</p>" | |
| # 情報(地物数のみ) | |
| info = [] | |
| info.append(f"ポイント数(有効座標): {int(gdf_pts.geometry.notnull().sum())} / {len(gdf_pts)}") | |
| return html, table_df, "\n".join(info) | |
| # ---------------------------- | |
| # Gradio UI(ポイントのみ) | |
| # ---------------------------- | |
| with gr.Blocks(title="Excel住所 → Kepler.gl(ポイントのみ)") as demo: | |
| gr.Markdown("## Excelの住所を国土地理院APIでジオコーディング → Kepler.gl に **ポイントのみ** を描画") | |
| with gr.Row(): | |
| xlsx_in = gr.File(label="Excelファイル(住所付き)", file_count="single", file_types=[".xlsx", ".xls"]) | |
| with gr.Row(): | |
| sheet = gr.Textbox(label="シート名", value="認定設備") | |
| header_row = gr.Number(label="ヘッダー行番号(0始まり)", value=2, precision=0) | |
| with gr.Row(): | |
| address_col = gr.Textbox(label="住所列(列名 or 0始まり列番号)", value="発電設備の所在地") | |
| power_col = gr.Textbox(label="数値列(任意:列名 or 0始まり列番号)", value="発電出力(kW)") | |
| with gr.Row(): | |
| use_inet = gr.Checkbox(label="国土地理院APIに問い合わせ(オフでキャッシュのみ使用)", value=True) | |
| run_btn = gr.Button("描画") | |
| out_html = gr.HTML(label="インタラクティブ地図(Kepler.gl:ポイントのみ)") | |
| out_table = gr.Dataframe(label="ジオコーディング結果(住所・緯度・経度・CF)", wrap=True) | |
| out_info = gr.Textbox(label="メタ情報", lines=2) | |
| def _parse(x): | |
| try: | |
| return int(x) | |
| except Exception: | |
| return x | |
| def app_run(xls, s, h, a, p, inet): | |
| return run( | |
| xls, s, int(h), _parse(a), _parse(p), inet | |
| ) | |
| run_btn.click( | |
| fn=app_run, | |
| inputs=[xlsx_in, sheet, header_row, address_col, power_col, use_inet], | |
| outputs=[out_html, out_table, out_info], | |
| ) | |
| if __name__ == "__main__": | |
| demo.launch() | |