# phi35-moe-multimodal/src/models/phi35_moe_integration.py
# Mango-Metrics-NLM
# feat: Phi-3.5-MoE multi-agent model repository
# c8b77b5
#!/usr/bin/env python3
"""
Phi-3.5-MoE Integration Module
This module integrates Microsoft's Phi-3.5-MoE-instruct model as a baseline
for expanding our current MoE (Mixture of Experts) framework.
Based on: https://huggingface.co/microsoft/Phi-3.5-MoE-instruct
- 41.9B parameters
- 128K context length
- Multilingual support
- Strong reasoning capabilities (code, math, logic)
"""
import torch
import logging
from typing import Dict, List, Optional, Any, Union
from transformers import (
AutoModelForCausalLM,
AutoTokenizer,
pipeline,
Pipeline
)
from pathlib import Path
import json
import asyncio
from dataclasses import dataclass
# Module-level logger named after this module; this file installs no handlers
# or levels itself.
logger = logging.getLogger(__name__)
@dataclass
class Phi35MoEConfig:
    """Configuration for Phi-3.5-MoE integration.

    The first four fields are forwarded to
    ``AutoModelForCausalLM.from_pretrained`` when an expert loads its model.
    The next four are the default generation arguments used by
    ``Phi35MoEExpert.generate_response`` (callers may override them per call).
    """

    # --- model loading -----------------------------------------------------
    # Hugging Face model identifier (also used to load the tokenizer).
    model_name: str = "microsoft/Phi-3.5-MoE-instruct"
    # Device placement strategy handed to ``from_pretrained`` and ``pipeline``.
    device_map: str = "auto"
    # Weight dtype selector handed to ``from_pretrained``.
    torch_dtype: str = "auto"
    # Whether code shipped with the model repository may be executed.
    trust_remote_code: bool = False

    # --- default generation arguments --------------------------------------
    max_new_tokens: int = 500
    temperature: float = 0.7
    top_p: float = 0.9
    do_sample: bool = True

    # --- model facts -------------------------------------------------------
    # NOTE(review): the two fields below are not read anywhere in the visible
    # part of this module; they appear to be informational only.
    context_length: int = 128000  # 128K context length
    vocabulary_size: int = 32064
class Phi35MoEExpert:
    """Individual expert backed by its own Phi-3.5-MoE text-generation pipeline.

    The model is loaded lazily on the first :meth:`generate_response` call (or
    eagerly through :meth:`load_model`).  Model loading and generation are
    blocking calls, so both are executed in the event loop's default executor;
    this keeps the loop responsive and lets several experts awaited through
    ``asyncio.gather`` actually overlap.

    Attributes:
        config: Model and sampling settings (a ``Phi35MoEConfig``).
        expert_id: Identifier echoed back in every response dict.
        specialization: Specialization label echoed back in every response
            dict.
        model / tokenizer / pipeline: Populated by :meth:`load_model`;
            ``None`` until then.
        is_loaded: ``True`` once :meth:`load_model` has completed.
    """

    def __init__(self, config: "Phi35MoEConfig", expert_id: str, specialization: str):
        self.config = config
        self.expert_id = expert_id
        self.specialization = specialization
        self.model = None
        self.tokenizer = None
        self.pipeline = None
        self.is_loaded = False
        # Created lazily inside a running event loop (see _ensure_loaded) so
        # the lock is never bound to a loop other than the one that uses it.
        self._load_lock = None

    def _build_pipeline(self) -> None:
        """Blocking part of model loading; meant to run in a worker thread."""
        model = AutoModelForCausalLM.from_pretrained(
            self.config.model_name,
            device_map=self.config.device_map,
            torch_dtype=self.config.torch_dtype,
            trust_remote_code=self.config.trust_remote_code,
        )
        # Keep the tokenizer's remote-code policy in line with the model's.
        tokenizer = AutoTokenizer.from_pretrained(
            self.config.model_name,
            trust_remote_code=self.config.trust_remote_code,
        )
        text_pipeline = pipeline(
            "text-generation",
            model=model,
            tokenizer=tokenizer,
            device_map=self.config.device_map,
        )
        # Publish only after everything succeeded, so a failed load never
        # leaves a half-initialised expert behind.
        self.model = model
        self.tokenizer = tokenizer
        self.pipeline = text_pipeline

    async def load_model(self):
        """Load the Phi-3.5-MoE model, tokenizer and pipeline.

        Always performs a (re)load, even if the expert is already loaded.

        Raises:
            Exception: Whatever the underlying loaders raise; the error is
                logged and re-raised unchanged.
        """
        try:
            logger.info(f"Loading Phi-3.5-MoE model for expert {self.expert_id}")
            loop = asyncio.get_running_loop()
            await loop.run_in_executor(None, self._build_pipeline)
            self.is_loaded = True
            logger.info(f"Successfully loaded Phi-3.5-MoE model for expert {self.expert_id}")
        except Exception as e:
            logger.error(f"Failed to load Phi-3.5-MoE model for expert {self.expert_id}: {e}")
            raise

    async def _ensure_loaded(self) -> None:
        """Load the model once, even when several coroutines need it at once."""
        if self.is_loaded:
            return
        if self._load_lock is None:
            self._load_lock = asyncio.Lock()
        async with self._load_lock:
            # Re-check: another coroutine may have finished loading while we
            # were waiting for the lock.
            if not self.is_loaded:
                await self.load_model()

    async def generate_response(
        self,
        messages: List[Dict[str, str]],
        **generation_kwargs
    ) -> Dict[str, Any]:
        """Generate a response using the Phi-3.5-MoE pipeline.

        Args:
            messages: Chat messages (``{"role": ..., "content": ...}`` dicts)
                passed to the pipeline as-is.
            **generation_kwargs: Overrides for the default generation
                arguments taken from ``self.config``.

        Returns:
            A dict with ``expert_id``, ``specialization``, ``response``,
            ``model`` and ``success``.  On a generation failure ``response``
            is ``""``, ``success`` is ``False`` and an ``error`` string is
            added.

        Raises:
            Exception: Only if lazy model loading fails; generation errors are
                reported through the returned dict instead.
        """
        await self._ensure_loaded()

        # Config defaults first, caller overrides last.
        generation_args = {
            "max_new_tokens": self.config.max_new_tokens,
            "temperature": self.config.temperature,
            "top_p": self.config.top_p,
            "do_sample": self.config.do_sample,
            "return_full_text": False,
            **generation_kwargs
        }

        try:
            loop = asyncio.get_running_loop()
            # The pipeline call blocks for the whole generation; run it off
            # the event loop thread.
            response = await loop.run_in_executor(
                None, lambda: self.pipeline(messages, **generation_args)
            )
            text = response[0]["generated_text"] if response else ""
        except Exception as e:
            logger.exception(f"Error generating response for expert {self.expert_id}: {e}")
            return {
                "expert_id": self.expert_id,
                "specialization": self.specialization,
                "response": "",
                "model": self.config.model_name,
                "error": str(e),
                "success": False
            }

        return {
            "expert_id": self.expert_id,
            "specialization": self.specialization,
            "response": text,
            "model": self.config.model_name,
            "success": True
        }
class Phi35MoERouter:
    """Keyword-based router for selecting appropriate Phi-3.5-MoE experts.

    ``expert_specializations`` maps a specialization label to the keywords
    that indicate a query belongs to it.  It is a plain public dict and may be
    edited after construction.
    """

    def __init__(self):
        self.expert_specializations = {
            "code": ["programming", "software", "development", "coding", "algorithm", "python", "javascript", "java", "function", "code"],
            "math": ["mathematics", "calculation", "equation", "formula", "statistics", "derivative", "integral", "algebra", "calculus", "math", "solve", "calculate"],
            "reasoning": ["logic", "analysis", "reasoning", "problem-solving", "critical", "explain", "why", "how", "because"],
            "multilingual": ["translation", "language", "multilingual", "localization", "translate", "spanish", "french", "german"],
            "general": ["general", "conversation", "assistance", "help", "hello", "hi", "what", "who", "when", "where"]
        }

    def route_query(
        self,
        query: str,
        available_experts: List["Phi35MoEExpert"],
        max_experts: int = 3,
    ) -> List["Phi35MoEExpert"]:
        """Route a query to the best-matching experts.

        Each expert scores one point per keyword of its specialization that
        occurs in the query at the start of a word (case-insensitive).
        Matching at word starts keeps inflections such as "algorithms" while
        rejecting accidental hits inside other words ("hi" in "this", "how"
        in "show").

        Args:
            query: The user query.
            available_experts: Candidate experts; each needs ``expert_id`` and
                ``specialization`` attributes.
            max_experts: Upper bound on the number of experts returned.

        Returns:
            Experts with a positive score, highest first (ties keep the order
            of ``available_experts``), truncated to ``max_experts``.  If no
            expert scores, the first "general" expert is returned, or failing
            that the first available expert.  Empty only when
            ``available_experts`` is empty or ``max_experts`` is not positive.
        """
        # Local import: the module's top-level imports do not include ``re``.
        import re

        query_lower = query.lower()

        scored = []
        for expert in available_experts:
            keywords = self.expert_specializations.get(expert.specialization, [])
            score = sum(
                1
                for keyword in keywords
                if re.search(r"\b" + re.escape(keyword), query_lower)
            )
            if score > 0:
                scored.append((score, expert))

        if scored:
            # list.sort is stable, so equal scores keep their input order.
            scored.sort(key=lambda pair: pair[0], reverse=True)
            selected_experts = [expert for _, expert in scored]
        else:
            # Always include at least one expert (preferably general).
            fallback = next(
                (e for e in available_experts if e.specialization == "general"),
                None,
            )
            if fallback is None and available_experts:
                fallback = available_experts[0]
            selected_experts = [] if fallback is None else [fallback]

        return selected_experts[:max(max_experts, 0)]
class EnhancedMoEFramework:
"""Enhanced MoE framework using Phi-3.5-MoE as baseline"""
def __init__(self, config: Optional[Phi35MoEConfig] = None):
self.config = config or Phi35MoEConfig()
self.experts: Dict[str, Phi35MoEExpert] = {}
self.router = Phi35MoERouter()
self.is_initialized = False
async def initialize_experts(self, expert_configs: List[Dict[str, str]]):
"""Initialize multiple Phi-3.5-MoE experts with different specializations"""
logger.info("Initializing Phi-3.5-MoE experts...")
for expert_config in expert_configs:
expert_id = expert_config["expert_id"]
specialization = expert_config["specialization"]
expert = Phi35MoEExpert(
config=self.config,
expert_id=expert_id,
specialization=specialization
)
self.experts[expert_id] = expert
self.is_initialized = True
logger.info(f"Initialized {len(self.experts)} Phi-3.5-MoE experts")
async def process_query(
self,
query: str,
system_message: Optional[str] = None,
use_multiple_experts: bool = True
) -> Dict[str, Any]:
"""Process query using the enhanced MoE framework"""
if not self.is_initialized:
raise RuntimeError("MoE framework not initialized. Call initialize_experts() first.")
# Prepare messages
messages = []
if system_message:
messages.append({"role": "system", "content": system_message})
messages.append({"role": "user", "content": query})
if use_multiple_experts:
# Route to multiple experts
selected_experts = self.router.route_query(query, list(self.experts.values()))
# Generate responses from selected experts
tasks = []
for expert in selected_experts:
task = expert.generate_response(messages)
tasks.append(task)
# Wait for all responses
responses = await asyncio.gather(*tasks, return_exceptions=True)
# Process responses
successful_responses = []
for response in responses:
if isinstance(response, dict) and response.get("success", False):
successful_responses.append(response)
# Combine responses
combined_response = self._combine_responses(successful_responses)
return {
"query": query,
"responses": successful_responses,
"combined_response": combined_response,
"num_experts_used": len(successful_responses),
"success": len(successful_responses) > 0
}
else:
# Use single expert (general)
general_expert = self.experts.get("general")
if not general_expert:
general_expert = list(self.experts.values())[0] # Fallback to first expert
response = await general_expert.generate_response(messages)
return {
"query": query,
"response": response,
"success": response.get("success", False)
}
def _combine_responses(self, responses: List[Dict[str, Any]]) -> str:
"""Combine multiple expert responses into a coherent answer"""
if not responses:
return "No responses available."
if len(responses) == 1:
return responses[0]["response"]
# Combine responses with expert attribution
combined = "Based on analysis from multiple experts:\n\n"
for i, response in enumerate(responses, 1):
expert_id = response.get("expert_id", f"Expert {i}")
specialization = response.get("specialization", "general")
expert_response = response.get("response", "")
combined += f"**{expert_id} ({specialization}):**\n{expert_response}\n\n"
return combined.strip()
async def benchmark_performance(self, test_queries: List[str]) -> Dict[str, Any]:
"""Benchmark the enhanced MoE framework performance"""
logger.info("Starting Phi-3.5-MoE framework benchmark...")
results = {
"total_queries": len(test_queries),
"successful_queries": 0,
"failed_queries": 0,
"average_response_time": 0,
"expert_usage_stats": {},
"detailed_results": []
}
total_time = 0
for i, query in enumerate(test_queries):
start_time = asyncio.get_event_loop().time()
try:
result = await self.process_query(query)
end_time = asyncio.get_event_loop().time()
response_time = end_time - start_time
total_time += response_time
if result.get("success", False):
results["successful_queries"] += 1
else:
results["failed_queries"] += 1
# Track expert usage
if "responses" in result:
for response in result["responses"]:
expert_id = response.get("expert_id", "unknown")
results["expert_usage_stats"][expert_id] = results["expert_usage_stats"].get(expert_id, 0) + 1
results["detailed_results"].append({
"query": query,
"success": result.get("success", False),
"response_time": response_time,
"num_experts_used": result.get("num_experts_used", 1)
})
except Exception as e:
logger.error(f"Error processing query {i}: {e}")
results["failed_queries"] += 1
results["detailed_results"].append({
"query": query,
"success": False,
"error": str(e),
"response_time": 0
})
results["average_response_time"] = total_time / len(test_queries) if test_queries else 0
results["success_rate"] = results["successful_queries"] / results["total_queries"] if test_queries else 0
logger.info(f"Benchmark completed. Success rate: {results['success_rate']:.2%}")
return results
# Example usage and testing
async def main():
    """Demo driver: build four experts, answer sample queries, then benchmark."""
    rule = "=" * 50

    # Framework with slightly shorter generations than the config default.
    moe_framework = EnhancedMoEFramework(
        Phi35MoEConfig(max_new_tokens=300, temperature=0.7)
    )

    # One expert per specialization, each with the id "<specialization>_expert".
    expert_configs = [
        {"expert_id": f"{name}_expert", "specialization": name}
        for name in ("code", "math", "reasoning", "general")
    ]
    await moe_framework.initialize_experts(expert_configs)

    test_queries = [
        "How do I implement a binary search algorithm in Python?",
        "What is the derivative of x^2 + 3x + 1?",
        "Explain the logical reasoning behind the Monty Hall problem",
        "Hello, how are you today?"
    ]

    # Answer each sample query and print the outcome.
    for query in test_queries:
        print(f"\n{rule}")
        print(f"Query: {query}")
        print(f"{rule}")

        result = await moe_framework.process_query(query)
        if not result.get("success", False):
            print("Failed to generate response")
            continue

        if "combined_response" in result:
            print(f"Combined Response:\n{result['combined_response']}")
        else:
            print(f"Response:\n{result['response']['response']}")

    # Re-run the same queries through the benchmark and report the summary.
    print(f"\n{rule}")
    print("Running Performance Benchmark...")
    print(f"{rule}")

    benchmark_results = await moe_framework.benchmark_performance(test_queries)
    print(f"Success Rate: {benchmark_results['success_rate']:.2%}")
    print(f"Average Response Time: {benchmark_results['average_response_time']:.2f}s")
    print(f"Expert Usage Stats: {benchmark_results['expert_usage_stats']}")


# Run the demo only when executed as a script, not on import.
if __name__ == "__main__":
    asyncio.run(main())