# PolicySummarizer / tools / text_chunker.py
"""
Text Chunker Tool - Splits and processes long policy texts
"""
from crewai.tools import tool
from typing import List
import sys
import os
# Add parent directory to path for imports so `utils.logger` resolves when this
# module is loaded from the tools/ subdirectory rather than the package root.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from utils.logger import get_logs
# Module-level logger for both tools below (project-local helper; presumably
# wraps standard logging — behavior defined in utils/logger.py).
logger = get_logs("TextChunkerTool")
# Configuration
DEFAULT_CHUNK_SIZE = 4000  # max characters per chunk emitted by text_chunker
DEFAULT_OVERLAP = 200  # NOTE(review): declared but never used by text_chunker — confirm intent
@tool("text_chunker")
def text_chunker(text: str, chunk_size: int = DEFAULT_CHUNK_SIZE) -> str:
    """
    Splits long text into manageable chunks for analysis.
    Use this tool when the policy text is too long to process at once.
    Args:
        text: The text to split into chunks
        chunk_size: Maximum size of each chunk (default 4000)
    Returns:
        Chunked text with section markers ("[Section N]"), sections
        separated by "---" dividers. Text shorter than chunk_size is
        returned unchanged.
    """
    logger.log_step("Starting text chunking", f"Input length: {len(text)}")
    if not text or not text.strip():
        logger.log_error("Empty text provided")
        return "Error: No text provided to chunk"
    # If text is short enough, return as is
    if len(text) <= chunk_size:
        logger.log_result("Chunking", "Text short enough, no chunking needed")
        return text

    chunks = []
    current_chunk = ""

    def _flush():
        # Emit the accumulated chunk (if any) with its section marker.
        nonlocal current_chunk
        if current_chunk:
            chunks.append(f"[Section {len(chunks) + 1}]\n{current_chunk.strip()}")
            current_chunk = ""

    for para in text.split('\n\n'):
        # Paragraph fits in the current chunk (+2 for the '\n\n' separator).
        if len(current_chunk) + len(para) + 2 <= chunk_size:
            current_chunk += "\n\n" + para if current_chunk else para
            continue
        # Paragraph doesn't fit the current chunk but fits a fresh one.
        if len(para) <= chunk_size:
            _flush()
            current_chunk = para
            continue
        # BUG FIX: previously an over-long paragraph was only sentence-split
        # when current_chunk happened to be empty; otherwise it was stored
        # whole, producing a chunk larger than chunk_size. Now we always
        # flush and sentence-split oversized paragraphs.
        _flush()
        for sentence in para.replace('. ', '.\n').split('\n'):
            # BUG FIX: a single sentence longer than chunk_size is now
            # hard-split instead of being emitted as an oversized chunk.
            while len(sentence) > chunk_size:
                _flush()
                current_chunk = sentence[:chunk_size]
                _flush()
                sentence = sentence[chunk_size:]
            if len(current_chunk) + len(sentence) + 1 > chunk_size:
                _flush()
                current_chunk = sentence
            else:
                current_chunk += " " + sentence if current_chunk else sentence
    # Add remaining content
    _flush()

    # NOTE(review): DEFAULT_OVERLAP exists at module level but no overlap is
    # applied between chunks here — confirm whether overlap is still wanted.
    result = "\n\n---\n\n".join(chunks)
    logger.log_tool_call("text_chunker", "success")
    logger.log_result("Chunking", f"Split into {len(chunks)} sections")
    return result
@tool("extract_sections")
def extract_sections(text: str) -> str:
    """
    Extracts and identifies key sections from policy text.
    Looks for common policy sections like Privacy, Data Collection, User Rights, etc.
    Args:
        text: The policy text to analyze
    Returns:
        Identified sections with their content
    """
    logger.log_step("Extracting sections from policy")
    # Common section headers in policies
    section_keywords = [
        "privacy", "data collection", "data we collect", "information we collect",
        "how we use", "data use", "sharing", "third party", "third-party",
        "your rights", "user rights", "your choices", "opt-out", "opt out",
        "cookies", "tracking", "retention", "how long", "security",
        "children", "minors", "contact", "changes", "updates"
    ]

    def _is_header(raw_line):
        # A line counts as a section header when it is short (< 100 chars)
        # and its lowercased form contains any known policy keyword.
        lowered = raw_line.lower().strip()
        return len(raw_line) < 100 and any(kw in lowered for kw in section_keywords)

    sections = {}
    current_section = "Introduction"
    buffered_lines = []

    for line in text.split('\n'):
        if _is_header(line):
            # Close out the section collected so far before starting a new one.
            if buffered_lines:
                sections[current_section] = '\n'.join(buffered_lines)
            current_section = line.strip()
            buffered_lines = []
        else:
            buffered_lines.append(line)

    # Close out the final section.
    if buffered_lines:
        sections[current_section] = '\n'.join(buffered_lines)

    # Render each section with a 300-character preview.
    result = "Identified Policy Sections:\n\n"
    for section_name, content in sections.items():
        preview = content[:300] + "..." if len(content) > 300 else content
        result += f"## {section_name}\n{preview}\n\n"
    logger.log_tool_call("extract_sections", "success")
    logger.log_result("Section extraction", f"Found {len(sections)} sections")
    return result