BeatDebate / src /services /components /agent_coordinator.py
SulmanK's picture
Remove obsolete phase completion summaries and demo test scripts - Deleted `PHASE1_COMPLETION_SUMMARY.md`, `PHASE2_COMPLETION_SUMMARY.md`, `PHASE3_COMPLETION_SUMMARY.md`, and associated demo test scripts to streamline the codebase and eliminate unused documentation. This cleanup supports ongoing refactoring efforts and enhances overall project maintainability.
d5eabda
Raw
History Blame Contribute Delete
9.45 kB
"""
Agent Coordinator for Enhanced Recommendation Service
Manages agent initialization, configuration, and coordination.
Extracted from EnhancedRecommendationService to improve modularity and maintainability.
"""
import os
from typing import Optional, TYPE_CHECKING
if TYPE_CHECKING:
from ...agents.planner.agent import PlannerAgent
from ...agents.genre_mood.agent import GenreMoodAgent
from ...agents.discovery.agent import DiscoveryAgent
from ...agents.judge.agent import JudgeAgent
import structlog
# Handle imports gracefully
try:
from ...models.agent_models import AgentConfig
from ..api_service import APIService
from ..metadata_service import MetadataService
from ..session_manager_service import SessionManagerService
from ...api.rate_limiter import UnifiedRateLimiter
except ImportError:
# Fallback imports for testing
import sys
sys.path.append('src')
from models.agent_models import AgentConfig
from services.api_service import APIService
from services.metadata_service import MetadataService
from services.session_manager_service import SessionManagerService
from api.rate_limiter import UnifiedRateLimiter
logger = structlog.get_logger(__name__)
def create_gemini_client(api_key: str):
"""
Create a Gemini client for LLM interactions.
Args:
api_key: Gemini API key
Returns:
Configured Gemini client or None if creation fails
"""
try:
import google.generativeai as genai
# Configure Gemini
genai.configure(api_key=api_key)
# Create generative model
model = genai.GenerativeModel('gemini-2.0-flash-exp')
logger.info("Gemini client created successfully")
return model
except ImportError:
logger.warning("google-generativeai not available, LLM features will be disabled")
return None
except Exception as e:
logger.error(f"Failed to create Gemini client: {e}")
return None
class AgentCoordinator:
"""
Coordinates agent initialization and configuration for the Enhanced Recommendation Service.
Responsibilities:
- Agent initialization with shared services
- LLM client and rate limiter configuration
- Agent configuration management
- Agent lifecycle management
"""
def __init__(
self,
api_service: APIService,
session_manager: SessionManagerService
):
self.api_service = api_service
self.session_manager = session_manager
self.logger = structlog.get_logger(__name__)
# Agent instances
self.planner_agent: Optional["PlannerAgent"] = None
self.genre_mood_agent: Optional["GenreMoodAgent"] = None
self.discovery_agent: Optional["DiscoveryAgent"] = None
self.judge_agent: Optional["JudgeAgent"] = None
# Shared resources
self.gemini_client = None
self.gemini_rate_limiter = None
self.metadata_service = None
# Initialization status
self._agents_initialized = False
async def initialize_agents(self) -> bool:
"""
Initialize all agents with shared API service and rate limiting.
Returns:
bool: True if initialization successful, False otherwise
"""
if self._agents_initialized:
return True
try:
# Initialize shared resources
await self._initialize_shared_resources()
# Create agent configuration
agent_config = self._create_agent_config()
# Initialize individual agents
await self._initialize_individual_agents(agent_config)
self._agents_initialized = True
self.logger.info("All agents initialized successfully")
return True
except Exception as e:
self.logger.error("Failed to initialize agents", error=str(e))
return False
async def _initialize_shared_resources(self):
"""Initialize shared resources for all agents."""
# Create Gemini client for LLM interactions
gemini_api_key = os.getenv('GEMINI_API_KEY', 'demo_gemini_key')
self.gemini_client = create_gemini_client(gemini_api_key)
if not self.gemini_client:
self.logger.warning("Failed to create Gemini client, agents will have limited functionality")
# Create rate limiter for Gemini API (free tier: 10 requests per minute)
self.gemini_rate_limiter = UnifiedRateLimiter.for_gemini(calls_per_minute=8) # Conservative limit
self.logger.info("Gemini rate limiter created", calls_per_minute=8)
# Get shared clients from API service
lastfm_client = await self.api_service.get_lastfm_client()
# Create metadata service with shared client
self.metadata_service = MetadataService(lastfm_client=lastfm_client)
self.logger.info("Shared resources initialized")
def _create_agent_config(self) -> AgentConfig:
"""Create standard agent configuration."""
return AgentConfig(
agent_name="default",
agent_type="enhanced",
llm_model="gemini-2.0-flash-exp",
temperature=0.7,
max_tokens=1000
)
async def _initialize_individual_agents(self, agent_config: AgentConfig):
"""Initialize individual agent instances."""
# Import agents dynamically to avoid circular imports
try:
from ...agents.planner.agent import PlannerAgent
from ...agents.genre_mood.agent import GenreMoodAgent
from ...agents.discovery.agent import DiscoveryAgent
from ...agents.judge.agent import JudgeAgent
except ImportError:
from agents.planner.agent import PlannerAgent
from agents.genre_mood.agent import GenreMoodAgent
from agents.discovery.agent import DiscoveryAgent
from agents.judge.agent import JudgeAgent
# Initialize PlannerAgent
self.planner_agent = PlannerAgent(
config=agent_config,
llm_client=self.gemini_client,
api_service=self.api_service,
metadata_service=self.metadata_service,
rate_limiter=self.gemini_rate_limiter
)
self.logger.debug("PlannerAgent initialized")
# Initialize GenreMoodAgent
self.genre_mood_agent = GenreMoodAgent(
config=agent_config,
llm_client=self.gemini_client,
api_service=self.api_service,
metadata_service=self.metadata_service,
rate_limiter=self.gemini_rate_limiter,
session_manager=self.session_manager # Phase 3: For candidate pool persistence
)
self.logger.debug("GenreMoodAgent initialized")
# Initialize DiscoveryAgent
self.discovery_agent = DiscoveryAgent(
config=agent_config,
llm_client=self.gemini_client,
api_service=self.api_service,
metadata_service=self.metadata_service,
rate_limiter=self.gemini_rate_limiter,
session_manager=self.session_manager # Phase 3: For candidate pool persistence
)
self.logger.debug("DiscoveryAgent initialized")
# Initialize JudgeAgent
self.judge_agent = JudgeAgent(
config=agent_config,
llm_client=self.gemini_client,
api_service=self.api_service,
metadata_service=self.metadata_service,
rate_limiter=self.gemini_rate_limiter,
session_manager=self.session_manager # Phase 3: For candidate pool retrieval
)
self.logger.debug("JudgeAgent initialized")
def get_planner_agent(self) -> Optional["PlannerAgent"]:
"""Get the planner agent instance."""
return self.planner_agent
def get_genre_mood_agent(self) -> Optional["GenreMoodAgent"]:
"""Get the genre mood agent instance."""
return self.genre_mood_agent
def get_discovery_agent(self) -> Optional["DiscoveryAgent"]:
"""Get the discovery agent instance."""
return self.discovery_agent
def get_judge_agent(self) -> Optional["JudgeAgent"]:
"""Get the judge agent instance."""
return self.judge_agent
def get_gemini_client(self):
"""Get the shared Gemini client."""
return self.gemini_client
def get_rate_limiter(self):
"""Get the shared rate limiter."""
return self.gemini_rate_limiter
def are_agents_initialized(self) -> bool:
"""Check if agents are initialized."""
return self._agents_initialized
async def close(self):
"""Clean up agent resources."""
# Close individual agents if they have cleanup methods
for agent in [self.planner_agent, self.genre_mood_agent, self.discovery_agent, self.judge_agent]:
if agent and hasattr(agent, 'close'):
try:
await agent.close()
except Exception as e:
self.logger.warning(f"Error closing agent: {e}")
self.logger.info("Agent coordinator closed")