File size: 4,796 Bytes
81ddc8e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
"""
Text Chunker Tool - Splits and processes long policy texts
"""
from crewai.tools import tool
from typing import List
import sys
import os

# Add parent directory to path for imports.
# abspath(__file__) -> this file; the two dirname() calls climb to the parent
# of the directory containing this file. It is inserted at position 0, so it
# takes precedence over every other sys.path entry. This must run before the
# `utils` import below (presumably `utils` lives in that parent directory --
# confirm against the project layout).
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from utils.logger import get_logs

# Module-wide logger shared by the tools defined in this file.
logger = get_logs("TextChunkerTool")

# Configuration
# Default value of text_chunker's `chunk_size` parameter; compared against
# len(text), i.e. measured in characters.
DEFAULT_CHUNK_SIZE = 4000
# NOTE(review): not referenced anywhere in the visible code -- chunks are
# currently produced without overlap. Confirm whether overlap is still planned.
DEFAULT_OVERLAP = 200


def _sentence_pieces(paragraph: str, chunk_size: int) -> List[str]:
    """Split an oversized paragraph into pieces of at most chunk_size characters."""
    pieces: List[str] = []
    # Sentence boundaries are approximated by ". "; existing single newlines
    # inside the paragraph also act as boundaries.
    for sentence in paragraph.replace('. ', '.\n').split('\n'):
        if len(sentence) <= chunk_size:
            pieces.append(sentence)
        else:
            # A single "sentence" can still be too long (e.g. text without
            # punctuation); fall back to fixed-width slices so no piece
            # exceeds the limit.
            pieces.extend(
                sentence[start:start + chunk_size]
                for start in range(0, len(sentence), chunk_size)
            )
    return pieces


@tool("text_chunker")
def text_chunker(text: str, chunk_size: int = DEFAULT_CHUNK_SIZE) -> str:
    """
    Splits long text into manageable chunks for analysis.
    Use this tool when the policy text is too long to process at once.
    
    Args:
        text: The text to split into chunks
        chunk_size: Maximum size of each chunk in characters (default 4000).
            The "[Section N]" marker line is not counted towards this limit.
        
    Returns:
        Chunked text with section markers, the unchanged text if it already
        fits in one chunk, or an error message if no text was provided
    """
    # Guard the length lookup so a None input reaches the error branch below
    # instead of raising TypeError inside the log call.
    logger.log_step("Starting text chunking", f"Input length: {len(text) if text else 0}")
    
    if not text or not text.strip():
        logger.log_error("Empty text provided")
        return "Error: No text provided to chunk"
    
    # A non-positive limit cannot be honoured (and would break fixed-width
    # slicing), so fall back to the module default.
    if chunk_size < 1:
        logger.log_error(f"Invalid chunk_size {chunk_size}; using default {DEFAULT_CHUNK_SIZE}")
        chunk_size = DEFAULT_CHUNK_SIZE
    
    # If text is short enough, return as is
    if len(text) <= chunk_size:
        logger.log_result("Chunking", "Text short enough, no chunking needed")
        return text
    
    raw_bodies: List[str] = []  # finished chunk bodies, without section markers
    current = ""
    
    for para in text.split('\n\n'):
        if len(para) > chunk_size:
            # Oversized paragraph: close whatever chunk is open, then pack
            # sentence-level pieces. This happens regardless of whether a
            # chunk was already in progress.
            if current:
                raw_bodies.append(current)
                current = ""
            for piece in _sentence_pieces(para, chunk_size):
                if current and len(current) + 1 + len(piece) > chunk_size:
                    raw_bodies.append(current)
                    current = piece
                else:
                    current = f"{current} {piece}" if current else piece
            continue
        
        # Regular paragraph: the "\n\n" separator only costs space when the
        # chunk already has content.
        if current and len(current) + 2 + len(para) > chunk_size:
            raw_bodies.append(current)
            current = para
        else:
            current = f"{current}\n\n{para}" if current else para
    
    # Add remaining content
    if current:
        raw_bodies.append(current)
    
    # Drop bodies that are whitespace only so no empty section is emitted,
    # and number the survivors consecutively.
    bodies = [body.strip() for body in raw_bodies if body.strip()]
    chunks = [
        f"[Section {number}]\n{body}"
        for number, body in enumerate(bodies, start=1)
    ]
    
    result = "\n\n---\n\n".join(chunks)
    
    logger.log_tool_call("text_chunker", "success")
    logger.log_result("Chunking", f"Split into {len(chunks)} sections")
    
    return result


def _store_section(sections: dict, name: str, content_lines: list) -> None:
    """Save content_lines under name, appending if that heading was already stored."""
    if not content_lines:
        # Headings with nothing collected beneath them are not recorded.
        return
    body = '\n'.join(content_lines)
    if name in sections:
        # The same heading text can occur more than once; append rather than
        # overwrite so earlier content is not lost.
        sections[name] = f"{sections[name]}\n{body}"
    else:
        sections[name] = body


@tool("extract_sections")
def extract_sections(text: str) -> str:
    """
    Extracts and identifies key sections from policy text.
    Looks for common policy sections like Privacy, Data Collection, User Rights, etc.
    
    Args:
        text: The policy text to analyze
        
    Returns:
        Identified sections with their content (each previewed up to 300
        characters), or an error message if no text was provided
    """
    logger.log_step("Extracting sections from policy")
    
    # Reject missing/blank input the same way text_chunker does, instead of
    # failing on None or reporting a single empty "Introduction" section.
    if not text or not text.strip():
        logger.log_error("Empty text provided")
        return "Error: No text provided to extract sections from"
    
    # Common section headers in policies
    section_keywords = (
        "privacy", "data collection", "data we collect", "information we collect",
        "how we use", "data use", "sharing", "third party", "third-party",
        "your rights", "user rights", "your choices", "opt-out", "opt out",
        "cookies", "tracking", "retention", "how long", "security",
        "children", "minors", "contact", "changes", "updates",
    )
    
    sections = {}
    # Lines seen before the first detected header are filed under this name.
    current_section = "Introduction"
    current_content = []
    
    for line in text.split('\n'):
        line_lower = line.lower().strip()
        
        # Heuristic: a line shorter than 100 characters that mentions any
        # keyword is treated as a section header.
        is_header = len(line) < 100 and any(
            keyword in line_lower for keyword in section_keywords
        )
        
        if is_header:
            # Save previous section and start collecting the new one
            _store_section(sections, current_section, current_content)
            current_section = line.strip()
            current_content = []
        else:
            current_content.append(line)
    
    # Save last section
    _store_section(sections, current_section, current_content)
    
    # Format output
    parts = ["Identified Policy Sections:\n\n"]
    for section_name, content in sections.items():
        preview = content[:300] + "..." if len(content) > 300 else content
        parts.append(f"## {section_name}\n{preview}\n\n")
    result = "".join(parts)
    
    logger.log_tool_call("extract_sections", "success")
    logger.log_result("Section extraction", f"Found {len(sections)} sections")
    
    return result