# LearnFlow-AI/agents/planner/preprocess.py
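"""Preprocessing utilities for the planner agent.

Provides pre-segmentation of input text into major units (Unit/Chapter/Section/
Module/Part headings) and content-aware chunking into TextNode objects with
sentence-based overlap between consecutive chunks.
"""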
import nltk
from nltk.tokenize import sent_tokenize
from typing import List, Dict, Optional
import re
try:
from llama_index.core.schema import TextNode
except ImportError:
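    # Minimal stand-in with the same constructor shape, so this module still
    # imports and runs when llama_index is not installed.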
class TextNode:
def __init__(self, text: str, metadata: Optional[Dict] = None):
self.text = text
self.metadata = metadata if metadata is not None else {}
def __repr__(self):
return f"TextNode(text='{self.text[:50]}...', metadata={self.metadata})"
try:
nltk.data.find('tokenizers/punkt')
except LookupError:
try:
nltk.download('punkt', quiet=True)
except Exception as e:
print(f"Warning: Failed to download nltk 'punkt' tokenizer. Error: {e}")
def pre_segment_into_major_units(text: str) -> List[Dict[str, str]]:
"""Segments text into major units based on patterns like 'Unit X: Title'."""
keywords = ["Unit", "Chapter", "Section", "Module", "Part"]
keyword_pattern = "|".join(keywords)
try:
unit_delimiters = list(re.finditer(
r"^((?:%s)\s*\d+:\s*.*?)(?=\n|$)" % keyword_pattern,
text,
re.MULTILINE | re.IGNORECASE
))
except re.error as e:
print(f"Regex error in pre_segment_into_major_units: {e}")
unit_delimiters = []
if not unit_delimiters:
if text.strip():
return [{
"title_line": "Full Document Content",
"content": text.strip(),
"is_primary_unit": False
}]
return []
segmented_units = []
for i, match_obj in enumerate(unit_delimiters):
unit_title_line = match_obj.group(1).strip()
content_start_index = match_obj.end()
if i + 1 < len(unit_delimiters):
content_end_index = unit_delimiters[i+1].start()
else:
content_end_index = len(text)
unit_content = text[content_start_index:content_end_index].strip()
if unit_content:
segmented_units.append({
"title_line": unit_title_line,
"content": unit_content,
"is_primary_unit": True
})
return segmented_units
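
# Illustrative example for pre_segment_into_major_units (not part of the module):
# for input such as
#   "Unit 1: Algebra\nSolving equations...\nUnit 2: Geometry\nAngles and shapes..."
# it returns one dict per detected unit, e.g.
#   [{"title_line": "Unit 1: Algebra", "content": "Solving equations...", "is_primary_unit": True},
#    {"title_line": "Unit 2: Geometry", "content": "Angles and shapes...", "is_primary_unit": True}]
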
def smart_chunk_with_content_awareness(
text: str,
max_chunk_chars: int = 6000,
overlap_chars: int = 200,
metadata: Optional[Dict] = None
) -> List[TextNode]:
"""Splits text into chunks based on paragraphs with content awareness."""
if not text.strip():
return []
raw_paragraphs = [p.strip() for p in text.split('\n\n') if p.strip()]
if not raw_paragraphs:
raw_paragraphs = [text.strip()]
chunks = []
current_chunk_content = ""
for para_text in raw_paragraphs:
# Handle oversized paragraphs
if len(para_text) > max_chunk_chars:
if current_chunk_content.strip():
chunks.append(TextNode(text=current_chunk_content, metadata=dict(metadata or {})))
current_chunk_content = ""
# Split large paragraph at sentence boundaries
chunks.extend(_split_oversized_paragraph(para_text, max_chunk_chars, metadata))
continue
# Check if adding paragraph would exceed limit
separator_len = len("\n\n") if current_chunk_content else 0
if current_chunk_content and (len(current_chunk_content) + separator_len + len(para_text) > max_chunk_chars):
chunks.append(TextNode(text=current_chunk_content, metadata=dict(metadata or {})))
            # Carry the tail of the finished chunk forward as overlap for the next chunk
overlap_text = _extract_overlap_content(current_chunk_content, overlap_chars)
current_chunk_content = overlap_text
if current_chunk_content and para_text:
current_chunk_content += "\n\n" + para_text
elif para_text:
current_chunk_content = para_text
else:
# Add paragraph to current chunk
if current_chunk_content:
current_chunk_content += "\n\n" + para_text
else:
current_chunk_content = para_text
if current_chunk_content.strip():
chunks.append(TextNode(text=current_chunk_content, metadata=dict(metadata or {})))
return chunks
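
# Usage sketch for smart_chunk_with_content_awareness (illustrative only; the
# parameter values and metadata keys below are examples, not fixed API):
#   nodes = smart_chunk_with_content_awareness(
#       unit["content"],
#       max_chunk_chars=6000,
#       overlap_chars=200,
#       metadata={"title_line": unit["title_line"]},
#   )
#   # Each node is a TextNode whose .metadata is a copy of the dict passed in.
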
def _split_oversized_paragraph(para_text: str, max_chunk_chars: int, metadata: Optional[Dict]) -> List[TextNode]:
"""Split oversized paragraph at sentence boundaries when possible."""
try:
sentences = sent_tokenize(para_text)
except Exception:
# Fallback to simple splitting
return [TextNode(text=para_text[i:i+max_chunk_chars], metadata=dict(metadata or {}))
for i in range(0, len(para_text), max_chunk_chars)]
chunks = []
current_content = ""
for sentence in sentences:
if len(sentence) > max_chunk_chars:
# Handle extremely long sentences
if current_content:
chunks.append(TextNode(text=current_content, metadata=dict(metadata or {})))
current_content = ""
# Split long sentence by characters
for i in range(0, len(sentence), max_chunk_chars):
chunk_text = sentence[i:i+max_chunk_chars]
chunks.append(TextNode(text=chunk_text, metadata=dict(metadata or {})))
elif current_content and len(current_content) + len(sentence) + 1 > max_chunk_chars:
chunks.append(TextNode(text=current_content, metadata=dict(metadata or {})))
current_content = sentence
else:
current_content += (" " if current_content else "") + sentence
if current_content:
chunks.append(TextNode(text=current_content, metadata=dict(metadata or {})))
return chunks
def _extract_overlap_content(current_chunk_content: str, overlap_chars: int) -> str:
"""Extract overlap content using your existing logic."""
if overlap_chars <= 0 or not current_chunk_content:
return ""
try:
sentences = sent_tokenize(current_chunk_content)
temp_overlap_content = ""
for s_idx in range(len(sentences) - 1, -1, -1):
s = sentences[s_idx]
test_length = len(s) + len(temp_overlap_content) + (1 if temp_overlap_content else 0)
if test_length <= overlap_chars:
temp_overlap_content = s + (" " if temp_overlap_content else "") + temp_overlap_content
else:
if not temp_overlap_content and len(s) > overlap_chars:
temp_overlap_content = s[-overlap_chars:]
break
return temp_overlap_content.strip()
except Exception:
if len(current_chunk_content) > overlap_chars:
return current_chunk_content[-overlap_chars:]
else:
return current_chunk_content
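

if __name__ == "__main__":
    # Minimal smoke test / usage sketch: segment a small sample text into
    # units, then chunk each unit's content. Parameter values are examples.
    sample = (
        "Unit 1: Introduction\n"
        "This unit covers the basics. It has a few short sentences.\n\n"
        "Unit 2: Advanced Topics\n"
        "This unit goes deeper into the material."
    )
    for unit in pre_segment_into_major_units(sample):
        nodes = smart_chunk_with_content_awareness(
            unit["content"],
            max_chunk_chars=80,
            overlap_chars=20,
            metadata={"title_line": unit["title_line"]},
        )
        print(f"{unit['title_line']} -> {len(nodes)} chunk(s)")
        for node in nodes:
            print("  ", repr(node))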