# pdf_utils.py
# v5.4 – Robust PDF preprocessing: TOC removal, garbage filtering, finer segmentation.
# Changes vs v5.3:
#   • extract_text_from_pdf() now calls clean_raw_pdf_text() after extraction
#   • clean_raw_pdf_text() strips page numbers, separator lines, OCR noise,
#     repeated doc titles, running headers/footers
#   • is_toc_block() heuristic detects and rejects Table of Contents chunks
#   • is_garbage_clause() rejects structurally empty / metadata-only chunks
#   • split_into_clauses_with_metadata() integrates both filters before returning

from __future__ import annotations

import re

LONG_CLAUSE_CHARS = 1200
MAX_CLAUSE_CHARS = 3000
MIN_SUBCLAUSE_LEN = 60
# ─────────────────────────────────────────────────────────────────────────────
# Step 1 – Raw text cleaning (runs immediately after PyMuPDF extraction)
# ─────────────────────────────────────────────────────────────────────────────

# Standalone page number line: e.g. "19", "- 3 -", "Page 4", "PAGE 4 OF 12"
_PAGE_NUM_LINE = re.compile(
    r'(?m)^[ \t]*(?:[-–—]*\s*)?(?:page\s+)?\d{1,4}(?:\s+of\s+\d{1,4})?'
    r'(?:\s*[-–—]*)?[ \t]*$',
    re.IGNORECASE,
)

# Roman-numeral-only lines (TOC page markers: i, ii, iii, iv, v, …)
_ROMAN_PAGE_LINE = re.compile(
    r'(?m)^[ \t]*[ivxlcdmIVXLCDM]{1,6}[ \t]*$'
)

# Horizontal separator lines: "___", "---", "===", "* * *", etc.
_SEPARATOR_LINE = re.compile(
    r'(?m)^[ \t]*[-=_*·•]{3,}[ \t]*$'
)

# Running header/footer patterns that repeat every page
# e.g. "AGREEMENT AND PLAN OF MERGER", "CONFIDENTIAL", "EXECUTION VERSION"
_RUNNING_HEADER = re.compile(
    r'(?m)^[ \t]*(AGREEMENT AND PLAN OF|EXECUTION COPY|EXECUTION VERSION|'
    r'CONFIDENTIAL|DRAFT|PRIVILEGED AND CONFIDENTIAL|'
    r'EXHIBIT [A-Z]|SCHEDULE [A-Z\d])[^\n]*$',
    re.IGNORECASE,
)

# TOC "dot-leader" lines: "Section 7.04 ............ 43"
_TOC_DOT_LEADER = re.compile(
    r'(?m)^[^\n]{5,80}[.\s]{4,}\s*\d{1,4}\s*$'
)
def clean_raw_pdf_text(raw: str) -> str:
    """
    Post-extraction cleaning: remove artefacts that corrupt clause segmentation.
    The goal is NOT to remove legal content – only structural/metadata noise.
    """
    text = raw
    # 1. Normalize line endings and excessive whitespace
    text = re.sub(r'\r\n', '\n', text)
    text = re.sub(r'[ \t]+', ' ', text)
    text = re.sub(r'\n{4,}', '\n\n\n', text)
    # 2. Remove TOC dot-leader lines BEFORE other cleanup (greedy match)
    text = _TOC_DOT_LEADER.sub('', text)
    # 3. Running headers / footers
    text = _RUNNING_HEADER.sub('', text)
    # 4. Standalone page numbers and roman numerals
    text = _PAGE_NUM_LINE.sub('', text)
    text = _ROMAN_PAGE_LINE.sub('', text)
    # 5. Separator lines
    text = _SEPARATOR_LINE.sub('', text)
    # 6. The "TABLE OF CONTENTS" heading itself (we also filter the block below)
    text = re.sub(
        r'(?m)^[ \t]*TABLE\s+OF\s+CONTENTS[ \t]*$', '', text, flags=re.IGNORECASE
    )
    # 7. Collapse runs of blank lines left by removals
    text = re.sub(r'\n{3,}', '\n\n', text)
    return text.strip()
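
# A minimal illustration (comments only, not executed; the sample fragment is
# invented). It shows the kind of noise clean_raw_pdf_text() is meant to strip:
# a running header, a TOC dot-leader line and a bare page number, while the
# actual contract sentence survives.
#
#   raw = ("EXECUTION VERSION\n"
#          "Section 7.04 Indemnification .......... 43\n"
#          "12\n"
#          "The Parties agree as follows.")
#   clean_raw_pdf_text(raw)   # expected: "The Parties agree as follows."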
# ─────────────────────────────────────────────────────────────────────────────
# Step 2 – TOC block detection (per-clause heuristic)
# ─────────────────────────────────────────────────────────────────────────────

# How many "Section X.XX" style references in a block make it look like a TOC
_TOC_SECTION_REF = re.compile(
    r'(?:Section|ARTICLE|Article|SCHEDULE|Annex|Exhibit)\s+[\dIVXA-Z]',
    re.IGNORECASE,
)

# A line that is ONLY a heading / short label (no sentence verb)
_HEADING_ONLY_LINE = re.compile(
    r'(?m)^[ \t]*[A-Z][A-Za-z0-9 &/\-]{2,50}[ \t]*$'
)
def is_toc_block(text: str) -> bool:
    """
    Return True if this chunk looks like a Table of Contents entry or
    a run of section listings that are not real legal prose.
    Heuristics (any one is sufficient to flag):
      A. ≥ 4 "Section X.XX / ARTICLE X" references with very few full sentences
      B. Fewer than 80 words but ≥ 3 section references (classic TOC listing)
      C. Heading-only lines make up more than 60% of non-empty lines
    """
    lines = [l.strip() for l in text.splitlines() if l.strip()]
    total_lines = len(lines)
    if total_lines == 0:
        return True  # empty → treat as garbage
    section_refs = len(_TOC_SECTION_REF.findall(text))
    # Count lines that contain at least one verb-like word (rough sentence proxy)
    sentence_lines = sum(
        1 for l in lines
        if re.search(r'\b(shall|will|may|must|agree|provide|require|include|'
                     r'warrant|represent|indemnif|terminat|govern|licens|assign|'
                     r'disclose|notify|maintain|ensure|permit|restrict)\b', l, re.I)
    )
    word_count = len(text.split())
    # Heuristic A: many section refs, almost no substantive sentences
    if section_refs >= 4 and sentence_lines <= max(1, total_lines * 0.15):
        return True
    # Heuristic B: very short and many section refs (classic TOC listing)
    if word_count < 80 and section_refs >= 3:
        return True
    # Heuristic C: heading-only lines dominate
    heading_lines = sum(1 for l in lines if _HEADING_ONLY_LINE.fullmatch(l))
    if total_lines >= 4 and heading_lines / total_lines > 0.60:
        return True
    return False
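
# Illustrative check (comments only; the sample strings are invented):
#
#   toc = ("Section 1.01 Definitions\n"
#          "Section 1.02 Interpretation\n"
#          "Section 2.01 Purchase and Sale\n"
#          "Section 2.02 Closing")
#   is_toc_block(toc)     # True: four section refs, no verb-bearing lines (heuristic A)
#
#   prose = ("The Supplier shall indemnify the Customer against all losses "
#            "arising out of any breach of this Agreement.")
#   is_toc_block(prose)   # False: a single prose line with no section references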
# ─────────────────────────────────────────────────────────────────────────────
# Step 3 – Garbage clause filter (pre-inference gate)
# ─────────────────────────────────────────────────────────────────────────────

def is_garbage_clause(text: str, min_words: int = 15) -> bool:
    """
    Return True for chunks that should never reach the neural model:
      • Too short to be a real clause
      • Mostly digits / page references
      • Mostly isolated section labels with no prose
      • All-caps title-only blocks
    """
    words = text.split()
    if len(words) < min_words:
        return True
    # Too many digit tokens (page-number contamination)
    digit_ratio = sum(1 for w in words if w.strip('.,;:()').isdigit()) / len(words)
    if digit_ratio > 0.35:
        return True
    # Too many "Section" / "Article" tokens relative to word count
    struct_tokens = len(re.findall(
        r'\b(?:Section|ARTICLE|Article|Exhibit|Schedule|Annex|Appendix|Part|Chapter)\b',
        text, re.IGNORECASE,
    ))
    if struct_tokens / len(words) > 0.25:
        return True
    # No alphabetic word longer than 3 chars → pure noise / numbering block
    if not any(len(w) > 3 and w.isalpha() for w in words):
        return True
    # Delegate to the TOC detector
    if is_toc_block(text):
        return True
    return False
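
# Illustrative check (comments only; the sample strings are invented):
#
#   is_garbage_clause("Exhibit A Schedule 1 Annex B")
#   # True: only 6 tokens, below the 15-word minimum
#
#   is_garbage_clause("The Supplier shall indemnify the Customer against all "
#                     "losses arising out of any breach of this Agreement, "
#                     "provided that prompt written notice is given.")
#   # False: enough words, no digit or structural-token contamination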
# ─────────────────────────────────────────────────────────────────────────────
# PDF extraction (wraps the clean step)
# ─────────────────────────────────────────────────────────────────────────────

def extract_text_from_pdf(file_path: str) -> str:
    """Extract page text with PyMuPDF, de-hyphenate line breaks, then clean."""
    import fitz  # PyMuPDF
    doc = fitz.open(file_path)
    pages = [page.get_text("text") for page in doc]
    doc.close()
    raw = "\n".join(pages)
    raw = re.sub(r'(\w)-\n(\w)', r'\1\2', raw)  # de-hyphenate before cleaning
    return clean_raw_pdf_text(raw)
# ─────────────────────────────────────────────────────────────────────────────
# Header detection (primary segmentation) – unchanged from v5.3
# ─────────────────────────────────────────────────────────────────────────────

_HEADER_PATTERNS: list[tuple[str, re.Pattern]] = [
    ("decimal", re.compile(r'(?m)^\s*(\d+(?:\.\d+){0,3}\.?)\s+(?=\S)')),
    ("article", re.compile(
        r'(?m)^\s*((?:Article|Section|Clause|Schedule|Annexure|Annex|Appendix|Part|Chapter)'
        r'\s+(?:\d+(?:\.\d+){0,2}|[IVXLC]+))[\s\.\-:]', re.IGNORECASE)),
    ("lettered", re.compile(r'(?m)^\s*(\(\s*[a-zA-Z]{1,4}\s*\))\s+(?=\S)')),
    ("roman", re.compile(r'(?m)^\s*([IVX]{1,5}\.)\s+(?=\S)')),
    ("caps", re.compile(r'(?m)^([A-Z][A-Z0-9 &/\-]{4,59})\s*$')),
]

_INLINE_SUBCLAUSE = re.compile(
    r'(?<=[\s\.\;\:])(\(\s*(?:[a-z]|[ivx]{1,4})\s*\))\s+(?=[A-Z\w])',
    re.IGNORECASE,
)


def _collect_headers(text: str) -> list[tuple[int, str, str]]:
    hits: list[tuple[int, str, str]] = []
    for kind, pat in _HEADER_PATTERNS:
        for m in pat.finditer(text):
            hits.append((m.start(1), m.group(1).strip(), kind))
    hits.sort(key=lambda h: h[0])
    # Drop hits that start at (almost) the same offset as the previous one
    deduped: list[tuple[int, str, str]] = []
    for h in hits:
        if not deduped or abs(h[0] - deduped[-1][0]) > 2:
            deduped.append(h)
    return deduped
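
# Illustrative check (comments only; the sample string is invented):
#
#   sample = ("1. Definitions\n"
#             "In this Agreement the following terms apply.\n"
#             "2. Term\n"
#             "This Agreement commences on the Effective Date.")
#   _collect_headers(sample)
#   # expected: two "decimal" hits, "1." and "2.", at their line-start offsets;
#   # no other pattern fires on this sample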
# ─────────────────────────────────────────────────────────────────────────────
# Inline subclause splitting – unchanged from v5.3
# ─────────────────────────────────────────────────────────────────────────────

def _split_inline_subclauses(
    body: str,
    parent_number: str | None = None,
    min_length: int = MIN_SUBCLAUSE_LEN,
) -> list[dict]:
    matches = list(_INLINE_SUBCLAUSE.finditer(body))
    if len(matches) < 2:
        return []
    parts: list[dict] = []
    head = body[:matches[0].start()].strip()
    if head and len(head) >= 30:
        parts.append({
            "text": head,
            "number": parent_number,
            "kind": "decimal" if parent_number else "paragraph",
        })
    for i, m in enumerate(matches):
        start = m.start()
        end = matches[i + 1].start() if i + 1 < len(matches) else len(body)
        chunk = body[start:end].strip()
        if len(chunk) < min_length:
            if parts:
                parts[-1]["text"] = (parts[-1]["text"] + "\n" + chunk).strip()
            continue
        sub_marker = m.group(1).strip()
        composite = f"{parent_number}{sub_marker}" if parent_number else sub_marker
        parts.append({
            "text": chunk,
            "number": composite,
            "kind": "subclause",
        })
    return parts
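
# Illustrative check (comments only; the sample string is invented). With the
# default threshold of MIN_SUBCLAUSE_LEN (60) the short "(b)" chunk would be
# merged into the previous part, so a smaller min_length is passed here to show
# both splits:
#
#   body = ("The Supplier shall: (a) deliver the Goods to the Delivery "
#           "Location by the Delivery Date; (b) maintain adequate insurance.")
#   _split_inline_subclauses(body, parent_number="3.1", min_length=20)
#   # expected: two dicts numbered "3.1(a)" and "3.1(b)", kind "subclause";
#   # the short lead-in "The Supplier shall:" is dropped (< 30 chars)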
def _hard_cap_split(clause: dict, max_len: int = MAX_CLAUSE_CHARS) -> list[dict]:
    body = clause["text"]
    if len(body) <= max_len:
        return [clause]
    sentences = re.split(r'(?<=[\.\?\!])\s+(?=[A-Z])', body)
    chunks, current = [], ""
    for s in sentences:
        if len(current) + len(s) + 1 > max_len and current:
            chunks.append(current.strip())
            current = s
        else:
            current = (current + " " + s).strip() if current else s
    if current:
        chunks.append(current.strip())
    return [
        {
            "text": c,
            "number": clause.get("number"),
            "kind": clause.get("kind", "paragraph") + "/chunked",
        }
        for c in chunks if len(c) >= MIN_SUBCLAUSE_LEN
    ]
# ─────────────────────────────────────────────────────────────────────────────
# Public API
# ─────────────────────────────────────────────────────────────────────────────

def split_into_clauses_with_metadata(
    text: str,
    min_length: int = 40,
) -> list[dict]:
    """
    Segment text into clauses, filter TOC/garbage, return a clean list.
    This is the single entry point used by analyze_document().
    """
    headers = _collect_headers(text)
    # ── Primary segmentation (heading-based) ──────────────────────────────
    primary: list[dict] = []
    if headers:
        for i, (start, marker, kind) in enumerate(headers):
            end = headers[i + 1][0] if i + 1 < len(headers) else len(text)
            body = text[start:end].strip()
            if len(body) >= min_length:
                primary.append({"text": body, "number": marker, "kind": kind})
    # Paragraph fallback when no headers were found
    if not primary:
        for p in [p.strip() for p in re.split(r'\n\s*\n', text)]:
            if len(p) >= min_length:
                primary.append({"text": p, "number": None, "kind": "paragraph"})
    # ── TOC / garbage filter (NEW in v5.4) ─────────────────────────────────
    primary = [c for c in primary if not is_garbage_clause(c["text"])]
    if not primary:
        # If everything was filtered, fall back to treating the full text as one
        # clause rather than returning an empty list (caller handles it).
        return [{"text": text[:2000], "number": None, "kind": "paragraph"}]
    # ── Secondary pass: inline subclause splitting for long clauses ────────
    refined: list[dict] = []
    for clause in primary:
        if len(clause["text"]) > LONG_CLAUSE_CHARS:
            subs = _split_inline_subclauses(
                clause["text"],
                parent_number=clause.get("number"),
            )
            if subs:
                refined.extend(subs)
                continue
        refined.append(clause)
    # ── Tertiary pass: hard length cap ─────────────────────────────────────
    final: list[dict] = []
    for clause in refined:
        final.extend(_hard_cap_split(clause))
    # ── Final garbage sweep after splitting ────────────────────────────────
    # Splitting can produce tiny chunks; filter them out too.
    final = [c for c in final if not is_garbage_clause(c["text"])]
    print(f"[INFO] Segmentation: {len(primary)} primary → "
          f"{len(refined)} refined → {len(final)} final clean clauses")
    return final if final else [{"text": text[:2000], "number": None, "kind": "paragraph"}]
def split_into_clauses(text: str, min_length: int = 40) -> list[str]:
    """Backward-compat wrapper that returns plain strings."""
    return [c["text"] for c in split_into_clauses_with_metadata(text, min_length)]
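
# A minimal self-check sketch: the sample contract text below is invented, and
# the expected outcome is an assumption about how the heuristics above behave,
# not a guaranteed result.
if __name__ == "__main__":
    _sample = (
        "TABLE OF CONTENTS\n"
        "Section 1.01 Definitions .......... 2\n"
        "Section 2.01 Term .......... 5\n"
        "1. Definitions\n"
        "In this Agreement, \"Confidential Information\" means any information "
        "disclosed by one party to the other that is marked as confidential.\n"
        "2. Term and Termination\n"
        "This Agreement shall commence on the Effective Date and shall continue "
        "for two years unless terminated earlier in accordance with this clause.\n"
    )
    cleaned = clean_raw_pdf_text(_sample)
    for clause in split_into_clauses_with_metadata(cleaned):
        print(clause["number"], "->", clause["text"][:60])
    # Expected (roughly): the TOC heading and dot-leader lines are stripped,
    # and clauses "1." and "2." survive the garbage filter as two metadata dicts.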