import os import re import time import zipfile import unicodedata from urllib.parse import urljoin, urlparse, parse_qs, unquote import gradio as gr import requests import pandas as pd from bs4 import BeautifulSoup PUBLIC_URL = "https://www.fit-portal.go.jp/PublicInfo" OUTDIR = "data_fit" # -------------------- ユーティリティ -------------------- def normalize_filename(name: str) -> str: name = unicodedata.normalize("NFKC", name) name = re.sub(r'[\\/:*?"<>|]+', "_", name) name = name.strip() return name or "file" def guess_filename_from_headers(resp: requests.Response, fallback: str) -> str: cd = resp.headers.get("Content-Disposition", "") m = re.search(r'filename\*?=(?:UTF-8\'\')?"?([^";]+)"?', cd, flags=re.IGNORECASE) if m: try: fn = unquote(m.group(1)) except Exception: fn = m.group(1) return normalize_filename(fn) return normalize_filename(fallback) def is_pref_link(a_tag) -> bool: href = a_tag.get("href") or "" return "servlet.FileDownload" in href and "file=" in href def extract_pref_name(a_tag) -> str: txt = (a_tag.get_text() or "").strip() return txt or "pref" def pick_sheet_name(xls_path: str, preferred: str | None) -> str | None: try: xl = pd.ExcelFile(xls_path) if preferred and preferred in xl.sheet_names: return preferred # 一般的に「代表地番」を優先 for candidate in ["代表地番", "代表地番のみ", "代表地番シート"]: if candidate in xl.sheet_names: return candidate return xl.sheet_names[0] if xl.sheet_names else None except Exception: return None def collect_pref_links(session: requests.Session) -> list[dict]: r = session.get(PUBLIC_URL, timeout=60) r.raise_for_status() soup = BeautifulSoup(r.text, "html.parser") links = [] for a in soup.find_all("a"): if is_pref_link(a): links.append({"pref": extract_pref_name(a), "href": urljoin(PUBLIC_URL, a.get("href"))}) # 重複除去 seen, uniq = set(), [] for item in links: key = (item["pref"], item["href"]) if key not in seen: seen.add(key) uniq.append(item) return uniq def download_one(session: requests.Session, url: str, outdir: str, pref: str) -> str: os.makedirs(outdir, exist_ok=True) qs = parse_qs(urlparse(url).query) file_id = (qs.get("file", ["unknown"])[0])[:18] with session.get(url, timeout=180, stream=True) as r: r.raise_for_status() fname = guess_filename_from_headers(r, f"{pref}_{file_id}.xlsx") path = os.path.join(outdir, fname) with open(path, "wb") as f: for chunk in r.iter_content(chunk_size=1 << 15): if chunk: f.write(chunk) return path # -------------------- 列名選択: 小分類 > 中分類 > 大分類 -------------------- def _clean_cell(x) -> str: if x is None: return "" s = str(x).strip() if s.lower() == "nan": return "" return s def choose_names_from_multiindex(mi: pd.MultiIndex) -> list[str]: """ 3段ヘッダ(MultiIndex)から列名を選ぶ。 優先順: 中分類(第2段) → 小分類(第3段) → 大分類(第1段)。 すべて空なら 'col'。重複は .1, .2… を付与。 """ def _clean_cell(x) -> str: if x is None: return "" s = str(x).strip() return "" if s.lower() == "nan" else s names = [] for tpl in mi: # tpl は (大, 中, 小) を想定 a = _clean_cell(tpl[0]) if len(tpl) >= 1 else "" b = _clean_cell(tpl[1]) if len(tpl) >= 2 else "" c = _clean_cell(tpl[2]) if len(tpl) >= 3 else "" name = b or c or a or "col" # ★ 中 > 小 > 大 names.append(name) # 重複解消 seen = {} out = [] for n in names: if n not in seen: seen[n] = 0 out.append(n) else: seen[n] += 1 out.append(f"{n}.{seen[n]}") return out # -------------------- 読み込みルール -------------------- # 0行目は削除し、1/2/3行目をヘッダ(= header=[1,2,3]) HEADER_ROWS = [1, 2, 3] # 2枚目以降は 0〜3行目をスキップ(= skiprows=4)、header=None でデータのみ SKIP_ROWS_OTHERS = 4 def load_excel_first(xls_path: str, sheet_pref: str | None) -> tuple[pd.DataFrame, list[str]]: """ 1枚目: - header=[1,2,3] で3段ヘッダを読み込み(0行目は自動的に使われない) - 左端の列を削除 - MultiIndex から列名を「小>中>大」の優先で単一行に変換 戻り値: (df, chosen_names) """ sheet = pick_sheet_name(xls_path, sheet_pref) if not sheet: raise RuntimeError("シートが見つかりません") df = pd.read_excel( xls_path, sheet_name=sheet, engine="openpyxl", header=HEADER_ROWS, dtype=str ) # 左端の列を削除 df = df.iloc[:, 1:] # 前後空白トリム for c in df.select_dtypes(include=["object"]).columns: df[c] = df[c].str.strip() # 列名を選択 if isinstance(df.columns, pd.MultiIndex): chosen = choose_names_from_multiindex(df.columns) else: # 念のため単層だった場合もクリーニング&重複解消 raw = [_clean_cell(c) or "col" for c in df.columns] seen = {} chosen = [] for n in raw: if n not in seen: seen[n] = 0 chosen.append(n) else: seen[n] += 1 chosen.append(f"{n}.{seen[n]}") df.columns = chosen return df, chosen def load_excel_other(xls_path: str, sheet_pref: str | None, target_cols: list[str]) -> pd.DataFrame | None: """ 2枚目以降: - skiprows=4, header=None でデータのみ - 左端の列を削除 - 列数が合わなければ切り詰め/ダミー列追加で合わせる - 列名を 1枚目の chosen に置換 """ sheet = pick_sheet_name(xls_path, sheet_pref) if not sheet: return None df = pd.read_excel( xls_path, sheet_name=sheet, engine="openpyxl", header=None, skiprows=SKIP_ROWS_OTHERS, dtype=str ) # 左端の列を削除 df = df.iloc[:, 1:] # 前後空白トリム for c in df.select_dtypes(include=["object"]).columns: df[c] = df[c].str.strip() # 列数調整 if df.shape[1] != len(target_cols): print(f"[WARN] 列数不一致: file={os.path.basename(xls_path)} " f"read={df.shape[1]} vs target={len(target_cols)} -> 自動調整") if df.shape[1] > len(target_cols): df = df.iloc[:, :len(target_cols)] else: # 足りないときは None 列を追加 for k in range(len(target_cols) - df.shape[1]): df[f"_pad_{k}"] = None df = df.iloc[:, :len(target_cols)] df.columns = target_cols return df def zip_paths(paths: list[str], out_zip: str) -> str: with zipfile.ZipFile(out_zip, "w", compression=zipfile.ZIP_DEFLATED) as z: for p in paths: if os.path.exists(p): z.write(p, arcname=os.path.basename(p)) return out_zip # -------------------- メイン実行(Gradioから呼ぶ) -------------------- def run_job(sheet_name, sleep_sec, limit, re_download, progress=gr.Progress(track_tqdm=False)): progress(0, desc="初期化中…") session = requests.Session() session.headers.update({ "User-Agent": "Mozilla/5.0 (compatible; FITCollector/1.3; +https://huggingface.co/spaces)", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8", }) # 1) リンク収集 links = collect_pref_links(session) if not links: return ("都道府県ファイルのリンク検出に失敗しました。ページ構成の変更/一時的な制限の可能性があります。", None, None, None, None) if limit and limit > 0: links = links[:int(limit)] progress(0.1, desc=f"リンク検出 {len(links)} 件") # 2) ダウンロード downloaded = [] for i, item in enumerate(links, start=1): progress(0.1 + 0.6 * i / max(1, len(links)), desc=f"ダウンロード {i}/{len(links)}: {item['pref']}") try: existing = None if not re_download and os.path.isdir(OUTDIR): for fn in os.listdir(OUTDIR): if fn.lower().endswith(".xlsx") and item["pref"] in fn: existing = os.path.join(OUTDIR, fn) break if existing and os.path.exists(existing): path = existing else: path = download_one(session, item["href"], OUTDIR, item["pref"]) time.sleep(float(sleep_sec)) downloaded.append(path) except Exception as e: print(f"[WARN] ダウンロード失敗: {item['pref']} {e}") if not downloaded: return ("ダウンロードに失敗しました。", None, None, None, None) # 3) 読み込み(1枚目で列名確定) progress(0.75, desc="1枚目を読み込み(列名を確定)") first_path = downloaded[0] try: df0, cols0 = load_excel_first(first_path, sheet_name if sheet_name else None) except Exception as e: return (f"1枚目の読み込みに失敗しました: {os.path.basename(first_path)} / {e}", None, None, None, None) frames = [df0] # 4) 読み込み(2枚目以降) for j, p in enumerate(downloaded[1:], start=2): progress(0.75 + 0.25 * (j - 1) / max(1, len(downloaded) - 1), desc=f"{j}枚目を読み込み") df = load_excel_other(p, sheet_name if sheet_name else None, cols0) if df is not None and len(df) > 0: frames.append(df) else: print(f"[WARN] 読み込みスキップ: {os.path.basename(p)}") # 5) 縦結合 combined = pd.concat(frames, ignore_index=True) # 6) 出力 os.makedirs(OUTDIR, exist_ok=True) out_xlsx = os.path.join(OUTDIR, "combined_fit.xlsx") out_parq = os.path.join(OUTDIR, "combined_fit.parquet") with pd.ExcelWriter(out_xlsx, engine="openpyxl") as w: combined.to_excel(w, index=False, sheet_name="combined") combined.to_parquet(out_parq, index=False) # 7) ZIP(取得ファイル一式) raw_zip = os.path.join(OUTDIR, "raw_excels.zip") zip_paths(downloaded, raw_zip) # 8) プレビュー preview_csv = os.path.join(OUTDIR, "combined_head.csv") combined.head(1000).to_csv(preview_csv, index=False) progress(1.0, desc=f"完了({len(combined):,} 行)") msg = ( f"✅ 結合完了: 行数 = {len(combined):,}\n" f"・Excel: combined_fit.xlsx\n" f"・Parquet: combined_fit.parquet\n" f"・Raw ZIP: raw_excels.zip\n" f"・プレビュー: combined_head.csv\n" f"・列名は『小分類>中分類>大分類』の優先で単一行化(結合は不実施)" ) return (msg, out_xlsx, out_parq, raw_zip, preview_csv) # -------------------- Gradio UI -------------------- with gr.Blocks(title="FIT 公表(都道府県別Excel)一括取得&結合") as demo: gr.Markdown( """ # FIT 公表(都道府県別Excel)一括取得 & 結合 **列名ポリシー**: - 1枚目: 0行目を使わず、1/2/3行目をヘッダとして読み込み(3段)。 - 列名は **小分類に値があれば小分類、無ければ中分類のみ**(結合しません)。 - 2枚目以降: 0〜3行目をスキップし、データのみ読み込み。 - すべてのファイルで **左端の列は削除**。 - ファイル名/シート名などのメタ列は付与しません。 """ ) with gr.Row(): sheet = gr.Textbox(label="読み込むシート名(空欄=自動)", placeholder="例)代表地番 / 全地番") sleep = gr.Slider(0.0, 5.0, value=1.0, step=0.1, label="ダウンロード間隔(秒)") with gr.Row(): limit = gr.Number(value=None, precision=0, label="先頭N県のみ(テスト用・空欄は全県)") reget = gr.Checkbox(label="既存ファイルがあっても再ダウンロードする", value=False) run_btn = gr.Button("実行", variant="primary") out_msg = gr.Markdown() out_xlsx = gr.File(label="結合Excel(combined_fit.xlsx)") out_parq = gr.File(label="結合Parquet(combined_fit.parquet)") out_zip = gr.File(label="取得した都道府県Excel一式(zip)") out_preview = gr.File(label="先頭1000行プレビュー(CSV)") run_btn.click( fn=run_job, inputs=[sheet, sleep, limit, reget], outputs=[out_msg, out_xlsx, out_parq, out_zip, out_preview] ) if __name__ == "__main__": demo.queue(max_size=20).launch()