Spaces:
Sleeping
Sleeping
File size: 12,943 Bytes
a5f11f1 9f2e9b8 a5f11f1 1f9c86b ceee7a4 6001575 a5f11f1 ceee7a4 a5f11f1 ceee7a4 7d61895 1f9c86b ceee7a4 1f9c86b ceee7a4 5db2c30 ceee7a4 7d61895 ceee7a4 5db2c30 a5f11f1 7d61895 a5f11f1 7d61895 1f9c86b 7d61895 1f9c86b 55c0b39 1f9c86b 55c0b39 1f9c86b 55c0b39 1f9c86b 55c0b39 1f9c86b 55c0b39 7d61895 55c0b39 7d61895 55c0b39 7d61895 1f9c86b 7d61895 1f9c86b 55c0b39 7d61895 55c0b39 7d61895 55c0b39 7d61895 55c0b39 1f9c86b 55c0b39 7d61895 55c0b39 7d61895 55c0b39 7d61895 55c0b39 7d61895 55c0b39 7d61895 55c0b39 7d61895 55c0b39 7d61895 55c0b39 7d61895 1f9c86b 7d61895 55c0b39 1f9c86b 55c0b39 1f9c86b 7d61895 55c0b39 7d61895 55c0b39 7d61895 55c0b39 7d61895 55c0b39 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 |
# app.py
import os
import io
import time
import requests
import pandas as pd
import numpy as np
import geopandas as gpd
import matplotlib.pyplot as plt
from shapely.geometry import Point
import folium
import gradio as gr
from PIL import Image
# ----------------------------
# 設定
# ----------------------------
GSI_USER_AGENT = os.environ.get(
"GSI_USER_AGENT",
"jp-gsi-geocoding-demo (contact: your_email@example.com)" # 連絡先付き推奨
)
GSI_TIMEOUT_SEC = float(os.environ.get("GSI_TIMEOUT_SEC", "10"))
GEOCODE_DELAY_SEC = float(os.environ.get("GSI_RATE_LIMIT_SEC", "0.5")) # マナーとして少し待機
GSI_GEOCODE_URL = "https://msearch.gsi.go.jp/address-search/AddressSearch"
CACHE_DIR = "data/cache"
os.makedirs(CACHE_DIR, exist_ok=True)
CACHE_PATH = os.path.join(CACHE_DIR, "geocode_cache.csv")
DEFAULT_ZIP = "data/japan_ver85.zip"
# ----------------------------
# キャッシュ
# ----------------------------
def load_cache():
if os.path.exists(CACHE_PATH):
try:
df = pd.read_csv(CACHE_PATH)
if set(["address_input", "lat", "lon", "CF"]).issubset(df.columns):
return df
except Exception:
pass
return pd.DataFrame(columns=["address_input", "lat", "lon", "CF"])
def save_cache(df_cache):
try:
df_cache.to_csv(CACHE_PATH, index=False)
except Exception:
pass
# ----------------------------
# Shapefile 読み込み
# ----------------------------
def load_gdf_from_zip(zip_path: str) -> gpd.GeoDataFrame:
gdf = gpd.read_file(f"zip://{zip_path}") # , engine="pyogrio"
try:
if gdf.crs:
gdf = gdf.to_crs("EPSG:4326")
except Exception:
pass
return gdf
# ----------------------------
# 国土地理院 ジオコーダ
# ----------------------------
def make_gsi_session() -> requests.Session:
s = requests.Session()
s.headers.update({"User-Agent": GSI_USER_AGENT})
return s
def gsi_geocode_once(address: str, session: requests.Session) -> tuple[float, float]:
"""
国土地理院 住所検索APIを1回呼び出し、(lat, lon) を返す。失敗時は (nan, nan)。
返却座標は [lon, lat] なので順を入れ替えて返す。
"""
try:
# 空やnan文字列はスキップ
if not address or address.strip() == "" or address.strip().lower() in ("nan", "none"):
return (np.nan, np.nan)
resp = session.get(GSI_GEOCODE_URL, params={"q": address}, timeout=GSI_TIMEOUT_SEC)
if not resp.ok:
return (np.nan, np.nan)
data = resp.json()
# 返り値は配列(候補リスト)。最上位候補を採用
if isinstance(data, list) and len(data) > 0:
feat = data[0]
coords = (feat.get("geometry") or {}).get("coordinates") or []
if isinstance(coords, (list, tuple)) and len(coords) >= 2:
lon, lat = coords[0], coords[1]
# 数値化チェック
lat = float(lat)
lon = float(lon)
return (lat, lon)
except Exception:
pass
return (np.nan, np.nan)
def geocode_with_cache(addresses, CFs, use_internet=True):
cache = load_cache()
cache_map = {row["address_input"]: (row["lat"], row["lon"], row["CF"]) for _, row in cache.iterrows()}
results = []
session = make_gsi_session() if use_internet else None
for a, cf in zip(addresses, CFs):
a = "" if (a is None or (isinstance(a, float) and np.isnan(a))) else str(a).strip()
cf = "" if (cf is None or (isinstance(cf, float) and np.isnan(cf))) else str(cf)
# cache hit
if a in cache_map:
lat, lon, _cached_cf = cache_map[a]
if pd.notna(lat) and pd.notna(lon):
results.append({"address_input": a, "CF": cf, "lat": lat, "lon": lon})
continue
if not use_internet:
results.append({"address_input": a, "CF": cf, "lat": np.nan, "lon": np.nan})
continue
lat, lon = gsi_geocode_once(a, session)
# マナーとして小休止
time.sleep(GEOCODE_DELAY_SEC)
# キャッシュ更新
cache = cache[cache["address_input"] != a]
cache = pd.concat(
[cache, pd.DataFrame([{"address_input": a, "lat": lat, "lon": lon, "CF": cf}])],
ignore_index=True
)
save_cache(cache)
results.append({"address_input": a, "CF": cf, "lat": lat, "lon": lon})
return pd.DataFrame(results)
# ----------------------------
# 可視化(matplotlib)
# ----------------------------
def plot_map_png(
gdf_pref: gpd.GeoDataFrame,
gdf_pts: gpd.GeoDataFrame,
line_width: float = 0.6,
marker_size: int = 24,
legend_shrink: float = 0.6,
legend_fontsize: int = 8,
figsize=(7, 7),
) -> Image.Image:
fig, ax = plt.subplots(figsize=figsize)
gdf_pref.boundary.plot(ax=ax, linewidth=line_width, color="black")
gdf_pts_valid = gdf_pts[gdf_pts.geometry.notnull()]
if not gdf_pts_valid.empty:
cf_num = pd.to_numeric(
gdf_pts_valid.get("CF", pd.Series([np.nan]*len(gdf_pts_valid))),
errors="coerce"
)
gdf_pts_valid.assign(CF_num=cf_num).plot(
ax=ax,
column="CF_num",
cmap="OrRd",
markersize=max(2, int(marker_size)),
alpha=0.85,
legend=True,
legend_kwds={"shrink": legend_shrink},
)
try:
for _ax in fig.axes:
if _ax is not ax:
_ax.tick_params(labelsize=legend_fontsize)
except Exception:
pass
ax.set_axis_off()
plt.tight_layout()
buf = io.BytesIO()
fig.savefig(buf, format="png", dpi=200)
plt.close(fig)
buf.seek(0)
return Image.open(buf)
# ----------------------------
# 可視化(folium)
# ----------------------------
def make_folium_html(gdf_pref: gpd.GeoDataFrame, gdf_pts: gpd.GeoDataFrame, marker_size: int = 24):
gdf_pts_valid = gdf_pts[gdf_pts.geometry.notnull()]
if not gdf_pts_valid.empty:
center_lat = gdf_pts_valid.geometry.y.median()
center_lon = gdf_pts_valid.geometry.x.median()
zoom = 6
else:
center_lat, center_lon, zoom = 35.6812, 139.7671, 5
m = folium.Map(location=[center_lat, center_lon], zoom_start=zoom)
try:
folium.GeoJson(gdf_pref.to_json(), name="prefecture").add_to(m)
except Exception:
pass
circle_radius = max(3, int(marker_size // 3))
for _, r in gdf_pts_valid.iterrows():
lat, lon = r.geometry.y, r.geometry.x
popup = f"{r.get('address_input','(no addr)')}<br>CF:{r.get('CF','')}"
folium.CircleMarker(
location=(float(lat), float(lon)),
radius=circle_radius,
fill=True,
fill_opacity=0.9,
popup=popup,
).add_to(m)
return m._repr_html_()
# ----------------------------
# 実行パイプライン
# ----------------------------
def _parse_indexer(x):
try:
return int(x)
except Exception:
return x
def run(zip_file, excel_file, sheet_name, header_row, address_col, power_col,
use_inet, line_width, marker_size, legend_shrink, legend_fontsize):
# Shapefile
if zip_file is not None and hasattr(zip_file, "name") and os.path.exists(zip_file.name):
zip_path = zip_file.name
elif os.path.exists(DEFAULT_ZIP):
zip_path = DEFAULT_ZIP
else:
empty_df = pd.DataFrame(columns=["address_input", "CF", "lat", "lon"])
return None, None, "", empty_df, "Shapefile の ZIP をアップロードするか、data/japan_ver85.zip を配置してください。"
try:
gdf_pref = load_gdf_from_zip(zip_path)
except Exception as e:
empty_df = pd.DataFrame(columns=["address_input", "CF", "lat", "lon"])
return None, None, "", empty_df, f"行政界の読み込みに失敗しました: {e}"
# Excel→ジオコーディング
if excel_file is None or not hasattr(excel_file, "name"):
gdf_pts = gpd.GeoDataFrame(columns=["address_input", "CF", "lat", "lon"], geometry=[], crs="EPSG:4326")
table_df = pd.DataFrame(columns=["address_input", "CF", "lat", "lon"])
else:
try:
df = pd.read_excel(excel_file.name, sheet_name=sheet_name, header=int(header_row))
except Exception as e:
empty_df = pd.DataFrame(columns=["address_input", "CF", "lat", "lon"])
return None, None, "", empty_df, f"Excel の読み込みに失敗しました: {e}"
addr_series = df.iloc[:, address_col] if isinstance(address_col, int) else df[address_col]
cf_series = df.iloc[:, power_col] if isinstance(power_col, int) else df[power_col]
addresses = addr_series.astype(str).tolist()
cfs = cf_series.tolist()
geo_df = geocode_with_cache(addresses, cfs, use_internet=bool(use_inet))
table_df = geo_df[["address_input", "CF", "lat", "lon"]].copy()
geometry = [
Point(lon, lat) if (pd.notna(lat) and pd.notna(lon)) else None
for lat, lon in zip(geo_df["lat"], geo_df["lon"])
]
gdf_pts = gpd.GeoDataFrame(geo_df, geometry=geometry, crs="EPSG:4326")
# 図と地図
try:
img = plot_map_png(
gdf_pref, gdf_pts,
line_width=float(line_width),
marker_size=int(marker_size),
legend_shrink=float(legend_shrink),
legend_fontsize=int(legend_fontsize),
)
except Exception as e:
return None, None, "", table_df, f"静的描画に失敗しました: {e}"
try:
html = make_folium_html(gdf_pref, gdf_pts, marker_size=int(marker_size))
except Exception as e:
html = f"<p>folium描画に失敗しました: {e}</p>"
# 情報
info = []
info.append(f"都道府県レコード数: {len(gdf_pref)}")
if gdf_pref.crs:
info.append(f"PREF CRS: {gdf_pref.crs}")
info.append(f"ポイント数(有効座標): {int(gdf_pts.geometry.notnull().sum())} / {len(gdf_pts)}")
if not gdf_pts.empty and gdf_pts.crs:
info.append(f"PTS CRS: {gdf_pts.crs}")
return img, html, "\n".join(info), table_df, ""
# ----------------------------
# Gradio UI
# ----------------------------
with gr.Blocks(title="Japan Shapefile + Excel Geocoding Plotter (GSI)") as demo:
gr.Markdown("## japan_ver85.shp(ZIP) + Excel住所 → 日本地図にプロット(凡例小・点大の調整可)")
with gr.Row():
zip_in = gr.File(label="Shapefile (ZIP)", file_count="single", file_types=[".zip"])
xlsx_in = gr.File(label="Excelファイル(住所付き)", file_count="single", file_types=[".xlsx", ".xls"])
with gr.Row():
sheet = gr.Textbox(label="シート名", value="認定設備")
header_row = gr.Number(label="ヘッダー行番号(0始まり)", value=2, precision=0)
with gr.Row():
address_col = gr.Textbox(label="住所列(列名 or 0始まり列番号)", value="発電設備の所在地")
power_col = gr.Textbox(label="数値列(任意:列名 or 0始まり列番号)", value="発電出力(kW)")
with gr.Row():
use_inet = gr.Checkbox(label="国土地理院APIに問い合わせ(オフでキャッシュのみ使用)", value=True)
line_width = gr.Slider(0.2, 2.0, value=0.6, step=0.1, label="境界線の太さ")
# 見た目調整スライダ
with gr.Row():
marker_size = gr.Slider(4, 64, value=24, step=2, label="ポイントサイズ(matplotlib / folium)")
legend_shrink = gr.Slider(0.3, 1.0, value=0.6, step=0.05, label="凡例の縮小率(小さいほど小さく)")
legend_fontsize = gr.Slider(6, 16, value=8, step=1, label="凡例の目盛フォントサイズ")
run_btn = gr.Button("描画")
out_img = gr.Image(label="静的地図(matplotlib)", type="pil")
out_html = gr.HTML(label="インタラクティブ地図(folium)")
out_info = gr.Textbox(label="メタ情報", lines=4)
out_table = gr.Dataframe(label="ジオコーディング結果(住所・緯度・経度・CF)", wrap=True)
out_err = gr.Markdown(label="エラー", visible=True)
def _parse(x):
try:
return int(x)
except Exception:
return x
def app_run(zipf, xls, s, h, a, p, inet, lw, ms, lsh, lfs):
return run(
zipf, xls, s, int(h), _parse(a), _parse(p),
inet, lw, ms, lsh, lfs
)
run_btn.click(
fn=app_run,
inputs=[zip_in, xlsx_in, sheet, header_row, address_col, power_col, use_inet, line_width, marker_size, legend_shrink, legend_fontsize],
outputs=[out_img, out_html, out_info, out_table, out_err],
)
if __name__ == "__main__":
demo.launch()
|