# pdf_utils.py
# v5.4 — Robust PDF preprocessing: TOC removal, garbage filtering, finer segmentation.
# Changes vs v5.3:
#   • extract_text_from_pdf() now calls clean_raw_pdf_text() after extraction
#   • clean_raw_pdf_text() strips page numbers, separator lines, OCR noise,
#     repeated doc titles, running headers/footers
#   • is_toc_block() heuristic detects and rejects Table of Contents chunks
#   • is_garbage_clause() rejects structurally empty / metadata-only chunks
#   • split_into_clauses_with_metadata() integrates both filters before returning

from __future__ import annotations

import logging
import re

logger = logging.getLogger(__name__)

# A clause longer than this is tried for inline "(a) … (b) …" splitting.
LONG_CLAUSE_CHARS = 1200
# A clause longer than this is cut at sentence boundaries.
MAX_CLAUSE_CHARS = 3000
# Sub-chunks shorter than this are merged into a neighbour or discarded.
MIN_SUBCLAUSE_LEN = 60


# ─────────────────────────────────────────────────────────────────────────────
# Step 1 — Raw text cleaning (runs immediately after PyMuPDF extraction)
# ─────────────────────────────────────────────────────────────────────────────

# Standalone page number line: e.g. "19", "- 3 -", "Page 4", "PAGE 4 OF 12"
_PAGE_NUM_LINE = re.compile(
    r'(?m)^[ \t]*(?:[-–—]*\s*)?(?:page\s+)?\d{1,4}(?:\s+of\s+\d{1,4})?'
    r'(?:\s*[-–—]*)?[ \t]*$',
    re.IGNORECASE,
)

# Roman-numeral-only lines (TOC page markers: i, ii, iii, iv, v, …)
# NOTE(review): this also matches a line consisting solely of an ordinary word
# built from these letters (e.g. "mix", "did"); left unchanged.
_ROMAN_PAGE_LINE = re.compile(
    r'(?m)^[ \t]*[ivxlcdmIVXLCDM]{1,6}[ \t]*$'
)

# Horizontal separator lines: "___", "---", "===", "* * *", etc.
# Each separator character may be followed by blanks so that spaced forms such
# as "* * *" or "- - -" are recognised as well.
_SEPARATOR_LINE = re.compile(
    r'(?m)^[ \t]*(?:[-=_*·•][ \t]*){3,}$'
)

# Running header/footer patterns that repeat every page
# e.g. "AGREEMENT AND PLAN OF MERGER", "CONFIDENTIAL", "EXECUTION VERSION"
# The trailing \b keeps prose lines that merely *start with* a longer word
# ("Confidentiality …", "Drafting …", "Schedule of …") from being deleted.
# NOTE(review): a prose line beginning with the exact word "Confidential"
# (e.g. "Confidential Information means …") is still removed — confirm whether
# that is acceptable for NDA-style documents.
_RUNNING_HEADER = re.compile(
    r'(?m)^[ \t]*(?:AGREEMENT AND PLAN OF|EXECUTION COPY|EXECUTION VERSION|'
    r'CONFIDENTIAL|DRAFT|PRIVILEGED AND CONFIDENTIAL|'
    r'EXHIBIT (?:[A-Z]|[IVXLC]+)|'
    r'SCHEDULE (?:\d+[A-Z]?|[A-Z]|[IVXLC]+))\b[^\n]*$',
    re.IGNORECASE,
)

# TOC "dot-leader" lines: "Section 7.04 ............ 43"
# The leader must contain at least three literal dots; the page number may sit
# on the same line or on the line directly below. Plain whitespace/newlines are
# NOT accepted as a leader, otherwise a short prose line followed by a blank
# line and a page number would be deleted together with the page number.
_TOC_DOT_LEADER = re.compile(
    r'(?m)^[^\n]{5,80}(?:\.[ \t]*){3,}\n?[ \t]*\d{1,4}[ \t]*$'
)

# The "TABLE OF CONTENTS" heading on a line of its own.
_TOC_HEADING = re.compile(
    r'(?m)^[ \t]*TABLE\s+OF\s+CONTENTS[ \t]*$',
    re.IGNORECASE,
)


def clean_raw_pdf_text(raw: str) -> str:
    """
    Post-extraction cleaning: remove artefacts that corrupt clause segmentation.

    The goal is NOT to remove legal content — only structural/metadata noise.

    Args:
        raw: Text as extracted from the PDF.

    Returns:
        The text with dot-leader TOC lines, running headers, standalone page
        numbers / roman numerals, separator lines and the "TABLE OF CONTENTS"
        heading removed, blank-line runs collapsed to a single blank line, and
        outer whitespace stripped.
    """
    text = raw

    # 1. Normalize line endings (CRLF and bare CR) and excessive whitespace
    text = re.sub(r'\r\n?', '\n', text)
    text = re.sub(r'[ \t]+', ' ', text)
    text = re.sub(r'\n{4,}', '\n\n\n', text)

    # 2. Remove TOC dot-leader lines BEFORE other cleanup, while the page
    #    number that terminates them is still present.
    text = _TOC_DOT_LEADER.sub('', text)

    # 3. Running headers / footers
    text = _RUNNING_HEADER.sub('', text)

    # 4. Standalone page numbers and roman numerals
    text = _PAGE_NUM_LINE.sub('', text)
    text = _ROMAN_PAGE_LINE.sub('', text)

    # 5. Separator lines
    text = _SEPARATOR_LINE.sub('', text)

    # 6. "TABLE OF CONTENTS" heading itself (the block is filtered per clause
    #    later by is_toc_block)
    text = _TOC_HEADING.sub('', text)

    # 7. Collapse runs of blank lines left by removals
    text = re.sub(r'\n{3,}', '\n\n', text)

    return text.strip()


# ─────────────────────────────────────────────────────────────────────────────
# Step 2 — TOC block detection (per-clause heuristic)
# ─────────────────────────────────────────────────────────────────────────────

# "Section X.XX" style references; many of them in one block suggest a TOC
_TOC_SECTION_REF = re.compile(
    r'(?:Section|ARTICLE|Article|SCHEDULE|Annex|Exhibit)\s+[\dIVXA-Z]',
    re.IGNORECASE,
)

# A line that is ONLY a heading / short label (no sentence verb)
_HEADING_ONLY_LINE = re.compile(
    r'(?m)^[ \t]*[A-Z][A-Za-z0-9 &/\-]{2,50}[ \t]*$'
)

# Rough "this line is a sentence" proxy: a modal or an inflected operative verb.
# Only verb inflections are listed (no "-ing"/"-ion" forms) so that typical TOC
# titles such as "Termination" or "Governing Law" do not count as sentences.
_SENTENCE_VERB = re.compile(
    r'\b(?:shall|will|may|must|'
    r'agree(?:s|d)?|provide(?:s|d)?|require(?:s|d)?|include(?:s|d)?|'
    r'warrant(?:s|ed)?|represent(?:s|ed)?|indemnif(?:y|ies|ied)|'
    r'terminat(?:e|es|ed)|govern(?:s|ed)?|licens(?:e|es|ed)|'
    r'assign(?:s|ed)?|disclose(?:s|d)?|notif(?:y|ies|ied)|'
    r'maintain(?:s|ed)?|ensure(?:s|d)?|permit(?:s|ted)?|restrict(?:s|ed)?)\b',
    re.IGNORECASE,
)


def is_toc_block(text: str) -> bool:
    """
    Return True if this chunk looks like a Table of Contents entry or a run
    of section listings that are not real legal prose.

    A chunk with no non-empty lines is also reported as True.

    Heuristics (any one is sufficient to flag):
      A. ≥ 4 "Section X.XX / ARTICLE X" references while at most
         max(1, 15% of the non-empty lines) contain a sentence verb
      B. Word count < 80 and section-reference count ≥ 3
      C. At least 4 non-empty lines, more than 60% of which are
         heading-only lines
    """
    lines = [ln.strip() for ln in text.splitlines() if ln.strip()]
    total_lines = len(lines)
    if total_lines == 0:
        return True  # empty → garbage

    section_refs = len(_TOC_SECTION_REF.findall(text))
    # Lines containing at least one verb-like word (rough sentence proxy)
    sentence_lines = sum(1 for ln in lines if _SENTENCE_VERB.search(ln))
    word_count = len(text.split())

    # Heuristic A: many section refs, almost no substantive sentences
    if section_refs >= 4 and sentence_lines <= max(1, total_lines * 0.15):
        return True

    # Heuristic B: very short and many section refs (classic TOC listing)
    if word_count < 80 and section_refs >= 3:
        return True

    # Heuristic C: heading-only lines dominate
    heading_lines = sum(1 for ln in lines if _HEADING_ONLY_LINE.fullmatch(ln))
    if total_lines >= 4 and heading_lines / total_lines > 0.60:
        return True

    return False


# ─────────────────────────────────────────────────────────────────────────────
# Step 3 — Garbage clause filter (pre-inference gate)
# ─────────────────────────────────────────────────────────────────────────────

def is_garbage_clause(text: str, min_words: int = 15) -> bool:
    """
    Return True for chunks that should never reach the neural model:
      • Too short to be a real clause (fewer than ``min_words`` words, or empty)
      • Mostly digits / page references (> 35% digit tokens)
      • Mostly structural labels ("Section", "Article", …; > 25% of words)
      • No alphabetic word longer than 3 characters
      • Anything is_toc_block() flags
    """
    words = text.split()
    # The explicit emptiness check keeps the ratios below safe when a caller
    # passes min_words=0.
    if not words or len(words) < min_words:
        return True

    # Too many digit tokens (page-number contamination)
    digit_ratio = sum(1 for w in words if w.strip('.,;:()').isdigit()) / len(words)
    if digit_ratio > 0.35:
        return True

    # Too many "Section" / "Article" tokens relative to word count
    struct_tokens = len(re.findall(
        r'\b(?:Section|ARTICLE|Article|Exhibit|Schedule|Annex|Appendix|Part|Chapter)\b',
        text,
        re.IGNORECASE,
    ))
    if struct_tokens / len(words) > 0.25:
        return True

    # No alphabetic word longer than 3 chars → pure noise / numbering block
    if not any(len(w) > 3 and w.isalpha() for w in words):
        return True

    # Delegate to TOC detector
    return is_toc_block(text)


# ─────────────────────────────────────────────────────────────────────────────
# PDF extraction (wraps clean step)
# ─────────────────────────────────────────────────────────────────────────────

def extract_text_from_pdf(file_path: str) -> str:
    """
    Extract the text of every page of a PDF and return it cleaned.

    Args:
        file_path: Path of the PDF, passed to ``fitz.open``.

    Returns:
        The page texts joined with newlines, de-hyphenated across line breaks
        and passed through clean_raw_pdf_text().
    """
    # Imported inside the function — presumably so this module stays importable
    # when PyMuPDF is not installed.
    import fitz

    doc = fitz.open(file_path)
    try:
        pages = [page.get_text("text") for page in doc]
    finally:
        # Release the document even if extraction of a page fails.
        doc.close()

    raw = "\n".join(pages)
    raw = re.sub(r'(\w)-\n(\w)', r'\1\2', raw)  # de-hyphenate before cleaning
    return clean_raw_pdf_text(raw)


# ─────────────────────────────────────────────────────────────────────────────
# Header detection (primary segmentation) — unchanged from v5.3
# ─────────────────────────────────────────────────────────────────────────────

# (kind, pattern) pairs; group 1 of each pattern is the header marker.
_HEADER_PATTERNS: list[tuple[str, re.Pattern]] = [
    ("decimal", re.compile(r'(?m)^\s*(\d+(?:\.\d+){0,3}\.?)\s+(?=\S)')),
    ("article", re.compile(
        r'(?m)^\s*((?:Article|Section|Clause|Schedule|Annexure|Annex|Appendix|Part|Chapter)'
        r'\s+(?:\d+(?:\.\d+){0,2}|[IVXLC]+))[\s\.\-:]',
        re.IGNORECASE)),
    ("lettered", re.compile(r'(?m)^\s*(\(\s*[a-zA-Z]{1,4}\s*\))\s+(?=\S)')),
    ("roman", re.compile(r'(?m)^\s*([IVX]{1,5}\.)\s+(?=\S)')),
    ("caps", re.compile(r'(?m)^([A-Z][A-Z0-9 &/\-]{4,59})\s*$')),
]

# Inline "(a)", "(ii)" … markers preceded by whitespace or . ; :
_INLINE_SUBCLAUSE = re.compile(
    r'(?<=[\s\.\;\:])(\(\s*(?:[a-z]|[ivx]{1,4})\s*\))\s+(?=[A-Z\w])',
    re.IGNORECASE,
)


def _collect_headers(text: str) -> list[tuple[int, str, str]]:
    """
    Find every header marker in ``text``.

    Returns:
        ``(offset, marker, kind)`` tuples sorted by offset, where ``offset`` is
        the start of the marker (group 1). Hits that start within 2 characters
        of the previously kept hit are dropped, so overlapping patterns yield
        one header per position.
    """
    hits: list[tuple[int, str, str]] = []
    for kind, pat in _HEADER_PATTERNS:
        for m in pat.finditer(text):
            hits.append((m.start(1), m.group(1).strip(), kind))
    hits.sort(key=lambda h: h[0])

    deduped: list[tuple[int, str, str]] = []
    for h in hits:
        if not deduped or abs(h[0] - deduped[-1][0]) > 2:
            deduped.append(h)
    return deduped


# ─────────────────────────────────────────────────────────────────────────────
# Inline subclause splitting — unchanged from v5.3
# ─────────────────────────────────────────────────────────────────────────────

def _split_inline_subclauses(
    body: str,
    parent_number: str | None = None,
    min_length: int = MIN_SUBCLAUSE_LEN,
) -> list[dict]:
    """
    Split ``body`` at inline "(a) … (b) …" markers.

    Args:
        body: Clause text to split.
        parent_number: Marker of the enclosing clause; prefixed to each
            sub-marker to form the part's ``number``.
        min_length: Chunks shorter than this are appended to the previous part
            instead of becoming a part of their own.

    Returns:
        A list of ``{"text", "number", "kind"}`` dicts, or ``[]`` when fewer
        than two markers are found (or no chunk is long enough).
    """
    matches = list(_INLINE_SUBCLAUSE.finditer(body))
    if len(matches) < 2:
        return []

    parts: list[dict] = []

    # Text before the first marker is kept only when it is at least 30 chars.
    head = body[:matches[0].start()].strip()
    if len(head) >= 30:
        parts.append({
            "text": head,
            "number": parent_number,
            "kind": "decimal" if parent_number else "paragraph",
        })

    for i, m in enumerate(matches):
        start = m.start()
        end = matches[i + 1].start() if i + 1 < len(matches) else len(body)
        chunk = body[start:end].strip()

        if len(chunk) < min_length:
            # NOTE: a short chunk with no preceding part is dropped here.
            if parts:
                parts[-1]["text"] = (parts[-1]["text"] + "\n" + chunk).strip()
            continue

        sub_marker = m.group(1).strip()
        composite = f"{parent_number}{sub_marker}" if parent_number else sub_marker
        parts.append({
            "text": chunk,
            "number": composite,
            "kind": "subclause",
        })

    return parts


def _hard_cap_split(clause: dict, max_len: int = MAX_CLAUSE_CHARS) -> list[dict]:
    """
    Cut a clause longer than ``max_len`` characters at sentence boundaries.

    A clause within the limit is returned unchanged as a one-element list.
    Otherwise sentences are packed greedily into chunks of at most ``max_len``
    characters (a single sentence longer than ``max_len`` is kept whole);
    chunks shorter than MIN_SUBCLAUSE_LEN are discarded, and the ``kind`` of
    each emitted chunk gets a "/chunked" suffix.
    """
    body = clause["text"]
    if len(body) <= max_len:
        return [clause]

    sentences = re.split(r'(?<=[\.\?\!])\s+(?=[A-Z])', body)
    chunks: list[str] = []
    current = ""
    for s in sentences:
        if len(current) + len(s) + 1 > max_len and current:
            chunks.append(current.strip())
            current = s
        else:
            current = (current + " " + s).strip() if current else s
    if current:
        chunks.append(current.strip())

    return [
        {
            "text": c,
            "number": clause.get("number"),
            "kind": clause.get("kind", "paragraph") + "/chunked",
        }
        for c in chunks
        if len(c) >= MIN_SUBCLAUSE_LEN
    ]


# ─────────────────────────────────────────────────────────────────────────────
# Public API
# ─────────────────────────────────────────────────────────────────────────────

def _fallback_clause(text: str) -> list[dict]:
    """Return the first 2000 characters of ``text`` as one paragraph clause."""
    return [{"text": text[:2000], "number": None, "kind": "paragraph"}]


def split_into_clauses_with_metadata(
    text: str,
    min_length: int = 40,
) -> list[dict]:
    """
    Segment text into clauses, filter TOC/garbage, return clean list.

    This is the single entry-point used by analyze_document().

    Args:
        text: Document text to segment.
        min_length: Minimum character length of a primary segment.

    Returns:
        A list of ``{"text", "number", "kind"}`` dicts. The list is never
        empty: when every segment is filtered out, a single paragraph clause
        holding the first 2000 characters of ``text`` is returned instead.
    """
    headers = _collect_headers(text)

    # ── Primary segmentation (heading-based) ──────────────────────────────
    primary: list[dict] = []
    for i, (start, marker, kind) in enumerate(headers):
        end = headers[i + 1][0] if i + 1 < len(headers) else len(text)
        body = text[start:end].strip()
        if len(body) >= min_length:
            primary.append({"text": body, "number": marker, "kind": kind})

    # Paragraph fallback when no usable header segment was found
    if not primary:
        for p in (p.strip() for p in re.split(r'\n\s*\n', text)):
            if len(p) >= min_length:
                primary.append({"text": p, "number": None, "kind": "paragraph"})

    # ── TOC / garbage filter (NEW in v5.4) ────────────────────────────────
    primary = [c for c in primary if not is_garbage_clause(c["text"])]

    if not primary:
        # If everything was filtered, fall back to treating the full text as
        # one clause rather than returning an empty list (caller handles it).
        return _fallback_clause(text)

    # ── Secondary pass: inline subclause splitting for long clauses ────────
    refined: list[dict] = []
    for clause in primary:
        if len(clause["text"]) > LONG_CLAUSE_CHARS:
            subs = _split_inline_subclauses(
                clause["text"],
                parent_number=clause.get("number"),
            )
            if subs:
                refined.extend(subs)
                continue
        refined.append(clause)

    # ── Tertiary pass: hard length cap ────────────────────────────────────
    final: list[dict] = []
    for clause in refined:
        final.extend(_hard_cap_split(clause))

    # ── Final garbage sweep after splitting ───────────────────────────────
    # Splitting can produce tiny chunks — filter them out too.
    final = [c for c in final if not is_garbage_clause(c["text"])]

    logger.info(
        "Segmentation: %d primary → %d refined → %d final clean clauses",
        len(primary), len(refined), len(final),
    )
    return final if final else _fallback_clause(text)


def split_into_clauses(text: str, min_length: int = 40) -> list[str]:
    """Backward-compat wrapper that returns plain strings."""
    return [c["text"] for c in split_into_clauses_with_metadata(text, min_length)]