Spaces:
Sleeping
Sleeping
| """ | |
| Text Analyzer Tool - Analyzes policy text to identify sections and concerns | |
| """ | |
| from crewai.tools import tool | |
| from typing import List, Dict | |
| import re | |
| import time | |
| from utils.logger import log_agent_action | |
# Keywords for identifying sections
# Maps a section label to lowercase substrings that identify_sections()
# looks for inside each paragraph (paragraphs are lowercased before matching).
SECTION_KEYWORDS = {
    'data_collection': ['collect', 'gather', 'information we collect', 'personal data'],
    'data_sharing': ['share', 'third party', 'partners', 'disclose', 'sell'],
    'user_rights': ['your rights', 'opt-out', 'delete', 'access your data', 'gdpr', 'ccpa'],
    'data_retention': ['retain', 'retention', 'how long', 'keep your'],
    'security': ['security', 'protect', 'encryption', 'safeguard'],
    'cookies': ['cookie', 'tracking', 'analytics'],
}
# Red flag keywords
# Phrases that find_red_flags() treats as potential concerns; matched
# case-insensitively against the whole text (first occurrence only).
RED_FLAG_KEYWORDS = [
    'sell your data', 'sell your information', 'share with third parties',
    'advertising partners', 'indefinitely', 'without notice',
    'at our discretion', 'waive your right', 'arbitration', 'class action waiver'
]
def chunk_text(text: str, chunk_size: int = 2000, overlap: int = 200) -> List[str]:
    """Split text into overlapping chunks of roughly ``chunk_size`` characters.

    When possible, a chunk is cut at the last paragraph break (``\\n\\n``)
    found in the back half of the window, so paragraphs are less likely to
    be split mid-thought. Consecutive chunks share ``overlap`` characters
    of context.

    Args:
        text: The text to split.
        chunk_size: Target maximum chunk length, in characters.
        overlap: Characters of context repeated between adjacent chunks.
            Must satisfy ``overlap < chunk_size // 2``; otherwise, a chunk
            cut at a paragraph break near the half-way point could move the
            window backwards and loop forever.

    Returns:
        A list of stripped, non-empty chunk strings (the whole text as a
        single element when it already fits in one chunk).

    Raises:
        ValueError: If ``overlap`` is not smaller than ``chunk_size // 2``.
    """
    # Guard against a window that cannot make forward progress: a paragraph
    # break may pull `end` back to just past start + chunk_size // 2, so the
    # overlap must be strictly smaller than that margin.
    if overlap >= chunk_size // 2:
        raise ValueError("overlap must be smaller than chunk_size // 2")
    if len(text) <= chunk_size:
        return [text]
    chunks = []
    start = 0
    while start < len(text):
        end = start + chunk_size
        if end < len(text):
            # Prefer cutting at a paragraph boundary in the back half of
            # the window, so chunks align with natural document structure.
            para_break = text.rfind('\n\n', start, end)
            if para_break > start + chunk_size // 2:
                end = para_break
        piece = text[start:end].strip()
        if piece:  # skip whitespace-only windows instead of emitting ""
            chunks.append(piece)
        start = end - overlap
        if start >= len(text) - overlap:
            break
    return chunks
def identify_sections(text: str) -> Dict[str, List[str]]:
    """Scan the policy text and collect paragraph excerpts per section type.

    Paragraphs (separated by one or more blank lines) are matched
    case-insensitively against SECTION_KEYWORDS; each matching paragraph is
    recorded once per section type, truncated to 500 characters.
    """
    found: Dict[str, List[str]] = {name: [] for name in SECTION_KEYWORDS}
    for para in re.split(r'\n{2,}', text):
        lowered = para.lower()
        snippet = para[:500] + "..." if len(para) > 500 else para
        for name, terms in SECTION_KEYWORDS.items():
            if any(term in lowered for term in terms) and snippet not in found[name]:
                found[name].append(snippet)
    return found
def find_red_flags(text: str) -> List[Dict[str, str]]:
    """Locate concerning phrases in the policy text.

    Each RED_FLAG_KEYWORDS phrase is searched case-insensitively; for the
    first occurrence found, up to 100 characters of surrounding context is
    captured on each side.
    """
    lowered = text.lower()
    hits: List[Dict[str, str]] = []
    for phrase in RED_FLAG_KEYWORDS:
        pos = lowered.find(phrase)
        if pos == -1:
            continue
        lo = max(0, pos - 100)
        hi = min(len(text), pos + len(phrase) + 100)
        hits.append({'keyword': phrase, 'context': text[lo:hi].strip()})
    return hits
def text_analyzer_tool(text: str) -> str:
    """
    Analyzes policy text to identify key sections and potential concerns.

    The text is chunked, each chunk is scanned for known section keywords
    and red-flag phrases, results are deduplicated, and a markdown-style
    report is returned.

    Args:
        text: The policy text content to analyze

    Returns:
        Structured analysis with sections and red flags, or an
        "Error: ..." string when the input is too short or analysis fails.
    """
    start_time = time.time()
    # Reject missing or trivially short input before doing any work.
    if not text or len(text.strip()) < 100:
        error_msg = "Text too short for analysis"
        log_agent_action("Text Analyzer Tool", "Validation", f"Received {len(text) if text else 0} chars",
                         error_msg, time.time() - start_time, False, error_msg)
        return f"Error: {error_msg}"
    try:
        chunks = chunk_text(text)
        all_sections = {key: [] for key in SECTION_KEYWORDS}
        all_red_flags = []
        for chunk in chunks:
            sections = identify_sections(chunk)
            for key, excerpts in sections.items():
                all_sections[key].extend(excerpts)
            all_red_flags.extend(find_red_flags(chunk))
        # Deduplicate excerpts while preserving first-seen order, keep at
        # most 3 per section. (The previous list(set(...)) made the report
        # nondeterministic: str hashing is randomized per process, so the
        # surviving excerpts and their order varied between runs.)
        for key in all_sections:
            all_sections[key] = list(dict.fromkeys(all_sections[key]))[:3]
        # Keep only the first occurrence of each red-flag keyword, cap at 10.
        seen_keywords = set()
        unique_flags = []
        for flag in all_red_flags:
            if flag['keyword'] not in seen_keywords:
                seen_keywords.add(flag['keyword'])
                unique_flags.append(flag)
        all_red_flags = unique_flags[:10]
        # Build the human-readable report.
        result_parts = ["=== POLICY ANALYSIS ===\n"]
        result_parts.append("## KEY SECTIONS:\n")
        for section_type, excerpts in all_sections.items():
            if excerpts:
                result_parts.append(f"\n### {section_type.upper().replace('_', ' ')}:")
                for i, excerpt in enumerate(excerpts, 1):
                    result_parts.append(f"{i}. {excerpt[:300]}...")
        result_parts.append("\n\n## POTENTIAL CONCERNS:\n")
        if all_red_flags:
            for i, flag in enumerate(all_red_flags, 1):
                result_parts.append(f"{i}. **{flag['keyword'].upper()}**")
                result_parts.append(f"   Context: \"{flag['context']}\"")
        else:
            result_parts.append("No major red flags identified.")
        result_parts.append(f"\n\n## STATS: {len(text)} chars, {len(chunks)} chunks, {len(all_red_flags)} concerns")
        result = "\n".join(result_parts)
        log_agent_action("Text Analyzer Tool", "Analysis", f"Analyzed {len(chunks)} chunks",
                         f"Found {len(all_red_flags)} concerns", time.time() - start_time, True)
        return result
    except Exception as e:
        error_msg = f"Analysis error: {str(e)}"
        log_agent_action("Text Analyzer Tool", "Analysis", "Processing text", error_msg,
                         time.time() - start_time, False, error_msg)
        return f"Error: {error_msg}"