#!/usr/bin/env python3
"""
Multi-Model Concurrent Processing Test for the Felix Framework.

This script verifies that agents can use different models simultaneously
on a single LM Studio server, demonstrating true concurrent processing
with model specialization.

Requirements:
    - LM Studio running on http://127.0.0.1:1234 with models:
        - qwen/qwen3-4b-2507
        - qwen/qwen3-4b-thinking-2507
        - google/gemma-3-12b

Usage:
    python examples/test_multi_model.py
"""

import sys
import time
import json
import asyncio
from pathlib import Path
from typing import Dict, Any

# Add src to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))

from blog_writer import FelixBlogWriter
from llm.multi_server_client import LMStudioClientPool
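

# --- Illustrative sketch (not used by the test) ----------------------------
# The core technique this script exercises: LM Studio exposes an
# OpenAI-compatible API on a single port, and concurrent requests can each
# name a different model. Below is a minimal, hypothetical helper (never
# called by this script) using only the standard library; the endpoint and
# payload follow the OpenAI chat-completions convention that LM Studio
# implements.
async def _sketch_concurrent_two_models() -> None:
    """Hypothetical example: query two models on one server in parallel."""
    import urllib.request

    def ask(model: str, prompt: str) -> str:
        body = json.dumps({
            "model": model,
            "messages": [{"role": "user", "content": prompt}],
        }).encode()
        req = urllib.request.Request(
            "http://127.0.0.1:1234/v1/chat/completions",
            data=body,
            headers={"Content-Type": "application/json"},
        )
        with urllib.request.urlopen(req) as resp:
            return json.load(resp)["choices"][0]["message"]["content"]

    # Run both blocking calls in worker threads so they overlap in time.
    replies = await asyncio.gather(
        asyncio.to_thread(ask, "qwen/qwen3-4b-2507", "Say hi."),
        asyncio.to_thread(ask, "google/gemma-3-12b", "Say hi."),
    )
    print(replies)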


class MultiModelTester:
    """Test framework for verifying multi-model concurrent processing."""

    def __init__(self):
        self.config_path = "config/multi_model_config.json"
        self.test_topics = [
            "Quantum computing breakthrough",
            "AI safety research",
            "Climate change solutions"
        ]

    def verify_config(self) -> bool:
        """Verify the multi-model configuration exists and is valid."""
        config_file = Path(self.config_path)
        if not config_file.exists():
            print(f"❌ Configuration file not found: {self.config_path}")
            return False
        try:
            with open(config_file, 'r') as f:
                config = json.load(f)
            required_models = [
                "qwen/qwen3-4b-2507",
                "qwen/qwen3-4b-thinking-2507",
                "google/gemma-3-12b"
            ]
            config_models = [server["model"] for server in config["servers"]]
            print(f"✅ Configuration loaded: {len(config['servers'])} model configurations")
            for model in required_models:
                if model in config_models:
                    print(f"   ✅ {model}")
                else:
                    print(f"   ❌ {model} - MISSING")
                    return False
            return True
        except Exception as e:
            print(f"❌ Configuration error: {e}")
            return False

    async def test_model_selection(self) -> Dict[str, Any]:
        """Test that different agent types select different models."""
        print("\n🧪 TESTING MODEL SELECTION")
        print("=" * 50)

        # Create the client pool and display its status
        client_pool = LMStudioClientPool(config_path=self.config_path, debug_mode=True)
        client_pool.display_pool_status()

        # Test the health of all model endpoints
        print("\n🏥 Testing model endpoint health...")
        health_results = await client_pool.health_check_all_servers()
        healthy_count = sum(1 for healthy in health_results.values() if healthy)
        total_count = len(health_results)
        if healthy_count == 0:
            print(f"❌ No healthy model endpoints (0/{total_count})")
            return {"error": "No healthy endpoints"}
        print(f"✅ Healthy endpoints: {healthy_count}/{total_count}")
        for endpoint, healthy in health_results.items():
            status = "✅" if healthy else "❌"
            print(f"   {status} {endpoint}")

        # Test the agent-type-to-model mapping
        print("\n🎯 Testing agent type mappings...")
        agent_types = ["research", "analysis", "synthesis", "critic"]
        mappings = {}
        for agent_type in agent_types:
            server_name = client_pool.get_server_for_agent_type(agent_type)
            if server_name:
                model_name = client_pool.servers[server_name].model
                mappings[agent_type] = {
                    "server": server_name,
                    "model": model_name
                }
                print(f"   {agent_type} → {server_name} ({model_name})")
            else:
                print(f"   ❌ {agent_type} → NO SERVER ASSIGNED")

        await client_pool.close_all()
        return {
            "health_results": health_results,
            "healthy_count": healthy_count,
            "total_count": total_count,
            "agent_mappings": mappings
        }

    async def test_concurrent_processing(self, topic: str) -> Dict[str, Any]:
        """Test actual concurrent processing with different models."""
        print("\n🚀 TESTING CONCURRENT PROCESSING")
        print(f"Topic: {topic}")
        print("=" * 50)

        # Create a Felix writer with the multi-model config and check connectivity
        writer = FelixBlogWriter(
            server_config_path=self.config_path,
            debug_mode=True,
            strict_mode=True  # For faster testing
        )
        if not writer.test_lm_studio_connection():
            return {"error": "Connection test failed"}

        # Create the agent team and show its composition
        print("\n👥 Creating agent team...")
        writer.create_blog_writing_team(complexity="medium")
        print("\n📋 Agent Team:")
        for agent in writer.agents:
            model_info = "Unknown model"
            if hasattr(writer.llm_client, 'get_server_for_agent_type'):
                server_name = writer.llm_client.get_server_for_agent_type(agent.agent_type)
                if server_name and server_name in writer.llm_client.servers:
                    model_info = writer.llm_client.servers[server_name].model
            print(f"   {agent.agent_id} ({agent.agent_type}) → {model_info} @ t={agent.spawn_time:.2f}")

        # Run the session
        print("\n⚡ Starting concurrent processing session...")
        start_time = time.perf_counter()
        results = await writer.run_blog_writing_session_async(
            topic=topic,
            simulation_time=1.0
        )
        total_duration = time.perf_counter() - start_time

        # Group participating agents by agent type (each type maps to one model)
        agents_participated = results["agents_participated"]
        model_usage = {}
        for agent_info in agents_participated:
            model_usage.setdefault(agent_info["agent_type"], []).append(agent_info)

        # Look for agents spawned within 0.1 time units of each other.
        # Spawn-time proximity is a heuristic proxy for overlapping work, not
        # direct proof that the LLM calls themselves ran concurrently.
        time_overlaps = []
        for i, agent1 in enumerate(agents_participated):
            for agent2 in agents_participated[i + 1:]:
                gap = abs(agent1["spawn_time"] - agent2["spawn_time"])
                if gap < 0.1:
                    time_overlaps.append((agent1["agent_id"], agent2["agent_id"], gap))

        return {
            "success": results["final_output"] is not None,
            "total_duration": total_duration,
            "total_tokens": results["session_stats"]["total_tokens_used"],
            "agents_participated": len(agents_participated),
            "model_usage": model_usage,
            "time_overlaps": time_overlaps,
            "final_content_length": len(results["final_output"]["content"]) if results["final_output"] else 0,
            "llm_client_stats": results["session_stats"].get("llm_client_stats", {}),
            "final_output": results["final_output"]
        }

    def analyze_concurrency(self, test_result: Dict[str, Any]) -> Dict[str, Any]:
        """Analyze test results for evidence of concurrent processing."""
        if "error" in test_result:
            return {"error": test_result["error"]}

        analysis = {
            "concurrent_evidence": {},
            "model_distribution": {},
            "performance_metrics": {}
        }

        # Evidence of concurrency
        time_overlaps = test_result.get("time_overlaps", [])
        total_agents = test_result.get("agents_participated", 0)
        analysis["concurrent_evidence"] = {
            "overlapping_agents": len(time_overlaps),
            "total_agents": total_agents,
            "overlap_percentage": (len(time_overlaps) / max(total_agents, 1)) * 100,
            "evidence_found": len(time_overlaps) > 0
        }

        # Model distribution. model_usage is keyed by agent type; distinct
        # agent types imply distinct models under the type-to-model mapping.
        model_usage = test_result.get("model_usage", {})
        analysis["model_distribution"] = {
            "agent_types_used": list(model_usage.keys()),
            "different_models": len(model_usage),
            "model_specialization": len(model_usage) > 1
        }

        # Performance metrics
        llm_stats = test_result.get("llm_client_stats", {})
        analysis["performance_metrics"] = {
            "total_duration": test_result.get("total_duration", 0),
            "total_tokens": test_result.get("total_tokens", 0),
            "content_quality": test_result.get("final_content_length", 0),
            "llm_requests": llm_stats.get("total_requests", 0),
            "avg_response_time": llm_stats.get("average_response_time", 0)
        }
        return analysis

    def display_results(self, model_test: Dict[str, Any], processing_test: Dict[str, Any], analysis: Dict[str, Any]):
        """Display comprehensive test results."""
        print(f"\n{'=' * 60}")
        print("📊 MULTI-MODEL TEST RESULTS")
        print("=" * 60)

        # Model selection results
        if "error" not in model_test:
            print("\n✅ MODEL SELECTION TEST:")
            print(f"   Healthy endpoints: {model_test['healthy_count']}/{model_test['total_count']}")
            print(f"   Agent mappings configured: {len(model_test['agent_mappings'])}")
            for agent_type, mapping in model_test['agent_mappings'].items():
                print(f"     {agent_type} → {mapping['model']}")

        # Concurrent processing results
        if "error" not in processing_test:
            print("\n🚀 CONCURRENT PROCESSING TEST:")
            print(f"   Success: {'✅' if processing_test['success'] else '❌'}")
            print(f"   Duration: {processing_test['total_duration']:.2f}s")
            print(f"   Tokens: {processing_test['total_tokens']}")
            print(f"   Agents participated: {processing_test['agents_participated']}")
            print(f"   Content generated: {processing_test['final_content_length']} chars")

        # Concurrency analysis
        if "error" not in analysis:
            print("\n🔍 CONCURRENCY ANALYSIS:")
            evidence = analysis["concurrent_evidence"]
            print(f"   Overlapping agents: {evidence['overlapping_agents']}")
            print(f"   Overlap rate: {evidence['overlap_percentage']:.1f}%")
            if evidence["evidence_found"]:
                print("   ✅ CONCURRENT PROCESSING DETECTED")
            else:
                print("   ⚠️ No clear concurrency evidence (may still be concurrent)")
            distribution = analysis["model_distribution"]
            print(f"   Different agent types: {distribution['different_models']}")
            if distribution["model_specialization"]:
                print("   ✅ MODEL SPECIALIZATION WORKING")
            else:
                print("   ❌ Model specialization not detected")

        # Final verdict
        print("\n🏁 FINAL VERDICT:")
        model_ok = "error" not in model_test and model_test['healthy_count'] > 0
        processing_ok = "error" not in processing_test and processing_test['success']
        concurrency_ok = "error" not in analysis and (
            analysis["concurrent_evidence"]["evidence_found"] or
            analysis["model_distribution"]["model_specialization"]
        )
        if model_ok and processing_ok and concurrency_ok:
            print("✅ MULTI-MODEL CONCURRENT PROCESSING VERIFIED!")
            print("   - Multiple models accessible ✅")
            print("   - Agent type specialization ✅")
            print("   - Concurrent processing ✅")
        elif model_ok and processing_ok:
            print("⚠️ MULTI-MODEL PROCESSING WORKING (concurrency unclear)")
            print("   - Multiple models accessible ✅")
            print("   - Processing successful ✅")
            print("   - Concurrent evidence limited ⚠️")
        else:
            print("❌ MULTI-MODEL SETUP NEEDS ATTENTION")
            if not model_ok:
                print("   - Model endpoint issues ❌")
            if not processing_ok:
                print("   - Processing failed ❌")

    async def run_full_test(self):
        """Run complete multi-model test suite."""
        print("🧪 Felix Framework Multi-Model Concurrent Processing Test")
        print("=" * 60)

        # Verify configuration
        if not self.verify_config():
            print("❌ Configuration verification failed")
            return

        # Test model selection
        model_test = await self.test_model_selection()
        if "error" in model_test:
            print(f"❌ Model selection test failed: {model_test['error']}")
            return

        # Test concurrent processing
        test_topic = self.test_topics[0]
        processing_test = await self.test_concurrent_processing(test_topic)

        # Analyze and display results
        analysis = self.analyze_concurrency(processing_test)
        self.display_results(model_test, processing_test, analysis)

        # Save results
        timestamp = int(time.time())
        filename = f"multi_model_test_results_{timestamp}.json"
        results_data = {
            "timestamp": timestamp,
            "test_topic": test_topic,
            "model_test": model_test,
            "processing_test": processing_test,
            "analysis": analysis
        }
        with open(filename, 'w') as f:
            json.dump(results_data, f, indent=2)
        print(f"\n💾 Detailed results saved to: {filename}")


async def main():
    """Main test function."""
    tester = MultiModelTester()
    await tester.run_full_test()


if __name__ == "__main__":
    asyncio.run(main())