File size: 9,717 Bytes
d5eabda
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
"""
Candidate Post-Processing Component

Handles deduplication, filtering, and diversity enforcement for candidate tracks.
Extracted from the monolithic UnifiedCandidateGenerator for better modularity.
"""

from typing import List, Dict, Any, Set, Optional
import structlog
from collections import defaultdict

from ...models.metadata_models import UnifiedTrackMetadata


class CandidateProcessor:
    """
    Processes and refines candidate tracks generated by strategies.

    Responsibilities:
    - Deduplication
    - Quality filtering
    - Diversity enforcement
    - Source confidence weighting

    Candidates are plain dictionaries. The keys read by this class are
    'artist', 'name', 'track_id' / 'id', 'source', 'source_confidence' and
    'popularity'. All of them are optional; a missing key and an explicit
    ``None`` value are treated the same way and replaced by a default.
    """

    # Fallbacks used when a candidate omits a value or carries an explicit None.
    _DEFAULT_CONFIDENCE: float = 0.5
    _DEFAULT_POPULARITY: float = 50
    _DEFAULT_SOURCE_WEIGHT: float = 0.5
    _UNKNOWN_ARTIST: str = 'Unknown Artist'

    # Priority weight of each generation source, used as one term of the
    # quality score. Sources not listed here get _DEFAULT_SOURCE_WEIGHT.
    _SOURCE_WEIGHTS: Dict[str, float] = {
        'target_artist': 1.0,
        'similar_artist': 0.9,
        'genre_focused': 0.8,
        'mood_filtered': 0.8,
        'genre_exploration': 0.7,
        'underground_gems': 0.6,
        'serendipitous_discovery': 0.5,
        'mood_based_serendipity': 0.5,
        'random_genre_exploration': 0.4,
    }

    def __init__(self):
        """Initialize the candidate processor."""
        self.logger = structlog.get_logger(__name__)

    @staticmethod
    def _value_or_default(candidate: Dict[str, Any], key: str, default: Any) -> Any:
        """Return ``candidate[key]``, or ``default`` when it is missing or None."""
        value = candidate.get(key)
        # Compare against None explicitly so legitimate falsy values such as a
        # confidence or popularity of 0 are preserved.
        return default if value is None else value

    @staticmethod
    def _normalized(candidate: Dict[str, Any], key: str, default: str = '') -> str:
        """Return ``candidate[key]`` lower-cased and stripped for comparisons.

        ``default`` is used when the value is missing, None or empty. Non-string
        values are converted with ``str`` instead of raising.
        """
        return str(candidate.get(key) or default).lower().strip()

    def _quality_score(self, candidate: Dict[str, Any]) -> float:
        """Return the weighted quality score used to rank a candidate.

        The score is ``0.5 * confidence + 0.3 * popularity / 100 +
        0.2 * source_weight``.
        """
        confidence = self._value_or_default(
            candidate, 'source_confidence', self._DEFAULT_CONFIDENCE
        )
        # Popularity is assumed to be on a 0-100 scale; dividing by 100 brings
        # it to the same 0-1 range as the other terms.
        popularity = self._value_or_default(
            candidate, 'popularity', self._DEFAULT_POPULARITY
        ) / 100.0
        source_weight = self._SOURCE_WEIGHTS.get(
            candidate.get('source', 'unknown'), self._DEFAULT_SOURCE_WEIGHT
        )
        return (confidence * 0.5) + (popularity * 0.3) + (source_weight * 0.2)

    def process_candidates(
        self,
        candidates: List[Dict[str, Any]],
        enforce_diversity: bool = True,
        min_confidence: float = 0.1,
        diversity_threshold: float = 0.7
    ) -> List[Dict[str, Any]]:
        """
        Process and refine candidate tracks.

        Runs the pipeline: confidence filter -> deduplication -> optional
        diversity enforcement -> sort by quality score (best first).

        Args:
            candidates: Raw candidate tracks from strategies
            enforce_diversity: Whether to cap the number of tracks per artist
            min_confidence: Minimum source confidence to keep tracks
            diversity_threshold: Artist diversity threshold (0.0-1.0)

        Returns:
            Processed and refined candidate tracks; an empty list when
            ``candidates`` is empty.
        """
        if not candidates:
            return []

        self.logger.info(f"🔧 PROCESSING CANDIDATES: {len(candidates)} initial candidates")

        # Step 1: Filter by confidence
        filtered_candidates = self._filter_by_confidence(candidates, min_confidence)
        self.logger.debug(f"After confidence filtering: {len(filtered_candidates)} candidates")

        # Step 2: Deduplicate
        deduplicated_candidates = self._deduplicate_candidates(filtered_candidates)
        self.logger.debug(f"After deduplication: {len(deduplicated_candidates)} candidates")

        # Step 3: Enforce diversity if requested
        if enforce_diversity:
            diverse_candidates = self._enforce_diversity(deduplicated_candidates, diversity_threshold)
            self.logger.debug(f"After diversity enforcement: {len(diverse_candidates)} candidates")
        else:
            diverse_candidates = deduplicated_candidates

        # Step 4: Sort by quality metrics. This is needed for the
        # enforce_diversity=False path and is a no-op reordering otherwise.
        sorted_candidates = self._sort_by_quality(diverse_candidates)

        self.logger.info(f"🔧 PROCESSING COMPLETE: {len(sorted_candidates)} final candidates")
        return sorted_candidates

    def _filter_by_confidence(self, candidates: List[Dict[str, Any]], min_confidence: float) -> List[Dict[str, Any]]:
        """
        Filter candidates by minimum source confidence.

        Candidates whose 'source_confidence' is missing or None are scored
        with the default confidence instead of raising.

        Args:
            candidates: Candidates to filter
            min_confidence: Inclusive lower bound on 'source_confidence'

        Returns:
            The candidates meeting the bound, in their original order
        """
        filtered = []

        for candidate in candidates:
            confidence = self._value_or_default(
                candidate, 'source_confidence', self._DEFAULT_CONFIDENCE
            )
            if confidence >= min_confidence:
                filtered.append(candidate)
            else:
                self.logger.debug(
                    "Filtered out low confidence candidate",
                    track=candidate.get('name', 'Unknown'),
                    confidence=confidence
                )

        return filtered

    def _deduplicate_candidates(self, candidates: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Remove duplicate tracks, keeping the first occurrence of each.

        A candidate is a duplicate when its normalized artist/title pair or
        its track ID ('track_id', falling back to 'id') has been seen before.
        Candidates with neither artist nor title are compared by ID only, so
        tracks that merely lack metadata are not collapsed into one another.

        Args:
            candidates: Candidates to deduplicate

        Returns:
            The unique candidates, in their original order
        """
        seen_tracks: Set[str] = set()
        seen_ids: Set[Any] = set()
        deduplicated = []

        for candidate in candidates:
            # Create deduplication keys
            artist = self._normalized(candidate, 'artist')
            title = self._normalized(candidate, 'name')
            # An all-empty key carries no identity; treating it as a key would
            # make every metadata-less candidate a "duplicate" of the first.
            track_key = f"{artist}::{title}" if (artist or title) else None

            # Check track ID if available
            track_id = candidate.get('track_id') or candidate.get('id')

            # Skip if we've seen this track before
            is_duplicate = (
                (track_key is not None and track_key in seen_tracks)
                or (track_id and track_id in seen_ids)
            )
            if is_duplicate:
                self.logger.debug(
                    "Skipping duplicate track",
                    track=candidate.get('name', 'Unknown'),
                    artist=candidate.get('artist', 'Unknown')
                )
                continue

            # Add to seen sets
            if track_key is not None:
                seen_tracks.add(track_key)
            if track_id:
                seen_ids.add(track_id)

            deduplicated.append(candidate)

        return deduplicated

    def _enforce_diversity(self, candidates: List[Dict[str, Any]], diversity_threshold: float) -> List[Dict[str, Any]]:
        """
        Cap the number of tracks any single artist contributes.

        Candidates are visited best-first (by quality score) and kept until
        their artist reaches ``max(1, int(len(candidates) * diversity_threshold))``
        tracks. Candidates without an artist share one "unknown artist" bucket.

        Args:
            candidates: Deduplicated candidates
            diversity_threshold: Maximum proportion for any single artist (0.0-1.0),
                relative to the number of input candidates

        Returns:
            Diversity-enforced candidates, ordered by descending quality score
        """
        if not candidates:
            return candidates

        # Calculate maximum tracks per artist (always at least one)
        max_tracks_per_artist = max(1, int(len(candidates) * diversity_threshold))

        diverse_candidates = []
        artist_track_counts: Dict[str, int] = defaultdict(int)

        # Visit the best candidates first so the cap drops the weakest tracks
        for candidate in self._sort_by_quality(candidates):
            artist = self._normalized(candidate, 'artist', self._UNKNOWN_ARTIST)

            if artist_track_counts[artist] < max_tracks_per_artist:
                diverse_candidates.append(candidate)
                artist_track_counts[artist] += 1
            else:
                self.logger.debug(
                    "Skipping track for diversity",
                    track=candidate.get('name', 'Unknown'),
                    artist=candidate.get('artist', 'Unknown'),
                    artist_count=artist_track_counts[artist],
                    max_per_artist=max_tracks_per_artist
                )

        # Log diversity statistics. The cap is >= 1, so every artist seen kept
        # at least one track and the counter's size is the unique-artist count.
        self.logger.debug(
            "Diversity enforcement complete",
            total_tracks=len(diverse_candidates),
            unique_artists=len(artist_track_counts),
            max_per_artist=max_tracks_per_artist
        )

        return diverse_candidates

    def _sort_by_quality(self, candidates: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Return a new list of candidates sorted by descending quality score.

        The score combines, in decreasing weight:
        1. Source confidence (50%)
        2. Popularity (30%)
        3. Source priority (20%)

        Missing or None values fall back to class defaults. The sort is
        stable, so candidates with equal scores keep their input order.
        """
        return sorted(candidates, key=self._quality_score, reverse=True)

    def get_processing_stats(self, candidates: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Get statistics about the processed candidates.

        Args:
            candidates: Processed candidates

        Returns:
            ``{'total': 0}`` for empty input; otherwise a dictionary with
            'total', 'unique_artists', 'artist_diversity_ratio',
            'source_distribution', 'avg_confidence', 'min_confidence' and
            'max_confidence'. Artists are compared case-insensitively with
            surrounding whitespace ignored.
        """
        if not candidates:
            return {'total': 0}

        source_distribution: Dict[str, int] = defaultdict(int)
        confidence_scores = []
        artists: Set[str] = set()

        for candidate in candidates:
            source_distribution[candidate.get('source', 'unknown')] += 1
            confidence_scores.append(
                self._value_or_default(
                    candidate, 'source_confidence', self._DEFAULT_CONFIDENCE
                )
            )
            # Same normalization as diversity enforcement, so both agree on
            # what counts as one artist.
            artists.add(self._normalized(candidate, 'artist', self._UNKNOWN_ARTIST))

        total = len(candidates)
        unique_artists = len(artists)

        stats = {
            'total': total,
            'unique_artists': unique_artists,
            'artist_diversity_ratio': unique_artists / total,
            'source_distribution': dict(source_distribution),
            'avg_confidence': sum(confidence_scores) / total,
            'min_confidence': min(confidence_scores),
            'max_confidence': max(confidence_scores)
        }

        return stats