# Knowledge-Universe / src/scoring/decay_engine.py
# Commit e5cdd9c (vlsiddarth): feat: max_decay_detected as first-class field in /v1/discover (v2.2)
"""
Knowledge Universe — Knowledge Decay Engine (Core IP, v2.2)
==========================================================
Calculates decay [0.0 - 1.0] and the Penalty Multiplier for ranking.
v2.2 — Enterprise First-Class Fields:
- Added `max_decay_detected` as a first-class field in KnowledgeDecayEngine.compute_batch()
- This eliminates adapter-side derivation for enterprise clients (ReconAI, etc.)
- Added `worst_source_id` for graph labeling/tooltips (Dwayne's TrustGraph need)
- Unknown date penalty raised from 0.6 → 0.65
- Added DECAY_ENGINE_VERSION for cache invalidation safety.
- Added decay_velocity and days_until_stale directly into the core object.
"""
import logging
import math
from dataclasses import dataclass, field
from datetime import date, datetime, timezone
from typing import Dict, List, Optional
logger = logging.getLogger(__name__)

# Version constant to protect cache against silent staleness
DECAY_ENGINE_VERSION = "v2.2"

# Day-count units used to express the half-lives below
# (a "month" is treated as 30 days).
_DAYS_PER_YEAR = 365
_DAYS_PER_MONTH = 30

# Half-lives in days, tuned per platform volatility: a shorter half-life
# means a platform's content is treated as going stale faster.
HALF_LIVES: Dict[str, int] = {
    "arxiv": 3 * _DAYS_PER_YEAR,
    "wikipedia": 4 * _DAYS_PER_YEAR,
    "openlibrary": 5 * _DAYS_PER_YEAR,
    "mit_ocw": 3 * _DAYS_PER_YEAR,
    "stackoverflow": 1 * _DAYS_PER_YEAR,
    "github": 6 * _DAYS_PER_MONTH,
    "huggingface": 4 * _DAYS_PER_MONTH,  # ML moves fast
    "kaggle": 1 * _DAYS_PER_YEAR,
    "youtube": 9 * _DAYS_PER_MONTH,
    "podcast": 6 * _DAYS_PER_MONTH,
    "common_crawl": 3 * _DAYS_PER_MONTH,
    "gharchive": 6 * _DAYS_PER_MONTH,
    "libgen": 5 * _DAYS_PER_YEAR,
}

# Half-life used for any platform missing from HALF_LIVES (one year).
DEFAULT_HALF_LIFE = _DAYS_PER_YEAR

# Unknown date penalty — sources with no date get this multiplier
_UNKNOWN_DATE_PENALTY = 0.65
@dataclass
class DecayReport:
    """
    Decay result for a single source.

    ``penalty_multiplier`` is the factor multiplied against the source's base
    quality score during ranking; ``decay_score`` and ``freshness`` run in
    opposite directions on a 0.0–1.0 scale.
    """

    source_id: str
    decay_score: float  # 0.0 (fresh) → 1.0 (decayed)
    freshness: float  # 1.0 (fresh) → 0.0 (decayed)
    age_days: Optional[int]  # None when the source's age is unknown
    half_life: int
    source_type: str
    label: str
    computed_at: str
    penalty_multiplier: float  # Multiplied against quality score
    # Enterprise metrics baked into core IP
    decay_velocity: float
    days_until_stale: int
    version: str = DECAY_ENGINE_VERSION

    def as_dict(self) -> dict:
        """
        Serialize the public subset of the report.

        source_id, half_life, source_type and computed_at are not included.
        Score-like floats are rounded to 3 places and decay_velocity to 6;
        all other fields are emitted unchanged.
        """
        # (attribute name, decimal places or None = emit as-is), in output order.
        layout = (
            ("decay_score", 3),
            ("freshness", 3),
            ("age_days", None),
            ("label", None),
            ("penalty_multiplier", 3),
            ("decay_velocity", 6),
            ("days_until_stale", None),
            ("version", None),
        )
        return {
            attr: (
                getattr(self, attr)
                if places is None
                else round(getattr(self, attr), places)
            )
            for attr, places in layout
        }
class KnowledgeDecayEngine:
    """
    Core IP: Penalizes stale content in the ranking pipeline.
    Formula: Final Score = Base Quality * Decay Penalty

    Decay is exponential half-life decay:
        decay = 1 - 0.5 ** (age_days / half_life)
    so a dated source reaches the 0.50 "stale" threshold exactly when its
    age equals its platform half-life.
    """

    # Neutral decay score reported when a source's age cannot be determined
    # (missing/unparseable date, or a per-source failure in compute_batch).
    _UNKNOWN_DECAY_SCORE = 0.4

    def compute_from_dict(
        self,
        source_dict: dict,
        customer: Optional[Dict] = None
    ) -> DecayReport:
        """
        Compute the decay report for one source given as a dict.

        Args:
            source_dict: Source fields. Reads "source_platform", "id", and
                "publication_date" (falling back to "last_updated"). Dates may
                be ISO-8601 strings, ``datetime`` or ``date`` objects; naive
                values are treated as UTC.
            customer: Optional customer config. A "half_life_overrides"
                mapping of platform -> days replaces the default half-life
                for that platform (invalid or non-positive values are ignored).

        Returns:
            A DecayReport. Sources with a missing or unparseable date get the
            neutral unknown report instead of raising.
        """
        platform = source_dict.get("source_platform", "unknown")
        source_id = source_dict.get("id", "unknown")
        half_life = self._resolve_half_life(platform, customer)

        pub_raw = (
            source_dict.get("publication_date")
            or source_dict.get("last_updated")
        )
        if not pub_raw:
            return self._unknown_report(source_id, platform, half_life)

        try:
            pub_date = self._parse_pub_date(pub_raw)
            now = datetime.now(timezone.utc)
            # Future-dated sources are clamped to age 0 (fully fresh).
            age_days = max(0, (now - pub_date).days)
            decay = round(1.0 - math.pow(0.5, age_days / half_life), 4)
            decay = min(max(decay, 0.0), 1.0)
            freshness = round(1.0 - decay, 4)
            penalty = self._penalty(decay, freshness, age_days)
            velocity = math.log(2) / half_life
            # decay == 0.50 exactly when age_days == half_life, so the days
            # remaining before the "stale" label is the remaining half-life.
            # The decay check keeps this consistent with the (rounded) label.
            if decay >= 0.50:
                days_stale = 0
            else:
                days_stale = max(0, half_life - age_days)
            return DecayReport(
                source_id=source_id,
                decay_score=decay,
                freshness=freshness,
                age_days=age_days,
                half_life=half_life,
                source_type=platform,
                label=self._label(decay),
                computed_at=now.isoformat(),
                penalty_multiplier=penalty,
                decay_velocity=velocity,
                days_until_stale=days_stale,
            )
        except Exception as e:
            # Best-effort: a bad date must not break ranking of the batch.
            logger.error(
                "Decay computation failed for %s: %s",
                source_dict.get("id", "?"),
                e,
            )
            return self._unknown_report(source_id, platform, half_life)

    @staticmethod
    def _resolve_half_life(platform: str, customer: Optional[Dict]) -> int:
        """Return the platform half-life, honoring a valid customer override."""
        half_life = HALF_LIVES.get(platform, DEFAULT_HALF_LIFE)
        if not isinstance(customer, dict):
            return half_life
        # `or {}` also tolerates an explicit None for the overrides key.
        overrides = customer.get("half_life_overrides") or {}
        if platform not in overrides:
            return half_life
        raw_override = overrides[platform]
        try:
            override_val = int(raw_override)
        except (TypeError, ValueError):
            logger.warning(
                "Ignoring invalid half-life override for %s: %r",
                platform,
                raw_override,
            )
            return half_life
        if override_val <= 0:
            # A zero/negative half-life would divide by zero downstream.
            logger.warning(
                "Ignoring non-positive half-life override for %s: %r",
                platform,
                raw_override,
            )
            return half_life
        logger.debug(
            "Customer half-life override: %s %s → %s days",
            platform,
            half_life,
            override_val,
        )
        return override_val

    @staticmethod
    def _parse_pub_date(pub_raw) -> datetime:
        """
        Convert a raw publication date into a timezone-aware datetime.

        Raises:
            ValueError: for strings ``datetime.fromisoformat`` cannot parse.
            TypeError: for values that are not str, datetime or date.
        """
        if isinstance(pub_raw, str):
            text = pub_raw.strip().replace("Z", "+00:00")
            if len(text) == 10:  # bare "YYYY-MM-DD"
                text += "T00:00:00+00:00"
            pub_date = datetime.fromisoformat(text)
        elif isinstance(pub_raw, datetime):
            pub_date = pub_raw
        elif isinstance(pub_raw, date):
            # Plain dates (no time component) are taken as midnight.
            pub_date = datetime(pub_raw.year, pub_raw.month, pub_raw.day)
        else:
            raise TypeError(
                f"unsupported publication date type: {type(pub_raw).__name__}"
            )
        if pub_date.tzinfo is None:
            pub_date = pub_date.replace(tzinfo=timezone.utc)
        return pub_date

    @staticmethod
    def _penalty(decay: float, freshness: float, age_days: int) -> float:
        """
        Map decay to a ranking penalty multiplier.

        Tiers use `<=` boundaries (unlike `_label`, which uses `<`), and the
        multiplier is discontinuous between tiers. Sources under 90 days old
        are floored at 0.95.
        """
        if decay <= 0.25:
            penalty = round(0.90 + (0.10 * freshness), 4)
        elif decay <= 0.50:
            penalty = round(0.50 + (0.40 * freshness), 4)
        elif decay <= 0.75:
            penalty = round(0.20 + (0.45 * freshness), 4)
        else:
            penalty = round(0.05 + (0.25 * freshness), 4)
        if age_days < 90:
            penalty = max(penalty, 0.95)
        return penalty

    def compute(self, source, customer: Optional[Dict] = None) -> DecayReport:
        """Alias — accepts Source model (anything with model_dump()) or dict."""
        if hasattr(source, "model_dump"):
            return self.compute_from_dict(source.model_dump(), customer=customer)
        return self.compute_from_dict(source, customer=customer)

    def compute_batch(
        self,
        sources: List,
        customer: Optional[Dict] = None,
    ) -> Dict[str, object]:
        """
        Compute decay for a list of sources and return the full per-source
        map PLUS first-class enterprise fields:
        Returns:
            {
                "per_source": {source_id: decay_dict, ...},
                "max_decay_detected": 0.711,       ← first-class field
                "avg_decay_score": 0.234,          ← for reference
                "worst_source_id": "crossref:...", ← for TrustGraph tooltip
                "stale_count": 2,
                "total_sources": 5,
            }

        Any iterable of sources (Source models or dicts) is accepted.
        A source whose computation raises is recorded with the neutral
        unknown score and an "error" entry, and it counts toward the average
        and maximum exactly like an undated source. Sources sharing an id
        overwrite each other in "per_source" but all count in the aggregates.
        """
        per_source: Dict[str, dict] = {}
        max_decay = 0.0
        worst_source_id = None
        decay_sum = 0.0
        stale_count = 0
        total = 0
        for s in sources:
            total += 1
            try:
                report = self.compute(s, customer=customer)
                sid = report.source_id
                r_dict = report.as_dict()
            except Exception as e:
                logger.error("compute_batch: failed on source %s: %s", s, e)
                sid = self._fallback_source_id(s)
                r_dict = {
                    "decay_score": self._UNKNOWN_DECAY_SCORE,
                    "label": "unknown",
                    "error": str(e),
                }
            per_source[sid] = r_dict
            score = r_dict["decay_score"]
            decay_sum += score
            if score > max_decay:
                max_decay = score
                worst_source_id = sid
            if r_dict.get("label") in ("stale", "decayed"):
                stale_count += 1
        avg_decay = round(decay_sum / total, 3) if total else 0.0
        return {
            "per_source": per_source,
            "max_decay_detected": round(max_decay, 3),
            "avg_decay_score": avg_decay,
            "worst_source_id": worst_source_id,
            "stale_count": stale_count,
            "total_sources": total,
        }

    @staticmethod
    def _fallback_source_id(source) -> str:
        """Best-effort id for a source whose decay computation failed."""
        sid = getattr(source, "id", None)
        if not sid and isinstance(source, dict):
            sid = source.get("id")
        return sid or "unknown"

    @staticmethod
    def _label(decay: float) -> str:
        """Bucket a decay score into fresh / aging / stale / decayed."""
        if decay < 0.25:
            return "fresh"
        if decay < 0.50:
            return "aging"
        if decay < 0.75:
            return "stale"
        return "decayed"

    def _unknown_report(
        self,
        sid: str,
        platform: str,
        half_life: int,
    ) -> DecayReport:
        """
        Sources with no usable publication date get a neutral penalty.
        age_days=None so downstream math never goes negative.

        The decay/freshness pair (0.4 / 0.6) and days_until_stale (180) are
        fixed values that do not depend on half_life; only decay_velocity
        is derived from it.
        """
        velocity = math.log(2) / half_life
        return DecayReport(
            source_id=sid,
            decay_score=self._UNKNOWN_DECAY_SCORE,
            freshness=0.6,
            age_days=None,
            half_life=half_life,
            source_type=platform,
            label="unknown",
            computed_at=datetime.now(timezone.utc).isoformat(),
            penalty_multiplier=_UNKNOWN_DATE_PENALTY,
            decay_velocity=velocity,
            days_until_stale=180,
        )