"""
CrewAI Configuration - Policy Summarizer
"""
import os

from crewai import Agent, Task, Crew, Process

from tools.web_scraper import web_scraper_tool
from tools.text_analyzer import text_analyzer_tool
from utils.logger import log_agent_action, clear_logs
def create_agents():
    """Instantiate the three agents that make up the policy-analysis crew.

    Returns:
        tuple: (orchestrator, scraper, analyzer) Agent instances. Only the
        orchestrator may delegate; the scraper and analyzer each carry the
        single tool they need.
    """
    # Flags shared by every agent in this crew.
    common = {"verbose": True}

    # Coordinates the workflow and writes the final summary; no tools of
    # its own, but it is allowed to delegate to the other two agents.
    orchestrator = Agent(
        allow_delegation=True,
        role="Policy Analysis Orchestrator",
        goal="Coordinate the policy analysis and create a user-friendly summary",
        backstory="""You are an expert at analyzing legal documents and presenting
complex information in simple terms. You coordinate the analysis workflow.""",
        **common,
    )

    # Fetches and cleans the raw policy text from a URL.
    scraper = Agent(
        allow_delegation=False,
        role="Web Content Scraper",
        goal="Extract clean policy text from web URLs",
        backstory="""You specialize in web scraping and content extraction.
You can extract policy text while filtering out irrelevant content.""",
        tools=[web_scraper_tool],
        **common,
    )

    # Turns the scraped text into structured findings (rights, red flags).
    analyzer = Agent(
        allow_delegation=False,
        role="Policy Analyzer",
        goal="Analyze policies to identify key points, rights, and concerns",
        backstory="""You are a legal expert who analyzes terms of service and
privacy policies. You identify user rights and potential red flags.""",
        tools=[text_analyzer_tool],
        **common,
    )

    return orchestrator, scraper, analyzer
def create_tasks(orchestrator, scraper, analyzer, url: str):
    """Create the sequential tasks for each agent.

    Args:
        orchestrator: Agent that writes the final user-facing summary.
        scraper: Agent that fetches the policy text.
        analyzer: Agent that extracts rights and red flags.
        url: Address of the policy page to analyze.

    Returns:
        list[Task]: [scrape_task, analyze_task, summary_task], in the order
        they must run; each later task receives the earlier ones as context.
    """
    scrape_task = Task(
        description=f"""
        Scrape the policy content from: {url}
        Use the web_scraper_tool to fetch and extract the text.
        Return the full policy text content.
        """,
        expected_output="The extracted policy text content",
        agent=scraper,
    )

    analyze_task = Task(
        description="""
        Analyze the scraped policy content:
        1. Use text_analyzer_tool to identify key sections
        2. Find user rights (deletion, access, opt-out, etc.)
        3. Identify concerns and red flags
        4. Note data collection and sharing practices
        """,
        expected_output="Structured analysis with sections, rights, and concerns",
        agent=analyzer,
        context=[scrape_task],  # receives the scraped text
    )

    # NOTE: the section-header emoji below were mojibake in the previous
    # revision (UTF-8 emoji decoded as cp1253); restored to the intended glyphs.
    summary_task = Task(
        description="""
        Create a user-friendly summary with these sections:
        ## 📋 Policy Summary
        [3-5 key points about this policy]
        ## ✅ Your Rights
        [List user rights with brief explanations]
        ## ⚠️ Concerns & Warnings
        [List red flags with severity: 🔴 High, 🟡 Medium, 🟢 Low]
        ## 💡 Recommendation
        [Overall assessment and advice]
        Use simple language, avoid legal jargon.
        """,
        expected_output="A formatted, user-friendly policy summary",
        agent=orchestrator,
        context=[scrape_task, analyze_task],  # sees both raw text and analysis
    )

    return [scrape_task, analyze_task, summary_task]
def run_policy_analysis(url: str) -> str:
    """Run the full scrape -> analyze -> summarize pipeline for a policy URL.

    Args:
        url: Link to the terms-of-service / privacy-policy page.

    Returns:
        The formatted summary text on success, or a message starting with
        "❌ Error:" describing the failure. Never raises.
    """
    clear_logs()
    # Only the URL length is logged here, not the URL itself.
    log_agent_action(
        agent_name="System",
        action="Starting Analysis",
        input_summary=f"URL length: {len(url)}",
        output_summary="Initializing agents...",
        duration_seconds=0,
        success=True,
    )
    try:
        orchestrator, scraper, analyzer = create_agents()
        tasks = create_tasks(orchestrator, scraper, analyzer, url)
        crew = Crew(
            agents=[orchestrator, scraper, analyzer],
            tasks=tasks,
            process=Process.sequential,  # tasks run in declaration order
            verbose=True,
        )
        result = crew.kickoff()
        return str(result)
    except Exception as e:
        # Top-level boundary: record the failure in the action log instead
        # of swallowing it silently, then surface a readable message.
        # (The "❌" glyph was mojibake "β" in the previous revision.)
        log_agent_action(
            agent_name="System",
            action="Analysis Failed",
            input_summary=f"URL length: {len(url)}",
            output_summary=str(e),
            duration_seconds=0,
            success=False,
        )
        return f"❌ Error: {str(e)}"