BeatDebate / src /agents /components /candidate_processor.py
SulmanK's picture
Remove obsolete phase completion summaries and demo test scripts - Deleted `PHASE1_COMPLETION_SUMMARY.md`, `PHASE2_COMPLETION_SUMMARY.md`, `PHASE3_COMPLETION_SUMMARY.md`, and associated demo test scripts to streamline the codebase and eliminate unused documentation. This cleanup supports ongoing refactoring efforts and enhances overall project maintainability.
d5eabda
Raw
History Blame Contribute Delete
9.72 kB
"""
Candidate Post-Processing Component
Handles deduplication, filtering, and diversity enforcement for candidate tracks.
Extracted from the monolithic UnifiedCandidateGenerator for better modularity.
"""
from typing import List, Dict, Any, Set, Optional
import structlog
from collections import defaultdict
from ...models.metadata_models import UnifiedTrackMetadata
class CandidateProcessor:
    """
    Processes and refines candidate tracks generated by strategies.

    Responsibilities:
    - Deduplication
    - Quality filtering
    - Diversity enforcement
    - Source confidence weighting

    Candidates are plain dicts. The keys read by this class are ``artist``,
    ``name``, ``track_id`` / ``id``, ``source``, ``source_confidence`` and
    ``popularity``. Every key is optional, and a key whose value is ``None``
    is treated exactly like a missing key.
    """

    # Fallbacks used when a candidate omits (or nulls) a scoring field.
    _DEFAULT_CONFIDENCE = 0.5
    _DEFAULT_POPULARITY = 50
    _DEFAULT_SOURCE_WEIGHT = 0.5

    # Relative trust placed in each generation strategy; sources not listed
    # here fall back to _DEFAULT_SOURCE_WEIGHT.
    _SOURCE_WEIGHTS: Dict[str, float] = {
        'target_artist': 1.0,
        'similar_artist': 0.9,
        'genre_focused': 0.8,
        'mood_filtered': 0.8,
        'genre_exploration': 0.7,
        'underground_gems': 0.6,
        'serendipitous_discovery': 0.5,
        'mood_based_serendipity': 0.5,
        'random_genre_exploration': 0.4,
    }

    def __init__(self) -> None:
        """Initialize the candidate processor."""
        self.logger = structlog.get_logger(__name__)

    @staticmethod
    def _normalized_text(candidate: Dict[str, Any], key: str, default: str = '') -> str:
        """Return ``candidate[key]`` lower-cased and stripped; None/missing -> ``default``."""
        value = candidate.get(key)
        if value is None:
            value = default
        return str(value).lower().strip()

    @staticmethod
    def _value_or_default(candidate: Dict[str, Any], key: str, default: Any) -> Any:
        """Return ``candidate[key]``, substituting ``default`` when missing or None."""
        value = candidate.get(key)
        return default if value is None else value

    def process_candidates(
        self,
        candidates: List[Dict[str, Any]],
        enforce_diversity: bool = True,
        min_confidence: float = 0.1,
        diversity_threshold: float = 0.7
    ) -> List[Dict[str, Any]]:
        """
        Process and refine candidate tracks.

        Runs, in order: confidence filtering, deduplication, optional
        diversity enforcement, and a final sort by quality score.

        Args:
            candidates: Raw candidate tracks from strategies
            enforce_diversity: Whether to cap the number of tracks per artist
            min_confidence: Minimum source confidence to keep tracks
            diversity_threshold: Artist diversity threshold (0.0-1.0)

        Returns:
            Processed and refined candidate tracks, best first. An empty
            input yields an empty list.
        """
        if not candidates:
            return []

        self.logger.info(f"🔧 PROCESSING CANDIDATES: {len(candidates)} initial candidates")

        # Step 1: Filter by confidence
        filtered_candidates = self._filter_by_confidence(candidates, min_confidence)
        self.logger.debug(f"After confidence filtering: {len(filtered_candidates)} candidates")

        # Step 2: Deduplicate
        deduplicated_candidates = self._deduplicate_candidates(filtered_candidates)
        self.logger.debug(f"After deduplication: {len(deduplicated_candidates)} candidates")

        # Step 3: Enforce diversity if requested
        if enforce_diversity:
            diverse_candidates = self._enforce_diversity(deduplicated_candidates, diversity_threshold)
            self.logger.debug(f"After diversity enforcement: {len(diverse_candidates)} candidates")
        else:
            diverse_candidates = deduplicated_candidates

        # Step 4: Sort by quality metrics
        sorted_candidates = self._sort_by_quality(diverse_candidates)

        self.logger.info(f"🔧 PROCESSING COMPLETE: {len(sorted_candidates)} final candidates")
        return sorted_candidates

    def _filter_by_confidence(self, candidates: List[Dict[str, Any]], min_confidence: float) -> List[Dict[str, Any]]:
        """
        Filter candidates by minimum source confidence.

        A candidate is kept when its ``source_confidence`` is greater than or
        equal to ``min_confidence``. A missing or None confidence counts as
        the default (0.5) instead of raising.
        """
        filtered = []
        for candidate in candidates:
            confidence = self._value_or_default(
                candidate, 'source_confidence', self._DEFAULT_CONFIDENCE
            )
            if confidence >= min_confidence:
                filtered.append(candidate)
            else:
                self.logger.debug(
                    "Filtered out low confidence candidate",
                    track=candidate.get('name', 'Unknown'),
                    confidence=confidence
                )
        return filtered

    def _deduplicate_candidates(self, candidates: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Remove duplicate tracks, keeping the first occurrence of each.

        Two candidates are duplicates when they share a case- and
        whitespace-insensitive artist/title pair, or when they share a track
        ID (``track_id``, falling back to ``id``).

        Candidates that have neither an artist nor a title carry no usable
        name key, so they are compared by ID only. Previously they all shared
        the key "::" and every one after the first was dropped.
        """
        seen_tracks: Set[str] = set()
        seen_ids: Set[Any] = set()
        deduplicated = []

        for candidate in candidates:
            # Create deduplication keys
            artist = self._normalized_text(candidate, 'artist')
            title = self._normalized_text(candidate, 'name')
            track_key = f"{artist}::{title}" if (artist or title) else None

            # Check track ID if available
            track_id = candidate.get('track_id') or candidate.get('id')

            same_name = track_key is not None and track_key in seen_tracks
            same_id = bool(track_id) and track_id in seen_ids

            # Skip if we've seen this track before
            if same_name or same_id:
                self.logger.debug(
                    "Skipping duplicate track",
                    track=candidate.get('name', 'Unknown'),
                    artist=candidate.get('artist', 'Unknown')
                )
                continue

            # Add to seen sets
            if track_key is not None:
                seen_tracks.add(track_key)
            if track_id:
                seen_ids.add(track_id)
            deduplicated.append(candidate)

        return deduplicated

    def _enforce_diversity(self, candidates: List[Dict[str, Any]], diversity_threshold: float) -> List[Dict[str, Any]]:
        """
        Cap how many tracks any single artist may contribute.

        Candidates are visited best-first (by quality score) and accepted
        until their artist reaches ``max(1, int(len(candidates) *
        diversity_threshold))`` tracks. Only the artist is considered; no
        genre information is read.

        Args:
            candidates: Deduplicated candidates
            diversity_threshold: Maximum proportion for any single artist (0.0-1.0)

        Returns:
            Diversity-enforced candidates, in quality order
        """
        if not candidates:
            return candidates

        # Calculate maximum tracks per artist (always at least one)
        max_tracks_per_artist = max(1, int(len(candidates) * diversity_threshold))

        diverse_candidates = []
        artist_track_counts: Dict[str, int] = {}

        # Visit the strongest candidates first so the cap drops the weakest ones
        for candidate in self._sort_by_quality(candidates):
            artist = self._normalized_text(candidate, 'artist', 'Unknown Artist')
            current_count = artist_track_counts.get(artist, 0)

            if current_count < max_tracks_per_artist:
                diverse_candidates.append(candidate)
                artist_track_counts[artist] = current_count + 1
            else:
                self.logger.debug(
                    "Skipping track for diversity",
                    track=candidate.get('name', 'Unknown'),
                    artist=candidate.get('artist', 'Unknown'),
                    artist_count=current_count,
                    max_per_artist=max_tracks_per_artist
                )

        # Log diversity statistics; every artist seen has at least one accepted
        # track, so the number of counters equals the number of unique artists.
        self.logger.debug(
            "Diversity enforcement complete",
            total_tracks=len(diverse_candidates),
            unique_artists=len(artist_track_counts),
            max_per_artist=max_tracks_per_artist
        )
        return diverse_candidates

    def _sort_by_quality(self, candidates: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """
        Return a new list of candidates sorted by descending quality score.

        The score is a weighted sum:
        1. Source confidence (50%)
        2. Popularity (30%) - divided by 100, so a 0-100 scale is assumed
           but not validated
        3. Source priority (20%) - looked up in ``_SOURCE_WEIGHTS``

        Missing or None fields use the class defaults. Ties keep their input
        order because ``sorted`` is stable.
        """
        def quality_score(candidate: Dict[str, Any]) -> float:
            confidence = self._value_or_default(
                candidate, 'source_confidence', self._DEFAULT_CONFIDENCE
            )
            popularity = self._value_or_default(
                candidate, 'popularity', self._DEFAULT_POPULARITY
            ) / 100.0  # Normalize to 0-1
            source_weight = self._SOURCE_WEIGHTS.get(
                candidate.get('source', 'unknown'), self._DEFAULT_SOURCE_WEIGHT
            )
            # Combined quality score
            return (confidence * 0.5) + (popularity * 0.3) + (source_weight * 0.2)

        return sorted(candidates, key=quality_score, reverse=True)

    def get_processing_stats(self, candidates: List[Dict[str, Any]]) -> Dict[str, Any]:
        """
        Get statistics about the processed candidates.

        Args:
            candidates: Processed candidates

        Returns:
            ``{'total': 0}`` for empty input. Otherwise a dictionary with
            ``total``, ``unique_artists``, ``artist_diversity_ratio``,
            ``source_distribution`` and ``avg``/``min``/``max_confidence``.
            Artists are compared case- and whitespace-insensitively, matching
            deduplication and diversity enforcement.
        """
        if not candidates:
            return {'total': 0}

        # Source distribution and confidence values
        source_distribution: Dict[str, int] = defaultdict(int)
        confidence_scores = []
        for candidate in candidates:
            source_distribution[candidate.get('source', 'unknown')] += 1
            confidence_scores.append(
                self._value_or_default(
                    candidate, 'source_confidence', self._DEFAULT_CONFIDENCE
                )
            )

        # Artist diversity
        unique_artists = len({self._normalized_text(c, 'artist') for c in candidates})
        total = len(candidates)

        # total >= 1 here, so the divisions below cannot hit zero.
        return {
            'total': total,
            'unique_artists': unique_artists,
            'artist_diversity_ratio': unique_artists / total,
            'source_distribution': dict(source_distribution),
            'avg_confidence': sum(confidence_scores) / total,
            'min_confidence': min(confidence_scores),
            'max_confidence': max(confidence_scores)
        }