|
|
import re |
|
|
import fitz |
|
|
import unicodedata |
|
|
from gen_ai_hub.proxy.langchain.openai import ChatOpenAI |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def extract_text_from_pdf(file_path: str):
    """
    Extract and clean text from a PDF using PyMuPDF (fitz).

    Per page:
      * grabs plain text; falls back to block extraction when the page
        yields nothing (common for layout-heavy pages),
      * pushes bullet markers and x.y.z numbered headings onto their own
        lines so downstream chunking can split on them,
      * strips page-number footers and boilerplate banners
        (PUBLIC / Confidential / © SAP ...).

    The combined text is passed through clean_text() and a table of
    contents is derived via get_hybrid_toc().

    Args:
        file_path: Path to the PDF file.

    Returns:
        Tuple of (clean_text, toc_entries, toc_source_label).

    Raises:
        RuntimeError: If the PDF cannot be opened or read (original
            exception chained as __cause__).
    """
    pages = []  # collect per-page text; joined once to avoid quadratic +=
    try:
        with fitz.open(file_path) as pdf:
            for page in pdf:
                page_text = page.get_text("text").strip()

                # Fallback: some pages only expose text via block extraction.
                if not page_text:
                    blocks = page.get_text("blocks")
                    page_text = " ".join(
                        block[4] for block in blocks if isinstance(block[4], str)
                    )

                # Put bullets and numbered sub-sections on their own lines.
                page_text = page_text.replace("• ", "\n• ")
                page_text = re.sub(r"(\d+\.\d+\.\d+)", r"\n\1", page_text)

                # Drop page-number footers and classification/copyright noise.
                page_text = re.sub(
                    r"Page\s*\d+\s*(of\s*\d+)?", "", page_text, flags=re.IGNORECASE
                )
                page_text = re.sub(
                    r"(PUBLIC|Confidential|© SAP.*|\bSAP\b\s*\d{4})",
                    "",
                    page_text,
                    flags=re.IGNORECASE,
                )

                pages.append(page_text)
    except Exception as e:
        # Chain the original exception so the root cause stays visible.
        raise RuntimeError(f"❌ PDF extraction failed: {e}") from e

    text = clean_text("".join(p + "\n" for p in pages))

    toc, toc_source = get_hybrid_toc(text)
    print(f"📘 TOC Source: {toc_source} | Entries: {len(toc)}")

    return text, toc, toc_source
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def clean_text(text: str) -> str:
    """Normalize noisy PDF text ahead of chunking and embedding.

    Removes TOC dot-leader rows, classification banners and copyright
    strings, re-joins hyphenated line breaks, unifies bullet glyphs,
    collapses whitespace, and strips characters outside a safe set.
    """
    # Decompose compatibility characters (ligatures, full-width forms, ...).
    text = unicodedata.normalize("NFKD", text)

    # Remove leftover TOC rows like "1.2 Title .... 34".
    text = re.sub(r"\b\d+(\.\d+){1,}\s+[A-Za-z].{0,40}\.{2,}\s*\d+\b", "", text)

    # Unify every bullet glyph to a plain dash marker.
    for glyph in ("•", "▪", "‣"):
        text = text.replace(glyph, "- ")

    # Dot leaders become a sentence break; hyphenated line breaks re-join.
    text = re.sub(r"\.{3,}", ". ", text)
    text = re.sub(r"-\s*\n", "", text)

    # Strip classification banner lines and copyright strings.
    text = re.sub(r"\n\s*(PUBLIC|PRIVATE|Confidential)\s*\n", "\n", text, flags=re.IGNORECASE)
    text = re.sub(r"©\s*[A-Z].*?\d{4}", "", text)

    # Collapse whitespace: carriage returns, blank-line runs, space runs.
    text = text.replace("\r", " ")
    text = re.sub(r"\n{2,}", "\n", text)
    text = re.sub(r"\s{2,}", " ", text)

    # Keep only a conservative character set, then squash stray dot runs.
    text = re.sub(r"[^A-Za-z0-9,;:.\-\(\)/&\n\s]", "", text)
    text = re.sub(r"(\s*\.\s*){3,}", " ", text)

    return text.strip()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def extract_table_of_contents(text: str):
    """
    Heuristic TOC detector for enterprise PDFs.

    Recognizes explicit headers ('Table of Contents', 'Contents',
    'Content', 'Index', 'Overview') as well as implicit TOCs that are
    simply a dense run of numbered lines with no header.

    Returns a de-duplicated, order-preserving list of
    (section_number, section_title) tuples.
    """
    header_re = re.compile(
        r"\b(table\s*of\s*contents?|contents?|index|overview)\b", re.IGNORECASE
    )
    numbered_re = re.compile(r"^\s*\d+(\.\d+)*\s+[A-Za-z]")
    body_start_re = re.compile(r"^\s*(Step\s*\d+|[A-Z][a-z]{2,}\s[A-Z])")
    entry_re = re.compile(
        r"^\s*(\d+(?:\.\d+)*)\s+([A-Z][A-Za-z0-9\s/&(),-]+)(?:\.+\s*\d+)?$"
    )

    lines = text.split("\n")
    total = len(lines)
    in_toc = False
    entries = []

    for idx, raw in enumerate(lines):
        # Explicit header followed shortly by numbered lines starts the TOC.
        if not in_toc and header_re.search(raw):
            lookahead = lines[idx + 1 : idx + 8]
            if any(numbered_re.match(cand) for cand in lookahead):
                in_toc = True
                continue

        # Implicit TOC: at least 3 numbered lines within the next 5.
        if not in_toc and numbered_re.match(raw):
            dense = sum(
                1
                for j in range(idx, min(idx + 5, total))
                if numbered_re.match(lines[j])
            )
            if dense >= 3:
                in_toc = True

        # Body prose ("Step 1", "Some Capitalized ...") terminates the TOC.
        if in_toc and body_start_re.match(raw):
            break

        if in_toc:
            m = entry_re.match(raw.strip())
            if m:
                number = m.group(1).strip()
                title = m.group(2).strip()
                if len(title) > 3 and not re.match(r"^\d+$", title):
                    entries.append((number, title))

    # De-duplicate while preserving order (case-insensitive on titles).
    unique, seen = [], set()
    for number, title in entries:
        key = (number, title.lower())
        if key not in seen:
            seen.add(key)
            unique.append((number, title))
    return unique
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def adaptive_fallback_toc(text: str, model: str = "gpt-4o-mini", max_chars: int = 7000):
    """
    Infer a Table of Contents from the document text with an LLM.

    Called only when heuristic (regex) TOC parsing finds nothing. Sends
    the first `max_chars` characters to the model and parses its
    numbered-list reply into tuples.

    Args:
        text: Full cleaned document text.
        model: Chat model deployment name.
        max_chars: Maximum characters of context to send to the model.

    Returns:
        List of (str_index, title) tuples; empty list on any failure.
    """
    snippet = text[:max_chars]
    llm = ChatOpenAI(model=model, temperature=0)
    prompt = f"""
You are a document structure analyzer.
Read the following text and infer its main section titles.
Output a clean, numbered list (1., 2., 3.) with 5–10 entries max.

TEXT SAMPLE:
{snippet}
"""
    try:
        response = llm.invoke(prompt)
        # Strip any leading numbering / bullets / whitespace the model emitted.
        # BUG FIX: the previous pattern r"^[0-9.\-•\\s]+" used "\\s" inside a
        # raw string, so the class matched a literal backslash and the letter
        # "s" instead of whitespace — eating the first letter of titles such
        # as "setup". "\s" restores the intended whitespace match.
        lines = [
            re.sub(r"^[0-9.\-•\s]+", "", line.strip())
            for line in response.content.splitlines()
            if line.strip()
        ]
        return [(str(i + 1), title) for i, title in enumerate(lines) if len(title) > 3]
    except Exception as e:
        # Best-effort fallback: report and return no TOC rather than crash.
        print(f"⚠️ AI TOC fallback failed: {e}")
        return []
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_hybrid_toc(text: str):
    """
    Resolve a TOC: heuristic extraction first, AI fallback second.

    Returns:
        (toc_entries, source_label) where source_label is one of
        "heuristic", "ai_inferred", or "none".
    """
    heuristic_toc = extract_table_of_contents(text)
    if heuristic_toc:
        print(f"📘 TOC detected with {len(heuristic_toc)} entries (heuristic).")
        return heuristic_toc, "heuristic"

    # Nothing found by regex parsing — ask the model to infer one.
    print("⚠️ No TOC detected — invoking adaptive AI fallback...")
    inferred = adaptive_fallback_toc(text)
    if inferred:
        print(f"✨ AI-inferred TOC generated with {len(inferred)} entries.")
        return inferred, "ai_inferred"

    print("❌ No TOC could be detected or inferred.")
    return [], "none"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def chunk_text(text: str, chunk_size: int = None, overlap: int = None) -> list:
    """
    Enhanced chunking for structured enterprise PDFs.

    Splits on numbered section headings / "Step N" markers, groups
    bullet lists together, merges undersized chunks, and finally
    prepends the tail of the previous chunk to each chunk so procedural
    context survives the split.

    Args:
        text: Cleaned document text.
        chunk_size: Target chunk length in characters; auto-selected
            from the document length when None.
        overlap: Characters of trailing context carried into the next
            chunk; auto-selected (or defaulted to 150) when None.

    Returns:
        List of chunk strings.
    """
    text_length = len(text)

    # Auto-select sizes from document length. BUG FIX: previously a
    # caller-supplied `overlap` was silently overwritten whenever
    # `chunk_size` was None; explicit values are now always respected.
    if chunk_size is None:
        if text_length > 200000:
            auto_size, auto_overlap = 2000, 250
        elif text_length > 50000:
            auto_size, auto_overlap = 1500, 200
        else:
            auto_size, auto_overlap = 1000, 150
        chunk_size = auto_size
        if overlap is None:
            overlap = auto_overlap
    elif overlap is None:
        overlap = 150

    print(f"⚙️ Auto-selected chunk_size={chunk_size}, overlap={overlap} (len={text_length})")

    # Split into sections on numbered headings or "Step N" markers.
    text = re.sub(r"\s+", " ", text.strip())
    section_pattern = (
        r"(?=(?:\n?\d+(?:\.\d+){0,3}\s+[A-Z][^\n]{3,100})|(?:Step\s*\d+[:.\s]))"
    )
    sections = [s.strip() for s in re.split(section_pattern, text) if s.strip()]

    chunks = []
    for section in sections:
        # Normalize bullet markers, then split the section on them.
        section = re.sub(r"\n\s*[-•▪‣]\s*", " • ", section)
        bullets = [b.strip() for b in re.split(r"(?=\s*[-•▪‣]\s)", section) if b.strip()]

        if len(bullets) > 2:
            combined = " ".join(bullets)
            if len(combined) > chunk_size * 1.5:
                # Oversized bullet lists: emit blocks of up to 6 bullets.
                for i in range(0, len(bullets), 6):
                    chunks.append(" ".join(bullets[i:i + 6]).strip())
            else:
                chunks.append(combined.strip())
        else:
            # Prose-like section: fall back to sentence-based packing.
            chunks.extend(_split_by_sentence(section, chunk_size, overlap))

    chunks = _merge_small_chunks(chunks, min_len=200)

    # Continuity pass: prepend the previous chunk's tail to each chunk.
    final_chunks = []
    for i, ch in enumerate(chunks):
        if i == 0:
            final_chunks.append(ch)
        else:
            prev_tail = chunks[i - 1][-overlap:] if overlap > 0 else ""
            final_chunks.append((prev_tail + " " + ch).strip())

    print(f"✅ Final chunks created (continuity-aware): {len(final_chunks)}")
    return final_chunks
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def _split_by_sentence(text, chunk_size=800, overlap=80): |
|
|
sentences = re.split(r"(?<=[.!?])\s+", text) |
|
|
chunks, current = [], "" |
|
|
for sent in sentences: |
|
|
if len(current) + len(sent) + 1 <= chunk_size: |
|
|
current += " " + sent |
|
|
else: |
|
|
if current.strip(): |
|
|
chunks.append(current.strip()) |
|
|
overlap_part = current[-overlap:] if overlap > 0 else "" |
|
|
current = overlap_part + " " + sent |
|
|
if current.strip(): |
|
|
chunks.append(current.strip()) |
|
|
return chunks |
|
|
|
|
|
|
|
|
def _merge_small_chunks(chunks, min_len=150): |
|
|
merged, buffer = [], "" |
|
|
for ch in chunks: |
|
|
if len(ch) < min_len: |
|
|
buffer += " " + ch |
|
|
else: |
|
|
if buffer: |
|
|
merged.append(buffer.strip()) |
|
|
buffer = "" |
|
|
merged.append(ch.strip()) |
|
|
if buffer: |
|
|
merged.append(buffer.strip()) |
|
|
return merged |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Smoke test: extract a sample PDF, report the TOC, and preview chunks.
if __name__ == "__main__":
    pdf_path = "sample.pdf"  # local sample document; adjust as needed
    text, toc, source = extract_text_from_pdf(pdf_path)
    print("\n📚 TOC Preview:", toc[:5])
    chunks = chunk_text(text)
    print(f"\n✅ {len(chunks)} chunks created.")
    for i, c in enumerate(chunks[:5], 1):
        # Show only the first 500 chars of each chunk to keep output short.
        print(f"\n--- Chunk {i} ---\n{c[:500]}...\n")
|
|
|