"""
Text Chunker Tool - Splits and processes long policy texts
"""
from crewai.tools import tool
from typing import List
import sys
import os
# Add the project root (parent of this tool's directory) to sys.path so the
# local `utils` package resolves regardless of the working directory.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from utils.logger import get_logs
# Shared logger instance used by both tools defined in this module.
logger = get_logs("TextChunkerTool")
# Configuration: default chunk length and overlap, in characters.
# NOTE(review): DEFAULT_OVERLAP is defined but never used in this file —
# confirm whether overlap between chunks was meant to be implemented.
DEFAULT_CHUNK_SIZE = 4000
DEFAULT_OVERLAP = 200
@tool("text_chunker")
def text_chunker(text: str, chunk_size: int = DEFAULT_CHUNK_SIZE) -> str:
"""
Splits long text into manageable chunks for analysis.
Use this tool when the policy text is too long to process at once.
Args:
text: The text to split into chunks
chunk_size: Maximum size of each chunk (default 4000)
Returns:
Chunked text with section markers
"""
logger.log_step("Starting text chunking", f"Input length: {len(text)}")
if not text or len(text.strip()) == 0:
logger.log_error("Empty text provided")
return "Error: No text provided to chunk"
# If text is short enough, return as is
if len(text) <= chunk_size:
logger.log_result("Chunking", "Text short enough, no chunking needed")
return text
chunks = []
paragraphs = text.split('\n\n')
current_chunk = ""
chunk_num = 1
for para in paragraphs:
# If adding this paragraph would exceed chunk size
if len(current_chunk) + len(para) + 2 > chunk_size:
if current_chunk:
chunks.append(f"[Section {chunk_num}]\n{current_chunk.strip()}")
chunk_num += 1
current_chunk = para
else:
# Paragraph itself is too long, split by sentences
sentences = para.replace('. ', '.\n').split('\n')
for sentence in sentences:
if len(current_chunk) + len(sentence) + 1 > chunk_size:
if current_chunk:
chunks.append(f"[Section {chunk_num}]\n{current_chunk.strip()}")
chunk_num += 1
current_chunk = sentence
else:
current_chunk += " " + sentence if current_chunk else sentence
else:
current_chunk += "\n\n" + para if current_chunk else para
# Add remaining content
if current_chunk:
chunks.append(f"[Section {chunk_num}]\n{current_chunk.strip()}")
result = "\n\n---\n\n".join(chunks)
logger.log_tool_call("text_chunker", "success")
logger.log_result("Chunking", f"Split into {len(chunks)} sections")
return result
@tool("extract_sections")
def extract_sections(text: str) -> str:
"""
Extracts and identifies key sections from policy text.
Looks for common policy sections like Privacy, Data Collection, User Rights, etc.
Args:
text: The policy text to analyze
Returns:
Identified sections with their content
"""
logger.log_step("Extracting sections from policy")
# Common section headers in policies
section_keywords = [
"privacy", "data collection", "data we collect", "information we collect",
"how we use", "data use", "sharing", "third party", "third-party",
"your rights", "user rights", "your choices", "opt-out", "opt out",
"cookies", "tracking", "retention", "how long", "security",
"children", "minors", "contact", "changes", "updates"
]
lines = text.split('\n')
sections = {}
current_section = "Introduction"
current_content = []
for line in lines:
line_lower = line.lower().strip()
# Check if this line is a section header
is_header = False
for keyword in section_keywords:
if keyword in line_lower and len(line) < 100:
is_header = True
# Save previous section
if current_content:
sections[current_section] = '\n'.join(current_content)
current_section = line.strip()
current_content = []
break
if not is_header:
current_content.append(line)
# Save last section
if current_content:
sections[current_section] = '\n'.join(current_content)
# Format output
result = "Identified Policy Sections:\n\n"
for section_name, content in sections.items():
preview = content[:300] + "..." if len(content) > 300 else content
result += f"## {section_name}\n{preview}\n\n"
logger.log_tool_call("extract_sections", "success")
logger.log_result("Section extraction", f"Found {len(sections)} sections")
return result
|