File size: 8,085 Bytes
35646e4 275cb5c 85242e3 275cb5c 85242e3 df1d611 85242e3 df1d611 275cb5c 6b0c8b8 df1d611 275cb5c 6b0c8b8 85242e3 6b0c8b8 85242e3 df1d611 6b0c8b8 85242e3 b61a150 85242e3 df1d611 32f64de df1d611 b61a150 85242e3 6b0c8b8 85242e3 6b0c8b8 85242e3 df1d611 275cb5c 85242e3 df1d611 85242e3 df1d611 85242e3 df1d611 b61a150 85242e3 df1d611 85242e3 df1d611 85242e3 df1d611 85242e3 df1d611 85242e3 b61a150 85242e3 df1d611 85242e3 df1d611 275cb5c df1d611 35646e4 df1d611 85242e3 df1d611 b61a150 f2fb7ac b61a150 f2fb7ac df1d611 35646e4 dd8eaa7 f2fb7ac dd8eaa7 f2fb7ac df1d611 f2fb7ac 85242e3 f2fb7ac 85242e3 35646e4 85242e3 df1d611 85242e3 b61a150 85242e3 35646e4 85242e3 df1d611 85242e3 35646e4 85242e3 df1d611 b61a150 df1d611 85242e3 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 |
import re
import fitz # PyMuPDF
import unicodedata
# ==========================================================
# 1️⃣ TEXT EXTRACTION (Clean + TOC Detection)
# ==========================================================
def extract_text_from_pdf(file_path: str) -> tuple:
    """
    Extract and clean the text of a PDF using PyMuPDF.

    Each page is read in "text" mode (falling back to joining the string
    payloads of "blocks" mode when that yields nothing), bullets and
    three-level section numbers are pushed onto their own lines, and
    page-number / confidentiality / SAP copyright tags are removed.

    Args:
        file_path: Path of the PDF to open.

    Returns:
        A ``(text, toc)`` tuple: the cleaned full text produced by
        ``clean_text`` and the list of ``(section_number, section_title)``
        tuples produced by ``extract_table_of_contents`` (empty when no
        table of contents is found).

    Raises:
        RuntimeError: If opening or reading the PDF fails; the original
            exception is attached as ``__cause__``.
    """
    page_texts = []
    try:
        with fitz.open(file_path) as pdf:
            for page in pdf:
                page_text = page.get_text("text").strip()
                # Fallback: for scanned/weird layouts
                if not page_text:
                    blocks = page.get_text("blocks")
                    page_text = " ".join(
                        block[4] for block in blocks if isinstance(block[4], str)
                    )
                # Ensure bullets & numbered sections start on new lines
                page_text = page_text.replace("• ", "\n• ")
                page_text = re.sub(r"(\d+\.\d+\.\d+)", r"\n\1", page_text)
                # Remove headers/footers and confidential tags
                page_text = re.sub(
                    r"Page\s*\d+\s*(of\s*\d+)?", "", page_text, flags=re.IGNORECASE
                )
                # NOTE(review): PUBLIC/Confidential are matched case-insensitively
                # anywhere in the text, including inside longer words — confirm
                # that this is intended.
                page_text = re.sub(
                    r"(PUBLIC|Confidential|© SAP.*|\bSAP\b\s*\d{4})",
                    "",
                    page_text,
                    flags=re.IGNORECASE,
                )
                page_texts.append(page_text)
    except Exception as e:
        # Chain the cause so the underlying PyMuPDF/IO traceback is preserved.
        raise RuntimeError(f"❌ PDF extraction failed: {e}") from e
    # Every page contributes its text followed by a newline.
    raw_text = "".join(f"{page_text}\n" for page_text in page_texts)
    # --- TOC extraction ---
    # Must run on the un-cleaned text: clean_text() deletes dotted-leader rows
    # such as "6.3.1 Prerequisites ........ 53" and collapses line breaks, which
    # are exactly what the line-based TOC detector looks for. Only Unicode
    # normalisation is applied first so ligatures do not truncate titles.
    toc = extract_table_of_contents(unicodedata.normalize("NFKD", raw_text))
    # --- Cleaning pipeline ---
    text = clean_text(raw_text)
    if toc:
        print(f"📘 TOC detected with {len(toc)} entries.")
    else:
        print("⚠️ No Table of Contents detected.")
    return text, toc
# ==========================================================
# 2️⃣ ADVANCED CLEANING PIPELINE
# ==========================================================
def clean_text(text: str) -> str:
    """
    Clean noisy PDF text before chunking and embedding.

    The steps run in a fixed order: NFKD normalisation, removal of
    dotted-leader TOC rows, bullet unification to "- ", removal of layout
    artifacts (dot runs, hyphenated line breaks, standalone
    PUBLIC/PRIVATE/Confidential lines, copyright stamps), whitespace
    normalisation, and finally removal of every character outside a small
    ASCII whitelist. The result is stripped of surrounding whitespace.
    """
    cleaned = unicodedata.normalize("NFKD", text)

    # Remove TOC noise (like "6.3.1 Prerequisites .............. 53")
    cleaned = re.sub(
        r"\b\d+(\.\d+){1,}\s+[A-Za-z].{0,40}\.{2,}\s*\d+\b", "", cleaned
    )

    # Map every supported bullet glyph to a plain "- " marker in one pass.
    cleaned = cleaned.translate(str.maketrans({"•": "- ", "▪": "- ", "‣": "- "}))

    # Layout artifacts: (pattern, replacement, flags), applied in order.
    artifact_rules = (
        (r"\.{3,}", ". ", 0),
        (r"-\s*\n", "", 0),
        (r"\n\s*(PUBLIC|PRIVATE|Confidential)\s*\n", "\n", re.IGNORECASE),
        (r"©\s*[A-Z].*?\d{4}", "", 0),
    )
    for pattern, replacement, flags in artifact_rules:
        cleaned = re.sub(pattern, replacement, cleaned, flags=flags)

    # Carriage returns become spaces before whitespace runs are collapsed.
    cleaned = cleaned.replace("\r", " ")

    # Whitespace normalisation, then removal of leftover special characters
    # and of any remaining runs of three or more spaced-out dots.
    final_rules = (
        (r"\n{2,}", "\n"),
        (r"\s{2,}", " "),
        (r"[^A-Za-z0-9,;:.\-\(\)/&\n\s]", ""),
        (r"(\s*\.\s*){3,}", " "),
    )
    for pattern, replacement in final_rules:
        cleaned = re.sub(pattern, replacement, cleaned)

    return cleaned.strip()
# ==========================================================
# 3️⃣ TABLE OF CONTENTS DETECTION
# ==========================================================
def extract_table_of_contents(text: str):
    """
    Detect a Table of Contents (TOC) in extracted PDF text.

    Lines are scanned until one contains "table of contents"
    (case-insensitive). Every following line that starts with a dotted
    section number and a capitalised title is recorded, until a line that
    starts with "Step <n>" or "1. <Capital>" marks the start of the main
    content.

    Returns a list of (section_number, section_title) tuples; the list is
    empty when no TOC heading is present.
    """
    remaining_lines = iter(text.split("\n"))

    # Phase 1: consume lines up to and including the TOC heading.
    for candidate in remaining_lines:
        if re.search(r"table\s*of\s*contents", candidate, re.IGNORECASE):
            break
    else:
        return []

    # Phase 2: collect entries from the lines after the heading.
    entries = []
    for row in remaining_lines:
        # Stop scanning when we reach main content
        if re.match(r"^\s*(Step\s*\d+|1\.\s*[A-Z])", row):
            break
        # Match TOC patterns like "3.2 Configure Endpoints ........ 13"
        found = re.match(r"^\s*(\d+(?:\.\d+)*)\s+([A-Z][A-Za-z0-9\s/&()-]+)", row)
        if found is None:
            continue
        number, heading = (part.strip() for part in found.groups())
        # Very short titles are treated as noise.
        if len(heading) > 3:
            entries.append((number, heading))
    return entries
# ==========================================================
# 4️⃣ SMART CHUNKING (Auto-Sized + Continuity-Aware)
# ==========================================================
def chunk_text(text: str, chunk_size: int = None, overlap: int = None) -> list:
    """
    Split cleaned document text into overlapping chunks.

    The text is whitespace-flattened, split into sections at numbered
    headings ("3.2 Title ...") and "Step N" markers, and each section is
    either kept as bullet groups (when it contains more than two bullet
    items) or split on sentence boundaries. Short chunks are then merged
    and every chunk after the first is prefixed with the tail of its
    predecessor for continuity.

    Args:
        text: The text to chunk.
        chunk_size: Target chunk length in characters. When None it is
            chosen from the text length (2000 / 1500 / 1000).
        overlap: Number of trailing characters carried over between chunks.
            When None it defaults to the value paired with the auto-selected
            chunk size (250 / 200 / 150), or 150 for an explicit chunk_size.
            An explicitly supplied value is always honoured.

    Returns:
        The list of chunk strings (empty for empty input). Progress
        messages are printed to stdout.
    """
    text_length = len(text)
    if chunk_size is None:
        if text_length > 200000:
            chunk_size, auto_overlap = 2000, 250
        elif text_length > 50000:
            chunk_size, auto_overlap = 1500, 200
        else:
            chunk_size, auto_overlap = 1000, 150
        # Only fall back to the paired overlap when the caller gave none;
        # previously an explicit overlap was silently discarded here.
        if overlap is None:
            overlap = auto_overlap
    elif overlap is None:
        overlap = 150
    print(f"⚙️ Auto-selected chunk_size={chunk_size}, overlap={overlap} (len={text_length})")

    # Flatten all whitespace (including newlines) to single spaces.
    text = re.sub(r"\s+", " ", text.strip())

    # Zero-width split points in front of numbered headings and "Step N".
    # The lookbehinds keep the split from firing inside a number token
    # ("10 Overview" -> "1" | "0 Overview", "3.2 Title" -> "3." | "2 Title")
    # and from detaching "Step" from its own number.
    section_pattern = (
        r"(?=(?:(?<![\w.])(?<!Step )\d+(?:\.\d+){0,3}\s+[A-Z][^\n]{3,100})"
        r"|(?:Step\s*\d+[:.\s]))"
    )
    sections = [s.strip() for s in re.split(section_pattern, text) if s.strip()]

    chunks = []
    for section in sections:
        # Whitespace is already flattened, so bullets are recognised purely by
        # a marker followed by whitespace; no newline handling is needed.
        bullets = [
            b.strip() for b in re.split(r"(?=\s*[-•▪‣]\s)", section) if b.strip()
        ]
        if len(bullets) <= 2:
            chunks.extend(_split_by_sentence(section, chunk_size, overlap))
            continue
        combined = " ".join(bullets)
        if len(combined) > chunk_size * 1.5:
            # Long lists are emitted in groups of six bullets.
            chunks.extend(
                " ".join(bullets[i:i + 6]) for i in range(0, len(bullets), 6)
            )
        else:
            chunks.append(combined)

    chunks = _merge_small_chunks(chunks, min_len=200)

    # Add continuity overlap: prefix each chunk with the tail of the previous
    # (pre-overlap) chunk.
    final_chunks = chunks[:1]
    for previous, current in zip(chunks, chunks[1:]):
        prev_tail = previous[-overlap:] if overlap > 0 else ""
        final_chunks.append((prev_tail + " " + current).strip())
    print(f"✅ Final chunks created (continuity-aware): {len(final_chunks)}")
    return final_chunks
# ==========================================================
# 5️⃣ Helper Functions
# ==========================================================
def _split_by_sentence(text, chunk_size=800, overlap=80):
sentences = re.split(r"(?<=[.!?])\s+", text)
chunks, current = [], ""
for sent in sentences:
if len(current) + len(sent) + 1 <= chunk_size:
current += " " + sent
else:
if current.strip():
chunks.append(current.strip())
overlap_part = current[-overlap:] if overlap > 0 else ""
current = overlap_part + " " + sent
if current.strip():
chunks.append(current.strip())
return chunks
def _merge_small_chunks(chunks, min_len=150):
merged, buffer = [], ""
for ch in chunks:
if len(ch) < min_len:
buffer += " " + ch
else:
if buffer:
merged.append(buffer.strip())
buffer = ""
merged.append(ch.strip())
if buffer:
merged.append(buffer.strip())
return merged
# ==========================================================
# 6️⃣ DEBUGGING (Manual Run)
# ==========================================================
if __name__ == "__main__":
    import sys

    # Manual smoke run: take the PDF path from the command line when given,
    # otherwise fall back to "sample.pdf" in the working directory.
    pdf_path = sys.argv[1] if len(sys.argv) > 1 else "sample.pdf"
    text, toc = extract_text_from_pdf(pdf_path)
    print("\n📚 TOC Preview:", toc[:5])
    chunks = chunk_text(text)
    print(f"\n✅ {len(chunks)} chunks created.")
    # Show at most the first five chunks, truncated to 500 characters each.
    for i, c in enumerate(chunks[:5], 1):
        print(f"\n--- Chunk {i} ---\n{c[:500]}...\n")
|