| """Text cleaning pipeline for preparing training data.""" |
|
|
| import logging |
| import re |
| import unicodedata |
|
|
| logger = logging.getLogger(__name__) |
|
|
|
|
class TextCleaner:
    """Cleans raw text for character-level language model training.

    The class-level constants below are compiled regexes and lookup tables
    shared by all instances; per-run behavior is configured via the dict
    passed to __init__.
    """

    # --- Project Gutenberg boilerplate -------------------------------
    # "*** START OF THE PROJECT GUTENBERG ... ***" banner.
    GUTENBERG_START = re.compile(
        r"\*\*\*\s*START OF (?:THE |THIS )?PROJECT GUTENBERG.*?\*\*\*",
        re.IGNORECASE,
    )
    # "*** END OF THE PROJECT GUTENBERG ... ***" banner.
    GUTENBERG_END = re.compile(
        r"\*\*\*\s*END OF (?:THE |THIS )?PROJECT GUTENBERG.*?\*\*\*",
        re.IGNORECASE,
    )
    # Older releases end with a bare "End of Project Gutenberg..." line.
    GUTENBERG_END_PLAIN = re.compile(
        r"^End of (?:the )?Project Gutenberg",
        re.IGNORECASE | re.MULTILINE,
    )

    # --- MIT Internet Classics Archive boilerplate -------------------
    # Header block: credit line up to the first long dashed divider.
    MIT_HEADER = re.compile(
        r"provided by the internet classics archive\..*?-{6,}",
        re.IGNORECASE | re.DOTALL,
    )
    # Footer credit line (may mention "Web Atomics").
    MIT_FOOTER = re.compile(
        r"the internet classics archive\b[^\n]*(?:web atomics)?[^\n]*",
        re.IGNORECASE,
    )
    # Dashed section dividers used between books/chapters.
    MIT_DASH_LINE = re.compile(r"-{6,}")

    # --- Internet Archive / Google Books boilerplate -----------------
    # Single-line digitization credits.
    IA_HEADER = re.compile(
        r"(?:Digitized by|Book digitized by|Original from|Uploaded by)"
        r"[^\n]*",
        re.IGNORECASE,
    )
    # Google Books marker lines.
    IA_GOOGLE_MARKER = re.compile(
        r"(?:Generated (?:by|from)|Google-digitized|"
        r"This is a digital copy of a book)[^\n]*",
        re.IGNORECASE,
    )

    # --- Roman numeral handling --------------------------------------
    # Well-formed Roman numeral as a standalone word.  NOTE: the inner
    # pattern can also match an empty string at a word boundary; callers
    # filter those out via _is_valid_roman.
    ROMAN_NUMERAL = re.compile(
        r"\b(M{0,3}(?:CM|CD|D?C{0,3})(?:XC|XL|L?X{0,3})(?:IX|IV|V?I{0,3}))\b"
    )
    # Words that, when they precede a lone "I", indicate it is a numeral
    # (e.g. "Chapter I") rather than the pronoun.
    ROMAN_CONTEXT = re.compile(
        r"\b(?:book|chapter|prop|proposition|part|vol|volume|no|number|"
        r"section|act|scene|lib|epistle|ode|psalm|canon|lemma|corollary|"
        r"cor|def|definition|axiom|postulate)\b",
        re.IGNORECASE,
    )

    # Symbol values used by _roman_to_int.
    ROMAN_VALUES = {"I": 1, "V": 5, "X": 10, "L": 50, "C": 100, "D": 500, "M": 1000}

    # --- Front/back matter section headings --------------------------
    # A whole line consisting only of a front-matter heading (optionally
    # followed by punctuation).
    FRONT_MATTER_HEADERS = re.compile(
        r"^\s*(?:PREFACE|FOREWORD|FORWARD|EDITOR[\u2019']?S?\s+NOTE|"
        r"TRANSLATOR[\u2019']?S?\s+NOTE|PREFATORY\s+NOTE|PRELIMINARY\s+NOTE|"
        r"BIOGRAPHICAL\s+(?:NOTE|SKETCH)|ADVERTISEMENT|DEDICAT(?:ION|ED\s+TO)|"
        r"TO\s+THE\s+READER|NOTE\s+ON\s+(?:THE\s+)?TEXT|ABOUT\s+THIS\s+EDITION|"
        r"CHRONOLOG(?:Y|ICAL))[.:\-\u2014]*\s*$",
        re.IGNORECASE | re.MULTILINE,
    )
    # A whole line consisting only of a back-matter heading.
    BACK_MATTER_HEADERS = re.compile(
        r"^\s*(?:APPENDIX|ADDEND(?:UM|A)|INDEX|GLOSSARY|BIBLIOGRAPHY|"
        r"WORKS?\s+CITED|REFERENCES|ENDNOTES|FOOTNOTES|"
        r"ACKNOWLEDG(?:E?MENTS?)|CREDITS|COLOPHON|ERRATA|"
        r"TRANSCRIBER[\u2019']?S?\s+NOTES?|"
        r"TYPOGRAPHICAL\s+ERRORS?\s+CORRECTED|"
        r"LIST\s+OF\s+(?:ILLUSTRATIONS|FIGURES|PLATES))[.:\-\u2014]*\s*$",
        re.IGNORECASE | re.MULTILINE,
    )
    # Table-of-contents heading line.
    TOC_HEADER = re.compile(
        r"^\s*(?:TABLE\s+OF\s+)?CONTENTS?[.:\-\u2014]*\s*$",
        re.IGNORECASE | re.MULTILINE,
    )

    # Phrases that mark production/publisher metadata lines (credits,
    # edition notices, copyright, physical-format descriptions, ...).
    PRODUCTION_PATTERNS = [
        re.compile(p, re.IGNORECASE) for p in [
            r"(?:produced|prepared|transcribed|digitized|scanned)\s+(?:by|for|at)",
            r"production\s+note",
            r"transcriber[\u2019']?s?\s+note",
            r"scanner[\u2019']?s?\s+note",
            r"cornell\s+university\s+library",
            r"(?:published|printed)\s+(?:by|for|at|in)",
            r"(?:first|second|third|\d+(?:st|nd|rd|th))\s+edition",
            r"price\s+\w+[sd]\.",
            r"(?:cloth|paper|hardcover|paperback|octavo|quarto)",
            r"\bisbn\b",
            r"all\s+rights?\s+reserved",
            r"(?:copyright|copr\.?)\s*(?:\(c\)|\xa9|\d)",
            r"press\s+of\b",
            r"university\s+press",
        ]
    ]

    # Trailing transcriber-correction lines, e.g. 'p. 23, "reads ...'.
    # Anchored on a page reference followed by a typical correction lead-in.
    TRANSCRIBER_CORRECTION = re.compile(
        r"^p\.\s*(?:\d+|\?\??|\.)\s*[.,]?\s*(?:sqq\.|in\s|the\s|as\s|heading|"
        r"reference|prop|from\s|then\s|these\s|def\.|"
        r"twenty|thirty|forty|fifty|sixty|seventy|eighty|ninety|"
        r"one\s|two\s|three|four|five|six\s|seven|eight|nine|"
        # Corrections quoting the original text start with a quote mark.
        r'["\u201c])',
        re.IGNORECASE,
    )

    # A line made entirely of separator characters (rules, asterisms, ...).
    SEPARATOR_LINE = re.compile(r"^[\s.*_=~\-#]+$")

    # Number-word tables used by _number_to_words.
    ONES = [
        "zero", "one", "two", "three", "four", "five", "six", "seven",
        "eight", "nine", "ten", "eleven", "twelve", "thirteen", "fourteen",
        "fifteen", "sixteen", "seventeen", "eighteen", "nineteen",
    ]
    TENS = [
        "", "", "twenty", "thirty", "forty", "fifty", "sixty", "seventy",
        "eighty", "ninety",
    ]
|
|
    def __init__(self, config: dict):
        """Read cleaning options from *config*, falling back to defaults.

        Args:
            config: Mapping of option name to value; any missing key uses
                the default shown in the corresponding ``get`` call below.
        """
        self.lowercase = config.get("lowercase", True)
        self.strip_gutenberg = config.get("strip_gutenberg", True)
        self.strip_mit_classics = config.get("strip_mit_classics", True)
        self.strip_internet_archive = config.get("strip_internet_archive", True)
        self.normalize_unicode = config.get("normalize_unicode", True)
        self.convert_numerals = config.get("convert_numerals", False)
        self.convert_roman_numerals = config.get("convert_roman_numerals", False)
        self.strip_non_body = config.get("strip_non_body", True)
        # NOTE(review): min_line_length is stored but not referenced by any
        # method visible in this file -- confirm it is used elsewhere.
        self.min_line_length = config.get("min_line_length", 20)
        self.remove_urls = config.get("remove_urls", True)
        self.collapse_whitespace = config.get("collapse_whitespace", True)
        # Regex character-class body; _clean_chars replaces anything outside
        # this set (plus newline) with a space.
        self.allowed_chars = config.get("allowed_chars", r"a-z0-9 .,;:!?'\"\-\(\)")
|
|
| def clean(self, text: str) -> str: |
| """Run all cleaning stages on the input text.""" |
| if not text.strip(): |
| return "" |
|
|
| |
| if self.strip_gutenberg: |
| text = self._strip_gutenberg(text) |
|
|
| if self.strip_mit_classics: |
| text = self._strip_mit_classics(text) |
|
|
| if self.strip_internet_archive: |
| text = self._strip_internet_archive(text) |
|
|
| |
| if self.strip_non_body: |
| text = self._strip_non_body(text) |
|
|
| |
| if self.normalize_unicode: |
| text = self._normalize_unicode(text) |
|
|
| if self.remove_urls: |
| text = self._remove_urls(text) |
|
|
| |
| if self.convert_roman_numerals: |
| text = self._convert_roman_numerals(text) |
|
|
| |
| if self.lowercase: |
| text = text.lower() |
|
|
| |
| if self.convert_numerals: |
| text = self._convert_numerals(text) |
|
|
| |
| text = self._clean_chars(text) |
|
|
| |
| if self.collapse_whitespace: |
| text = self._collapse_whitespace(text) |
|
|
| return text.strip() |
|
|
| |
| |
| |
|
|
| def _strip_gutenberg(self, text: str) -> str: |
| """Remove Project Gutenberg headers and footers.""" |
| |
| end_match = self.GUTENBERG_END.search(text) |
| if not end_match: |
| end_match = self.GUTENBERG_END_PLAIN.search(text) |
| if end_match: |
| text = text[:end_match.start()] |
|
|
| |
| start_match = self.GUTENBERG_START.search(text) |
| if start_match: |
| text = text[start_match.end():] |
|
|
| |
| lines = text.split("\n") |
| cleaned = [] |
| skip = True if start_match is None else False |
| for line in lines: |
| stripped = line.strip() |
| if skip and stripped.startswith(("Title:", "Author:", "Release Date:", |
| "Language:", "Character set", |
| "Produced by", "Updated editions")): |
| continue |
| if skip and not stripped: |
| continue |
| skip = False |
| cleaned.append(line) |
|
|
| return "\n".join(cleaned) |
|
|
| def _strip_mit_classics(self, text: str) -> str: |
| """Remove MIT Internet Classics Archive headers, footers, and section dividers.""" |
| text = self.MIT_HEADER.sub("", text) |
| text = self.MIT_FOOTER.sub("", text) |
| text = self.MIT_DASH_LINE.sub("", text) |
| return text |
|
|
| def _strip_internet_archive(self, text: str) -> str: |
| """Remove Internet Archive / Google Books digitization boilerplate.""" |
| text = self.IA_HEADER.sub("", text) |
| text = self.IA_GOOGLE_MARKER.sub("", text) |
| return text |
|
|
| |
| |
| |
|
|
| def _strip_non_body(self, text: str) -> str: |
| """Remove front matter, back matter, and inline non-body content.""" |
| text = self._strip_front_matter(text) |
| text = self._strip_back_matter(text) |
| text = self._strip_inline_non_body(text) |
| return text |
|
|
    def _strip_front_matter(self, text: str) -> str:
        """Strip front matter: production notes, TOC, preface, etc.

        Order: (1) strip named sections by header, (2) skip remaining
        non-body paragraphs at the top.

        A paragraph is accepted as the start of the body when it is long
        (> 150 chars), contains sentence structure, is not production
        metadata, is not mostly uppercase, and is not made of short lines.
        If no paragraph qualifies, everything is stripped.
        """
        # (1) Remove explicitly-headed front-matter sections and the TOC.
        text = self._strip_section(text, self.FRONT_MATTER_HEADERS)
        text = self._strip_section(text, self.TOC_HEADER)

        # (2) Walk paragraph by paragraph from the top until the first
        # paragraph that looks like body prose; everything before it is
        # discarded.
        lines = text.split("\n")
        start_idx = 0
        i = 0
        while i < len(lines):
            # Skip blank lines preceding the paragraph.
            while i < len(lines) and not lines[i].strip():
                i += 1
            para_start = i
            para_lines = []
            # Collect contiguous non-blank lines as one paragraph.
            while i < len(lines) and lines[i].strip():
                para_lines.append(lines[i].strip())
                i += 1

            # Only empty when we ran off the end of the text; the outer
            # while condition then terminates the loop.
            if not para_lines:
                continue

            para_text = " ".join(para_lines)

            # Heuristic features of a body paragraph.
            has_sentences = bool(re.search(r"\.\s+[A-Z]", para_text))
            is_substantial = len(para_text) > 150
            is_production = self._is_production_line(para_text)

            # Mostly-uppercase paragraphs are headings/title pages.
            alpha_chars = [c for c in para_text if c.isalpha()]
            is_mostly_uppercase = (
                alpha_chars
                and sum(1 for c in alpha_chars if c.isupper()) / len(alpha_chars) > 0.5
            )

            # Short average line length suggests a list or title block
            # rather than flowing prose.
            avg_line_len = sum(len(l) for l in para_lines) / len(para_lines)
            is_short_lines = avg_line_len < 50

            if (is_substantial and has_sentences
                    and not is_production
                    and not is_mostly_uppercase
                    and not is_short_lines):
                start_idx = para_start
                break

            # Paragraph rejected: move the cut point past it.
            start_idx = i

        return "\n".join(lines[start_idx:])
|
|
| def _strip_back_matter(self, text: str) -> str: |
| """Strip back matter: appendixes, index, transcriber notes, etc.""" |
| lines = text.split("\n") |
|
|
| |
| first_back_idx = None |
| for i, line in enumerate(lines): |
| stripped = line.strip() |
| if self.BACK_MATTER_HEADERS.match(stripped): |
| first_back_idx = i |
| break |
| |
| if re.match(r"Typographical\s+Errors?\b", stripped, re.IGNORECASE): |
| first_back_idx = i |
| break |
|
|
| if first_back_idx is not None: |
| lines = lines[:first_back_idx] |
|
|
| |
| while lines: |
| stripped = lines[-1].strip() |
| if not stripped: |
| lines.pop() |
| continue |
| if self.TRANSCRIBER_CORRECTION.match(stripped): |
| lines.pop() |
| continue |
| if self._is_production_line(stripped): |
| lines.pop() |
| continue |
| break |
|
|
| return "\n".join(lines) |
|
|
| def _strip_inline_non_body(self, text: str) -> str: |
| """Strip inline non-body markers: separator lines, all-caps headings.""" |
| lines = text.split("\n") |
| cleaned = [] |
| for line in lines: |
| stripped = line.strip() |
|
|
| |
| if stripped and self.SEPARATOR_LINE.match(stripped): |
| continue |
|
|
| |
| if stripped and len(stripped) < 80 and stripped == stripped.upper() and stripped.isalpha(): |
| continue |
|
|
| cleaned.append(line) |
|
|
| return "\n".join(cleaned) |
|
|
| def _strip_section(self, text: str, header_pattern: re.Pattern) -> str: |
| """Remove a section identified by header_pattern until next section boundary.""" |
| lines = text.split("\n") |
| result = [] |
| skipping = False |
|
|
| for i, line in enumerate(lines): |
| stripped = line.strip() |
|
|
| if header_pattern.match(stripped): |
| skipping = True |
| continue |
|
|
| if skipping: |
| |
| |
| |
| is_blank = not stripped |
| if not is_blank and self._is_section_boundary(stripped, lines, i): |
| skipping = False |
| result.append(line) |
| continue |
|
|
| result.append(line) |
|
|
| return "\n".join(result) |
|
|
| def _is_section_boundary(self, stripped: str, lines: list[str], idx: int) -> bool: |
| """Detect if a line marks the beginning of a new major section. |
| |
| Only returns True for explicit section headers/markers, NOT for |
| long body-text lines (which can appear inside prefaces/forewords). |
| """ |
| |
| if re.match( |
| r"(?:Book|Chapter|Part|Section|Proposition|Theorem|Definition|" |
| r"Axiom|Postulate|Introduction|Definitions|Lemma|Corollary|" |
| r"Contents?)\b", |
| stripped, re.IGNORECASE, |
| ): |
| return True |
|
|
| |
| if self.FRONT_MATTER_HEADERS.match(stripped): |
| return True |
| if self.BACK_MATTER_HEADERS.match(stripped): |
| return True |
| if self.TOC_HEADER.match(stripped): |
| return True |
|
|
| return False |
|
|
| def _is_production_line(self, line: str) -> bool: |
| """Check if a line is production/publisher metadata.""" |
| for pattern in self.PRODUCTION_PATTERNS: |
| if pattern.search(line): |
| return True |
| return False |
|
|
| |
| |
| |
|
|
| def _normalize_unicode(self, text: str) -> str: |
| """Normalize unicode characters to their closest ASCII equivalents.""" |
| text = unicodedata.normalize("NFKD", text) |
| replacements = { |
| "\u2018": "'", "\u2019": "'", |
| "\u201c": '"', "\u201d": '"', |
| "\u2013": "-", "\u2014": "-", |
| "\u2026": "...", |
| "\u00a0": " ", |
| "\u00b6": "", |
| "\u00a7": "", |
| } |
| for old, new in replacements.items(): |
| text = text.replace(old, new) |
|
|
| |
| text = text.encode("ascii", errors="ignore").decode("ascii") |
| return text |
|
|
| def _remove_urls(self, text: str) -> str: |
| """Remove URLs and email addresses.""" |
| text = re.sub(r"https?://\S+", "", text) |
| text = re.sub(r"www\.\S+", "", text) |
| text = re.sub(r"\S+@\S+\.\S+", "", text) |
| return text |
|
|
| |
| |
| |
|
|
| def _roman_to_int(self, s: str) -> int: |
| """Convert a Roman numeral string to an integer.""" |
| result = 0 |
| prev = 0 |
| for char in reversed(s.upper()): |
| val = self.ROMAN_VALUES.get(char, 0) |
| if val < prev: |
| result -= val |
| else: |
| result += val |
| prev = val |
| return result |
|
|
| def _is_valid_roman(self, s: str) -> bool: |
| """Check if a string is a valid Roman numeral (not just random letters).""" |
| if not s: |
| return False |
| |
| if not all(c in "IVXLCDM" for c in s.upper()): |
| return False |
| |
| val = self._roman_to_int(s) |
| return val > 0 |
|
|
    def _convert_roman_numerals(self, text: str) -> str:
        """Convert Roman numerals to English words.

        Handles multi-character Roman numerals (II, IV, XIV, etc.) directly.
        Single 'I' is only converted when preceded by a context word.
        """
        def replace_roman(m):
            numeral = m.group(1)
            # Single-letter matches are ambiguous with ordinary words.
            if len(numeral) == 1:
                if numeral.upper() == "I":
                    # 'I' is almost always the pronoun; require a context
                    # word ("Chapter", "Book", ...) in the 30 chars before
                    # the match, otherwise leave it untouched.
                    before = text[max(0, m.start() - 30):m.start()]
                    if not self.ROMAN_CONTEXT.search(before):
                        return m.group(0)
                else:
                    # Other single letters (V, X, L, C, D, M) are converted
                    # unconditionally.  NOTE(review): this also rewrites
                    # initials such as "X." -- confirm that is acceptable.
                    pass

            # The regex can match an empty string at a word boundary;
            # _is_valid_roman rejects those (and any malformed match).
            if not self._is_valid_roman(numeral):
                return m.group(0)

            val = self._roman_to_int(numeral)
            return self._number_to_words(val)

        return self.ROMAN_NUMERAL.sub(replace_roman, text)
|
|
| |
| |
| |
|
|
| def _number_to_words(self, n: int) -> str: |
| """Convert an integer to English words.""" |
| if n < 0: |
| return "negative " + self._number_to_words(-n) |
| if n == 0: |
| return self.ONES[0] |
| if n < 20: |
| return self.ONES[n] |
| if n < 100: |
| tens, ones = divmod(n, 10) |
| return self.TENS[tens] + (" " + self.ONES[ones] if ones else "") |
| if n < 1000: |
| hundreds, remainder = divmod(n, 100) |
| result = self.ONES[hundreds] + " hundred" |
| if remainder: |
| result += " " + self._number_to_words(remainder) |
| return result |
| if n < 1000000: |
| thousands, remainder = divmod(n, 1000) |
| result = self._number_to_words(thousands) + " thousand" |
| if remainder: |
| result += " " + self._number_to_words(remainder) |
| return result |
| return str(n) |
|
|
    def _convert_numerals(self, text: str) -> str:
        """Replace standalone digit sequences with their English word equivalents.

        A digit group is converted only when it is not directly adjacent to
        another alphanumeric character, so identifiers like "Z39" keep
        their digits.  Note that digits separated only by punctuation
        (e.g. the "48" in "Z39.48") are still converted.  Groups of one
        million or more are left unchanged.
        """
        def replace_match(m):
            # Reject matches glued to letters/digits on either side.
            start, end = m.start(), m.end()
            if start > 0 and text[start - 1].isalnum():
                return m.group()
            if end < len(text) and text[end].isalnum():
                return m.group()
            try:
                n = int(m.group())
                if n < 1000000:
                    return self._number_to_words(n)
            except ValueError:
                # int() cannot realistically fail on a \d+ match; kept as a
                # defensive fallback.
                pass
            return m.group()
        return re.sub(r"\d+", replace_match, text)
|
|
| |
| |
| |
|
|
| def _clean_chars(self, text: str) -> str: |
| """Remove characters not in the allowed set.""" |
| pattern = f"[^{self.allowed_chars}\n]" |
| text = re.sub(pattern, " ", text) |
| |
| text = re.sub(r"^[. ]+$", "", text, flags=re.MULTILINE) |
| return text |
|
|
| def _collapse_whitespace(self, text: str) -> str: |
| """Collapse multiple spaces/newlines into single spaces.""" |
| text = re.sub(r"\n{3,}", "\n\n", text) |
| text = re.sub(r" {2,}", " ", text) |
| text = re.sub(r" *\n *", "\n", text) |
| return text |
|
|