#!/usr/bin/env python3
"""
Phi-3.5-MoE Integration Module

This module integrates Microsoft's Phi-3.5-MoE-instruct model as a baseline
for expanding our current MoE (Mixture of Experts) framework.

Based on: https://huggingface.co/microsoft/Phi-3.5-MoE-instruct
- 41.9B parameters
- 128K context length
- Multilingual support
- Strong reasoning capabilities (code, math, logic)
"""

import asyncio
import json
import logging
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

import torch
from transformers import (
    AutoModelForCausalLM,
    AutoTokenizer,
    pipeline,
    Pipeline,
)

logger = logging.getLogger(__name__)


@dataclass
class Phi35MoEConfig:
    """Configuration for Phi-3.5-MoE integration.

    Generation defaults mirror the upstream model card; adjust per expert
    as needed.
    """

    model_name: str = "microsoft/Phi-3.5-MoE-instruct"
    device_map: str = "auto"
    torch_dtype: str = "auto"
    # NOTE(review): the Phi-3.5-MoE model card suggests custom modeling code
    # may be required (trust_remote_code=True) — confirm before deploying.
    trust_remote_code: bool = False
    max_new_tokens: int = 500
    temperature: float = 0.7
    top_p: float = 0.9
    do_sample: bool = True
    context_length: int = 128000  # 128K context length
    vocabulary_size: int = 32064


class Phi35MoEExpert:
    """Individual expert backed by a Phi-3.5-MoE model instance.

    Each expert lazily loads its own copy of the model on first use;
    ``specialization`` is only routing metadata, the underlying model is
    the same for every expert.
    """

    def __init__(self, config: Phi35MoEConfig, expert_id: str, specialization: str):
        self.config = config
        self.expert_id = expert_id
        self.specialization = specialization
        self.model = None
        self.tokenizer = None
        self.pipeline = None
        self.is_loaded = False

    async def load_model(self):
        """Load the Phi-3.5-MoE model, tokenizer, and text-generation pipeline.

        Raises:
            Exception: re-raised from transformers if loading fails.

        NOTE(review): ``from_pretrained`` is a long blocking call executed
        directly inside this coroutine — it stalls the event loop. Consider
        ``loop.run_in_executor`` if other tasks must stay responsive.
        """
        try:
            logger.info(f"Loading Phi-3.5-MoE model for expert {self.expert_id}")

            # Load model weights (device placement / dtype from config)
            self.model = AutoModelForCausalLM.from_pretrained(
                self.config.model_name,
                device_map=self.config.device_map,
                torch_dtype=self.config.torch_dtype,
                trust_remote_code=self.config.trust_remote_code,
            )

            # Load tokenizer
            self.tokenizer = AutoTokenizer.from_pretrained(self.config.model_name)

            # Create generation pipeline on top of the loaded model/tokenizer
            self.pipeline = pipeline(
                "text-generation",
                model=self.model,
                tokenizer=self.tokenizer,
                device_map=self.config.device_map,
            )

            self.is_loaded = True
            logger.info(f"Successfully loaded Phi-3.5-MoE model for expert {self.expert_id}")
        except Exception as e:
            logger.error(f"Failed to load Phi-3.5-MoE model for expert {self.expert_id}: {e}")
            raise

    async def generate_response(
        self,
        messages: List[Dict[str, str]],
        **generation_kwargs,
    ) -> Dict[str, Any]:
        """Generate a chat response using the Phi-3.5-MoE pipeline.

        Args:
            messages: chat-format messages (``role``/``content`` dicts).
            **generation_kwargs: overrides for the config-derived defaults.

        Returns:
            A result dict with ``expert_id``, ``specialization``, ``response``
            and ``success``; on failure ``success`` is False and ``error``
            holds the exception text (never raises).
        """
        if not self.is_loaded:
            await self.load_model()

        try:
            # Config-derived defaults, overridable per call.
            generation_args = {
                "max_new_tokens": self.config.max_new_tokens,
                "temperature": self.config.temperature,
                "top_p": self.config.top_p,
                "do_sample": self.config.do_sample,
                "return_full_text": False,
                **generation_kwargs,
            }

            response = self.pipeline(messages, **generation_args)

            return {
                "expert_id": self.expert_id,
                "specialization": self.specialization,
                "response": response[0]["generated_text"] if response else "",
                "model": self.config.model_name,
                "success": True,
            }
        except Exception as e:
            logger.error(f"Error generating response for expert {self.expert_id}: {e}")
            return {
                "expert_id": self.expert_id,
                "specialization": self.specialization,
                "response": "",
                "error": str(e),
                "success": False,
            }


class Phi35MoERouter:
    """Keyword-based router that selects Phi-3.5-MoE experts for a query."""

    def __init__(self):
        # Specialization -> keywords; a keyword hit in the query scores +1
        # for every expert with that specialization.
        self.expert_specializations = {
            "code": ["programming", "software", "development", "coding", "algorithm",
                     "python", "javascript", "java", "function", "code"],
            "math": ["mathematics", "calculation", "equation", "formula", "statistics",
                     "derivative", "integral", "algebra", "calculus", "math", "solve",
                     "calculate"],
            "reasoning": ["logic", "analysis", "reasoning", "problem-solving", "critical",
                          "explain", "why", "how", "because"],
            "multilingual": ["translation", "language", "multilingual", "localization",
                             "translate", "spanish", "french", "german"],
            "general": ["general", "conversation", "assistance", "help", "hello", "hi",
                        "what", "who", "when", "where"],
        }

    def route_query(
        self, query: str, available_experts: List[Phi35MoEExpert]
    ) -> List[Phi35MoEExpert]:
        """Route a query to up to three experts based on keyword scores.

        Guarantees at least one expert is returned whenever
        ``available_experts`` is non-empty: scored experts first, then a
        "general"-specialization expert, then the first available expert.
        """
        query_lower = query.lower()
        selected_experts: List[Phi35MoEExpert] = []

        # Score each expert by counting specialization keywords in the query.
        expert_scores: Dict[str, int] = {}
        for expert in available_experts:
            score = sum(
                1
                for keyword in self.expert_specializations.get(expert.specialization, [])
                if keyword in query_lower
            )
            expert_scores[expert.expert_id] = score

        sorted_experts = sorted(expert_scores.items(), key=lambda x: x[1], reverse=True)

        if sorted_experts and sorted_experts[0][1] > 0:
            # Select every expert that matched at least one keyword.
            for expert_id, score in sorted_experts:
                if score > 0:
                    expert = next(
                        (e for e in available_experts if e.expert_id == expert_id), None
                    )
                    if expert:
                        selected_experts.append(expert)
        else:
            # No keyword hit: fall back to a general-purpose expert.
            general_expert = next(
                (e for e in available_experts if e.specialization == "general"), None
            )
            if general_expert:
                selected_experts.append(general_expert)

        # Bug fix: previously an empty list could be returned when nothing
        # matched and no "general" expert existed, leaving the caller with
        # zero responders. Always return at least one expert if any exist.
        if not selected_experts and available_experts:
            selected_experts.append(available_experts[0])

        return selected_experts[:3]  # Limit to top 3 experts


class EnhancedMoEFramework:
    """Enhanced MoE framework using Phi-3.5-MoE as baseline."""

    def __init__(self, config: Optional[Phi35MoEConfig] = None):
        self.config = config or Phi35MoEConfig()
        self.experts: Dict[str, Phi35MoEExpert] = {}
        self.router = Phi35MoERouter()
        self.is_initialized = False

    async def initialize_experts(self, expert_configs: List[Dict[str, str]]):
        """Create one Phi35MoEExpert per config entry (models load lazily).

        Args:
            expert_configs: dicts with ``expert_id`` and ``specialization`` keys.
        """
        logger.info("Initializing Phi-3.5-MoE experts...")

        for expert_config in expert_configs:
            expert_id = expert_config["expert_id"]
            specialization = expert_config["specialization"]

            self.experts[expert_id] = Phi35MoEExpert(
                config=self.config,
                expert_id=expert_id,
                specialization=specialization,
            )

        self.is_initialized = True
        logger.info(f"Initialized {len(self.experts)} Phi-3.5-MoE experts")

    async def process_query(
        self,
        query: str,
        system_message: Optional[str] = None,
        use_multiple_experts: bool = True,
    ) -> Dict[str, Any]:
        """Process a query through one or several routed experts.

        Args:
            query: the user query text.
            system_message: optional system prompt prepended to the chat.
            use_multiple_experts: route to up to 3 experts and combine their
                answers; otherwise use a single general-purpose expert.

        Returns:
            Multi-expert mode: ``{query, responses, combined_response,
            num_experts_used, success}``. Single-expert mode:
            ``{query, response, success}``.

        Raises:
            RuntimeError: if the framework has no initialized experts.
        """
        if not self.is_initialized:
            raise RuntimeError("MoE framework not initialized. Call initialize_experts() first.")

        # Build chat-format message list.
        messages = []
        if system_message:
            messages.append({"role": "system", "content": system_message})
        messages.append({"role": "user", "content": query})

        if use_multiple_experts:
            selected_experts = self.router.route_query(query, list(self.experts.values()))

            # Fan out generation to all selected experts concurrently.
            tasks = [expert.generate_response(messages) for expert in selected_experts]
            responses = await asyncio.gather(*tasks, return_exceptions=True)

            # Keep only well-formed, successful responses (gather may yield
            # exception objects because of return_exceptions=True).
            successful_responses = [
                response
                for response in responses
                if isinstance(response, dict) and response.get("success", False)
            ]

            combined_response = self._combine_responses(successful_responses)

            return {
                "query": query,
                "responses": successful_responses,
                "combined_response": combined_response,
                "num_experts_used": len(successful_responses),
                "success": len(successful_responses) > 0,
            }
        else:
            if not self.experts:
                raise RuntimeError("MoE framework has no experts configured.")

            # Bug fix: the original looked up self.experts.get("general") by
            # expert_id, but experts are registered under ids like
            # "general_expert" — the lookup never matched. Prefer an expert
            # whose *specialization* is "general", then the literal id, then
            # fall back to the first registered expert.
            general_expert = next(
                (e for e in self.experts.values() if e.specialization == "general"),
                None,
            )
            if not general_expert:
                general_expert = self.experts.get("general")
            if not general_expert:
                general_expert = next(iter(self.experts.values()))

            response = await general_expert.generate_response(messages)

            return {
                "query": query,
                "response": response,
                "success": response.get("success", False),
            }

    def _combine_responses(self, responses: List[Dict[str, Any]]) -> str:
        """Combine multiple expert responses into a coherent answer."""
        if not responses:
            return "No responses available."

        if len(responses) == 1:
            return responses[0]["response"]

        # Combine responses with expert attribution.
        combined = "Based on analysis from multiple experts:\n\n"
        for i, response in enumerate(responses, 1):
            expert_id = response.get("expert_id", f"Expert {i}")
            specialization = response.get("specialization", "general")
            expert_response = response.get("response", "")
            combined += f"**{expert_id} ({specialization}):**\n{expert_response}\n\n"

        return combined.strip()

    async def benchmark_performance(self, test_queries: List[str]) -> Dict[str, Any]:
        """Run each query through process_query and collect timing/usage stats.

        Returns:
            Aggregate stats plus per-query ``detailed_results``; queries that
            raise are counted as failures rather than aborting the benchmark.
        """
        logger.info("Starting Phi-3.5-MoE framework benchmark...")

        results: Dict[str, Any] = {
            "total_queries": len(test_queries),
            "successful_queries": 0,
            "failed_queries": 0,
            "average_response_time": 0,
            "expert_usage_stats": {},
            "detailed_results": [],
        }

        # get_running_loop() (not the deprecated get_event_loop()) — we are
        # always inside a coroutine here.
        loop = asyncio.get_running_loop()
        total_time = 0

        for i, query in enumerate(test_queries):
            start_time = loop.time()
            try:
                result = await self.process_query(query)
                response_time = loop.time() - start_time
                total_time += response_time

                if result.get("success", False):
                    results["successful_queries"] += 1
                else:
                    results["failed_queries"] += 1

                # Track per-expert usage (multi-expert results only).
                if "responses" in result:
                    for response in result["responses"]:
                        expert_id = response.get("expert_id", "unknown")
                        results["expert_usage_stats"][expert_id] = (
                            results["expert_usage_stats"].get(expert_id, 0) + 1
                        )

                results["detailed_results"].append({
                    "query": query,
                    "success": result.get("success", False),
                    "response_time": response_time,
                    "num_experts_used": result.get("num_experts_used", 1),
                })
            except Exception as e:
                logger.error(f"Error processing query {i}: {e}")
                results["failed_queries"] += 1
                results["detailed_results"].append({
                    "query": query,
                    "success": False,
                    "error": str(e),
                    "response_time": 0,
                })

        # Consistent guards: both averages divide by the same query count.
        if test_queries:
            results["average_response_time"] = total_time / len(test_queries)
            results["success_rate"] = (
                results["successful_queries"] / results["total_queries"]
            )
        else:
            results["average_response_time"] = 0
            results["success_rate"] = 0

        logger.info(f"Benchmark completed. Success rate: {results['success_rate']:.2%}")
        return results


# Example usage and testing
async def main():
    """Example usage of the enhanced MoE framework."""
    # Initialize configuration
    config = Phi35MoEConfig(
        max_new_tokens=300,
        temperature=0.7,
    )

    # Initialize framework
    moe_framework = EnhancedMoEFramework(config)

    # Define expert configurations
    expert_configs = [
        {"expert_id": "code_expert", "specialization": "code"},
        {"expert_id": "math_expert", "specialization": "math"},
        {"expert_id": "reasoning_expert", "specialization": "reasoning"},
        {"expert_id": "general_expert", "specialization": "general"},
    ]

    # Initialize experts
    await moe_framework.initialize_experts(expert_configs)

    # Test queries
    test_queries = [
        "How do I implement a binary search algorithm in Python?",
        "What is the derivative of x^2 + 3x + 1?",
        "Explain the logical reasoning behind the Monty Hall problem",
        "Hello, how are you today?",
    ]

    # Process queries
    for query in test_queries:
        print(f"\n{'='*50}")
        print(f"Query: {query}")
        print(f"{'='*50}")

        result = await moe_framework.process_query(query)

        if result.get("success", False):
            if "combined_response" in result:
                print(f"Combined Response:\n{result['combined_response']}")
            else:
                print(f"Response:\n{result['response']['response']}")
        else:
            print("Failed to generate response")

    # Run benchmark
    print(f"\n{'='*50}")
    print("Running Performance Benchmark...")
    print(f"{'='*50}")

    benchmark_results = await moe_framework.benchmark_performance(test_queries)
    print(f"Success Rate: {benchmark_results['success_rate']:.2%}")
    print(f"Average Response Time: {benchmark_results['average_response_time']:.2f}s")
    print(f"Expert Usage Stats: {benchmark_results['expert_usage_stats']}")


if __name__ == "__main__":
    asyncio.run(main())