import re import fitz # PyMuPDF import unicodedata # ========================================================== # 1️⃣ TEXT EXTRACTION (Clean + TOC Detection) # ========================================================== def extract_text_from_pdf(file_path: str): """ Extracts and cleans text from a PDF using PyMuPDF. Handles layout artifacts, numbered sections, and TOC. Returns both clean text and detected TOC (if any). """ text = "" try: with fitz.open(file_path) as pdf: for page_num, page in enumerate(pdf, start=1): page_text = page.get_text("text").strip() # Fallback: for scanned/weird layouts if not page_text: blocks = page.get_text("blocks") page_text = " ".join( block[4] for block in blocks if isinstance(block[4], str) ) # Ensure bullets & numbered sections start on new lines page_text = page_text.replace("• ", "\n• ") page_text = re.sub(r"(\d+\.\d+\.\d+)", r"\n\1", page_text) # Remove headers/footers and confidential tags page_text = re.sub( r"Page\s*\d+\s*(of\s*\d+)?", "", page_text, flags=re.IGNORECASE ) page_text = re.sub( r"(PUBLIC|Confidential|© SAP.*|\bSAP\b\s*\d{4})", "", page_text, flags=re.IGNORECASE, ) text += page_text + "\n" except Exception as e: raise RuntimeError(f"❌ PDF extraction failed: {e}") # --- Cleaning pipeline --- text = clean_text(text) # --- TOC extraction --- toc = extract_table_of_contents(text) if toc: print(f"📘 TOC detected with {len(toc)} entries.") else: print("⚠️ No Table of Contents detected.") return text, toc # ========================================================== # 2️⃣ ADVANCED CLEANING PIPELINE # ========================================================== def clean_text(text: str) -> str: """Cleans noisy PDF text before chunking and embedding.""" text = unicodedata.normalize("NFKD", text) # Remove TOC noise (like "6.3.1 Prerequisites .............. 53") text = re.sub( r"\b\d+(\.\d+){1,}\s+[A-Za-z].{0,40}\.{2,}\s*\d+\b", "", text ) # Replace bullet symbols and dots with consistent spacing text = text.replace("•", "- ").replace("▪", "- ").replace("‣", "- ") # Remove excessive dots, hyphens, headers text = re.sub(r"\.{3,}", ". ", text) text = re.sub(r"-\s*\n", "", text) text = re.sub(r"\n\s*(PUBLIC|PRIVATE|Confidential)\s*\n", "\n", text, flags=re.IGNORECASE) text = re.sub(r"©\s*[A-Z].*?\d{4}", "", text) # Normalize newlines and spaces text = text.replace("\r", " ") text = re.sub(r"\n{2,}", "\n", text) text = re.sub(r"\s{2,}", " ", text) # Clean leftover special chars text = re.sub(r"[^A-Za-z0-9,;:.\-\(\)/&\n\s]", "", text) text = re.sub(r"(\s*\.\s*){3,}", " ", text) return text.strip() # ========================================================== # 3️⃣ TABLE OF CONTENTS DETECTION (Improved) # ========================================================== def extract_table_of_contents(text: str): """ Smart TOC detector for enterprise PDFs. Handles 'Table of Contents', 'Contents', 'Content', 'Index', 'Overview', and implicit numbered TOCs without a header. Returns list of (section_number, section_title). """ toc_entries = [] lines = text.split("\n") toc_started = False toc_ended = False line_count = len(lines) for i, line in enumerate(lines): # --- Step 1️⃣: Detect possible TOC header variants --- if not toc_started and re.search(r"\b(table\s*of\s*contents?|contents?|index|overview)\b", line, re.IGNORECASE): next_lines = lines[i + 1 : i + 8] if any(re.match(r"^\s*\d+(\.\d+)*\s+[A-Za-z]", l) for l in next_lines): toc_started = True continue # --- Step 2️⃣: Smart fallback — detect implicit TOC without header --- if not toc_started and re.match(r"^\s*\d+(\.\d+)*\s+[A-Za-z]", line): numbered_lines = 0 for j in range(i, min(i + 5, line_count)): if re.match(r"^\s*\d+(\.\d+)*\s+[A-Za-z]", lines[j]): numbered_lines += 1 if numbered_lines >= 3: # heuristic to confirm pattern toc_started = True # --- Step 3️⃣: Detect end of TOC region --- if toc_started and re.match(r"^\s*(Step\s*\d+|[A-Z][a-z]{2,}\s[A-Z])", line): toc_ended = True break # --- Step 4️⃣: Extract TOC entries --- if toc_started and not toc_ended: match = re.match( r"^\s*(\d+(?:\.\d+)*)\s+([A-Z][A-Za-z0-9\s/&(),-]+)(?:\.+\s*\d+)?$", line.strip() ) if match: section = match.group(1).strip() title = match.group(2).strip() if len(title) > 3 and not re.match(r"^\d+$", title): toc_entries.append((section, title)) # --- Step 5️⃣: Clean up duplicates --- deduped = [] seen = set() for sec, title in toc_entries: key = (sec, title.lower()) if key not in seen: deduped.append((sec, title)) seen.add(key) return deduped # ========================================================== # 4️⃣ SMART CHUNKING (Auto-Sized + Continuity-Aware) # ========================================================== def chunk_text(text: str, chunk_size: int = None, overlap: int = None) -> list: """ Enhanced chunking for structured enterprise PDFs. Auto-selects chunk size and keeps procedural context intact. """ text_length = len(text) if chunk_size is None: if text_length > 200000: chunk_size, overlap = 2000, 250 elif text_length > 50000: chunk_size, overlap = 1500, 200 else: chunk_size, overlap = 1000, 150 elif overlap is None: overlap = 150 print(f"⚙️ Auto-selected chunk_size={chunk_size}, overlap={overlap} (len={text_length})") text = re.sub(r"\s+", " ", text.strip()) section_pattern = ( r"(?=(?:\n?\d+(?:\.\d+){0,3}\s+[A-Z][^\n]{3,100})|(?:Step\s*\d+[:.\s]))" ) sections = re.split(section_pattern, text) sections = [s.strip() for s in sections if s.strip()] chunks = [] for section in sections: section = re.sub(r"\n\s*[-•▪‣]\s*", " • ", section) bullets = re.split(r"(?=\s*[-•▪‣]\s)", section) bullets = [b.strip() for b in bullets if b.strip()] if len(bullets) > 2: combined = " ".join(bullets) if len(combined) > chunk_size * 1.5: for i in range(0, len(bullets), 6): block = " ".join(bullets[i:i+6]) chunks.append(block.strip()) else: chunks.append(combined.strip()) else: chunks.extend(_split_by_sentence(section, chunk_size, overlap)) chunks = _merge_small_chunks(chunks, min_len=200) # Add continuity overlap final_chunks = [] for i, ch in enumerate(chunks): if i == 0: final_chunks.append(ch) else: prev_tail = chunks[i - 1][-overlap:] if overlap > 0 else "" final_chunks.append((prev_tail + " " + ch).strip()) print(f"✅ Final chunks created (continuity-aware): {len(final_chunks)}") return final_chunks # ========================================================== # 5️⃣ Helper Functions # ========================================================== def _split_by_sentence(text, chunk_size=800, overlap=80): sentences = re.split(r"(?<=[.!?])\s+", text) chunks, current = [], "" for sent in sentences: if len(current) + len(sent) + 1 <= chunk_size: current += " " + sent else: if current.strip(): chunks.append(current.strip()) overlap_part = current[-overlap:] if overlap > 0 else "" current = overlap_part + " " + sent if current.strip(): chunks.append(current.strip()) return chunks def _merge_small_chunks(chunks, min_len=150): merged, buffer = [], "" for ch in chunks: if len(ch) < min_len: buffer += " " + ch else: if buffer: merged.append(buffer.strip()) buffer = "" merged.append(ch.strip()) if buffer: merged.append(buffer.strip()) return merged # ========================================================== # 6️⃣ DEBUGGING (Manual Run) # ========================================================== if __name__ == "__main__": pdf_path = "sample.pdf" text, toc = extract_text_from_pdf(pdf_path) print("\n📚 TOC Preview:", toc[:5]) chunks = chunk_text(text) print(f"\n✅ {len(chunks)} chunks created.") for i, c in enumerate(chunks[:5], 1): print(f"\n--- Chunk {i} ---\n{c[:500]}...\n")