import os
import re
import time
import zipfile
import unicodedata
from urllib.parse import urljoin, urlparse, parse_qs, unquote

import gradio as gr
import requests
import pandas as pd
from bs4 import BeautifulSoup

PUBLIC_URL = "https://www.fit-portal.go.jp/PublicInfo"
OUTDIR = "data_fit"
# -------------------- Utilities --------------------

def normalize_filename(name: str) -> str:
    name = unicodedata.normalize("NFKC", name)
    name = re.sub(r'[\\/:*?"<>|]+', "_", name)
    name = name.strip()
    return name or "file"
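# Illustrative example: normalize_filename('A/B:C*.xlsx') -> 'A_B_C_.xlsx';
# NFKC also folds full-width forms, e.g. 'ABC.xlsx' -> 'ABC.xlsx'.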

def guess_filename_from_headers(resp: requests.Response, fallback: str) -> str:
    cd = resp.headers.get("Content-Disposition", "")
    m = re.search(r'filename\*?=(?:UTF-8\'\')?"?([^";]+)"?', cd, flags=re.IGNORECASE)
    if m:
        try:
            fn = unquote(m.group(1))
        except Exception:
            fn = m.group(1)
        return normalize_filename(fn)
    return normalize_filename(fallback)
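# Illustrative example: a header such as
#   Content-Disposition: attachment; filename*=UTF-8''%E5%8C%97%E6%B5%B7%E9%81%93.xlsx
# yields '北海道.xlsx'; when no usable header is present, the fallback name is used.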

def is_pref_link(a_tag) -> bool:
    href = a_tag.get("href") or ""
    return "servlet.FileDownload" in href and "file=" in href


def extract_pref_name(a_tag) -> str:
    txt = (a_tag.get_text() or "").strip()
    return txt or "pref"

def pick_sheet_name(xls_path: str, preferred: str | None) -> str | None:
    try:
        xl = pd.ExcelFile(xls_path)
        if preferred and preferred in xl.sheet_names:
            return preferred
        # Prefer the commonly used 「代表地番」 (representative parcel number) sheet
        for candidate in ["代表地番", "代表地番のみ", "代表地番シート"]:
            if candidate in xl.sheet_names:
                return candidate
        return xl.sheet_names[0] if xl.sheet_names else None
    except Exception:
        return None

def collect_pref_links(session: requests.Session) -> list[dict]:
    r = session.get(PUBLIC_URL, timeout=60)
    r.raise_for_status()
    soup = BeautifulSoup(r.text, "html.parser")
    links = []
    for a in soup.find_all("a"):
        if is_pref_link(a):
            links.append({"pref": extract_pref_name(a), "href": urljoin(PUBLIC_URL, a.get("href"))})
    # Deduplicate while preserving order
    seen, uniq = set(), []
    for item in links:
        key = (item["pref"], item["href"])
        if key not in seen:
            seen.add(key)
            uniq.append(item)
    return uniq
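# Each entry has the shape {"pref": <link text>, "href": <absolute URL>}, e.g.
#   {"pref": "北海道", "href": "https://www.fit-portal.go.jp/servlet.FileDownload?file=..."}
# (the exact href layout is an assumption; is_pref_link only requires the
# 'servlet.FileDownload' and 'file=' substrings).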

def download_one(session: requests.Session, url: str, outdir: str, pref: str) -> str:
    os.makedirs(outdir, exist_ok=True)
    qs = parse_qs(urlparse(url).query)
    file_id = (qs.get("file", ["unknown"])[0])[:18]
    with session.get(url, timeout=180, stream=True) as r:
        r.raise_for_status()
        fname = guess_filename_from_headers(r, f"{pref}_{file_id}.xlsx")
        path = os.path.join(outdir, fname)
        with open(path, "wb") as f:
            for chunk in r.iter_content(chunk_size=1 << 15):
                if chunk:
                    f.write(chunk)
    return path

# -------------------- Column naming: middle > small > large category --------------------

def _clean_cell(x) -> str:
    if x is None:
        return ""
    s = str(x).strip()
    if s.lower() == "nan":
        return ""
    return s

def choose_names_from_multiindex(mi: pd.MultiIndex) -> list[str]:
    """
    Pick a single name per column from a 3-tier header (MultiIndex).
    Priority: middle category (2nd tier) -> small category (3rd tier) -> large category (1st tier).
    If all tiers are empty, fall back to 'col'. Duplicates get .1, .2, ... suffixes.
    """
    names = []
    for tpl in mi:
        # tpl is expected to be (large, middle, small)
        a = _clean_cell(tpl[0]) if len(tpl) >= 1 else ""
        b = _clean_cell(tpl[1]) if len(tpl) >= 2 else ""
        c = _clean_cell(tpl[2]) if len(tpl) >= 3 else ""
        name = b or c or a or "col"  # middle > small > large
        names.append(name)
    # Resolve duplicates
    seen = {}
    out = []
    for n in names:
        if n not in seen:
            seen[n] = 0
            out.append(n)
        else:
            seen[n] += 1
            out.append(f"{n}.{seen[n]}")
    return out
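# Illustrative example (the header texts below are hypothetical):
#   ('設備情報', '発電出力', '')         -> '発電出力'  (middle tier wins)
#   ('設備情報', '',         '市区町村') -> '市区町村'  (falls back to the 3rd tier)
#   ('設備情報', '',         '')         -> '設備情報'  (falls back to the 1st tier)
# and a repeated name becomes '発電出力', '発電出力.1', '発電出力.2', ...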

# -------------------- Reading rules --------------------
# Row 0 is dropped; rows 1/2/3 form the header (header=[1, 2, 3])
HEADER_ROWS = [1, 2, 3]
# For the 2nd file onward, skip rows 0-3 (skiprows=4) and read data only (header=None)
SKIP_ROWS_OTHERS = 4
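# Assumed sheet layout (0-indexed rows, as pandas sees them):
#   row 0        : title row (never read)
#   rows 1-3     : 3-tier header (large / middle / small category)
#   rows 4+      : data
#   column 0     : dropped in every file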

def load_excel_first(xls_path: str, sheet_pref: str | None) -> tuple[pd.DataFrame, list[str]]:
    """
    First file:
      - read the 3-tier header with header=[1, 2, 3] (row 0 is skipped automatically)
      - drop the leftmost column
      - flatten the MultiIndex into single names, preferring middle > small > large
    Returns: (df, chosen_names)
    """
    sheet = pick_sheet_name(xls_path, sheet_pref)
    if not sheet:
        raise RuntimeError("No sheet found")
    df = pd.read_excel(
        xls_path,
        sheet_name=sheet,
        engine="openpyxl",
        header=HEADER_ROWS,
        dtype=str
    )
    # Drop the leftmost column
    df = df.iloc[:, 1:]
    # Trim surrounding whitespace
    for c in df.select_dtypes(include=["object"]).columns:
        df[c] = df[c].str.strip()
    # Choose the column names
    if isinstance(df.columns, pd.MultiIndex):
        chosen = choose_names_from_multiindex(df.columns)
    else:
        # Just in case the header came back single-tier: clean and deduplicate anyway
        raw = [_clean_cell(c) or "col" for c in df.columns]
        seen = {}
        chosen = []
        for n in raw:
            if n not in seen:
                seen[n] = 0
                chosen.append(n)
            else:
                seen[n] += 1
                chosen.append(f"{n}.{seen[n]}")
    df.columns = chosen
    return df, chosen

def load_excel_other(xls_path: str, sheet_pref: str | None, target_cols: list[str]) -> pd.DataFrame | None:
    """
    Second file onward:
      - read data only, with skiprows=4 and header=None
      - drop the leftmost column
      - if the column count differs, truncate or pad with dummy columns to match
      - replace the column names with the first file's chosen names
    """
    sheet = pick_sheet_name(xls_path, sheet_pref)
    if not sheet:
        return None
    df = pd.read_excel(
        xls_path,
        sheet_name=sheet,
        engine="openpyxl",
        header=None,
        skiprows=SKIP_ROWS_OTHERS,
        dtype=str
    )
    # Drop the leftmost column
    df = df.iloc[:, 1:]
    # Trim surrounding whitespace
    for c in df.select_dtypes(include=["object"]).columns:
        df[c] = df[c].str.strip()
    # Adjust the column count
    if df.shape[1] != len(target_cols):
        print(f"[WARN] column count mismatch: file={os.path.basename(xls_path)} "
              f"read={df.shape[1]} vs target={len(target_cols)} -> adjusting automatically")
        if df.shape[1] > len(target_cols):
            df = df.iloc[:, :len(target_cols)]
        else:
            # Pad with empty columns when short
            for k in range(len(target_cols) - df.shape[1]):
                df[f"_pad_{k}"] = None
            df = df.iloc[:, :len(target_cols)]
    df.columns = target_cols
    return df
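# Illustrative example: with five target columns, a sheet read with seven columns is
# truncated to the first five, and one read with three columns gains two empty
# '_pad_*' columns before the target names are applied.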

def zip_paths(paths: list[str], out_zip: str) -> str:
    with zipfile.ZipFile(out_zip, "w", compression=zipfile.ZIP_DEFLATED) as z:
        for p in paths:
            if os.path.exists(p):
                z.write(p, arcname=os.path.basename(p))
    return out_zip

# -------------------- Main job (called from Gradio) --------------------

def run_job(sheet_name, sleep_sec, limit, re_download, progress=gr.Progress(track_tqdm=False)):
    progress(0, desc="Initializing...")
    session = requests.Session()
    session.headers.update({
        "User-Agent": "Mozilla/5.0 (compatible; FITCollector/1.3; +https://huggingface.co/spaces)",
        "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    })
    # 1) Collect links
    links = collect_pref_links(session)
    if not links:
        return ("Failed to detect the prefecture file links. The page layout may have "
                "changed, or access may be temporarily restricted.",
                None, None, None, None)
    if limit and limit > 0:
        links = links[:int(limit)]
    progress(0.1, desc=f"Found {len(links)} links")
    # 2) Download
    downloaded = []
    for i, item in enumerate(links, start=1):
        progress(0.1 + 0.6 * i / max(1, len(links)),
                 desc=f"Downloading {i}/{len(links)}: {item['pref']}")
        try:
            existing = None
            if not re_download and os.path.isdir(OUTDIR):
                for fn in os.listdir(OUTDIR):
                    if fn.lower().endswith(".xlsx") and item["pref"] in fn:
                        existing = os.path.join(OUTDIR, fn)
                        break
            if existing and os.path.exists(existing):
                path = existing
            else:
                path = download_one(session, item["href"], OUTDIR, item["pref"])
                time.sleep(float(sleep_sec))
            downloaded.append(path)
        except Exception as e:
            print(f"[WARN] download failed: {item['pref']} {e}")
    if not downloaded:
        return ("All downloads failed.", None, None, None, None)
    # 3) Read the first file (fixes the column names)
    progress(0.75, desc="Reading the first file (fixing column names)")
    first_path = downloaded[0]
    try:
        df0, cols0 = load_excel_first(first_path, sheet_name if sheet_name else None)
    except Exception as e:
        return (f"Failed to read the first file: {os.path.basename(first_path)} / {e}",
                None, None, None, None)
    frames = [df0]
    # 4) Read the remaining files
    for j, p in enumerate(downloaded[1:], start=2):
        progress(0.75 + 0.25 * (j - 1) / max(1, len(downloaded) - 1),
                 desc=f"Reading file {j}")
        df = load_excel_other(p, sheet_name if sheet_name else None, cols0)
        if df is not None and len(df) > 0:
            frames.append(df)
        else:
            print(f"[WARN] skipped: {os.path.basename(p)}")
    # 5) Concatenate vertically
    combined = pd.concat(frames, ignore_index=True)
    # 6) Write outputs (to_parquet requires pyarrow or fastparquet to be installed)
    os.makedirs(OUTDIR, exist_ok=True)
    out_xlsx = os.path.join(OUTDIR, "combined_fit.xlsx")
    out_parq = os.path.join(OUTDIR, "combined_fit.parquet")
    with pd.ExcelWriter(out_xlsx, engine="openpyxl") as w:
        combined.to_excel(w, index=False, sheet_name="combined")
    combined.to_parquet(out_parq, index=False)
    # 7) ZIP of the raw downloads
    raw_zip = os.path.join(OUTDIR, "raw_excels.zip")
    zip_paths(downloaded, raw_zip)
    # 8) Preview
    preview_csv = os.path.join(OUTDIR, "combined_head.csv")
    combined.head(1000).to_csv(preview_csv, index=False)
    progress(1.0, desc=f"Done ({len(combined):,} rows)")
    msg = (
        f"✅ Merge complete: {len(combined):,} rows\n"
        f"- Excel: combined_fit.xlsx\n"
        f"- Parquet: combined_fit.parquet\n"
        f"- Raw ZIP: raw_excels.zip\n"
        f"- Preview: combined_head.csv\n"
        f"- Column names flattened with middle > small > large category priority (tiers are not concatenated)"
    )
    return (msg, out_xlsx, out_parq, raw_zip, preview_csv)
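# Headless usage sketch (assumption: called from a script, network access available).
# A no-op callable stands in for gr.Progress so run_job can be driven without the UI:
#   msg, xlsx, parq, zf, prev = run_job(
#       sheet_name="", sleep_sec=1.0, limit=2, re_download=False,
#       progress=lambda *args, **kwargs: None,
#   )
#   print(msg)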

# -------------------- Gradio UI --------------------
with gr.Blocks(title="FIT disclosure (per-prefecture Excel): bulk download & merge") as demo:
    gr.Markdown(
        """
        # FIT disclosure (per-prefecture Excel): bulk download & merge
        **Column-naming policy**:
        - First file: row 0 is ignored; rows 1/2/3 are read as a 3-tier header.
        - Column names use **the middle category if it has a value, otherwise the small category, otherwise the large category** (tiers are not concatenated).
        - Second file onward: rows 0-3 are skipped and only the data is read.
        - The **leftmost column is dropped** in every file.
        - No metadata columns (file name, sheet name, etc.) are added.
        """
    )
    with gr.Row():
        sheet = gr.Textbox(label="Sheet name to read (blank = auto)", placeholder="e.g. 代表地番 / 全地番")
        sleep = gr.Slider(0.0, 5.0, value=1.0, step=0.1, label="Delay between downloads (seconds)")
    with gr.Row():
        limit = gr.Number(value=None, precision=0, label="First N prefectures only (for testing; blank = all)")
        reget = gr.Checkbox(label="Re-download even if the file already exists", value=False)
    run_btn = gr.Button("Run", variant="primary")
    out_msg = gr.Markdown()
    out_xlsx = gr.File(label="Merged Excel (combined_fit.xlsx)")
    out_parq = gr.File(label="Merged Parquet (combined_fit.parquet)")
    out_zip = gr.File(label="All downloaded prefecture Excel files (zip)")
    out_preview = gr.File(label="First 1000 rows preview (CSV)")
    run_btn.click(
        fn=run_job,
        inputs=[sheet, sleep, limit, reget],
        outputs=[out_msg, out_xlsx, out_parq, out_zip, out_preview]
    )

if __name__ == "__main__":
    demo.queue(max_size=20).launch()