"""
Trust and freshness management for sources and knowledge.

Tracks source reliability and content freshness over time.
"""

import json
import logging
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Dict, Any, Optional, List

logger = logging.getLogger(__name__)

# Score reported for (and initially assigned to) a source with no history.
_DEFAULT_TRUST_SCORE = 0.5

# Exponential-moving-average learning rate applied at weight == 1.0.
_BASE_LEARNING_RATE = 0.1

# Freshness half-life in days, per domain; other domains use the default.
_HALF_LIFE_DAYS = {"finance": 7}
_DEFAULT_HALF_LIFE_DAYS = 30


def _utcnow() -> datetime:
    """Return the current UTC time as a naive datetime.

    Same value as the deprecated ``datetime.utcnow()``, so timestamps
    written by this module keep their offset-less ISO format.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


def _to_naive_utc(raw: Any) -> datetime:
    """Convert a ``saved_at`` value into a naive UTC datetime.

    Args:
        raw: ISO-8601 string, ``datetime`` instance, or a falsy value
            (missing / ``None`` / empty), which is treated as "now".

    Returns:
        Naive datetime expressed in UTC.

    Raises:
        ValueError: If ``raw`` is a string that is not valid ISO-8601.
        TypeError: If ``raw`` is neither a string nor a ``datetime``.
    """
    if not raw:
        return _utcnow()

    if isinstance(raw, datetime):
        parsed = raw
    else:
        if isinstance(raw, str) and raw[-1:] in ("Z", "z"):
            # fromisoformat() only accepts the "Z" suffix on Python 3.11+.
            raw = raw[:-1] + "+00:00"
        parsed = datetime.fromisoformat(raw)

    if parsed.tzinfo is not None:
        # Aware and naive datetimes cannot be subtracted; normalise to
        # naive UTC so arithmetic against _utcnow() is always valid.
        parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None)
    return parsed


def _age_in_days(item: Dict[str, Any]) -> int:
    """Return whole days elapsed since the item's ``saved_at`` timestamp.

    The result is negative when the timestamp lies in the future.
    """
    return (_utcnow() - _to_naive_utc(item.get("saved_at"))).days


def _read_json(path: Path) -> Dict[str, Any]:
    """Load a JSON document from ``path``; a missing file reads as ``{}``."""
    try:
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)
    except FileNotFoundError:
        # The file may have been removed after __init__ created it.
        return {}


def _write_json(path: Path, data: Dict[str, Any]) -> None:
    """Write ``data`` to ``path`` as indented JSON.

    The document is written to a sibling temp file and then renamed over
    the target, so a failed dump never leaves a truncated file behind.
    """
    tmp_path = path.with_name(path.name + ".tmp")
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(data, f, indent=2)
    tmp_path.replace(path)


class TrustManager:
    """Manages trust scores and freshness for sources.

    State is persisted as two JSON files under ``<data_dir>/learning``:
    ``source_trust.json`` and ``freshness_scores.json``.
    """

    def __init__(self, data_dir: str):
        """Create the storage directory and empty data files if needed.

        Args:
            data_dir: Base directory; a ``learning`` subdirectory is used.
        """
        self.data_dir = Path(data_dir) / "learning"
        self.data_dir.mkdir(parents=True, exist_ok=True)

        self.trust_file = self.data_dir / "source_trust.json"
        self.freshness_file = self.data_dir / "freshness_scores.json"

        # Initialize files if they don't exist
        if not self.trust_file.exists():
            self._save_trust_data({})
        if not self.freshness_file.exists():
            self._save_freshness_data({})

    def get_trust_score(self, source: str) -> float:
        """
        Get trust score for a source.

        Args:
            source: Source identifier (URL or name)

        Returns:
            Trust score (0.0 to 1.0); 0.5 (neutral) for unknown sources
        """
        source_data = self._load_trust_data().get(source, {})
        return source_data.get("trust_score", _DEFAULT_TRUST_SCORE)

    def update_trust(self, source: str, verification_outcome: bool, weight: float = 1.0) -> None:
        """
        Update trust score based on verification outcome.

        The score is an exponential moving average of outcomes
        (1.0 for verified, 0.0 for not) with learning rate ``0.1 * weight``.

        Args:
            source: Source identifier
            verification_outcome: True if verified, False if not
            weight: Weight of this update (0.0 to 1.0); values outside
                that range are clamped so the score stays within [0, 1]
        """
        trust_data = self._load_trust_data()

        source_data = trust_data.setdefault(source, {
            "trust_score": _DEFAULT_TRUST_SCORE,
            "verification_count": 0,
            "success_count": 0,
            "last_updated": _utcnow().isoformat(),
        })

        # Update counts
        source_data["verification_count"] += 1
        if verification_outcome:
            source_data["success_count"] += 1

        # An unclamped weight would push alpha outside [0, 0.1] and let the
        # moving average leave the documented 0.0-1.0 range.
        weight = min(1.0, max(0.0, weight))
        alpha = _BASE_LEARNING_RATE * weight
        outcome_score = 1.0 if verification_outcome else 0.0
        new_score = (1 - alpha) * source_data["trust_score"] + alpha * outcome_score

        source_data["trust_score"] = new_score
        source_data["last_updated"] = _utcnow().isoformat()

        self._save_trust_data(trust_data)

        logger.info(
            "Updated trust for %s: %.3f (outcome=%s)",
            source, new_score, verification_outcome,
        )

    @staticmethod
    def _summarize_source(source: str, data: Dict[str, Any]) -> Dict[str, Any]:
        """Build the public summary dict for one stored trust entry."""
        count = data["verification_count"]
        return {
            "source": source,
            "trust_score": data["trust_score"],
            "verification_count": count,
            # An entry with no verifications has no meaningful rate.
            "success_rate": data["success_count"] / count if count else 0.0,
        }

    def list_trusted_sources(self, min_trust: float = 0.7, min_verifications: int = 3) -> List[Dict[str, Any]]:
        """
        List trusted sources.

        Args:
            min_trust: Minimum trust score
            min_verifications: Minimum number of verifications

        Returns:
            List of trusted sources, highest trust score first. Each entry
            has ``source``, ``trust_score``, ``verification_count`` and
            ``success_rate`` keys.
        """
        trusted = [
            self._summarize_source(source, data)
            for source, data in self._load_trust_data().items()
            if data["trust_score"] >= min_trust
            and data["verification_count"] >= min_verifications
        ]

        # Sort by trust score descending
        trusted.sort(key=lambda entry: entry["trust_score"], reverse=True)
        return trusted

    def list_untrusted_sources(self, max_trust: float = 0.3, min_verifications: int = 3) -> List[Dict[str, Any]]:
        """
        List untrusted sources.

        Args:
            max_trust: Maximum trust score
            min_verifications: Minimum number of verifications

        Returns:
            List of untrusted sources, lowest trust score first. Each entry
            has ``source``, ``trust_score``, ``verification_count`` and
            ``success_rate`` keys.
        """
        untrusted = [
            self._summarize_source(source, data)
            for source, data in self._load_trust_data().items()
            if data["trust_score"] <= max_trust
            and data["verification_count"] >= min_verifications
        ]

        # Sort by trust score ascending
        untrusted.sort(key=lambda entry: entry["trust_score"])
        return untrusted

    def calculate_freshness(self, item: Dict[str, Any], domain: Optional[str] = None) -> float:
        """
        Calculate freshness score for a knowledge item.

        Freshness decays exponentially with the item's age in whole days:
        it halves every 7 days for the ``"finance"`` domain and every
        30 days otherwise.

        Args:
            item: Knowledge item; its ``saved_at`` value (ISO-8601 string,
                naive or timezone-aware) gives the age. A missing or empty
                value is treated as saved just now.
            domain: Domain for domain-specific rules

        Returns:
            Freshness score (0.0 to 1.0)

        Raises:
            ValueError: If ``saved_at`` is a malformed timestamp string.
        """
        # A future timestamp counts as brand new. Clamping here (rather than
        # only on the result) also avoids a float overflow in the power
        # below for timestamps far in the future.
        age_days = max(0, _age_in_days(item))

        half_life_days = _HALF_LIFE_DAYS.get(domain, _DEFAULT_HALF_LIFE_DAYS)

        # Calculate freshness using exponential decay
        freshness = 2 ** (-age_days / half_life_days)
        return max(0.0, min(1.0, freshness))

    def update_freshness(self, item_id: str, freshness_score: float) -> None:
        """
        Update freshness score for an item.

        Args:
            item_id: Item ID
            freshness_score: New freshness score
        """
        freshness_data = self._load_freshness_data()
        freshness_data[item_id] = {
            "freshness_score": freshness_score,
            "last_updated": _utcnow().isoformat(),
        }
        self._save_freshness_data(freshness_data)

    def get_stale_items(
        self,
        items: List[Dict[str, Any]],
        threshold: float = 0.3,
        domain: Optional[str] = None,
    ) -> List[Dict[str, Any]]:
        """
        Get stale items that need refreshing.

        Args:
            items: List of knowledge items
            threshold: Freshness threshold; items strictly below it are stale
            domain: Domain whose expiration rule is applied to every item
                (default: the general rule)

        Returns:
            List of stale items, stalest first. Each entry has ``item_id``,
            ``title``, ``freshness`` and ``age_days`` keys.
        """
        stale = []
        for item in items:
            freshness = self.calculate_freshness(item, domain)
            if freshness < threshold:
                stale.append({
                    "item_id": item.get("id"),
                    "title": item.get("title"),
                    "freshness": freshness,
                    "age_days": _age_in_days(item),
                })

        # Sort by freshness ascending (stalest first)
        stale.sort(key=lambda entry: entry["freshness"])
        return stale

    def recommend_refresh(self, stale_items: List[Dict[str, Any]], max_recommendations: int = 10) -> List[Dict[str, Any]]:
        """
        Recommend items to refresh.

        Args:
            stale_items: List of stale items, already ordered by priority
            max_recommendations: Maximum number of recommendations

        Returns:
            The first ``max_recommendations`` stale items (empty when the
            limit is zero or negative)
        """
        # A negative limit would otherwise slice from the end of the list.
        limit = max(0, max_recommendations)
        return stale_items[:limit]

    def _load_trust_data(self) -> Dict[str, Any]:
        """Load trust data from disk."""
        return _read_json(self.trust_file)

    def _save_trust_data(self, data: Dict[str, Any]) -> None:
        """Save trust data to disk."""
        _write_json(self.trust_file, data)

    def _load_freshness_data(self) -> Dict[str, Any]:
        """Load freshness data from disk."""
        return _read_json(self.freshness_file)

    def _save_freshness_data(self, data: Dict[str, Any]) -> None:
        """Save freshness data to disk."""
        _write_json(self.freshness_file, data)