Spaces:
Running on CPU Upgrade
Running on CPU Upgrade
| """ | |
| Sentiment Analyzer Agent for determining policy stance and debate intensity. | |
| """ | |
| import asyncio | |
| from typing import List, Dict, Any, Optional | |
| from datetime import datetime | |
| from loguru import logger | |
| from agents.base import BaseAgent, AgentRole, AgentMessage, MessageType, AgentStatus | |
| class PolicyStance: | |
| """Enumeration of policy stances.""" | |
| STRONGLY_SUPPORTIVE = "strongly_supportive" | |
| SUPPORTIVE = "supportive" | |
| NEUTRAL = "neutral" | |
| OPPOSED = "opposed" | |
| STRONGLY_OPPOSED = "strongly_opposed" | |
| DEBATED = "debated" # When there's active debate | |
| class DebateIntensity: | |
| """Enumeration of debate intensity levels.""" | |
| NONE = "none" # Passing mention | |
| LOW = "low" # Brief discussion | |
| MODERATE = "moderate" # Extended discussion | |
| HIGH = "high" # Heated debate with multiple viewpoints | |
| CRITICAL = "critical" # Vote imminent or major decision pending | |
| class SentimentAnalyzerAgent(BaseAgent): | |
| """ | |
| Agent responsible for analyzing sentiment and policy stance. | |
| Determines: | |
| - Overall stance toward oral health policies | |
| - Intensity of debate/discussion | |
| - Key arguments for and against | |
| - Likelihood of policy action | |
| - Advocacy opportunities | |
| """ | |
| def __init__(self, agent_id: str = "sentiment-001"): | |
| """Initialize the sentiment analyzer agent.""" | |
| super().__init__(agent_id, AgentRole.SENTIMENT_ANALYZER) | |
| self._initialize_indicators() | |
| def _initialize_indicators(self): | |
| """Initialize sentiment and debate indicators.""" | |
| self.supportive_indicators = [ | |
| "approve", "support", "favor", "endorse", "recommend", | |
| "beneficial", "important", "necessary", "implement", | |
| "move forward", "proceed with" | |
| ] | |
| self.opposition_indicators = [ | |
| "oppose", "against", "reject", "deny", "concerns about", | |
| "problems with", "issues with", "delay", "postpone", | |
| "table the motion", "reconsider" | |
| ] | |
| self.debate_indicators = [ | |
| "discussion", "debate", "motion", "vote", "amendment", | |
| "public comment", "testimony", "hearing", "concerns", | |
| "questions about", "divided" | |
| ] | |
| self.urgency_indicators = [ | |
| "urgent", "immediate", "deadline", "vote", "decision", | |
| "approval needed", "time-sensitive", "pressing", | |
| "second reading", "final vote" | |
| ] | |
| async def process(self, message: AgentMessage) -> List[AgentMessage]: | |
| """ | |
| Process sentiment analysis commands. | |
| Args: | |
| message: Message containing classified documents | |
| Returns: | |
| List of messages with sentiment analysis results | |
| """ | |
| self.update_status(AgentStatus.PROCESSING, "Analyzing policy sentiment and debate") | |
| try: | |
| documents = message.payload.get("documents", []) | |
| analyzed_documents = [] | |
| for doc in documents: | |
| analysis = await self._analyze_document(doc) | |
| doc["sentiment_analysis"] = analysis | |
| analyzed_documents.append(doc) | |
| # Identify advocacy opportunities | |
| opportunities = self._identify_advocacy_opportunities(analyzed_documents) | |
| # Send to advocacy writer agent | |
| response = await self.send_message( | |
| AgentRole.ADVOCACY_WRITER, | |
| MessageType.DATA, | |
| { | |
| "workflow_id": message.payload.get("workflow_id"), | |
| "documents": analyzed_documents, | |
| "opportunities": opportunities, | |
| "count": len(analyzed_documents) | |
| } | |
| ) | |
| self.log_success() | |
| logger.info( | |
| f"Analyzed sentiment for {len(analyzed_documents)} documents, " | |
| f"found {len(opportunities)} advocacy opportunities" | |
| ) | |
| return [response] | |
| except Exception as e: | |
| self.log_failure(str(e)) | |
| error_msg = await self.send_message( | |
| AgentRole.ORCHESTRATOR, | |
| MessageType.ERROR, | |
| {"error": str(e), "agent": self.agent_id} | |
| ) | |
| return [error_msg] | |
| async def _analyze_document( | |
| self, | |
| doc: Dict[str, Any] | |
| ) -> Dict[str, Any]: | |
| """ | |
| Analyze sentiment and policy stance for a document. | |
| Args: | |
| doc: Document to analyze | |
| Returns: | |
| Sentiment analysis results | |
| """ | |
| text = self._get_analyzable_text(doc) | |
| text_lower = text.lower() | |
| # Count sentiment indicators | |
| support_score = sum( | |
| 1 for indicator in self.supportive_indicators | |
| if indicator in text_lower | |
| ) | |
| opposition_score = sum( | |
| 1 for indicator in self.opposition_indicators | |
| if indicator in text_lower | |
| ) | |
| debate_score = sum( | |
| 1 for indicator in self.debate_indicators | |
| if indicator in text_lower | |
| ) | |
| urgency_score = sum( | |
| 1 for indicator in self.urgency_indicators | |
| if indicator in text_lower | |
| ) | |
| # Determine policy stance | |
| stance = self._determine_stance(support_score, opposition_score, debate_score) | |
| # Determine debate intensity | |
| intensity = self._determine_intensity(debate_score, urgency_score, doc) | |
| # Extract key arguments | |
| arguments = self._extract_arguments(doc, text_lower) | |
| # Calculate advocacy urgency | |
| advocacy_urgency = self._calculate_advocacy_urgency( | |
| stance, intensity, urgency_score | |
| ) | |
| analysis = { | |
| "stance": stance, | |
| "debate_intensity": intensity, | |
| "support_score": support_score, | |
| "opposition_score": opposition_score, | |
| "debate_score": debate_score, | |
| "urgency_score": urgency_score, | |
| "advocacy_urgency": advocacy_urgency, | |
| "key_arguments": arguments, | |
| "analyzed_at": datetime.utcnow().isoformat() | |
| } | |
| return analysis | |
| def _get_analyzable_text(self, doc: Dict[str, Any]) -> str: | |
| """Extract text for sentiment analysis.""" | |
| parts = [] | |
| # Prioritize excerpts from classification | |
| for excerpt in doc.get("classification", {}).get("relevant_excerpts", []): | |
| parts.append(excerpt.get("text", "")) | |
| # Add motions (highly relevant) | |
| for motion in doc.get("motions", []): | |
| parts.append(motion.get("text", "")) | |
| # Add votes | |
| for vote in doc.get("votes", []): | |
| parts.append(vote.get("result", "")) | |
| # Fallback to full text if needed | |
| if not parts: | |
| parts.append(doc.get("full_text", "")) | |
| return " ".join(parts) | |
| def _determine_stance( | |
| self, | |
| support_score: int, | |
| opposition_score: int, | |
| debate_score: int | |
| ) -> str: | |
| """Determine overall policy stance.""" | |
| if debate_score >= 3 and abs(support_score - opposition_score) <= 1: | |
| return PolicyStance.DEBATED | |
| if support_score > opposition_score: | |
| if support_score >= 3: | |
| return PolicyStance.STRONGLY_SUPPORTIVE | |
| else: | |
| return PolicyStance.SUPPORTIVE | |
| elif opposition_score > support_score: | |
| if opposition_score >= 3: | |
| return PolicyStance.STRONGLY_OPPOSED | |
| else: | |
| return PolicyStance.OPPOSED | |
| else: | |
| return PolicyStance.NEUTRAL | |
| def _determine_intensity( | |
| self, | |
| debate_score: int, | |
| urgency_score: int, | |
| doc: Dict[str, Any] | |
| ) -> str: | |
| """Determine debate intensity.""" | |
| # Check for votes or motions (indicates high intensity) | |
| has_vote = len(doc.get("votes", [])) > 0 | |
| has_motion = len(doc.get("motions", [])) > 0 | |
| if urgency_score >= 2 or (has_vote and has_motion): | |
| return DebateIntensity.CRITICAL | |
| elif debate_score >= 5 or has_vote or has_motion: | |
| return DebateIntensity.HIGH | |
| elif debate_score >= 3: | |
| return DebateIntensity.MODERATE | |
| elif debate_score >= 1: | |
| return DebateIntensity.LOW | |
| else: | |
| return DebateIntensity.NONE | |
| def _extract_arguments( | |
| self, | |
| doc: Dict[str, Any], | |
| text_lower: str | |
| ) -> Dict[str, List[str]]: | |
| """Extract key arguments for and against.""" | |
| arguments = { | |
| "supporting": [], | |
| "opposing": [] | |
| } | |
| # Extract from motions and discussion | |
| for motion in doc.get("motions", []): | |
| motion_text = motion.get("text", "").lower() | |
| if any(ind in motion_text for ind in self.supportive_indicators): | |
| arguments["supporting"].append(motion.get("text", "")) | |
| elif any(ind in motion_text for ind in self.opposition_indicators): | |
| arguments["opposing"].append(motion.get("text", "")) | |
| return arguments | |
| def _calculate_advocacy_urgency( | |
| self, | |
| stance: str, | |
| intensity: str, | |
| urgency_score: int | |
| ) -> str: | |
| """ | |
| Calculate how urgent advocacy action is needed. | |
| Returns: "critical", "high", "medium", "low", or "none" | |
| """ | |
| # Critical: Vote imminent and debated/opposed | |
| if intensity == DebateIntensity.CRITICAL: | |
| if stance in [PolicyStance.DEBATED, PolicyStance.OPPOSED, PolicyStance.STRONGLY_OPPOSED]: | |
| return "critical" | |
| return "high" | |
| # High: Active debate with opposition | |
| if intensity == DebateIntensity.HIGH: | |
| if stance in [PolicyStance.OPPOSED, PolicyStance.STRONGLY_OPPOSED]: | |
| return "high" | |
| elif stance == PolicyStance.DEBATED: | |
| return "high" | |
| return "medium" | |
| # Medium: Moderate discussion or emerging issue | |
| if intensity == DebateIntensity.MODERATE: | |
| return "medium" | |
| # Low: Early stage or general mention | |
| if intensity == DebateIntensity.LOW: | |
| return "low" | |
| return "none" | |
| def _identify_advocacy_opportunities( | |
| self, | |
| documents: List[Dict[str, Any]] | |
| ) -> List[Dict[str, Any]]: | |
| """ | |
| Identify advocacy opportunities across all analyzed documents. | |
| Args: | |
| documents: All analyzed documents | |
| Returns: | |
| List of advocacy opportunities | |
| """ | |
| opportunities = [] | |
| for doc in documents: | |
| sentiment = doc.get("sentiment_analysis", {}) | |
| urgency = sentiment.get("advocacy_urgency") | |
| # Only flag high and critical urgency items | |
| if urgency in ["critical", "high"]: | |
| opportunity = { | |
| "document_id": doc["document_id"], | |
| "municipality": doc["municipality"], | |
| "state": doc["state"], | |
| "meeting_date": doc["meeting_date"], | |
| "source_url": doc["source_url"], | |
| "topic": doc["classification"]["primary_topic"], | |
| "stance": sentiment["stance"], | |
| "intensity": sentiment["debate_intensity"], | |
| "urgency": urgency, | |
| "key_excerpts": doc["classification"].get("relevant_excerpts", []), | |
| "recommended_action": self._recommend_action(sentiment, doc) | |
| } | |
| opportunities.append(opportunity) | |
| # Sort by urgency | |
| urgency_order = {"critical": 0, "high": 1, "medium": 2, "low": 3} | |
| opportunities.sort(key=lambda x: urgency_order.get(x["urgency"], 4)) | |
| return opportunities | |
| def _recommend_action( | |
| self, | |
| sentiment: Dict[str, Any], | |
| doc: Dict[str, Any] | |
| ) -> str: | |
| """Recommend advocacy action based on analysis.""" | |
| stance = sentiment.get("stance") | |
| intensity = sentiment.get("debate_intensity") | |
| if intensity == DebateIntensity.CRITICAL: | |
| if stance in [PolicyStance.OPPOSED, PolicyStance.STRONGLY_OPPOSED]: | |
| return "URGENT: Contact officials immediately. Vote imminent." | |
| elif stance == PolicyStance.DEBATED: | |
| return "URGENT: Provide supporting testimony. Decision pending." | |
| if stance == PolicyStance.DEBATED: | |
| return "Engage with stakeholders. Provide educational materials." | |
| elif stance in [PolicyStance.OPPOSED, PolicyStance.STRONGLY_OPPOSED]: | |
| return "Initiate dialogue with decision-makers. Address concerns." | |
| elif stance == PolicyStance.NEUTRAL: | |
| return "Introduce topic to agenda. Build awareness." | |
| return "Monitor situation. Prepare support materials." | |