|
|
|
|
|
""" |
|
|
Multi-Agent MoE Integration |
|
|
|
|
|
This module provides integration between the multi-agent training system and the existing |
|
|
MoE framework, allowing for seamless combination of agent-specific conditioning and |
|
|
expert specialization. |
|
|
""" |
|
|
|
|
|
import os |
|
|
import json |
|
|
import logging |
|
|
from typing import Dict, List, Optional, Tuple, Any, Union |
|
|
from dataclasses import dataclass |
|
|
from pathlib import Path |
|
|
|
|
|
import torch |
|
|
from transformers import AutoTokenizer, AutoModelForCausalLM |
|
|
|
|
|
from .phi35_moe_integration import EnhancedMoEFramework, Phi35MoEConfig, Phi35MoEExpert |
|
|
from ..multi_agent_tokenization.agent_tokenizer import AgentTokenManager, AgentTokenConfig |
|
|
from ..multi_agent_training.multi_agent_trainer import MultiAgentTrainingConfig |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
@dataclass |
|
|
class MultiAgentMoEConfig: |
|
|
"""Configuration for multi-agent MoE integration""" |
|
|
|
|
|
moe_config: Phi35MoEConfig |
|
|
|
|
|
|
|
|
agent_prefix: str = "<|agent:" |
|
|
agent_suffix: str = "|>" |
|
|
agents_file: Optional[str] = None |
|
|
|
|
|
|
|
|
enable_agent_conditioning: bool = True |
|
|
enable_expert_routing: bool = True |
|
|
hybrid_mode: bool = True |
|
|
|
|
|
|
|
|
base_model_path: str = "microsoft/Phi-3.5-MoE-instruct" |
|
|
lora_adapter_path: Optional[str] = None |
|
|
|
|
|
|
|
|
agent_expert_mapping: Optional[Dict[str, str]] = None |
|
|
|
|
|
class MultiAgentMoEExpert(Phi35MoEExpert): |
|
|
""" |
|
|
Enhanced MoE expert with multi-agent support |
|
|
""" |
|
|
|
|
|
def __init__(self, expert_id: str, specialization: str, config: Phi35MoEConfig, |
|
|
agent_manager: Optional[AgentTokenManager] = None): |
|
|
super().__init__(expert_id, specialization, config) |
|
|
self.agent_manager = agent_manager |
|
|
self.supported_agents: List[str] = [] |
|
|
|
|
|
def add_agent_support(self, agent: str): |
|
|
"""Add agent support to this expert""" |
|
|
if agent not in self.supported_agents: |
|
|
self.supported_agents.append(agent) |
|
|
logger.info(f"Added agent '{agent}' support to expert '{self.expert_id}'") |
|
|
|
|
|
def format_agent_prompt(self, agent: str, messages: List[Dict[str, str]]) -> List[Dict[str, str]]: |
|
|
"""Format messages with agent conditioning""" |
|
|
if not self.agent_manager or agent not in self.supported_agents: |
|
|
return messages |
|
|
|
|
|
|
|
|
formatted_messages = messages.copy() |
|
|
|
|
|
|
|
|
has_system = any(msg.get("role") == "system" for msg in formatted_messages) |
|
|
|
|
|
if not has_system: |
|
|
|
|
|
agent_token = f"{self.agent_manager.config.agent_prefix}{agent}{self.agent_manager.config.agent_suffix}" |
|
|
system_message = { |
|
|
"role": "system", |
|
|
"content": f"You are a {agent} agent specialized in {self.specialization}." |
|
|
} |
|
|
formatted_messages.insert(0, system_message) |
|
|
else: |
|
|
|
|
|
for msg in formatted_messages: |
|
|
if msg.get("role") == "system": |
|
|
agent_token = f"{self.agent_manager.config.agent_prefix}{agent}{self.agent_manager.config.agent_suffix}" |
|
|
msg["content"] = f"{agent_token}\n{msg['content']}" |
|
|
break |
|
|
|
|
|
return formatted_messages |
|
|
|
|
|
async def generate_response(self, messages: List[Dict[str, str]], |
|
|
agent: Optional[str] = None, **kwargs) -> Dict[str, Any]: |
|
|
"""Generate response with optional agent conditioning""" |
|
|
if agent and agent in self.supported_agents: |
|
|
messages = self.format_agent_prompt(agent, messages) |
|
|
|
|
|
|
|
|
return await super().generate_response(messages, **kwargs) |
|
|
|
|
|
class MultiAgentMoERouter: |
|
|
""" |
|
|
Enhanced router that considers both agent and content for expert selection |
|
|
""" |
|
|
|
|
|
def __init__(self, agent_manager: Optional[AgentTokenManager] = None): |
|
|
self.agent_manager = agent_manager |
|
|
self.agent_expert_mapping: Dict[str, str] = {} |
|
|
self.expert_specializations = { |
|
|
"code": ["programming", "software", "development", "coding", "algorithm", "python", "javascript", "java", "function", "code"], |
|
|
"math": ["mathematics", "calculation", "equation", "formula", "statistics", "derivative", "integral", "algebra", "calculus", "math", "solve", "calculate"], |
|
|
"reasoning": ["logic", "analysis", "reasoning", "problem-solving", "critical", "explain", "why", "how", "because"], |
|
|
"multilingual": ["translation", "language", "multilingual", "localization", "translate", "spanish", "french", "german"], |
|
|
"general": ["general", "conversation", "assistance", "help", "hello", "hi", "what", "who", "when", "where"] |
|
|
} |
|
|
|
|
|
def set_agent_expert_mapping(self, mapping: Dict[str, str]): |
|
|
"""Set mapping from agents to preferred experts""" |
|
|
self.agent_expert_mapping = mapping |
|
|
logger.info(f"Set agent-expert mapping: {mapping}") |
|
|
|
|
|
def extract_agent_from_messages(self, messages: List[Dict[str, str]]) -> Optional[str]: |
|
|
"""Extract agent from messages""" |
|
|
if not self.agent_manager: |
|
|
return None |
|
|
|
|
|
|
|
|
for msg in messages: |
|
|
if msg.get("role") == "system": |
|
|
content = msg.get("content", "") |
|
|
agent = self.agent_manager.extract_agent_from_text(content) |
|
|
if agent: |
|
|
return agent |
|
|
|
|
|
return None |
|
|
|
|
|
def route_experts(self, messages: List[Dict[str, str]], available_experts: List[MultiAgentMoEExpert]) -> List[MultiAgentMoEExpert]: |
|
|
"""Route to appropriate experts considering both agent and content""" |
|
|
|
|
|
agent = self.extract_agent_from_messages(messages) |
|
|
|
|
|
|
|
|
content = "" |
|
|
for msg in messages: |
|
|
if msg.get("role") in ["user", "assistant"]: |
|
|
content += " " + msg.get("content", "") |
|
|
|
|
|
content_lower = content.lower() |
|
|
|
|
|
|
|
|
if agent and agent in self.agent_expert_mapping: |
|
|
preferred_expert_type = self.agent_expert_mapping[agent] |
|
|
agent_experts = [exp for exp in available_experts if exp.specialization == preferred_expert_type] |
|
|
if agent_experts: |
|
|
logger.debug(f"Routing agent '{agent}' to {preferred_expert_type} expert") |
|
|
return agent_experts |
|
|
|
|
|
|
|
|
for specialization, keywords in self.expert_specializations.items(): |
|
|
if any(keyword in content_lower for keyword in keywords): |
|
|
content_experts = [exp for exp in available_experts if exp.specialization == specialization] |
|
|
if content_experts: |
|
|
logger.debug(f"Routing based on content to {specialization} expert") |
|
|
return content_experts |
|
|
|
|
|
|
|
|
general_experts = [exp for exp in available_experts if exp.specialization == "general"] |
|
|
if general_experts: |
|
|
logger.debug("Routing to general expert") |
|
|
return general_experts |
|
|
|
|
|
|
|
|
return available_experts |
|
|
|
|
|
class MultiAgentMoEFramework(EnhancedMoEFramework): |
|
|
""" |
|
|
Enhanced MoE framework with multi-agent support |
|
|
""" |
|
|
|
|
|
def __init__(self, config: MultiAgentMoEConfig): |
|
|
super().__init__(config.moe_config) |
|
|
self.multi_agent_config = config |
|
|
self.agent_manager: Optional[AgentTokenManager] = None |
|
|
self.agent_expert_mapping: Dict[str, str] = {} |
|
|
self.agents: List[str] = [] |
|
|
|
|
|
async def initialize_agents(self, agents: List[str], agent_expert_mapping: Optional[Dict[str, str]] = None): |
|
|
"""Initialize multi-agent support""" |
|
|
self.agents = agents |
|
|
|
|
|
|
|
|
agent_config = AgentTokenConfig( |
|
|
agent_prefix=self.multi_agent_config.agent_prefix, |
|
|
agent_suffix=self.multi_agent_config.agent_suffix |
|
|
) |
|
|
self.agent_manager = AgentTokenManager(agent_config) |
|
|
|
|
|
|
|
|
if agent_expert_mapping: |
|
|
self.agent_expert_mapping = agent_expert_mapping |
|
|
else: |
|
|
|
|
|
self.agent_expert_mapping = self._create_default_mapping(agents) |
|
|
|
|
|
|
|
|
if hasattr(self, 'router') and isinstance(self.router, MultiAgentMoERouter): |
|
|
self.router.agent_manager = self.agent_manager |
|
|
self.router.set_agent_expert_mapping(self.agent_expert_mapping) |
|
|
|
|
|
logger.info(f"Initialized multi-agent support for {len(agents)} agents") |
|
|
logger.info(f"Agent-expert mapping: {self.agent_expert_mapping}") |
|
|
|
|
|
def _create_default_mapping(self, agents: List[str]) -> Dict[str, str]: |
|
|
"""Create default agent-expert mapping""" |
|
|
mapping = {} |
|
|
|
|
|
for agent in agents: |
|
|
agent_lower = agent.lower() |
|
|
if any(keyword in agent_lower for keyword in ["swe", "developer", "programmer", "engineer"]): |
|
|
mapping[agent] = "code" |
|
|
elif any(keyword in agent_lower for keyword in ["sqa", "tester", "qa", "quality"]): |
|
|
mapping[agent] = "code" |
|
|
elif any(keyword in agent_lower for keyword in ["devops", "ops", "deployment"]): |
|
|
mapping[agent] = "code" |
|
|
elif any(keyword in agent_lower for keyword in ["architect", "design", "system"]): |
|
|
mapping[agent] = "reasoning" |
|
|
elif any(keyword in agent_lower for keyword in ["security", "sec", "cyber"]): |
|
|
mapping[agent] = "reasoning" |
|
|
elif any(keyword in agent_lower for keyword in ["math", "analyst", "data"]): |
|
|
mapping[agent] = "math" |
|
|
elif any(keyword in agent_lower for keyword in ["translate", "localization", "lang"]): |
|
|
mapping[agent] = "multilingual" |
|
|
else: |
|
|
mapping[agent] = "general" |
|
|
|
|
|
return mapping |
|
|
|
|
|
async def initialize_experts(self, expert_configs: List[Dict[str, str]]): |
|
|
"""Initialize experts with multi-agent support""" |
|
|
|
|
|
await super().initialize_experts(expert_configs) |
|
|
|
|
|
|
|
|
for expert_id, expert in self.experts.items(): |
|
|
if isinstance(expert, MultiAgentMoEExpert): |
|
|
|
|
|
for agent, expert_type in self.agent_expert_mapping.items(): |
|
|
if expert.specialization == expert_type: |
|
|
expert.add_agent_support(agent) |
|
|
|
|
|
async def process_query(self, query: str, agent: Optional[str] = None, |
|
|
system_message: Optional[str] = None, |
|
|
use_multiple_experts: bool = True, **kwargs) -> Dict[str, Any]: |
|
|
"""Process query with optional agent conditioning""" |
|
|
|
|
|
messages = [{"role": "user", "content": query}] |
|
|
|
|
|
if system_message: |
|
|
messages.insert(0, {"role": "system", "content": system_message}) |
|
|
|
|
|
|
|
|
if agent and self.agent_manager: |
|
|
agent_token = f"{self.agent_manager.config.agent_prefix}{agent}{self.agent_manager.config.agent_suffix}" |
|
|
if messages[0].get("role") == "system": |
|
|
messages[0]["content"] = f"{agent_token}\n{messages[0]['content']}" |
|
|
else: |
|
|
messages.insert(0, {"role": "system", "content": f"{agent_token}\nYou are a {agent} agent."}) |
|
|
|
|
|
|
|
|
return await super().process_query( |
|
|
query=query, |
|
|
system_message=system_message, |
|
|
use_multiple_experts=use_multiple_experts, |
|
|
**kwargs |
|
|
) |
|
|
|
|
|
def get_agent_statistics(self) -> Dict[str, Any]: |
|
|
"""Get statistics about agent support""" |
|
|
stats = { |
|
|
"total_agents": len(self.agents), |
|
|
"agents": self.agents, |
|
|
"agent_expert_mapping": self.agent_expert_mapping, |
|
|
"expert_agent_support": {} |
|
|
} |
|
|
|
|
|
for expert_id, expert in self.experts.items(): |
|
|
if isinstance(expert, MultiAgentMoEExpert): |
|
|
stats["expert_agent_support"][expert_id] = { |
|
|
"specialization": expert.specialization, |
|
|
"supported_agents": expert.supported_agents |
|
|
} |
|
|
|
|
|
return stats |
|
|
|
|
|
def save_agent_configuration(self, output_dir: str): |
|
|
"""Save agent configuration for deployment""" |
|
|
os.makedirs(output_dir, exist_ok=True) |
|
|
|
|
|
config_data = { |
|
|
"agents": self.agents, |
|
|
"agent_expert_mapping": self.agent_expert_mapping, |
|
|
"agent_token_config": { |
|
|
"agent_prefix": self.multi_agent_config.agent_prefix, |
|
|
"agent_suffix": self.multi_agent_config.agent_suffix |
|
|
}, |
|
|
"expert_agent_support": self.get_agent_statistics()["expert_agent_support"] |
|
|
} |
|
|
|
|
|
config_file = os.path.join(output_dir, "agent_config.json") |
|
|
with open(config_file, 'w') as f: |
|
|
json.dump(config_data, f, indent=2) |
|
|
|
|
|
logger.info(f"Saved agent configuration to {config_file}") |
|
|
return config_file |
|
|
|
|
|
class MultiAgentMoEAdapter: |
|
|
""" |
|
|
Adapter for integrating multi-agent MoE with existing systems |
|
|
""" |
|
|
|
|
|
def __init__(self, multi_agent_moe: MultiAgentMoEFramework): |
|
|
self.multi_agent_moe = multi_agent_moe |
|
|
|
|
|
async def process_agent_query(self, agent: str, query: str, **kwargs) -> Dict[str, Any]: |
|
|
"""Process query for specific agent""" |
|
|
return await self.multi_agent_moe.process_query( |
|
|
query=query, |
|
|
agent=agent, |
|
|
**kwargs |
|
|
) |
|
|
|
|
|
def get_agent_capabilities(self, agent: str) -> Dict[str, Any]: |
|
|
"""Get capabilities for specific agent""" |
|
|
if agent not in self.multi_agent_moe.agents: |
|
|
return {"error": f"Agent '{agent}' not found"} |
|
|
|
|
|
expert_type = self.multi_agent_moe.agent_expert_mapping.get(agent, "general") |
|
|
|
|
|
return { |
|
|
"agent": agent, |
|
|
"expert_type": expert_type, |
|
|
"supported": True, |
|
|
"capabilities": self.multi_agent_moe.expert_specializations.get(expert_type, []) |
|
|
} |
|
|
|
|
|
def list_available_agents(self) -> List[Dict[str, Any]]: |
|
|
"""List all available agents and their capabilities""" |
|
|
agents_info = [] |
|
|
|
|
|
for agent in self.multi_agent_moe.agents: |
|
|
expert_type = self.multi_agent_moe.agent_expert_mapping.get(agent, "general") |
|
|
agents_info.append({ |
|
|
"agent": agent, |
|
|
"expert_type": expert_type, |
|
|
"capabilities": self.multi_agent_moe.expert_specializations.get(expert_type, []) |
|
|
}) |
|
|
|
|
|
return agents_info |
|
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
|
|
|
|
logging.basicConfig(level=logging.INFO) |
|
|
|
|
|
|
|
|
moe_config = Phi35MoEConfig() |
|
|
multi_agent_config = MultiAgentMoEConfig( |
|
|
moe_config=moe_config, |
|
|
agent_prefix="<|agent:", |
|
|
agent_suffix="|>" |
|
|
) |
|
|
|
|
|
|
|
|
framework = MultiAgentMoEFramework(multi_agent_config) |
|
|
|
|
|
|
|
|
agents = ["SWE", "SQE", "DevOps", "Architect", "Security"] |
|
|
|
|
|
|
|
|
agent_expert_mapping = { |
|
|
"SWE": "code", |
|
|
"SQE": "code", |
|
|
"DevOps": "code", |
|
|
"Architect": "reasoning", |
|
|
"Security": "reasoning" |
|
|
} |
|
|
|
|
|
print("Multi-agent MoE framework ready") |
|
|
print(f"Agents: {agents}") |
|
|
print(f"Agent-expert mapping: {agent_expert_mapping}") |
|
|
|