"""
Knowledge Universe — Knowledge Decay Engine (Core IP, v2.2)
==========================================================
Calculates decay [0.0 - 1.0] and the Penalty Multiplier for ranking.

v2.2 — Enterprise First-Class Fields:
- Added `max_decay_detected` as a first-class field in compute_batch()
- This eliminates adapter-side derivation for enterprise clients (ReconAI, etc.)
- Added `worst_source_id` for graph labeling/tooltips (Dwayne's TrustGraph need)
- Unknown date penalty raised from 0.6 → 0.65
- Added DECAY_ENGINE_VERSION for cache invalidation safety.
- Added decay_velocity and days_until_stale directly into the core object.
"""

import logging
import math
from dataclasses import dataclass, field
from datetime import date, datetime, timezone
from typing import Dict, List, Optional

logger = logging.getLogger(__name__)

# Version constant to protect cache against silent staleness
DECAY_ENGINE_VERSION = "v2.2"

# Half-lives (in days) tuned per platform volatility
HALF_LIVES: Dict[str, int] = {
    "arxiv": 1095,         # 3 years
    "wikipedia": 1460,     # 4 years
    "openlibrary": 1825,   # 5 years
    "mit_ocw": 1095,       # 3 years
    "stackoverflow": 365,  # 1 year
    "github": 180,         # 6 months
    "huggingface": 120,    # 4 months (ML moves fast)
    "kaggle": 365,         # 1 year
    "youtube": 270,        # 9 months
    "podcast": 180,        # 6 months
    "common_crawl": 90,    # 3 months
    "gharchive": 180,      # 6 months
    "libgen": 1825,        # 5 years
}

# Half-life (in days) used for platforms missing from HALF_LIVES
DEFAULT_HALF_LIFE = 365

# Unknown date penalty — sources with no date get this multiplier
_UNKNOWN_DATE_PENALTY = 0.65


@dataclass
class DecayReport:
    """Result of a decay computation for a single source."""

    source_id: str
    decay_score: float          # 0.0 (fresh) → 1.0 (decayed)
    freshness: float            # 1.0 (fresh) → 0.0 (decayed)
    age_days: Optional[int]     # None when no usable date was available
    half_life: int
    source_type: str
    label: str
    computed_at: str
    penalty_multiplier: float   # Multiplied against quality score

    # Enterprise metrics baked into core IP
    decay_velocity: float       # ln(2) / half_life
    days_until_stale: int
    version: str = DECAY_ENGINE_VERSION

    def as_dict(self) -> dict:
        """Return the serialisable subset of the report, with floats rounded.

        source_id, half_life, source_type and computed_at are intentionally
        not part of this dict.
        """
        return {
            "decay_score": round(self.decay_score, 3),
            "freshness": round(self.freshness, 3),
            "age_days": self.age_days,
            "label": self.label,
            "penalty_multiplier": round(self.penalty_multiplier, 3),
            "decay_velocity": round(self.decay_velocity, 6),
            "days_until_stale": self.days_until_stale,
            "version": self.version,
        }


class KnowledgeDecayEngine:
    """
    Core IP: Penalizes stale content in the ranking pipeline.
    Formula: Final Score = Base Quality * Decay Penalty
    """

    def compute_from_dict(
        self,
        source_dict: dict,
        customer: Optional[Dict] = None,
    ) -> DecayReport:
        """Compute the decay report for one source given as a dict.

        Reads ``source_platform``, ``id`` and ``publication_date`` (falling
        back to ``last_updated``) from ``source_dict``. ``customer`` may carry
        a ``half_life_overrides`` mapping of platform → half-life in days.

        Returns an "unknown" report when no date is present or when the
        date cannot be parsed / the computation fails.
        """
        platform = source_dict.get("source_platform", "unknown")
        half_life = self._resolve_half_life(platform, customer)
        source_id = source_dict.get("id", "unknown")

        pub_raw = (
            source_dict.get("publication_date")
            or source_dict.get("last_updated")
        )
        if not pub_raw:
            return self._unknown_report(source_id, platform, half_life)

        try:
            pub_date = self._parse_publication_date(pub_raw)
            now = datetime.now(timezone.utc)
            # Future-dated sources are clamped to age 0.
            age_days = max(0, (now - pub_date).days)

            # Exponential half-life decay, clamped into [0, 1].
            decay = round(1.0 - math.pow(0.5, age_days / half_life), 4)
            decay = min(max(decay, 0.0), 1.0)
            freshness = round(1.0 - decay, 4)

            penalty = self._penalty_for(decay, freshness, age_days)

            velocity = math.log(2) / half_life
            if decay >= 0.50:
                days_stale = 0
            else:
                # Linear estimate: remaining headroom to the 0.50 "stale"
                # threshold divided by the rate constant ln(2)/half_life.
                days_stale = int(max(0, (0.50 - decay) / velocity))

            return DecayReport(
                source_id=source_id,
                decay_score=decay,
                freshness=freshness,
                age_days=age_days,
                half_life=half_life,
                source_type=platform,
                label=self._label(decay),
                computed_at=now.isoformat(),
                penalty_multiplier=penalty,
                decay_velocity=velocity,
                days_until_stale=days_stale,
            )
        except Exception as e:
            # Best-effort boundary: a bad date must not break ranking.
            logger.error(
                "Decay computation failed for %s: %s",
                source_dict.get("id", "?"),
                e,
            )
            return self._unknown_report(source_id, platform, half_life)

    @staticmethod
    def _resolve_half_life(platform, customer: Optional[Dict]) -> int:
        """Return the half-life for ``platform``, honouring valid overrides.

        An override is applied only if it converts to a positive int; any
        other value is ignored with a warning so it can never cause a
        division by zero or a negative exponent downstream.
        """
        half_life = HALF_LIVES.get(platform, DEFAULT_HALF_LIFE)
        if not (customer and isinstance(customer, dict)):
            return half_life

        overrides = customer.get("half_life_overrides") or {}
        if not isinstance(overrides, dict) or platform not in overrides:
            return half_life

        raw_override = overrides[platform]
        try:
            override_val = int(raw_override)
        except (TypeError, ValueError):
            override_val = 0
        if override_val <= 0:
            logger.warning(
                "Ignoring invalid half-life override for %s: %r",
                platform,
                raw_override,
            )
            return half_life

        logger.debug(
            "Customer half-life override: %s %s→%s days",
            platform,
            half_life,
            override_val,
        )
        return override_val

    @staticmethod
    def _parse_publication_date(pub_raw) -> datetime:
        """Turn a raw date value into a timezone-aware datetime.

        Accepts ISO-8601 strings (a trailing ``Z`` and 10-character
        date-only strings are normalised first), ``datetime`` objects and
        plain ``date`` objects. Naive values are taken to be UTC.
        """
        if isinstance(pub_raw, str):
            text = pub_raw.replace("Z", "+00:00")
            if len(text) == 10:
                # Date-only string such as "2024-01-31".
                text += "T00:00:00+00:00"
            pub_date = datetime.fromisoformat(text)
        elif isinstance(pub_raw, date) and not isinstance(pub_raw, datetime):
            # A bare date has no tzinfo attribute; promote it to midnight.
            pub_date = datetime(pub_raw.year, pub_raw.month, pub_raw.day)
        else:
            pub_date = pub_raw

        if pub_date.tzinfo is None:
            pub_date = pub_date.replace(tzinfo=timezone.utc)
        return pub_date

    @staticmethod
    def _penalty_for(decay: float, freshness: float, age_days: int) -> float:
        """Map a decay score to the tiered ranking penalty multiplier."""
        if decay <= 0.25:
            penalty = round(0.90 + (0.10 * freshness), 4)
        elif decay <= 0.50:
            penalty = round(0.50 + (0.40 * freshness), 4)
        elif decay <= 0.75:
            penalty = round(0.20 + (0.45 * freshness), 4)
        else:
            penalty = round(0.05 + (0.25 * freshness), 4)

        # Sources younger than 90 days never drop below 0.95.
        if age_days < 90:
            penalty = max(penalty, 0.95)
        return penalty

    def compute(self, source, customer: Optional[Dict] = None) -> DecayReport:
        """Alias — accepts Source model or dict."""
        if hasattr(source, "model_dump"):
            return self.compute_from_dict(source.model_dump(), customer=customer)
        return self.compute_from_dict(source, customer=customer)

    def compute_batch(
        self,
        sources: List,
        customer: Optional[Dict] = None,
    ) -> Dict[str, dict]:
        """
        Compute decay for a list of sources and return the full per-source map
        PLUS first-class enterprise fields:

        Returns:
            {
                "per_source":         {source_id: decay_dict, ...},
                "max_decay_detected": 0.711,           ← first-class field
                "avg_decay_score":    0.234,           ← for reference
                "worst_source_id":    "crossref:...",  ← for TrustGraph tooltip
                "stale_count":        2,
                "total_sources":      5,
            }

        Sources whose computation raises are recorded in ``per_source`` with
        an ``"error"`` entry and are excluded from the average, the maximum
        and the stale count. ``total_sources`` always equals ``len(sources)``.
        """
        per_source: Dict[str, dict] = {}
        max_decay = 0.0
        worst_source_id = None
        decay_sum = 0.0
        stale_count = 0
        scored = 0

        for s in sources:
            try:
                report = self.compute(s, customer=customer)
                r_dict = report.as_dict()
                per_source[report.source_id] = r_dict

                score = r_dict["decay_score"]
                decay_sum += score
                scored += 1

                # The first scored source seeds the "worst" slot so a batch
                # of all-zero scores still names a source.
                if worst_source_id is None or score > max_decay:
                    max_decay = score
                    worst_source_id = report.source_id

                if r_dict.get("label") in ("stale", "decayed"):
                    stale_count += 1
            except Exception as e:
                logger.error("compute_batch: failed on source %s: %s", s, e)
                # Fallback: try to get id from source, never using None as key
                sid = (
                    getattr(s, "id", None)
                    or (s.get("id") if isinstance(s, dict) else None)
                    or "unknown"
                )
                per_source[sid] = {
                    "decay_score": 0.4,
                    "label": "unknown",
                    "error": str(e),
                }

        n = len(sources)
        avg_decay = round(decay_sum / scored, 3) if scored else 0.0

        return {
            "per_source": per_source,
            "max_decay_detected": round(max_decay, 3),
            "avg_decay_score": avg_decay,
            "worst_source_id": worst_source_id,
            "stale_count": stale_count,
            "total_sources": n,
        }

    @staticmethod
    def _label(decay: float) -> str:
        """Bucket a decay score into fresh / aging / stale / decayed."""
        if decay < 0.25:
            return "fresh"
        if decay < 0.50:
            return "aging"
        if decay < 0.75:
            return "stale"
        return "decayed"

    def _unknown_report(
        self,
        sid: str,
        platform: str,
        half_life: int,
    ) -> DecayReport:
        """
        Sources with no publication date get a neutral penalty.
        age_days=None so downstream math never goes negative.
        """
        # Guard against a non-positive half-life so the fallback path
        # itself can never raise ZeroDivisionError.
        velocity = math.log(2) / half_life if half_life > 0 else 0.0
        return DecayReport(
            source_id=sid,
            decay_score=0.4,
            freshness=0.6,
            age_days=None,
            half_life=half_life,
            source_type=platform,
            label="unknown",
            computed_at=datetime.now(timezone.utc).isoformat(),
            penalty_multiplier=_UNKNOWN_DATE_PENALTY,
            decay_velocity=velocity,
            days_until_stale=180,
        )