"""
ml/dataset_builder.py
=====================
PhilVerify — Async Dataset Builder / Orchestrator
Loads all configured data sources in parallel via ThreadPoolExecutor,
deduplicates samples using TF-IDF + cosine similarity (with optional
MinHashLSH fast-path), reports class balance, and saves the combined
dataset to Parquet + CSV preview.
Usage
-----
python -m ml.dataset_builder
python -m ml.dataset_builder --no-dedup
python -m ml.dataset_builder --sources philverify_handcrafted fake_news_filipino
python -m ml.dataset_builder --output-dir data/processed
Author : PhilVerify Team
Python : 3.10+
"""
from __future__ import annotations
import argparse
import logging
import sys
import time
from collections import Counter, defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from typing import TYPE_CHECKING
# ---------------------------------------------------------------------------
# Logging — configure before any heavy imports so early errors are visible
# ---------------------------------------------------------------------------
# basicConfig sets up the root logger: INFO level, timestamped records,
# written to stdout rather than the default stderr.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)-8s %(name)s β %(message)s",
    datefmt="%H:%M:%S",
    stream=sys.stdout,
)
# Named logger used throughout this module.
logger = logging.getLogger("philverify.dataset_builder")
# ---------------------------------------------------------------------------
# Third-party / project imports
# ---------------------------------------------------------------------------
try:
import pandas as pd
except ImportError as exc: # pragma: no cover
raise SystemExit("pandas is required: pip install pandas") from exc
try:
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
except ImportError as exc: # pragma: no cover
raise SystemExit("scikit-learn is required: pip install scikit-learn") from exc
import numpy as np
# Optional MinHashLSH for large-scale dedup
try:
from datasketch import MinHash, MinHashLSH
_DATASKETCH_AVAILABLE = True
logger.debug("datasketch available β MinHashLSH dedup path enabled.")
except ImportError:
_DATASKETCH_AVAILABLE = False
logger.debug("datasketch not found β falling back to batched TF-IDF dedup.")
# Ensure project root is on sys.path when run directly (python ml/dataset_builder.py)
sys.path.insert(0, str(Path(__file__).parent.parent))
# Project-level imports
from ml.data_sources.base import NormalizedSample, detect_language # type: ignore[import]
# Data source adapters β imported lazily inside _build_sources() so we can
# still import this module even if individual adapters are missing.
from ml.dataset import DATASET, LABEL_NAMES, Sample # type: ignore[import]
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# A sample is dropped when its cosine similarity to a kept sample exceeds this.
_SIM_THRESHOLD: float = 0.85 # cosine similarity dedup cutoff
# Upper bound on the TF-IDF vocabulary size.
_TFIDF_MAX_FEATURES: int = 10_000
# Kept rows are compared in slices of this many rows to limit peak memory.
_TFIDF_CHUNK_SIZE: int = 500 # rows per chunk during batched dedup
_MINHASH_NUM_PERM: int = 128 # MinHash permutations
_MINHASH_THRESHOLD: float = 0.85 # Jaccard threshold for LSH
#: Source name → priority index (lower = higher priority).
#: Deduplication sorts by this index, so when two samples are near-duplicates
#: the one from the lower-numbered source is the one that is kept.
_SOURCE_PRIORITY: dict[str, int] = {
    "philverify_handcrafted": 0,
    "fake_news_filipino": 1,
    "ph_fake_news_seacrowd": 2,
    "github_ph_corpus": 3,
    "vera_files": 4,
    "rappler": 5,
    "liar": 6,
    "isot": 7,
}
# Every known source key, in priority order (dicts preserve insertion order).
_ALL_SOURCE_KEYS: list[str] = list(_SOURCE_PRIORITY.keys())
# ---------------------------------------------------------------------------
# Helper β load handcrafted samples from ml.dataset
# ---------------------------------------------------------------------------
def _load_handcrafted() -> list[NormalizedSample]:
    """
    Convert every entry of ``DATASET`` into a ``NormalizedSample``.

    Each converted sample is tagged with the ``philverify_handcrafted``
    source, the language reported by ``detect_language`` and a confidence
    of 1.0. An entry whose conversion raises is logged and skipped, so one
    bad entry never aborts the whole load.

    Returns
    -------
    list[NormalizedSample]
        The successfully converted samples, in ``DATASET`` order.
    """
    converted: list[NormalizedSample] = []
    for raw in DATASET:
        try:
            language = detect_language(raw.text)
            normalized = NormalizedSample(
                text=raw.text,
                label=raw.label,
                source="philverify_handcrafted",
                language=language,
                original_label=LABEL_NAMES[raw.label],
                confidence=1.0,
            )
        except Exception as exc:  # noqa: BLE001
            logger.warning("Skipping handcrafted sample (conversion error): %s", exc)
            continue
        converted.append(normalized)

    logger.info(
        "[philverify_handcrafted] Finished β %d samples loaded.", len(converted)
    )
    return converted
# ---------------------------------------------------------------------------
# Helper β dynamic source loader
# ---------------------------------------------------------------------------
def _try_import_source(module_path: str, class_name: str):
    """
    Resolve ``class_name`` inside the module at ``module_path``.

    Both the import and the attribute lookup are best-effort: any exception
    is logged as a warning and ``None`` is returned, so the caller can skip
    the source instead of failing.
    """
    from importlib import import_module

    try:
        return getattr(import_module(module_path), class_name)
    except Exception as exc:  # noqa: BLE001
        logger.warning(
            "Could not import %s.%s β source will be skipped. (%s)",
            module_path,
            class_name,
            exc,
        )
    return None
def _build_source_callables(
    include: set[str] | None = None,
) -> dict[str, object]:
    """
    Map each requested source key to a zero-argument loader callable.

    Parameters
    ----------
    include:
        Source keys to build. ``None`` (or an empty set) selects every
        key in ``_ALL_SOURCE_KEYS``.

    Returns
    -------
    dict[str, object]
        ``source_key -> callable`` where each callable returns a
        ``list[NormalizedSample]``. Adapters that cannot be imported are
        left out of the mapping.
    """
    if not include:
        include = set(_ALL_SOURCE_KEYS)

    # (source key, adapter module, adapter class), in registration order.
    adapters = (
        ("fake_news_filipino", "ml.data_sources.hf_fake_news_filipino", "FakeNewsFilipino"),
        ("ph_fake_news_seacrowd", "ml.data_sources.hf_ph_fake_news", "PHFakeNewsSEACrowd"),
        ("github_ph_corpus", "ml.data_sources.gh_ph_corpus", "GitHubPHCorpus"),
        ("vera_files", "ml.data_sources.vera_files_scraper", "VeraFilesScraper"),
        ("rappler", "ml.data_sources.rappler_scraper", "RapplerScraper"),
        ("liar", "ml.data_sources.liar_dataset", "LIARDataset"),
        ("isot", "ml.data_sources.isot_dataset", "ISOTDataset"),
    )

    loaders: dict[str, object] = {}

    # The handcrafted set lives in-process, so no adapter import is needed.
    if "philverify_handcrafted" in include:
        loaders["philverify_handcrafted"] = _load_handcrafted

    for key, module_path, class_name in adapters:
        if key not in include:
            continue
        adapter_cls = _try_import_source(module_path, class_name)
        if adapter_cls is not None:
            # Bind the loader of a fresh adapter instance.
            loaders[key] = adapter_cls().load

    return loaders
# ---------------------------------------------------------------------------
# DatasetBuilder
# ---------------------------------------------------------------------------
class DatasetBuilder:
    """
    Orchestrate the PhilVerify dataset build: load all data sources in
    parallel, deduplicate them, report class balance, and persist the result.

    Parameters
    ----------
    output_dir:
        Directory where ``combined.parquet`` and ``sample_preview.csv``
        will be written. Defaults to ``<this file's parent>/data/processed``.
    """

    def __init__(self, output_dir: Path | None = None) -> None:
        if output_dir is None:
            output_dir = Path(__file__).parent / "data" / "processed"
        self.output_dir: Path = Path(output_dir)
        self.output_path: Path = self.output_dir / "combined.parquet"
        # source key -> loader callable used by the latest run_parallel() call
        self._sources: dict[str, object] = {}

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def run_parallel(
        self,
        include: set[str] | None = None,
    ) -> list[NormalizedSample]:
        """
        Load all configured data sources concurrently.

        Each source's loader callable is submitted to a
        ``ThreadPoolExecutor`` (at most 8 workers). Results are collected
        and logged as they complete; a source whose loader raises is
        logged and skipped rather than aborting the whole load.

        Parameters
        ----------
        include:
            Optional set of source keys to load. When *None*, all sources
            are loaded.

        Returns
        -------
        list[NormalizedSample]
            Combined, undeduped samples from every successful source.
            Empty when no source could be built.
        """
        callables = _build_source_callables(include)
        self._sources = callables
        if not callables:
            logger.error("No data sources could be loaded.")
            return []

        all_samples: list[NormalizedSample] = []
        total_start = time.perf_counter()
        logger.info(
            "Starting parallel load of %d source(s): %s",
            len(callables),
            ", ".join(callables),
        )

        future_to_key: dict = {}
        with ThreadPoolExecutor(max_workers=min(len(callables), 8)) as pool:
            for key, fn in callables.items():
                logger.info("[%s] Submitting load task β¦", key)
                future_to_key[pool.submit(fn)] = key

            for future in as_completed(future_to_key):
                key = future_to_key[future]
                try:
                    result: list[NormalizedSample] = future.result()
                    count = len(result)
                    all_samples.extend(result)
                    logger.info(
                        "[%s] β Finished β %d sample(s) loaded.", key, count
                    )
                except Exception as exc:  # noqa: BLE001
                    logger.error(
                        "[%s] β Load FAILED β skipping source. Error: %s",
                        key,
                        exc,
                        exc_info=True,
                    )

        elapsed = time.perf_counter() - total_start
        logger.info(
            "Parallel load complete β %d total samples in %.2fs.",
            len(all_samples),
            elapsed,
        )
        return all_samples

    # ------------------------------------------------------------------
    def deduplicate(
        self, samples: list[NormalizedSample]
    ) -> list[NormalizedSample]:
        """
        Remove near-duplicate samples across all sources.

        Strategy
        --------
        1. Sort samples so higher-priority sources appear first (stable,
           so order within a source is preserved).
        2. Fit a TF-IDF vectoriser on all texts.
        3. Walk the samples in order and drop any sample whose cosine
           similarity to an already-kept sample exceeds ``_SIM_THRESHOLD``.

        When *datasketch* is available, MinHashLSH first narrows the
        comparison to a handful of candidates and only those are checked
        with TF-IDF; otherwise every kept sample is compared, in chunks.

        Parameters
        ----------
        samples:
            Raw combined sample list (may contain duplicates).

        Returns
        -------
        list[NormalizedSample]
            Deduplicated list, highest-priority source wins ties.
        """
        if not samples:
            return []
        if len(samples) == 1:
            # Nothing to compare against; this also avoids fitting TF-IDF on
            # a lone text, which raises when that text yields no tokens.
            return list(samples)

        t0 = time.perf_counter()
        logger.info("Deduplication β sorting %d samples by source priority β¦", len(samples))

        # Unknown sources sort last (priority 99).
        samples = sorted(samples, key=lambda s: _SOURCE_PRIORITY.get(s.source, 99))
        texts = [s.text for s in samples]

        logger.info("Deduplication β fitting TF-IDF on %d texts β¦", len(texts))
        vectoriser = TfidfVectorizer(
            max_features=_TFIDF_MAX_FEATURES,
            sublinear_tf=True,
            strip_accents="unicode",
        )
        tfidf_matrix = vectoriser.fit_transform(texts)  # sparse (N, F)

        if _DATASKETCH_AVAILABLE:
            return self._dedup_minhash_lsh(samples, tfidf_matrix, t0)
        return self._dedup_batched_tfidf(samples, tfidf_matrix, t0)

    # ------------------------------------------------------------------
    def class_report(self, samples: list[NormalizedSample]) -> dict:
        """
        Compute and print a breakdown of sample counts by class, source,
        and language.

        Returns
        -------
        dict
            Keys: ``"by_class"``, ``"by_source"``, ``"by_language"``,
            each mapping to a plain ``dict`` of ``value -> count``.
        """
        by_class: Counter[int] = Counter(s.label for s in samples)
        by_source: Counter[str] = Counter(s.source for s in samples)
        by_language: Counter[str] = Counter(s.language for s in samples)
        total = len(samples)

        def _pct(count: int) -> float:
            # Guard the empty dataset so the report never divides by zero.
            return 100.0 * count / total if total else 0.0

        print("\n" + "β" * 60)
        print(f" PhilVerify Dataset Report (total: {total:,} samples)")
        print("β" * 60)

        # -- By class (ascending label id)
        print("\n Class distribution:")
        print(f" {'Label':<20} {'Count':>8} {'%':>6}")
        print(" " + "-" * 40)
        for label_id in sorted(by_class):
            name = LABEL_NAMES.get(label_id, f"unknown({label_id})")
            cnt = by_class[label_id]
            pct = _pct(cnt)
            print(f" {name:<20} {cnt:>8,} {pct:>5.1f}%")

        # -- By source (largest first)
        print("\n Source distribution:")
        print(f" {'Source':<30} {'Count':>8} {'%':>6}")
        print(" " + "-" * 44)
        for src, cnt in sorted(by_source.items(), key=lambda x: -x[1]):
            pct = _pct(cnt)
            print(f" {src:<30} {cnt:>8,} {pct:>5.1f}%")

        # -- By language (largest first)
        print("\n Language distribution:")
        print(f" {'Language':<20} {'Count':>8} {'%':>6}")
        print(" " + "-" * 40)
        for lang, cnt in sorted(by_language.items(), key=lambda x: -x[1]):
            pct = _pct(cnt)
            print(f" {lang:<20} {cnt:>8,} {pct:>5.1f}%")
        print("β" * 60 + "\n")

        return {
            "by_class": dict(by_class),
            "by_source": dict(by_source),
            "by_language": dict(by_language),
        }

    # ------------------------------------------------------------------
    def save(self, samples: list[NormalizedSample]) -> Path:
        """
        Persist the dataset to Parquet and write a CSV preview file.

        Files written
        -------------
        * ``<output_dir>/combined.parquet`` — full dataset
        * ``<output_dir>/sample_preview.csv`` — first 5 rows of each class
          (not written when there are no samples)

        Parameters
        ----------
        samples:
            Final sample list to persist.

        Returns
        -------
        Path
            Path to the written Parquet file (``self.output_path``).
        """
        self.output_dir.mkdir(parents=True, exist_ok=True)
        logger.info("Saving %d samples to %s β¦", len(samples), self.output_path)

        columns = ["text", "label", "source", "language", "original_label", "confidence"]
        rows = [
            {
                "text": s.text,
                "label": s.label,
                "source": s.source,
                "language": s.language,
                "original_label": s.original_label,
                "confidence": s.confidence,
            }
            for s in samples
        ]
        # Explicit columns keep the schema stable even for an empty list;
        # without them df["label"] below raises KeyError.
        df = pd.DataFrame(rows, columns=columns)

        # Ensure correct dtypes
        df["label"] = df["label"].astype("int8")
        df["confidence"] = df["confidence"].astype("float32")

        df.to_parquet(self.output_path, index=False, engine="pyarrow")
        logger.info("Parquet saved β %s", self.output_path)

        # -- CSV preview (first 5 of each class, classes in ascending order)
        preview_path = self.output_dir / "sample_preview.csv"
        preview_frames = [
            df[df["label"] == label_id].head(5)
            for label_id in sorted(df["label"].unique())
        ]
        if preview_frames:
            preview_df = pd.concat(preview_frames, ignore_index=True)
            preview_df.to_csv(preview_path, index=False)
            logger.info("CSV preview saved β %s (%d rows)", preview_path, len(preview_df))

        return self.output_path

    # ------------------------------------------------------------------
    def build(
        self,
        include: set[str] | None = None,
        skip_dedup: bool = False,
    ) -> Path:
        """
        Full pipeline: load, (optional) dedup, report, save.

        Parameters
        ----------
        include:
            Restrict which source keys are loaded; *None* loads all.
        skip_dedup:
            When *True*, the deduplication step is skipped (faster but
            may produce a noisier dataset).

        Returns
        -------
        Path
            Path to the saved Parquet file.

        Raises
        ------
        RuntimeError
            If no samples were loaded from any source.
        """
        pipeline_start = time.perf_counter()
        logger.info("β" * 50)
        logger.info("PhilVerify DatasetBuilder β starting build pipeline")
        logger.info("β" * 50)

        # Step 1: Load all sources in parallel
        samples = self.run_parallel(include=include)
        logger.info("Step 1/4 β Load : %d raw samples collected.", len(samples))
        if not samples:
            raise RuntimeError("No samples were loaded β aborting build.")

        # Step 2: Deduplicate
        if skip_dedup:
            logger.info("Step 2/4 β Dedup : SKIPPED (--no-dedup flag).")
        else:
            samples = self.deduplicate(samples)
            logger.info("Step 2/4 β Dedup : %d samples after deduplication.", len(samples))

        # Step 3: Report
        logger.info("Step 3/4 β Report:")
        self.class_report(samples)

        # Step 4: Save
        path = self.save(samples)
        logger.info("Step 4/4 β Save : written to %s", path)

        elapsed = time.perf_counter() - pipeline_start
        logger.info("Build pipeline complete in %.2fs.", elapsed)
        return path

    # ------------------------------------------------------------------
    # Internal dedup helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _exceeds_similarity(tfidf_matrix, row_idx: int, candidates: list[int]) -> bool:
        """
        Return True when row ``row_idx`` has cosine similarity above
        ``_SIM_THRESHOLD`` with any row listed in ``candidates``.

        Candidates are compared in slices of ``_TFIDF_CHUNK_SIZE`` rows to
        limit peak memory; the scan stops at the first slice that matches.
        An empty candidate list yields False.
        """
        row = tfidf_matrix[row_idx]  # sparse (1, F)
        for start in range(0, len(candidates), _TFIDF_CHUNK_SIZE):
            chunk = tfidf_matrix[candidates[start : start + _TFIDF_CHUNK_SIZE]]  # sparse (K, F)
            if float(cosine_similarity(row, chunk).max()) > _SIM_THRESHOLD:
                return True
        return False

    def _dedup_batched_tfidf(
        self,
        samples: list[NormalizedSample],
        tfidf_matrix,  # sparse array (N, F)
        t0: float,
    ) -> list[NormalizedSample]:
        """
        O(n²/chunk) batched TF-IDF cosine-similarity deduplication.

        Used when datasketch is not available. Every sample is compared
        against all samples kept so far; ``t0`` is the start timestamp
        used only for the elapsed-time log line.
        """
        kept_indices: list[int] = []
        n = len(samples)
        logger.info(
            "Dedup (batched TF-IDF) β processing %d samples in chunks of %d β¦",
            n,
            _TFIDF_CHUNK_SIZE,
        )

        for i in range(n):
            if i % 1000 == 0 and i > 0:
                logger.debug(
                    " β¦ %d / %d processed, %d kept so far.", i, n, len(kept_indices)
                )
            # With nothing kept yet the check is trivially False, so the
            # first sample is always retained.
            if not self._exceeds_similarity(tfidf_matrix, i, kept_indices):
                kept_indices.append(i)

        removed = n - len(kept_indices)
        elapsed = time.perf_counter() - t0
        logger.info(
            "Dedup complete β removed %d duplicates (%d kept) in %.2fs.",
            removed,
            len(kept_indices),
            elapsed,
        )
        return [samples[i] for i in kept_indices]

    def _dedup_minhash_lsh(
        self,
        samples: list[NormalizedSample],
        tfidf_matrix,  # sparse array (N, F), used to verify LSH candidates
        t0: float,
    ) -> list[NormalizedSample]:
        """
        Two-phase dedup using MinHashLSH (approximate Jaccard) followed by
        exact TF-IDF cosine check for candidate pairs.

        datasketch must be available. Samples are expected in priority
        order: a sample is only ever compared with samples kept before it,
        so the earlier (higher-priority) one survives.
        """
        n = len(samples)
        logger.info(
            "Dedup (MinHashLSH) β building %d MinHash objects β¦", n
        )
        lsh = MinHashLSH(threshold=_MINHASH_THRESHOLD, num_perm=_MINHASH_NUM_PERM)
        kept_indices: list[int] = []

        for i, sample in enumerate(samples):
            signature = MinHash(num_perm=_MINHASH_NUM_PERM)
            # MinHash depends only on the set of tokens, so repeated tokens
            # need not be hashed again.
            for token in set(sample.text.lower().split()):
                signature.update(token.encode("utf8"))

            # Keys are str(index) of previously kept samples.
            candidates = [int(key) for key in lsh.query(signature)]
            if candidates:
                # A row with no TF-IDF features cannot be verified by cosine
                # similarity, so the LSH verdict stands for it.
                unverifiable = tfidf_matrix[i].nnz == 0
                if unverifiable or self._exceeds_similarity(tfidf_matrix, i, candidates):
                    continue

            lsh.insert(str(i), signature)
            kept_indices.append(i)

        removed = n - len(kept_indices)
        elapsed = time.perf_counter() - t0
        logger.info(
            "Dedup (MinHashLSH) complete β removed %d duplicates (%d kept) in %.2fs.",
            removed,
            len(kept_indices),
            elapsed,
        )
        return [samples[i] for i in kept_indices]
# ---------------------------------------------------------------------------
# CLI entry-point
# ---------------------------------------------------------------------------
def main(argv: list[str] | None = None) -> None:  # noqa: D401
    """
    Command-line interface for the DatasetBuilder pipeline.

    Parameters
    ----------
    argv:
        Argument list to parse. *None* (the default) makes argparse read
        ``sys.argv[1:]``, which is the behaviour of a plain ``main()`` call.

    Exits with status 1 when the build raises ``RuntimeError``.
    """
    parser = argparse.ArgumentParser(
        prog="dataset_builder",
        description="Build the PhilVerify training dataset from all configured sources.",
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument(
        "--no-dedup",
        action="store_true",
        default=False,
        help="Skip deduplication (faster, but noisier dataset).",
    )
    parser.add_argument(
        "--sources",
        nargs="+",
        metavar="SOURCE",
        default=None,
        choices=_ALL_SOURCE_KEYS,
        help=(
            "Subset of sources to include. "
            f"Valid keys: {', '.join(_ALL_SOURCE_KEYS)}"
        ),
    )
    parser.add_argument(
        "--output-dir",
        type=Path,
        default=None,
        metavar="PATH",
        help="Output directory for combined.parquet and sample_preview.csv.",
    )
    parser.add_argument(
        "-v",
        "--verbose",
        action="store_true",
        default=False,
        help="Enable DEBUG-level logging.",
    )
    args = parser.parse_args(argv)

    if args.verbose:
        # Raise verbosity on the root logger so every module is affected.
        logging.getLogger().setLevel(logging.DEBUG)

    include_set: set[str] | None = set(args.sources) if args.sources else None
    builder = DatasetBuilder(output_dir=args.output_dir)
    try:
        output_path = builder.build(
            include=include_set,
            skip_dedup=args.no_dedup,
        )
    except RuntimeError as exc:
        logger.error("Build failed: %s", exc)
        sys.exit(1)

    # Final summary. The count is formatted before interpolation because the
    # "," format spec is only valid for numbers: applying it to the
    # "unknown" fallback string would raise ValueError.
    try:
        total_text = f"{len(pd.read_parquet(output_path)):,}"
    except Exception:  # noqa: BLE001
        total_text = "unknown"
    print(f"\nβ Dataset ready β {total_text} samples β {output_path}\n")
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|