Spaces:
Running
Running
| """ | |
| Knowledge Universe — Knowledge Decay Engine (Core IP, v2.2) | |
| ========================================================== | |
| Calculates decay [0.0 - 1.0] and the Penalty Multiplier for ranking. | |
| v2.2 — Enterprise First-Class Fields: | |
| - Added `max_decay_detected` as a first-class field in the compute_batch() result | |
| - This eliminates adapter-side derivation for enterprise clients (ReconAI, etc.) | |
| - Added `worst_source_id` for graph labeling/tooltips (Dwayne's TrustGraph need) | |
| - Unknown date penalty raised from 0.6 → 0.65 | |
| - Added DECAY_ENGINE_VERSION for cache invalidation safety. | |
| - Added decay_velocity and days_until_stale directly into the core object. | |
| """ | |
| import logging | |
| import math | |
| from dataclasses import dataclass, field | |
| from datetime import datetime, timezone | |
| from typing import Dict, List, Optional | |
logger = logging.getLogger(__name__)

# Version constant to protect cache against silent staleness.
# It is the default value of DecayReport.version, so every serialized report
# records which engine version produced it.
DECAY_ENGINE_VERSION = "v2.2"

# Half-lives tuned per platform volatility, in days. With the exponential
# model used by the engine, a source's decay score reaches 0.5 when its age
# equals the half-life of its platform.
HALF_LIVES: Dict[str, int] = {
    "arxiv": 1095,          # 3 years
    "wikipedia": 1460,      # 4 years
    "openlibrary": 1825,    # 5 years
    "mit_ocw": 1095,        # 3 years
    "stackoverflow": 365,   # 1 year
    "github": 180,          # 6 months
    "huggingface": 120,     # 4 months (ML moves fast)
    "kaggle": 365,          # 1 year
    "youtube": 270,         # 9 months
    "podcast": 180,         # 6 months
    "common_crawl": 90,     # 3 months
    "gharchive": 180,       # 6 months
    "libgen": 1825,         # 5 years
}

# Half-life (days) applied to any platform that is not listed in HALF_LIVES.
DEFAULT_HALF_LIFE = 365

# Unknown date penalty — sources with no date get this multiplier
_UNKNOWN_DATE_PENALTY = 0.65
@dataclass
class DecayReport:
    """
    Result of one decay computation for a single source.

    Must be a dataclass: the engine builds instances with keyword arguments
    for every field, which only works with the generated ``__init__``.
    """

    source_id: str
    decay_score: float  # 0.0 (fresh) → 1.0 (decayed)
    freshness: float  # 1.0 (fresh) → 0.0 (decayed)
    age_days: Optional[int]  # None when the source has no usable date
    half_life: int  # half-life in days used for this computation
    source_type: str  # platform key the half-life was looked up with
    label: str
    computed_at: str  # ISO-8601 timestamp of the computation
    penalty_multiplier: float  # Multiplied against quality score
    # Enterprise metrics baked into core IP
    decay_velocity: float  # per-day rate; the engine sets ln(2) / half_life
    days_until_stale: int  # estimated days before decay_score reaches 0.5
    version: str = DECAY_ENGINE_VERSION

    def as_dict(self) -> dict:
        """
        Return the compact, rounded view of the report used in API payloads.

        Scores are rounded to 3 decimals and the velocity to 6. The fields
        ``source_id``, ``half_life``, ``source_type`` and ``computed_at`` are
        intentionally not part of this dict.
        """
        return {
            "decay_score": round(self.decay_score, 3),
            "freshness": round(self.freshness, 3),
            "age_days": self.age_days,
            "label": self.label,
            "penalty_multiplier": round(self.penalty_multiplier, 3),
            "decay_velocity": round(self.decay_velocity, 6),
            "days_until_stale": self.days_until_stale,
            "version": self.version,
        }
class KnowledgeDecayEngine:
    """
    Core IP: Penalizes stale content in the ranking pipeline.
    Formula: Final Score = Base Quality * Decay Penalty

    Decay follows an exponential half-life model:
    ``decay = 1 - 0.5 ** (age_days / half_life)``, so decay is 0.0 for a
    brand-new source and reaches 0.5 when the age equals the half-life.
    """

    # Decay score assigned when a source cannot be dated or scored.
    _UNKNOWN_DECAY_SCORE = 0.4

    def compute_from_dict(
        self,
        source_dict: dict,
        customer: Optional[Dict] = None,
    ) -> "DecayReport":
        """
        Compute the decay report for one source given as a plain dict.

        Reads ``source_platform``, ``id`` and the first non-empty value of
        ``publication_date`` / ``last_updated`` from ``source_dict``.
        ``customer`` may carry a ``half_life_overrides`` mapping of
        platform -> half-life in days.

        Never raises for an unparseable date: any failure while dating or
        scoring the source is logged and an "unknown" report is returned.
        """
        platform = source_dict.get("source_platform", "unknown")
        half_life = self._resolve_half_life(platform, customer)

        pub_raw = (
            source_dict.get("publication_date")
            or source_dict.get("last_updated")
        )
        if not pub_raw:
            return self._unknown_report(
                source_dict.get("id", "unknown"), platform, half_life
            )

        try:
            pub_date = self._parse_publication_date(pub_raw)
            now = datetime.now(timezone.utc)
            # Future-dated sources are treated as published today.
            age_days = max(0, (now - pub_date).days)

            decay = round(1.0 - math.pow(0.5, age_days / half_life), 4)
            decay = min(max(decay, 0.0), 1.0)
            freshness = round(1.0 - decay, 4)
            penalty = self._penalty_for(decay, freshness, age_days)

            velocity = math.log(2) / half_life
            if decay >= 0.50:
                days_stale = 0
            else:
                # NOTE(review): first-order estimate using the initial decay
                # rate. Under the exponential model the 0.5 threshold is hit
                # exactly at age == half_life, so this underestimates the
                # remaining time (half_life - age_days). Kept as-is because
                # clients may depend on the current values — confirm intent.
                days_stale = int(max(0, (0.50 - decay) / velocity))

            return DecayReport(
                source_id=source_dict.get("id", "unknown"),
                decay_score=decay,
                freshness=freshness,
                age_days=age_days,
                half_life=half_life,
                source_type=platform,
                label=self._label(decay),
                computed_at=now.isoformat(),
                penalty_multiplier=penalty,
                decay_velocity=velocity,
                days_until_stale=days_stale,
            )
        except Exception as e:
            # Deliberate best-effort boundary: one bad source must not break
            # ranking, so it degrades to the "unknown" report instead.
            logger.error(
                "Decay computation failed for %s: %s",
                source_dict.get("id", "?"),
                e,
            )
            return self._unknown_report(
                source_dict.get("id", "unknown"), platform, half_life
            )

    def compute(self, source, customer: Optional[Dict] = None) -> "DecayReport":
        """Alias — accepts Source model or dict."""
        if hasattr(source, "model_dump"):
            return self.compute_from_dict(source.model_dump(), customer=customer)
        return self.compute_from_dict(source, customer=customer)

    def compute_batch(
        self,
        sources: List,
        customer: Optional[Dict] = None,
    ) -> Dict[str, dict]:
        """
        Compute decay for a list of sources and return the full per-source
        map PLUS first-class enterprise fields:
        Returns:
        {
        "per_source": {source_id: decay_dict, ...},
        "max_decay_detected": 0.711, ← first-class field
        "avg_decay_score": 0.234, ← for reference
        "worst_source_id": "crossref:...",← for TrustGraph tooltip
        "stale_count": 2,
        "total_sources": 5,
        }

        A source that fails outright is recorded in ``per_source`` with the
        fallback decay score and an ``error`` message. It counts toward
        ``avg_decay_score`` with that same fallback score (so the average
        agrees with ``per_source``) but never becomes ``worst_source_id``.
        """
        per_source: Dict[str, dict] = {}
        max_decay = 0.0
        worst_source_id = None
        decay_sum = 0.0
        stale_count = 0

        for s in sources:
            try:
                report = self.compute(s, customer=customer)
                r_dict = report.as_dict()
                per_source[report.source_id] = r_dict
                score = r_dict["decay_score"]
                decay_sum += score
                if score > max_decay:
                    max_decay = score
                    worst_source_id = report.source_id
                if r_dict.get("label") in ("stale", "decayed"):
                    stale_count += 1
            except Exception as e:
                logger.error(f"compute_batch: failed on source {s}: {e}")
                # Fallback: try to get id from source; never key on None.
                sid = getattr(s, "id", None)
                if not sid and isinstance(s, dict):
                    sid = s.get("id")
                per_source[sid or "unknown"] = {
                    "decay_score": self._UNKNOWN_DECAY_SCORE,
                    "label": "unknown",
                    "error": str(e),
                }
                # The denominator below counts this source, so the numerator
                # must too; otherwise failures read as perfectly fresh.
                decay_sum += self._UNKNOWN_DECAY_SCORE

        n = len(sources)
        avg_decay = round(decay_sum / n, 3) if n else 0.0
        return {
            "per_source": per_source,
            "max_decay_detected": round(max_decay, 3),
            "avg_decay_score": avg_decay,
            "worst_source_id": worst_source_id,
            "stale_count": stale_count,
            "total_sources": n,
        }

    @staticmethod
    def _resolve_half_life(platform: str, customer: Optional[Dict]) -> int:
        """Return the half-life in days for ``platform``, honouring a valid customer override."""
        half_life = HALF_LIVES.get(platform, DEFAULT_HALF_LIFE)
        if not (customer and isinstance(customer, dict)):
            return half_life

        # `or {}` also covers an explicit `"half_life_overrides": None`.
        overrides = customer.get("half_life_overrides") or {}
        if not isinstance(overrides, dict) or platform not in overrides:
            return half_life

        try:
            override_val = int(overrides[platform])
        except (TypeError, ValueError):
            logger.warning(
                "Ignoring non-numeric half-life override for %s: %r",
                platform,
                overrides[platform],
            )
            return half_life

        if override_val <= 0:
            # A zero or negative half-life would divide by zero or invert
            # the decay curve, so it is rejected rather than applied.
            logger.warning(
                "Ignoring non-positive half-life override for %s: %s",
                platform,
                override_val,
            )
            return half_life

        logger.debug(
            f"Customer half-life override: {platform} "
            f"{half_life}→{override_val} days"
        )
        return override_val

    @staticmethod
    def _parse_publication_date(pub_raw) -> datetime:
        """Convert a raw date value (ISO string, datetime, or date-like) to an aware datetime."""
        if isinstance(pub_raw, str):
            text = pub_raw.strip().replace("Z", "+00:00")
            if len(text) == 10:
                # Date-only string (YYYY-MM-DD): assume midnight UTC.
                text += "T00:00:00+00:00"
            pub_date = datetime.fromisoformat(text)
        elif isinstance(pub_raw, datetime):
            pub_date = pub_raw
        else:
            # Date-like objects exposing year/month/day are taken as
            # midnight; anything else raises AttributeError for the caller.
            pub_date = datetime(pub_raw.year, pub_raw.month, pub_raw.day)

        if pub_date.tzinfo is None:
            # Naive timestamps are interpreted as UTC.
            pub_date = pub_date.replace(tzinfo=timezone.utc)
        return pub_date

    @staticmethod
    def _penalty_for(decay: float, freshness: float, age_days: int) -> float:
        """Map a decay score to the ranking penalty multiplier (tiered, with a floor for new content)."""
        # NOTE(review): tiers use `<=` while _label uses `<`, so a score of
        # exactly 0.25 / 0.50 / 0.75 is penalised by the lower tier but
        # labelled by the higher one. Left unchanged; confirm intent.
        if decay <= 0.25:
            penalty = round(0.90 + (0.10 * freshness), 4)
        elif decay <= 0.50:
            penalty = round(0.50 + (0.40 * freshness), 4)
        elif decay <= 0.75:
            penalty = round(0.20 + (0.45 * freshness), 4)
        else:
            penalty = round(0.05 + (0.25 * freshness), 4)

        # Content younger than 90 days is never penalised below 0.95, even
        # on platforms with a very short half-life.
        if age_days < 90:
            penalty = max(penalty, 0.95)
        return penalty

    @staticmethod
    def _label(decay: float) -> str:
        """Return the human-readable bucket for a decay score."""
        if decay < 0.25:
            return "fresh"
        if decay < 0.50:
            return "aging"
        if decay < 0.75:
            return "stale"
        return "decayed"

    def _unknown_report(
        self,
        sid: str,
        platform: str,
        half_life: int,
    ) -> "DecayReport":
        """
        Sources with no publication date get a neutral penalty.
        age_days=None so downstream math never goes negative.
        """
        velocity = math.log(2) / half_life
        return DecayReport(
            source_id=sid,
            decay_score=self._UNKNOWN_DECAY_SCORE,
            freshness=0.6,
            age_days=None,
            half_life=half_life,
            source_type=platform,
            label="unknown",
            computed_at=datetime.now(timezone.utc).isoformat(),
            penalty_multiplier=_UNKNOWN_DATE_PENALTY,
            decay_velocity=velocity,
            days_until_stale=180,
        )