Janus-backend / backend /app /services /learning /trust_manager.py
DevodG's picture
deploy: Janus full system stabilization
24f95f0
"""
Trust and freshness management for sources and knowledge.
Tracks source reliability and content freshness over time.
"""
import json
import logging
from datetime import datetime, timedelta
from pathlib import Path
from typing import Dict, Any, Optional, List
logger = logging.getLogger(__name__)
class TrustManager:
"""Manages trust scores and freshness for sources."""
def __init__(self, data_dir: str):
self.data_dir = Path(data_dir) / "learning"
self.data_dir.mkdir(parents=True, exist_ok=True)
self.trust_file = self.data_dir / "source_trust.json"
self.freshness_file = self.data_dir / "freshness_scores.json"
# Initialize files if they don't exist
if not self.trust_file.exists():
self._save_trust_data({})
if not self.freshness_file.exists():
self._save_freshness_data({})
def get_trust_score(self, source: str) -> float:
"""
Get trust score for a source.
Args:
source: Source identifier (URL or name)
Returns:
Trust score (0.0 to 1.0)
"""
trust_data = self._load_trust_data()
source_data = trust_data.get(source, {})
return source_data.get("trust_score", 0.5) # Default to neutral
def update_trust(self, source: str, verification_outcome: bool, weight: float = 1.0):
"""
Update trust score based on verification outcome.
Args:
source: Source identifier
verification_outcome: True if verified, False if not
weight: Weight of this update (0.0 to 1.0)
"""
trust_data = self._load_trust_data()
if source not in trust_data:
trust_data[source] = {
"trust_score": 0.5,
"verification_count": 0,
"success_count": 0,
"last_updated": datetime.utcnow().isoformat(),
}
source_data = trust_data[source]
# Update counts
source_data["verification_count"] += 1
if verification_outcome:
source_data["success_count"] += 1
# Calculate new trust score using exponential moving average
current_score = source_data["trust_score"]
outcome_score = 1.0 if verification_outcome else 0.0
alpha = 0.1 * weight # Learning rate
new_score = (1 - alpha) * current_score + alpha * outcome_score
source_data["trust_score"] = new_score
source_data["last_updated"] = datetime.utcnow().isoformat()
trust_data[source] = source_data
self._save_trust_data(trust_data)
logger.info(f"Updated trust for {source}: {new_score:.3f} (outcome={verification_outcome})")
def list_trusted_sources(self, min_trust: float = 0.7, min_verifications: int = 3) -> List[Dict[str, Any]]:
"""
List trusted sources.
Args:
min_trust: Minimum trust score
min_verifications: Minimum number of verifications
Returns:
List of trusted sources
"""
trust_data = self._load_trust_data()
trusted = []
for source, data in trust_data.items():
if data["trust_score"] >= min_trust and data["verification_count"] >= min_verifications:
trusted.append({
"source": source,
"trust_score": data["trust_score"],
"verification_count": data["verification_count"],
"success_rate": data["success_count"] / data["verification_count"],
})
# Sort by trust score descending
trusted.sort(key=lambda x: x["trust_score"], reverse=True)
return trusted
def list_untrusted_sources(self, max_trust: float = 0.3, min_verifications: int = 3) -> List[Dict[str, Any]]:
"""
List untrusted sources.
Args:
max_trust: Maximum trust score
min_verifications: Minimum number of verifications
Returns:
List of untrusted sources
"""
trust_data = self._load_trust_data()
untrusted = []
for source, data in trust_data.items():
if data["trust_score"] <= max_trust and data["verification_count"] >= min_verifications:
untrusted.append({
"source": source,
"trust_score": data["trust_score"],
"verification_count": data["verification_count"],
"success_rate": data["success_count"] / data["verification_count"],
})
# Sort by trust score ascending
untrusted.sort(key=lambda x: x["trust_score"])
return untrusted
def calculate_freshness(self, item: Dict[str, Any], domain: Optional[str] = None) -> float:
"""
Calculate freshness score for a knowledge item.
Args:
item: Knowledge item
domain: Domain for domain-specific rules
Returns:
Freshness score (0.0 to 1.0)
"""
# Get age in days
saved_at = datetime.fromisoformat(item.get("saved_at", datetime.utcnow().isoformat()))
age_days = (datetime.utcnow() - saved_at).days
# Domain-specific expiration rules
if domain == "finance":
# Financial data expires quickly
half_life_days = 7 # 50% fresh after 7 days
else:
# General knowledge expires slowly
half_life_days = 30 # 50% fresh after 30 days
# Calculate freshness using exponential decay
freshness = 2 ** (-age_days / half_life_days)
return max(0.0, min(1.0, freshness))
def update_freshness(self, item_id: str, freshness_score: float):
"""
Update freshness score for an item.
Args:
item_id: Item ID
freshness_score: New freshness score
"""
freshness_data = self._load_freshness_data()
freshness_data[item_id] = {
"freshness_score": freshness_score,
"last_updated": datetime.utcnow().isoformat(),
}
self._save_freshness_data(freshness_data)
def get_stale_items(self, items: List[Dict[str, Any]], threshold: float = 0.3) -> List[Dict[str, Any]]:
"""
Get stale items that need refreshing.
Args:
items: List of knowledge items
threshold: Freshness threshold
Returns:
List of stale items
"""
stale = []
for item in items:
freshness = self.calculate_freshness(item)
if freshness < threshold:
stale.append({
"item_id": item.get("id"),
"title": item.get("title"),
"freshness": freshness,
"age_days": (datetime.utcnow() - datetime.fromisoformat(item.get("saved_at", datetime.utcnow().isoformat()))).days,
})
# Sort by freshness ascending (stalest first)
stale.sort(key=lambda x: x["freshness"])
return stale
def recommend_refresh(self, stale_items: List[Dict[str, Any]], max_recommendations: int = 10) -> List[Dict[str, Any]]:
"""
Recommend items to refresh.
Args:
stale_items: List of stale items
max_recommendations: Maximum number of recommendations
Returns:
List of recommended items to refresh
"""
# Prioritize by staleness and importance
recommendations = stale_items[:max_recommendations]
return recommendations
def _load_trust_data(self) -> Dict[str, Any]:
"""Load trust data from disk."""
with open(self.trust_file, 'r') as f:
return json.load(f)
def _save_trust_data(self, data: Dict[str, Any]):
"""Save trust data to disk."""
with open(self.trust_file, 'w') as f:
json.dump(data, f, indent=2)
def _load_freshness_data(self) -> Dict[str, Any]:
"""Load freshness data from disk."""
with open(self.freshness_file, 'r') as f:
return json.load(f)
def _save_freshness_data(self, data: Dict[str, Any]):
"""Save freshness data to disk."""
with open(self.freshness_file, 'w') as f:
json.dump(data, f, indent=2)