# PolicySummarizer/tools/text_analyzer.py
"""
Text Analyzer Tool - Analyzes policy text to identify sections and concerns
"""
from crewai.tools import tool
from typing import List, Dict
import re
import time
from utils.logger import log_agent_action
# Keywords for identifying sections
# Maps a section label to lowercase trigger phrases; identify_sections()
# substring-matches these against lowercased paragraph text, so phrases
# must be lowercase here.
SECTION_KEYWORDS = {
    'data_collection': ['collect', 'gather', 'information we collect', 'personal data'],
    'data_sharing': ['share', 'third party', 'partners', 'disclose', 'sell'],
    'user_rights': ['your rights', 'opt-out', 'delete', 'access your data', 'gdpr', 'ccpa'],
    'data_retention': ['retain', 'retention', 'how long', 'keep your'],
    'security': ['security', 'protect', 'encryption', 'safeguard'],
    'cookies': ['cookie', 'tracking', 'analytics'],
}
# Red flag keywords
# Lowercase phrases that find_red_flags() searches for; each hit is
# reported together with ~100 chars of surrounding context.
RED_FLAG_KEYWORDS = [
    'sell your data', 'sell your information', 'share with third parties',
    'advertising partners', 'indefinitely', 'without notice',
    'at our discretion', 'waive your right', 'arbitration', 'class action waiver'
]
def chunk_text(text: str, chunk_size: int = 2000, overlap: int = 200) -> List[str]:
    """Break *text* into overlapping pieces of roughly ``chunk_size`` chars.

    A chunk prefers to end on a paragraph boundary (``"\\n\\n"``) when one
    falls in the second half of the window, so keyword matches are less
    likely to be cut mid-sentence. Consecutive chunks share ``overlap``
    characters so matches near a boundary are not lost.
    """
    # Short input needs no splitting (returned unstripped, as-is).
    if len(text) <= chunk_size:
        return [text]

    total = len(text)
    pieces: List[str] = []
    pos = 0
    while pos < total:
        window_end = pos + chunk_size
        if window_end < total:
            # Snap to a paragraph break, but only if that still leaves
            # the chunk at least half the target size.
            boundary = text.rfind('\n\n', pos, window_end)
            if boundary > pos + chunk_size // 2:
                window_end = boundary
        pieces.append(text[pos:window_end].strip())
        pos = window_end - overlap
        # Stop once the remaining tail is entirely inside the overlap
        # we just re-covered.
        if pos >= total - overlap:
            break
    return pieces
def identify_sections(text: str) -> Dict[str, List[str]]:
    """Scan *text* paragraph by paragraph and bucket excerpts by topic.

    Returns a mapping from each SECTION_KEYWORDS label to the list of
    distinct paragraph excerpts (truncated to 500 chars + "...") that
    mention at least one keyword for that topic.
    """
    found: Dict[str, List[str]] = {label: [] for label in SECTION_KEYWORDS}
    for para in re.split(r'\n{2,}', text):
        lowered = para.lower()
        for label, phrases in SECTION_KEYWORDS.items():
            # One match per topic per paragraph is enough to record it.
            if any(phrase in lowered for phrase in phrases):
                snippet = para if len(para) <= 500 else para[:500] + "..."
                if snippet not in found[label]:
                    found[label].append(snippet)
    return found
def find_red_flags(text: str) -> List[Dict[str, str]]:
    """Locate worrying phrases in *text* (case-insensitive).

    For each RED_FLAG_KEYWORDS phrase present, returns a dict with the
    matched ``keyword`` and the ``context`` — roughly 100 characters on
    either side of the first occurrence.
    """
    lowered = text.lower()
    flags: List[Dict[str, str]] = []
    for phrase in RED_FLAG_KEYWORDS:
        pos = lowered.find(phrase)
        if pos == -1:
            continue
        ctx_start = max(0, pos - 100)
        ctx_end = min(len(text), pos + len(phrase) + 100)
        flags.append({'keyword': phrase, 'context': text[ctx_start:ctx_end].strip()})
    return flags
@tool("text_analyzer")
def text_analyzer_tool(text: str) -> str:
    """
    Analyzes policy text to identify key sections and potential concerns.

    Splits the text into overlapping chunks, collects keyword-matched
    section excerpts and red-flag contexts from each chunk, deduplicates
    them, and renders a markdown-style report.

    Args:
        text: The policy text content to analyze

    Returns:
        Structured analysis with sections and red flags, or an
        "Error: ..." string when the input is too short or analysis fails.
    """
    start_time = time.time()
    # Guard: under 100 meaningful chars there is nothing worth analyzing.
    if not text or len(text.strip()) < 100:
        error_msg = "Text too short for analysis"
        log_agent_action("Text Analyzer Tool", "Validation", f"Received {len(text) if text else 0} chars",
                         error_msg, time.time() - start_time, False, error_msg)
        return f"Error: {error_msg}"
    try:
        chunks = chunk_text(text)
        all_sections = {key: [] for key in SECTION_KEYWORDS}
        all_red_flags = []
        for chunk in chunks:
            sections = identify_sections(chunk)
            for key, excerpts in sections.items():
                all_sections[key].extend(excerpts)
            all_red_flags.extend(find_red_flags(chunk))
        # Deduplicate excerpts, keeping at most 3 per section.
        # dict.fromkeys preserves first-seen order; the previous
        # list(set(...)) reordered nondeterministically across runs
        # (hash randomization), making the surviving top-3 unstable.
        for key in all_sections:
            all_sections[key] = list(dict.fromkeys(all_sections[key]))[:3]
        # Keep only the first context seen per red-flag keyword, max 10.
        seen_keywords = set()
        unique_flags = []
        for flag in all_red_flags:
            if flag['keyword'] not in seen_keywords:
                seen_keywords.add(flag['keyword'])
                unique_flags.append(flag)
        all_red_flags = unique_flags[:10]
        # Build the markdown-style report.
        result_parts = ["=== POLICY ANALYSIS ===\n"]
        result_parts.append("## KEY SECTIONS:\n")
        for section_type, excerpts in all_sections.items():
            if excerpts:
                result_parts.append(f"\n### {section_type.upper().replace('_', ' ')}:")
                for i, excerpt in enumerate(excerpts, 1):
                    result_parts.append(f"{i}. {excerpt[:300]}...")
        result_parts.append("\n\n## POTENTIAL CONCERNS:\n")
        if all_red_flags:
            for i, flag in enumerate(all_red_flags, 1):
                result_parts.append(f"{i}. **{flag['keyword'].upper()}**")
                result_parts.append(f" Context: \"{flag['context']}\"")
        else:
            result_parts.append("No major red flags identified.")
        result_parts.append(f"\n\n## STATS: {len(text)} chars, {len(chunks)} chunks, {len(all_red_flags)} concerns")
        result = "\n".join(result_parts)
        log_agent_action("Text Analyzer Tool", "Analysis", f"Analyzed {len(chunks)} chunks",
                         f"Found {len(all_red_flags)} concerns", time.time() - start_time, True)
        return result
    except Exception as e:
        error_msg = f"Analysis error: {str(e)}"
        log_agent_action("Text Analyzer Tool", "Analysis", "Processing text", error_msg,
                         time.time() - start_time, False, error_msg)
        return f"Error: {error_msg}"