# Provenance: uploaded by JairoDanielMT ("Upload folder using huggingface_hub"), revision 4ef6c2b (verified), 4.98 kB.
import math
from typing import List, Dict, Any, Tuple
import json
import os
class ConsensusEngine:
    """Compute a canonical score per trait and write the result as JSON.

    The canonical score combines a trust-weighted frequency (normalised
    against the most frequent trait of the same entity) with a TF-IDF-style
    distinctiveness value, scaled by a multiplier that grows with the number
    of distinct sources reporting the trait.
    """

    # Weight of the local confidence term in the canonical score.
    ALPHA = 0.7
    # Weight of the normalised distinctiveness term in the canonical score.
    BETA = 0.3
    # Traits scoring below this value are dropped from the result.
    SCORE_THRESHOLD = 0.4
    # Divisor used to scale raw distinctiveness before capping it at 1.0.
    DISTINCTIVENESS_SCALE = 5.0
    # Trust assigned to a source that is missing from ``source_trust``.
    DEFAULT_TRUST = 0.5

    def __init__(self, output_dir: str = "data/knowledge/consensus"):
        """Store the output directory and the per-source trust table.

        Args:
            output_dir: Directory where ``run_pipeline`` writes
                ``consensus_scores.json``.
        """
        self.output_dir = output_dir
        # Simulated source trust (could be dynamic)
        self.source_trust = {
            "wikidata": 1.0,
            "anilist": 0.9,
            "danbooru": 0.75,
        }

    def compute_tfidf(
        self, entity_traits: Dict[str, List[Dict]], total_entities: int
    ) -> Dict[str, float]:
        """Compute a distinctiveness value (IDF-like) for every trait.

        Args:
            entity_traits: Maps an entity id to its observation dicts; each
                dict must have a ``"trait"`` key.
            total_entities: Number of entities used as the numerator of the
                ratio.

        Returns:
            Mapping ``trait -> log(total_entities / (1 + document_frequency))``
            where the document frequency is the number of entities that list
            the trait at least once. The value is zero or negative when the
            trait appears in ``total_entities - 1`` or more entities.

        Raises:
            ValueError: From ``math.log`` if ``total_entities`` is not
                positive while at least one trait exists.
        """
        trait_document_freq: Dict[str, int] = {}
        for traits in entity_traits.values():
            # Count each trait once per entity, however often it is observed.
            for trait in {obs["trait"] for obs in traits}:
                trait_document_freq[trait] = trait_document_freq.get(trait, 0) + 1

        return {
            trait: math.log(total_entities / (1 + freq))
            for trait, freq in trait_document_freq.items()
        }

    def resolve_conflicts(
        self, scored_traits: List[Dict], conflicts: List[Tuple[str, str]]
    ) -> List[Dict]:
        """Drop lower-scoring traits that conflict with higher-scoring ones.

        Traits are visited from highest to lowest ``"canonical_score"``
        (ties keep their input order). A visited trait that has not been
        rejected is accepted, and every trait paired with it in
        ``conflicts`` is rejected.

        Args:
            scored_traits: Records with ``"trait"`` and ``"canonical_score"``
                keys. The list is not modified.
            conflicts: Pairs of mutually exclusive trait names; order within
                a pair does not matter.

        Returns:
            The accepted records, ordered by descending score.
        """
        # Build the symmetric lookup once instead of rescanning every pair
        # for each accepted trait.
        conflict_map: Dict[str, set] = {}
        for first, second in conflicts:
            conflict_map.setdefault(first, set()).add(second)
            conflict_map.setdefault(second, set()).add(first)

        # sorted() leaves the caller's list untouched and is stable.
        ranked = sorted(
            scored_traits, key=lambda rec: rec["canonical_score"], reverse=True
        )

        accepted: List[Dict] = []
        rejected: set = set()
        for record in ranked:
            trait = record["trait"]
            if trait in rejected:
                continue
            accepted.append(record)
            rejected.update(conflict_map.get(trait, ()))
        return accepted

    def calculate_consensus(
        self,
        entity_id: str,
        raw_observations: List[Dict],
        distinctiveness_map: Dict[str, float],
    ) -> List[Dict]:
        """Score the traits observed for one entity.

        Args:
            entity_id: Identifier of the entity; not used in the computation.
            raw_observations: Observation dicts such as
                ``{"trait": "pink_hair", "source": "danbooru", "count": 5}``.
                ``"count"`` defaults to 1 when absent.
            distinctiveness_map: Distinctiveness per trait; traits missing
                from the map use 1.0.

        Returns:
            One record per trait whose canonical score reaches
            ``SCORE_THRESHOLD``, with keys ``"trait"``, ``"confidence"``,
            ``"canonical_score"``, ``"frequency"`` (weighted frequency
            truncated to int) and ``"source_count"``.
        """
        trait_freq: Dict[str, float] = {}
        trait_sources: Dict[str, set] = {}

        for obs in raw_observations:
            trait = obs["trait"]
            source = obs["source"]
            trust = self.source_trust.get(source, self.DEFAULT_TRUST)
            # Trust-weighted frequency
            trait_freq[trait] = trait_freq.get(trait, 0) + obs.get("count", 1) * trust
            trait_sources.setdefault(trait, set()).add(source)

        max_freq = max(trait_freq.values(), default=0)

        results = []
        for trait, freq in trait_freq.items():
            sources = len(trait_sources[trait])

            # Local Confidence (LC). When every weighted count is zero there
            # is nothing to normalise against, so confidence is zero rather
            # than a division by zero.
            lc = freq / max_freq if max_freq else 0.0

            # Distinctiveness (TF-IDF), scaled and capped at 1.0. Negative
            # inputs are passed through and lower the score.
            distinctiveness = distinctiveness_map.get(trait, 1.0)
            norm_dist = min(distinctiveness / self.DISTINCTIVENESS_SCALE, 1.0)

            # Source Multiplier (SM): 0.75 for one source, 1.0 for two or more.
            sm = min(1.0, 0.5 + (0.25 * sources))

            # Canonical Score
            score = sm * (self.ALPHA * lc + self.BETA * norm_dist)

            if score >= self.SCORE_THRESHOLD:
                results.append({
                    "trait": trait,
                    "confidence": lc,
                    "canonical_score": score,
                    "frequency": int(freq),
                    "source_count": sources,
                })
        return results

    def run_pipeline(
        self,
        all_entities_data: Dict[str, List[Dict]],
        conflicts: List[Tuple[str, str]],
    ) -> Dict[str, List[Dict]]:
        """Score every entity, resolve conflicts and persist the result.

        Args:
            all_entities_data: Maps an entity id to its raw observations.
            conflicts: Pairs of mutually exclusive trait names.

        Returns:
            Mapping of entity id to its accepted trait records. The same
            mapping is written to ``consensus_scores.json`` inside
            ``self.output_dir`` (created if missing).
        """
        total_entities = len(all_entities_data)
        dist_map = self.compute_tfidf(all_entities_data, total_entities)

        final_consensus = {}
        for entity_id, observations in all_entities_data.items():
            scored = self.calculate_consensus(entity_id, observations, dist_map)
            final_consensus[entity_id] = self.resolve_conflicts(scored, conflicts)

        # Save to data lake (consensus stage)
        os.makedirs(self.output_dir, exist_ok=True)
        out_path = os.path.join(self.output_dir, "consensus_scores.json")
        with open(out_path, "w", encoding="utf-8") as f:
            json.dump(final_consensus, f, indent=2)
        return final_consensus