""" Text Analyzer Tool - Analyzes policy text to identify sections and concerns """ from crewai.tools import tool from typing import List, Dict import re import time from utils.logger import log_agent_action # Keywords for identifying sections SECTION_KEYWORDS = { 'data_collection': ['collect', 'gather', 'information we collect', 'personal data'], 'data_sharing': ['share', 'third party', 'partners', 'disclose', 'sell'], 'user_rights': ['your rights', 'opt-out', 'delete', 'access your data', 'gdpr', 'ccpa'], 'data_retention': ['retain', 'retention', 'how long', 'keep your'], 'security': ['security', 'protect', 'encryption', 'safeguard'], 'cookies': ['cookie', 'tracking', 'analytics'], } # Red flag keywords RED_FLAG_KEYWORDS = [ 'sell your data', 'sell your information', 'share with third parties', 'advertising partners', 'indefinitely', 'without notice', 'at our discretion', 'waive your right', 'arbitration', 'class action waiver' ] def chunk_text(text: str, chunk_size: int = 2000, overlap: int = 200) -> List[str]: """Split text into overlapping chunks.""" if len(text) <= chunk_size: return [text] chunks = [] start = 0 while start < len(text): end = start + chunk_size if end < len(text): para_break = text.rfind('\n\n', start, end) if para_break > start + chunk_size // 2: end = para_break chunks.append(text[start:end].strip()) start = end - overlap if start >= len(text) - overlap: break return chunks def identify_sections(text: str) -> Dict[str, List[str]]: """Identify relevant sections in the policy text.""" sections = {key: [] for key in SECTION_KEYWORDS} paragraphs = re.split(r'\n{2,}', text) for paragraph in paragraphs: para_lower = paragraph.lower() for section_type, keywords in SECTION_KEYWORDS.items(): for keyword in keywords: if keyword in para_lower: excerpt = paragraph[:500] + "..." if len(paragraph) > 500 else paragraph if excerpt not in sections[section_type]: sections[section_type].append(excerpt) break return sections def find_red_flags(text: str) -> List[Dict[str, str]]: """Find potential concerns in the policy.""" red_flags = [] text_lower = text.lower() for keyword in RED_FLAG_KEYWORDS: if keyword in text_lower: idx = text_lower.find(keyword) start = max(0, idx - 100) end = min(len(text), idx + len(keyword) + 100) context = text[start:end].strip() red_flags.append({'keyword': keyword, 'context': context}) return red_flags @tool("text_analyzer") def text_analyzer_tool(text: str) -> str: """ Analyzes policy text to identify key sections and potential concerns. Args: text: The policy text content to analyze Returns: Structured analysis with sections and red flags """ start_time = time.time() if not text or len(text.strip()) < 100: error_msg = "Text too short for analysis" log_agent_action("Text Analyzer Tool", "Validation", f"Received {len(text) if text else 0} chars", error_msg, time.time() - start_time, False, error_msg) return f"Error: {error_msg}" try: chunks = chunk_text(text) all_sections = {key: [] for key in SECTION_KEYWORDS} all_red_flags = [] for chunk in chunks: sections = identify_sections(chunk) for key, excerpts in sections.items(): all_sections[key].extend(excerpts) flags = find_red_flags(chunk) all_red_flags.extend(flags) # Deduplicate for key in all_sections: all_sections[key] = list(set(all_sections[key]))[:3] seen_keywords = set() unique_flags = [] for flag in all_red_flags: if flag['keyword'] not in seen_keywords: seen_keywords.add(flag['keyword']) unique_flags.append(flag) all_red_flags = unique_flags[:10] # Build result result_parts = ["=== POLICY ANALYSIS ===\n"] result_parts.append("## KEY SECTIONS:\n") for section_type, excerpts in all_sections.items(): if excerpts: result_parts.append(f"\n### {section_type.upper().replace('_', ' ')}:") for i, excerpt in enumerate(excerpts, 1): result_parts.append(f"{i}. {excerpt[:300]}...") result_parts.append("\n\n## POTENTIAL CONCERNS:\n") if all_red_flags: for i, flag in enumerate(all_red_flags, 1): result_parts.append(f"{i}. **{flag['keyword'].upper()}**") result_parts.append(f" Context: \"{flag['context']}\"") else: result_parts.append("No major red flags identified.") result_parts.append(f"\n\n## STATS: {len(text)} chars, {len(chunks)} chunks, {len(all_red_flags)} concerns") result = "\n".join(result_parts) log_agent_action("Text Analyzer Tool", "Analysis", f"Analyzed {len(chunks)} chunks", f"Found {len(all_red_flags)} concerns", time.time() - start_time, True) return result except Exception as e: error_msg = f"Analysis error: {str(e)}" log_agent_action("Text Analyzer Tool", "Analysis", "Processing text", error_msg, time.time() - start_time, False, error_msg) return f"Error: {error_msg}"