|
|
|
|
|
""" |
|
|
Phi-3.5-MoE Integration Module |
|
|
|
|
|
This module integrates Microsoft's Phi-3.5-MoE-instruct model as a baseline |
|
|
for expanding our current MoE (Mixture of Experts) framework. |
|
|
|
|
|
Based on: https://huggingface.co/microsoft/Phi-3.5-MoE-instruct |
|
|
- 41.9B parameters |
|
|
- 128K context length |
|
|
- Multilingual support |
|
|
- Strong reasoning capabilities (code, math, logic) |
|
|
""" |
|
|
|
|
|
import torch |
|
|
import logging |
|
|
from typing import Dict, List, Optional, Any, Union |
|
|
from transformers import ( |
|
|
AutoModelForCausalLM, |
|
|
AutoTokenizer, |
|
|
pipeline, |
|
|
Pipeline |
|
|
) |
|
|
from pathlib import Path |
|
|
import json |
|
|
import asyncio |
|
|
from dataclasses import dataclass |
|
|
|
|
|
logger = logging.getLogger(__name__) |
|
|
|
|
|
@dataclass
class Phi35MoEConfig:
    """Configuration for Phi-3.5-MoE integration"""
    # Hugging Face model identifier passed to from_pretrained().
    model_name: str = "microsoft/Phi-3.5-MoE-instruct"
    # "auto" lets accelerate place model shards across available devices.
    device_map: str = "auto"
    # "auto" lets transformers pick the checkpoint's native dtype.
    torch_dtype: str = "auto"
    # NOTE(review): some Phi-3.5 revisions ship custom modeling code and may
    # require trust_remote_code=True to load — confirm against the model card.
    trust_remote_code: bool = False
    # Generation defaults; callers can override per request via kwargs.
    max_new_tokens: int = 500
    temperature: float = 0.7
    top_p: float = 0.9
    do_sample: bool = True
    # Informational model properties (not enforced anywhere in this module).
    context_length: int = 128000
    vocabulary_size: int = 32064
|
|
|
|
|
class Phi35MoEExpert:
    """A single expert backed by the Phi-3.5-MoE model.

    The model, tokenizer and text-generation pipeline are loaded lazily on
    first use; generate_response() answers chat-style message lists.
    """

    def __init__(self, config: Phi35MoEConfig, expert_id: str, specialization: str):
        # Shared model/generation settings.
        self.config = config
        self.expert_id = expert_id
        self.specialization = specialization
        # All three stay None until load_model() succeeds.
        self.model = None
        self.tokenizer = None
        self.pipeline = None
        self.is_loaded = False

    async def load_model(self):
        """Load the Phi-3.5-MoE model and tokenizer"""
        try:
            logger.info(f"Loading Phi-3.5-MoE model for expert {self.expert_id}")

            # Heavy download/initialization happens here on first call.
            self.model = AutoModelForCausalLM.from_pretrained(
                self.config.model_name,
                device_map=self.config.device_map,
                torch_dtype=self.config.torch_dtype,
                trust_remote_code=self.config.trust_remote_code,
            )
            self.tokenizer = AutoTokenizer.from_pretrained(self.config.model_name)
            self.pipeline = pipeline(
                "text-generation",
                model=self.model,
                tokenizer=self.tokenizer,
                device_map=self.config.device_map,
            )

            self.is_loaded = True
            logger.info(f"Successfully loaded Phi-3.5-MoE model for expert {self.expert_id}")
        except Exception as e:
            logger.error(f"Failed to load Phi-3.5-MoE model for expert {self.expert_id}: {e}")
            raise

    async def generate_response(
        self,
        messages: List[Dict[str, str]],
        **generation_kwargs
    ) -> Dict[str, Any]:
        """Generate response using Phi-3.5-MoE model"""
        # Lazy-load the model on the first request.
        if not self.is_loaded:
            await self.load_model()

        try:
            # Config-driven defaults; explicit caller kwargs win.
            options = dict(
                max_new_tokens=self.config.max_new_tokens,
                temperature=self.config.temperature,
                top_p=self.config.top_p,
                do_sample=self.config.do_sample,
                return_full_text=False,
            )
            options.update(generation_kwargs)

            outputs = self.pipeline(messages, **options)
            text = outputs[0]["generated_text"] if outputs else ""

            return {
                "expert_id": self.expert_id,
                "specialization": self.specialization,
                "response": text,
                "model": self.config.model_name,
                "success": True
            }
        except Exception as e:
            # Generation failures are reported as a structured error result
            # rather than raised, so the framework can aggregate partial answers.
            logger.error(f"Error generating response for expert {self.expert_id}: {e}")
            return {
                "expert_id": self.expert_id,
                "specialization": self.specialization,
                "response": "",
                "error": str(e),
                "success": False
            }
|
|
|
|
|
class Phi35MoERouter:
    """Router for selecting appropriate Phi-3.5-MoE experts"""

    def __init__(self):
        # Keyword lists per specialization; routing is naive substring matching.
        self.expert_specializations = {
            "code": ["programming", "software", "development", "coding", "algorithm", "python", "javascript", "java", "function", "code"],
            "math": ["mathematics", "calculation", "equation", "formula", "statistics", "derivative", "integral", "algebra", "calculus", "math", "solve", "calculate"],
            "reasoning": ["logic", "analysis", "reasoning", "problem-solving", "critical", "explain", "why", "how", "because"],
            "multilingual": ["translation", "language", "multilingual", "localization", "translate", "spanish", "french", "german"],
            "general": ["general", "conversation", "assistance", "help", "hello", "hi", "what", "who", "when", "where"]
        }

    def route_query(self, query: str, available_experts: List[Phi35MoEExpert]) -> List[Phi35MoEExpert]:
        """Route query to appropriate experts based on content analysis"""
        text = query.lower()

        # First expert wins on duplicate ids (matches the original lookup order).
        by_id = {}
        for candidate in available_experts:
            by_id.setdefault(candidate.expert_id, candidate)

        # Score = number of this specialization's keywords found in the query.
        scores = {
            candidate.expert_id: sum(
                1
                for kw in self.expert_specializations.get(candidate.specialization, [])
                if kw in text
            )
            for candidate in available_experts
        }

        # Highest score first; sorted() is stable, preserving input order on ties.
        ranked = sorted(scores.items(), key=lambda item: item[1], reverse=True)

        if ranked and ranked[0][1] > 0:
            chosen = [by_id[expert_id] for expert_id, hits in ranked if hits > 0]
        else:
            # No keyword hits at all: fall back to a general-purpose expert.
            fallback = next((e for e in available_experts if e.specialization == "general"), None)
            chosen = [fallback] if fallback else []

        # Cap the fan-out at three experts.
        return chosen[:3]
|
|
|
|
|
class EnhancedMoEFramework:
    """Enhanced MoE framework using Phi-3.5-MoE as baseline.

    Owns a pool of Phi35MoEExpert instances, routes queries to relevant
    experts via Phi35MoERouter, runs them concurrently, and merges answers.
    """

    def __init__(self, config: Optional[Phi35MoEConfig] = None):
        """Create the framework; ``config`` defaults to ``Phi35MoEConfig()``."""
        self.config = config or Phi35MoEConfig()
        self.experts: Dict[str, Phi35MoEExpert] = {}
        self.router = Phi35MoERouter()
        self.is_initialized = False

    async def initialize_experts(self, expert_configs: List[Dict[str, str]]):
        """Initialize multiple Phi-3.5-MoE experts with different specializations.

        Each entry in ``expert_configs`` must carry "expert_id" and
        "specialization" keys. Models load lazily on first query, so this
        call itself is cheap.
        """
        logger.info("Initializing Phi-3.5-MoE experts...")

        for expert_config in expert_configs:
            expert_id = expert_config["expert_id"]
            specialization = expert_config["specialization"]

            self.experts[expert_id] = Phi35MoEExpert(
                config=self.config,
                expert_id=expert_id,
                specialization=specialization
            )

        self.is_initialized = True
        logger.info(f"Initialized {len(self.experts)} Phi-3.5-MoE experts")

    async def process_query(
        self,
        query: str,
        system_message: Optional[str] = None,
        use_multiple_experts: bool = True
    ) -> Dict[str, Any]:
        """Process query using the enhanced MoE framework.

        Returns a result dict. Multi-expert mode includes "responses",
        "combined_response" and "num_experts_used"; single-expert mode
        returns one "response" entry.

        Raises:
            RuntimeError: if initialize_experts() was not called first, or
                if single-expert mode is used with an empty expert pool.
        """
        if not self.is_initialized:
            raise RuntimeError("MoE framework not initialized. Call initialize_experts() first.")

        # Build a chat-style message list understood by the experts.
        messages = []
        if system_message:
            messages.append({"role": "system", "content": system_message})
        messages.append({"role": "user", "content": query})

        if use_multiple_experts:
            selected_experts = self.router.route_query(query, list(self.experts.values()))

            # Fan out to all selected experts concurrently; exceptions are
            # captured (return_exceptions=True) and filtered out below.
            responses = await asyncio.gather(
                *(expert.generate_response(messages) for expert in selected_experts),
                return_exceptions=True,
            )

            successful_responses = [
                r for r in responses if isinstance(r, dict) and r.get("success", False)
            ]

            return {
                "query": query,
                "responses": successful_responses,
                "combined_response": self._combine_responses(successful_responses),
                "num_experts_used": len(successful_responses),
                "success": len(successful_responses) > 0
            }

        # Single-expert mode: prefer an expert registered under "general".
        general_expert = self.experts.get("general")
        if general_expert is None:
            if not self.experts:
                # Previously this path crashed with a bare IndexError when the
                # pool was empty; fail with an explicit, actionable error instead.
                raise RuntimeError("No experts available. Call initialize_experts() with at least one expert.")
            general_expert = next(iter(self.experts.values()))

        response = await general_expert.generate_response(messages)
        return {
            "query": query,
            "response": response,
            "success": response.get("success", False)
        }

    def _combine_responses(self, responses: List[Dict[str, Any]]) -> str:
        """Combine multiple expert responses into a coherent answer."""
        if not responses:
            return "No responses available."

        if len(responses) == 1:
            return responses[0]["response"]

        # Label each expert's answer so provenance stays visible to the caller.
        combined = "Based on analysis from multiple experts:\n\n"
        for i, response in enumerate(responses, 1):
            expert_id = response.get("expert_id", f"Expert {i}")
            specialization = response.get("specialization", "general")
            expert_response = response.get("response", "")
            combined += f"**{expert_id} ({specialization}):**\n{expert_response}\n\n"

        return combined.strip()

    async def benchmark_performance(self, test_queries: List[str]) -> Dict[str, Any]:
        """Benchmark the enhanced MoE framework performance.

        Runs every query through process_query(), timing each one, and
        returns aggregate stats plus per-query detail records.
        """
        logger.info("Starting Phi-3.5-MoE framework benchmark...")

        results = {
            "total_queries": len(test_queries),
            "successful_queries": 0,
            "failed_queries": 0,
            "average_response_time": 0,
            "expert_usage_stats": {},
            "detailed_results": []
        }

        # Use the running loop's monotonic clock; asyncio.get_event_loop() is
        # deprecated when called from within a coroutine.
        loop = asyncio.get_running_loop()
        total_time = 0

        for i, query in enumerate(test_queries):
            start_time = loop.time()

            try:
                result = await self.process_query(query)
                response_time = loop.time() - start_time
                total_time += response_time

                if result.get("success", False):
                    results["successful_queries"] += 1
                else:
                    results["failed_queries"] += 1

                # Track how often each expert contributed a usable answer.
                for response in result.get("responses", []):
                    expert_id = response.get("expert_id", "unknown")
                    results["expert_usage_stats"][expert_id] = results["expert_usage_stats"].get(expert_id, 0) + 1

                results["detailed_results"].append({
                    "query": query,
                    "success": True if result.get("success", False) else False,
                    "response_time": response_time,
                    "num_experts_used": result.get("num_experts_used", 1)
                })

            except Exception as e:
                logger.error(f"Error processing query {i}: {e}")
                results["failed_queries"] += 1
                results["detailed_results"].append({
                    "query": query,
                    "success": False,
                    "error": str(e),
                    "response_time": 0,
                    # Keep the record schema consistent with the success branch.
                    "num_experts_used": 0
                })

        # Guard the averages against an empty query list.
        if test_queries:
            results["average_response_time"] = total_time / len(test_queries)
            results["success_rate"] = results["successful_queries"] / results["total_queries"]
        else:
            results["success_rate"] = 0

        logger.info(f"Benchmark completed. Success rate: {results['success_rate']:.2%}")
        return results
|
|
|
|
|
|
|
|
async def main():
    """Example usage of the enhanced MoE framework"""
    # Keep generation short for the interactive demo.
    config = Phi35MoEConfig(
        max_new_tokens=300,
        temperature=0.7
    )

    moe_framework = EnhancedMoEFramework(config)

    # One expert per specialization exercised by the demo queries below.
    expert_configs = [
        {"expert_id": "code_expert", "specialization": "code"},
        {"expert_id": "math_expert", "specialization": "math"},
        {"expert_id": "reasoning_expert", "specialization": "reasoning"},
        {"expert_id": "general_expert", "specialization": "general"}
    ]

    await moe_framework.initialize_experts(expert_configs)

    test_queries = [
        "How do I implement a binary search algorithm in Python?",
        "What is the derivative of x^2 + 3x + 1?",
        "Explain the logical reasoning behind the Monty Hall problem",
        "Hello, how are you today?"
    ]

    separator = "=" * 50
    for query in test_queries:
        print(f"\n{separator}")
        print(f"Query: {query}")
        print(f"{separator}")

        result = await moe_framework.process_query(query)

        # Guard clauses: failure first, then the two success shapes.
        if not result.get("success", False):
            print("Failed to generate response")
        elif "combined_response" in result:
            print(f"Combined Response:\n{result['combined_response']}")
        else:
            print(f"Response:\n{result['response']['response']}")

    print(f"\n{separator}")
    print("Running Performance Benchmark...")
    print(f"{separator}")

    benchmark_results = await moe_framework.benchmark_performance(test_queries)

    print(f"Success Rate: {benchmark_results['success_rate']:.2%}")
    print(f"Average Response Time: {benchmark_results['average_response_time']:.2f}s")
    print(f"Expert Usage Stats: {benchmark_results['expert_usage_stats']}")
|
|
|
|
|
# Script entry point: runs the async demo under a fresh event loop.
if __name__ == "__main__":
    asyncio.run(main())
|
|
|