Spaces:
Sleeping
Sleeping
"""
Text Chunker Tool - Splits and processes long policy texts
"""
from crewai.tools import tool  # NOTE(review): imported but not applied to any function here — confirm intent
from typing import List  # NOTE(review): unused in this file
import sys
import os

# Add parent directory to path for imports (allows `utils.logger` below
# to resolve when this file is run from inside the tools package).
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from utils.logger import get_logs

# Module-level logger shared by every tool function in this file.
logger = get_logs("TextChunkerTool")

# Configuration
DEFAULT_CHUNK_SIZE = 4000  # maximum characters per emitted chunk
DEFAULT_OVERLAP = 200  # NOTE(review): defined but never read in this file — confirm intent
def text_chunker(text: str, chunk_size: int = DEFAULT_CHUNK_SIZE) -> str:
    """
    Splits long text into manageable chunks for analysis.

    Use this tool when the policy text is too long to process at once.
    Chunks are built paragraph by paragraph; a paragraph that is itself
    longer than ``chunk_size`` is further split on sentence boundaries.

    Args:
        text: The text to split into chunks
        chunk_size: Maximum size of each chunk (default 4000)

    Returns:
        Chunked text with ``[Section N]`` markers joined by ``---``
        separators; the original text unchanged when it already fits in
        one chunk; an error string when ``text`` is empty/blank.
    """
    logger.log_step("Starting text chunking", f"Input length: {len(text)}")

    if not text or not text.strip():
        logger.log_error("Empty text provided")
        return "Error: No text provided to chunk"

    # Short enough already — nothing to do.
    if len(text) <= chunk_size:
        logger.log_result("Chunking", "Text short enough, no chunking needed")
        return text

    chunks: List[str] = []
    current_chunk = ""
    chunk_num = 1

    def _flush() -> None:
        # Emit the accumulated chunk (if any) as a numbered section.
        nonlocal current_chunk, chunk_num
        if current_chunk:
            chunks.append(f"[Section {chunk_num}]\n{current_chunk.strip()}")
            chunk_num += 1
            current_chunk = ""

    for para in text.split('\n\n'):
        # +2 accounts for the "\n\n" separator added when appending.
        if len(current_chunk) + len(para) + 2 <= chunk_size:
            current_chunk += "\n\n" + para if current_chunk else para
            continue

        if len(para) + 2 > chunk_size:
            # BUG FIX: oversized paragraphs were previously sentence-split
            # only when the running chunk happened to be empty; otherwise
            # they were emitted whole as an oversized chunk. Now the running
            # chunk is flushed first and the paragraph is always split.
            _flush()
            sentences = para.replace('. ', '.\n').split('\n')
            for sentence in sentences:
                # +1 accounts for the " " separator added when appending.
                if len(current_chunk) + len(sentence) + 1 > chunk_size:
                    _flush()
                current_chunk += " " + sentence if current_chunk else sentence
        else:
            # Paragraph fits in a chunk by itself — start a fresh one.
            _flush()
            current_chunk = para

    # Emit whatever is left over.
    _flush()

    result = "\n\n---\n\n".join(chunks)
    logger.log_tool_call("text_chunker", "success")
    logger.log_result("Chunking", f"Split into {len(chunks)} sections")
    return result
def extract_sections(text: str) -> str:
    """
    Extracts and identifies key sections from policy text.

    Looks for common policy sections like Privacy, Data Collection,
    User Rights, etc. A line shorter than 100 characters containing a
    known keyword is treated as a section header; everything before the
    first header is collected under "Introduction".

    Args:
        text: The policy text to analyze

    Returns:
        Markdown-style listing of the identified sections, each with a
        preview of at most 300 characters of its content.
    """
    logger.log_step("Extracting sections from policy")

    # Common section headers in policies
    section_keywords = [
        "privacy", "data collection", "data we collect", "information we collect",
        "how we use", "data use", "sharing", "third party", "third-party",
        "your rights", "user rights", "your choices", "opt-out", "opt out",
        "cookies", "tracking", "retention", "how long", "security",
        "children", "minors", "contact", "changes", "updates"
    ]

    sections: dict = {}
    current_section = "Introduction"
    current_content: List[str] = []

    def _save_current() -> None:
        # Store the accumulated content under the current section name.
        # BUG FIX: a repeated heading (e.g. two short lines mentioning
        # "cookies") previously clobbered the earlier section's content;
        # duplicates are now merged instead of overwritten.
        if current_content:
            body = '\n'.join(current_content)
            if current_section in sections:
                sections[current_section] += '\n' + body
            else:
                sections[current_section] = body

    for line in text.split('\n'):
        line_lower = line.lower().strip()
        # Heuristic: short lines containing a known keyword are headers.
        is_header = len(line) < 100 and any(
            keyword in line_lower for keyword in section_keywords
        )
        if is_header:
            _save_current()
            current_section = line.strip()
            current_content = []
        else:
            current_content.append(line)

    # Save the trailing section.
    _save_current()

    # Format output
    result = "Identified Policy Sections:\n\n"
    for section_name, content in sections.items():
        preview = content[:300] + "..." if len(content) > 300 else content
        result += f"## {section_name}\n{preview}\n\n"

    logger.log_tool_call("extract_sections", "success")
    logger.log_result("Section extraction", f"Found {len(sections)} sections")
    return result