Spaces:
Sleeping
Sleeping
File size: 6,116 Bytes
2b7062a | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 | """
Bayesian Network - Pure Python Implementation
Simulates a simple belief network for question scoring, replacing complex dependencies.
"""
import math
from typing import List, Dict, Tuple, Set, Optional # <--- FIX: Added Optional
from collections import defaultdict
import logging
logger = logging.getLogger(__name__)
class BayesianNetwork:
"""Bayesian Network to track and propagate beliefs across attributes."""
def __init__(self):
# Stores P(Attribute=Value | E_prior)
self.attribute_beliefs = defaultdict(lambda: defaultdict(float))
# Stores dependencies for propagation (simple map)
self.attribute_groups = {
'continent': ['region', 'subRegion', 'hasCoast', 'isIsland'],
'region': ['continent', 'subRegion', 'landlocked'],
'population': ['size'],
'mainReligion': ['language'],
'climate': ['avgTemperature', 'hasMountains'],
}
self.evidence_log: List[Tuple[str, str, str]] = [] # List of (attribute, value, answer)
def build_network(self, items: List, questions: List[Dict]):
"""
Initializes beliefs based on the dataset distribution.
"""
self.attribute_beliefs.clear()
self.evidence_log.clear()
all_attributes = set(q['attribute'] for q in questions)
for attr in all_attributes:
value_counts = defaultdict(int)
total_count = 0
for item in items:
values = item.attributes.get(attr)
if values is None:
continue
if isinstance(values, list):
for v in values:
value_counts[str(v)] += 1
else:
value_counts[str(values)] += 1
total_count += 1
if total_count > 0:
for value, count in value_counts.items():
# Initial belief is P(Attribute=Value)
self.attribute_beliefs[attr][value] = count / total_count
logger.info(f"Bayesian Network initialized with {len(all_attributes)} attributes.")
def update_beliefs(self, question: Dict, answer: str, item_list: List):
"""
Update beliefs based on a direct question and propagate the effect.
"""
attribute = question['attribute']
value = str(question['value'])
self.evidence_log.append((attribute, value, answer))
# 1. Direct Belief Update: Use the distribution of the remaining items
active_items = [i for i in item_list if not i.eliminated]
if not active_items:
return
total_prob = sum(i.probability for i in active_items)
if total_prob < 1e-10:
return
match_prob_sum = 0.0
for item in active_items:
# Use item's inherent match function
dummy_q = {'attribute': attribute, 'value': question['value']}
if item.matches_question(dummy_q):
match_prob_sum += item.probability
# New belief is the sum of probabilities of matching items
new_belief = match_prob_sum / total_prob if total_prob > 0 else 0.0
# Update the belief in the specific attribute/value (simplified approach)
if value in self.attribute_beliefs[attribute]:
current_belief = self.attribute_beliefs[attribute][value]
# Interpolate old belief and new observed belief
updated_belief = (current_belief * 0.5) + (new_belief * 0.5)
self.attribute_beliefs[attribute][value] = updated_belief
# 2. Propagation: Spread this change to related attributes
self._propagate_beliefs_simple(attribute, new_belief, answer)
def _propagate_beliefs_simple(self, source_attr: str, new_belief: float, answer: str):
"""
Propagates belief changes to dependent attributes using a simple heuristic factor.
"""
dependents = set()
for attr, group in self.attribute_groups.items():
if source_attr in group:
dependents.add(attr)
elif attr == source_attr:
dependents.update(group)
prop_factor = 0.10 if answer in ['yes', 'no'] else 0.02 # Stronger prop for decisive answers
for dep_attr in dependents:
if dep_attr in self.attribute_beliefs:
for value in self.attribute_beliefs[dep_attr]:
current_belief = self.attribute_beliefs[dep_attr][value]
if new_belief > 0.7:
# Push dependent belief up
self.attribute_beliefs[dep_attr][value] = min(1.0, current_belief * (1.0 + prop_factor))
elif new_belief < 0.3:
# Pull dependent belief down
self.attribute_beliefs[dep_attr][value] = max(0.0, current_belief * (1.0 - prop_factor))
# Re-normalize dependent beliefs
total = sum(self.attribute_beliefs[dep_attr].values())
if total > 1e-10:
for value in self.attribute_beliefs[dep_attr]:
self.attribute_beliefs[dep_attr][value] /= total
def score_question(self, question: Dict) -> float:
"""
Score a question based on current beliefs.
"""
attribute = question['attribute']
value = str(question['value'])
belief = self.attribute_beliefs.get(attribute, {}).get(value, 0.5)
# Score rewards the confirmation of a strong belief (high confidence)
# 0.5 = low score (high uncertainty on attribute value)
# 1.0 = high score (low uncertainty on attribute value)
belief_score = 1.0 - abs(0.5 - belief) * 2
return belief_score
def reset(self):
"""Reset network"""
self.attribute_beliefs.clear()
self.evidence_log.clear()
|