"""Text cleaning pipeline for preparing training data.""" import logging import re import unicodedata logger = logging.getLogger(__name__) class TextCleaner: """Cleans raw text for character-level language model training.""" # Project Gutenberg header/footer patterns GUTENBERG_START = re.compile( r"\*\*\*\s*START OF (?:THE |THIS )?PROJECT GUTENBERG.*?\*\*\*", re.IGNORECASE, ) GUTENBERG_END = re.compile( r"\*\*\*\s*END OF (?:THE |THIS )?PROJECT GUTENBERG.*?\*\*\*", re.IGNORECASE, ) # Fallback for Gutenberg files that lack *** markers GUTENBERG_END_PLAIN = re.compile( r"^End of (?:the )?Project Gutenberg", re.IGNORECASE | re.MULTILINE, ) # MIT Internet Classics Archive patterns MIT_HEADER = re.compile( r"provided by the internet classics archive\..*?-{6,}", re.IGNORECASE | re.DOTALL, ) MIT_FOOTER = re.compile( r"the internet classics archive\b[^\n]*(?:web atomics)?[^\n]*", re.IGNORECASE, ) MIT_DASH_LINE = re.compile(r"-{6,}") # Internet Archive patterns IA_HEADER = re.compile( r"(?:Digitized by|Book digitized by|Original from|Uploaded by)" r"[^\n]*", re.IGNORECASE, ) IA_GOOGLE_MARKER = re.compile( r"(?:Generated (?:by|from)|Google-digitized|" r"This is a digital copy of a book)[^\n]*", re.IGNORECASE, ) # Roman numeral pattern — matches standalone uppercase Roman numerals (2+ chars) ROMAN_NUMERAL = re.compile( r"\b(M{0,3}(?:CM|CD|D?C{0,3})(?:XC|XL|L?X{0,3})(?:IX|IV|V?I{0,3}))\b" ) # Context words that allow single "I" to be treated as Roman numeral 1 ROMAN_CONTEXT = re.compile( r"\b(?:book|chapter|prop|proposition|part|vol|volume|no|number|" r"section|act|scene|lib|epistle|ode|psalm|canon|lemma|corollary|" r"cor|def|definition|axiom|postulate)\b", re.IGNORECASE, ) # Roman numeral value map ROMAN_VALUES = {"I": 1, "V": 5, "X": 10, "L": 50, "C": 100, "D": 500, "M": 1000} # Non-body section headers (for aggressive stripping) # NOTE: "INTRODUCTION" is deliberately excluded — it is often the author's own text FRONT_MATTER_HEADERS = re.compile( r"^\s*(?:PREFACE|FOREWORD|FORWARD|EDITOR[\u2019']?S?\s+NOTE|" r"TRANSLATOR[\u2019']?S?\s+NOTE|PREFATORY\s+NOTE|PRELIMINARY\s+NOTE|" r"BIOGRAPHICAL\s+(?:NOTE|SKETCH)|ADVERTISEMENT|DEDICAT(?:ION|ED\s+TO)|" r"TO\s+THE\s+READER|NOTE\s+ON\s+(?:THE\s+)?TEXT|ABOUT\s+THIS\s+EDITION|" r"CHRONOLOG(?:Y|ICAL))[.:\-\u2014]*\s*$", re.IGNORECASE | re.MULTILINE, ) BACK_MATTER_HEADERS = re.compile( r"^\s*(?:APPENDIX|ADDEND(?:UM|A)|INDEX|GLOSSARY|BIBLIOGRAPHY|" r"WORKS?\s+CITED|REFERENCES|ENDNOTES|FOOTNOTES|" r"ACKNOWLEDG(?:E?MENTS?)|CREDITS|COLOPHON|ERRATA|" r"TRANSCRIBER[\u2019']?S?\s+NOTES?|" r"TYPOGRAPHICAL\s+ERRORS?\s+CORRECTED|" r"LIST\s+OF\s+(?:ILLUSTRATIONS|FIGURES|PLATES))[.:\-\u2014]*\s*$", re.IGNORECASE | re.MULTILINE, ) TOC_HEADER = re.compile( r"^\s*(?:TABLE\s+OF\s+)?CONTENTS?[.:\-\u2014]*\s*$", re.IGNORECASE | re.MULTILINE, ) # Production/publisher patterns (for front matter cleanup) PRODUCTION_PATTERNS = [ re.compile(p, re.IGNORECASE) for p in [ r"(?:produced|prepared|transcribed|digitized|scanned)\s+(?:by|for|at)", r"production\s+note", r"transcriber[\u2019']?s?\s+note", r"scanner[\u2019']?s?\s+note", r"cornell\s+university\s+library", r"(?:published|printed)\s+(?:by|for|at|in)", r"(?:first|second|third|\d+(?:st|nd|rd|th))\s+edition", r"price\s+\w+[sd]\.", r"(?:cloth|paper|hardcover|paperback|octavo|quarto)", r"\bisbn\b", r"all\s+rights?\s+reserved", r"(?:copyright|copr\.?)\s*(?:\(c\)|\xa9|\d)", r"press\s+of\b", r"university\s+press", ] ] # Transcriber correction notes (back matter) TRANSCRIBER_CORRECTION = re.compile( r"^p\.\s*(?:\d+|\?\??|\.)\s*[.,]?\s*(?:sqq\.|in\s|the\s|as\s|heading|" r"reference|prop|from\s|then\s|these\s|def\.|" r"twenty|thirty|forty|fifty|sixty|seventy|eighty|ninety|" r"one\s|two\s|three|four|five|six\s|seven|eight|nine|" # Match quoted corrections r'["\u201c])', re.IGNORECASE, ) # Separator/decoration lines SEPARATOR_LINE = re.compile(r"^[\s.*_=~\-#]+$") # Number words for 0-19 and tens ONES = [ "zero", "one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten", "eleven", "twelve", "thirteen", "fourteen", "fifteen", "sixteen", "seventeen", "eighteen", "nineteen", ] TENS = [ "", "", "twenty", "thirty", "forty", "fifty", "sixty", "seventy", "eighty", "ninety", ] def __init__(self, config: dict): self.lowercase = config.get("lowercase", True) self.strip_gutenberg = config.get("strip_gutenberg", True) self.strip_mit_classics = config.get("strip_mit_classics", True) self.strip_internet_archive = config.get("strip_internet_archive", True) self.normalize_unicode = config.get("normalize_unicode", True) self.convert_numerals = config.get("convert_numerals", False) self.convert_roman_numerals = config.get("convert_roman_numerals", False) self.strip_non_body = config.get("strip_non_body", True) self.min_line_length = config.get("min_line_length", 20) self.remove_urls = config.get("remove_urls", True) self.collapse_whitespace = config.get("collapse_whitespace", True) self.allowed_chars = config.get("allowed_chars", r"a-z0-9 .,;:!?'\"\-\(\)") def clean(self, text: str) -> str: """Run all cleaning stages on the input text.""" if not text.strip(): return "" # Stage 1: Strip source-specific boilerplate if self.strip_gutenberg: text = self._strip_gutenberg(text) if self.strip_mit_classics: text = self._strip_mit_classics(text) if self.strip_internet_archive: text = self._strip_internet_archive(text) # Stage 2: Strip non-body content (before any text transforms) if self.strip_non_body: text = self._strip_non_body(text) # Stage 3: Normalize unicode if self.normalize_unicode: text = self._normalize_unicode(text) if self.remove_urls: text = self._remove_urls(text) # Stage 4: Convert Roman numerals (BEFORE lowercase — needs uppercase) if self.convert_roman_numerals: text = self._convert_roman_numerals(text) # Stage 5: Lowercase if self.lowercase: text = text.lower() # Stage 6: Convert Arabic numerals if self.convert_numerals: text = self._convert_numerals(text) # Stage 7: Character filtering text = self._clean_chars(text) # Stage 8: Collapse whitespace if self.collapse_whitespace: text = self._collapse_whitespace(text) return text.strip() # ------------------------------------------------------------------ # Source boilerplate stripping # ------------------------------------------------------------------ def _strip_gutenberg(self, text: str) -> str: """Remove Project Gutenberg headers and footers.""" # Strip footer first (before positions shift) end_match = self.GUTENBERG_END.search(text) if not end_match: end_match = self.GUTENBERG_END_PLAIN.search(text) if end_match: text = text[:end_match.start()] # Strip header start_match = self.GUTENBERG_START.search(text) if start_match: text = text[start_match.end():] # Also strip common Gutenberg preamble lines lines = text.split("\n") cleaned = [] skip = True if start_match is None else False for line in lines: stripped = line.strip() if skip and stripped.startswith(("Title:", "Author:", "Release Date:", "Language:", "Character set", "Produced by", "Updated editions")): continue if skip and not stripped: continue skip = False cleaned.append(line) return "\n".join(cleaned) def _strip_mit_classics(self, text: str) -> str: """Remove MIT Internet Classics Archive headers, footers, and section dividers.""" text = self.MIT_HEADER.sub("", text) text = self.MIT_FOOTER.sub("", text) text = self.MIT_DASH_LINE.sub("", text) return text def _strip_internet_archive(self, text: str) -> str: """Remove Internet Archive / Google Books digitization boilerplate.""" text = self.IA_HEADER.sub("", text) text = self.IA_GOOGLE_MARKER.sub("", text) return text # ------------------------------------------------------------------ # Non-body content stripping (aggressive mode) # ------------------------------------------------------------------ def _strip_non_body(self, text: str) -> str: """Remove front matter, back matter, and inline non-body content.""" text = self._strip_front_matter(text) text = self._strip_back_matter(text) text = self._strip_inline_non_body(text) return text def _strip_front_matter(self, text: str) -> str: """Strip front matter: production notes, TOC, preface, etc. Order: (1) strip named sections by header, (2) skip remaining non-body paragraphs at the top. """ # Pass 1: Remove named sections that have clear headers text = self._strip_section(text, self.FRONT_MATTER_HEADERS) text = self._strip_section(text, self.TOC_HEADER) # Pass 2: Skip non-body paragraphs at the beginning. # Body prose = substantial paragraph (>150 chars) with full sentences # that does NOT match production/publisher patterns. lines = text.split("\n") start_idx = 0 i = 0 while i < len(lines): # Collect next paragraph while i < len(lines) and not lines[i].strip(): i += 1 para_start = i para_lines = [] while i < len(lines) and lines[i].strip(): para_lines.append(lines[i].strip()) i += 1 if not para_lines: continue para_text = " ".join(para_lines) has_sentences = bool(re.search(r"\.\s+[A-Z]", para_text)) is_substantial = len(para_text) > 150 is_production = self._is_production_line(para_text) # Title pages / heading blocks: mostly uppercase letters alpha_chars = [c for c in para_text if c.isalpha()] is_mostly_uppercase = ( alpha_chars and sum(1 for c in alpha_chars if c.isupper()) / len(alpha_chars) > 0.5 ) # Short average line length suggests a title/heading block avg_line_len = sum(len(l) for l in para_lines) / len(para_lines) is_short_lines = avg_line_len < 50 if (is_substantial and has_sentences and not is_production and not is_mostly_uppercase and not is_short_lines): start_idx = para_start break # Not body yet — skip it start_idx = i return "\n".join(lines[start_idx:]) def _strip_back_matter(self, text: str) -> str: """Strip back matter: appendixes, index, transcriber notes, etc.""" lines = text.split("\n") # Find the first back-matter header and truncate there first_back_idx = None for i, line in enumerate(lines): stripped = line.strip() if self.BACK_MATTER_HEADERS.match(stripped): first_back_idx = i break # Also detect "Typographical Errors corrected..." as back matter start if re.match(r"Typographical\s+Errors?\b", stripped, re.IGNORECASE): first_back_idx = i break if first_back_idx is not None: lines = lines[:first_back_idx] # Strip trailing transcriber correction notes (working backward) while lines: stripped = lines[-1].strip() if not stripped: lines.pop() continue if self.TRANSCRIBER_CORRECTION.match(stripped): lines.pop() continue if self._is_production_line(stripped): lines.pop() continue break return "\n".join(lines) def _strip_inline_non_body(self, text: str) -> str: """Strip inline non-body markers: separator lines, all-caps headings.""" lines = text.split("\n") cleaned = [] for line in lines: stripped = line.strip() # Remove separator/decoration lines if stripped and self.SEPARATOR_LINE.match(stripped): continue # Remove short ALL-CAPS lines (likely section headings) if stripped and len(stripped) < 80 and stripped == stripped.upper() and stripped.isalpha(): continue cleaned.append(line) return "\n".join(cleaned) def _strip_section(self, text: str, header_pattern: re.Pattern) -> str: """Remove a section identified by header_pattern until next section boundary.""" lines = text.split("\n") result = [] skipping = False for i, line in enumerate(lines): stripped = line.strip() if header_pattern.match(stripped): skipping = True continue if skipping: # Stop skipping at next section boundary: # A substantial non-empty line after a blank line, OR # A line that looks like a real body section start is_blank = not stripped if not is_blank and self._is_section_boundary(stripped, lines, i): skipping = False result.append(line) continue result.append(line) return "\n".join(result) def _is_section_boundary(self, stripped: str, lines: list[str], idx: int) -> bool: """Detect if a line marks the beginning of a new major section. Only returns True for explicit section headers/markers, NOT for long body-text lines (which can appear inside prefaces/forewords). """ # Body-start keywords (these signal real content resuming) if re.match( r"(?:Book|Chapter|Part|Section|Proposition|Theorem|Definition|" r"Axiom|Postulate|Introduction|Definitions|Lemma|Corollary|" r"Contents?)\b", stripped, re.IGNORECASE, ): return True # Another named section header (front or back matter) if self.FRONT_MATTER_HEADERS.match(stripped): return True if self.BACK_MATTER_HEADERS.match(stripped): return True if self.TOC_HEADER.match(stripped): return True return False def _is_production_line(self, line: str) -> bool: """Check if a line is production/publisher metadata.""" for pattern in self.PRODUCTION_PATTERNS: if pattern.search(line): return True return False # ------------------------------------------------------------------ # Unicode normalization # ------------------------------------------------------------------ def _normalize_unicode(self, text: str) -> str: """Normalize unicode characters to their closest ASCII equivalents.""" text = unicodedata.normalize("NFKD", text) replacements = { "\u2018": "'", "\u2019": "'", # smart quotes "\u201c": '"', "\u201d": '"', "\u2013": "-", "\u2014": "-", # en/em dash "\u2026": "...", # ellipsis "\u00a0": " ", # non-breaking space "\u00b6": "", # pilcrow "\u00a7": "", # section sign } for old, new in replacements.items(): text = text.replace(old, new) # Strip remaining non-ASCII text = text.encode("ascii", errors="ignore").decode("ascii") return text def _remove_urls(self, text: str) -> str: """Remove URLs and email addresses.""" text = re.sub(r"https?://\S+", "", text) text = re.sub(r"www\.\S+", "", text) text = re.sub(r"\S+@\S+\.\S+", "", text) return text # ------------------------------------------------------------------ # Roman numeral conversion # ------------------------------------------------------------------ def _roman_to_int(self, s: str) -> int: """Convert a Roman numeral string to an integer.""" result = 0 prev = 0 for char in reversed(s.upper()): val = self.ROMAN_VALUES.get(char, 0) if val < prev: result -= val else: result += val prev = val return result def _is_valid_roman(self, s: str) -> bool: """Check if a string is a valid Roman numeral (not just random letters).""" if not s: return False # Must only contain valid Roman numeral characters if not all(c in "IVXLCDM" for c in s.upper()): return False # Must convert to a positive number val = self._roman_to_int(s) return val > 0 def _convert_roman_numerals(self, text: str) -> str: """Convert Roman numerals to English words. Handles multi-character Roman numerals (II, IV, XIV, etc.) directly. Single 'I' is only converted when preceded by a context word. """ def replace_roman(m): numeral = m.group(1) # Skip single-char matches that aren't clearly Roman numerals if len(numeral) == 1: # Single 'I' — only convert after context words if numeral.upper() == "I": # Check the text before this match for context words before = text[max(0, m.start() - 30):m.start()] if not self.ROMAN_CONTEXT.search(before): return m.group(0) else: # Single V, X, L, C, D, M — convert them pass if not self._is_valid_roman(numeral): return m.group(0) val = self._roman_to_int(numeral) return self._number_to_words(val) return self.ROMAN_NUMERAL.sub(replace_roman, text) # ------------------------------------------------------------------ # Arabic numeral conversion # ------------------------------------------------------------------ def _number_to_words(self, n: int) -> str: """Convert an integer to English words.""" if n < 0: return "negative " + self._number_to_words(-n) if n == 0: return self.ONES[0] if n < 20: return self.ONES[n] if n < 100: tens, ones = divmod(n, 10) return self.TENS[tens] + (" " + self.ONES[ones] if ones else "") if n < 1000: hundreds, remainder = divmod(n, 100) result = self.ONES[hundreds] + " hundred" if remainder: result += " " + self._number_to_words(remainder) return result if n < 1000000: thousands, remainder = divmod(n, 1000) result = self._number_to_words(thousands) + " thousand" if remainder: result += " " + self._number_to_words(remainder) return result return str(n) def _convert_numerals(self, text: str) -> str: """Replace standalone digit sequences with their English word equivalents. Only converts digit groups that are standalone words (surrounded by whitespace or punctuation), preventing garbled output from codes like Z39.48-1984. """ def replace_match(m): # Ensure digits are not part of a larger alphanumeric token start, end = m.start(), m.end() if start > 0 and text[start - 1].isalnum(): return m.group() if end < len(text) and text[end].isalnum(): return m.group() try: n = int(m.group()) if n < 1000000: return self._number_to_words(n) except ValueError: pass return m.group() return re.sub(r"\d+", replace_match, text) # ------------------------------------------------------------------ # Character filtering and whitespace # ------------------------------------------------------------------ def _clean_chars(self, text: str) -> str: """Remove characters not in the allowed set.""" pattern = f"[^{self.allowed_chars}\n]" text = re.sub(pattern, " ", text) # Remove lines that are only dots and/or spaces (separator lines) text = re.sub(r"^[. ]+$", "", text, flags=re.MULTILINE) return text def _collapse_whitespace(self, text: str) -> str: """Collapse multiple spaces/newlines into single spaces.""" text = re.sub(r"\n{3,}", "\n\n", text) text = re.sub(r" {2,}", " ", text) text = re.sub(r" *\n *", "\n", text) return text