"""
Text Analyzer Tool - Analyzes policy text to identify sections and concerns
"""
from crewai.tools import tool
from typing import List, Dict
import re
import time
from utils.logger import log_agent_action
# Keywords used to classify a paragraph into a policy topic.
# A paragraph matching any keyword (case-insensitive) is bucketed under that topic.
SECTION_KEYWORDS = {
    'data_collection': ['collect', 'gather', 'information we collect', 'personal data'],
    'data_sharing': ['share', 'third party', 'partners', 'disclose', 'sell'],
    'user_rights': ['your rights', 'opt-out', 'delete', 'access your data', 'gdpr', 'ccpa'],
    'data_retention': ['retain', 'retention', 'how long', 'keep your'],
    'security': ['security', 'protect', 'encryption', 'safeguard'],
    'cookies': ['cookie', 'tracking', 'analytics'],
}
# Phrases that signal potentially user-hostile policy terms; each match is
# reported with ~100 characters of surrounding context.
RED_FLAG_KEYWORDS = [
    'sell your data', 'sell your information', 'share with third parties',
    'advertising partners', 'indefinitely', 'without notice',
    'at our discretion', 'waive your right', 'arbitration', 'class action waiver'
]
def chunk_text(text: str, chunk_size: int = 2000, overlap: int = 200) -> List[str]:
    """Split text into overlapping chunks, preferring paragraph boundaries.

    Args:
        text: The text to split.
        chunk_size: Maximum chunk length in characters.
        overlap: Number of characters shared between consecutive chunks.

    Returns:
        List of stripped, non-empty chunks covering the whole text
        (a single-element list when the text already fits in one chunk).

    Raises:
        ValueError: If overlap >= chunk_size, which would prevent the
            window from ever advancing (infinite loop in the original).
    """
    if overlap >= chunk_size:
        raise ValueError("overlap must be smaller than chunk_size")
    if len(text) <= chunk_size:
        return [text]
    chunks = []
    start = 0
    while start < len(text):
        end = start + chunk_size
        if end < len(text):
            # Prefer to cut at a paragraph break, but only if it falls in the
            # second half of the window (avoids degenerate tiny chunks).
            para_break = text.rfind('\n\n', start, end)
            if para_break > start + chunk_size // 2:
                end = para_break
        piece = text[start:end].strip()
        if piece:  # drop whitespace-only windows instead of emitting ""
            chunks.append(piece)
        start = end - overlap
        # Once the window has reached the end of the text, stop.
        if start >= len(text) - overlap:
            break
    return chunks
def identify_sections(text: str) -> Dict[str, List[str]]:
    """Bucket policy paragraphs by topic.

    Splits the text on blank lines and, for each paragraph that contains any
    keyword of a topic in SECTION_KEYWORDS (case-insensitive), records an
    excerpt (truncated to 500 chars + "...") under that topic. Duplicate
    excerpts within a topic are kept only once.
    """
    found: Dict[str, List[str]] = {name: [] for name in SECTION_KEYWORDS}
    for para in re.split(r'\n{2,}', text):
        lowered = para.lower()
        snippet = para if len(para) <= 500 else para[:500] + "..."
        for name, terms in SECTION_KEYWORDS.items():
            if any(term in lowered for term in terms) and snippet not in found[name]:
                found[name].append(snippet)
    return found
def find_red_flags(text: str) -> List[Dict[str, str]]:
    """Locate red-flag phrases and capture their surrounding context.

    For each phrase in RED_FLAG_KEYWORDS found in the text
    (case-insensitive), returns a dict with the matched 'keyword' and a
    'context' excerpt of up to ~100 characters on either side of the first
    occurrence.
    """
    lowered = text.lower()
    hits: List[Dict[str, str]] = []
    for phrase in RED_FLAG_KEYWORDS:
        position = lowered.find(phrase)
        if position == -1:
            continue
        lo = max(0, position - 100)
        hi = min(len(text), position + len(phrase) + 100)
        hits.append({'keyword': phrase, 'context': text[lo:hi].strip()})
    return hits
@tool("text_analyzer")
def text_analyzer_tool(text: str) -> str:
    """
    Analyzes policy text to identify key sections and potential concerns.

    Chunks the input, classifies paragraphs via identify_sections, scans for
    red-flag phrases via find_red_flags, and renders a markdown-style report.

    Args:
        text: The policy text content to analyze

    Returns:
        Structured analysis with sections and red flags, or an
        "Error: ..." string when input is too short or analysis fails.
    """
    start_time = time.time()
    # Guard: reject empty/near-empty input before doing any work.
    if not text or len(text.strip()) < 100:
        error_msg = "Text too short for analysis"
        log_agent_action("Text Analyzer Tool", "Validation", f"Received {len(text) if text else 0} chars",
                         error_msg, time.time() - start_time, False, error_msg)
        return f"Error: {error_msg}"
    try:
        chunks = chunk_text(text)
        all_sections = {key: [] for key in SECTION_KEYWORDS}
        all_red_flags = []
        for chunk in chunks:
            sections = identify_sections(chunk)
            for key, excerpts in sections.items():
                all_sections[key].extend(excerpts)
            all_red_flags.extend(find_red_flags(chunk))
        # Deduplicate excerpts while preserving first-seen order, then cap at 3.
        # (The original used list(set(...)), which makes both WHICH excerpts
        # survive the cap and their order depend on string-hash randomization,
        # so the tool's output differed between runs.)
        for key in all_sections:
            all_sections[key] = list(dict.fromkeys(all_sections[key]))[:3]
        # Keep only the first occurrence of each red-flag keyword, capped at 10.
        seen_keywords = set()
        unique_flags = []
        for flag in all_red_flags:
            if flag['keyword'] not in seen_keywords:
                seen_keywords.add(flag['keyword'])
                unique_flags.append(flag)
        all_red_flags = unique_flags[:10]
        # Build the human-readable report.
        result_parts = ["=== POLICY ANALYSIS ===\n"]
        result_parts.append("## KEY SECTIONS:\n")
        for section_type, excerpts in all_sections.items():
            if excerpts:
                result_parts.append(f"\n### {section_type.upper().replace('_', ' ')}:")
                for i, excerpt in enumerate(excerpts, 1):
                    result_parts.append(f"{i}. {excerpt[:300]}...")
        result_parts.append("\n\n## POTENTIAL CONCERNS:\n")
        if all_red_flags:
            for i, flag in enumerate(all_red_flags, 1):
                result_parts.append(f"{i}. **{flag['keyword'].upper()}**")
                result_parts.append(f" Context: \"{flag['context']}\"")
        else:
            result_parts.append("No major red flags identified.")
        result_parts.append(f"\n\n## STATS: {len(text)} chars, {len(chunks)} chunks, {len(all_red_flags)} concerns")
        result = "\n".join(result_parts)
        log_agent_action("Text Analyzer Tool", "Analysis", f"Analyzed {len(chunks)} chunks",
                         f"Found {len(all_red_flags)} concerns", time.time() - start_time, True)
        return result
    except Exception as e:
        # Top-level tool boundary: report the failure to the agent framework
        # as a string rather than raising.
        error_msg = f"Analysis error: {str(e)}"
        log_agent_action("Text Analyzer Tool", "Analysis", "Processing text", error_msg,
                         time.time() - start_time, False, error_msg)
        return f"Error: {error_msg}"