# Source: multi-agent-nlp-system/check_infrastructure.py
# Commit d59ae58 by CHKIM79: "Deploy Multi-Agent NLP System"
#!/usr/bin/env python3
"""
Infrastructure validation script for Multi-Agent NLP System.
Checks Docker, Kafka, and other critical infrastructure components.
"""
import asyncio
import json
import os
import subprocess
import sys
import time
from typing import Dict, List, Optional, Tuple

import docker
import requests
from kafka import KafkaProducer, KafkaConsumer
from kafka.errors import KafkaError
class InfrastructureValidator:
    """Validates infrastructure components for the multi-agent system.

    Each ``check_*`` coroutine prints its own diagnostics and returns ``True``
    on success. :meth:`run_all_checks` runs them one after another and records
    each outcome in :attr:`results`; :meth:`print_summary` reports the tally.

    Note: the checks call synchronous clients (docker, subprocess, requests,
    kafka-python, redis, psutil), so each one blocks the event loop while it
    runs; they are awaited strictly in sequence.
    """

    # Compose files looked up, in priority order, inside ``project_dir``.
    COMPOSE_FILE_NAMES = (
        "docker-compose-minimal.yml",
        "docker-compose.yml",
        "docker-compose.yaml",
        "compose.yaml",
        "compose.yml",
    )

    # Compose CLIs tried in order: the standalone v1 binary, then the v2 plugin.
    COMPOSE_VERSION_COMMANDS = (
        ("docker-compose", "--version"),
        ("docker", "compose", "version"),
    )

    def __init__(self, project_dir: Optional[str] = None):
        """Create a validator.

        Args:
            project_dir: Directory searched for Docker Compose files.
                Defaults to the directory that contains this script.
        """
        self.results: Dict[str, bool] = {}
        # Populated by check_docker(); check_network() reuses it when present.
        self.docker_client = None
        if project_dir is None:
            project_dir = os.path.dirname(os.path.abspath(__file__))
        self.project_dir = project_dir

    async def run_all_checks(self) -> Dict[str, bool]:
        """Run every check in order and return ``{check name: passed}``.

        A check that raises is recorded as failed instead of aborting the
        run. The same mapping is stored on :attr:`results`.
        """
        print("🔍 Starting infrastructure validation...")
        print("=" * 50)
        # Order matters: "Docker" must run before "Network Connectivity",
        # which reuses the client that check_docker() creates.
        checks = [
            ("Docker", self.check_docker),
            ("Docker Compose", self.check_docker_compose),
            ("Kafka", self.check_kafka),
            ("Redis", self.check_redis),
            ("MinIO/S3", self.check_minio),
            ("Network Connectivity", self.check_network),
            ("System Resources", self.check_system_resources),
        ]
        for check_name, check_func in checks:
            print(f"\n📋 Checking {check_name}...")
            try:
                result = await check_func()
                self.results[check_name] = result
                status = "✅ PASS" if result else "❌ FAIL"
                print(f" {status}")
            except Exception as e:
                self.results[check_name] = False
                print(f" ❌ ERROR: {e}")
        return self.results

    async def check_docker(self) -> bool:
        """Check that the Docker daemon is reachable.

        Pings the daemon, prints its version and the number of running
        containers, and keeps the client on ``self.docker_client``.
        """
        try:
            self.docker_client = docker.from_env()
            self.docker_client.ping()
            version = self.docker_client.version()
            print(f" Docker version: {version['Version']}")
            containers = self.docker_client.containers.list()
            print(f" Running containers: {len(containers)}")
            return True
        except Exception as e:
            print(f" Docker error: {e}")
            return False

    def _detect_compose_version(self) -> Tuple[Optional[str], str]:
        """Return ``(version_banner, "")`` for the first working Compose CLI.

        Tries each command in :attr:`COMPOSE_VERSION_COMMANDS`. When none
        succeeds, returns ``(None, reason)``, where *reason* is the stderr of
        the last CLI that ran, or a note that no CLI was found on ``PATH``.
        ``subprocess.TimeoutExpired`` propagates to the caller.
        """
        reason = "no 'docker-compose' or 'docker compose' CLI found on PATH"
        for cmd in self.COMPOSE_VERSION_COMMANDS:
            try:
                result = subprocess.run(
                    list(cmd),
                    capture_output=True,
                    text=True,
                    timeout=10,
                )
            except FileNotFoundError:
                # This CLI flavour is not installed; try the next one.
                continue
            if result.returncode == 0:
                return result.stdout.strip(), ""
            reason = result.stderr.strip()
        return None, reason

    def _find_compose_file(self) -> Optional[str]:
        """Return the path of the first compose file in ``project_dir``, or None."""
        for name in self.COMPOSE_FILE_NAMES:
            candidate = os.path.join(self.project_dir, name)
            if os.path.isfile(candidate):
                return candidate
        return None

    async def check_docker_compose(self) -> bool:
        """Check that a Docker Compose CLI works and a compose file exists.

        Both the standalone ``docker-compose`` binary and the ``docker
        compose`` plugin are accepted. Compose files are looked up in
        :attr:`project_dir`.
        """
        try:
            version, reason = self._detect_compose_version()
            if version is None:
                print(f" Docker Compose not found: {reason}")
                return False
            print(f" {version}")

            compose_file = self._find_compose_file()
            if compose_file is None:
                print(" Warning: No compose file found")
                return False
            print(f" Found compose file: {compose_file}")
            return True
        except Exception as e:
            print(f" Docker Compose error: {e}")
            return False

    async def check_kafka(self) -> bool:
        """Check that the Kafka broker on localhost:9092 accepts traffic.

        Produces one JSON message to the ``health-check`` topic and waits for
        the broker to acknowledge it. Then subscribes a consumer to the same
        topic and closes it.
        """
        producer = None
        try:
            producer = KafkaProducer(
                bootstrap_servers=['localhost:9092'],
                value_serializer=lambda v: json.dumps(v).encode('utf-8'),
                request_timeout_ms=5000,
                # Bound how long send() may block waiting for topic metadata.
                max_block_ms=5000,
            )
            future = producer.send('health-check', {'test': 'message'})
            producer.flush(timeout=5)
            # flush() only waits for sends to finish; the delivery outcome is
            # on the future, and get() raises if the record was not acked.
            future.get(timeout=5)
            producer.close()
            producer = None
            print(" Kafka producer test: SUCCESS")

            consumer = KafkaConsumer(
                'health-check',
                bootstrap_servers=['localhost:9092'],
                auto_offset_reset='latest',
                consumer_timeout_ms=2000,
            )
            consumer.close()
            print(" Kafka consumer test: SUCCESS")
            return True
        except Exception as e:
            print(f" Kafka error: {e}")
            print(" Note: Kafka might not be running. Start with: docker-compose -f docker-compose-minimal.yml up -d")
            return False
        finally:
            # Only non-None when an error interrupted the producer phase.
            if producer is not None:
                try:
                    producer.close(timeout=5)
                except Exception:
                    # Best-effort cleanup; the failure was already reported.
                    pass

    async def check_redis(self) -> bool:
        """Check that Redis on localhost:6379 answers and round-trips a value."""
        client = None
        try:
            import redis
            client = redis.Redis(host='localhost', port=6379, db=0, socket_timeout=5)
            client.ping()
            # TTL so the probe key cannot linger if delete() is never reached.
            client.set('health-check', 'test', ex=60)
            value = client.get('health-check')
            client.delete('health-check')
            # Without decode_responses the client returns bytes.
            if value != b'test':
                print(f" Redis round-trip mismatch: expected b'test', got {value!r}")
                return False
            print(" Redis connectivity: SUCCESS")
            return True
        except ImportError:
            print(" Redis package not installed: pip3 install redis")
            return False
        except Exception as e:
            print(f" Redis error: {e}")
            print(" Note: Redis might not be running. Start with: docker-compose -f docker-compose-minimal.yml up -d")
            return False
        finally:
            if client is not None:
                client.connection_pool.disconnect()

    async def check_minio(self) -> bool:
        """Check MinIO through its liveness endpoint on localhost:9000."""
        try:
            response = requests.get(
                'http://localhost:9000/minio/health/live',
                timeout=5,
            )
            if response.status_code == 200:
                print(" MinIO health check: SUCCESS")
                return True
            print(f" MinIO health check failed: {response.status_code}")
            return False
        except requests.exceptions.RequestException as e:
            print(f" MinIO error: {e}")
            print(" Note: MinIO might not be running. Start with: docker-compose -f docker-compose-minimal.yml up -d")
            return False

    async def check_network(self) -> bool:
        """Check service reachability and list Docker networks.

        The API gateway probe (localhost:8080/health) is informational only;
        its result does not affect the return value. Fails only when the
        Docker client exists but listing networks raises.
        """
        try:
            response = requests.get('http://localhost:8080/health', timeout=5)
            if response.status_code == 200:
                print(" API Gateway health: SUCCESS")
            else:
                print(" API Gateway not responding")
        except requests.exceptions.RequestException:
            print(" API Gateway not running")

        if self.docker_client:
            try:
                networks = self.docker_client.networks.list()
                print(f" Docker networks: {len(networks)}")
                return True
            except Exception as e:
                print(f" Network check error: {e}")
                return False
        return True

    async def check_system_resources(self) -> bool:
        """Report CPU, memory and root-disk usage; warn above 80%.

        High usage only prints a warning. The check fails only if psutil is
        missing or raises.
        """
        try:
            import psutil
            # Samples over one second, so this call blocks for ~1s.
            cpu_percent = psutil.cpu_percent(interval=1)
            print(f" CPU usage: {cpu_percent}%")
            memory = psutil.virtual_memory()
            print(f" Memory usage: {memory.percent}% ({memory.used // (1024**3)}GB / {memory.total // (1024**3)}GB)")
            disk = psutil.disk_usage('/')
            print(f" Disk usage: {disk.percent}% ({disk.used // (1024**3)}GB / {disk.total // (1024**3)}GB)")

            if cpu_percent > 80:
                print(" ⚠️ High CPU usage detected")
            if memory.percent > 80:
                print(" ⚠️ High memory usage detected")
            if disk.percent > 80:
                print(" ⚠️ High disk usage detected")
            return True
        except ImportError:
            print(" psutil package not installed: pip3 install psutil")
            return False
        except Exception as e:
            print(f" System resources error: {e}")
            return False

    def print_summary(self) -> bool:
        """Print a per-check table and return True only if every check passed.

        An empty :attr:`results` (no checks run) is reported as a failure,
        not as "all passed".
        """
        print("\n" + "=" * 50)
        print("📊 INFRASTRUCTURE VALIDATION SUMMARY")
        print("=" * 50)
        total = len(self.results)
        passed = sum(1 for ok in self.results.values() if ok)
        for check_name, result in self.results.items():
            status = "✅ PASS" if result else "❌ FAIL"
            print(f"{check_name:<20} {status}")
        print(f"\nOverall: {passed}/{total} checks passed")

        if total == 0:
            # An empty run proves nothing about the infrastructure.
            print("⚠️ No infrastructure checks were run.")
            return False
        if passed == total:
            print("🎉 All infrastructure checks passed!")
            return True
        print("⚠️ Some infrastructure issues detected. Please review and fix.")
        return False
async def main():
    """Run every infrastructure check, print a summary, and exit.

    Exits with status 0 when all checks pass and 1 otherwise. Status 1 is also
    used when the run is interrupted or an unexpected error escapes the
    validator.
    """
    validator = InfrastructureValidator()
    try:
        await validator.run_all_checks()
        success = validator.print_summary()
    except KeyboardInterrupt:
        print("\n⏹️ Validation interrupted by user")
        sys.exit(1)
    except Exception as e:
        print(f"\n💥 Unexpected error: {e}")
        sys.exit(1)
    # SystemExit is not an Exception subclass, but exiting outside the
    # guarded region keeps the control flow explicit.
    sys.exit(0 if success else 1)


if __name__ == "__main__":
    asyncio.run(main())