Spaces:
Sleeping
Sleeping
| """Opinion dynamics engine for multi-persona interactions""" | |
| from typing import List, Dict, Optional, Callable | |
| import statistics | |
| from concurrent.futures import ThreadPoolExecutor, as_completed | |
| import time | |
| from ..personas.models import Persona | |
| from ..personas.database import PersonaDatabase | |
| from ..llm.anthropic_client import AnthropicClient | |
| from ..llm.prompt_builder import PromptBuilder | |
| from .models import PersonaOpinion, RoundResult, OpinionPosition | |
| from .network import InfluenceNetwork | |
| class OpinionDynamicsEngine: | |
| """ | |
| Runs multi-round opinion dynamics where personas influence each other. | |
| Process: | |
| 1. Round 1: Each persona responds independently | |
| 2. Round 2+: Personas see others' responses and can update their position | |
| 3. Continue until equilibrium or max rounds reached | |
| """ | |
| def __init__( | |
| self, | |
| persona_db: PersonaDatabase, | |
| llm_client: AnthropicClient, | |
| max_workers: int = 3, | |
| requests_per_minute: int = 50, | |
| ): | |
| self.persona_db = persona_db | |
| self.llm_client = llm_client | |
| self.prompt_builder = PromptBuilder() | |
| self.max_workers = max_workers | |
| self.min_delay = 60.0 / requests_per_minute | |
| def run_dynamics( | |
| self, | |
| question: str, | |
| max_rounds: int = 5, | |
| convergence_threshold: float = 0.1, | |
| network_type: str = "scale_free", | |
| persona_ids: Optional[List[str]] = None, | |
| personas: Optional[List[Persona]] = None, | |
| progress_callback: Optional[Callable[[str], None]] = None, | |
| ) -> List[RoundResult]: | |
| """ | |
| Run opinion dynamics simulation. | |
| Args: | |
| question: The proposal/question to discuss | |
| max_rounds: Maximum number of rounds | |
| convergence_threshold: Stop if total change < threshold | |
| network_type: Network topology ("scale_free", "small_world", "fully_connected") | |
| persona_ids: List of persona IDs to include (for small group mode) | |
| personas: Pre-loaded personas list (for population mode) | |
| progress_callback: Function to call with progress updates | |
| Returns: | |
| List of RoundResult for each round | |
| """ | |
| # Load personas from IDs if not provided directly | |
| if personas is None: | |
| if persona_ids is None: | |
| raise ValueError("Either persona_ids or personas must be provided") | |
| personas = [self.persona_db.get_persona(pid) for pid in persona_ids] | |
| # Build influence network | |
| influence_network = InfluenceNetwork(personas, network_type=network_type) | |
| results = [] | |
| previous_opinions: Dict[str, PersonaOpinion] = {} | |
| for round_num in range(1, max_rounds + 1): | |
| if progress_callback: | |
| progress_callback( | |
| f"Round {round_num}/{max_rounds}: Gathering opinions..." | |
| ) | |
| # Query all personas for this round | |
| round_opinions = self._query_round( | |
| personas=personas, | |
| question=question, | |
| round_number=round_num, | |
| previous_opinions=previous_opinions, | |
| influence_network=influence_network, | |
| progress_callback=progress_callback, | |
| ) | |
| # Calculate round metrics | |
| round_result = self._analyze_round( | |
| round_num, round_opinions, previous_opinions | |
| ) | |
| results.append(round_result) | |
| if progress_callback: | |
| progress_callback( | |
| f"Round {round_num} complete: " | |
| f"Avg position: {round_result.average_position:.2f}, " | |
| f"Total change: {round_result.total_change:.2f}" | |
| ) | |
| # Check for convergence | |
| if round_num > 1 and round_result.total_change < convergence_threshold: | |
| if progress_callback: | |
| progress_callback( | |
| f"✓ Equilibrium reached at round {round_num}!" | |
| ) | |
| break | |
| # Update for next round | |
| previous_opinions = { | |
| op.persona_id: op for op in round_opinions | |
| } | |
| return results | |
| def _query_round( | |
| self, | |
| personas: List[Persona], | |
| question: str, | |
| round_number: int, | |
| previous_opinions: Dict[str, PersonaOpinion], | |
| influence_network: InfluenceNetwork, | |
| progress_callback: Optional[Callable[[str], None]] = None, | |
| ) -> List[PersonaOpinion]: | |
| """Query all personas for one round""" | |
| opinions = [] | |
| # Parallel querying with rate limiting | |
| with ThreadPoolExecutor(max_workers=self.max_workers) as executor: | |
| future_to_persona = {} | |
| for persona in personas: | |
| future = executor.submit( | |
| self._query_persona, | |
| persona=persona, | |
| question=question, | |
| round_number=round_number, | |
| previous_opinions=previous_opinions, | |
| influence_network=influence_network, | |
| ) | |
| future_to_persona[future] = persona | |
| time.sleep(self.min_delay) # Rate limiting | |
| for future in as_completed(future_to_persona): | |
| persona = future_to_persona[future] | |
| try: | |
| opinion = future.result() | |
| opinions.append(opinion) | |
| if progress_callback: | |
| progress_callback( | |
| f" ✓ {persona.name}: {opinion.position.value}" | |
| ) | |
| except Exception as e: | |
| import traceback | |
| error_msg = f"Error querying {persona.name}: {e}" | |
| full_traceback = traceback.format_exc() | |
| print(f"\n{'='*60}") | |
| print(f"ERROR querying {persona.name}:") | |
| print(full_traceback) | |
| print('='*60) | |
| if progress_callback: | |
| progress_callback(f" ✗ {error_msg}") | |
| return opinions | |
| def _query_persona( | |
| self, | |
| persona: Persona, | |
| question: str, | |
| round_number: int, | |
| previous_opinions: Dict[str, PersonaOpinion], | |
| influence_network: InfluenceNetwork, | |
| ) -> PersonaOpinion: | |
| """Query one persona for their opinion""" | |
| # Build prompt based on round | |
| if round_number == 1: | |
| # Round 1: Independent response | |
| system_prompt = self._build_initial_prompt(persona) | |
| user_message = f"""Please share your position on the following proposal: | |
| {question} | |
| Provide your response in this format: | |
| POSITION: [strongly_support/support/lean_support/neutral/lean_oppose/oppose/strongly_oppose] | |
| CONFIDENCE: [0.0-1.0] | |
| RESPONSE: [Your detailed response explaining your position] | |
| KEY_ARGUMENTS: [Bullet points of your main arguments] | |
| """ | |
| else: | |
| # Round 2+: Consider others' opinions | |
| system_prompt = self._build_influenced_prompt( | |
| persona, previous_opinions, influence_network | |
| ) | |
| user_message = f"""After hearing other stakeholders' perspectives, please share your updated position on: | |
| {question} | |
| Consider the arguments you've heard, but stay true to your values and expertise. | |
| Provide your response in this format: | |
| POSITION: [strongly_support/support/lean_support/neutral/lean_oppose/oppose/strongly_oppose] | |
| CONFIDENCE: [0.0-1.0] | |
| RESPONSE: [Your detailed response] | |
| KEY_ARGUMENTS: [Bullet points of your main arguments] | |
| INFLUENCED_BY: [Names of stakeholders whose arguments influenced you, if any] | |
| """ | |
| # Get LLM response | |
| response_text = self.llm_client.generate_response( | |
| system_prompt=system_prompt, | |
| user_message=user_message, | |
| temperature=0.7, | |
| ) | |
| # Debug logging for problematic responses | |
| if not any(keyword in response_text for keyword in ["POSITION:", "position:", "Position:"]): | |
| print(f"\n⚠️ WARNING: Response missing POSITION marker for {persona.name}") | |
| print(f"Response preview: {response_text[:200]}...") | |
| # Parse response | |
| opinion = self._parse_response( | |
| persona_id=persona.persona_id, | |
| persona_name=persona.name, | |
| round_number=round_number, | |
| response_text=response_text, | |
| previous_opinions=previous_opinions, | |
| ) | |
| return opinion | |
| def _build_initial_prompt(self, persona: Persona) -> str: | |
| """Build system prompt for initial round""" | |
| return f"""{persona.get_context_summary()} | |
| You are participating in a stakeholder discussion about urban planning. | |
| Share your honest perspective based on your values, expertise, and background. | |
| """ | |
| def _build_influenced_prompt( | |
| self, | |
| persona: Persona, | |
| previous_opinions: Dict[str, PersonaOpinion], | |
| influence_network: InfluenceNetwork, | |
| ) -> str: | |
| """Build system prompt showing other opinions""" | |
| base_prompt = self._build_initial_prompt(persona) | |
| # Get influencers (people who might sway this persona) | |
| influencers = influence_network.get_influencers( | |
| persona.persona_id, min_weight=0.4 | |
| ) | |
| if not influencers or not previous_opinions: | |
| return base_prompt | |
| # Show relevant opinions | |
| others_opinions = "\n\n---\n\nOTHER STAKEHOLDERS' PERSPECTIVES:\n\n" | |
| for influence_weight in influencers[:5]: # Top 5 influencers | |
| influencer_id = influence_weight.influencer_id | |
| if influencer_id in previous_opinions: | |
| opinion = previous_opinions[influencer_id] | |
| others_opinions += f""" | |
| {opinion.persona_name} ({opinion.position.value}): | |
| {opinion.response_text[:300]}... | |
| """ | |
| return base_prompt + others_opinions | |
| def _parse_response( | |
| self, | |
| persona_id: str, | |
| persona_name: str, | |
| round_number: int, | |
| response_text: str, | |
| previous_opinions: Dict[str, PersonaOpinion], | |
| ) -> PersonaOpinion: | |
| """Parse LLM response into PersonaOpinion""" | |
| # Extract position (case-insensitive) | |
| position = OpinionPosition.NEUTRAL | |
| for line in response_text.split("\n"): | |
| line_upper = line.upper() | |
| if line_upper.startswith("POSITION:"): | |
| position_str = line.split(":", 1)[1].strip().lower() | |
| try: | |
| position = OpinionPosition(position_str) | |
| except: | |
| # Try matching partial strings | |
| if "strong" in position_str and "support" in position_str: | |
| position = OpinionPosition.STRONGLY_SUPPORT | |
| elif "strong" in position_str and "oppos" in position_str: | |
| position = OpinionPosition.STRONGLY_OPPOSE | |
| elif "support" in position_str: | |
| position = OpinionPosition.SUPPORT if "lean" not in position_str else OpinionPosition.LEAN_SUPPORT | |
| elif "oppos" in position_str: | |
| position = OpinionPosition.OPPOSE if "lean" not in position_str else OpinionPosition.LEAN_OPPOSE | |
| elif "neutral" in position_str: | |
| position = OpinionPosition.NEUTRAL | |
| # Extract confidence (case-insensitive) | |
| confidence = 0.5 | |
| for line in response_text.split("\n"): | |
| line_upper = line.upper() | |
| if line_upper.startswith("CONFIDENCE:"): | |
| try: | |
| conf_str = line.split(":", 1)[1].strip() | |
| confidence = float(conf_str) | |
| # Clamp to valid range | |
| confidence = max(0.0, min(1.0, confidence)) | |
| except: | |
| pass | |
| # Extract key arguments (case-insensitive) | |
| key_arguments = [] | |
| in_arguments = False | |
| for line in response_text.split("\n"): | |
| line_upper = line.upper() | |
| if line_upper.startswith("KEY_ARGUMENTS:") or line_upper.startswith("KEY ARGUMENTS:"): | |
| in_arguments = True | |
| continue | |
| if in_arguments and line.strip().startswith("-"): | |
| key_arguments.append(line.strip("- ")) | |
| elif in_arguments and line_upper.startswith("INFLUENCED"): | |
| break | |
| # Extract influenced by (case-insensitive) | |
| influenced_by = [] | |
| for line in response_text.split("\n"): | |
| line_upper = line.upper() | |
| if line_upper.startswith("INFLUENCED"): | |
| influenced_str = line.split(":", 1)[1].strip() if ":" in line else "" | |
| # Parse names (could be comma-separated) | |
| if influenced_str and influenced_str.lower() not in ["none", ""]: | |
| influenced_by = [ | |
| name.strip() for name in influenced_str.split(",") | |
| ] | |
| # Calculate position change | |
| position_change = None | |
| if persona_id in previous_opinions: | |
| prev_score = previous_opinions[persona_id].position_score | |
| position_change = position.score - prev_score | |
| return PersonaOpinion( | |
| persona_id=persona_id, | |
| persona_name=persona_name, | |
| round_number=round_number, | |
| position=position, | |
| position_score=position.score, | |
| response_text=response_text, | |
| key_arguments=key_arguments, | |
| confidence=confidence, | |
| influenced_by=influenced_by, | |
| position_change=position_change, | |
| ) | |
| def _analyze_round( | |
| self, | |
| round_number: int, | |
| opinions: List[PersonaOpinion], | |
| previous_opinions: Dict[str, PersonaOpinion], | |
| ) -> RoundResult: | |
| """Analyze results from one round""" | |
| # Check if we have any opinions | |
| if not opinions: | |
| raise ValueError( | |
| f"Round {round_number} failed: No opinions were successfully generated. " | |
| "This may indicate that the LLM responses are not in the expected format. " | |
| "Please check that responses include POSITION, CONFIDENCE, and other required fields." | |
| ) | |
| # Calculate average position | |
| avg_position = statistics.mean([op.position_score for op in opinions]) | |
| # Calculate variance | |
| variance = statistics.variance([op.position_score for op in opinions]) if len(opinions) > 1 else 0.0 | |
| # Calculate total change from previous round | |
| total_change = 0.0 | |
| if previous_opinions: | |
| for opinion in opinions: | |
| if opinion.position_change is not None: | |
| total_change += abs(opinion.position_change) | |
| # Convergence metric (1 = no change, 0 = maximum change) | |
| max_possible_change = len(opinions) * 6 # Max change is 6 per person | |
| convergence = 1.0 - (total_change / max_possible_change) if max_possible_change > 0 else 1.0 | |
| # Simple clustering by position | |
| clusters = self._cluster_by_position(opinions) | |
| return RoundResult( | |
| round_number=round_number, | |
| opinions=opinions, | |
| average_position=avg_position, | |
| position_variance=variance, | |
| total_change=total_change, | |
| convergence_metric=convergence, | |
| clusters=clusters, | |
| ) | |
| def _cluster_by_position( | |
| self, opinions: List[PersonaOpinion] | |
| ) -> List[List[str]]: | |
| """Group personas with similar positions""" | |
| # Group by position | |
| position_groups: Dict[str, List[str]] = {} | |
| for opinion in opinions: | |
| pos = opinion.position.value | |
| if pos not in position_groups: | |
| position_groups[pos] = [] | |
| position_groups[pos].append(opinion.persona_id) | |
| return list(position_groups.values()) | |