""" Text Chunker Tool - Splits and processes long policy texts """ from crewai.tools import tool from typing import List import sys import os # Add parent directory to path for imports sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from utils.logger import get_logs logger = get_logs("TextChunkerTool") # Configuration DEFAULT_CHUNK_SIZE = 4000 DEFAULT_OVERLAP = 200 @tool("text_chunker") def text_chunker(text: str, chunk_size: int = DEFAULT_CHUNK_SIZE) -> str: """ Splits long text into manageable chunks for analysis. Use this tool when the policy text is too long to process at once. Args: text: The text to split into chunks chunk_size: Maximum size of each chunk (default 4000) Returns: Chunked text with section markers """ logger.log_step("Starting text chunking", f"Input length: {len(text)}") if not text or len(text.strip()) == 0: logger.log_error("Empty text provided") return "Error: No text provided to chunk" # If text is short enough, return as is if len(text) <= chunk_size: logger.log_result("Chunking", "Text short enough, no chunking needed") return text chunks = [] paragraphs = text.split('\n\n') current_chunk = "" chunk_num = 1 for para in paragraphs: # If adding this paragraph would exceed chunk size if len(current_chunk) + len(para) + 2 > chunk_size: if current_chunk: chunks.append(f"[Section {chunk_num}]\n{current_chunk.strip()}") chunk_num += 1 current_chunk = para else: # Paragraph itself is too long, split by sentences sentences = para.replace('. ', '.\n').split('\n') for sentence in sentences: if len(current_chunk) + len(sentence) + 1 > chunk_size: if current_chunk: chunks.append(f"[Section {chunk_num}]\n{current_chunk.strip()}") chunk_num += 1 current_chunk = sentence else: current_chunk += " " + sentence if current_chunk else sentence else: current_chunk += "\n\n" + para if current_chunk else para # Add remaining content if current_chunk: chunks.append(f"[Section {chunk_num}]\n{current_chunk.strip()}") result = "\n\n---\n\n".join(chunks) logger.log_tool_call("text_chunker", "success") logger.log_result("Chunking", f"Split into {len(chunks)} sections") return result @tool("extract_sections") def extract_sections(text: str) -> str: """ Extracts and identifies key sections from policy text. Looks for common policy sections like Privacy, Data Collection, User Rights, etc. Args: text: The policy text to analyze Returns: Identified sections with their content """ logger.log_step("Extracting sections from policy") # Common section headers in policies section_keywords = [ "privacy", "data collection", "data we collect", "information we collect", "how we use", "data use", "sharing", "third party", "third-party", "your rights", "user rights", "your choices", "opt-out", "opt out", "cookies", "tracking", "retention", "how long", "security", "children", "minors", "contact", "changes", "updates" ] lines = text.split('\n') sections = {} current_section = "Introduction" current_content = [] for line in lines: line_lower = line.lower().strip() # Check if this line is a section header is_header = False for keyword in section_keywords: if keyword in line_lower and len(line) < 100: is_header = True # Save previous section if current_content: sections[current_section] = '\n'.join(current_content) current_section = line.strip() current_content = [] break if not is_header: current_content.append(line) # Save last section if current_content: sections[current_section] = '\n'.join(current_content) # Format output result = "Identified Policy Sections:\n\n" for section_name, content in sections.items(): preview = content[:300] + "..." if len(content) > 300 else content result += f"## {section_name}\n{preview}\n\n" logger.log_tool_call("extract_sections", "success") logger.log_result("Section extraction", f"Found {len(sections)} sections") return result