| """Corpus filtering, safety, and quality heuristics.""" |
|
|
| from __future__ import annotations |
|
|
| import re |
| from dataclasses import dataclass |
| from typing import Iterable |
|
|
|
|
# --- PII patterns -----------------------------------------------------------
# Email addresses; matched case-insensitively.
EMAIL_RE = re.compile(r"\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\.[A-Z]{2,}\b", re.IGNORECASE)
# Ten-digit phone numbers grouped 3-3-4 with optional parentheses and
# "-", "." or whitespace separators, optionally preceded by a 1-3 digit
# country code.  The separator that follows the country code is consumed only
# when a country code is actually present; otherwise the pattern would swallow
# the space or period in front of a number and glue the replacement token onto
# the previous word ("call 555-123-4567" -> "call[PHONE]").
PHONE_RE = re.compile(r"(?:\+?\d{1,3}[-.\s]?)?(?:\(?\d{3}\)?[-.\s]?){2}\d{4}")
# US social security numbers in the dashed 3-2-4 form.
SSN_RE = re.compile(r"\b\d{3}-\d{2}-\d{4}\b")

# --- Text clean-up patterns -------------------------------------------------
# Anything between "<" and the next ">" is treated as a markup tag.
HTML_RE = re.compile(r"<[^>]+>")
# Runs of spaces and tabs; newlines are deliberately not included.
MULTISPACE_RE = re.compile(r"[ \t]+")

# --- Policy vocabularies ----------------------------------------------------
# Lower-case keywords/phrases used by the keyword-based safety filter.
NSFW_TERMS = {"porn", "explicit sex", "rape"}
HATE_TERMS = {"kill all", "ethnic cleansing"}
# License categories and language codes a record must carry to be kept.
ALLOWED_LICENSES = {"permissive", "restricted"}
ALLOWED_LANGS = {"en", "es", "fr", "de", "hi", "zh", "ar", "pt"}
|
|
|
|
@dataclass(frozen=True)
class FilterConfig:
    """Immutable bundle of thresholds that tune the filtering pipeline.

    Instances are frozen, so one configuration can be shared freely between
    calls without risk of accidental mutation.
    """

    # Inclusive bounds on the length (in characters) of the normalized text.
    minimum_chars: int = 200
    maximum_chars: int = 200_000
    # Lower bound on the fraction of alphabetic characters in a document.
    minimum_alpha_ratio: float = 0.45
    # Lowest heuristic quality score a document may have and still be kept.
    minimum_quality_score: float = 0.20
    # Lowest language-detection confidence that is still accepted.
    language_confidence_threshold: float = 0.65
|
|
|
|
def normalize_text(text: str) -> str:
    """Remove markup tags and tidy horizontal whitespace.

    Each tag matched by ``HTML_RE`` becomes a single space, every run of
    spaces/tabs is then squeezed to one space, and finally leading and
    trailing whitespace is trimmed.  Newlines inside the text are kept.
    """
    without_tags = HTML_RE.sub(" ", text)
    collapsed = MULTISPACE_RE.sub(" ", without_tags)
    return collapsed.strip()
|
|
|
|
def detect_language(text: str) -> tuple[str, float]:
    """Guess a language code from the scripts that appear in *text*.

    Characters in the CJK Unified Ideographs, Arabic and Devanagari ranges are
    counted.  The most frequent of those scripts wins, provided more than 8 of
    its characters are present, and is reported with confidence 0.95.  Choosing
    the dominant script (instead of testing the scripts in a fixed order)
    keeps a handful of quoted foreign characters from overriding the script
    the document is mostly written in.

    When no script qualifies, text that is more than 90% ASCII is reported as
    English with confidence 0.80; anything else is ``"unknown"`` at 0.40.

    Args:
        text: The text to classify; may be empty.

    Returns:
        A ``(language_code, confidence)`` pair.
    """
    # Insertion order doubles as the tie-break order (zh, then ar, then hi),
    # because max() returns the first key holding the largest count.
    script_counts = {"zh": 0, "ar": 0, "hi": 0}
    ascii_count = 0
    for ch in text:
        if ch.isascii():
            ascii_count += 1
        elif "\u4e00" <= ch <= "\u9fff":
            script_counts["zh"] += 1
        elif "\u0600" <= ch <= "\u06ff":
            script_counts["ar"] += 1
        elif "\u0900" <= ch <= "\u097f":
            script_counts["hi"] += 1

    dominant = max(script_counts, key=script_counts.get)
    if script_counts[dominant] > 8:
        return dominant, 0.95

    # max(..., 1) keeps the ratio defined for empty input.
    if ascii_count / max(len(text), 1) > 0.9:
        return "en", 0.80
    return "unknown", 0.40
|
|
|
|
def quality_score(text: str) -> float:
    """Compute a heuristic quality score for *text*, rounded to 4 decimals.

    The score is a weighted sum of three components:

    * length, saturating at 4000 characters (weight 0.4);
    * share of alphabetic characters (weight 0.4);
    * share of punctuation characters scaled by 8 and capped at 1
      (weight 0.2).

    Text longer than 10,000 characters that contains no newline is scaled by
    0.85.  Empty text scores 0.0.
    """
    total_chars = len(text)
    if total_chars == 0:
        return 0.0

    letters = 0
    punctuation = 0
    for ch in text:
        if ch.isalpha():
            letters += 1
        elif ch in ".,;:!?()[]{}":
            punctuation += 1

    length_part = min(total_chars / 4000.0, 1.0) * 0.4
    alpha_part = (letters / total_chars) * 0.4
    punct_part = min((punctuation / total_chars) * 8.0, 1.0) * 0.2
    score = length_part + alpha_part + punct_part

    # Very long text without a single line break is penalized.
    if "\n" not in text and total_chars > 10_000:
        score *= 0.85
    return round(score, 4)
|
|
|
|
def quality_tier(score: float) -> str:
    """Translate a numeric quality score into a tier label.

    Scores of at least 0.70 are ``"high"``, scores of at least 0.40 are
    ``"medium"``, and everything else is ``"low"``.
    """
    # Thresholds are listed from the strictest tier downwards.
    for floor, label in ((0.70, "high"), (0.40, "medium")):
        if score >= floor:
            return label
    return "low"
|
|
|
|
def strip_pii(text: str) -> str:
    """Mask basic email, SSN, and phone patterns.

    Matches are replaced with the tokens ``[EMAIL]``, ``[SSN]`` and
    ``[PHONE]``.

    Args:
        text: Text that may contain personal identifiers.

    Returns:
        The text with every match replaced by its placeholder token.
    """
    # Order matters: the SSN pattern is the most specific, so it runs before
    # the looser phone pattern.  Otherwise the tail of an SSN followed by more
    # digits ("123-45-6789 555 1234") is consumed as a phone number and the
    # leading part of the SSN is left in the output.
    masks = (
        (EMAIL_RE, "[EMAIL]"),
        (SSN_RE, "[SSN]"),
        (PHONE_RE, "[PHONE]"),
    )
    for pattern, placeholder in masks:
        text = pattern.sub(placeholder, text)
    return text
|
|
|
|
def passes_safety_filter(text: str) -> bool:
    """Reject obviously unsafe content with simple keyword checks.

    The text is lower-cased and searched for every entry of ``NSFW_TERMS``
    and ``HATE_TERMS``.  A term only counts when it begins at the start of a
    word, so inflected forms are still caught ("porn" in "pornography") while
    unrelated words that merely contain a term ("rape" inside "grape" or
    "therapeutic", "kill all" inside "skill all") no longer cause a rejection.

    Args:
        text: The text to screen.

    Returns:
        ``False`` if any blocked term is found, ``True`` otherwise.
    """
    lower = text.lower()
    for term in (*NSFW_TERMS, *HATE_TERMS):
        # (?<!\w) requires that no word character precedes the term;
        # re.escape keeps any punctuation inside a term literal.
        if re.search(r"(?<!\w)" + re.escape(term), lower):
            return False
    return True
|
|
|
|
def license_allowed(category: str) -> bool:
    """Tell whether *category* is one of the accepted license categories.

    The check is an exact, case-sensitive membership test against
    ``ALLOWED_LICENSES``.
    """
    is_permitted = category in ALLOWED_LICENSES
    return is_permitted
|
|
|
|
def filter_record(
    record: dict[str, object],
    config: FilterConfig = FilterConfig(),
) -> dict[str, object] | None:
    """Apply the full filter pipeline to one record.

    Stages run in this order and the first failing stage rejects the record:
    license category, text normalization and length bounds, language
    detection, PII masking, safety keywords, alphabetic density, quality
    score.

    Args:
        record: Source record.  ``"license_category"`` and ``"text"`` are read
            with ``dict.get`` (defaulting to an empty string) and coerced
            with ``str``.  The input dict is not modified.
        config: Thresholds for the length, language-confidence,
            alphabetic-density and quality checks.

    Returns:
        A new dict holding the original fields plus the cleaned ``"text"``,
        ``"lang"``, ``"lang_confidence"``, ``"quality_score"``,
        ``"quality_tier"`` and ``"token_count_estimate"``; or ``None`` when
        the record is rejected.
    """
    if not license_allowed(str(record.get("license_category", ""))):
        return None

    # Length bounds apply to the normalized text, before PII masking.
    text = normalize_text(str(record.get("text", "")))
    if not (config.minimum_chars <= len(text) <= config.maximum_chars):
        return None

    lang, confidence = detect_language(text)
    if lang not in ALLOWED_LANGS or confidence < config.language_confidence_threshold:
        return None

    text = strip_pii(text)
    if not passes_safety_filter(text):
        return None

    # Enforce the configured alphabetic-density floor.  It is measured on the
    # same masked text the quality score sees; max(..., 1) guards the division
    # when a caller configures minimum_chars=0.
    alpha_ratio = sum(ch.isalpha() for ch in text) / max(len(text), 1)
    if alpha_ratio < config.minimum_alpha_ratio:
        return None

    score = quality_score(text)
    if score < config.minimum_quality_score:
        return None

    return {
        **record,
        "text": text,
        "lang": lang,
        "lang_confidence": confidence,
        "quality_score": score,
        "quality_tier": quality_tier(score),
        # Rough estimate of one token per four characters, never below 1.
        "token_count_estimate": max(1, len(text) // 4),
    }
|
|
|
|
def filter_corpus(
    records: Iterable[dict[str, object]],
    config: FilterConfig = FilterConfig(),
) -> list[dict[str, object]]:
    """Run every record through ``filter_record`` and collect the survivors.

    Records for which ``filter_record`` returns ``None`` are dropped; the
    remaining results are returned as a list in input order.
    """
    outcomes = (filter_record(item, config) for item in records)
    return [outcome for outcome in outcomes if outcome is not None]
|
|