#!/usr/bin/env python3
"""
Multi-Model Concurrent Processing Test for Felix Framework.

This script verifies that agents can use different models simultaneously
on a single LM Studio server, demonstrating true concurrent processing
with model specialization.

Requirements:
- LM Studio running on http://127.0.0.1:1234 with models:
  - qwen/qwen3-4b-2507
  - qwen/qwen3-4b-thinking-2507
  - google/gemma-3-12b

Usage:
    python examples/test_multi_model.py
"""
import asyncio
import json
import sys
import time
from pathlib import Path
from typing import Any, Dict

# Add src to path for imports
sys.path.insert(0, str(Path(__file__).parent.parent / "src"))

from blog_writer import FelixBlogWriter
from llm.multi_server_client import LMStudioClientPool


class MultiModelTester:
    """Test framework for verifying multi-model concurrent processing."""

    def __init__(self):
        self.config_path = "config/multi_model_config.json"
        self.test_topics = [
            "Quantum computing breakthrough",
            "AI safety research",
            "Climate change solutions"
        ]

    def verify_config(self) -> bool:
        """Verify that the multi-model configuration exists and is valid."""
        config_file = Path(self.config_path)
        if not config_file.exists():
            print(f"❌ Configuration file not found: {self.config_path}")
            return False

        try:
            with open(config_file, 'r') as f:
                config = json.load(f)

            required_models = [
                "qwen/qwen3-4b-2507",
                "qwen/qwen3-4b-thinking-2507",
                "google/gemma-3-12b"
            ]
            config_models = [server["model"] for server in config["servers"]]

            print(f"βœ… Configuration loaded: {len(config['servers'])} model configurations")
            for model in required_models:
                if model in config_models:
                    print(f"   βœ… {model}")
                else:
                    print(f"   ❌ {model} - MISSING")
                    return False
            return True
        except Exception as e:
            print(f"❌ Configuration error: {e}")
            return False

    async def test_model_selection(self) -> Dict[str, Any]:
        """Test that different agent types select different models."""
        print("\nπŸ§ͺ TESTING MODEL SELECTION")
        print("=" * 50)

        # Create the client pool that routes agent types to model endpoints
        client_pool = LMStudioClientPool(config_path=self.config_path, debug_mode=True)

        # Display pool status
        client_pool.display_pool_status()

        # Test the health of all model endpoints
        print("\nπŸ₯ Testing model endpoint health...")
        health_results = await client_pool.health_check_all_servers()
        healthy_count = sum(1 for healthy in health_results.values() if healthy)
        total_count = len(health_results)

        if healthy_count == 0:
            print(f"❌ No healthy model endpoints (0/{total_count})")
            return {"error": "No healthy endpoints"}

        print(f"βœ… Healthy endpoints: {healthy_count}/{total_count}")
        for endpoint, healthy in health_results.items():
            status = "βœ…" if healthy else "❌"
            print(f"   {status} {endpoint}")

        # Test the agent-type-to-model mapping
        print("\n🎯 Testing agent type mappings...")
        agent_types = ["research", "analysis", "synthesis", "critic"]
        mappings = {}
        for agent_type in agent_types:
            server_name = client_pool.get_server_for_agent_type(agent_type)
            if server_name:
                model_name = client_pool.servers[server_name].model
                mappings[agent_type] = {
                    "server": server_name,
                    "model": model_name
                }
                print(f"   {agent_type} β†’ {server_name} ({model_name})")
            else:
                print(f"   ❌ {agent_type} β†’ NO SERVER ASSIGNED")

        await client_pool.close_all()

        return {
            "health_results": health_results,
            "healthy_count": healthy_count,
            "total_count": total_count,
            "agent_mappings": mappings
        }
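
    # Illustrative standalone use: with the config file in place and LM Studio
    # running, this step can be exercised on its own, e.g.
    #     asyncio.run(MultiModelTester().test_model_selection())
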
    async def test_concurrent_processing(self, topic: str) -> Dict[str, Any]:
        """Test actual concurrent processing with different models."""
        print("\nπŸš€ TESTING CONCURRENT PROCESSING")
        print(f"Topic: {topic}")
        print("=" * 50)

        # Create a Felix writer with the multi-model config
        writer = FelixBlogWriter(
            server_config_path=self.config_path,
            debug_mode=True,
            strict_mode=True  # For faster testing
        )

        # Test the connection
        if not writer.test_lm_studio_connection():
            return {"error": "Connection test failed"}

        # Create the team
        print("\nπŸ‘₯ Creating agent team...")
        writer.create_blog_writing_team(complexity="medium")

        # Show the team composition
        print("\nπŸ“‹ Agent Team:")
        for agent in writer.agents:
            model_info = "Unknown model"
            if hasattr(writer.llm_client, 'get_server_for_agent_type'):
                server_name = writer.llm_client.get_server_for_agent_type(agent.agent_type)
                if server_name and server_name in writer.llm_client.servers:
                    model_info = writer.llm_client.servers[server_name].model
            print(f"   {agent.agent_id} ({agent.agent_type}) β†’ {model_info} @ t={agent.spawn_time:.2f}")

        # Run the session
        print("\n⚑ Starting concurrent processing session...")
        start_time = time.perf_counter()
        results = await writer.run_blog_writing_session_async(
            topic=topic,
            simulation_time=1.0
        )
        end_time = time.perf_counter()
        total_duration = end_time - start_time

        # Analyze the results for evidence of concurrency
        agents_participated = results["agents_participated"]

        # Group participating agents by agent type (each type maps to a model)
        model_usage = {}
        for agent_info in agents_participated:
            agent_type = agent_info["agent_type"]
            if agent_type not in model_usage:
                model_usage[agent_type] = []
            model_usage[agent_type].append(agent_info)

        # Look for near-simultaneous spawns as a proxy for overlapping work.
        # Note: this compares spawn times, not actual processing intervals.
        time_overlaps = []
        for i, agent1 in enumerate(agents_participated):
            for agent2 in agents_participated[i + 1:]:
                time1 = agent1["spawn_time"]
                time2 = agent2["spawn_time"]
                if abs(time1 - time2) < 0.1:  # Spawned within 0.1 time units of each other
                    time_overlaps.append((agent1["agent_id"], agent2["agent_id"], abs(time1 - time2)))

        return {
            "success": results["final_output"] is not None,
            "total_duration": total_duration,
            "total_tokens": results["session_stats"]["total_tokens_used"],
            "agents_participated": len(agents_participated),
            "model_usage": model_usage,
            "time_overlaps": time_overlaps,
            "final_content_length": len(results["final_output"]["content"]) if results["final_output"] else 0,
            "llm_client_stats": results["session_stats"].get("llm_client_stats", {}),
            "final_output": results["final_output"]
        }
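
    # The spawn-time heuristic above only shows that agents *started* close
    # together. A stricter check would compare actual processing intervals; a
    # minimal sketch, assuming hypothetical per-agent "start_time"/"end_time"
    # timestamps were recorded (the current results do not include them):
    @staticmethod
    def _intervals_overlap(start1: float, end1: float,
                           start2: float, end2: float) -> bool:
        """Return True if intervals [start1, end1) and [start2, end2) intersect."""
        return start1 < end2 and start2 < end1
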
    def analyze_concurrency(self, test_result: Dict[str, Any]) -> Dict[str, Any]:
        """Analyze test results for evidence of concurrent processing."""
        if "error" in test_result:
            return {"error": test_result["error"]}

        analysis = {
            "concurrent_evidence": {},
            "model_distribution": {},
            "performance_metrics": {}
        }

        # Evidence of concurrency. Note that time_overlaps counts agent *pairs*,
        # so the percentage can exceed 100% (e.g. 5 agents spawning together
        # yield 10 overlapping pairs, i.e. 200%).
        time_overlaps = test_result.get("time_overlaps", [])
        analysis["concurrent_evidence"] = {
            "overlapping_agents": len(time_overlaps),
            "total_agents": test_result.get("agents_participated", 0),
            "overlap_percentage": (len(time_overlaps) / max(test_result.get("agents_participated", 1), 1)) * 100,
            "evidence_found": len(time_overlaps) > 0
        }

        # Model distribution. model_usage is keyed by agent type, which maps
        # one-to-one to a model in this setup, so distinct agent types serve
        # as a proxy for distinct models.
        model_usage = test_result.get("model_usage", {})
        analysis["model_distribution"] = {
            "agent_types_used": list(model_usage.keys()),
            "different_models": len(set(model_usage.keys())),
            "model_specialization": len(model_usage) > 1
        }

        # Performance metrics
        llm_stats = test_result.get("llm_client_stats", {})
        analysis["performance_metrics"] = {
            "total_duration": test_result.get("total_duration", 0),
            "total_tokens": test_result.get("total_tokens", 0),
            "content_quality": test_result.get("final_content_length", 0),
            "llm_requests": llm_stats.get("total_requests", 0),
            "avg_response_time": llm_stats.get("average_response_time", 0)
        }
        return analysis

    def display_results(self, model_test: Dict[str, Any], processing_test: Dict[str, Any], analysis: Dict[str, Any]):
        """Display comprehensive test results."""
        print(f"\n{'=' * 60}")
        print("πŸŽ‰ MULTI-MODEL TEST RESULTS")
        print("=" * 60)

        # Model selection results
        if "error" not in model_test:
            print("\nβœ… MODEL SELECTION TEST:")
            print(f"   Healthy endpoints: {model_test['healthy_count']}/{model_test['total_count']}")
            print(f"   Agent mappings configured: {len(model_test['agent_mappings'])}")
            for agent_type, mapping in model_test['agent_mappings'].items():
                print(f"   {agent_type} β†’ {mapping['model']}")

        # Concurrent processing results
        if "error" not in processing_test:
            print("\nπŸš€ CONCURRENT PROCESSING TEST:")
            print(f"   Success: {'βœ…' if processing_test['success'] else '❌'}")
            print(f"   Duration: {processing_test['total_duration']:.2f}s")
            print(f"   Tokens: {processing_test['total_tokens']}")
            print(f"   Agents participated: {processing_test['agents_participated']}")
            print(f"   Content generated: {processing_test['final_content_length']} chars")

        # Concurrency analysis
        if "error" not in analysis:
            print("\nπŸ” CONCURRENCY ANALYSIS:")
            evidence = analysis["concurrent_evidence"]
            print(f"   Overlapping agents: {evidence['overlapping_agents']}")
            print(f"   Overlap rate: {evidence['overlap_percentage']:.1f}%")
            if evidence["evidence_found"]:
                print("   βœ… CONCURRENT PROCESSING DETECTED")
            else:
                print("   ⚠️ No clear concurrency evidence (may still be concurrent)")

            distribution = analysis["model_distribution"]
            print(f"   Different agent types: {distribution['different_models']}")
            if distribution["model_specialization"]:
                print("   βœ… MODEL SPECIALIZATION WORKING")
            else:
                print("   ❌ Model specialization not detected")

        # Final verdict
        print("\nπŸ† FINAL VERDICT:")
        model_ok = "error" not in model_test and model_test['healthy_count'] > 0
        processing_ok = "error" not in processing_test and processing_test['success']
        concurrency_ok = "error" not in analysis and (
            analysis["concurrent_evidence"]["evidence_found"] or
            analysis["model_distribution"]["model_specialization"]
        )

        if model_ok and processing_ok and concurrency_ok:
            print("βœ… MULTI-MODEL CONCURRENT PROCESSING VERIFIED!")
            print("   - Multiple models accessible βœ…")
            print("   - Agent type specialization βœ…")
            print("   - Concurrent processing βœ…")
        elif model_ok and processing_ok:
            print("⚠️ MULTI-MODEL PROCESSING WORKING (concurrency unclear)")
            print("   - Multiple models accessible βœ…")
            print("   - Processing successful βœ…")
            print("   - Concurrent evidence limited ⚠️")
        else:
            print("❌ MULTI-MODEL SETUP NEEDS ATTENTION")
            if not model_ok:
                print("   - Model endpoint issues ❌")
            if not processing_ok:
                print("   - Processing failed ❌")

    async def run_full_test(self):
        """Run the complete multi-model test suite."""
        print("πŸ§ͺ Felix Framework Multi-Model Concurrent Processing Test")
        print("=" * 60)

        # Verify the configuration
        if not self.verify_config():
            print("❌ Configuration verification failed")
            return

        # Test model selection
        model_test = await self.test_model_selection()
        if "error" in model_test:
            print(f"❌ Model selection test failed: {model_test['error']}")
            return

        # Test concurrent processing
        test_topic = self.test_topics[0]
        processing_test = await self.test_concurrent_processing(test_topic)

        # Analyze the results
        analysis = self.analyze_concurrency(processing_test)

        # Display comprehensive results
        self.display_results(model_test, processing_test, analysis)

        # Save the results
        timestamp = int(time.time())
        filename = f"multi_model_test_results_{timestamp}.json"
        results_data = {
            "timestamp": timestamp,
            "test_topic": test_topic,
            "model_test": model_test,
            "processing_test": processing_test,
            "analysis": analysis
        }
        with open(filename, 'w') as f:
            json.dump(results_data, f, indent=2)
        print(f"\nπŸ’Ύ Detailed results saved to: {filename}")


async def main():
    """Main test function."""
    tester = MultiModelTester()
    await tester.run_full_test()


if __name__ == "__main__":
    asyncio.run(main())