Update src/ingestion.py

src/ingestion.py (+51 -30)
```diff
@@ -21,7 +21,7 @@ def extract_text_from_pdf(file_path: str) -> str:
             page_text = page.get_text("text").strip()
             if not page_text:
                 # Fallback: extract raw blocks (helps with weird PDFs)
-                blocks =
+                blocks = page.get_text("blocks")
                 page_text = " ".join(block[4] for block in blocks if isinstance(block[4], str))
             text += page_text + "\n"
     except Exception as e:
```
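For context on what that fallback works with: in PyMuPDF, `page.get_text("blocks")` returns one tuple per layout block, `(x0, y0, x1, y1, text, block_no, block_type)`, so index 4 is the block's text and the `isinstance` guard above skips non-text payloads. A minimal standalone sketch of the same idea (`extract_blocks_text` is a hypothetical name, not part of this change):

```python
import fitz  # PyMuPDF, assumed to be the library behind extract_text_from_pdf

def extract_blocks_text(file_path: str) -> str:
    """Hypothetical helper mirroring the block-based fallback above."""
    text = ""
    with fitz.open(file_path) as pdf:
        for page in pdf:
            # Each block: (x0, y0, x1, y1, text, block_no, block_type)
            for block in page.get_text("blocks"):
                # block_type 0 = text; image blocks may carry non-str payloads
                if block[6] == 0 and isinstance(block[4], str):
                    text += block[4].strip() + " "
    return text.strip()
```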
```diff
@@ -33,44 +33,65 @@ def extract_text_from_pdf(file_path: str) -> str:
 
 
 # -----------------------------
-# SMART CHUNKING (Context Aware)
+# SMART CHUNKING (Step-Aware + Context Aware)
 # -----------------------------
-def chunk_text(text: str, chunk_size: int = 800, overlap: int = 150) -> list:
+def chunk_text(text: str, chunk_size: int = 800, overlap: int = 200) -> list:
     """
-    Splits text into overlapping,
+    Splits text into overlapping, structured chunks.
+    Detects procedural steps (e.g., 'Step 1:', 'STEP 2.') and keeps them intact.
+    Falls back to sentence-based chunking for normal paragraphs.
 
     Args:
         text (str): Input text.
         chunk_size (int): Max characters per chunk (default: 800).
-        overlap (int): Overlapping characters for continuity (default: 150).
+        overlap (int): Overlapping characters for continuity (default: 200).
 
     Returns:
         list[str]: Chunked text segments.
     """
-    # Clean text
+    # Clean and normalize text
     text = re.sub(r'\s+', ' ', text.strip())
 
+    # Try to detect “Step” patterns
+    step_splits = re.split(r'(?=(?:Step\s*\d+[:.\s]))', text, flags=re.IGNORECASE)
+    step_splits = [s.strip() for s in step_splits if s.strip()]
 
+    chunks = []
 
+    # Case 1️⃣: Document has visible “Step” structure
+    if len(step_splits) > 1:
+        for step in step_splits:
+            if len(step) > chunk_size:
+                # If a step is too long → split by sentences within that step
+                sentences = re.split(r'(?<=[.!?])\s+', step)
+                current = ""
+                for sent in sentences:
+                    if len(current) + len(sent) + 1 <= chunk_size:
+                        current += " " + sent
+                    else:
+                        if current.strip():
+                            chunks.append(current.strip())
+                        overlap_part = current[-overlap:] if overlap > 0 else ""
+                        current = overlap_part + " " + sent
+                if current.strip():
+                    chunks.append(current.strip())
+            else:
+                chunks.append(step.strip())
 
+    # Case 2️⃣: No “Step” keywords — fall back to sentence-based chunking
+    else:
+        sentences = re.split(r'(?<=[.!?])\s+', text)
+        current = ""
+        for sent in sentences:
+            if len(current) + len(sent) + 1 <= chunk_size:
+                current += " " + sent
+            else:
+                if current.strip():
+                    chunks.append(current.strip())
+                overlap_part = current[-overlap:] if overlap > 0 else ""
+                current = overlap_part + " " + sent
+        if current.strip():
+            chunks.append(current.strip())
 
     return chunks
```
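The key trick here is the zero-width lookahead split: because the pattern consumes no characters, each "Step N" heading stays attached to the text that follows it (Python 3.7+ allows splitting on empty matches). A quick illustration with a made-up string:

```python
import re

sample = "Read the intro first. Step 1: Install it. Step 2: Run it."
parts = re.split(r'(?=(?:Step\s*\d+[:.\s]))', sample, flags=re.IGNORECASE)
print([p.strip() for p in parts if p.strip()])
# ['Read the intro first.', 'Step 1: Install it.', 'Step 2: Run it.']
```

Note also that the overlap is a raw character tail (`current[-overlap:]`), so a continuation chunk can begin mid-word; that keeps the code simple at the cost of slightly ragged chunk starts.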
```diff
@@ -80,12 +101,12 @@ def chunk_text(text: str, chunk_size: int = 800, overlap: int = 150) -> list:
 # -----------------------------
 if __name__ == "__main__":
     sample_text = """
+    Step 1: Open the application.
+    Step 2: Navigate to the dashboard.
+    Step 3: Review the summary and click ‘Export’.
+    If the steps are missing, the function should still chunk by sentences.
     """
-    chunks = chunk_text(sample_text, chunk_size=
+    chunks = chunk_text(sample_text, chunk_size=100, overlap=20)
     print(f"✅ Chunks created: {len(chunks)}")
     for i, c in enumerate(chunks, 1):
-        print(f"\n--- Chunk {i}
+        print(f"\n--- Chunk {i} ---\n{c}")
```
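The `__main__` demo only exercises the step path. A companion check for the sentence fallback, assuming `src/` is importable (e.g., on `PYTHONPATH`) and using invented sample text:

```python
from ingestion import chunk_text  # assumes src/ is on PYTHONPATH

plain = (
    "RAG pipelines retrieve passages before generation. "
    "Chunk size controls retrieval granularity. "
    "Overlap carries context across chunk boundaries."
)
for i, c in enumerate(chunk_text(plain, chunk_size=80, overlap=15), 1):
    print(f"[{i}] {c}")
```

With `chunk_size=80` the three sentences cannot share one chunk, so each break re-seeds the next chunk with the previous 15 characters.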