Spaces:
Sleeping
Sleeping
| # app.py | |
| import os | |
| import io | |
| import time | |
| import requests | |
| import pandas as pd | |
| import numpy as np | |
| import geopandas as gpd | |
| import matplotlib.pyplot as plt | |
| from shapely.geometry import Point | |
| import folium | |
| import gradio as gr | |
| from PIL import Image | |
# ----------------------------
# Settings
# ----------------------------
# User-Agent sent with every geocoding request; including a contact
# address is recommended.
GSI_USER_AGENT = os.getenv(
    "GSI_USER_AGENT",
    "jp-gsi-geocoding-demo (contact: your_email@example.com)",
)
# Per-request timeout, in seconds.
GSI_TIMEOUT_SEC = float(os.getenv("GSI_TIMEOUT_SEC", "10"))
# Pause after each geocoding request, as a courtesy to the service.
# Note that the environment variable is named GSI_RATE_LIMIT_SEC.
GEOCODE_DELAY_SEC = float(os.getenv("GSI_RATE_LIMIT_SEC", "0.5"))
GSI_GEOCODE_URL = "https://msearch.gsi.go.jp/address-search/AddressSearch"

# The cache directory is created at import time so later writes can assume
# it exists.
CACHE_DIR = "data/cache"
os.makedirs(CACHE_DIR, exist_ok=True)
CACHE_PATH = os.path.join(CACHE_DIR, "geocode_cache.csv")

# Boundary shapefile archive used when no ZIP is uploaded.
DEFAULT_ZIP = "data/japan_ver85.zip"
# ----------------------------
# Cache
# ----------------------------
def load_cache():
    """Load the on-disk geocoding cache.

    Returns:
        The DataFrame read from ``CACHE_PATH`` when that file exists, can be
        parsed as CSV and contains the ``address_input``, ``lat``, ``lon``
        and ``CF`` columns; otherwise an empty DataFrame with exactly those
        four columns.
    """
    import warnings

    columns = ["address_input", "lat", "lon", "CF"]
    if os.path.exists(CACHE_PATH):
        try:
            # Addresses are looked up as strings, so keep them as text:
            # without an explicit dtype, a file holding only digit-like
            # addresses would be parsed as numbers and never match.
            df = pd.read_csv(CACHE_PATH, dtype={"address_input": str})
        except Exception as exc:
            # Best effort: an unreadable cache must not stop geocoding, but
            # say so instead of discarding it silently.
            warnings.warn(
                f"Ignoring unreadable geocode cache {CACHE_PATH!r}: {exc}"
            )
        else:
            if set(columns).issubset(df.columns):
                # An empty address is stored as an empty CSV field and comes
                # back as NaN; restore the empty string.
                df["address_input"] = df["address_input"].fillna("")
                return df
    return pd.DataFrame(columns=columns)
def save_cache(df_cache):
    """Write the geocoding cache to ``CACHE_PATH`` as CSV (best effort).

    Args:
        df_cache: DataFrame to persist; it is written without its index.

    A failed write is reported as a warning rather than raised, so an
    unwritable cache location never aborts the caller.
    """
    import warnings

    try:
        df_cache.to_csv(CACHE_PATH, index=False)
    except Exception as exc:
        warnings.warn(f"Could not write geocode cache {CACHE_PATH!r}: {exc}")
# ----------------------------
# Shapefile loading
# ----------------------------
def load_gdf_from_zip(zip_path: str) -> gpd.GeoDataFrame:
    """Read a zipped dataset and reproject it to EPSG:4326 when possible.

    Args:
        zip_path: Path of the ZIP archive, opened through a ``zip://`` URI.

    Returns:
        The GeoDataFrame converted to EPSG:4326 when it declares a CRS and
        the conversion succeeds; otherwise the GeoDataFrame exactly as read.
    """
    source_uri = f"zip://{zip_path}"
    boundaries = gpd.read_file(source_uri)  # , engine="pyogrio"
    try:
        if not boundaries.crs:
            return boundaries
        reprojected = boundaries.to_crs("EPSG:4326")
    except Exception:
        # Reprojection is best effort; fall back to the data as read.
        return boundaries
    return reprojected
# ----------------------------
# GSI (Geospatial Information Authority of Japan) geocoder
# ----------------------------
def make_gsi_session() -> requests.Session:
    """Create an HTTP session whose User-Agent header is ``GSI_USER_AGENT``."""
    session = requests.Session()
    session.headers["User-Agent"] = GSI_USER_AGENT
    return session
def gsi_geocode_once(address: str, session: requests.Session) -> tuple[float, float]:
    """Call the GSI address-search API once and return ``(lat, lon)``.

    The API lists coordinates as ``[lon, lat]``, so the order is swapped
    before returning. The first candidate in the response is used.

    Args:
        address: Address text to look up. Empty or whitespace-only text and
            the literal strings "nan" / "none" (any case) are skipped
            without a request.
        session: Session used to issue the GET request.

    Returns:
        ``(lat, lon)`` as floats, or ``(nan, nan)`` on any failure: skipped
        input, non-OK response, unexpected payload, or any exception.
    """
    not_found = (np.nan, np.nan)
    try:
        if not address:
            return not_found
        trimmed = address.strip()
        if trimmed == "" or trimmed.lower() in ("nan", "none"):
            return not_found

        response = session.get(
            GSI_GEOCODE_URL, params={"q": address}, timeout=GSI_TIMEOUT_SEC
        )
        if not response.ok:
            return not_found

        # The payload is a list of candidates; take the top one.
        candidates = response.json()
        if not isinstance(candidates, list) or len(candidates) == 0:
            return not_found

        geometry = candidates[0].get("geometry") or {}
        coords = geometry.get("coordinates") or []
        if not isinstance(coords, (list, tuple)) or len(coords) < 2:
            return not_found

        lon_raw, lat_raw = coords[0], coords[1]
        # float() doubles as the numeric validity check.
        return (float(lat_raw), float(lon_raw))
    except Exception:
        return not_found
def geocode_with_cache(addresses, CFs, use_internet=True):
    """Geocode addresses, reading from and updating the on-disk cache.

    Args:
        addresses: Iterable of address values. ``None``/NaN become ``""``;
            anything else is converted to ``str`` and stripped.
        CFs: Iterable of values paired with ``addresses`` by position.
            ``None``/NaN become ``""``; anything else is converted to ``str``.
        use_internet: When false, only cached coordinates are used and cache
            misses get NaN coordinates.

    Returns:
        A DataFrame with columns ``address_input``, ``CF``, ``lat`` and
        ``lon`` — one row per input pair, in input order. The columns are
        present even when there are no rows.
    """
    result_columns = ["address_input", "CF", "lat", "lon"]
    cache_columns = ["address_input", "lat", "lon", "CF"]
    flush_every = 25  # new lookups between intermediate cache writes

    def _is_missing(value):
        return value is None or (isinstance(value, float) and np.isnan(value))

    cache = load_cache()
    # address -> (lat, lon, CF); for duplicate rows the last one wins.
    cache_map = dict(
        zip(cache["address_input"], zip(cache["lat"], cache["lon"], cache["CF"]))
    )

    def _write_cache():
        rows = [
            {"address_input": addr, "lat": lat, "lon": lon, "CF": cf}
            for addr, (lat, lon, cf) in cache_map.items()
        ]
        save_cache(pd.DataFrame(rows, columns=cache_columns))

    attempted = set()  # addresses already handled online during this call
    unsaved = 0
    results = []
    session = make_gsi_session() if use_internet else None
    try:
        for a, cf in zip(addresses, CFs):
            a = "" if _is_missing(a) else str(a).strip()
            cf = "" if _is_missing(cf) else str(cf)

            lat, lon = np.nan, np.nan
            cached = cache_map.get(a)
            if cached is not None and pd.notna(cached[0]) and pd.notna(cached[1]):
                # Cache hit with usable coordinates.
                lat, lon = cached[0], cached[1]
            elif use_internet and a not in attempted:
                # Query each distinct address at most once per call, even if
                # the lookup fails.
                attempted.add(a)
                # Blank / "nan" / "none" addresses are rejected by the
                # geocoder without a request, so skip the call and the pause.
                if a and a.lower() not in ("nan", "none"):
                    lat, lon = gsi_geocode_once(a, session)
                    # Short pause as a courtesy to the service.
                    time.sleep(GEOCODE_DELAY_SEC)
                    cache_map[a] = (lat, lon, cf)
                    unsaved += 1
                    if unsaved >= flush_every:
                        # Periodic write bounds the loss if the process dies,
                        # without rewriting the file for every address.
                        _write_cache()
                        unsaved = 0

            results.append({"address_input": a, "CF": cf, "lat": lat, "lon": lon})
    finally:
        if unsaved:
            _write_cache()
        if session is not None:
            session.close()

    return pd.DataFrame(results, columns=result_columns)
# ----------------------------
# Visualization (matplotlib)
# ----------------------------
def plot_map_png(
    gdf_pref: gpd.GeoDataFrame,
    gdf_pts: gpd.GeoDataFrame,
    line_width: float = 0.6,
    marker_size: int = 24,
    legend_shrink: float = 0.6,
    legend_fontsize: int = 8,
    figsize=(7, 7),
) -> Image.Image:
    """Render boundaries and points to a PNG and return it as a PIL image.

    Args:
        gdf_pref: Geometries whose boundaries are drawn as black lines.
        gdf_pts: Point layer; rows with null geometry are ignored. A ``CF``
            column, if present, is coerced to numbers and used for colour.
        line_width: Boundary line width.
        marker_size: Marker size; values below 2 are raised to 2.
        legend_shrink: ``shrink`` value passed to the legend.
        legend_fontsize: Tick label size applied to every axes other than
            the map axes (the legend's axes).
        figsize: Figure size passed to ``plt.subplots``.

    Returns:
        The rendered figure as a ``PIL.Image`` (PNG, 200 dpi).
    """
    fig, ax = plt.subplots(figsize=figsize)
    try:
        gdf_pref.boundary.plot(ax=ax, linewidth=line_width, color="black")

        gdf_pts_valid = gdf_pts[gdf_pts.geometry.notnull()]
        if not gdf_pts_valid.empty:
            size = max(2, int(marker_size))
            if "CF" in gdf_pts_valid.columns:
                cf_num = pd.to_numeric(gdf_pts_valid["CF"], errors="coerce")
            else:
                # Same index as the points so the values line up.
                cf_num = pd.Series(np.nan, index=gdf_pts_valid.index)
            has_cf = cf_num.notna().to_numpy()

            # Points without a numeric CF are drawn in a neutral colour
            # instead of being handed to the column plot.
            # NOTE(review): the column plot appears to drop NaN rows and may
            # fail on an all-NaN column — confirm against the GeoPandas
            # version in use.
            if (~has_cf).any():
                gdf_pts_valid.loc[~has_cf].plot(
                    ax=ax, color="grey", markersize=size, alpha=0.85
                )
            if has_cf.any():
                gdf_pts_valid.loc[has_cf].assign(
                    CF_num=cf_num.to_numpy()[has_cf]
                ).plot(
                    ax=ax,
                    column="CF_num",
                    cmap="OrRd",
                    markersize=size,
                    alpha=0.85,
                    legend=True,
                    legend_kwds={"shrink": legend_shrink},
                )
                try:
                    # Any axes other than the map axes belongs to the legend.
                    for other_ax in fig.axes:
                        if other_ax is not ax:
                            other_ax.tick_params(labelsize=legend_fontsize)
                except Exception:
                    # Font tweak is cosmetic; never fail the render for it.
                    pass

        ax.set_axis_off()
        # Lay out this figure explicitly rather than pyplot's current figure.
        fig.tight_layout()
        buf = io.BytesIO()
        fig.savefig(buf, format="png", dpi=200)
    finally:
        # Always release the figure, including when plotting or saving fails.
        plt.close(fig)
    buf.seek(0)
    return Image.open(buf)
# ----------------------------
# Visualization (folium)
# ----------------------------
def make_folium_html(gdf_pref: gpd.GeoDataFrame, gdf_pts: gpd.GeoDataFrame, marker_size: int = 24):
    """Build an interactive folium map and return its HTML representation.

    Args:
        gdf_pref: Layer added to the map as GeoJSON (best effort).
        gdf_pts: Point layer; rows with null geometry are ignored. Each
            remaining row becomes a circle marker whose popup shows its
            ``address_input`` and ``CF`` values.
        marker_size: Marker size; the circle radius is a third of it
            (integer division), with a minimum of 3.

    Returns:
        The HTML string produced by the map's ``_repr_html_``.
    """
    from html import escape

    gdf_pts_valid = gdf_pts[gdf_pts.geometry.notnull()]
    if not gdf_pts_valid.empty:
        # Centre on the median point.
        center_lat = gdf_pts_valid.geometry.y.median()
        center_lon = gdf_pts_valid.geometry.x.median()
        zoom = 6
    else:
        # No points: hard-coded fallback centre, zoomed further out.
        center_lat, center_lon, zoom = 35.6812, 139.7671, 5

    m = folium.Map(location=[center_lat, center_lon], zoom_start=zoom)
    try:
        folium.GeoJson(gdf_pref.to_json(), name="prefecture").add_to(m)
    except Exception:
        # The boundary overlay is optional; still show the points without it.
        pass

    circle_radius = max(3, int(marker_size // 3))
    for _, r in gdf_pts_valid.iterrows():
        lat, lon = r.geometry.y, r.geometry.x
        # Popup text comes from user-supplied spreadsheet cells and is
        # embedded as HTML, so escape it to prevent markup/script injection.
        address_text = escape(str(r.get('address_input', '(no addr)')))
        cf_text = escape(str(r.get('CF', '')))
        popup = f"{address_text}<br>CF:{cf_text}"
        folium.CircleMarker(
            location=(float(lat), float(lon)),
            radius=circle_radius,
            fill=True,
            fill_opacity=0.9,
            popup=popup,
        ).add_to(m)
    return m._repr_html_()
| # ---------------------------- | |
| # 実行パイプライン | |
| # ---------------------------- | |
| def _parse_indexer(x): | |
| try: | |
| return int(x) | |
| except Exception: | |
| return x | |
def _uploaded_path(upload):
    """Return the path of an uploaded file, or None when there is none.

    Accepts a plain path (``str`` / ``os.PathLike``) as well as an object
    that exposes its path through a ``name`` attribute.
    """
    if upload is None:
        return None
    if isinstance(upload, (str, os.PathLike)):
        return os.fspath(upload) or None
    return getattr(upload, "name", None) or None


def _select_column(df, indexer):
    """Pick a column by position when ``indexer`` is an int, else by label."""
    if isinstance(indexer, int):
        return df.iloc[:, indexer]
    return df[indexer]


def run(zip_file, excel_file, sheet_name, header_row, address_col, power_col,
        use_inet, line_width, marker_size, legend_shrink, legend_fontsize):
    """Load boundaries, geocode the spreadsheet addresses and render both maps.

    Args:
        zip_file: Uploaded boundary ZIP (path or object with ``name``).
            ``DEFAULT_ZIP`` is used when it is missing.
        excel_file: Uploaded spreadsheet (path or object with ``name``).
            When absent, the maps are drawn without points.
        sheet_name: Sheet passed to ``pd.read_excel``.
        header_row: Header row index (converted with ``int``).
        address_col: Address column, by int position or label.
        power_col: Column used as ``CF``, by int position or label.
            ``None`` or blank text means "no such column" (all NaN).
        use_inet: Whether cache misses may query the geocoding service.
        line_width, marker_size, legend_shrink, legend_fontsize: Appearance
            settings forwarded to the plotting functions.

    Returns:
        ``(image, html, info_text, table_df, error_message)``. On failure
        the first two are ``None``, ``info_text`` is empty and
        ``error_message`` explains the problem; on success it is ``""``.
    """
    columns = ["address_input", "CF", "lat", "lon"]

    def _failure(message, table=None):
        if table is None:
            table = pd.DataFrame(columns=columns)
        return None, None, "", table, message

    # Shapefile
    zip_path = _uploaded_path(zip_file)
    if zip_path is None or not os.path.exists(zip_path):
        if not os.path.exists(DEFAULT_ZIP):
            return _failure("Shapefile の ZIP をアップロードするか、data/japan_ver85.zip を配置してください。")
        zip_path = DEFAULT_ZIP
    try:
        gdf_pref = load_gdf_from_zip(zip_path)
    except Exception as e:
        return _failure(f"行政界の読み込みに失敗しました: {e}")

    # Excel -> geocoding
    excel_path = _uploaded_path(excel_file)
    if excel_path is None:
        gdf_pts = gpd.GeoDataFrame(columns=columns, geometry=[], crs="EPSG:4326")
        table_df = pd.DataFrame(columns=columns)
    else:
        try:
            df = pd.read_excel(excel_path, sheet_name=sheet_name, header=int(header_row))
        except Exception as e:
            return _failure(f"Excel の読み込みに失敗しました: {e}")

        # Report a bad column the same way as other input problems rather
        # than letting the exception escape to the UI.
        try:
            addr_series = _select_column(df, address_col)
            if power_col is None or (isinstance(power_col, str) and not power_col.strip()):
                # The numeric column is optional: no CF values.
                cf_series = pd.Series(np.nan, index=df.index)
            else:
                cf_series = _select_column(df, power_col)
            addresses = addr_series.astype(str).tolist()
            cfs = cf_series.tolist()
        except Exception as e:
            return _failure(f"列の取得に失敗しました: {e}")

        geo_df = geocode_with_cache(addresses, cfs, use_internet=bool(use_inet))
        # Guarantee the expected columns even when there were no rows.
        geo_df = geo_df.reindex(columns=columns)
        table_df = geo_df.copy()
        geometry = [
            Point(lon, lat) if (pd.notna(lat) and pd.notna(lon)) else None
            for lat, lon in zip(geo_df["lat"], geo_df["lon"])
        ]
        gdf_pts = gpd.GeoDataFrame(geo_df, geometry=geometry, crs="EPSG:4326")

    # Static figure and interactive map
    try:
        img = plot_map_png(
            gdf_pref, gdf_pts,
            line_width=float(line_width),
            marker_size=int(marker_size),
            legend_shrink=float(legend_shrink),
            legend_fontsize=int(legend_fontsize),
        )
    except Exception as e:
        return _failure(f"静的描画に失敗しました: {e}", table_df)
    try:
        html = make_folium_html(gdf_pref, gdf_pts, marker_size=int(marker_size))
    except Exception as e:
        # The interactive map is secondary: keep the static result.
        html = f"<p>folium描画に失敗しました: {e}</p>"

    # Summary
    info = [f"都道府県レコード数: {len(gdf_pref)}"]
    if gdf_pref.crs:
        info.append(f"PREF CRS: {gdf_pref.crs}")
    info.append(f"ポイント数(有効座標): {int(gdf_pts.geometry.notnull().sum())} / {len(gdf_pts)}")
    if not gdf_pts.empty and gdf_pts.crs:
        info.append(f"PTS CRS: {gdf_pts.crs}")
    return img, html, "\n".join(info), table_df, ""
# ----------------------------
# Gradio UI
# ----------------------------
with gr.Blocks(title="Japan Shapefile + Excel Geocoding Plotter (GSI)") as demo:
    gr.Markdown("## japan_ver85.shp(ZIP) + Excel住所 → 日本地図にプロット(凡例小・点大の調整可)")

    # Input files
    with gr.Row():
        zip_in = gr.File(label="Shapefile (ZIP)", file_count="single", file_types=[".zip"])
        xlsx_in = gr.File(label="Excelファイル(住所付き)", file_count="single", file_types=[".xlsx", ".xls"])

    # Spreadsheet layout
    with gr.Row():
        sheet = gr.Textbox(label="シート名", value="認定設備")
        header_row = gr.Number(label="ヘッダー行番号(0始まり)", value=2, precision=0)
    with gr.Row():
        address_col = gr.Textbox(label="住所列(列名 or 0始まり列番号)", value="発電設備の所在地")
        power_col = gr.Textbox(label="数値列(任意:列名 or 0始まり列番号)", value="発電出力(kW)")

    with gr.Row():
        use_inet = gr.Checkbox(label="国土地理院APIに問い合わせ(オフでキャッシュのみ使用)", value=True)
        line_width = gr.Slider(0.2, 2.0, value=0.6, step=0.1, label="境界線の太さ")

    # Appearance sliders
    with gr.Row():
        marker_size = gr.Slider(4, 64, value=24, step=2, label="ポイントサイズ(matplotlib / folium)")
        legend_shrink = gr.Slider(0.3, 1.0, value=0.6, step=0.05, label="凡例の縮小率(小さいほど小さく)")
        legend_fontsize = gr.Slider(6, 16, value=8, step=1, label="凡例の目盛フォントサイズ")

    run_btn = gr.Button("描画")

    # Outputs
    out_img = gr.Image(label="静的地図(matplotlib)", type="pil")
    out_html = gr.HTML(label="インタラクティブ地図(folium)")
    out_info = gr.Textbox(label="メタ情報", lines=4)
    out_table = gr.Dataframe(label="ジオコーディング結果(住所・緯度・経度・CF)", wrap=True)
    out_err = gr.Markdown(label="エラー", visible=True)

    def app_run(zipf, xls, s, h, a, p, inet, lw, ms, lsh, lfs):
        """Adapt raw widget values and delegate to ``run``.

        Column fields go through ``_parse_indexer`` so numeric text selects
        a column by position. The header row is passed through unchanged:
        ``run`` converts it inside its own error handling, so a cleared
        field yields an error message instead of an uncaught exception.
        """
        return run(
            zipf, xls, s, h, _parse_indexer(a), _parse_indexer(p),
            inet, lw, ms, lsh, lfs
        )

    run_btn.click(
        fn=app_run,
        inputs=[zip_in, xlsx_in, sheet, header_row, address_col, power_col, use_inet, line_width, marker_size, legend_shrink, legend_fontsize],
        outputs=[out_img, out_html, out_info, out_table, out_err],
    )

if __name__ == "__main__":
    demo.launch()