# src/ingestion.py
# PDF ingestion pipeline: text extraction, cleaning, TOC detection, and smart chunking.
import re
import fitz # PyMuPDF
import unicodedata
from gen_ai_hub.proxy.langchain.openai import ChatOpenAI
# ==========================================================
# 1️⃣ TEXT EXTRACTION (Clean + TOC Detection)
# ==========================================================
def extract_text_from_pdf(file_path: str):
    """
    Extract and clean text from a PDF using PyMuPDF.

    Handles layout artifacts, numbered sections, and TOC detection.

    Args:
        file_path: Path to the PDF file on disk.

    Returns:
        Tuple of (clean_text, toc_entries, toc_source_label).

    Raises:
        RuntimeError: If the PDF cannot be opened or parsed.
    """
    # Accumulate per-page text in a list and join once at the end —
    # avoids the O(n^2) cost of repeated string concatenation.
    pages = []
    try:
        with fitz.open(file_path) as pdf:
            for page in pdf:
                page_text = page.get_text("text").strip()
                # Fallback for scanned or unusual layouts: stitch the
                # per-block text fragments together instead.
                if not page_text:
                    blocks = page.get_text("blocks")
                    page_text = " ".join(
                        block[4] for block in blocks if isinstance(block[4], str)
                    )
                # Ensure bullets & numbered sections start on new lines.
                page_text = page_text.replace("• ", "\n• ")
                page_text = re.sub(r"(\d+\.\d+\.\d+)", r"\n\1", page_text)
                # Remove headers/footers and confidential tags.
                page_text = re.sub(
                    r"Page\s*\d+\s*(of\s*\d+)?", "", page_text, flags=re.IGNORECASE
                )
                page_text = re.sub(
                    r"(PUBLIC|Confidential|© SAP.*|\bSAP\b\s*\d{4})",
                    "",
                    page_text,
                    flags=re.IGNORECASE,
                )
                pages.append(page_text + "\n")
    except Exception as e:
        # Chain the original exception so the root cause stays in the traceback.
        raise RuntimeError(f"❌ PDF extraction failed: {e}") from e
    text = "".join(pages)
    # --- Cleaning pipeline ---
    text = clean_text(text)
    # --- TOC extraction (Hybrid) ---
    toc, toc_source = get_hybrid_toc(text)
    print(f"📘 TOC Source: {toc_source} | Entries: {len(toc)}")
    return text, toc, toc_source
# ==========================================================
# 2️⃣ ADVANCED CLEANING PIPELINE
# ==========================================================
def clean_text(text: str) -> str:
    """Clean noisy PDF text before chunking and embedding."""
    text = unicodedata.normalize("NFKD", text)

    # Drop TOC-style dotted-leader lines such as
    # "6.3.1 Prerequisites .............. 53".
    text = re.sub(r"\b\d+(\.\d+){1,}\s+[A-Za-z].{0,40}\.{2,}\s*\d+\b", "", text)

    # Normalize bullet glyphs to a plain dash with spacing.
    for glyph in ("•", "▪", "‣"):
        text = text.replace(glyph, "- ")

    # Collapse dotted leaders, re-join hyphenated line breaks, and strip
    # classification / copyright boilerplate.
    text = re.sub(r"\.{3,}", ". ", text)
    text = re.sub(r"-\s*\n", "", text)
    text = re.sub(r"\n\s*(PUBLIC|PRIVATE|Confidential)\s*\n", "\n", text, flags=re.IGNORECASE)
    text = re.sub(r"©\s*[A-Z].*?\d{4}", "", text)

    # Normalize line endings, then squeeze repeated newlines and spaces.
    text = text.replace("\r", " ")
    text = re.sub(r"\n{2,}", "\n", text)
    text = re.sub(r"\s{2,}", " ", text)

    # Strip leftover special characters and stray dot runs.
    text = re.sub(r"[^A-Za-z0-9,;:.\-\(\)/&\n\s]", "", text)
    text = re.sub(r"(\s*\.\s*){3,}", " ", text)
    return text.strip()
# ==========================================================
# 3️⃣ TABLE OF CONTENTS DETECTION (Heuristic)
# ==========================================================
def extract_table_of_contents(text: str):
    """
    Smart TOC detector for enterprise PDFs.

    Handles 'Table of Contents', 'Contents', 'Content', 'Index',
    'Overview', and implicit numbered TOCs without a header.

    Returns:
        De-duplicated list of (section_number, section_title) tuples.
    """
    header_re = re.compile(
        r"\b(table\s*of\s*contents?|contents?|index|overview)\b", re.IGNORECASE
    )
    numbered_re = re.compile(r"^\s*\d+(\.\d+)*\s+[A-Za-z]")
    body_start_re = re.compile(r"^\s*(Step\s*\d+|[A-Z][a-z]{2,}\s[A-Z])")
    entry_re = re.compile(
        r"^\s*(\d+(?:\.\d+)*)\s+([A-Z][A-Za-z0-9\s/&(),-]+)(?:\.+\s*\d+)?$"
    )

    rows = text.split("\n")
    total = len(rows)
    in_toc = False
    entries = []

    for idx, row in enumerate(rows):
        if not in_toc:
            # An explicit TOC header counts only if numbered lines follow soon after.
            if header_re.search(row):
                if any(numbered_re.match(nxt) for nxt in rows[idx + 1 : idx + 8]):
                    in_toc = True
                    continue
            # Implicit TOC: at least 3 numbered lines inside a 5-line window.
            if not in_toc and numbered_re.match(row):
                window = rows[idx : min(idx + 5, total)]
                if sum(1 for cand in window if numbered_re.match(cand)) >= 3:
                    in_toc = True
        if in_toc:
            # A prose-looking line ("Step 1 ..." or "Word Word") ends the TOC region.
            if body_start_re.match(row):
                break
            hit = entry_re.match(row.strip())
            if hit:
                number = hit.group(1).strip()
                title = hit.group(2).strip()
                if len(title) > 3 and not re.match(r"^\d+$", title):
                    entries.append((number, title))

    # Remove duplicate (number, title) pairs while preserving order.
    unique, seen = [], set()
    for number, title in entries:
        fingerprint = (number, title.lower())
        if fingerprint not in seen:
            seen.add(fingerprint)
            unique.append((number, title))
    return unique
# ==========================================================
# 3A️⃣ HYBRID TOC FALLBACK (AI-Inferred)
# ==========================================================
def adaptive_fallback_toc(text: str, model: str = "gpt-4o-mini", max_chars: int = 7000):
    """
    Use an LLM to infer a Table of Contents from the document text.

    Called only when no TOC is found via regex parsing.

    Args:
        text: Cleaned document text.
        model: Chat model name used for inference.
        max_chars: Maximum number of leading characters sent to the model.

    Returns:
        List of (section_number, section_title) tuples, or [] on failure.
    """
    snippet = text[:max_chars]
    llm = ChatOpenAI(model=model, temperature=0)
    prompt = f"""
You are a document structure analyzer.
Read the following text and infer its main section titles.
Output a clean, numbered list (1., 2., 3.) with 5–10 entries max.
TEXT SAMPLE:
{snippet}
"""
    try:
        response = llm.invoke(prompt)
        # Strip leading numbering / bullet markers from each returned line.
        # Bug fix: the class previously contained r"\\s" — a literal
        # backslash plus the letter "s" — which stripped leading "s"
        # characters from titles; "\s" (whitespace) was intended.
        lines = [
            re.sub(r"^[0-9.\-•\s]+", "", l.strip())
            for l in response.content.splitlines()
            if l.strip()
        ]
        toc_ai = [(str(i + 1), l) for i, l in enumerate(lines) if len(l) > 3]
        return toc_ai
    except Exception as e:
        # Best-effort fallback: report and return an empty TOC rather than crash.
        print(f"⚠️ AI TOC fallback failed: {e}")
        return []
# ==========================================================
# 3B️⃣ UNIFIED WRAPPER (Heuristic + AI Hybrid)
# ==========================================================
def get_hybrid_toc(text: str):
    """
    Attempt heuristic TOC extraction; if none is found, trigger the
    adaptive AI fallback.

    Returns:
        Tuple (toc_entries, source_label) where source_label is one of
        "heuristic", "ai_inferred", or "none".
    """
    heuristic_toc = extract_table_of_contents(text)
    if heuristic_toc:
        print(f"📘 TOC detected with {len(heuristic_toc)} entries (heuristic).")
        return heuristic_toc, "heuristic"

    print("⚠️ No TOC detected — invoking adaptive AI fallback...")
    inferred = adaptive_fallback_toc(text)
    if inferred:
        print(f"✨ AI-inferred TOC generated with {len(inferred)} entries.")
        return inferred, "ai_inferred"

    print("❌ No TOC could be detected or inferred.")
    return [], "none"
# ==========================================================
# 4️⃣ SMART CHUNKING (Auto-Sized + Continuity-Aware)
# ==========================================================
def chunk_text(text: str, chunk_size: int = None, overlap: int = None) -> list:
    """
    Enhanced chunking for structured enterprise PDFs.

    Auto-selects chunk size from document length and keeps procedural
    context intact by splitting on section headings and bullet groups,
    then adding a continuity overlap between consecutive chunks.

    Args:
        text: Full (cleaned) document text.
        chunk_size: Target chunk length in characters; auto-sized when None.
        overlap: Characters of trailing context carried into the next
            chunk; auto-sized when None.

    Returns:
        List of chunk strings with continuity overlap applied.
    """
    text_length = len(text)
    # Auto-size thresholds scale with document length.
    # Bug fix: a caller-supplied `overlap` was previously overwritten
    # whenever `chunk_size` was auto-selected; now it is only filled in
    # when the caller left it as None.
    if chunk_size is None:
        if text_length > 200000:
            auto_size, auto_overlap = 2000, 250
        elif text_length > 50000:
            auto_size, auto_overlap = 1500, 200
        else:
            auto_size, auto_overlap = 1000, 150
        chunk_size = auto_size
        if overlap is None:
            overlap = auto_overlap
    elif overlap is None:
        overlap = 150
    print(f"⚙️ Auto-selected chunk_size={chunk_size}, overlap={overlap} (len={text_length})")
    # Collapse all whitespace so the section regex sees one long line.
    text = re.sub(r"\s+", " ", text.strip())
    # Split ahead of numbered section headings or "Step N" markers.
    section_pattern = (
        r"(?=(?:\n?\d+(?:\.\d+){0,3}\s+[A-Z][^\n]{3,100})|(?:Step\s*\d+[:.\s]))"
    )
    sections = re.split(section_pattern, text)
    sections = [s.strip() for s in sections if s.strip()]
    chunks = []
    for section in sections:
        # Normalize bullet markers, then split the section on them.
        section = re.sub(r"\n\s*[-•▪‣]\s*", " • ", section)
        bullets = re.split(r"(?=\s*[-•▪‣]\s)", section)
        bullets = [b.strip() for b in bullets if b.strip()]
        if len(bullets) > 2:
            combined = " ".join(bullets)
            if len(combined) > chunk_size * 1.5:
                # Too large as one piece: emit blocks of 6 bullets each.
                for i in range(0, len(bullets), 6):
                    block = " ".join(bullets[i:i+6])
                    chunks.append(block.strip())
            else:
                chunks.append(combined.strip())
        else:
            chunks.extend(_split_by_sentence(section, chunk_size, overlap))
    chunks = _merge_small_chunks(chunks, min_len=200)
    # Continuity overlap: prepend the tail of the previous (pre-overlap) chunk.
    final_chunks = []
    for i, ch in enumerate(chunks):
        if i == 0:
            final_chunks.append(ch)
        else:
            prev_tail = chunks[i - 1][-overlap:] if overlap > 0 else ""
            final_chunks.append((prev_tail + " " + ch).strip())
    print(f"✅ Final chunks created (continuity-aware): {len(final_chunks)}")
    return final_chunks
# ==========================================================
# 5️⃣ Helper Functions
# ==========================================================
def _split_by_sentence(text, chunk_size=800, overlap=80):
sentences = re.split(r"(?<=[.!?])\s+", text)
chunks, current = [], ""
for sent in sentences:
if len(current) + len(sent) + 1 <= chunk_size:
current += " " + sent
else:
if current.strip():
chunks.append(current.strip())
overlap_part = current[-overlap:] if overlap > 0 else ""
current = overlap_part + " " + sent
if current.strip():
chunks.append(current.strip())
return chunks
def _merge_small_chunks(chunks, min_len=150):
merged, buffer = [], ""
for ch in chunks:
if len(ch) < min_len:
buffer += " " + ch
else:
if buffer:
merged.append(buffer.strip())
buffer = ""
merged.append(ch.strip())
if buffer:
merged.append(buffer.strip())
return merged
# ==========================================================
# 6️⃣ DEBUGGING (Manual Run)
# ==========================================================
if __name__ == "__main__":
    # Manual smoke test: extract text, preview the TOC, then chunk.
    sample_path = "sample.pdf"
    doc_text, doc_toc, toc_origin = extract_text_from_pdf(sample_path)
    print("\n📚 TOC Preview:", doc_toc[:5])
    doc_chunks = chunk_text(doc_text)
    print(f"\n✅ {len(doc_chunks)} chunks created.")
    for idx, chunk in enumerate(doc_chunks[:5], 1):
        print(f"\n--- Chunk {idx} ---\n{chunk[:500]}...\n")