File size: 8,688 Bytes
24f95f0
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
"""
Trust and freshness management for sources and knowledge.

Tracks source reliability and content freshness over time.
"""

import json
import logging
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional

logger = logging.getLogger(__name__)


class TrustManager:
    """Manages trust scores and freshness for sources.

    State is persisted as two JSON files under ``<data_dir>/learning``:
    ``source_trust.json`` (per-source trust records) and
    ``freshness_scores.json`` (per-item freshness records). Every public
    operation re-reads the relevant file, so nothing is cached in memory.
    """

    # Score reported for a source that has no stored record yet (neutral).
    DEFAULT_TRUST_SCORE: float = 0.5
    # Base learning rate of the exponential moving average in update_trust.
    TRUST_LEARNING_RATE: float = 0.1
    # Half-life in days used when a domain has no specific rule.
    DEFAULT_HALF_LIFE_DAYS: float = 30
    # Domain-specific half-lives in days; financial data expires quickly.
    DOMAIN_HALF_LIFE_DAYS: Dict[str, float] = {"finance": 7}

    def __init__(self, data_dir: str):
        """
        Create the storage directory and seed empty data files if needed.

        Args:
            data_dir: Base directory; data is stored in its "learning" subfolder
        """
        self.data_dir = Path(data_dir) / "learning"
        self.data_dir.mkdir(parents=True, exist_ok=True)
        self.trust_file = self.data_dir / "source_trust.json"
        self.freshness_file = self.data_dir / "freshness_scores.json"

        # Initialize files if they don't exist
        if not self.trust_file.exists():
            self._save_trust_data({})
        if not self.freshness_file.exists():
            self._save_freshness_data({})

    def get_trust_score(self, source: str) -> float:
        """
        Get trust score for a source.

        Args:
            source: Source identifier (URL or name)

        Returns:
            Trust score (0.0 to 1.0); the neutral default 0.5 for unknown sources
        """
        source_data = self._load_trust_data().get(source, {})
        return source_data.get("trust_score", self.DEFAULT_TRUST_SCORE)

    def update_trust(self, source: str, verification_outcome: bool, weight: float = 1.0):
        """
        Update trust score based on verification outcome.

        The score is an exponential moving average of outcomes (1.0 for a
        verified outcome, 0.0 otherwise) with smoothing factor
        ``TRUST_LEARNING_RATE * weight``.

        Args:
            source: Source identifier
            verification_outcome: True if verified, False if not
            weight: Weight of this update (0.0 to 1.0)
        """
        trust_data = self._load_trust_data()

        # setdefault (instead of building a fresh dict only for new sources)
        # also fills in fields missing from an existing record, so the
        # arithmetic below cannot fail with KeyError.
        record = trust_data.setdefault(source, {})
        record.setdefault("trust_score", self.DEFAULT_TRUST_SCORE)
        record.setdefault("verification_count", 0)
        record.setdefault("success_count", 0)

        # Update counts
        record["verification_count"] += 1
        if verification_outcome:
            record["success_count"] += 1

        # Clamp the smoothing factor to [0, 1]: an out-of-range weight would
        # otherwise push the stored score outside the documented 0.0-1.0 range.
        alpha = min(1.0, max(0.0, self.TRUST_LEARNING_RATE * weight))
        outcome_score = 1.0 if verification_outcome else 0.0
        new_score = (1 - alpha) * record["trust_score"] + alpha * outcome_score

        record["trust_score"] = new_score
        record["last_updated"] = self._utcnow().isoformat()

        self._save_trust_data(trust_data)

        logger.info(
            "Updated trust for %s: %.3f (outcome=%s)",
            source,
            new_score,
            verification_outcome,
        )

    def list_trusted_sources(self, min_trust: float = 0.7, min_verifications: int = 3) -> List[Dict[str, Any]]:
        """
        List trusted sources.

        Args:
            min_trust: Minimum trust score
            min_verifications: Minimum number of verifications

        Returns:
            List of trusted sources, highest trust score first
        """
        return self._select_sources(
            lambda score: score >= min_trust,
            min_verifications,
            descending=True,
        )

    def list_untrusted_sources(self, max_trust: float = 0.3, min_verifications: int = 3) -> List[Dict[str, Any]]:
        """
        List untrusted sources.

        Args:
            max_trust: Maximum trust score
            min_verifications: Minimum number of verifications

        Returns:
            List of untrusted sources, lowest trust score first
        """
        return self._select_sources(
            lambda score: score <= max_trust,
            min_verifications,
            descending=False,
        )

    def calculate_freshness(self, item: Dict[str, Any], domain: Optional[str] = None) -> float:
        """
        Calculate freshness score for a knowledge item.

        Freshness decays exponentially with the item's age in whole days,
        halving every ``DOMAIN_HALF_LIFE_DAYS[domain]`` days (or every
        ``DEFAULT_HALF_LIFE_DAYS`` days when the domain has no entry).

        Args:
            item: Knowledge item; its "saved_at" ISO timestamp gives the age,
                and a missing or empty value counts as saved just now
            domain: Domain for domain-specific rules

        Returns:
            Freshness score (0.0 to 1.0)

        Raises:
            ValueError: If "saved_at" is not a valid ISO 8601 timestamp
        """
        age_days = self._age_days(item)
        half_life_days = self.DOMAIN_HALF_LIFE_DAYS.get(domain, self.DEFAULT_HALF_LIFE_DAYS)

        # Calculate freshness using exponential decay
        freshness = 2 ** (-age_days / half_life_days)

        return max(0.0, min(1.0, freshness))

    def update_freshness(self, item_id: str, freshness_score: float):
        """
        Update freshness score for an item.

        Args:
            item_id: Item ID
            freshness_score: New freshness score
        """
        freshness_data = self._load_freshness_data()

        freshness_data[item_id] = {
            "freshness_score": freshness_score,
            "last_updated": self._utcnow().isoformat(),
        }

        self._save_freshness_data(freshness_data)

    def get_stale_items(self, items: List[Dict[str, Any]], threshold: float = 0.3) -> List[Dict[str, Any]]:
        """
        Get stale items that need refreshing.

        NOTE(review): freshness is computed without a domain, so only the
        default half-life applies here - confirm whether items carry a
        domain that should be forwarded to calculate_freshness.

        Args:
            items: List of knowledge items
            threshold: Freshness threshold

        Returns:
            List of stale items (id, title, freshness, age in days),
            stalest first
        """
        stale = []

        for item in items:
            freshness = self.calculate_freshness(item)
            if freshness < threshold:
                stale.append({
                    "item_id": item.get("id"),
                    "title": item.get("title"),
                    "freshness": freshness,
                    "age_days": self._age_days(item),
                })

        # Sort by freshness ascending (stalest first)
        stale.sort(key=lambda entry: entry["freshness"])
        return stale

    def recommend_refresh(self, stale_items: List[Dict[str, Any]], max_recommendations: int = 10) -> List[Dict[str, Any]]:
        """
        Recommend items to refresh.

        The input order is kept, so the list is expected to be ordered by
        priority already (get_stale_items returns stalest first).

        Args:
            stale_items: List of stale items
            max_recommendations: Maximum number of recommendations

        Returns:
            List of recommended items to refresh
        """
        # A negative limit would otherwise slice "all but the last n" items.
        if max_recommendations <= 0:
            return []

        return stale_items[:max_recommendations]

    def _select_sources(
        self,
        keep_score: Callable[[float], bool],
        min_verifications: int,
        descending: bool,
    ) -> List[Dict[str, Any]]:
        """Return summaries of stored sources accepted by keep_score, sorted by trust score."""
        selected = []
        for source, data in self._load_trust_data().items():
            score = data.get("trust_score", self.DEFAULT_TRUST_SCORE)
            count = data.get("verification_count", 0)
            if not keep_score(score) or count < min_verifications:
                continue
            selected.append({
                "source": source,
                "trust_score": score,
                "verification_count": count,
                # With min_verifications <= 0 a record may have no
                # verifications yet; report 0.0 instead of dividing by zero.
                "success_rate": data.get("success_count", 0) / count if count else 0.0,
            })

        selected.sort(key=lambda entry: entry["trust_score"], reverse=descending)
        return selected

    def _age_days(self, item: Dict[str, Any]) -> int:
        """Return the item's age in whole days, never negative (0 if it has no timestamp)."""
        saved_at = self._parse_timestamp(item.get("saved_at"))
        if saved_at is None:
            return 0
        return max(0, (self._utcnow() - saved_at).days)

    @staticmethod
    def _utcnow() -> datetime:
        """Return the current UTC time as a naive datetime (keeps stored timestamps offset-free)."""
        return datetime.now(timezone.utc).replace(tzinfo=None)

    @staticmethod
    def _parse_timestamp(value: Any) -> Optional[datetime]:
        """Parse an ISO 8601 timestamp into a naive UTC datetime; None for missing values."""
        if not value:
            return None
        if isinstance(value, datetime):
            parsed = value
        else:
            text = str(value)
            # datetime.fromisoformat only accepts a trailing "Z" from Python 3.11 on.
            if text.endswith(("Z", "z")):
                text = text[:-1] + "+00:00"
            parsed = datetime.fromisoformat(text)
        # Normalize aware values so they can be subtracted from the naive "now".
        if parsed.tzinfo is not None:
            parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None)
        return parsed

    @staticmethod
    def _read_json(path: Path) -> Dict[str, Any]:
        """Load a JSON file; a missing file is treated as an empty mapping."""
        try:
            with open(path, "r", encoding="utf-8") as f:
                return json.load(f)
        except FileNotFoundError:
            return {}

    @staticmethod
    def _write_json(path: Path, data: Dict[str, Any]) -> None:
        """Write data as JSON via a temporary file so the target is never left half-written."""
        tmp_path = path.with_name(path.name + ".tmp")
        try:
            with open(tmp_path, "w", encoding="utf-8") as f:
                json.dump(data, f, indent=2)
            tmp_path.replace(path)
        finally:
            # Only present if writing failed before the rename.
            if tmp_path.exists():
                tmp_path.unlink()

    def _load_trust_data(self) -> Dict[str, Any]:
        """Load trust data from disk."""
        return self._read_json(self.trust_file)

    def _save_trust_data(self, data: Dict[str, Any]):
        """Save trust data to disk."""
        self._write_json(self.trust_file, data)

    def _load_freshness_data(self) -> Dict[str, Any]:
        """Load freshness data from disk."""
        return self._read_json(self.freshness_file)

    def _save_freshness_data(self, data: Dict[str, Any]):
        """Save freshness data to disk."""
        self._write_json(self.freshness_file, data)