#!/usr/bin/env python3 """ Multi-Agent MoE Integration This module provides integration between the multi-agent training system and the existing MoE framework, allowing for seamless combination of agent-specific conditioning and expert specialization. """ import os import json import logging from typing import Dict, List, Optional, Tuple, Any, Union from dataclasses import dataclass from pathlib import Path import torch from transformers import AutoTokenizer, AutoModelForCausalLM from .phi35_moe_integration import EnhancedMoEFramework, Phi35MoEConfig, Phi35MoEExpert from ..multi_agent_tokenization.agent_tokenizer import AgentTokenManager, AgentTokenConfig from ..multi_agent_training.multi_agent_trainer import MultiAgentTrainingConfig logger = logging.getLogger(__name__) @dataclass class MultiAgentMoEConfig: """Configuration for multi-agent MoE integration""" # Base MoE configuration moe_config: Phi35MoEConfig # Multi-agent configuration agent_prefix: str = "<|agent:" agent_suffix: str = "|>" agents_file: Optional[str] = None # Integration settings enable_agent_conditioning: bool = True enable_expert_routing: bool = True hybrid_mode: bool = True # Use both agent tokens and expert routing # Model paths base_model_path: str = "microsoft/Phi-3.5-MoE-instruct" lora_adapter_path: Optional[str] = None # Agent-Expert mapping agent_expert_mapping: Optional[Dict[str, str]] = None class MultiAgentMoEExpert(Phi35MoEExpert): """ Enhanced MoE expert with multi-agent support """ def __init__(self, expert_id: str, specialization: str, config: Phi35MoEConfig, agent_manager: Optional[AgentTokenManager] = None): super().__init__(expert_id, specialization, config) self.agent_manager = agent_manager self.supported_agents: List[str] = [] def add_agent_support(self, agent: str): """Add agent support to this expert""" if agent not in self.supported_agents: self.supported_agents.append(agent) logger.info(f"Added agent '{agent}' support to expert '{self.expert_id}'") def format_agent_prompt(self, agent: str, messages: List[Dict[str, str]]) -> List[Dict[str, str]]: """Format messages with agent conditioning""" if not self.agent_manager or agent not in self.supported_agents: return messages # Add agent token to system message or create one formatted_messages = messages.copy() # Check if there's a system message has_system = any(msg.get("role") == "system" for msg in formatted_messages) if not has_system: # Add system message with agent token agent_token = f"{self.agent_manager.config.agent_prefix}{agent}{self.agent_manager.config.agent_suffix}" system_message = { "role": "system", "content": f"You are a {agent} agent specialized in {self.specialization}." } formatted_messages.insert(0, system_message) else: # Update existing system message for msg in formatted_messages: if msg.get("role") == "system": agent_token = f"{self.agent_manager.config.agent_prefix}{agent}{self.agent_manager.config.agent_suffix}" msg["content"] = f"{agent_token}\n{msg['content']}" break return formatted_messages async def generate_response(self, messages: List[Dict[str, str]], agent: Optional[str] = None, **kwargs) -> Dict[str, Any]: """Generate response with optional agent conditioning""" if agent and agent in self.supported_agents: messages = self.format_agent_prompt(agent, messages) # Call parent method return await super().generate_response(messages, **kwargs) class MultiAgentMoERouter: """ Enhanced router that considers both agent and content for expert selection """ def __init__(self, agent_manager: Optional[AgentTokenManager] = None): self.agent_manager = agent_manager self.agent_expert_mapping: Dict[str, str] = {} self.expert_specializations = { "code": ["programming", "software", "development", "coding", "algorithm", "python", "javascript", "java", "function", "code"], "math": ["mathematics", "calculation", "equation", "formula", "statistics", "derivative", "integral", "algebra", "calculus", "math", "solve", "calculate"], "reasoning": ["logic", "analysis", "reasoning", "problem-solving", "critical", "explain", "why", "how", "because"], "multilingual": ["translation", "language", "multilingual", "localization", "translate", "spanish", "french", "german"], "general": ["general", "conversation", "assistance", "help", "hello", "hi", "what", "who", "when", "where"] } def set_agent_expert_mapping(self, mapping: Dict[str, str]): """Set mapping from agents to preferred experts""" self.agent_expert_mapping = mapping logger.info(f"Set agent-expert mapping: {mapping}") def extract_agent_from_messages(self, messages: List[Dict[str, str]]) -> Optional[str]: """Extract agent from messages""" if not self.agent_manager: return None # Look for agent token in system message for msg in messages: if msg.get("role") == "system": content = msg.get("content", "") agent = self.agent_manager.extract_agent_from_text(content) if agent: return agent return None def route_experts(self, messages: List[Dict[str, str]], available_experts: List[MultiAgentMoEExpert]) -> List[MultiAgentMoEExpert]: """Route to appropriate experts considering both agent and content""" # Extract agent agent = self.extract_agent_from_messages(messages) # Get content for analysis content = "" for msg in messages: if msg.get("role") in ["user", "assistant"]: content += " " + msg.get("content", "") content_lower = content.lower() # First, try agent-based routing if agent and agent in self.agent_expert_mapping: preferred_expert_type = self.agent_expert_mapping[agent] agent_experts = [exp for exp in available_experts if exp.specialization == preferred_expert_type] if agent_experts: logger.debug(f"Routing agent '{agent}' to {preferred_expert_type} expert") return agent_experts # Fall back to content-based routing for specialization, keywords in self.expert_specializations.items(): if any(keyword in content_lower for keyword in keywords): content_experts = [exp for exp in available_experts if exp.specialization == specialization] if content_experts: logger.debug(f"Routing based on content to {specialization} expert") return content_experts # Default to general expert general_experts = [exp for exp in available_experts if exp.specialization == "general"] if general_experts: logger.debug("Routing to general expert") return general_experts # Return all experts if no specific routing return available_experts class MultiAgentMoEFramework(EnhancedMoEFramework): """ Enhanced MoE framework with multi-agent support """ def __init__(self, config: MultiAgentMoEConfig): super().__init__(config.moe_config) self.multi_agent_config = config self.agent_manager: Optional[AgentTokenManager] = None self.agent_expert_mapping: Dict[str, str] = {} self.agents: List[str] = [] async def initialize_agents(self, agents: List[str], agent_expert_mapping: Optional[Dict[str, str]] = None): """Initialize multi-agent support""" self.agents = agents # Create agent token manager agent_config = AgentTokenConfig( agent_prefix=self.multi_agent_config.agent_prefix, agent_suffix=self.multi_agent_config.agent_suffix ) self.agent_manager = AgentTokenManager(agent_config) # Set agent-expert mapping if agent_expert_mapping: self.agent_expert_mapping = agent_expert_mapping else: # Default mapping based on agent names self.agent_expert_mapping = self._create_default_mapping(agents) # Update router with agent manager if hasattr(self, 'router') and isinstance(self.router, MultiAgentMoERouter): self.router.agent_manager = self.agent_manager self.router.set_agent_expert_mapping(self.agent_expert_mapping) logger.info(f"Initialized multi-agent support for {len(agents)} agents") logger.info(f"Agent-expert mapping: {self.agent_expert_mapping}") def _create_default_mapping(self, agents: List[str]) -> Dict[str, str]: """Create default agent-expert mapping""" mapping = {} for agent in agents: agent_lower = agent.lower() if any(keyword in agent_lower for keyword in ["swe", "developer", "programmer", "engineer"]): mapping[agent] = "code" elif any(keyword in agent_lower for keyword in ["sqa", "tester", "qa", "quality"]): mapping[agent] = "code" elif any(keyword in agent_lower for keyword in ["devops", "ops", "deployment"]): mapping[agent] = "code" elif any(keyword in agent_lower for keyword in ["architect", "design", "system"]): mapping[agent] = "reasoning" elif any(keyword in agent_lower for keyword in ["security", "sec", "cyber"]): mapping[agent] = "reasoning" elif any(keyword in agent_lower for keyword in ["math", "analyst", "data"]): mapping[agent] = "math" elif any(keyword in agent_lower for keyword in ["translate", "localization", "lang"]): mapping[agent] = "multilingual" else: mapping[agent] = "general" return mapping async def initialize_experts(self, expert_configs: List[Dict[str, str]]): """Initialize experts with multi-agent support""" # Call parent method await super().initialize_experts(expert_configs) # Enhance experts with agent support for expert_id, expert in self.experts.items(): if isinstance(expert, MultiAgentMoEExpert): # Add agent support based on mapping for agent, expert_type in self.agent_expert_mapping.items(): if expert.specialization == expert_type: expert.add_agent_support(agent) async def process_query(self, query: str, agent: Optional[str] = None, system_message: Optional[str] = None, use_multiple_experts: bool = True, **kwargs) -> Dict[str, Any]: """Process query with optional agent conditioning""" # Format messages with agent context messages = [{"role": "user", "content": query}] if system_message: messages.insert(0, {"role": "system", "content": system_message}) # Add agent conditioning if specified if agent and self.agent_manager: agent_token = f"{self.agent_manager.config.agent_prefix}{agent}{self.agent_manager.config.agent_suffix}" if messages[0].get("role") == "system": messages[0]["content"] = f"{agent_token}\n{messages[0]['content']}" else: messages.insert(0, {"role": "system", "content": f"{agent_token}\nYou are a {agent} agent."}) # Process with enhanced framework return await super().process_query( query=query, system_message=system_message, use_multiple_experts=use_multiple_experts, **kwargs ) def get_agent_statistics(self) -> Dict[str, Any]: """Get statistics about agent support""" stats = { "total_agents": len(self.agents), "agents": self.agents, "agent_expert_mapping": self.agent_expert_mapping, "expert_agent_support": {} } for expert_id, expert in self.experts.items(): if isinstance(expert, MultiAgentMoEExpert): stats["expert_agent_support"][expert_id] = { "specialization": expert.specialization, "supported_agents": expert.supported_agents } return stats def save_agent_configuration(self, output_dir: str): """Save agent configuration for deployment""" os.makedirs(output_dir, exist_ok=True) config_data = { "agents": self.agents, "agent_expert_mapping": self.agent_expert_mapping, "agent_token_config": { "agent_prefix": self.multi_agent_config.agent_prefix, "agent_suffix": self.multi_agent_config.agent_suffix }, "expert_agent_support": self.get_agent_statistics()["expert_agent_support"] } config_file = os.path.join(output_dir, "agent_config.json") with open(config_file, 'w') as f: json.dump(config_data, f, indent=2) logger.info(f"Saved agent configuration to {config_file}") return config_file class MultiAgentMoEAdapter: """ Adapter for integrating multi-agent MoE with existing systems """ def __init__(self, multi_agent_moe: MultiAgentMoEFramework): self.multi_agent_moe = multi_agent_moe async def process_agent_query(self, agent: str, query: str, **kwargs) -> Dict[str, Any]: """Process query for specific agent""" return await self.multi_agent_moe.process_query( query=query, agent=agent, **kwargs ) def get_agent_capabilities(self, agent: str) -> Dict[str, Any]: """Get capabilities for specific agent""" if agent not in self.multi_agent_moe.agents: return {"error": f"Agent '{agent}' not found"} expert_type = self.multi_agent_moe.agent_expert_mapping.get(agent, "general") return { "agent": agent, "expert_type": expert_type, "supported": True, "capabilities": self.multi_agent_moe.expert_specializations.get(expert_type, []) } def list_available_agents(self) -> List[Dict[str, Any]]: """List all available agents and their capabilities""" agents_info = [] for agent in self.multi_agent_moe.agents: expert_type = self.multi_agent_moe.agent_expert_mapping.get(agent, "general") agents_info.append({ "agent": agent, "expert_type": expert_type, "capabilities": self.multi_agent_moe.expert_specializations.get(expert_type, []) }) return agents_info # Example usage and testing if __name__ == "__main__": # Configure logging logging.basicConfig(level=logging.INFO) # Example configuration moe_config = Phi35MoEConfig() multi_agent_config = MultiAgentMoEConfig( moe_config=moe_config, agent_prefix="<|agent:", agent_suffix="|>" ) # Create framework framework = MultiAgentMoEFramework(multi_agent_config) # Example agents agents = ["SWE", "SQE", "DevOps", "Architect", "Security"] # Example agent-expert mapping agent_expert_mapping = { "SWE": "code", "SQE": "code", "DevOps": "code", "Architect": "reasoning", "Security": "reasoning" } print("Multi-agent MoE framework ready") print(f"Agents: {agents}") print(f"Agent-expert mapping: {agent_expert_mapping}")