test / app.py
hiroki0008's picture
Update app.py
0b5b913 verified
# app.py (Folium + 無料タイル / File配信 / 発電設備区分で色分け / CF表示名を「発電出力(kW)」に統一)
# pip install folium gradio pandas numpy requests openpyxl
import os
import re
import time
import tempfile
import requests
import pandas as pd
import numpy as np
import gradio as gr
# ----------------------------
# 設定
# ----------------------------
GSI_USER_AGENT = os.environ.get(
"GSI_USER_AGENT",
"jp-gsi-geocoding-demo (contact: your_email@example.com)" # 連絡先付き推奨
)
GSI_TIMEOUT_SEC = float(os.environ.get("GSI_TIMEOUT_SEC", "10"))
GEOCODE_DELAY_SEC = float(os.environ.get("GSI_RATE_LIMIT_SEC", "0.0"))
GSI_GEOCODE_URL = "https://msearch.gsi.go.jp/address-search/AddressSearch"
CACHE_DIR = "data/cache"
os.makedirs(CACHE_DIR, exist_ok=True)
CACHE_PATH = os.path.join(CACHE_DIR, "geocode_cache.csv")
# ----------------------------
# キャッシュ
# ----------------------------
def load_cache():
if os.path.exists(CACHE_PATH):
try:
df = pd.read_csv(CACHE_PATH)
need = {"address_input", "lat", "lon", "CF"}
if need.issubset(df.columns):
df["CF"] = pd.to_numeric(df["CF"], errors="coerce")
df["lat"] = pd.to_numeric(df["lat"], errors="coerce")
df["lon"] = pd.to_numeric(df["lon"], errors="coerce")
return df
except Exception:
pass
return pd.DataFrame(columns=["address_input", "lat", "lon", "CF"])
def save_cache(df_cache):
try:
df_cache.to_csv(CACHE_PATH, index=False)
except Exception:
pass
# ----------------------------
# 国土地理院 ジオコーダ
# ----------------------------
def make_gsi_session() -> requests.Session:
s = requests.Session()
s.headers.update({"User-Agent": GSI_USER_AGENT})
return s
def gsi_geocode_once(address: str, session: requests.Session) -> tuple[float, float]:
"""
国土地理院 住所検索APIを1回呼び出し、(lat, lon) を返す(失敗時は (nan, nan))。
APIは [lon, lat] を返すため、順を入れ替える。
"""
try:
if not address or str(address).strip() == "" or str(address).strip().lower() in ("nan", "none"):
return (np.nan, np.nan)
resp = session.get(GSI_GEOCODE_URL, params={"q": address}, timeout=GSI_TIMEOUT_SEC)
if not resp.ok:
return (np.nan, np.nan)
data = resp.json()
if isinstance(data, list) and len(data) > 0:
feat = data[0]
coords = (feat.get("geometry") or {}).get("coordinates") or []
if isinstance(coords, (list, tuple)) and len(coords) >= 2:
lon, lat = float(coords[0]), float(coords[1])
return (lat, lon)
except Exception:
pass
return (np.nan, np.nan)
def geocode_with_cache(addresses, CFs, use_internet=True):
cache = load_cache()
cache_map = {row["address_input"]: (row["lat"], row["lon"], row["CF"]) for _, row in cache.iterrows()}
results = []
session = make_gsi_session() if use_internet else None
for a, cf in zip(addresses, CFs):
a = "" if (a is None or (isinstance(a, float) and np.isnan(a))) else str(a).strip()
cf_num = pd.to_numeric(cf, errors="coerce")
# cache hit
if a in cache_map:
lat, lon, _cached_cf = cache_map[a]
if pd.notna(lat) and pd.notna(lon):
results.append({"address_input": a, "CF": cf_num, "lat": float(lat), "lon": float(lon)})
continue
if not use_internet:
results.append({"address_input": a, "CF": cf_num, "lat": np.nan, "lon": np.nan})
continue
lat, lon = gsi_geocode_once(a, session)
if GEOCODE_DELAY_SEC > 0:
time.sleep(GEOCODE_DELAY_SEC)
# キャッシュ更新
cache = cache[cache["address_input"] != a]
cache = pd.concat(
[cache, pd.DataFrame([{"address_input": a, "lat": lat, "lon": lon, "CF": cf_num}])],
ignore_index=True
)
save_cache(cache)
results.append({"address_input": a, "CF": cf_num, "lat": lat, "lon": lon})
df = pd.DataFrame(results)
df["lat"] = pd.to_numeric(df["lat"], errors="coerce")
df["lon"] = pd.to_numeric(df["lon"], errors="coerce")
df["CF"] = pd.to_numeric(df["CF"], errors="coerce")
return df
# ----------------------------
# Folium 地図生成(無料タイル + 区分色分け)
# ----------------------------
import folium
from branca.element import Element
TILE_CATALOG = {
"GSI 標準地図": "https://cyberjapandata.gsi.go.jp/xyz/std/{z}/{x}/{y}.png",
"GSI 淡色地図": "https://cyberjapandata.gsi.go.jp/xyz/pale/{z}/{x}/{y}.png",
"GSI 写真(シームレス)": "https://cyberjapandata.gsi.go.jp/xyz/seamlessphoto/{z}/{x}/{y}.jpg",
"OpenStreetMap": "https://tile.openstreetmap.org/{z}/{x}/{y}.png",
}
# 固定パレット(Tableau 10 + α)
_BASE_PALETTE = [
"#1f77b4","#ff7f0e","#2ca02c","#d62728","#9467bd",
"#8c564b","#e377c2","#7f7f7f","#bcbd22","#17becf",
"#393b79","#637939","#8c6d31","#843c39","#7b4173",
]
def _build_category_palette(labels: pd.Series) -> dict:
labels = pd.Series(labels).astype(str).fillna("その他/不明").replace({"nan":"その他/不明","None":"その他/不明"})
uniq = list(pd.unique(labels))
mapping = {}
for i, lab in enumerate(uniq):
if i < len(_BASE_PALETTE):
mapping[lab] = _BASE_PALETTE[i]
else:
h = (i * 37) % 360
mapping[lab] = f"hsl({h},70%,45%)"
return mapping
def _build_legend_html(title: str, mapping: dict) -> str:
items = "".join(
f"<div style='display:flex;align-items:center;margin-bottom:4px;'>"
f"<span style='display:inline-block;width:12px;height:12px;background:{col};"
f"border:1px solid #555;margin-right:6px;'></span>{lab}</div>"
for lab, col in mapping.items()
)
return (
"<div style='position:fixed;bottom:20px;left:20px;z-index:9999;"
"background:#fff;padding:10px 12px;border:1px solid #ccc;border-radius:6px;"
"box-shadow:0 2px 8px rgba(0,0,0,0.15);font-size:12px;'>"
f"<div style='font-weight:600;margin-bottom:6px;'>凡例:{title}</div>"
f"{items}</div>"
)
def _build_folium_map_html(df_points: pd.DataFrame, base_name: str, cat_col: str) -> str:
df_valid = df_points.dropna(subset=["lat", "lon"]).copy()
# 地図中心
if df_valid.empty:
center_lat, center_lon, zoom = 35.0, 135.0, 4
else:
center_lat = float(df_valid["lat"].median())
center_lon = float(df_valid["lon"].median())
zoom = 6
# ベースマップ
m = folium.Map(location=[center_lat, center_lon], zoom_start=zoom, control_scale=True, tiles=None)
for name, url in TILE_CATALOG.items():
folium.TileLayer(tiles=url, name=name, attr=f"© {name}", overlay=False, control=True, max_zoom=20).add_to(m)
# カテゴリ列
if cat_col and (cat_col in df_points.columns):
cats = df_points[cat_col].astype("string").fillna("その他/不明")
else:
cat_col = "(区分なし)"
cats = pd.Series(["その他/不明"] * len(df_points))
palette = _build_category_palette(cats)
# マーカーのサイズ(内部的には CF を使用)
if "CF" in df_valid.columns and df_valid["CF"].notna().any():
cf = df_valid["CF"].clip(lower=0)
cf_norm = (cf - cf.min()) / (cf.max() - cf.min() + 1e-9)
sizes = (cf_norm * 12 + 3).fillna(6).tolist()
else:
sizes = [6] * len(df_valid)
# マーカー描画(色 = 区分)
for (idx, row), r in zip(df_valid.iterrows(), sizes):
lat, lon = float(row["lat"]), float(row["lon"])
addr = str(row.get("address_input", ""))
cfv = row.get("CF", np.nan)
catv = str(df_points.loc[idx, cat_col]) if cat_col in df_points.columns else "その他/不明"
col = palette.get(catv, "#666666")
popup_html = (
f"<b>住所:</b> {addr}<br>"
f"<b>発電出力(kW):</b> {'' if pd.isna(cfv) else cfv}<br>"
f"<b>{cat_col}:</b> {catv}"
)
folium.CircleMarker(
location=(lat, lon),
radius=float(r),
weight=1,
color=col,
fill=True,
fill_opacity=0.85,
fill_color=col,
popup=folium.Popup(popup_html, max_width=320),
).add_to(m)
# 凡例
legend_html = _build_legend_html(cat_col, palette)
m.get_root().html.add_child(Element(legend_html))
folium.LayerControl(position="topright").add_to(m)
return m.get_root().render()
def _rewrite_leaflet_cdn(html_text: str, host: str) -> str:
"""
Folium が出力する Leaflet の CDN(通常 jsDelivr)を必要に応じて置換。
SRI不整合を避けるため integrity/crossorigin を除去。
"""
html_text = re.sub(r'\sintegrity="[^"]+"', "", html_text)
html_text = re.sub(r'\scrossorigin="[^"]+"', "", html_text)
if host == "jsdelivr":
return html_text
elif host == "cdnjs":
html_text = html_text.replace(
"https://cdn.jsdelivr.net/npm/leaflet@", "https://cdnjs.cloudflare.com/ajax/libs/leaflet/"
)
html_text = html_text.replace("/dist/leaflet.css", "/leaflet.css")
html_text = html_text.replace("/dist/leaflet.js", "/leaflet.js")
return html_text
elif host == "unpkg":
html_text = html_text.replace(
"https://cdn.jsdelivr.net/npm/", "https://unpkg.com/"
)
return html_text
else:
return html_text
def _save_map_html_file(html_text: str) -> str:
"""地図HTMLを実ファイルに保存(Gradio Fileに渡すパスを返す)"""
fd, path = tempfile.mkstemp(suffix=".html")
os.close(fd)
with open(path, "w", encoding="utf-8") as f:
f.write(html_text)
return path
# ----------------------------
# 実行パイプライン
# ----------------------------
def _parse_indexer(x):
try:
return int(x)
except Exception:
return x
def run(excel_file, sheet_name, header_row, address_col, power_col, category_col, use_inet, base_name, leaflet_cdn):
# Excel 読み込み
if excel_file is None or not hasattr(excel_file, "name"):
table_df = pd.DataFrame(columns=["address_input", "発電出力(kW)", "lat", "lon", category_col or "発電設備区分"])
return ("Excelファイルを指定してください。", table_df, "", None)
try:
df = pd.read_excel(excel_file.name, sheet_name=sheet_name, header=int(header_row))
except Exception as e:
empty_df = pd.DataFrame(columns=["address_input", "発電出力(kW)", "lat", "lon", category_col or "発電設備区分"])
return (f"Excel の読み込みに失敗しました: {e}", empty_df, "", None)
# 列参照(番号/名前の両対応)
addr_series = df.iloc[:, address_col] if isinstance(address_col, int) else df[address_col]
cf_series = df.iloc[:, power_col] if isinstance(power_col, int) else df[power_col]
# 区分列は任意
if category_col:
try:
cat_series = df.iloc[:, category_col] if isinstance(category_col, int) else df[category_col]
except Exception:
cat_series = pd.Series([np.nan] * len(df))
category_col = "発電設備区分"
else:
cat_series = pd.Series([np.nan] * len(df))
category_col = "発電設備区分"
addresses = addr_series.astype(str).tolist()
cfs = cf_series.tolist()
# ジオコーディング(内部列名は CF のまま)
geo_df = geocode_with_cache(addresses, cfs, use_internet=bool(use_inet))
# 表示用に列名を日本語化(地図生成には内部の CF を使用するので別DataFrame)
table_df = geo_df.copy()
table_df[category_col] = cat_series.values
display_df = table_df.rename(columns={"CF": "発電出力(kW)"})
# 地図HTML生成 → CDN書換 → 実ファイル保存 → File出力
try:
html_text = _build_folium_map_html(table_df, base_name=base_name, cat_col=category_col) # ← CF列は内部名のまま使用
html_text = _rewrite_leaflet_cdn(html_text, host=leaflet_cdn)
map_file_path = _save_map_html_file(html_text)
msg = (
"✅ 地図HTMLを生成しました。下の **地図HTMLファイル** をクリックして新規タブで開いてください。\n"
"色=「発電設備区分」、サイズ=「発電出力(kW)」です。"
)
info = f"ポイント数(有効座標): {int(display_df[['lat','lon']].dropna().shape[0])} / {len(display_df)}"
# 表示テーブルの列順
disp_cols = ["address_input", "発電出力(kW)", "lat", "lon", category_col]
return (msg, display_df[disp_cols], info, map_file_path)
except Exception as e:
disp_cols = ["address_input", "発電出力(kW)", "lat", "lon", category_col]
return (f"地図描画に失敗しました: {e}", display_df[disp_cols], "", None)
# ----------------------------
# Gradio UI
# ----------------------------
with gr.Blocks(title="Excel住所 → Folium(無料タイル・File配信・区分色分け)") as demo:
gr.Markdown(
"## Excelの住所を国土地理院APIでジオコーディング → Folium(Leaflet)で地図表示(無料タイル・Mapbox不要)\n"
"- 地図は **実ファイル(.html)** として配信します(CSPが厳しい環境でもOK)。\n"
"- **色=発電設備区分、サイズ=発電出力(kW)**。タイル=地理院/OSM、CDNは必要に応じて切替できます。"
)
with gr.Row():
xlsx_in = gr.File(label="Excelファイル(住所付き)", file_count="single", file_types=[".xlsx", ".xls"])
with gr.Row():
sheet = gr.Textbox(label="シート名", value="認定設備")
header_row = gr.Number(label="ヘッダー行番号(0始まり)", value=2, precision=0)
with gr.Row():
address_col = gr.Textbox(label="住所列(列名 or 0始まり列番号)", value="発電設備の所在地")
power_col = gr.Textbox(label="発電出力(kW)の列(列名 or 0始まり列番号)", value="発電出力(kW)")
category_col = gr.Textbox(label="区分列(色分け:列名 or 0始まり列番号)", value="発電設備区分")
with gr.Row():
use_inet = gr.Checkbox(label="国土地理院APIに問い合わせ(オフでキャッシュのみ使用)", value=True)
base_name = gr.Dropdown(choices=list(TILE_CATALOG.keys()), value="GSI 標準地図", label="ベースマップ")
leaflet_cdn = gr.Dropdown(
choices=["jsdelivr", "cdnjs", "unpkg"], value="jsdelivr",
label="Leaflet CDN(遮断時に切替)"
)
run_btn = gr.Button("描画")
out_html = gr.HTML(label="案内メッセージ")
out_table = gr.Dataframe(label="ジオコーディング結果(住所・発電出力(kW)・緯度・経度・区分)", wrap=True)
out_info = gr.Textbox(label="メタ情報", lines=2)
out_file = gr.File(label="地図HTMLファイル(クリックで開く/ダウンロード)")
def _parse(x):
try:
return int(x)
except Exception:
return x
def app_run(xls, s, h, a, p, c, inet, base, cdn):
return run(
xls, s, int(h), _parse(a), _parse(p), _parse(c), inet, base, cdn
)
run_btn.click(
fn=app_run,
inputs=[xlsx_in, sheet, header_row, address_col, power_col, category_col, use_inet, base_name, leaflet_cdn],
outputs=[out_html, out_table, out_info, out_file],
)
if __name__ == "__main__":
demo.launch()