phi35-moe-multimodal / src /models /multi_agent_moe_integration.py
Mango-Metrics-NLM
feat: Phi-3.5-MoE multi-agent model repository
c8b77b5
#!/usr/bin/env python3
"""
Multi-Agent MoE Integration
This module provides integration between the multi-agent training system and the existing
MoE framework, allowing for seamless combination of agent-specific conditioning and
expert specialization.
"""
import os
import json
import logging
from typing import Dict, List, Optional, Tuple, Any, Union
from dataclasses import dataclass
from pathlib import Path
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
from .phi35_moe_integration import EnhancedMoEFramework, Phi35MoEConfig, Phi35MoEExpert
from ..multi_agent_tokenization.agent_tokenizer import AgentTokenManager, AgentTokenConfig
from ..multi_agent_training.multi_agent_trainer import MultiAgentTrainingConfig
logger = logging.getLogger(__name__)
@dataclass
class MultiAgentMoEConfig:
"""Configuration for multi-agent MoE integration"""
# Base MoE configuration
moe_config: Phi35MoEConfig
# Multi-agent configuration
agent_prefix: str = "<|agent:"
agent_suffix: str = "|>"
agents_file: Optional[str] = None
# Integration settings
enable_agent_conditioning: bool = True
enable_expert_routing: bool = True
hybrid_mode: bool = True # Use both agent tokens and expert routing
# Model paths
base_model_path: str = "microsoft/Phi-3.5-MoE-instruct"
lora_adapter_path: Optional[str] = None
# Agent-Expert mapping
agent_expert_mapping: Optional[Dict[str, str]] = None
class MultiAgentMoEExpert(Phi35MoEExpert):
"""
Enhanced MoE expert with multi-agent support
"""
def __init__(self, expert_id: str, specialization: str, config: Phi35MoEConfig,
agent_manager: Optional[AgentTokenManager] = None):
super().__init__(expert_id, specialization, config)
self.agent_manager = agent_manager
self.supported_agents: List[str] = []
def add_agent_support(self, agent: str):
"""Add agent support to this expert"""
if agent not in self.supported_agents:
self.supported_agents.append(agent)
logger.info(f"Added agent '{agent}' support to expert '{self.expert_id}'")
def format_agent_prompt(self, agent: str, messages: List[Dict[str, str]]) -> List[Dict[str, str]]:
"""Format messages with agent conditioning"""
if not self.agent_manager or agent not in self.supported_agents:
return messages
# Add agent token to system message or create one
formatted_messages = messages.copy()
# Check if there's a system message
has_system = any(msg.get("role") == "system" for msg in formatted_messages)
if not has_system:
# Add system message with agent token
agent_token = f"{self.agent_manager.config.agent_prefix}{agent}{self.agent_manager.config.agent_suffix}"
system_message = {
"role": "system",
"content": f"You are a {agent} agent specialized in {self.specialization}."
}
formatted_messages.insert(0, system_message)
else:
# Update existing system message
for msg in formatted_messages:
if msg.get("role") == "system":
agent_token = f"{self.agent_manager.config.agent_prefix}{agent}{self.agent_manager.config.agent_suffix}"
msg["content"] = f"{agent_token}\n{msg['content']}"
break
return formatted_messages
async def generate_response(self, messages: List[Dict[str, str]],
agent: Optional[str] = None, **kwargs) -> Dict[str, Any]:
"""Generate response with optional agent conditioning"""
if agent and agent in self.supported_agents:
messages = self.format_agent_prompt(agent, messages)
# Call parent method
return await super().generate_response(messages, **kwargs)
class MultiAgentMoERouter:
"""
Enhanced router that considers both agent and content for expert selection
"""
def __init__(self, agent_manager: Optional[AgentTokenManager] = None):
self.agent_manager = agent_manager
self.agent_expert_mapping: Dict[str, str] = {}
self.expert_specializations = {
"code": ["programming", "software", "development", "coding", "algorithm", "python", "javascript", "java", "function", "code"],
"math": ["mathematics", "calculation", "equation", "formula", "statistics", "derivative", "integral", "algebra", "calculus", "math", "solve", "calculate"],
"reasoning": ["logic", "analysis", "reasoning", "problem-solving", "critical", "explain", "why", "how", "because"],
"multilingual": ["translation", "language", "multilingual", "localization", "translate", "spanish", "french", "german"],
"general": ["general", "conversation", "assistance", "help", "hello", "hi", "what", "who", "when", "where"]
}
def set_agent_expert_mapping(self, mapping: Dict[str, str]):
"""Set mapping from agents to preferred experts"""
self.agent_expert_mapping = mapping
logger.info(f"Set agent-expert mapping: {mapping}")
def extract_agent_from_messages(self, messages: List[Dict[str, str]]) -> Optional[str]:
"""Extract agent from messages"""
if not self.agent_manager:
return None
# Look for agent token in system message
for msg in messages:
if msg.get("role") == "system":
content = msg.get("content", "")
agent = self.agent_manager.extract_agent_from_text(content)
if agent:
return agent
return None
def route_experts(self, messages: List[Dict[str, str]], available_experts: List[MultiAgentMoEExpert]) -> List[MultiAgentMoEExpert]:
"""Route to appropriate experts considering both agent and content"""
# Extract agent
agent = self.extract_agent_from_messages(messages)
# Get content for analysis
content = ""
for msg in messages:
if msg.get("role") in ["user", "assistant"]:
content += " " + msg.get("content", "")
content_lower = content.lower()
# First, try agent-based routing
if agent and agent in self.agent_expert_mapping:
preferred_expert_type = self.agent_expert_mapping[agent]
agent_experts = [exp for exp in available_experts if exp.specialization == preferred_expert_type]
if agent_experts:
logger.debug(f"Routing agent '{agent}' to {preferred_expert_type} expert")
return agent_experts
# Fall back to content-based routing
for specialization, keywords in self.expert_specializations.items():
if any(keyword in content_lower for keyword in keywords):
content_experts = [exp for exp in available_experts if exp.specialization == specialization]
if content_experts:
logger.debug(f"Routing based on content to {specialization} expert")
return content_experts
# Default to general expert
general_experts = [exp for exp in available_experts if exp.specialization == "general"]
if general_experts:
logger.debug("Routing to general expert")
return general_experts
# Return all experts if no specific routing
return available_experts
class MultiAgentMoEFramework(EnhancedMoEFramework):
"""
Enhanced MoE framework with multi-agent support
"""
def __init__(self, config: MultiAgentMoEConfig):
super().__init__(config.moe_config)
self.multi_agent_config = config
self.agent_manager: Optional[AgentTokenManager] = None
self.agent_expert_mapping: Dict[str, str] = {}
self.agents: List[str] = []
async def initialize_agents(self, agents: List[str], agent_expert_mapping: Optional[Dict[str, str]] = None):
"""Initialize multi-agent support"""
self.agents = agents
# Create agent token manager
agent_config = AgentTokenConfig(
agent_prefix=self.multi_agent_config.agent_prefix,
agent_suffix=self.multi_agent_config.agent_suffix
)
self.agent_manager = AgentTokenManager(agent_config)
# Set agent-expert mapping
if agent_expert_mapping:
self.agent_expert_mapping = agent_expert_mapping
else:
# Default mapping based on agent names
self.agent_expert_mapping = self._create_default_mapping(agents)
# Update router with agent manager
if hasattr(self, 'router') and isinstance(self.router, MultiAgentMoERouter):
self.router.agent_manager = self.agent_manager
self.router.set_agent_expert_mapping(self.agent_expert_mapping)
logger.info(f"Initialized multi-agent support for {len(agents)} agents")
logger.info(f"Agent-expert mapping: {self.agent_expert_mapping}")
def _create_default_mapping(self, agents: List[str]) -> Dict[str, str]:
"""Create default agent-expert mapping"""
mapping = {}
for agent in agents:
agent_lower = agent.lower()
if any(keyword in agent_lower for keyword in ["swe", "developer", "programmer", "engineer"]):
mapping[agent] = "code"
elif any(keyword in agent_lower for keyword in ["sqa", "tester", "qa", "quality"]):
mapping[agent] = "code"
elif any(keyword in agent_lower for keyword in ["devops", "ops", "deployment"]):
mapping[agent] = "code"
elif any(keyword in agent_lower for keyword in ["architect", "design", "system"]):
mapping[agent] = "reasoning"
elif any(keyword in agent_lower for keyword in ["security", "sec", "cyber"]):
mapping[agent] = "reasoning"
elif any(keyword in agent_lower for keyword in ["math", "analyst", "data"]):
mapping[agent] = "math"
elif any(keyword in agent_lower for keyword in ["translate", "localization", "lang"]):
mapping[agent] = "multilingual"
else:
mapping[agent] = "general"
return mapping
async def initialize_experts(self, expert_configs: List[Dict[str, str]]):
"""Initialize experts with multi-agent support"""
# Call parent method
await super().initialize_experts(expert_configs)
# Enhance experts with agent support
for expert_id, expert in self.experts.items():
if isinstance(expert, MultiAgentMoEExpert):
# Add agent support based on mapping
for agent, expert_type in self.agent_expert_mapping.items():
if expert.specialization == expert_type:
expert.add_agent_support(agent)
async def process_query(self, query: str, agent: Optional[str] = None,
system_message: Optional[str] = None,
use_multiple_experts: bool = True, **kwargs) -> Dict[str, Any]:
"""Process query with optional agent conditioning"""
# Format messages with agent context
messages = [{"role": "user", "content": query}]
if system_message:
messages.insert(0, {"role": "system", "content": system_message})
# Add agent conditioning if specified
if agent and self.agent_manager:
agent_token = f"{self.agent_manager.config.agent_prefix}{agent}{self.agent_manager.config.agent_suffix}"
if messages[0].get("role") == "system":
messages[0]["content"] = f"{agent_token}\n{messages[0]['content']}"
else:
messages.insert(0, {"role": "system", "content": f"{agent_token}\nYou are a {agent} agent."})
# Process with enhanced framework
return await super().process_query(
query=query,
system_message=system_message,
use_multiple_experts=use_multiple_experts,
**kwargs
)
def get_agent_statistics(self) -> Dict[str, Any]:
"""Get statistics about agent support"""
stats = {
"total_agents": len(self.agents),
"agents": self.agents,
"agent_expert_mapping": self.agent_expert_mapping,
"expert_agent_support": {}
}
for expert_id, expert in self.experts.items():
if isinstance(expert, MultiAgentMoEExpert):
stats["expert_agent_support"][expert_id] = {
"specialization": expert.specialization,
"supported_agents": expert.supported_agents
}
return stats
def save_agent_configuration(self, output_dir: str):
"""Save agent configuration for deployment"""
os.makedirs(output_dir, exist_ok=True)
config_data = {
"agents": self.agents,
"agent_expert_mapping": self.agent_expert_mapping,
"agent_token_config": {
"agent_prefix": self.multi_agent_config.agent_prefix,
"agent_suffix": self.multi_agent_config.agent_suffix
},
"expert_agent_support": self.get_agent_statistics()["expert_agent_support"]
}
config_file = os.path.join(output_dir, "agent_config.json")
with open(config_file, 'w') as f:
json.dump(config_data, f, indent=2)
logger.info(f"Saved agent configuration to {config_file}")
return config_file
class MultiAgentMoEAdapter:
"""
Adapter for integrating multi-agent MoE with existing systems
"""
def __init__(self, multi_agent_moe: MultiAgentMoEFramework):
self.multi_agent_moe = multi_agent_moe
async def process_agent_query(self, agent: str, query: str, **kwargs) -> Dict[str, Any]:
"""Process query for specific agent"""
return await self.multi_agent_moe.process_query(
query=query,
agent=agent,
**kwargs
)
def get_agent_capabilities(self, agent: str) -> Dict[str, Any]:
"""Get capabilities for specific agent"""
if agent not in self.multi_agent_moe.agents:
return {"error": f"Agent '{agent}' not found"}
expert_type = self.multi_agent_moe.agent_expert_mapping.get(agent, "general")
return {
"agent": agent,
"expert_type": expert_type,
"supported": True,
"capabilities": self.multi_agent_moe.expert_specializations.get(expert_type, [])
}
def list_available_agents(self) -> List[Dict[str, Any]]:
"""List all available agents and their capabilities"""
agents_info = []
for agent in self.multi_agent_moe.agents:
expert_type = self.multi_agent_moe.agent_expert_mapping.get(agent, "general")
agents_info.append({
"agent": agent,
"expert_type": expert_type,
"capabilities": self.multi_agent_moe.expert_specializations.get(expert_type, [])
})
return agents_info
# Example usage and testing
if __name__ == "__main__":
# Configure logging
logging.basicConfig(level=logging.INFO)
# Example configuration
moe_config = Phi35MoEConfig()
multi_agent_config = MultiAgentMoEConfig(
moe_config=moe_config,
agent_prefix="<|agent:",
agent_suffix="|>"
)
# Create framework
framework = MultiAgentMoEFramework(multi_agent_config)
# Example agents
agents = ["SWE", "SQE", "DevOps", "Architect", "Security"]
# Example agent-expert mapping
agent_expert_mapping = {
"SWE": "code",
"SQE": "code",
"DevOps": "code",
"Architect": "reasoning",
"Security": "reasoning"
}
print("Multi-agent MoE framework ready")
print(f"Agents: {agents}")
print(f"Agent-expert mapping: {agent_expert_mapping}")