""" Bayesian Network - Pure Python Implementation Simulates a simple belief network for question scoring, replacing complex dependencies. """ import math from typing import List, Dict, Tuple, Set, Optional # <--- FIX: Added Optional from collections import defaultdict import logging logger = logging.getLogger(__name__) class BayesianNetwork: """Bayesian Network to track and propagate beliefs across attributes.""" def __init__(self): # Stores P(Attribute=Value | E_prior) self.attribute_beliefs = defaultdict(lambda: defaultdict(float)) # Stores dependencies for propagation (simple map) self.attribute_groups = { 'continent': ['region', 'subRegion', 'hasCoast', 'isIsland'], 'region': ['continent', 'subRegion', 'landlocked'], 'population': ['size'], 'mainReligion': ['language'], 'climate': ['avgTemperature', 'hasMountains'], } self.evidence_log: List[Tuple[str, str, str]] = [] # List of (attribute, value, answer) def build_network(self, items: List, questions: List[Dict]): """ Initializes beliefs based on the dataset distribution. """ self.attribute_beliefs.clear() self.evidence_log.clear() all_attributes = set(q['attribute'] for q in questions) for attr in all_attributes: value_counts = defaultdict(int) total_count = 0 for item in items: values = item.attributes.get(attr) if values is None: continue if isinstance(values, list): for v in values: value_counts[str(v)] += 1 else: value_counts[str(values)] += 1 total_count += 1 if total_count > 0: for value, count in value_counts.items(): # Initial belief is P(Attribute=Value) self.attribute_beliefs[attr][value] = count / total_count logger.info(f"Bayesian Network initialized with {len(all_attributes)} attributes.") def update_beliefs(self, question: Dict, answer: str, item_list: List): """ Update beliefs based on a direct question and propagate the effect. """ attribute = question['attribute'] value = str(question['value']) self.evidence_log.append((attribute, value, answer)) # 1. Direct Belief Update: Use the distribution of the remaining items active_items = [i for i in item_list if not i.eliminated] if not active_items: return total_prob = sum(i.probability for i in active_items) if total_prob < 1e-10: return match_prob_sum = 0.0 for item in active_items: # Use item's inherent match function dummy_q = {'attribute': attribute, 'value': question['value']} if item.matches_question(dummy_q): match_prob_sum += item.probability # New belief is the sum of probabilities of matching items new_belief = match_prob_sum / total_prob if total_prob > 0 else 0.0 # Update the belief in the specific attribute/value (simplified approach) if value in self.attribute_beliefs[attribute]: current_belief = self.attribute_beliefs[attribute][value] # Interpolate old belief and new observed belief updated_belief = (current_belief * 0.5) + (new_belief * 0.5) self.attribute_beliefs[attribute][value] = updated_belief # 2. Propagation: Spread this change to related attributes self._propagate_beliefs_simple(attribute, new_belief, answer) def _propagate_beliefs_simple(self, source_attr: str, new_belief: float, answer: str): """ Propagates belief changes to dependent attributes using a simple heuristic factor. """ dependents = set() for attr, group in self.attribute_groups.items(): if source_attr in group: dependents.add(attr) elif attr == source_attr: dependents.update(group) prop_factor = 0.10 if answer in ['yes', 'no'] else 0.02 # Stronger prop for decisive answers for dep_attr in dependents: if dep_attr in self.attribute_beliefs: for value in self.attribute_beliefs[dep_attr]: current_belief = self.attribute_beliefs[dep_attr][value] if new_belief > 0.7: # Push dependent belief up self.attribute_beliefs[dep_attr][value] = min(1.0, current_belief * (1.0 + prop_factor)) elif new_belief < 0.3: # Pull dependent belief down self.attribute_beliefs[dep_attr][value] = max(0.0, current_belief * (1.0 - prop_factor)) # Re-normalize dependent beliefs total = sum(self.attribute_beliefs[dep_attr].values()) if total > 1e-10: for value in self.attribute_beliefs[dep_attr]: self.attribute_beliefs[dep_attr][value] /= total def score_question(self, question: Dict) -> float: """ Score a question based on current beliefs. """ attribute = question['attribute'] value = str(question['value']) belief = self.attribute_beliefs.get(attribute, {}).get(value, 0.5) # Score rewards the confirmation of a strong belief (high confidence) # 0.5 = low score (high uncertainty on attribute value) # 1.0 = high score (low uncertainty on attribute value) belief_score = 1.0 - abs(0.5 - belief) * 2 return belief_score def reset(self): """Reset network""" self.attribute_beliefs.clear() self.evidence_log.clear()