FIT_data / app.py
hiroki0008's picture
Update app.py
bd5a765 verified
import os
import re
import time
import zipfile
import unicodedata
from urllib.parse import urljoin, urlparse, parse_qs, unquote
import gradio as gr
import requests
import pandas as pd
from bs4 import BeautifulSoup
PUBLIC_URL = "https://www.fit-portal.go.jp/PublicInfo"
OUTDIR = "data_fit"
# -------------------- ユーティリティ --------------------
def normalize_filename(name: str) -> str:
name = unicodedata.normalize("NFKC", name)
name = re.sub(r'[\\/:*?"<>|]+', "_", name)
name = name.strip()
return name or "file"
def guess_filename_from_headers(resp: requests.Response, fallback: str) -> str:
cd = resp.headers.get("Content-Disposition", "")
m = re.search(r'filename\*?=(?:UTF-8\'\')?"?([^";]+)"?', cd, flags=re.IGNORECASE)
if m:
try:
fn = unquote(m.group(1))
except Exception:
fn = m.group(1)
return normalize_filename(fn)
return normalize_filename(fallback)
def is_pref_link(a_tag) -> bool:
href = a_tag.get("href") or ""
return "servlet.FileDownload" in href and "file=" in href
def extract_pref_name(a_tag) -> str:
txt = (a_tag.get_text() or "").strip()
return txt or "pref"
def pick_sheet_name(xls_path: str, preferred: str | None) -> str | None:
try:
xl = pd.ExcelFile(xls_path)
if preferred and preferred in xl.sheet_names:
return preferred
# 一般的に「代表地番」を優先
for candidate in ["代表地番", "代表地番のみ", "代表地番シート"]:
if candidate in xl.sheet_names:
return candidate
return xl.sheet_names[0] if xl.sheet_names else None
except Exception:
return None
def collect_pref_links(session: requests.Session) -> list[dict]:
r = session.get(PUBLIC_URL, timeout=60)
r.raise_for_status()
soup = BeautifulSoup(r.text, "html.parser")
links = []
for a in soup.find_all("a"):
if is_pref_link(a):
links.append({"pref": extract_pref_name(a), "href": urljoin(PUBLIC_URL, a.get("href"))})
# 重複除去
seen, uniq = set(), []
for item in links:
key = (item["pref"], item["href"])
if key not in seen:
seen.add(key)
uniq.append(item)
return uniq
def download_one(session: requests.Session, url: str, outdir: str, pref: str) -> str:
os.makedirs(outdir, exist_ok=True)
qs = parse_qs(urlparse(url).query)
file_id = (qs.get("file", ["unknown"])[0])[:18]
with session.get(url, timeout=180, stream=True) as r:
r.raise_for_status()
fname = guess_filename_from_headers(r, f"{pref}_{file_id}.xlsx")
path = os.path.join(outdir, fname)
with open(path, "wb") as f:
for chunk in r.iter_content(chunk_size=1 << 15):
if chunk:
f.write(chunk)
return path
# -------------------- 列名選択: 小分類 > 中分類 > 大分類 --------------------
def _clean_cell(x) -> str:
if x is None:
return ""
s = str(x).strip()
if s.lower() == "nan":
return ""
return s
def choose_names_from_multiindex(mi: pd.MultiIndex) -> list[str]:
"""
3段ヘッダ(MultiIndex)から列名を選ぶ。
優先順: 中分類(第2段) → 小分類(第3段) → 大分類(第1段)。
すべて空なら 'col'。重複は .1, .2… を付与。
"""
def _clean_cell(x) -> str:
if x is None:
return ""
s = str(x).strip()
return "" if s.lower() == "nan" else s
names = []
for tpl in mi:
# tpl は (大, 中, 小) を想定
a = _clean_cell(tpl[0]) if len(tpl) >= 1 else ""
b = _clean_cell(tpl[1]) if len(tpl) >= 2 else ""
c = _clean_cell(tpl[2]) if len(tpl) >= 3 else ""
name = b or c or a or "col" # ★ 中 > 小 > 大
names.append(name)
# 重複解消
seen = {}
out = []
for n in names:
if n not in seen:
seen[n] = 0
out.append(n)
else:
seen[n] += 1
out.append(f"{n}.{seen[n]}")
return out
# -------------------- 読み込みルール --------------------
# 0行目は削除し、1/2/3行目をヘッダ(= header=[1,2,3])
HEADER_ROWS = [1, 2, 3]
# 2枚目以降は 0〜3行目をスキップ(= skiprows=4)、header=None でデータのみ
SKIP_ROWS_OTHERS = 4
def load_excel_first(xls_path: str, sheet_pref: str | None) -> tuple[pd.DataFrame, list[str]]:
"""
1枚目:
- header=[1,2,3] で3段ヘッダを読み込み(0行目は自動的に使われない)
- 左端の列を削除
- MultiIndex から列名を「小>中>大」の優先で単一行に変換
戻り値: (df, chosen_names)
"""
sheet = pick_sheet_name(xls_path, sheet_pref)
if not sheet:
raise RuntimeError("シートが見つかりません")
df = pd.read_excel(
xls_path,
sheet_name=sheet,
engine="openpyxl",
header=HEADER_ROWS,
dtype=str
)
# 左端の列を削除
df = df.iloc[:, 1:]
# 前後空白トリム
for c in df.select_dtypes(include=["object"]).columns:
df[c] = df[c].str.strip()
# 列名を選択
if isinstance(df.columns, pd.MultiIndex):
chosen = choose_names_from_multiindex(df.columns)
else:
# 念のため単層だった場合もクリーニング&重複解消
raw = [_clean_cell(c) or "col" for c in df.columns]
seen = {}
chosen = []
for n in raw:
if n not in seen:
seen[n] = 0
chosen.append(n)
else:
seen[n] += 1
chosen.append(f"{n}.{seen[n]}")
df.columns = chosen
return df, chosen
def load_excel_other(xls_path: str, sheet_pref: str | None, target_cols: list[str]) -> pd.DataFrame | None:
"""
2枚目以降:
- skiprows=4, header=None でデータのみ
- 左端の列を削除
- 列数が合わなければ切り詰め/ダミー列追加で合わせる
- 列名を 1枚目の chosen に置換
"""
sheet = pick_sheet_name(xls_path, sheet_pref)
if not sheet:
return None
df = pd.read_excel(
xls_path,
sheet_name=sheet,
engine="openpyxl",
header=None,
skiprows=SKIP_ROWS_OTHERS,
dtype=str
)
# 左端の列を削除
df = df.iloc[:, 1:]
# 前後空白トリム
for c in df.select_dtypes(include=["object"]).columns:
df[c] = df[c].str.strip()
# 列数調整
if df.shape[1] != len(target_cols):
print(f"[WARN] 列数不一致: file={os.path.basename(xls_path)} "
f"read={df.shape[1]} vs target={len(target_cols)} -> 自動調整")
if df.shape[1] > len(target_cols):
df = df.iloc[:, :len(target_cols)]
else:
# 足りないときは None 列を追加
for k in range(len(target_cols) - df.shape[1]):
df[f"_pad_{k}"] = None
df = df.iloc[:, :len(target_cols)]
df.columns = target_cols
return df
def zip_paths(paths: list[str], out_zip: str) -> str:
with zipfile.ZipFile(out_zip, "w", compression=zipfile.ZIP_DEFLATED) as z:
for p in paths:
if os.path.exists(p):
z.write(p, arcname=os.path.basename(p))
return out_zip
# -------------------- メイン実行(Gradioから呼ぶ) --------------------
def run_job(sheet_name, sleep_sec, limit, re_download, progress=gr.Progress(track_tqdm=False)):
progress(0, desc="初期化中…")
session = requests.Session()
session.headers.update({
"User-Agent": "Mozilla/5.0 (compatible; FITCollector/1.3; +https://huggingface.co/spaces)",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
})
# 1) リンク収集
links = collect_pref_links(session)
if not links:
return ("都道府県ファイルのリンク検出に失敗しました。ページ構成の変更/一時的な制限の可能性があります。",
None, None, None, None)
if limit and limit > 0:
links = links[:int(limit)]
progress(0.1, desc=f"リンク検出 {len(links)} 件")
# 2) ダウンロード
downloaded = []
for i, item in enumerate(links, start=1):
progress(0.1 + 0.6 * i / max(1, len(links)),
desc=f"ダウンロード {i}/{len(links)}: {item['pref']}")
try:
existing = None
if not re_download and os.path.isdir(OUTDIR):
for fn in os.listdir(OUTDIR):
if fn.lower().endswith(".xlsx") and item["pref"] in fn:
existing = os.path.join(OUTDIR, fn)
break
if existing and os.path.exists(existing):
path = existing
else:
path = download_one(session, item["href"], OUTDIR, item["pref"])
time.sleep(float(sleep_sec))
downloaded.append(path)
except Exception as e:
print(f"[WARN] ダウンロード失敗: {item['pref']} {e}")
if not downloaded:
return ("ダウンロードに失敗しました。", None, None, None, None)
# 3) 読み込み(1枚目で列名確定)
progress(0.75, desc="1枚目を読み込み(列名を確定)")
first_path = downloaded[0]
try:
df0, cols0 = load_excel_first(first_path, sheet_name if sheet_name else None)
except Exception as e:
return (f"1枚目の読み込みに失敗しました: {os.path.basename(first_path)} / {e}",
None, None, None, None)
frames = [df0]
# 4) 読み込み(2枚目以降)
for j, p in enumerate(downloaded[1:], start=2):
progress(0.75 + 0.25 * (j - 1) / max(1, len(downloaded) - 1),
desc=f"{j}枚目を読み込み")
df = load_excel_other(p, sheet_name if sheet_name else None, cols0)
if df is not None and len(df) > 0:
frames.append(df)
else:
print(f"[WARN] 読み込みスキップ: {os.path.basename(p)}")
# 5) 縦結合
combined = pd.concat(frames, ignore_index=True)
# 6) 出力
os.makedirs(OUTDIR, exist_ok=True)
out_xlsx = os.path.join(OUTDIR, "combined_fit.xlsx")
out_parq = os.path.join(OUTDIR, "combined_fit.parquet")
with pd.ExcelWriter(out_xlsx, engine="openpyxl") as w:
combined.to_excel(w, index=False, sheet_name="combined")
combined.to_parquet(out_parq, index=False)
# 7) ZIP(取得ファイル一式)
raw_zip = os.path.join(OUTDIR, "raw_excels.zip")
zip_paths(downloaded, raw_zip)
# 8) プレビュー
preview_csv = os.path.join(OUTDIR, "combined_head.csv")
combined.head(1000).to_csv(preview_csv, index=False)
progress(1.0, desc=f"完了({len(combined):,} 行)")
msg = (
f"✅ 結合完了: 行数 = {len(combined):,}\n"
f"・Excel: combined_fit.xlsx\n"
f"・Parquet: combined_fit.parquet\n"
f"・Raw ZIP: raw_excels.zip\n"
f"・プレビュー: combined_head.csv\n"
f"・列名は『小分類>中分類>大分類』の優先で単一行化(結合は不実施)"
)
return (msg, out_xlsx, out_parq, raw_zip, preview_csv)
# -------------------- Gradio UI --------------------
with gr.Blocks(title="FIT 公表(都道府県別Excel)一括取得&結合") as demo:
gr.Markdown(
"""
# FIT 公表(都道府県別Excel)一括取得 & 結合
**列名ポリシー**:
- 1枚目: 0行目を使わず、1/2/3行目をヘッダとして読み込み(3段)。
- 列名は **小分類に値があれば小分類、無ければ中分類のみ**(結合しません)。
- 2枚目以降: 0〜3行目をスキップし、データのみ読み込み。
- すべてのファイルで **左端の列は削除**。
- ファイル名/シート名などのメタ列は付与しません。
"""
)
with gr.Row():
sheet = gr.Textbox(label="読み込むシート名(空欄=自動)", placeholder="例)代表地番 / 全地番")
sleep = gr.Slider(0.0, 5.0, value=1.0, step=0.1, label="ダウンロード間隔(秒)")
with gr.Row():
limit = gr.Number(value=None, precision=0, label="先頭N県のみ(テスト用・空欄は全県)")
reget = gr.Checkbox(label="既存ファイルがあっても再ダウンロードする", value=False)
run_btn = gr.Button("実行", variant="primary")
out_msg = gr.Markdown()
out_xlsx = gr.File(label="結合Excel(combined_fit.xlsx)")
out_parq = gr.File(label="結合Parquet(combined_fit.parquet)")
out_zip = gr.File(label="取得した都道府県Excel一式(zip)")
out_preview = gr.File(label="先頭1000行プレビュー(CSV)")
run_btn.click(
fn=run_job,
inputs=[sheet, sleep, limit, reget],
outputs=[out_msg, out_xlsx, out_parq, out_zip, out_preview]
)
if __name__ == "__main__":
demo.queue(max_size=20).launch()