Spaces:
Sleeping
Sleeping
| from __future__ import annotations | |
| import argparse | |
| import unicodedata | |
| from functools import lru_cache | |
| from pathlib import Path | |
| from typing import Any | |
| import pandas as pd | |
| from huggingface_hub import HfApi, hf_hub_download | |
| from language import ALL_LANGS, LANG_ISO2_TO_ISO3, canonical_lang | |
| from sentence_sampling import sample_multi_group_bundle, sample_single_group_bundle | |
# Dataset repository id used for all Hugging Face Hub requests in this module.
FLEURS_DATASET = "google/fleurs"

# Cache layout: everything lives in `data/fleurs` next to this module.
FLEURS_CACHE_DIR = Path(__file__).parent / "data" / "fleurs"
FLEURS_PARQUET_PATH = FLEURS_CACHE_DIR.joinpath("fleurs_text_only.parquet")
FLEURS_DOWNLOAD_DIR = FLEURS_CACHE_DIR.joinpath("downloads")

# Names given, in order, to the tab-separated fields of each TSV row.
FLEURS_TSV_COLUMNS = [
    "id", "file_name", "source_sentence", "transcription",
    "tokens", "num_samples", "gender",
]

# Lower rank wins when the same sentence shows up in several splits.
FLEURS_SPLIT_ORDER = dict(train=0, validation=1, test=2)

# Columns kept in the lean parquet cache.
FLEURS_LEAN_COLUMNS = ["id", "text", "source_lang", "model_lang", "split"]
def _normalize_model_lang(source_lang: str) -> str:
    """Return the model language code for a FLEURS locale such as `am_et`.

    Only the text before the first underscore is considered; it is trimmed,
    lower-cased and then resolved through `canonical_lang`.
    """
    language_part, _, _ = source_lang.partition("_")
    return canonical_lang(language_part.strip().lower())
def _discover_tsv_files() -> list[str]:
    """List the FLEURS TSV metadata files as paths relative to the repo root.

    TSV files already present under the local download directory take
    precedence; the Hub file listing is only queried when none are found.

    Raises:
        RuntimeError: if the Hub listing contains no `data/**.tsv` entries.
    """
    cached = sorted((FLEURS_DOWNLOAD_DIR / "data").rglob("*.tsv"))
    if cached:
        return [str(tsv.relative_to(FLEURS_DOWNLOAD_DIR)) for tsv in cached]

    hub = HfApi()
    try:
        repo_files = hub.list_repo_files(repo_id=FLEURS_DATASET, repo_type="dataset")
    except TypeError:
        # Retry with the repository id passed positionally when the keyword
        # form is rejected (presumably by other huggingface_hub releases).
        repo_files = hub.list_repo_files(FLEURS_DATASET, repo_type="dataset")

    remote = sorted(
        name
        for name in repo_files
        if name.startswith("data/") and name.endswith(".tsv")
    )
    if not remote:
        raise RuntimeError("Could not find any FLEURS TSV metadata files.")
    return remote
| def _normalize_split_name(file_name: str) -> str: | |
| stem = Path(file_name).stem.lower() | |
| if stem == "dev": | |
| return "validation" | |
| return stem | |
| def _normalize_text_key(text: str) -> str: | |
| """Normalize text for deduping while keeping the original text intact.""" | |
| normalized = unicodedata.normalize("NFKC", text) | |
| normalized = " ".join(normalized.split()) | |
| return normalized.casefold().strip() | |
def _download_tsv(file_path: str) -> Path:
    """Return a local path for a FLEURS TSV, downloading it when missing.

    Args:
        file_path: path of the TSV relative to the dataset repository root.

    Returns:
        The already-downloaded copy under the download directory when it
        exists, otherwise the path reported by `hf_hub_download`.
    """
    cached_copy = FLEURS_DOWNLOAD_DIR / file_path
    if cached_copy.exists():
        return cached_copy

    FLEURS_DOWNLOAD_DIR.mkdir(parents=True, exist_ok=True)
    request = {
        "repo_id": FLEURS_DATASET,
        "repo_type": "dataset",
        "filename": file_path,
    }
    target_dir = str(FLEURS_DOWNLOAD_DIR)
    try:
        downloaded = hf_hub_download(**request, local_dir=target_dir)
    except TypeError:
        # Fall back to `cache_dir` when `local_dir` is rejected (presumably
        # by huggingface_hub releases that do not accept that keyword).
        downloaded = hf_hub_download(**request, cache_dir=target_dir)
    return Path(downloaded)
def _frame_from_tsv(tsv_path: Path, source_lang: str) -> pd.DataFrame:
    """Parse one FLEURS TSV metadata file into a DataFrame of text rows.

    Each non-blank line is split on tabs into the `FLEURS_TSV_COLUMNS`
    fields. A first line that contains every column name is treated as a
    header and skipped. Short rows are padded with empty strings.

    Args:
        tsv_path: local path of the TSV file; its file name determines the
            `split` column.
        source_lang: FLEURS locale the file belongs to (e.g. `am_et`); it is
            stored as `source_lang` and `language_name` and mapped to
            `model_lang`.

    Returns:
        A frame with the TSV columns plus `text`, `raw_text`, `source`,
        `source_lang`, `model_lang`, `split`, `lang_iso3` and
        `language_name`, restricted to rows whose `text` is non-empty.
        An empty, column-less frame is returned when the file has no rows.
    """
    column_count = len(FLEURS_TSV_COLUMNS)
    header_markers = {name.lower() for name in FLEURS_TSV_COLUMNS}
    records: list[dict[str, Any]] = []
    header_seen = False

    with tsv_path.open("r", encoding="utf-8", newline="") as handle:
        for line in handle:
            # `newline=""` leaves line endings untranslated, so drop "\r" as
            # well as "\n"; otherwise CRLF files leak "\r" into the last field.
            line = line.rstrip("\r\n")
            if not line.strip():
                continue
            # Capping the split at `column_count - 1` folds any surplus tabs
            # into the final field, so a row never has too many parts.
            parts = line.split("\t", column_count - 1)
            if not header_seen:
                # Only the first non-blank line can be a header.
                header_seen = True
                header_candidate = {part.strip().lower() for part in parts}
                if header_markers.issubset(header_candidate):
                    continue
            if len(parts) < column_count:
                parts.extend([""] * (column_count - len(parts)))
            records.append(dict(zip(FLEURS_TSV_COLUMNS, parts, strict=True)))

    if not records:
        return pd.DataFrame()

    frame = pd.DataFrame.from_records(records)
    frame = frame.fillna("")
    for column in ("source_sentence", "transcription", "tokens"):
        frame[column] = frame[column].astype(str).str.strip()

    transcription = frame["transcription"]
    source_sentence = frame["source_sentence"]
    # `text` prefers the transcription, `raw_text` prefers the source
    # sentence; each falls back to the other when its first choice is empty.
    frame["text"] = transcription.where(transcription.ne(""), source_sentence)
    frame["raw_text"] = source_sentence.where(source_sentence.ne(""), transcription)

    # These values are constant for the whole file, so compute them once.
    model_lang = _normalize_model_lang(source_lang)
    frame["source"] = "fleurs"
    frame["source_lang"] = source_lang
    frame["model_lang"] = model_lang
    frame["split"] = _normalize_split_name(tsv_path.name)
    frame["lang_iso3"] = LANG_ISO2_TO_ISO3.get(model_lang, "")
    frame["language_name"] = source_lang

    # Mark empty texts as missing so they can be filtered out below.
    frame["text"] = frame["text"].astype(str).str.strip().replace("", pd.NA)
    frame["raw_text"] = frame["raw_text"].astype(str).str.strip()
    frame = frame[frame["text"].notna()].reset_index(drop=True)
    return frame
def _post_process_fleurs_frame(frame: pd.DataFrame) -> pd.DataFrame:
    """Deduplicate sentences per locale and reduce to the lean columns.

    Rows are grouped by `source_lang` and normalized text key. Within a
    group the survivor is the row with the lowest split rank, then the
    lowest numeric id (non-numeric ids sort last). An empty input frame is
    returned unchanged.
    """
    if frame.empty:
        return frame

    def _split_priority(split: object) -> int:
        # Unknown splits rank after every known split.
        return FLEURS_SPLIT_ORDER.get(str(split), 99)

    ranked = frame.copy()
    ranked["split_rank"] = ranked["split"].map(_split_priority)
    ranked["text_key"] = ranked["text"].astype(str).map(_normalize_text_key)
    numeric_ids = pd.to_numeric(ranked["id"], errors="coerce")
    ranked["id_sort"] = numeric_ids.fillna(10**18)

    has_key = ranked["text_key"].ne("")
    ordered = ranked[has_key].sort_values(
        by=["source_lang", "text_key", "split_rank", "id_sort"],
        kind="stable",
    )
    unique_rows = ordered.drop_duplicates(
        subset=["source_lang", "text_key"],
        keep="first",
    )

    kept_columns = [col for col in FLEURS_LEAN_COLUMNS if col in unique_rows.columns]
    lean = unique_rows.loc[:, kept_columns].copy()
    for column in ("text", "source_lang", "model_lang", "split"):
        lean[column] = unique_rows[column].astype(str).values
    # Ids that cannot be parsed as numbers are stored as -1.
    parsed_ids = pd.to_numeric(unique_rows["id"], errors="coerce")
    lean["id"] = parsed_ids.fillna(-1).astype(int).values

    non_blank = lean["text"].astype(str).str.strip().ne("")
    return lean[non_blank].reset_index(drop=True)
def build_fleurs_text_parquet(
    parquet_path: str | Path = FLEURS_PARQUET_PATH,
) -> Path:
    """Collect every FLEURS TSV into one deduplicated, text-only parquet file.

    Args:
        parquet_path: destination of the parquet cache; parent directories
            are created as needed.

    Returns:
        The destination path as a `Path`.

    Raises:
        RuntimeError: if none of the TSV files yielded any rows.
    """
    destination = Path(parquet_path)
    destination.parent.mkdir(parents=True, exist_ok=True)

    per_file_frames: list[pd.DataFrame] = []
    for repo_path in _discover_tsv_files():
        # The locale is the name of the directory that holds the TSV.
        locale = Path(repo_path).parent.name
        local_tsv = _download_tsv(repo_path)
        parsed = _frame_from_tsv(local_tsv, locale)
        if parsed.empty:
            continue
        per_file_frames.append(parsed)

    if not per_file_frames:
        raise RuntimeError("No rows were loaded from the FLEURS TSV metadata files.")

    raw_table = pd.concat(per_file_frames, ignore_index=True)
    before_rows = len(raw_table)
    lean_table = _post_process_fleurs_frame(raw_table)
    lean_table.to_parquet(destination, index=False)
    print(
        f"Built lean FLEURS parquet with {len(lean_table):,} rows "
        f"from {before_rows:,} raw rows and {len(lean_table.columns)} columns."
    )
    return destination
def load_fleurs_table(parquet_path: str | Path = FLEURS_PARQUET_PATH) -> pd.DataFrame:
    """Read the FLEURS text-only parquet cache into a DataFrame.

    Raises:
        FileNotFoundError: if the parquet file does not exist.
        RuntimeError: if the parquet file has no `text` column.
    """
    cache_file = Path(parquet_path)
    if not cache_file.exists():
        raise FileNotFoundError(
            f"Missing FLEURS cache at {cache_file}. "
            "Run `./.venv/bin/python fleurs_cache.py` once while online to build it."
        )

    table = pd.read_parquet(cache_file)
    if "text" in table.columns:
        return table
    raise RuntimeError("FLEURS parquet cache is missing the text column.")
def _row_to_sentence(row: pd.Series) -> dict[str, Any]:
    """Convert one cached FLEURS row into a sentence dictionary.

    Args:
        row: a row of the FLEURS table; `source_lang`, `model_lang`,
            `language_name`, `text`, `split` and `id` are read when present.

    Returns:
        A dict with the keys `text`, `raw_text`, `source`, `source_lang`,
        `model_lang`, `lang_iso2`, `lang_iso3`, `language`, `split` and
        `fleurs_id`. `fleurs_id` is -1 when the row's id is missing or is
        not an integer.
    """
    source_lang = str(row.get("source_lang", "")).strip()
    model_lang = str(row.get("model_lang", "")).strip()
    # Derive the language code from the locale when the row carries none.
    lang_iso2 = model_lang or _normalize_model_lang(source_lang)
    # Without a `language_name` value the locale doubles as the display name.
    language = str(row.get("language_name", source_lang)).strip()
    text = str(row.get("text", "")).strip()

    raw_id = row.get("id", -1)
    fleurs_id = -1
    if str(raw_id).strip().lstrip("-").isdigit():
        # `isdigit()` also accepts values `int()` rejects (for example "--5"
        # or superscript digits), so guard the conversion instead of raising.
        try:
            fleurs_id = int(raw_id)
        except (TypeError, ValueError):
            fleurs_id = -1

    return {
        "text": text,
        "raw_text": text,
        "source": "fleurs",
        "source_lang": source_lang,
        # `lang_iso2` already equals `model_lang` whenever that is non-empty.
        "model_lang": lang_iso2,
        "lang_iso2": lang_iso2,
        "lang_iso3": LANG_ISO2_TO_ISO3.get(lang_iso2, ""),
        "language": language,
        "split": str(row.get("split", "")).strip(),
        "fleurs_id": fleurs_id,
    }
def fetch_random_fleurs_sentence(
    *,
    attempts: int = 8,
    parquet_path: str | Path = FLEURS_PARQUET_PATH,
) -> dict[str, Any]:
    """Sample a bundle from the cached FLEURS table, grouped by `model_lang`.

    Rows whose `model_lang` is not in `ALL_LANGS` are excluded when that
    column exists; the sampling itself is delegated to
    `sample_single_group_bundle`.
    """
    candidates = load_fleurs_table(parquet_path)
    if "model_lang" in candidates.columns:
        supported = candidates["model_lang"].isin(ALL_LANGS)
        candidates = candidates[supported]

    return sample_single_group_bundle(
        candidates,
        group_column="model_lang",
        row_to_sentence=_row_to_sentence,
        attempts=attempts,
    )
def fetch_random_fleurs_sentence_mix(
    *,
    min_sentences: int = 2,
    max_sentences: int = 3,
    parquet_path: str | Path = FLEURS_PARQUET_PATH,
) -> dict[str, Any]:
    """Sample a multi-language bundle from the cached FLEURS table.

    Rows whose `model_lang` is not in `ALL_LANGS` are excluded when that
    column exists. `sample_multi_group_bundle` draws from between
    `min_sentences` and `max_sentences` `model_lang` groups, and the
    returned bundle has its `source` set to `fleurs-mix`.
    """
    candidates = load_fleurs_table(parquet_path)
    if "model_lang" in candidates.columns:
        supported = candidates["model_lang"].isin(ALL_LANGS)
        candidates = candidates[supported]

    sampled = sample_multi_group_bundle(
        candidates,
        group_column="model_lang",
        row_to_sentence=_row_to_sentence,
        min_groups=min_sentences,
        max_groups=max_sentences,
    )
    mixed = {**sampled}
    mixed["source"] = "fleurs-mix"
    return mixed
def main() -> None:
    """Command-line entry point: build the parquet cache and report its path."""
    cli = argparse.ArgumentParser(
        description="Build the cached text-only FLEURS parquet.",
    )
    cli.add_argument(
        "--output",
        default=str(FLEURS_PARQUET_PATH),
        help="Output parquet path for the cached FLEURS text rows.",
    )
    options = cli.parse_args()

    written = build_fleurs_text_parquet(options.output)
    print(f"Wrote FLEURS text cache to {written}")


if __name__ == "__main__":
    main()