Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| """ | |
| Debug and validation test runner for Multi-Agent NLP System. | |
| Orchestrates infrastructure validation, agent testing, and end-to-end workflow tests. | |
| """ | |
| import asyncio | |
| import subprocess | |
| import sys | |
| import os | |
| import time | |
| from pathlib import Path | |
| from typing import Dict, List, Optional | |
class DebugTestRunner:
    """Orchestrates debugging and validation tests for the multi-agent system.

    Every phase is an ``async`` method returning ``True`` on success. Phase
    outcomes are collected in ``test_results`` (phase name -> bool) and
    summarized by ``print_final_summary``.
    """

    def __init__(self, project_root: Optional[str] = None):
        """Create a runner for the project located at *project_root*.

        Args:
            project_root: Directory expected to contain
                ``check_infrastructure.py``, ``requirements-validation.txt``
                and the ``shared/`` package. When omitted (or empty) it
                defaults to the directory containing this script rather than
                a machine-specific absolute path.
        """
        if project_root:
            self.project_root = Path(project_root)
        else:
            self.project_root = Path(__file__).resolve().parent
        # Phase display name -> whether that phase passed.
        self.test_results: Dict[str, bool] = {}

    async def run_all_debug_tests(self) -> bool:
        """Run all debugging and validation phases in sequence.

        A phase that raises is reported as crashed and counted as a failure;
        the remaining phases still run.

        Returns:
            ``True`` only if every phase returned a truthy result.
        """
        print("π Starting Multi-Agent NLP System Debug & Validation")
        print("=" * 60)
        test_phases = [
            ("Infrastructure Validation", self.run_infrastructure_validation),
            ("Docker Services Check", self.check_docker_services),
            ("Agent Health Check", self.run_agent_health_check),
            ("Kafka Communication Test", self.test_kafka_communication),
            ("End-to-End Workflow Test", self.run_e2e_workflow_test),
            ("Performance Monitoring", self.run_performance_monitoring),
        ]
        overall_success = True
        for phase_name, phase_func in test_phases:
            print(f"\nπ Phase: {phase_name}")
            print("-" * 40)
            try:
                success = await phase_func()
                self.test_results[phase_name] = success
                if success:
                    print(f"β {phase_name} completed successfully")
                else:
                    print(f"β {phase_name} failed")
                    overall_success = False
            except Exception as e:
                # Top-level boundary for a phase: record the crash and move on.
                print(f"π₯ {phase_name} crashed: {str(e)}")
                self.test_results[phase_name] = False
                overall_success = False
        self.print_final_summary(overall_success)
        return overall_success

    async def run_infrastructure_validation(self) -> bool:
        """Run the project's ``check_infrastructure.py`` script.

        If ``requirements-validation.txt`` exists, it is first installed with
        the current interpreter's pip. A failed install is only reported,
        not treated as fatal. The script is run from the project root and
        its stdout/stderr are echoed.

        Returns:
            ``True`` if the validation script exits with status 0; ``False``
            if it is missing, exits non-zero, or launching it fails.
        """
        try:
            # Check if validation script exists
            validation_script = self.project_root / "check_infrastructure.py"
            if not validation_script.exists():
                print("β Infrastructure validation script not found")
                return False

            # Install required packages (only when the requirements file exists,
            # otherwise pip would just fail with "Could not open requirements file").
            requirements = self.project_root / "requirements-validation.txt"
            if requirements.exists():
                print("π¦ Installing validation dependencies...")
                result = subprocess.run(
                    [sys.executable, "-m", "pip", "install", "-r", str(requirements)],
                    capture_output=True,
                    text=True,
                )
                if result.returncode != 0:
                    print(f"β οΈ Package installation warnings: {result.stderr}")
            else:
                print(f"Skipping dependency install: {requirements.name} not found")

            # Run infrastructure validation
            print("π§ Running infrastructure validation...")
            result = subprocess.run(
                [sys.executable, str(validation_script)],
                cwd=str(self.project_root),
                capture_output=True,
                text=True,
            )
            print(result.stdout)
            if result.stderr:
                print(f"Errors: {result.stderr}")
            return result.returncode == 0
        except Exception as e:
            print(f"Infrastructure validation error: {str(e)}")
            return False

    async def check_docker_services(self) -> bool:
        """Check that the expected Docker services have running containers.

        A service counts as present when its name is a substring of some
        running container's name.

        Returns:
            ``True`` if kafka, redis, minio and zookeeper all match a running
            container; ``False`` if any is missing or Docker is unreachable.
        """
        client = None
        try:
            import docker

            client = docker.from_env()
            # Check if containers are running
            containers = client.containers.list()
            print(f"π¦ Found {len(containers)} running containers:")
            for container in containers:
                print(f" - {container.name}: {container.status}")

            # Check for expected services
            expected_services = ['kafka', 'redis', 'minio', 'zookeeper']
            running_services = [c.name for c in containers]
            missing_services = [
                service
                for service in expected_services
                if not any(service in name for name in running_services)
            ]
            if missing_services:
                print(f"β οΈ Missing services: {missing_services}")
                print("π‘ Try running: docker-compose -f docker-compose-minimal.yml up -d")
                return False
            return True
        except Exception as e:
            print(f"Docker services check error: {str(e)}")
            return False
        finally:
            # Release the client's HTTP connection pool on every path.
            if client is not None:
                try:
                    client.close()
                except Exception as close_error:
                    print(f"Docker client close error: {close_error}")

    async def run_agent_health_check(self) -> bool:
        """List agent directories and exercise the shared Kafka client.

        Agent directories are immediate subdirectories of the project root
        whose name contains "agent" (case-insensitive). Finding none is only
        reported, not treated as a failure.

        Returns:
            The result of ``test_kafka_client``, or ``False`` on error.
        """
        try:
            # Look for agent directories
            agent_dirs = [
                item
                for item in self.project_root.iterdir()
                if item.is_dir() and 'agent' in item.name.lower()
            ]
            if not agent_dirs:
                print("β οΈ No agent directories found")
                print("π‘ This is expected if you haven't created agent services yet")
            else:
                print(f"π€ Found {len(agent_dirs)} agent directories:")
                for agent_dir in agent_dirs:
                    print(f" - {agent_dir.name}")

            # Test Kafka client functionality
            print("π Testing shared Kafka client...")
            kafka_test_result = await self.test_kafka_client()
            return kafka_test_result
        except Exception as e:
            print(f"Agent health check error: {str(e)}")
            return False

    async def test_kafka_client(self) -> bool:
        """Smoke-test the project's shared ``kafka_client`` module.

        Imports ``kafka_client`` from ``<project_root>/shared``. It then
        starts a producer, sends one message to ``test-topic``, creates a
        consumer and registers a handler. The manager is shut down on every
        path.

        Returns:
            ``True`` if every step succeeded, ``False`` otherwise.
        """
        manager = None
        try:
            # Add the shared directory to Python path (once, so repeated calls
            # don't keep prepending duplicate entries).
            shared_dir = str(self.project_root / "shared")
            if shared_dir not in sys.path:
                sys.path.insert(0, shared_dir)
            from kafka_client import create_kafka_config, KafkaManager

            # Create Kafka configuration
            config = create_kafka_config(bootstrap_servers="localhost:9092")

            # Test Kafka manager
            manager = KafkaManager(config)
            print(" Testing Kafka producer...")
            await manager.start_producer()

            # Send test message
            success = await manager.send_message(
                "test-topic",
                {"test": "message", "timestamp": time.time()},
            )
            if not success:
                print(" β Kafka producer test failed")
                return False
            print(" β Kafka producer test passed")

            # Test consumer
            print(" Testing Kafka consumer...")
            consumer = await manager.create_consumer("test-group", ["test-topic"])

            # Register a test handler
            def test_handler(message):
                print(f" π¨ Received test message: {message['value']}")

            # NOTE(review): the handler is only registered; nothing here
            # consumes from the topic, so message delivery is not verified.
            consumer.register_handler("test-topic", test_handler)

            # Cleanup: a shutdown error on the success path still fails the test.
            await manager.shutdown()
            manager = None  # cleanly shut down; nothing left for `finally`
            print(" β Kafka client tests passed")
            return True
        except Exception as e:
            print(f" β Kafka client test error: {str(e)}")
            return False
        finally:
            if manager is not None:
                # Failure path: release producer/consumer resources without
                # masking the original result.
                try:
                    await manager.shutdown()
                except Exception as shutdown_error:
                    print(f" Kafka manager shutdown error: {shutdown_error}")

    async def test_kafka_communication(self) -> bool:
        """Send one JSON message to each service topic via kafka-python.

        This is a basic broker connectivity test against ``localhost:9092``.
        It does not verify that any agent consumes the messages.

        Returns:
            ``True`` if every send is acknowledged within 5 seconds.
        """
        producer = None
        try:
            print("π‘ Testing Kafka communication between services...")
            # This would test actual message passing between agents
            # For now, we'll do a basic connectivity test
            from kafka import KafkaProducer
            import json

            # Test producer
            producer = KafkaProducer(
                bootstrap_servers=['localhost:9092'],
                value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            )

            # Send test messages to different topics
            test_topics = ['nlp-requests', 'image-generation', 'video-processing']
            for topic in test_topics:
                # .get() blocks until the send completes and raises on failure/timeout.
                producer.send(topic, {'test': f'message for {topic}'}).get(timeout=5)
                print(f" β Message sent to {topic}")

            producer.close()
            producer = None  # closed cleanly; nothing left for `finally`
            return True
        except Exception as e:
            print(f"Kafka communication test error: {str(e)}")
            return False
        finally:
            if producer is not None:
                # Failure path: close the producer without masking the error.
                try:
                    producer.close()
                except Exception as close_error:
                    print(f"Kafka producer close error: {close_error}")

    async def run_e2e_workflow_test(self) -> bool:
        """Simulate an end-to-end workflow by printing its steps.

        NOTE(review): this is a placeholder. It exercises no real services
        and returns ``True`` unless printing/sleeping raises.
        """
        try:
            print("π Running end-to-end workflow tests...")
            # This would test complete workflows through the system
            # For now, we'll simulate a basic workflow
            workflow_steps = [
                "API Gateway receives request",
                "Request routed to appropriate agent",
                "Agent processes request",
                "Results stored in MinIO",
                "Response sent back through Kafka",
                "API Gateway returns response",
            ]
            for i, step in enumerate(workflow_steps, 1):
                print(f" {i}. {step}")
                await asyncio.sleep(0.1)  # Simulate processing time
            print(" β E2E workflow simulation completed")
            return True
        except Exception as e:
            print(f"E2E workflow test error: {str(e)}")
            return False

    async def run_performance_monitoring(self) -> bool:
        """Report CPU, memory and root-disk usage via psutil.

        Returns:
            ``False`` if CPU or memory usage exceeds 90% (disk usage is only
            reported) or psutil is unavailable; ``True`` otherwise.
        """
        try:
            print("π Running performance monitoring...")
            import psutil

            # Check system resources (CPU is sampled over a blocking 1 s interval)
            cpu_percent = psutil.cpu_percent(interval=1)
            memory = psutil.virtual_memory()
            disk = psutil.disk_usage('/')
            print(f" CPU Usage: {cpu_percent}%")
            print(f" Memory Usage: {memory.percent}%")
            print(f" Disk Usage: {disk.percent}%")

            # Check if resources are within acceptable limits
            if cpu_percent > 90:
                print(" β οΈ High CPU usage detected")
                return False
            if memory.percent > 90:
                print(" β οΈ High memory usage detected")
                return False
            print(" β System performance within acceptable limits")
            return True
        except Exception as e:
            print(f"Performance monitoring error: {str(e)}")
            return False

    def print_final_summary(self, overall_success: bool):
        """Print a per-phase PASS/FAIL table and an overall verdict.

        Args:
            overall_success: Whether every phase passed. Selects between the
                success message and the troubleshooting tips.
        """
        print("\n" + "=" * 60)
        print("π FINAL DEBUG & VALIDATION SUMMARY")
        print("=" * 60)
        for test_name, result in self.test_results.items():
            status = "β PASS" if result else "β FAIL"
            print(f"{test_name:<30} {status}")

        passed = sum(1 for result in self.test_results.values() if result)
        total = len(self.test_results)
        print(f"\nOverall Result: {passed}/{total} tests passed")

        if overall_success:
            print("π All debugging and validation tests passed!")
            print("π‘ Your multi-agent NLP system is ready for production!")
        else:
            print("β οΈ Some issues detected. Please review and fix before proceeding.")
            print("\nπ§ Troubleshooting tips:")
            print(" 1. Ensure Docker is running: docker --version")
            print(" 2. Start services: docker-compose -f docker-compose-minimal.yml up -d")
            print(" 3. Check logs: docker-compose -f docker-compose-minimal.yml logs")
            print(" 4. Verify network connectivity")
async def main():
    """Run every debug phase and terminate the process with its status.

    The exit code is 0 when all phases pass and 1 otherwise. It is also 1
    when the run is interrupted from inside the coroutine or an unexpected
    exception escapes the runner.
    """
    debug_runner = DebugTestRunner()
    try:
        all_passed = await debug_runner.run_all_debug_tests()
    except KeyboardInterrupt:
        print("\nβΉοΈ Debug tests interrupted by user")
        sys.exit(1)
    except Exception as exc:
        print(f"\nπ₯ Unexpected error: {str(exc)}")
        sys.exit(1)
    # SystemExit is a BaseException, so exiting here is equivalent to exiting
    # inside the try block: neither handler above would intercept it.
    sys.exit(0 if all_passed else 1)
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # On Python 3.11+, asyncio.run() turns the first Ctrl-C into
        # cancellation of the main task and then raises KeyboardInterrupt
        # here, outside main(), so main()'s own handler never sees it.
        print("\nβΉοΈ Debug tests interrupted by user")
        sys.exit(1)