# pdf_utils.py
# v5.4 - Robust PDF preprocessing: TOC removal, garbage filtering, finer segmentation.
# Changes vs v5.3:
#   • extract_text_from_pdf() now calls clean_raw_pdf_text() after extraction
#   • clean_raw_pdf_text() strips page numbers, separator lines, OCR noise,
#     repeated doc titles, and running headers/footers
#   • is_toc_block() heuristic detects and rejects Table of Contents chunks
#   • is_garbage_clause() rejects structurally empty / metadata-only chunks
#   • split_into_clauses_with_metadata() integrates both filters before returning
from __future__ import annotations
import re
LONG_CLAUSE_CHARS = 1200
MAX_CLAUSE_CHARS = 3000
MIN_SUBCLAUSE_LEN = 60
# ─────────────────────────────────────────────────────────────────────────────
# Step 1 - Raw text cleaning (runs immediately after PyMuPDF extraction)
# ─────────────────────────────────────────────────────────────────────────────
# Standalone page number line: e.g. "19", "- 3 -", "Page 4", "PAGE 4 OF 12"
_PAGE_NUM_LINE = re.compile(
    r'(?m)^[ \t]*(?:[-–—]*\s*)?(?:page\s+)?\d{1,4}(?:\s+of\s+\d{1,4})?'
    r'(?:\s*[-–—]*)?[ \t]*$',
re.IGNORECASE,
)
# Roman-numeral-only lines (TOC page markers: i, ii, iii, iv, v, ...)
_ROMAN_PAGE_LINE = re.compile(
r'(?m)^[ \t]*[ivxlcdmIVXLCDM]{1,6}[ \t]*$'
)
# Horizontal separator lines: "___", "---", "===", "* * *", etc.
_SEPARATOR_LINE = re.compile(
    r'(?m)^[ \t]*[-=_*·•]{3,}[ \t]*$'
)
# Running header/footer patterns that repeat every page
# e.g. "AGREEMENT AND PLAN OF MERGER", "CONFIDENTIAL", "EXECUTION VERSION"
_RUNNING_HEADER = re.compile(
r'(?m)^[ \t]*(AGREEMENT AND PLAN OF|EXECUTION COPY|EXECUTION VERSION|'
r'CONFIDENTIAL|DRAFT|PRIVILEGED AND CONFIDENTIAL|'
r'EXHIBIT [A-Z]|SCHEDULE [A-Z\d])[^\n]*$',
re.IGNORECASE,
)
# TOC "dot-leader" lines: "Section 7.04 ............ 43"
_TOC_DOT_LEADER = re.compile(
r'(?m)^[^\n]{5,80}[.\s]{4,}\s*\d{1,4}\s*$'
)
def clean_raw_pdf_text(raw: str) -> str:
"""
Post-extraction cleaning: remove artefacts that corrupt clause segmentation.
    The goal is NOT to remove legal content, only structural/metadata noise.
"""
text = raw
# 1. Normalize line endings and excessive whitespace
text = re.sub(r'\r\n', '\n', text)
text = re.sub(r'[ \t]+', ' ', text)
text = re.sub(r'\n{4,}', '\n\n\n', text)
# 2. Remove TOC dot-leader lines BEFORE other cleanup (greedy match)
text = _TOC_DOT_LEADER.sub('', text)
# 3. Running headers / footers
text = _RUNNING_HEADER.sub('', text)
# 4. Standalone page numbers and roman numerals
text = _PAGE_NUM_LINE.sub('', text)
text = _ROMAN_PAGE_LINE.sub('', text)
# 5. Separator lines
text = _SEPARATOR_LINE.sub('', text)
# 6. "TABLE OF CONTENTS" heading itself (we will also filter the block below)
text = re.sub(
r'(?m)^[ \t]*TABLE\s+OF\s+CONTENTS[ \t]*$', '', text, flags=re.IGNORECASE
)
# 7. Collapse runs of blank lines left by removals
text = re.sub(r'\n{3,}', '\n\n', text)
return text.strip()
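# Illustrative before/after for clean_raw_pdf_text() (made-up fragment; expected
# behaviour under the patterns above, not an executed test):
#
#   raw = ("EXECUTION VERSION\n"
#          "Section 7.04 Termination .............................. 43\n"
#          "Page 3 of 12\n"
#          "7.4 Either party may terminate this Agreement upon thirty (30) days notice.\n")
#   clean_raw_pdf_text(raw)
#   # -> "7.4 Either party may terminate this Agreement upon thirty (30) days notice."
#   # The running header, the TOC dot-leader line and the page-number line are stripped;
#   # only the substantive clause text survives.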
# ─────────────────────────────────────────────────────────────────────────────
# Step 2 - TOC block detection (per-clause heuristic)
# ─────────────────────────────────────────────────────────────────────────────
# How many "Section X.XX" style references in a block makes it look like a TOC
_TOC_SECTION_REF = re.compile(
r'(?:Section|ARTICLE|Article|SCHEDULE|Annex|Exhibit)\s+[\dIVXA-Z]',
re.IGNORECASE,
)
# A line that is ONLY a heading / short label (no sentence verb)
_HEADING_ONLY_LINE = re.compile(
r'(?m)^[ \t]*[A-Z][A-Za-z0-9 &/\-]{2,50}[ \t]*$'
)
def is_toc_block(text: str) -> bool:
"""
Return True if this chunk looks like a Table of Contents entry or
a run of section listings that are not real legal prose.
    Heuristics (any one is sufficient to flag):
      A. ≥ 4 "Section X.XX / ARTICLE X" references with very few full sentences
      B. Word count < 80 and section-reference count ≥ 3 (classic TOC listing)
      C. Heading-only lines make up > 60% of non-empty lines
"""
lines = [l.strip() for l in text.splitlines() if l.strip()]
total_lines = len(lines)
if total_lines == 0:
        return True  # empty → garbage
section_refs = len(_TOC_SECTION_REF.findall(text))
# Count lines that contain at least one verb-like word (rough sentence proxy)
sentence_lines = sum(
1 for l in lines
if re.search(r'\b(shall|will|may|must|agree|provide|require|include|'
r'warrant|represent|indemnif|terminat|govern|licens|assign|'
r'disclose|notify|maintain|ensure|permit|restrict)\b', l, re.I)
)
word_count = len(text.split())
# Heuristic A: many section refs, almost no substantive sentences
if section_refs >= 4 and sentence_lines <= max(1, total_lines * 0.15):
return True
# Heuristic B: very short and many section refs (classic TOC listing)
if word_count < 80 and section_refs >= 3:
return True
# Heuristic C: heading-only lines dominate
heading_lines = sum(1 for l in lines if _HEADING_ONLY_LINE.fullmatch(l))
if total_lines >= 4 and heading_lines / total_lines > 0.60:
return True
return False
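# Illustrative examples for is_toc_block() (made-up strings; expected behaviour under
# the heuristics above, not an executed test):
#
#   is_toc_block("Section 1.01 Definitions\n"
#                "Section 1.02 Purchase Price\n"
#                "Section 1.03 Closing")
#   # -> True   (heuristic B: fewer than 80 words, 3 section references)
#
#   is_toc_block("The Seller shall deliver the Shares to the Buyer at the Closing.")
#   # -> False  (a substantive sentence, no section references)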
# ─────────────────────────────────────────────────────────────────────────────
# Step 3 - Garbage clause filter (pre-inference gate)
# ─────────────────────────────────────────────────────────────────────────────
def is_garbage_clause(text: str, min_words: int = 15) -> bool:
"""
    Return True for chunks that should never reach the neural model:
      • Too short to be a real clause
      • Mostly digits / page references
      • Mostly isolated section labels with no prose
      • All-caps title-only blocks
"""
words = text.split()
if len(words) < min_words:
return True
# Too many digit tokens (page-number contamination)
digit_ratio = sum(1 for w in words if w.strip('.,;:()').isdigit()) / len(words)
if digit_ratio > 0.35:
return True
# Too many "Section" / "Article" tokens relative to word count
struct_tokens = len(re.findall(
r'\b(?:Section|ARTICLE|Article|Exhibit|Schedule|Annex|Appendix|Part|Chapter)\b',
text, re.IGNORECASE,
))
if struct_tokens / len(words) > 0.25:
return True
    # No alphabetic word longer than 3 chars → pure noise / numbering block
if not any(len(w) > 3 and w.isalpha() for w in words):
return True
# Delegate to TOC detector
if is_toc_block(text):
return True
return False
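# Illustrative examples for is_garbage_clause() (made-up strings; expected behaviour,
# not an executed test):
#
#   is_garbage_clause("Page 12 of 48")
#   # -> True   (fewer than min_words tokens)
#
#   is_garbage_clause("The Company shall indemnify and hold harmless the Buyer from and "
#                     "against any and all losses arising out of any breach of this Agreement.")
#   # -> False  (ordinary legal prose passes every filter)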
# ─────────────────────────────────────────────────────────────────────────────
# PDF extraction (wraps clean step)
# ─────────────────────────────────────────────────────────────────────────────
def extract_text_from_pdf(file_path: str) -> str:
    import fitz  # PyMuPDF
doc = fitz.open(file_path)
pages = [page.get_text("text") for page in doc]
doc.close()
raw = "\n".join(pages)
raw = re.sub(r'(\w)-\n(\w)', r'\1\2', raw) # de-hyphenate before cleaning
return clean_raw_pdf_text(raw)
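# Illustrative usage (hypothetical file path; requires PyMuPDF, imported as `fitz`):
#
#   text = extract_text_from_pdf("merger_agreement.pdf")
#   clauses = split_into_clauses_with_metadata(text)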
# ─────────────────────────────────────────────────────────────────────────────
# Header detection (primary segmentation) - unchanged from v5.3
# ─────────────────────────────────────────────────────────────────────────────
_HEADER_PATTERNS: list[tuple[str, re.Pattern]] = [
("decimal", re.compile(r'(?m)^\s*(\d+(?:\.\d+){0,3}\.?)\s+(?=\S)')),
("article", re.compile(
r'(?m)^\s*((?:Article|Section|Clause|Schedule|Annexure|Annex|Appendix|Part|Chapter)'
r'\s+(?:\d+(?:\.\d+){0,2}|[IVXLC]+))[\s\.\-:]', re.IGNORECASE)),
("lettered", re.compile(r'(?m)^\s*(\(\s*[a-zA-Z]{1,4}\s*\))\s+(?=\S)')),
("roman", re.compile(r'(?m)^\s*([IVX]{1,5}\.)\s+(?=\S)')),
("caps", re.compile(r'(?m)^([A-Z][A-Z0-9 &/\-]{4,59})\s*$')),
]
_INLINE_SUBCLAUSE = re.compile(
r'(?<=[\s\.\;\:])(\(\s*(?:[a-z]|[ivx]{1,4})\s*\))\s+(?=[A-Z\w])',
re.IGNORECASE,
)
def _collect_headers(text: str) -> list[tuple[int, str, str]]:
hits: list[tuple[int, str, str]] = []
for kind, pat in _HEADER_PATTERNS:
for m in pat.finditer(text):
hits.append((m.start(1), m.group(1).strip(), kind))
hits.sort(key=lambda h: h[0])
deduped: list[tuple[int, str, str]] = []
for h in hits:
if not deduped or abs(h[0] - deduped[-1][0]) > 2:
deduped.append(h)
return deduped
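# Illustrative sketch of the (offset, marker, kind) tuples _collect_headers() returns
# for a made-up snippet (offsets shown are approximate):
#
#   "1. Definitions\n1.1 Scope of Services\n(a) The Supplier shall ..."
#   # -> [(0, '1.', 'decimal'), (15, '1.1', 'decimal'), (37, '(a)', 'lettered')]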
# ─────────────────────────────────────────────────────────────────────────────
# Inline subclause splitting - unchanged from v5.3
# ─────────────────────────────────────────────────────────────────────────────
def _split_inline_subclauses(
body: str,
parent_number: str | None = None,
min_length: int = MIN_SUBCLAUSE_LEN,
) -> list[dict]:
matches = list(_INLINE_SUBCLAUSE.finditer(body))
if len(matches) < 2:
return []
parts: list[dict] = []
head = body[:matches[0].start()].strip()
if head and len(head) >= 30:
parts.append({
"text": head,
"number": parent_number,
"kind": "decimal" if parent_number else "paragraph",
})
for i, m in enumerate(matches):
start = m.start()
end = matches[i + 1].start() if i + 1 < len(matches) else len(body)
chunk = body[start:end].strip()
if len(chunk) < min_length:
if parts:
parts[-1]["text"] = (parts[-1]["text"] + "\n" + chunk).strip()
continue
sub_marker = m.group(1).strip()
composite = f"{parent_number}{sub_marker}" if parent_number else sub_marker
parts.append({
"text": chunk,
"number": composite,
"kind": "subclause",
})
return parts
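# Illustrative example for _split_inline_subclauses() (made-up clause body; expected
# behaviour, not an executed test):
#
#   body = ("The Supplier shall: (a) maintain the Services in accordance with the "
#           "Service Levels set out in Schedule 2; and (b) notify the Customer promptly "
#           "of any incident affecting availability of the Services.")
#   _split_inline_subclauses(body, parent_number="7.2")
#   # -> two parts numbered "7.2(a)" and "7.2(b)", each tagged kind="subclause".
#   #    The lead-in "The Supplier shall:" is under 30 characters, so no head part is
#   #    kept; chunks shorter than min_length are merged into the previous part.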
def _hard_cap_split(clause: dict, max_len: int = MAX_CLAUSE_CHARS) -> list[dict]:
body = clause["text"]
if len(body) <= max_len:
return [clause]
sentences = re.split(r'(?<=[\.\?\!])\s+(?=[A-Z])', body)
chunks, current = [], ""
for s in sentences:
if len(current) + len(s) + 1 > max_len and current:
chunks.append(current.strip())
current = s
else:
current = (current + " " + s).strip() if current else s
if current:
chunks.append(current.strip())
return [
{
"text": c,
"number": clause.get("number"),
"kind": clause.get("kind", "paragraph") + "/chunked",
}
for c in chunks if len(c) >= MIN_SUBCLAUSE_LEN
]
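# Illustrative behaviour of _hard_cap_split(): a clause body longer than MAX_CLAUSE_CHARS
# is re-split on sentence boundaries into chunks of roughly at most that length (a single
# very long sentence can still exceed it); each chunk keeps the parent's number, gets its
# kind suffixed with "/chunked", and pieces shorter than MIN_SUBCLAUSE_LEN are discarded.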
# ─────────────────────────────────────────────────────────────────────────────
# Public API
# ─────────────────────────────────────────────────────────────────────────────
def split_into_clauses_with_metadata(
text: str,
min_length: int = 40,
) -> list[dict]:
"""
Segment text into clauses, filter TOC/garbage, return clean list.
This is the single entry-point used by analyze_document().
"""
headers = _collect_headers(text)
# ── Primary segmentation (heading-based) ──────────────────────────────
primary: list[dict] = []
if headers:
for i, (start, marker, kind) in enumerate(headers):
end = headers[i + 1][0] if i + 1 < len(headers) else len(text)
body = text[start:end].strip()
if len(body) >= min_length:
primary.append({"text": body, "number": marker, "kind": kind})
# Paragraph fallback when no headers were found
if not primary:
for p in [p.strip() for p in re.split(r'\n\s*\n', text)]:
if len(p) >= min_length:
primary.append({"text": p, "number": None, "kind": "paragraph"})
# ── TOC / garbage filter (NEW in v5.4) ────────────────────────────────
primary = [c for c in primary if not is_garbage_clause(c["text"])]
if not primary:
# If everything was filtered, fall back to treating the full text as one
# clause rather than returning an empty list (caller handles it).
return [{"text": text[:2000], "number": None, "kind": "paragraph"}]
# ── Secondary pass: inline subclause splitting for long clauses ────────
refined: list[dict] = []
for clause in primary:
if len(clause["text"]) > LONG_CLAUSE_CHARS:
subs = _split_inline_subclauses(
clause["text"],
parent_number=clause.get("number"),
)
if subs:
refined.extend(subs)
continue
refined.append(clause)
# ── Tertiary pass: hard length cap ────────────────────────────────────
final: list[dict] = []
for clause in refined:
final.extend(_hard_cap_split(clause))
# ── Final garbage sweep after splitting ───────────────────────────────
    # Splitting can produce tiny chunks - filter them out too.
final = [c for c in final if not is_garbage_clause(c["text"])]
print(f"[INFO] Segmentation: {len(primary)} primary β†’ "
f"{len(refined)} refined β†’ {len(final)} final clean clauses")
return final if final else [{"text": text[:2000], "number": None, "kind": "paragraph"}]
def split_into_clauses(text: str, min_length: int = 40) -> list[str]:
"""Backward-compat wrapper that returns plain strings."""
return [c["text"] for c in split_into_clauses_with_metadata(text, min_length)]