# Uploaded via huggingface_hub (revision d184fb7).
"""Text cleaning pipeline for preparing training data."""
import logging
import re
import unicodedata
logger = logging.getLogger(__name__)
class TextCleaner:
    """Cleans raw text for character-level language model training.

    The :meth:`clean` pipeline runs, in order: source-specific boilerplate
    stripping (Project Gutenberg / MIT Internet Classics Archive / Internet
    Archive), front- and back-matter removal, unicode-to-ASCII normalization,
    URL/email removal, Roman numeral conversion (performed BEFORE lowercasing
    because it keys on uppercase letters), lowercasing, Arabic numeral
    conversion, character filtering, and whitespace collapsing.  Every stage
    is individually toggled through the config dict passed to ``__init__``.
    """

    # Project Gutenberg header/footer patterns
    GUTENBERG_START = re.compile(
        r"\*\*\*\s*START OF (?:THE |THIS )?PROJECT GUTENBERG.*?\*\*\*",
        re.IGNORECASE,
    )
    GUTENBERG_END = re.compile(
        r"\*\*\*\s*END OF (?:THE |THIS )?PROJECT GUTENBERG.*?\*\*\*",
        re.IGNORECASE,
    )
    # Fallback for Gutenberg files that lack *** markers
    GUTENBERG_END_PLAIN = re.compile(
        r"^End of (?:the )?Project Gutenberg",
        re.IGNORECASE | re.MULTILINE,
    )
    # MIT Internet Classics Archive patterns
    MIT_HEADER = re.compile(
        r"provided by the internet classics archive\..*?-{6,}",
        re.IGNORECASE | re.DOTALL,
    )
    MIT_FOOTER = re.compile(
        r"the internet classics archive\b[^\n]*(?:web atomics)?[^\n]*",
        re.IGNORECASE,
    )
    MIT_DASH_LINE = re.compile(r"-{6,}")
    # Internet Archive patterns
    IA_HEADER = re.compile(
        r"(?:Digitized by|Book digitized by|Original from|Uploaded by)"
        r"[^\n]*",
        re.IGNORECASE,
    )
    IA_GOOGLE_MARKER = re.compile(
        r"(?:Generated (?:by|from)|Google-digitized|"
        r"This is a digital copy of a book)[^\n]*",
        re.IGNORECASE,
    )
    # Roman numeral pattern — matches standalone uppercase Roman numerals.
    # NOTE: every sub-group is optional, so this pattern can also produce
    # zero-width matches at word boundaries; _convert_roman_numerals rejects
    # those via _is_valid_roman (empty string converts to 0).
    ROMAN_NUMERAL = re.compile(
        r"\b(M{0,3}(?:CM|CD|D?C{0,3})(?:XC|XL|L?X{0,3})(?:IX|IV|V?I{0,3}))\b"
    )
    # Context words that allow single "I" to be treated as Roman numeral 1
    ROMAN_CONTEXT = re.compile(
        r"\b(?:book|chapter|prop|proposition|part|vol|volume|no|number|"
        r"section|act|scene|lib|epistle|ode|psalm|canon|lemma|corollary|"
        r"cor|def|definition|axiom|postulate)\b",
        re.IGNORECASE,
    )
    # Roman numeral value map
    ROMAN_VALUES = {"I": 1, "V": 5, "X": 10, "L": 50, "C": 100, "D": 500, "M": 1000}
    # Non-body section headers (for aggressive stripping).
    # "FORWARD" is a common misspelling of FOREWORD found in source texts.
    # NOTE: "INTRODUCTION" is deliberately excluded — it is often the author's own text
    FRONT_MATTER_HEADERS = re.compile(
        r"^\s*(?:PREFACE|FOREWORD|FORWARD|EDITOR[\u2019']?S?\s+NOTE|"
        r"TRANSLATOR[\u2019']?S?\s+NOTE|PREFATORY\s+NOTE|PRELIMINARY\s+NOTE|"
        r"BIOGRAPHICAL\s+(?:NOTE|SKETCH)|ADVERTISEMENT|DEDICAT(?:ION|ED\s+TO)|"
        r"TO\s+THE\s+READER|NOTE\s+ON\s+(?:THE\s+)?TEXT|ABOUT\s+THIS\s+EDITION|"
        r"CHRONOLOG(?:Y|ICAL))[.:\-\u2014]*\s*$",
        re.IGNORECASE | re.MULTILINE,
    )
    BACK_MATTER_HEADERS = re.compile(
        r"^\s*(?:APPENDIX|ADDEND(?:UM|A)|INDEX|GLOSSARY|BIBLIOGRAPHY|"
        r"WORKS?\s+CITED|REFERENCES|ENDNOTES|FOOTNOTES|"
        r"ACKNOWLEDG(?:E?MENTS?)|CREDITS|COLOPHON|ERRATA|"
        r"TRANSCRIBER[\u2019']?S?\s+NOTES?|"
        r"TYPOGRAPHICAL\s+ERRORS?\s+CORRECTED|"
        r"LIST\s+OF\s+(?:ILLUSTRATIONS|FIGURES|PLATES))[.:\-\u2014]*\s*$",
        re.IGNORECASE | re.MULTILINE,
    )
    TOC_HEADER = re.compile(
        r"^\s*(?:TABLE\s+OF\s+)?CONTENTS?[.:\-\u2014]*\s*$",
        re.IGNORECASE | re.MULTILINE,
    )
    # Production/publisher patterns (for front matter cleanup)
    PRODUCTION_PATTERNS = [
        re.compile(p, re.IGNORECASE) for p in [
            r"(?:produced|prepared|transcribed|digitized|scanned)\s+(?:by|for|at)",
            r"production\s+note",
            r"transcriber[\u2019']?s?\s+note",
            r"scanner[\u2019']?s?\s+note",
            r"cornell\s+university\s+library",
            r"(?:published|printed)\s+(?:by|for|at|in)",
            r"(?:first|second|third|\d+(?:st|nd|rd|th))\s+edition",
            r"price\s+\w+[sd]\.",
            r"(?:cloth|paper|hardcover|paperback|octavo|quarto)",
            r"\bisbn\b",
            r"all\s+rights?\s+reserved",
            r"(?:copyright|copr\.?)\s*(?:\(c\)|\xa9|\d)",
            r"press\s+of\b",
            r"university\s+press",
        ]
    ]
    # Transcriber correction notes (back matter), e.g. 'p. 12, the "word" ...'
    TRANSCRIBER_CORRECTION = re.compile(
        r"^p\.\s*(?:\d+|\?\??|\.)\s*[.,]?\s*(?:sqq\.|in\s|the\s|as\s|heading|"
        r"reference|prop|from\s|then\s|these\s|def\.|"
        r"twenty|thirty|forty|fifty|sixty|seventy|eighty|ninety|"
        r"one\s|two\s|three|four|five|six\s|seven|eight|nine|"
        # Match quoted corrections
        r'["\u201c])',
        re.IGNORECASE,
    )
    # Separator/decoration lines
    SEPARATOR_LINE = re.compile(r"^[\s.*_=~\-#]+$")
    # Number words for 0-19 and tens
    ONES = [
        "zero", "one", "two", "three", "four", "five", "six", "seven",
        "eight", "nine", "ten", "eleven", "twelve", "thirteen", "fourteen",
        "fifteen", "sixteen", "seventeen", "eighteen", "nineteen",
    ]
    TENS = [
        "", "", "twenty", "thirty", "forty", "fifty", "sixty", "seventy",
        "eighty", "ninety",
    ]

    def __init__(self, config: dict) -> None:
        """Read stage toggles and parameters from *config*.

        Recognized keys (with defaults):
          lowercase (True), strip_gutenberg (True), strip_mit_classics (True),
          strip_internet_archive (True), normalize_unicode (True),
          convert_numerals (False), convert_roman_numerals (False),
          strip_non_body (True), min_line_length (20), remove_urls (True),
          collapse_whitespace (True), allowed_chars (regex character-class
          body used by _clean_chars).
        """
        self.lowercase = config.get("lowercase", True)
        self.strip_gutenberg = config.get("strip_gutenberg", True)
        self.strip_mit_classics = config.get("strip_mit_classics", True)
        self.strip_internet_archive = config.get("strip_internet_archive", True)
        self.normalize_unicode = config.get("normalize_unicode", True)
        self.convert_numerals = config.get("convert_numerals", False)
        self.convert_roman_numerals = config.get("convert_roman_numerals", False)
        self.strip_non_body = config.get("strip_non_body", True)
        self.min_line_length = config.get("min_line_length", 20)
        self.remove_urls = config.get("remove_urls", True)
        self.collapse_whitespace = config.get("collapse_whitespace", True)
        self.allowed_chars = config.get("allowed_chars", r"a-z0-9 .,;:!?'\"\-\(\)")

    def clean(self, text: str) -> str:
        """Run all enabled cleaning stages on *text* and return the result.

        Blank/whitespace-only input returns "".  Stage order matters:
        boilerplate and non-body stripping run on the raw text, Roman
        numeral conversion must precede lowercasing, and character
        filtering runs after all textual transforms.
        """
        if not text.strip():
            return ""
        # Stage 1: Strip source-specific boilerplate
        if self.strip_gutenberg:
            text = self._strip_gutenberg(text)
        if self.strip_mit_classics:
            text = self._strip_mit_classics(text)
        if self.strip_internet_archive:
            text = self._strip_internet_archive(text)
        # Stage 2: Strip non-body content (before any text transforms)
        if self.strip_non_body:
            text = self._strip_non_body(text)
        # Stage 3: Normalize unicode
        if self.normalize_unicode:
            text = self._normalize_unicode(text)
        if self.remove_urls:
            text = self._remove_urls(text)
        # Stage 4: Convert Roman numerals (BEFORE lowercase — needs uppercase)
        if self.convert_roman_numerals:
            text = self._convert_roman_numerals(text)
        # Stage 5: Lowercase
        if self.lowercase:
            text = text.lower()
        # Stage 6: Convert Arabic numerals
        if self.convert_numerals:
            text = self._convert_numerals(text)
        # Stage 7: Character filtering
        text = self._clean_chars(text)
        # Stage 8: Collapse whitespace
        if self.collapse_whitespace:
            text = self._collapse_whitespace(text)
        return text.strip()

    # ------------------------------------------------------------------
    # Source boilerplate stripping
    # ------------------------------------------------------------------
    def _strip_gutenberg(self, text: str) -> str:
        """Remove Project Gutenberg headers and footers."""
        # Strip footer first (before positions shift)
        end_match = self.GUTENBERG_END.search(text)
        if not end_match:
            end_match = self.GUTENBERG_END_PLAIN.search(text)
        if end_match:
            text = text[:end_match.start()]
        # Strip header
        start_match = self.GUTENBERG_START.search(text)
        if start_match:
            text = text[start_match.end():]
        # Also strip common Gutenberg preamble lines — but only when no
        # *** START marker was found; otherwise the preamble is already gone.
        lines = text.split("\n")
        cleaned = []
        skip = start_match is None
        for line in lines:
            stripped = line.strip()
            if skip and stripped.startswith(("Title:", "Author:", "Release Date:",
                                             "Language:", "Character set",
                                             "Produced by", "Updated editions")):
                continue
            if skip and not stripped:
                continue
            # First non-blank, non-metadata line ends the preamble skip.
            skip = False
            cleaned.append(line)
        return "\n".join(cleaned)

    def _strip_mit_classics(self, text: str) -> str:
        """Remove MIT Internet Classics Archive headers, footers, and section dividers."""
        text = self.MIT_HEADER.sub("", text)
        text = self.MIT_FOOTER.sub("", text)
        text = self.MIT_DASH_LINE.sub("", text)
        return text

    def _strip_internet_archive(self, text: str) -> str:
        """Remove Internet Archive / Google Books digitization boilerplate."""
        text = self.IA_HEADER.sub("", text)
        text = self.IA_GOOGLE_MARKER.sub("", text)
        return text

    # ------------------------------------------------------------------
    # Non-body content stripping (aggressive mode)
    # ------------------------------------------------------------------
    def _strip_non_body(self, text: str) -> str:
        """Remove front matter, back matter, and inline non-body content."""
        text = self._strip_front_matter(text)
        text = self._strip_back_matter(text)
        text = self._strip_inline_non_body(text)
        return text

    def _strip_front_matter(self, text: str) -> str:
        """Strip front matter: production notes, TOC, preface, etc.

        Order: (1) strip named sections by header, (2) skip remaining
        non-body paragraphs at the top.  NOTE: if no paragraph ever
        qualifies as body prose, everything is stripped and "" is returned.
        """
        # Pass 1: Remove named sections that have clear headers
        text = self._strip_section(text, self.FRONT_MATTER_HEADERS)
        text = self._strip_section(text, self.TOC_HEADER)
        # Pass 2: Skip non-body paragraphs at the beginning.
        # Body prose = substantial paragraph (>150 chars) with full sentences
        # that does NOT match production/publisher patterns.
        lines = text.split("\n")
        start_idx = 0
        i = 0
        while i < len(lines):
            # Collect next paragraph (blank-line delimited)
            while i < len(lines) and not lines[i].strip():
                i += 1
            para_start = i
            para_lines = []
            while i < len(lines) and lines[i].strip():
                para_lines.append(lines[i].strip())
                i += 1
            if not para_lines:
                continue
            para_text = " ".join(para_lines)
            # ". Capital" is used as a cheap sentence-boundary heuristic.
            has_sentences = bool(re.search(r"\.\s+[A-Z]", para_text))
            is_substantial = len(para_text) > 150
            is_production = self._is_production_line(para_text)
            # Title pages / heading blocks: mostly uppercase letters
            alpha_chars = [c for c in para_text if c.isalpha()]
            is_mostly_uppercase = (
                alpha_chars
                and sum(1 for c in alpha_chars if c.isupper()) / len(alpha_chars) > 0.5
            )
            # Short average line length suggests a title/heading block
            avg_line_len = sum(len(l) for l in para_lines) / len(para_lines)
            is_short_lines = avg_line_len < 50
            if (is_substantial and has_sentences
                    and not is_production
                    and not is_mostly_uppercase
                    and not is_short_lines):
                start_idx = para_start
                break
            # Not body yet — skip it
            start_idx = i
        return "\n".join(lines[start_idx:])

    def _strip_back_matter(self, text: str) -> str:
        """Strip back matter: appendixes, index, transcriber notes, etc."""
        lines = text.split("\n")
        # Find the first back-matter header and truncate there
        first_back_idx = None
        for i, line in enumerate(lines):
            stripped = line.strip()
            if self.BACK_MATTER_HEADERS.match(stripped):
                first_back_idx = i
                break
            # Also detect "Typographical Errors corrected..." as back matter start
            if re.match(r"Typographical\s+Errors?\b", stripped, re.IGNORECASE):
                first_back_idx = i
                break
        if first_back_idx is not None:
            lines = lines[:first_back_idx]
        # Strip trailing transcriber correction notes (working backward)
        while lines:
            stripped = lines[-1].strip()
            if not stripped:
                lines.pop()
                continue
            if self.TRANSCRIBER_CORRECTION.match(stripped):
                lines.pop()
                continue
            if self._is_production_line(stripped):
                lines.pop()
                continue
            break
        return "\n".join(lines)

    def _strip_inline_non_body(self, text: str) -> str:
        """Strip inline non-body markers: separator lines, all-caps headings."""
        lines = text.split("\n")
        cleaned = []
        for line in lines:
            stripped = line.strip()
            # Remove separator/decoration lines
            if stripped and self.SEPARATOR_LINE.match(stripped):
                continue
            # Remove short ALL-CAPS lines (likely section headings).
            # isalpha() restricts this to single-word headings: any space,
            # digit, or punctuation makes isalpha() False and keeps the line.
            if stripped and len(stripped) < 80 and stripped == stripped.upper() and stripped.isalpha():
                continue
            cleaned.append(line)
        return "\n".join(cleaned)

    def _strip_section(self, text: str, header_pattern: re.Pattern) -> str:
        """Remove a section identified by header_pattern until next section boundary."""
        lines = text.split("\n")
        result = []
        skipping = False
        for i, line in enumerate(lines):
            stripped = line.strip()
            if header_pattern.match(stripped):
                skipping = True
                continue
            if skipping:
                # Stop skipping only at an explicit section marker (a
                # body-start keyword, another named front/back-matter header,
                # or a TOC header — see _is_section_boundary); plain body
                # prose does NOT end the skip.
                is_blank = not stripped
                if not is_blank and self._is_section_boundary(stripped, lines, i):
                    skipping = False
                    result.append(line)
                continue
            result.append(line)
        return "\n".join(result)

    def _is_section_boundary(self, stripped: str, lines: list[str], idx: int) -> bool:
        """Detect if a line marks the beginning of a new major section.

        Only returns True for explicit section headers/markers, NOT for
        long body-text lines (which can appear inside prefaces/forewords).
        *lines* and *idx* are currently unused context parameters.
        """
        # Body-start keywords (these signal real content resuming)
        if re.match(
            r"(?:Book|Chapter|Part|Section|Proposition|Theorem|Definition|"
            r"Axiom|Postulate|Introduction|Definitions|Lemma|Corollary|"
            r"Contents?)\b",
            stripped, re.IGNORECASE,
        ):
            return True
        # Another named section header (front or back matter)
        if self.FRONT_MATTER_HEADERS.match(stripped):
            return True
        if self.BACK_MATTER_HEADERS.match(stripped):
            return True
        if self.TOC_HEADER.match(stripped):
            return True
        return False

    def _is_production_line(self, line: str) -> bool:
        """Check if a line is production/publisher metadata."""
        for pattern in self.PRODUCTION_PATTERNS:
            if pattern.search(line):
                return True
        return False

    # ------------------------------------------------------------------
    # Unicode normalization
    # ------------------------------------------------------------------
    def _normalize_unicode(self, text: str) -> str:
        """Normalize unicode characters to their closest ASCII equivalents.

        NFKD splits accented letters into base + combining marks; the
        combining marks (and any other remaining non-ASCII) are dropped by
        the final ascii encode.  Explicit replacements handle punctuation
        that NFKD leaves alone (smart quotes, dashes).
        """
        text = unicodedata.normalize("NFKD", text)
        replacements = {
            "\u2018": "'", "\u2019": "'",  # smart quotes
            "\u201c": '"', "\u201d": '"',
            "\u2013": "-", "\u2014": "-",  # en/em dash
            "\u2026": "...",  # ellipsis
            "\u00a0": " ",  # non-breaking space
            "\u00b6": "",  # pilcrow
            "\u00a7": "",  # section sign
        }
        for old, new in replacements.items():
            text = text.replace(old, new)
        # Strip remaining non-ASCII
        text = text.encode("ascii", errors="ignore").decode("ascii")
        return text

    def _remove_urls(self, text: str) -> str:
        """Remove URLs and email addresses."""
        text = re.sub(r"https?://\S+", "", text)
        text = re.sub(r"www\.\S+", "", text)
        text = re.sub(r"\S+@\S+\.\S+", "", text)
        return text

    # ------------------------------------------------------------------
    # Roman numeral conversion
    # ------------------------------------------------------------------
    def _roman_to_int(self, s: str) -> int:
        """Convert a Roman numeral string to an integer.

        Scans right-to-left: a value smaller than the one to its right is
        subtracted (IV -> 4), otherwise added.  Returns 0 for "".
        """
        result = 0
        prev = 0
        for char in reversed(s.upper()):
            val = self.ROMAN_VALUES.get(char, 0)
            if val < prev:
                result -= val
            else:
                result += val
            prev = val
        return result

    def _is_valid_roman(self, s: str) -> bool:
        """Check if a string is a valid Roman numeral (not just random letters)."""
        if not s:
            return False
        # Must only contain valid Roman numeral characters
        if not all(c in "IVXLCDM" for c in s.upper()):
            return False
        # Must convert to a positive number (rejects "" zero-width matches)
        val = self._roman_to_int(s)
        return val > 0

    def _convert_roman_numerals(self, text: str) -> str:
        """Convert Roman numerals to English words.

        Handles multi-character Roman numerals (II, IV, XIV, etc.) directly.
        Single 'I' is only converted when preceded by a context word within
        the previous 30 characters; single V/X/L/C/D/M are always converted.
        """
        def replace_roman(m):
            numeral = m.group(1)
            # Single 'I' is ambiguous (the pronoun) — only convert it when a
            # context word like "chapter" or "book" appears just before it.
            if len(numeral) == 1 and numeral.upper() == "I":
                before = text[max(0, m.start() - 30):m.start()]
                if not self.ROMAN_CONTEXT.search(before):
                    return m.group(0)
            # Rejects zero-width matches the optional-group pattern produces.
            if not self._is_valid_roman(numeral):
                return m.group(0)
            val = self._roman_to_int(numeral)
            return self._number_to_words(val)
        return self.ROMAN_NUMERAL.sub(replace_roman, text)

    # ------------------------------------------------------------------
    # Arabic numeral conversion
    # ------------------------------------------------------------------
    def _number_to_words(self, n: int) -> str:
        """Convert an integer to English words.

        Values of 1,000,000 or more are returned unchanged as digits.
        """
        if n < 0:
            return "negative " + self._number_to_words(-n)
        if n == 0:
            return self.ONES[0]
        if n < 20:
            return self.ONES[n]
        if n < 100:
            tens, ones = divmod(n, 10)
            return self.TENS[tens] + (" " + self.ONES[ones] if ones else "")
        if n < 1000:
            hundreds, remainder = divmod(n, 100)
            result = self.ONES[hundreds] + " hundred"
            if remainder:
                result += " " + self._number_to_words(remainder)
            return result
        if n < 1000000:
            thousands, remainder = divmod(n, 1000)
            result = self._number_to_words(thousands) + " thousand"
            if remainder:
                result += " " + self._number_to_words(remainder)
            return result
        return str(n)

    def _convert_numerals(self, text: str) -> str:
        """Replace standalone digit sequences with their English word equivalents.

        Only converts digit groups that are standalone words (surrounded by
        whitespace or punctuation), preventing garbled output from codes
        like Z39.48-1984.  Numbers >= 1,000,000 are left as digits.
        """
        def replace_match(m):
            # Ensure digits are not part of a larger alphanumeric token
            start, end = m.start(), m.end()
            if start > 0 and text[start - 1].isalnum():
                return m.group()
            if end < len(text) and text[end].isalnum():
                return m.group()
            try:
                n = int(m.group())
                if n < 1000000:
                    return self._number_to_words(n)
            except ValueError:
                pass
            return m.group()
        return re.sub(r"\d+", replace_match, text)

    # ------------------------------------------------------------------
    # Character filtering and whitespace
    # ------------------------------------------------------------------
    def _clean_chars(self, text: str) -> str:
        """Replace characters not in the allowed set with spaces.

        Newlines are always preserved in addition to self.allowed_chars.
        """
        pattern = f"[^{self.allowed_chars}\n]"
        text = re.sub(pattern, " ", text)
        # Remove lines that are only dots and/or spaces (separator lines)
        text = re.sub(r"^[. ]+$", "", text, flags=re.MULTILINE)
        return text

    def _collapse_whitespace(self, text: str) -> str:
        """Collapse whitespace runs while preserving paragraph breaks.

        3+ newlines become a blank line, space runs become one space, and
        spaces surrounding a newline are removed.
        """
        text = re.sub(r"\n{3,}", "\n\n", text)
        text = re.sub(r" {2,}", " ", text)
        text = re.sub(r" *\n *", "\n", text)
        return text