BeatDebate / src /models /agent_models.py
SulmanK's picture
Remove obsolete phase completion summaries and demo test scripts - Deleted `PHASE1_COMPLETION_SUMMARY.md`, `PHASE2_COMPLETION_SUMMARY.md`, `PHASE3_COMPLETION_SUMMARY.md`, and associated demo test scripts to streamline the codebase and eliminate unused documentation. This cleanup supports ongoing refactoring efforts and enhances overall project maintainability.
d5eabda
Raw
History Blame Contribute Delete
16.3 kB
"""
Agent Models for BeatDebate Multi-Agent Music Recommendation System
Pydantic models for state management, agent communication, and data structures.
"""
from typing import Dict, List, Any, Optional, Annotated
from pydantic import BaseModel, Field
from datetime import datetime
from enum import Enum
from dataclasses import dataclass
def keep_first(x: Any, y: Any) -> Any:
    """Reducer that prefers the existing value ``x`` over the incoming ``y``.

    ``x`` is treated as "not set yet" when it is ``None`` or an empty
    ``list``/``dict``/``str``; only in that case is ``y`` returned.  Any other
    ``x`` (including falsy values such as ``0`` or ``False``) is kept.

    Args:
        x: The existing (first) value.
        y: The incoming value.

    Returns:
        ``y`` when ``x`` is unset as described above, otherwise ``x``.
    """
    existing_is_unset = x is None or (
        isinstance(x, (list, dict, str)) and len(x) == 0
    )
    return y if existing_is_unset else x
def list_replace_reducer(x: List, y: List) -> List:
    """Reducer that swaps in ``y`` wholesale when ``y`` has any items.

    Args:
        x: The existing list (may be ``None``).
        y: The incoming list (may be ``None``).

    Returns:
        ``y`` if it is a non-empty list; otherwise ``x``, or a new empty list
        when ``x`` is ``None``.
    """
    # A missing or empty update leaves the existing value in place.
    if y is not None and len(y) > 0:
        return y
    return [] if x is None else x
def list_append_reducer(x: List, y: List) -> List:
    """Reducer that concatenates the incoming items onto the existing list.

    Args:
        x: The existing list; ``None`` is treated as empty.
        y: The incoming list; ``None`` is treated as empty.

    Returns:
        A new list holding the items of ``x`` followed by the items of ``y``.
    """
    existing = [] if x is None else x
    incoming = [] if y is None else y
    return existing + incoming
def dict_update_reducer(x: Dict, y: Dict) -> Dict:
    """Reducer that overlays the incoming mapping on a copy of the existing one.

    Args:
        x: The existing dictionary; ``None`` is treated as empty.
        y: The incoming dictionary; ``None`` is treated as empty.

    Returns:
        A shallow copy of ``x`` updated with the entries of ``y`` (values from
        ``y`` win on key collisions).  Neither argument is modified.
    """
    merged = {} if x is None else x.copy()
    if y is not None:
        merged.update(y)
    return merged
class MusicRecommenderState(BaseModel):
    """Shared state across all agents in the LangGraph workflow.

    Nearly every field is declared as ``Annotated[<type>, <reducer>]`` where
    the reducer is one of this module's helpers: ``keep_first``,
    ``list_replace_reducer``, ``list_append_reducer`` or
    ``dict_update_reducer``.
    """

    # ---- Input: not expected to change once set -------------------------
    user_query: Annotated[str, keep_first] = Field(
        ...,
        description="Original user query for music recommendation",
    )
    user_profile: Annotated[Optional[Dict[str, Any]], keep_first] = Field(
        default=None,
        description="User preferences and history",
    )
    max_recommendations: Annotated[int, keep_first] = Field(
        default=10,
        description="Maximum number of recommendations to return",
    )

    # ---- Planning phase: written once by the planner --------------------
    planning_strategy: Annotated[Optional[Dict[str, Any]], keep_first] = Field(
        default=None,
        description="Strategy created by PlannerAgent",
    )
    execution_plan: Annotated[Optional[Dict[str, Any]], keep_first] = Field(
        default=None,
        description="Execution monitoring plan",
    )
    coordination_strategy: Annotated[Optional[Dict[str, Any]], keep_first] = Field(
        default=None,
        description="Enhanced coordination strategy with confidence-based selection",
    )
    agent_coordination: Annotated[Optional[Dict[str, Any]], keep_first] = Field(
        default=None,
        description="Agent coordination plan",
    )

    # ---- Enhanced planning phase: entity recognition --------------------
    entities: Annotated[Optional[Dict[str, Any]], keep_first] = Field(
        default=None,
        description="Extracted entities from user query",
    )
    intent_analysis: Annotated[Optional[Dict[str, Any]], keep_first] = Field(
        default=None,
        description="Intent analysis from user query",
    )
    query_understanding: Annotated[Optional[Any], keep_first] = Field(
        default=None,
        description="Pure LLM query understanding result",
    )
    # Merged key-by-key rather than kept-first.
    conversation_context: Annotated[Optional[Dict[str, Any]], dict_update_reducer] = Field(
        default=None,
        description="Session conversation context",
    )
    entity_reasoning: Annotated[List[Dict], list_append_reducer] = Field(
        default_factory=list,
        description="Entity extraction reasoning steps",
    )
    context_decision: Annotated[Optional[Dict[str, Any]], keep_first] = Field(
        default=None,
        description="Context analysis decision",
    )

    # ---- History tracking for follow-up queries -------------------------
    # NOTE(review): this is the only field declared without a reducer in its
    # annotation -- confirm that is intentional.
    recently_shown_track_ids: Optional[List[str]] = Field(
        default_factory=list,
        description="Track IDs recently shown to the user in this session to avoid duplicates in follow-up queries",
    )

    # ---- Advocate phase: updated by parallel agents ---------------------
    genre_mood_recommendations: Annotated[List[Dict], list_append_reducer] = Field(
        default_factory=list,
        description="GenreMoodAgent recommendations",
    )
    discovery_recommendations: Annotated[List[Dict], list_append_reducer] = Field(
        default_factory=list,
        description="DiscoveryAgent recommendations",
    )

    # ---- Judge phase: written by the judge agent ------------------------
    final_recommendations: Annotated[List[Dict], list_replace_reducer] = Field(
        default_factory=list,
        description="Final selected recommendations",
    )
    judge_metadata: Annotated[Optional[Dict[str, Any]], keep_first] = Field(
        default=None,
        description="Metadata about judge decision process",
    )

    # ---- Reasoning transparency: any agent may append -------------------
    reasoning_log: Annotated[List[str], list_append_reducer] = Field(
        default_factory=list,
        description="Step-by-step reasoning log",
    )
    agent_deliberations: Annotated[List[Dict], list_append_reducer] = Field(
        default_factory=list,
        description="Agent decision records",
    )

    # ---- Error handling: any agent may set ------------------------------
    error_info: Annotated[Optional[Dict[str, str]], keep_first] = Field(
        default=None,
        description="Error information if workflow fails",
    )

    # ---- Metadata: set at start, completed at end -----------------------
    processing_start_time: Annotated[Optional[float], keep_first] = Field(
        default=None,
        description="Processing start timestamp",
    )
    total_processing_time: Annotated[Optional[float], keep_first] = Field(
        default=None,
        description="Total processing time in seconds",
    )
    session_id: Annotated[Optional[str], keep_first] = Field(
        default=None,
        description="Unique session identifier",
    )
    confidence: Annotated[Optional[float], keep_first] = Field(
        default=None,
        description="Overall confidence in query understanding",
    )

    # ---- Intent override system -----------------------------------------
    context_override: Annotated[Optional[Dict[str, Any]], keep_first] = Field(
        default=None,
        description="Context override for intent system",
    )
    effective_intent: Annotated[Optional[Dict[str, Any]], keep_first] = Field(
        default=None,
        description="Effective intent combining query analysis and context override",
    )
class AgentStrategy(BaseModel):
    """Strategy object passed between agents.

    All four sections are required and are free-form dictionaries.
    """

    task_analysis: Dict[str, Any] = Field(
        ...,
        description="Analysis of the user query and task",
    )
    coordination_strategy: Dict[str, Any] = Field(
        ...,
        description="Strategy for each advocate agent",
    )
    evaluation_framework: Dict[str, Any] = Field(
        ...,
        description="Criteria for judge evaluation",
    )
    execution_monitoring: Dict[str, Any] = Field(
        ...,
        description="Monitoring and adaptation protocols",
    )
class TaskAnalysis(BaseModel):
    """Analysis of user query complexity and intent.

    ``primary_goal`` and ``complexity_level`` are required; the list fields
    default to empty lists.
    """

    primary_goal: str = Field(
        ...,
        description="Main intent extracted from query",
    )
    complexity_level: str = Field(
        ...,
        description="Query complexity: simple, medium, complex",
    )
    context_factors: List[str] = Field(
        default_factory=list,
        description="Context clues from query",
    )
    mood_indicators: List[str] = Field(
        default_factory=list,
        description="Mood/energy indicators",
    )
    genre_hints: List[str] = Field(
        default_factory=list,
        description="Genre preferences or hints",
    )
class AgentCoordinationStrategy(BaseModel):
    """Coordination strategy for advocate agents.

    Holds one required, free-form strategy dictionary per advocate agent.
    """

    genre_mood_agent: Dict[str, Any] = Field(
        ...,
        description="Strategy for GenreMoodAgent",
    )
    discovery_agent: Dict[str, Any] = Field(
        ...,
        description="Strategy for DiscoveryAgent",
    )
class EvaluationFramework(BaseModel):
    """Framework for judge evaluation.

    All three fields are required.
    """

    primary_weights: Dict[str, float] = Field(
        ...,
        description="Weights for different criteria",
    )
    diversity_targets: Dict[str, int] = Field(
        ...,
        description="Diversity targets for recommendations",
    )
    explanation_style: str = Field(
        ...,
        description="Style for generating explanations",
    )
class TrackRecommendation(BaseModel):
    """Individual track recommendation with reasoning.

    The three score fields are constrained to the closed interval
    ``[0.0, 1.0]`` via ``ge``/``le``; ``novelty_score`` may also be ``None``.
    """

    # ---- Track metadata -------------------------------------------------
    title: str = Field(
        ...,
        description="Track title",
    )
    artist: str = Field(
        ...,
        description="Artist name",
    )
    album: Optional[str] = Field(
        default=None,
        description="Album name",
    )
    year: Optional[int] = Field(
        default=None,
        description="Release year",
    )

    # ---- External identifiers -------------------------------------------
    lastfm_url: Optional[str] = Field(
        default=None,
        description="Last.fm URL",
    )
    spotify_url: Optional[str] = Field(
        default=None,
        description="Spotify URL",
    )
    preview_url: Optional[str] = Field(
        default=None,
        description="Audio preview URL",
    )

    # ---- Descriptive metadata -------------------------------------------
    genres: List[str] = Field(
        default_factory=list,
        description="Genre tags",
    )
    tags: List[str] = Field(
        default_factory=list,
        description="Mood/style tags",
    )
    similar_artists: List[str] = Field(
        default_factory=list,
        description="Similar artists",
    )

    # ---- Recommendation context -----------------------------------------
    reasoning_chain: str = Field(
        ...,
        description="Agent's reasoning for this recommendation",
    )
    confidence_score: float = Field(
        ...,
        ge=0.0,
        le=1.0,
        description="Confidence in recommendation",
    )
    novelty_score: Optional[float] = Field(
        default=None,
        ge=0.0,
        le=1.0,
        description="Novelty/underground score",
    )
    relevance_score: float = Field(
        ...,
        ge=0.0,
        le=1.0,
        description="Relevance to user query",
    )

    # ---- Agent attribution ----------------------------------------------
    recommending_agent: str = Field(
        ...,
        description="Agent that made this recommendation",
    )
    strategy_applied: Dict[str, Any] = Field(
        ...,
        description="Strategy used for this recommendation",
    )
class AgentDeliberation(BaseModel):
    """Record of agent decision-making process.

    ``timestamp`` is filled in by calling ``datetime.now`` (no timezone
    argument) when no value is supplied; every other field is required.
    """

    agent_name: str = Field(
        ...,
        description="Name of the agent",
    )
    timestamp: datetime = Field(
        default_factory=datetime.now,
        description="When deliberation occurred",
    )
    input_data: Dict[str, Any] = Field(
        ...,
        description="Input data for the agent",
    )
    reasoning_steps: List[str] = Field(
        ...,
        description="Step-by-step reasoning process",
    )
    output_data: Dict[str, Any] = Field(
        ...,
        description="Agent's output/decision",
    )
    # Constrained to the closed interval [0.0, 1.0].
    confidence: float = Field(
        ...,
        ge=0.0,
        le=1.0,
        description="Agent's confidence in decision",
    )
    processing_time: float = Field(
        ...,
        description="Time taken for deliberation in seconds",
    )
class ReasoningChain(BaseModel):
    """Structured reasoning chain for transparency.

    Each instance describes one step; ``evidence`` defaults to an empty list
    and ``confidence`` must lie in ``[0.0, 1.0]``.
    """

    step_number: int = Field(
        ...,
        description="Step number in reasoning chain",
    )
    step_type: str = Field(
        ...,
        description="Type of reasoning step",
    )
    description: str = Field(
        ...,
        description="Description of reasoning step",
    )
    evidence: List[str] = Field(
        default_factory=list,
        description="Evidence supporting this step",
    )
    confidence: float = Field(
        ...,
        ge=0.0,
        le=1.0,
        description="Confidence in this reasoning step",
    )
class FinalRecommendationResponse(BaseModel):
    """Final response format for the user.

    Every field is required.  The three quality metrics are constrained to
    the closed interval ``[0.0, 1.0]``.
    """

    recommendations: List[TrackRecommendation] = Field(
        ...,
        description="Final track recommendations",
    )
    explanation: str = Field(
        ...,
        description="Overall explanation of recommendations",
    )
    planning_summary: str = Field(
        ...,
        description="Summary of planning process",
    )
    agent_coordination_summary: str = Field(
        ...,
        description="Summary of agent coordination",
    )

    # ---- Metadata -------------------------------------------------------
    total_processing_time: float = Field(
        ...,
        description="Total processing time in seconds",
    )
    agents_involved: List[str] = Field(
        ...,
        description="List of agents that participated",
    )
    reasoning_transparency: List[AgentDeliberation] = Field(
        ...,
        description="Full reasoning transparency",
    )
    session_id: str = Field(
        ...,
        description="Session identifier",
    )

    # ---- Quality metrics ------------------------------------------------
    diversity_score: float = Field(
        ...,
        ge=0.0,
        le=1.0,
        description="Diversity of recommendations",
    )
    novelty_score: float = Field(
        ...,
        ge=0.0,
        le=1.0,
        description="Average novelty score",
    )
    confidence_score: float = Field(
        ...,
        ge=0.0,
        le=1.0,
        description="Overall confidence",
    )
class AgentConfig(BaseModel):
    """Configuration for individual agents.

    Only ``agent_name`` and ``agent_type`` are required; the LLM settings
    carry the defaults shown on each field.
    """

    agent_name: str = Field(
        ...,
        description="Name of the agent",
    )
    agent_type: str = Field(
        ...,
        description="Type of agent (planner, advocate, judge)",
    )
    llm_model: str = Field(
        default="gemini-2.0-flash-exp",
        description="LLM model to use",
    )
    # Validated to lie within [0.0, 2.0].
    temperature: float = Field(
        default=0.7,
        ge=0.0,
        le=2.0,
        description="LLM temperature",
    )
    max_tokens: int = Field(
        default=1000,
        description="Maximum tokens for LLM response",
    )
    timeout_seconds: int = Field(
        default=60,
        description="Timeout for agent processing",
    )

    # ---- Agent-specific configuration -----------------------------------
    specialty_config: Dict[str, Any] = Field(
        default_factory=dict,
        description="Agent-specific configuration",
    )
class SystemConfig(BaseModel):
    """Overall system configuration.

    The Gemini and Last.fm API keys are required; Spotify credentials are
    optional and every other setting has a default.
    """

    # ---- API configurations ---------------------------------------------
    gemini_api_key: str = Field(
        ...,
        description="Gemini API key",
    )
    lastfm_api_key: str = Field(
        ...,
        description="Last.fm API key",
    )
    spotify_client_id: Optional[str] = Field(
        default=None,
        description="Spotify client ID",
    )
    spotify_client_secret: Optional[str] = Field(
        default=None,
        description="Spotify client secret",
    )

    # ---- Rate limiting (note the differing time units) ------------------
    gemini_rate_limit: int = Field(
        default=15,
        description="Gemini requests per minute",
    )
    lastfm_rate_limit: float = Field(
        default=3.0,
        description="Last.fm requests per second",
    )
    spotify_rate_limit: int = Field(
        default=50,
        description="Spotify requests per hour",
    )

    # ---- Caching --------------------------------------------------------
    cache_enabled: bool = Field(
        default=True,
        description="Enable caching",
    )
    cache_ttl_hours: int = Field(
        default=24,
        description="Cache TTL in hours",
    )
    cache_directory: str = Field(
        default="data/cache",
        description="Cache directory path",
    )

    # ---- Agent configurations -------------------------------------------
    agent_configs: Dict[str, AgentConfig] = Field(
        default_factory=dict,
        description="Configuration for each agent",
    )

    # ---- Performance settings -------------------------------------------
    max_concurrent_agents: int = Field(
        default=2,
        description="Maximum concurrent agent executions",
    )
    total_timeout_minutes: int = Field(
        default=5,
        description="Total workflow timeout in minutes",
    )
class QueryIntent(Enum):
    """Primary intent types for music queries.

    Each member's value is the lower-case form of its name.
    """

    # "Music by X" - focus on artist's own tracks
    BY_ARTIST = "by_artist"
    # "Discover underground tracks by X" - focus on artist's deep cuts/b-sides
    BY_ARTIST_UNDERGROUND = "by_artist_underground"
    # "Music like X" - focus on similarity
    ARTIST_SIMILARITY = "artist_similarity"
    # "Songs by X that are Y genre" - artist tracks filtered by genre
    ARTIST_GENRE = "artist_genre"
    # "Something new and different" - focus on novelty
    DISCOVERY = "discovery"
    # "Surprise me with something unexpected" - focus on serendipitous discovery
    DISCOVERING_SERENDIPITY = "discovering_serendipity"
    # "Upbeat electronic music" - focus on style/vibe
    GENRE_MOOD = "genre_mood"
    # "Music for studying" - focus on functional fit
    CONTEXTUAL = "contextual"
    # "Music like [Artist] but [Genre]" - artist similarity + genre filtering
    HYBRID_SIMILARITY_GENRE = "hybrid_similarity_genre"

    # Legacy compatibility (can be removed after migration); each comment
    # names the current intent the legacy one maps to.
    # Maps to GENRE_MOOD
    GENRE_EXPLORATION = "genre_exploration"
    # Maps to GENRE_MOOD
    MOOD_MATCHING = "mood_matching"
    # Maps to CONTEXTUAL
    ACTIVITY_CONTEXT = "activity_context"
    # Maps to CONTEXTUAL
    PLAYLIST_BUILDING = "playlist_building"
    # Maps to ARTIST_SIMILARITY
    SPECIFIC_REQUEST = "specific_request"
class SimilarityType(Enum):
    """Types of similarity for artist-based queries.

    Each member's value is the lower-case form of its name.
    """

    # Similar sound/production
    STYLISTIC = "stylistic"
    # Same genre family
    GENRE = "genre"
    # Same time period
    ERA = "era"
    # Similar emotional feel
    MOOD = "mood"
    # Similar energy level
    ENERGY = "energy"
@dataclass
class QueryUnderstanding:
    """Structured representation of understood query.

    ``intent``, ``confidence`` and the four entity lists are required and
    must be supplied in that order when passed positionally; every other
    field has a default.
    """

    intent: QueryIntent
    confidence: float

    # Core entities
    artists: List[str]
    genres: List[str]
    moods: List[str]
    activities: List[str]

    # Intent-specific details
    similarity_type: Optional[SimilarityType] = None
    exploration_level: str = "moderate"  # strict, moderate, broad
    temporal_context: Optional[str] = None
    energy_level: Optional[str] = None

    # Agent coordination hints
    primary_agent: str = "genre_mood"  # Which agent should lead
    # Both default to None rather than an empty dict, so the annotations are
    # Optional; consumers must handle the None case.
    agent_weights: Optional[Dict[str, float]] = None
    search_strategy: Optional[Dict[str, Any]] = None

    # Original query context
    original_query: str = ""
    normalized_query: str = ""
    reasoning: str = ""