Spaces:
Sleeping
Sleeping
| import math | |
| from typing import List, Dict, Any, Tuple | |
| import json | |
| import os | |
class ConsensusEngine:
    """
    Computes a canonical score per trait from trust-weighted frequency and
    TF-IDF-style distinctiveness, then drops conflicting lower-scored traits.

    The scoring constants below are class attributes so a subclass or an
    instance can override them without touching the scoring code.
    """

    # Trust assigned to any source missing from ``self.source_trust``.
    DEFAULT_TRUST = 0.5
    # Weight of local confidence in the canonical score.
    ALPHA = 0.7
    # Weight of normalised distinctiveness in the canonical score.
    BETA = 0.3
    # Divisor applied to raw distinctiveness before clamping to [0, 1].
    DISTINCTIVENESS_SCALE = 5.0
    # Minimum canonical score a trait needs to be kept.
    SCORE_THRESHOLD = 0.4

    def __init__(self, output_dir: str = "data/knowledge/consensus"):
        """Store the output directory and the per-source trust table."""
        self.output_dir = output_dir
        # Simulated source trust (could be dynamic)
        self.source_trust = {
            "wikidata": 1.0,
            "anilist": 0.9,
            "danbooru": 0.75,
        }

    def compute_tfidf(self, entity_traits: Dict[str, List[Dict]], total_entities: int) -> Dict[str, float]:
        """Computes Distinctiveness (TF-IDF equivalent) for each trait.

        Args:
            entity_traits: Maps an entity id to its records; each record must
                have a ``"trait"`` key.
            total_entities: Number of entities used as the numerator.

        Returns:
            ``{trait: log(total_entities / (1 + df))}`` where ``df`` is the
            number of entities that list the trait at least once. The value
            is zero or negative for traits present in (almost) every entity.

        Raises:
            ValueError: If traits are present but ``total_entities`` is not
                positive (the logarithm would be undefined).
        """
        document_freq: Dict[str, int] = {}
        for records in entity_traits.values():
            # A trait counts once per entity, however often it is repeated.
            for name in {record["trait"] for record in records}:
                document_freq[name] = document_freq.get(name, 0) + 1

        if not document_freq:
            return {}
        if total_entities <= 0:
            raise ValueError(
                f"total_entities must be positive, got {total_entities!r}"
            )

        return {
            name: math.log(total_entities / (1 + df))
            for name, df in document_freq.items()
        }

    def resolve_conflicts(self, scored_traits: List[Dict], conflicts: List[Tuple[str, str]]) -> List[Dict]:
        """Removes lower-scoring traits that conflict with higher-scoring ones.

        Args:
            scored_traits: Records with ``"trait"`` and ``"canonical_score"``
                keys. The list is not modified.
            conflicts: Pairs of mutually exclusive trait names.

        Returns:
            The accepted records, ordered by descending canonical score.
        """
        # Build the lookup once instead of rescanning every pair per trait.
        conflict_map: Dict[str, set] = {}
        for first, second in conflicts:
            conflict_map.setdefault(first, set()).add(second)
            conflict_map.setdefault(second, set()).add(first)

        # sorted() leaves the caller's list untouched; it is stable, so ties
        # keep their input order.
        ranked = sorted(
            scored_traits,
            key=lambda record: record["canonical_score"],
            reverse=True,
        )

        accepted = []
        rejected = set()
        for record in ranked:
            name = record["trait"]
            if name in rejected:
                continue
            accepted.append(record)
            # Everything that conflicts with an accepted trait is ruled out.
            rejected.update(conflict_map.get(name, ()))
        return accepted

    def calculate_consensus(
        self,
        entity_id: str,
        raw_observations: List[Dict],
        distinctiveness_map: Dict[str, float]
    ) -> List[Dict]:
        """
        Score the traits observed for one entity.

        raw_observations format: {"trait": "pink_hair", "source": "danbooru", "count": 5}
        ("count" defaults to 1 when absent).

        Args:
            entity_id: Not used by the computation; kept for the interface.
            raw_observations: Observation records in the format above.
            distinctiveness_map: Raw distinctiveness per trait; traits that
                are missing default to 1.0.

        Returns:
            One record per trait whose canonical score reaches
            ``SCORE_THRESHOLD``, with keys ``trait``, ``confidence``,
            ``canonical_score``, ``frequency`` (weighted frequency truncated
            to int) and ``source_count``.
        """
        trait_freq: Dict[str, float] = {}
        trait_sources: Dict[str, set] = {}
        for obs in raw_observations:
            trait = obs["trait"]
            source = obs["source"]
            trust = self.source_trust.get(source, self.DEFAULT_TRUST)
            # Trust-weighted frequency
            weighted_count = obs.get("count", 1) * trust
            trait_freq[trait] = trait_freq.get(trait, 0) + weighted_count
            trait_sources.setdefault(trait, set()).add(source)

        max_freq = max(trait_freq.values()) if trait_freq else 1.0

        results = []
        for trait, freq in trait_freq.items():
            sources = len(trait_sources[trait])
            # Local Confidence (LC). A non-positive maximum (e.g. every count
            # is 0) would divide by zero, so confidence is 0 in that case.
            lc = freq / max_freq if max_freq > 0 else 0.0
            # Distinctiveness (TF-IDF)
            distinctiveness = distinctiveness_map.get(trait, 1.0)
            # Clamp to [0, 1]: raw distinctiveness can be negative for
            # ubiquitous traits and must not subtract from the score.
            norm_dist = max(
                0.0, min(distinctiveness / self.DISTINCTIVENESS_SCALE, 1.0)
            )
            # Source Multiplier (SM): 0.75 for one source, 1.0 for two or more.
            sm = min(1.0, 0.5 + (0.25 * sources))
            # Canonical Score
            score = sm * (self.ALPHA * lc + self.BETA * norm_dist)
            if score >= self.SCORE_THRESHOLD:
                results.append({
                    "trait": trait,
                    "confidence": lc,
                    "canonical_score": score,
                    "frequency": int(freq),
                    "source_count": sources,
                })
        return results

    def run_pipeline(self, all_entities_data: Dict[str, List[Dict]], conflicts: List[Tuple[str, str]]) -> Dict[str, List[Dict]]:
        """Score every entity, resolve conflicts and persist the result.

        Args:
            all_entities_data: Maps an entity id to its raw observations.
            conflicts: Pairs of mutually exclusive trait names.

        Returns:
            ``{entity_id: accepted trait records}``. The same mapping is
            written to ``consensus_scores.json`` inside ``self.output_dir``.
        """
        total_entities = len(all_entities_data)
        dist_map = self.compute_tfidf(all_entities_data, total_entities)

        final_consensus = {}
        for entity_id, observations in all_entities_data.items():
            scored = self.calculate_consensus(entity_id, observations, dist_map)
            final_consensus[entity_id] = self.resolve_conflicts(scored, conflicts)

        # Save to data lake (consensus stage)
        if self.output_dir:
            # os.makedirs("") raises, so an empty directory means "cwd".
            os.makedirs(self.output_dir, exist_ok=True)
        target = os.path.join(self.output_dir, "consensus_scores.json")
        # Explicit encoding keeps the file identical across platforms.
        with open(target, "w", encoding="utf-8") as f:
            json.dump(final_consensus, f, indent=2)
        return final_consensus