# app.py
"""Gradio app that plots geocoded Excel addresses on a map of Japan.

Pipeline:
  1. Load administrative boundaries from a zipped Shapefile.
  2. Read an Excel sheet and geocode its address column with the GSI
     (Geospatial Information Authority of Japan) address-search API,
     using a local CSV cache to avoid repeated lookups.
  3. Render a static matplotlib map and an interactive folium map.
"""
import html
import io
import logging
import os
import time

import folium
import geopandas as gpd
import gradio as gr
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import requests
from PIL import Image
from shapely.geometry import Point

logger = logging.getLogger(__name__)

# ----------------------------
# Settings
# ----------------------------
GSI_USER_AGENT = os.environ.get(
    "GSI_USER_AGENT",
    # Including a contact address in the User-Agent is recommended.
    "jp-gsi-geocoding-demo (contact: your_email@example.com)",
)
GSI_TIMEOUT_SEC = float(os.environ.get("GSI_TIMEOUT_SEC", "10"))
# Short pause between API calls, as a courtesy to the service.
GEOCODE_DELAY_SEC = float(os.environ.get("GSI_RATE_LIMIT_SEC", "0.5"))
GSI_GEOCODE_URL = "https://msearch.gsi.go.jp/address-search/AddressSearch"

CACHE_DIR = "data/cache"
os.makedirs(CACHE_DIR, exist_ok=True)
CACHE_PATH = os.path.join(CACHE_DIR, "geocode_cache.csv")
CACHE_COLUMNS = ["address_input", "lat", "lon", "CF"]
# Number of new lookups after which the cache file is rewritten mid-run.
CACHE_FLUSH_EVERY = 50

DEFAULT_ZIP = "data/japan_ver85.zip"
RESULT_COLUMNS = ["address_input", "CF", "lat", "lon"]


# ----------------------------
# Cache
# ----------------------------
def load_cache():
    """Return the geocode cache as a DataFrame with ``CACHE_COLUMNS``.

    A missing, unreadable, or malformed cache file yields an empty frame
    with the expected columns; the cache is best-effort and never fatal.
    """
    if os.path.exists(CACHE_PATH):
        try:
            df = pd.read_csv(CACHE_PATH)
        except (OSError, ValueError) as exc:
            # pandas parser/decoding errors are ValueError subclasses.
            logger.warning("Could not read geocode cache %s: %s", CACHE_PATH, exc)
        else:
            if set(CACHE_COLUMNS).issubset(df.columns):
                return df
            logger.warning("Ignoring geocode cache %s: unexpected columns", CACHE_PATH)
    return pd.DataFrame(columns=CACHE_COLUMNS)


def save_cache(df_cache):
    """Write the cache DataFrame to ``CACHE_PATH``; failures are logged only."""
    try:
        df_cache.to_csv(CACHE_PATH, index=False)
    except OSError as exc:
        logger.warning("Could not write geocode cache %s: %s", CACHE_PATH, exc)


def _save_cache_map(cache_map):
    """Persist an ``address -> (lat, lon, CF)`` mapping via ``save_cache``."""
    rows = [
        {"address_input": addr, "lat": lat, "lon": lon, "CF": cf}
        for addr, (lat, lon, cf) in cache_map.items()
    ]
    save_cache(pd.DataFrame(rows, columns=CACHE_COLUMNS))


# ----------------------------
# Shapefile loading
# ----------------------------
def load_gdf_from_zip(zip_path: str) -> gpd.GeoDataFrame:
    """Read a zipped Shapefile and reproject it to EPSG:4326 when possible.

    If the layer has no CRS, or reprojection fails, the data is returned
    in whatever CRS it was read with.
    """
    gdf = gpd.read_file(f"zip://{zip_path}")
    try:
        if gdf.crs:
            gdf = gdf.to_crs("EPSG:4326")
    except Exception:
        # Best effort: keep the original CRS rather than fail the whole run.
        logger.warning("Could not reproject %s to EPSG:4326", zip_path, exc_info=True)
    return gdf


# ----------------------------
# GSI geocoder
# ----------------------------
def make_gsi_session() -> requests.Session:
    """Create a ``requests.Session`` carrying the configured User-Agent."""
    s = requests.Session()
    s.headers.update({"User-Agent": GSI_USER_AGENT})
    return s


def gsi_geocode_once(address: str, session: requests.Session) -> tuple[float, float]:
    """Call the GSI address-search API once and return ``(lat, lon)``.

    The API returns a list of candidates whose coordinates are ordered
    ``[lon, lat]``; the first candidate is used and the order is swapped.
    Returns ``(nan, nan)`` for blank input, HTTP or network failures, and
    responses without usable coordinates. Never raises.
    """
    missing = (np.nan, np.nan)

    # Skip empty input and stringified missing values.
    if not isinstance(address, str):
        return missing
    query = address.strip()
    if not query or query.lower() in ("nan", "none"):
        return missing

    try:
        resp = session.get(GSI_GEOCODE_URL, params={"q": query}, timeout=GSI_TIMEOUT_SEC)
        if not resp.ok:
            return missing
        data = resp.json()
    except (requests.RequestException, ValueError) as exc:
        logger.warning("GSI geocoding request failed for %r: %s", query, exc)
        return missing

    try:
        # The response is a list of candidates; take the top one.
        if not isinstance(data, list) or not data:
            return missing
        coords = (data[0].get("geometry") or {}).get("coordinates") or []
        if not isinstance(coords, (list, tuple)) or len(coords) < 2:
            return missing
        lon, lat = float(coords[0]), float(coords[1])
    except (AttributeError, TypeError, ValueError):
        # Unexpected response shape or non-numeric coordinates.
        return missing

    if not (np.isfinite(lat) and np.isfinite(lon)):
        return missing
    return (lat, lon)


def _clean_text(value) -> str:
    """Return ``str(value)``, mapping ``None`` and float NaN to ``""``."""
    if value is None or (isinstance(value, float) and np.isnan(value)):
        return ""
    return str(value)


def _normalize_address(value) -> str:
    """Return a stripped address string; missing or 'nan'/'none' become ``""``."""
    text = _clean_text(value).strip()
    return "" if text.lower() in ("nan", "none") else text


def geocode_with_cache(addresses, CFs, use_internet=True):
    """Geocode addresses, consulting and updating the on-disk cache.

    Args:
        addresses: Iterable of address values (any type; converted to str).
        CFs: Iterable of values paired with ``addresses``; stored as strings
            in the ``CF`` column.
        use_internet: When false, only cached coordinates are used.

    Returns:
        DataFrame with columns ``address_input``, ``CF``, ``lat``, ``lon``,
        one row per input pair. Unresolved rows have NaN coordinates.
    """
    cache = load_cache()
    cache_map = {}
    cached_lat = pd.to_numeric(cache["lat"], errors="coerce")
    cached_lon = pd.to_numeric(cache["lon"], errors="coerce")
    for addr, lat, lon, cf in zip(cache["address_input"], cached_lat, cached_lon, cache["CF"]):
        if isinstance(addr, str):
            cache_map[addr] = (lat, lon, cf)

    results = []
    attempted = set()  # addresses already queried in this run
    unsaved = 0
    session = make_gsi_session() if use_internet else None
    try:
        for raw_addr, raw_cf in zip(addresses, CFs):
            addr = _normalize_address(raw_addr)
            cf = _clean_text(raw_cf)
            lat, lon = np.nan, np.nan

            cached = cache_map.get(addr)
            if cached is not None and pd.notna(cached[0]) and pd.notna(cached[1]):
                # Cache hit with usable coordinates.
                lat, lon = cached[0], cached[1]
            elif use_internet and addr and addr not in attempted:
                attempted.add(addr)
                lat, lon = gsi_geocode_once(addr, session)
                # Short pause as a courtesy to the service.
                time.sleep(GEOCODE_DELAY_SEC)
                cache_map[addr] = (lat, lon, cf)
                unsaved += 1
                if unsaved >= CACHE_FLUSH_EVERY:
                    # Periodic flush so a long run keeps most of its progress.
                    _save_cache_map(cache_map)
                    unsaved = 0

            results.append({"address_input": addr, "CF": cf, "lat": lat, "lon": lon})
    finally:
        if session is not None:
            session.close()
        if unsaved:
            _save_cache_map(cache_map)

    # Explicit columns keep the schema stable even when there are no rows.
    return pd.DataFrame(results, columns=RESULT_COLUMNS)


# ----------------------------
# Visualization (matplotlib)
# ----------------------------
def plot_map_png(
    gdf_pref: gpd.GeoDataFrame,
    gdf_pts: gpd.GeoDataFrame,
    line_width: float = 0.6,
    marker_size: int = 24,
    legend_shrink: float = 0.6,
    legend_fontsize: int = 8,
    figsize=(7, 7),
) -> Image.Image:
    """Render boundaries and points to a PNG and return it as a PIL image.

    Points are colored by the numeric value of their ``CF`` column with a
    colorbar legend. When no point has a numeric ``CF`` the points are drawn
    in a single color without a legend.
    """
    fig, ax = plt.subplots(figsize=figsize)
    try:
        gdf_pref.boundary.plot(ax=ax, linewidth=line_width, color="black")

        gdf_pts_valid = gdf_pts[gdf_pts.geometry.notnull()]
        if not gdf_pts_valid.empty:
            size = max(2, int(marker_size))
            if "CF" in gdf_pts_valid.columns:
                cf_num = pd.to_numeric(gdf_pts_valid["CF"], errors="coerce")
            else:
                # Index must match the frame so assign() aligns row by row.
                cf_num = pd.Series(np.nan, index=gdf_pts_valid.index)

            if cf_num.notna().any():
                gdf_pts_valid.assign(CF_num=cf_num).plot(
                    ax=ax,
                    column="CF_num",
                    cmap="OrRd",
                    markersize=size,
                    alpha=0.85,
                    legend=True,
                    legend_kwds={"shrink": legend_shrink},
                )
                # Any axes other than the map axes is the colorbar.
                for other_ax in fig.axes:
                    if other_ax is not ax:
                        other_ax.tick_params(labelsize=legend_fontsize)
            else:
                # No numeric values to build a color scale from.
                gdf_pts_valid.plot(ax=ax, color="red", markersize=size, alpha=0.85)

        ax.set_axis_off()
        fig.tight_layout()

        buf = io.BytesIO()
        fig.savefig(buf, format="png", dpi=200)
    finally:
        # Always release the figure, even if plotting failed.
        plt.close(fig)

    buf.seek(0)
    img = Image.open(buf)
    img.load()  # decode now instead of lazily on first access
    return img


# ----------------------------
# Visualization (folium)
# ----------------------------
def make_folium_html(gdf_pref: gpd.GeoDataFrame, gdf_pts: gpd.GeoDataFrame, marker_size: int = 24):
    """Build an interactive folium map and return its HTML representation.

    The map is centered on the median of the valid points, or on a fixed
    default location when there are none. Popup text is HTML-escaped
    because it comes from user-supplied spreadsheet data.
    """
    gdf_pts_valid = gdf_pts[gdf_pts.geometry.notnull()]
    if not gdf_pts_valid.empty:
        center_lat = float(gdf_pts_valid.geometry.y.median())
        center_lon = float(gdf_pts_valid.geometry.x.median())
        zoom = 6
    else:
        center_lat, center_lon, zoom = 35.6812, 139.7671, 5

    m = folium.Map(location=[center_lat, center_lon], zoom_start=zoom)
    try:
        folium.GeoJson(gdf_pref.to_json(), name="prefecture").add_to(m)
    except Exception:
        # Best effort: still show the points if the boundary layer fails.
        logger.warning("Could not add boundary layer to folium map", exc_info=True)

    circle_radius = max(3, int(marker_size // 3))
    for _, r in gdf_pts_valid.iterrows():
        lat, lon = r.geometry.y, r.geometry.x
        addr_text = html.escape(str(r.get("address_input", "(no addr)")))
        cf_text = html.escape(str(r.get("CF", "")))
        popup = f"{addr_text}<br>CF:{cf_text}"
        folium.CircleMarker(
            location=(float(lat), float(lon)),
            radius=circle_radius,
            fill=True,
            fill_opacity=0.9,
            popup=popup,
        ).add_to(m)
    return m._repr_html_()


# ----------------------------
# Execution pipeline
# ----------------------------
def _parse_indexer(x):
    """Return ``int(x)`` when ``x`` looks like an integer, else ``x`` unchanged."""
    try:
        return int(x)
    except Exception:
        return x


def _uploaded_path(file_obj):
    """Return the filesystem path of an uploaded file, or ``None``.

    Accepts either a plain path string or an object exposing ``.name``.
    """
    if file_obj is None:
        return None
    name = getattr(file_obj, "name", None)
    if name:
        return name
    if isinstance(file_obj, str) and file_obj:
        return file_obj
    return None


def _select_column(df, indexer):
    """Select a column by position (int) or by label (anything else)."""
    if isinstance(indexer, int):
        return df.iloc[:, indexer]
    return df[indexer]


def _error_result(message, table_df=None):
    """Build the 5-tuple ``run`` returns on failure."""
    if table_df is None:
        table_df = pd.DataFrame(columns=RESULT_COLUMNS)
    return None, None, "", table_df, message


def run(zip_file, excel_file, sheet_name, header_row, address_col, power_col,
        use_inet, line_width, marker_size, legend_shrink, legend_fontsize):
    """Run the full pipeline for one request.

    Args:
        zip_file: Uploaded Shapefile ZIP, or ``None`` to use ``DEFAULT_ZIP``.
        excel_file: Uploaded Excel file, or ``None`` to draw boundaries only.
        sheet_name: Excel sheet name; empty or ``None`` selects the first sheet.
        header_row: Zero-based header row index.
        address_col: Address column, by label or zero-based position.
        power_col: Numeric column, by label or zero-based position; empty
            or ``None`` leaves the ``CF`` values blank.
        use_inet: Whether to query the GSI API for uncached addresses.
        line_width, marker_size, legend_shrink, legend_fontsize: Styling.

    Returns:
        ``(image, html, info_text, table_df, error_message)``. On failure
        the image and html are ``None`` and ``error_message`` is non-empty.
    """
    # Shapefile
    zip_path = _uploaded_path(zip_file)
    if zip_path is None or not os.path.exists(zip_path):
        if os.path.exists(DEFAULT_ZIP):
            zip_path = DEFAULT_ZIP
        else:
            return _error_result(
                "Shapefile の ZIP をアップロードするか、data/japan_ver85.zip を配置してください。"
            )

    try:
        gdf_pref = load_gdf_from_zip(zip_path)
    except Exception as e:
        return _error_result(f"行政界の読み込みに失敗しました: {e}")

    # Excel -> geocoding
    excel_path = _uploaded_path(excel_file)
    if excel_path is None:
        gdf_pts = gpd.GeoDataFrame(columns=RESULT_COLUMNS, geometry=[], crs="EPSG:4326")
        table_df = pd.DataFrame(columns=RESULT_COLUMNS)
    else:
        # An empty sheet name would otherwise fail; fall back to the first sheet.
        sheet = 0 if sheet_name in (None, "") else sheet_name
        try:
            df = pd.read_excel(excel_path, sheet_name=sheet, header=int(header_row))
        except Exception as e:
            return _error_result(f"Excel の読み込みに失敗しました: {e}")

        try:
            addr_series = _select_column(df, address_col)
            if power_col in (None, ""):
                # The numeric column is optional.
                cf_series = pd.Series(np.nan, index=df.index)
            else:
                cf_series = _select_column(df, power_col)
        except (KeyError, IndexError) as e:
            return _error_result(f"列の選択に失敗しました: {e}")

        geo_df = geocode_with_cache(
            addr_series.tolist(), cf_series.tolist(), use_internet=bool(use_inet)
        )
        table_df = geo_df[RESULT_COLUMNS].copy()

        geometry = [
            Point(lon, lat) if (pd.notna(lat) and pd.notna(lon)) else None
            for lat, lon in zip(geo_df["lat"], geo_df["lon"])
        ]
        gdf_pts = gpd.GeoDataFrame(geo_df, geometry=geometry, crs="EPSG:4326")

    # Static figure and interactive map
    try:
        img = plot_map_png(
            gdf_pref,
            gdf_pts,
            line_width=float(line_width),
            marker_size=int(marker_size),
            legend_shrink=float(legend_shrink),
            legend_fontsize=int(legend_fontsize),
        )
    except Exception as e:
        return _error_result(f"静的描画に失敗しました: {e}", table_df)

    try:
        map_html = make_folium_html(gdf_pref, gdf_pts, marker_size=int(marker_size))
    except Exception as e:
        # The static image is still useful, so report the failure inline.
        map_html = f"<pre>folium描画に失敗しました: {html.escape(str(e))}</pre>"

    # Metadata
    info = [f"都道府県レコード数: {len(gdf_pref)}"]
    if gdf_pref.crs:
        info.append(f"PREF CRS: {gdf_pref.crs}")
    info.append(
        f"ポイント数(有効座標): {int(gdf_pts.geometry.notnull().sum())} / {len(gdf_pts)}"
    )
    if not gdf_pts.empty and gdf_pts.crs:
        info.append(f"PTS CRS: {gdf_pts.crs}")

    return img, map_html, "\n".join(info), table_df, ""


# ----------------------------
# Gradio UI
# ----------------------------
with gr.Blocks(title="Japan Shapefile + Excel Geocoding Plotter (GSI)") as demo:
    gr.Markdown("## japan_ver85.shp(ZIP) + Excel住所 → 日本地図にプロット(凡例小・点大の調整可)")

    with gr.Row():
        zip_in = gr.File(label="Shapefile (ZIP)", file_count="single", file_types=[".zip"])
        xlsx_in = gr.File(label="Excelファイル(住所付き)", file_count="single", file_types=[".xlsx", ".xls"])

    with gr.Row():
        sheet = gr.Textbox(label="シート名", value="認定設備")
        header_row = gr.Number(label="ヘッダー行番号(0始まり)", value=2, precision=0)

    with gr.Row():
        address_col = gr.Textbox(label="住所列(列名 or 0始まり列番号)", value="発電設備の所在地")
        power_col = gr.Textbox(label="数値列(任意:列名 or 0始まり列番号)", value="発電出力(kW)")

    with gr.Row():
        use_inet = gr.Checkbox(label="国土地理院APIに問い合わせ(オフでキャッシュのみ使用)", value=True)
        line_width = gr.Slider(0.2, 2.0, value=0.6, step=0.1, label="境界線の太さ")

    # Appearance sliders
    with gr.Row():
        marker_size = gr.Slider(4, 64, value=24, step=2, label="ポイントサイズ(matplotlib / folium)")
        legend_shrink = gr.Slider(0.3, 1.0, value=0.6, step=0.05, label="凡例の縮小率(小さいほど小さく)")
        legend_fontsize = gr.Slider(6, 16, value=8, step=1, label="凡例の目盛フォントサイズ")

    run_btn = gr.Button("描画")

    out_img = gr.Image(label="静的地図(matplotlib)", type="pil")
    out_html = gr.HTML(label="インタラクティブ地図(folium)")
    out_info = gr.Textbox(label="メタ情報", lines=4)
    out_table = gr.Dataframe(label="ジオコーディング結果(住所・緯度・経度・CF)", wrap=True)
    out_err = gr.Markdown(label="エラー", visible=True)

    # Kept as an alias so the historical module-level name still resolves.
    _parse = _parse_indexer

    def app_run(zipf, xls, s, h, a, p, inet, lw, ms, lsh, lfs):
        """Adapt raw widget values to ``run``'s arguments."""
        # A cleared Number field arrives as None; treat it as row 0.
        header = int(h) if h is not None else 0
        return run(
            zipf, xls, s, header, _parse_indexer(a), _parse_indexer(p),
            inet, lw, ms, lsh, lfs,
        )

    run_btn.click(
        fn=app_run,
        inputs=[zip_in, xlsx_in, sheet, header_row, address_col, power_col, use_inet,
                line_width, marker_size, legend_shrink, legend_fontsize],
        outputs=[out_img, out_html, out_info, out_table, out_err],
    )

if __name__ == "__main__":
    demo.launch()