import json
import math
import os
from typing import Any, Dict, List, Tuple


class ConsensusEngine:
    """
    Computes Canonical Score using Trust Weighted Frequency and TF-IDF Distinctiveness.

    Typical use is ``run_pipeline``, which computes a distinctiveness map over
    all entities, scores every entity's observations, drops conflicting
    traits, and writes the result to ``<output_dir>/consensus_scores.json``.
    """

    # Trust applied to observations whose source is missing from ``source_trust``.
    DEFAULT_SOURCE_TRUST = 0.5
    # Raw distinctiveness is divided by this before being capped at 1.0.
    DISTINCTIVENESS_SCALE = 5.0
    # Weights of local confidence and normalized distinctiveness in the score.
    LOCAL_CONFIDENCE_WEIGHT = 0.7
    DISTINCTIVENESS_WEIGHT = 0.3
    # Traits whose canonical score is below this value are dropped.
    SCORE_THRESHOLD = 0.4

    def __init__(self, output_dir: str = "data/knowledge/consensus"):
        """Store the output directory and the per-source trust table."""
        self.output_dir = output_dir
        # Simulated source trust (could be dynamic)
        self.source_trust = {
            "wikidata": 1.0,
            "anilist": 0.9,
            "danbooru": 0.75,
        }

    def compute_tfidf(
        self, entity_traits: Dict[str, List[Dict]], total_entities: int
    ) -> Dict[str, float]:
        """Computes Distinctiveness (TF-IDF equivalent) for each trait.

        Args:
            entity_traits: Maps an entity id to its list of records; each
                record must have a ``"trait"`` key.
            total_entities: Number of entities used as the numerator of the
                inverse document frequency.

        Returns:
            Maps each trait to ``log(total_entities / (1 + df))``, where
            ``df`` is the number of entities listing the trait at least once.
            The value is negative whenever ``1 + df > total_entities``.

        Raises:
            ValueError: If there are traits to score but ``total_entities``
                is not positive (the logarithm would be undefined).
        """
        doc_freq: Dict[str, int] = {}
        for traits in entity_traits.values():
            # A trait counts once per entity no matter how often it repeats.
            for name in {record["trait"] for record in traits}:
                doc_freq[name] = doc_freq.get(name, 0) + 1

        if doc_freq and total_entities <= 0:
            raise ValueError(
                f"total_entities must be positive, got {total_entities!r}"
            )

        return {
            name: math.log(total_entities / (1 + freq))
            for name, freq in doc_freq.items()
        }

    def resolve_conflicts(
        self, scored_traits: List[Dict], conflicts: List[Tuple[str, str]]
    ) -> List[Dict]:
        """Removes lower-scoring traits that conflict with higher-scoring ones.

        Args:
            scored_traits: Records with ``"trait"`` and ``"canonical_score"``
                keys. The list is not modified.
            conflicts: Pairs of mutually exclusive trait names; order within
                a pair does not matter.

        Returns:
            A new list of the accepted records, sorted by canonical score in
            descending order (ties keep their input order).
        """
        # Build a symmetric lookup once instead of rescanning every pair for
        # every accepted trait.
        conflict_map: Dict[str, set] = {}
        for first, second in conflicts:
            conflict_map.setdefault(first, set()).add(second)
            conflict_map.setdefault(second, set()).add(first)

        # sorted() leaves the caller's list untouched.
        ranked = sorted(
            scored_traits,
            key=lambda record: record["canonical_score"],
            reverse=True,
        )

        accepted: List[Dict] = []
        rejected: set = set()
        for record in ranked:
            trait = record["trait"]
            if trait in rejected:
                continue
            accepted.append(record)
            # Everything that conflicts with an accepted trait loses to it.
            rejected.update(conflict_map.get(trait, ()))
        return accepted

    def calculate_consensus(
        self,
        entity_id: str,
        raw_observations: List[Dict],
        distinctiveness_map: Dict[str, float],
    ) -> List[Dict]:
        """Score one entity's traits and keep those reaching the threshold.

        raw_observations format:
        {"trait": "pink_hair", "source": "danbooru", "count": 5}
        (``count`` defaults to 1 when absent).

        Args:
            entity_id: Identifier of the entity; not used in the computation.
            raw_observations: Observations for the entity.
            distinctiveness_map: Trait -> distinctiveness; traits missing
                from the map use 1.0.

        Returns:
            One record per trait whose canonical score is at least
            ``SCORE_THRESHOLD``, with keys ``trait``, ``confidence``,
            ``canonical_score``, ``frequency`` (weighted frequency truncated
            to int) and ``source_count``.
        """
        trait_freq: Dict[str, float] = {}
        trait_sources: Dict[str, set] = {}

        for obs in raw_observations:
            trait = obs["trait"]
            source = obs["source"]
            trust = self.source_trust.get(source, self.DEFAULT_SOURCE_TRUST)

            # Trust-weighted frequency
            weighted_count = obs.get("count", 1) * trust
            trait_freq[trait] = trait_freq.get(trait, 0) + weighted_count
            trait_sources.setdefault(trait, set()).add(source)

        max_freq = max(trait_freq.values(), default=0.0)

        results = []
        for trait, freq in trait_freq.items():
            sources = len(trait_sources[trait])

            # Local Confidence (LC). When no trait has a positive weighted
            # frequency (e.g. every count is 0) there is nothing to normalize
            # against, so confidence is 0 rather than a division by zero.
            lc = freq / max_freq if max_freq > 0 else 0.0

            # Distinctiveness (TF-IDF)
            distinctiveness = distinctiveness_map.get(trait, 1.0)
            # Normalize distinctiveness somewhat to keep it in a sane range
            norm_dist = min(distinctiveness / self.DISTINCTIVENESS_SCALE, 1.0)

            # Source Multiplier (SM): 0.75 for one source, 1.0 for two or more.
            sm = min(1.0, 0.5 + (0.25 * sources))

            # Canonical Score
            score = sm * (
                self.LOCAL_CONFIDENCE_WEIGHT * lc
                + self.DISTINCTIVENESS_WEIGHT * norm_dist
            )

            if score >= self.SCORE_THRESHOLD:
                results.append({
                    "trait": trait,
                    "confidence": lc,
                    "canonical_score": score,
                    "frequency": int(freq),
                    "source_count": sources,
                })

        return results

    def run_pipeline(
        self,
        all_entities_data: Dict[str, List[Dict]],
        conflicts: List[Tuple[str, str]],
    ) -> Dict[str, List[Dict]]:
        """Score every entity, resolve conflicts, and persist the result.

        Args:
            all_entities_data: Maps an entity id to its raw observations.
            conflicts: Pairs of mutually exclusive trait names.

        Returns:
            Maps each entity id to its accepted trait records. The same
            mapping is written as JSON to
            ``<output_dir>/consensus_scores.json`` (the directory is created
            if needed).
        """
        total_entities = len(all_entities_data)
        dist_map = self.compute_tfidf(all_entities_data, total_entities)

        final_consensus = {}
        for entity_id, observations in all_entities_data.items():
            scored = self.calculate_consensus(entity_id, observations, dist_map)
            final_consensus[entity_id] = self.resolve_conflicts(scored, conflicts)

        # Save to data lake (consensus stage)
        os.makedirs(self.output_dir, exist_ok=True)
        out_path = os.path.join(self.output_dir, "consensus_scores.json")
        with open(out_path, "w", encoding="utf-8") as f:
            json.dump(final_consensus, f, indent=2)

        return final_consensus