"""
Multi-Cloud Infrastructure Provisioning

``MultiCloudProvider`` wraps the SDK(s) of one provider (AWS, GCP, Azure or
DigitalOcean) in one region; ``MultiCloudOrchestrator`` configures providers
from environment variables, deploys instances, aggregates resources and runs a
periodic health-check loop.

boto3 is imported at module load but treated as optional; the GCP, Azure and
DigitalOcean SDKs are imported lazily when first needed, so the module can be
imported with any subset of the SDKs installed.
"""

import asyncio
import json
import logging
import os
import time
from datetime import datetime
from enum import Enum
from typing import Any, Optional

try:
    import boto3
except ImportError:  # AWS support is optional, matching the lazily-imported SDKs below.
    boto3 = None

logger = logging.getLogger(__name__)


class CloudProvider(Enum):
    """Supported cloud providers; the value is the provider's name in logs and result dicts."""

    AWS = "aws"
    GCP = "gcp"
    AZURE = "azure"
    DIGITAL_OCEAN = "digitalocean"


class MultiCloudProvider:
    """Configuration, SDK clients and operations for one cloud provider in one region.

    SDK clients are built lazily by ``_ensure_client`` and cached in
    ``self.client`` as a dict mapping a service name (``"ec2"``, ``"compute"``,
    ``"droplet"``, ...) to an SDK client object.

    NOTE(review): most SDK calls below are synchronous and run on the event-loop
    thread, blocking other coroutines while in flight; only the long Azure/GCP
    operation waits are offloaded with ``asyncio.to_thread``.
    """

    # Zone used for GCP calls when none is given (same default as _deploy_gcp_instance).
    _DEFAULT_GCP_ZONE = "us-central1-a"

    def __init__(self, provider: CloudProvider, region: str, credentials: dict[str, str]):
        self.provider = provider
        self.region = region
        self.credentials = credentials
        # Dict of SDK service clients; None until _ensure_client() succeeds.
        self.client = None
        # Last snapshot returned by get_resources().
        self.resources = {}
        # Set from connection tests / health checks (and by get_resources() on success).
        self.is_connected = False

    # ------------------------------------------------------------------ clients

    def _ensure_client(self) -> dict[str, Any]:
        """Return the cached SDK client dict, building it on first use.

        Raises:
            RuntimeError: if the client could not be built (SDK missing or
                construction failed; the builder logs the cause). The next call
                retries the build.
        """
        if not self.client:
            # Builders return {} / None on failure; normalise to None so a later call retries.
            self.client = self._get_client() or None
        if not self.client:
            raise RuntimeError(f"{self.provider.value} client is not available")
        return self.client

    def _get_client(self):
        """Build and return the SDK client dict for this provider, or None/{} on failure."""
        builders = {
            CloudProvider.AWS: self._get_aws_client,
            CloudProvider.GCP: self._get_gcp_client,
            CloudProvider.AZURE: self._get_azure_client,
            CloudProvider.DIGITAL_OCEAN: self._get_digital_ocean_client,
        }
        builder = builders.get(self.provider)
        if builder is None:
            return None
        try:
            return builder()
        except Exception as e:
            logger.error(f"Error getting {self.provider.value} client: {e}")
            return None

    def _get_aws_client(self):
        """Create boto3 clients for the AWS services used here; returns {} on failure."""
        try:
            if boto3 is None:
                raise ImportError("boto3 is not installed")
            session = boto3.Session(
                aws_access_key_id=self.credentials.get("aws_access_key_id"),
                aws_secret_access_key=self.credentials.get("aws_secret_access_key"),
                aws_session_token=self.credentials.get("aws_session_token"),
                region_name=self.region,
            )
            self.client = {
                # STS backs test_connection() (GetCallerIdentity).
                "sts": session.client("sts"),
                "ec2": session.client("ec2"),
                "s3": session.client("s3"),
                "rds": session.client("rds"),
                "elb": session.client("elbv2"),
                "autoscaling": session.client("autoscaling"),
                "cloudformation": session.client("cloudformation"),
                "route53": session.client("route53"),
            }
            return self.client
        except Exception as e:
            logger.error(f"Error creating AWS client: {e}")
            return {}

    def _get_gcp_client(self):
        """Create google-cloud-compute / storage clients; returns {} on failure."""
        try:
            from google.cloud import compute_v1, storage

            project = self.credentials.get("gcp_project_id")
            # NOTE(review): GCP_CREDENTIALS is assumed to be a path to a service-account
            # JSON key file; when unset, Application Default Credentials are used.
            key_file = self.credentials.get("gcp_credentials")
            if key_file:
                instances = compute_v1.InstancesClient.from_service_account_file(key_file)
                zones = compute_v1.ZonesClient.from_service_account_file(key_file)
                storage_client = storage.Client.from_service_account_json(key_file, project=project)
            else:
                instances = compute_v1.InstancesClient()
                zones = compute_v1.ZonesClient()
                storage_client = storage.Client(project=project)
            self.client = {"compute": instances, "zones": zones, "storage": storage_client}
            return self.client
        except Exception as e:
            logger.error(f"Error creating GCP client: {e}")
            return {}

    def _get_azure_client(self):
        """Create Azure compute (and optionally blob storage) clients; returns {} on failure."""
        try:
            from azure.identity import ClientSecretCredential
            from azure.mgmt.compute import ComputeManagementClient
            from azure.storage.blob import BlobServiceClient

            credential = ClientSecretCredential(
                tenant_id=self.credentials.get("azure_tenant_id"),
                client_id=self.credentials.get("azure_client_id"),
                client_secret=self.credentials.get("azure_client_secret"),
            )
            # The resource group is not a client setting; it is passed on each VM call.
            client = {
                "compute": ComputeManagementClient(
                    credential=credential,
                    subscription_id=self.credentials.get("azure_subscription_id"),
                ),
            }
            # Blob storage needs an account name, which the orchestrator does not supply,
            # so the storage client is only built when one is configured.
            account_name = self.credentials.get("azure_storage_account_name")
            if account_name:
                client["storage"] = BlobServiceClient(
                    account_url=f"https://{account_name}.blob.core.windows.net",
                    # An account key, when given, takes precedence over the AAD credential.
                    credential=self.credentials.get("azure_storage_account_key") or credential,
                )
            self.client = client
            return self.client
        except Exception as e:
            logger.error(f"Error creating Azure client: {e}")
            return {}

    def _get_digital_ocean_client(self):
        """Create the DigitalOcean API manager; returns {} on failure."""
        try:
            import digitalocean

            # One Manager serves every service; the three keys are kept for existing callers.
            manager = digitalocean.Manager(token=self.credentials.get("digitalocean_token"))
            self.client = {"droplet": manager, "volumes": manager, "spaces": manager}
            return self.client
        except Exception as e:
            logger.error(f"Error creating DigitalOcean client: {e}")
            return {}

    # -------------------------------------------------------------- read paths

    async def test_connection(self) -> bool:
        """Make one cheap authenticated read call; return True if it succeeds, else False."""
        try:
            client = self._ensure_client()
            if self.provider == CloudProvider.AWS:
                client["sts"].get_caller_identity()
                logger.info("AWS connection test successful")
                return True
            if self.provider == CloudProvider.GCP:
                # The SDK returns a lazy pager; materialise it to count the zones.
                zones = list(client["zones"].list(project=self.credentials.get("gcp_project_id")))
                logger.info(f"GCP connection test successful: {len(zones)} zones available")
                return True
            if self.provider == CloudProvider.AZURE:
                vm_sizes = list(client["compute"].virtual_machine_sizes.list(location=self.region))
                logger.info(f"Azure connection test successful: {len(vm_sizes)} VM sizes available")
                return True
            if self.provider == CloudProvider.DIGITAL_OCEAN:
                droplets = client["droplet"].get_all_droplets()
                logger.info(f"DigitalOcean connection test successful: {len(droplets)} droplets")
                return True
            logger.warning(f"Unknown provider: {self.provider.value}")
            return False
        except Exception as e:
            logger.error(f"Connection test failed for {self.provider.value}: {e}")
            return False

    async def get_resources(self) -> dict[str, Any]:
        """Return a snapshot of this provider's instances (plus S3 buckets on AWS).

        On success the snapshot is cached in ``self.resources`` and
        ``is_connected`` is set True; on any failure ``{"error": message}``
        is returned instead.
        """
        try:
            client = self._ensure_client()
            resources = {"provider": self.provider.value, "region": self.region}
            if self.provider == CloudProvider.AWS:
                resources["instances"] = self._list_aws_instances(client)
                resources["storage"] = self._list_aws_buckets(client)
            elif self.provider == CloudProvider.GCP:
                resources["instances"] = self._list_gcp_instances(client)
            elif self.provider == CloudProvider.AZURE:
                resources["instances"] = self._list_azure_instances(client)
            elif self.provider == CloudProvider.DIGITAL_OCEAN:
                resources["instances"] = self._list_digital_ocean_instances(client)

            self.resources = resources
            self.is_connected = True
            return resources
        except Exception as e:
            logger.error(f"Error getting resources from {self.provider.value}: {e}")
            return {"error": str(e)}

    @staticmethod
    def _list_aws_instances(client: dict[str, Any]) -> list[dict[str, Any]]:
        """Flatten a DescribeInstances response (Reservations -> Instances) into summary dicts.

        NOTE(review): only the first result page is read — TODO use the
        ``describe_instances`` paginator for large accounts.
        """
        response = client["ec2"].describe_instances()
        return [
            {
                "id": inst.get("InstanceId"),
                "type": inst.get("InstanceType"),
                "state": inst.get("State", {}).get("Name"),
                "public_ip": inst.get("PublicIpAddress") or "",
                "private_ip": inst.get("PrivateIpAddress") or "",
                "instance_type": inst.get("InstanceType"),
                "launch_time": inst.get("LaunchTime"),
            }
            for reservation in response.get("Reservations", [])
            for inst in reservation.get("Instances", [])
        ]

    @staticmethod
    def _list_aws_buckets(client: dict[str, Any]) -> list[dict[str, Any]]:
        """Summarise the S3 buckets returned by ListBuckets (response items are dicts)."""
        response = client["s3"].list_buckets()
        return [
            {
                "name": bucket.get("Name"),
                "creation_date": bucket.get("CreationDate"),
                # BucketRegion may be absent from the response; fall back to "".
                "region": bucket.get("BucketRegion", ""),
            }
            for bucket in response.get("Buckets", [])
        ]

    def _list_gcp_instances(self, client: dict[str, Any]) -> list[dict[str, Any]]:
        """Summarise every GCE instance in the project across all zones."""
        instances = []
        pager = client["compute"].aggregated_list(project=self.credentials.get("gcp_project_id"))
        # Assumes the pager yields (scope, InstancesScopedList) pairs; scopes with no
        # instances have an empty ``instances`` list.
        for _scope, scoped_list in pager:
            for inst in scoped_list.instances:
                instances.append(
                    {
                        "id": inst.id,
                        "name": inst.name,
                        "status": inst.status,
                        "machine_type": inst.machine_type,
                        "creation_timestamp": inst.creation_timestamp,
                        "public_ip": self._gcp_external_ip(inst) or "",
                    }
                )
        return instances

    def _list_azure_instances(self, client: dict[str, Any]) -> list[dict[str, Any]]:
        """Summarise every VM returned by ``virtual_machines.list_all()``."""
        instances = []
        for vm in client["compute"].virtual_machines.list_all():
            hardware = getattr(vm, "hardware_profile", None)
            os_disk = getattr(getattr(vm, "storage_profile", None), "os_disk", None)
            instances.append(
                {
                    "id": vm.name,
                    "name": vm.name,
                    # Power state comes from the VM instance view, which list_all()
                    # may not populate; then this is None.
                    "power_state": self._azure_power_state(vm),
                    "location": vm.location,
                    "vm_size": getattr(hardware, "vm_size", None),
                    "os_type": getattr(os_disk, "os_type", None),
                }
            )
        return instances

    def _list_digital_ocean_instances(self, client: dict[str, Any]) -> list[dict[str, Any]]:
        """Summarise every droplet on the account."""
        return [
            {
                "id": droplet.id,
                "name": droplet.name,
                "status": droplet.status,
                "region": self._slug(droplet.region),
                "size": self._slug(getattr(droplet, "size", None)) or getattr(droplet, "size_slug", None),
                "memory": droplet.memory,
                "vcpus": droplet.vcpus,
                "disk": droplet.disk,
            }
            for droplet in client["droplet"].get_all_droplets()
        ]

    @staticmethod
    def _slug(value: Any) -> Optional[str]:
        """Return the ``slug`` of a DigitalOcean region/size payload given as a dict or object."""
        if isinstance(value, dict):
            return value.get("slug")
        return getattr(value, "slug", None)

    @staticmethod
    def _gcp_external_ip(instance: Any) -> Optional[str]:
        """Return the first NAT (external) IP on a GCE instance, or None."""
        for interface in getattr(instance, "network_interfaces", None) or []:
            for access_config in getattr(interface, "access_configs", None) or []:
                # google-cloud-compute spells the natIP field ``nat_i_p``; ``nat_ip`` is a fallback.
                ip = getattr(access_config, "nat_i_p", None) or getattr(access_config, "nat_ip", None)
                if ip:
                    return ip
        return None

    @staticmethod
    def _azure_power_state(vm: Any) -> Optional[str]:
        """Return e.g. ``"running"`` from a ``PowerState/...`` instance-view status, or None."""
        view = getattr(vm, "instance_view", None)
        for status in getattr(view, "statuses", None) or []:
            code = getattr(status, "code", None) or ""
            if code.startswith("PowerState/"):
                return code.split("/", 1)[1]
        return None

    # ------------------------------------------------------------- deployments

    async def deploy_instance(self, instance_config: dict[str, Any]) -> dict[str, Any]:
        """Deploy one instance using this provider's deploy routine.

        Returns the provider-specific result dict (``{"success": True, ...}``)
        or ``{"error": message}``.
        """
        try:
            if self.provider == CloudProvider.AWS:
                return await self._deploy_aws_instance(instance_config)
            elif self.provider == CloudProvider.GCP:
                return await self._deploy_gcp_instance(instance_config)
            elif self.provider == CloudProvider.AZURE:
                return await self._deploy_azure_instance(instance_config)
            elif self.provider == CloudProvider.DIGITAL_OCEAN:
                return await self._deploy_digital_ocean_instance(instance_config)
            else:
                return {"error": f"Unsupported provider: {self.provider.value}"}
        except Exception as e:
            logger.error(f"Error deploying instance to {self.provider.value}: {e}")
            return {"error": str(e)}

    async def _deploy_aws_instance(self, instance_config: dict[str, Any]) -> dict[str, Any]:
        """Launch one EC2 instance and wait (bounded by _wait_for_instance) for it to run.

        Optional follow-ups driven by ``instance_config``: ``associate_elastic_ip``
        (requires ``elastic_ip_allocation_id``) and ``create_dns_record`` (uses ``domain``).
        """
        try:
            ec2_client = self._ensure_client()["ec2"]
            run_args = {
                # NOTE(review): these defaults are placeholders, not real AWS resource IDs.
                "ImageId": instance_config.get("image_id", "ami-12345678"),
                "MinCount": 1,
                "MaxCount": 1,
                "InstanceType": instance_config.get("instance_type", "t2.micro"),
                "KeyName": instance_config.get("key_name", "zenith-key"),
                "SecurityGroupIds": [instance_config.get("security_group_id", "sg-12345678")],
                "SubnetId": instance_config.get("subnet_id", "subnet-12345678"),
                # RunInstances expects tags grouped under a resource type.
                "TagSpecifications": [
                    {
                        "ResourceType": "instance",
                        "Tags": [
                            {"Key": "Name", "Value": instance_config.get("name", "zenith-instance")},
                            {"Key": "Environment", "Value": instance_config.get("environment", "production")},
                        ],
                    }
                ],
            }
            # IPv6 addresses need an IPv6-enabled subnet; pass ipv6_address_count=0 to skip.
            ipv6_count = instance_config.get("ipv6_address_count", 1)
            if ipv6_count:
                run_args["Ipv6AddressCount"] = ipv6_count

            response = ec2_client.run_instances(**run_args)
            if not response or not response.get("Instances"):
                return {"error": "Failed to create AWS instance", "details": response}

            instance = response["Instances"][0]
            instance_id = instance["InstanceId"]
            await self._wait_for_instance(instance_id, instance_config.get("name"))

            if instance_config.get("associate_elastic_ip", False):
                allocation_id = instance_config.get("elastic_ip_allocation_id")
                if allocation_id:
                    await self._associate_elastic_ip(instance_id, allocation_id)
                else:
                    logger.warning(
                        f"associate_elastic_ip requested for {instance_id} without "
                        "elastic_ip_allocation_id; skipping"
                    )

            if instance_config.get("create_dns_record", False):
                await self._create_dns_record(instance_id, instance_config.get("name"), instance_config.get("domain"))

            return {
                "success": True,
                "instance_id": instance_id,
                "instance_name": instance_config.get("name"),
                # The RunInstances response describes the pending instance and usually has
                # no public IP yet; re-read it now that the instance is running.
                "public_ip": self._get_aws_instance_ip(instance_id) or instance.get("PublicIpAddress"),
                "private_ip": instance.get("PrivateIpAddress"),
                "provider": "aws",
                "region": self.region,
            }
        except Exception as e:
            logger.error(f"Error deploying AWS instance: {e}")
            return {"error": str(e)}

    async def _deploy_gcp_instance(self, instance_config: dict[str, Any]) -> dict[str, Any]:
        """Create one GCE instance, wait for RUNNING and return its id and external IP."""
        try:
            from google.cloud import compute_v1

            compute = self._ensure_client()["compute"]
            project = self.credentials.get("gcp_project_id")
            zone = instance_config.get("zone", self._DEFAULT_GCP_ZONE)
            name = instance_config.get("name", "zenith-instance")
            machine_type = instance_config.get("instance_type", "e2-medium")

            instance_resource = compute_v1.Instance(
                name=name,
                machine_type=f"zones/{zone}/machineTypes/{machine_type}",
                # GCP key/value metadata is expressed as labels (network "tags" are plain strings).
                labels={"environment": instance_config.get("environment", "production")},
            )
            # NOTE(review): no boot disk or network interface is configured (none was in the
            # original either); GCE rejects instances without them — TODO source from instance_config.
            operation = compute.insert(project=project, zone=zone, instance_resource=instance_resource)
            # operation.result() blocks until the operation finishes (raising on failure),
            # so run it off the event loop.
            await asyncio.to_thread(operation.result)

            await self._wait_for_gcp_instance(name, instance_config.get("name"), zone=zone)
            created = compute.get(project=project, zone=zone, instance=name)
            return {
                "success": True,
                "instance_name": instance_config.get("name"),
                "instance_id": created.id,
                "public_ip": self._gcp_external_ip(created),
                "provider": "gcp",
                "region": self.region,
            }
        except Exception as e:
            logger.error(f"Error deploying GCP instance: {e}")
            return {"error": str(e)}

    async def _deploy_azure_instance(self, instance_config: dict[str, Any]) -> dict[str, Any]:
        """Create one Azure VM in the configured resource group and wait for it to run."""
        try:
            compute = self._ensure_client()["compute"]
            resource_group = self.credentials.get("azure_resource_group")
            location = instance_config.get("location", "eastus")
            vm_name = instance_config.get("name", "zenith-vm")

            vm_params = {
                "location": location,
                "hardware_profile": {"vm_size": instance_config.get("instance_type", "Standard_B2s")},
                "os_profile": {
                    "computer_name": vm_name,
                    "admin_username": instance_config.get("admin_username", "zenithadmin"),
                    # SECURITY(review): hard-coded fallback admin password — callers should always supply one.
                    "admin_password": instance_config.get("admin_password", "securepassword123"),
                },
                "tags": {"environment": instance_config.get("environment", "production")},
            }
            # Azure needs an image and a NIC to create a VM; pass them through when provided.
            if instance_config.get("image_reference"):
                vm_params["storage_profile"] = {"image_reference": instance_config["image_reference"]}
            if instance_config.get("network_interface_id"):
                vm_params["network_profile"] = {
                    "network_interfaces": [{"id": instance_config["network_interface_id"]}]
                }

            poller = compute.virtual_machines.begin_create_or_update(resource_group, vm_name, vm_params)
            # poller.result() blocks until provisioning completes; keep the event loop free.
            vm_instance = await asyncio.to_thread(poller.result)

            if vm_instance is None or vm_instance.provisioning_state != "Succeeded":
                return {"error": "VM creation failed", "details": str(vm_instance)}

            await self._wait_for_azure_vm(vm_name, instance_config.get("name"))
            return {
                "success": True,
                "instance_name": instance_config.get("name"),
                "instance_id": vm_instance.id,
                "public_ip": self._get_azure_vm_ip(vm_name),
                "provider": "azure",
                "region": self.region,
            }
        except Exception as e:
            logger.error(f"Error deploying Azure VM: {e}")
            return {"error": str(e)}

    async def _deploy_digital_ocean_instance(self, instance_config: dict[str, Any]) -> dict[str, Any]:
        """Create one droplet, wait for it to become active and return its id and IP."""
        try:
            import digitalocean

            region = instance_config.get("region", "nyc3")
            droplet = digitalocean.Droplet(
                token=self.credentials.get("digitalocean_token"),
                name=instance_config.get("name", "zenith-droplet"),
                region=region,
                size_slug=instance_config.get("size", "s-2vcpu-4gb"),
                # NOTE(review): DigitalOcean image slugs usually carry an arch suffix
                # (e.g. "ubuntu-22-04-x64") — confirm this default.
                image=instance_config.get("image", "ubuntu-22-04"),
                # DigitalOcean tags are plain strings, not key/value pairs.
                tags=[f"environment:{instance_config.get('environment', 'production')}"],
            )
            droplet.create()
            droplet_id = droplet.id

            await self._wait_for_digital_ocean_droplet(droplet_id, instance_config.get("name"))

            if instance_config.get("create_dns_record", False):
                await self._create_digital_ocean_dns(
                    droplet_id, instance_config.get("name"), instance_config.get("domain")
                )

            return {
                "success": True,
                "instance_name": instance_config.get("name"),
                "instance_id": droplet_id,
                "public_ip": self._get_digital_ocean_droplet_ip(droplet_id),
                "provider": "digitalocean",
                # Report the region actually used, including the default.
                "region": region,
            }
        except Exception as e:
            logger.error(f"Error deploying DigitalOcean Droplet: {e}")
            return {"error": str(e)}

    # ------------------------------------------------------------------ waiting

    def _instance_is_ready(self, instance_id: str, zone: Optional[str] = None) -> bool:
        """Return True if the instance is running/active; SDK errors propagate to the caller.

        ``instance_id`` is the EC2 instance id, GCE instance name, Azure VM name or droplet id.
        """
        client = self._ensure_client()
        if self.provider == CloudProvider.AWS:
            response = client["ec2"].describe_instances(InstanceIds=[instance_id])
            instance = response["Reservations"][0]["Instances"][0]
            return instance["State"]["Name"] == "running"
        if self.provider == CloudProvider.GCP:
            instance = client["compute"].get(
                project=self.credentials.get("gcp_project_id"),
                zone=zone or self._DEFAULT_GCP_ZONE,
                instance=instance_id,
            )
            return instance.status == "RUNNING"
        if self.provider == CloudProvider.AZURE:
            view = client["compute"].virtual_machines.instance_view(
                self.credentials.get("azure_resource_group"), instance_id
            )
            return any(getattr(s, "code", None) == "PowerState/running" for s in (view.statuses or []))
        if self.provider == CloudProvider.DIGITAL_OCEAN:
            return client["droplet"].get_droplet(instance_id).status == "active"
        return False

    async def _wait_for_instance(
        self,
        instance_id: str,
        instance_name: str,
        zone: Optional[str] = None,
        max_wait_time: float = 600,
        poll_interval: float = 10,
    ) -> bool:
        """Poll until the instance is ready or ``max_wait_time`` seconds elapse.

        Returns:
            True once ready, False on timeout. Lookup errors are logged and retried.
        """
        deadline = time.monotonic() + max_wait_time
        while time.monotonic() < deadline:
            try:
                if self._instance_is_ready(instance_id, zone):
                    if self.provider == CloudProvider.DIGITAL_OCEAN:
                        logger.info(f"Droplet {instance_name} is active")
                    else:
                        logger.info(f"Instance {instance_name} is ready")
                    return True
            except Exception as e:
                logger.error(f"Error waiting for instance: {e}")
            # Sleep on every iteration, including after errors, so failures are not retried in a tight loop.
            await asyncio.sleep(poll_interval)
        logger.warning(f"Timed out waiting for instance {instance_name}")
        return False

    async def _wait_for_gcp_instance(
        self, instance_name: str, display_name: Optional[str] = None, zone: Optional[str] = None
    ) -> bool:
        """Wait until the named GCE instance reports RUNNING."""
        return await self._wait_for_instance(instance_name, display_name or instance_name, zone=zone)

    async def _wait_for_azure_vm(self, vm_name: str, display_name: Optional[str] = None) -> bool:
        """Wait until the named Azure VM reports ``PowerState/running``."""
        return await self._wait_for_instance(vm_name, display_name or vm_name)

    async def _wait_for_digital_ocean_droplet(self, droplet_id: Any, display_name: Optional[str] = None) -> bool:
        """Wait until the droplet's status is ``active``."""
        return await self._wait_for_instance(droplet_id, display_name or str(droplet_id))

    # ---------------------------------------------------------------- IP lookup

    def _get_aws_instance_ip(self, instance_id: str) -> Optional[str]:
        """Return the public IP of an EC2 instance, or None if unassigned or on error."""
        try:
            response = self._ensure_client()["ec2"].describe_instances(InstanceIds=[instance_id])
            instance = response["Reservations"][0]["Instances"][0]
            return instance.get("PublicIpAddress")
        except Exception as e:
            logger.error(f"Error getting AWS instance IP: {e}")
            return None

    async def _get_gcp_instance_ip(
        self, instance_name: str, zone: Optional[str] = None, attempts: int = 60, poll_interval: float = 10
    ) -> Optional[str]:
        """Poll until the GCE instance is RUNNING, then return its external IP (or None)."""
        try:
            compute = self._ensure_client()["compute"]
            project = self.credentials.get("gcp_project_id")
            for _ in range(attempts):
                try:
                    instance = compute.get(project=project, zone=zone or self._DEFAULT_GCP_ZONE, instance=instance_name)
                    if instance.status == "RUNNING":
                        # Assumed: an external IP, if configured, is assigned by the time the
                        # instance runs, so there is no point polling further.
                        return self._gcp_external_ip(instance)
                except Exception as e:
                    logger.debug(f"GCP instance {instance_name} not available yet: {e}")
                await asyncio.sleep(poll_interval)
            return None
        except Exception as e:
            logger.error(f"Error getting GCP instance IP: {e}")
            return None

    def _get_azure_vm_ip(self, vm_name: str) -> Optional[str]:
        """Return the public IP of an Azure VM.

        NOTE(review): resolving a VM's public IP needs a NIC -> public-IP lookup via
        azure-mgmt-network, which this module does not use; always returns None.
        TODO implement.
        """
        logger.debug(f"Public IP lookup for Azure VM {vm_name} is not implemented")
        return None

    def _get_digital_ocean_droplet_ip(self, droplet_id: Any) -> Optional[str]:
        """Return the first IPv4 address of an active droplet, or None."""
        try:
            droplet = self._ensure_client()["droplet"].get_droplet(droplet_id)
            if droplet.status != "active":
                return None
            ip = getattr(droplet, "ip_address", None)
            if ip:
                return ip
            # Fall back to the raw networks payload, given either as a dict or an object.
            networks = getattr(droplet, "networks", None) or {}
            v4 = networks.get("v4", []) if isinstance(networks, dict) else getattr(networks, "v4", [])
            for interface in v4 or []:
                address = (
                    interface.get("ip_address") if isinstance(interface, dict) else getattr(interface, "ip_address", None)
                )
                if address:
                    return address
        except Exception as e:
            logger.error(f"Error getting DigitalOcean droplet IP: {e}")
        return None

    # ------------------------------------------------------------ extras / DNS

    async def _associate_elastic_ip(self, instance_id: str, allocation_id: str):
        """Associate an existing Elastic IP allocation with an EC2 instance; True on success."""
        try:
            self._ensure_client()["ec2"].associate_address(
                InstanceId=instance_id,
                AllocationId=allocation_id,
            )
            logger.info(f"Elastic IP associated for instance {instance_id}")
            return True
        except Exception as e:
            logger.error(f"Error associating Elastic IP: {e}")
            return False

    async def _create_dns_record(self, resource_id: str, name: str, domain: Optional[str] = None):
        """Placeholder: only logs the record that would be created; False when no domain."""
        if not domain:
            return False
        try:
            logger.info(f"DNS record would be created for {name} -> {domain}")
            return True
        except Exception as e:
            logger.error(f"Error creating DNS record: {e}")
            return False

    async def _create_digital_ocean_dns(self, droplet_id: str, name: str, domain: Optional[str] = None):
        """Placeholder: only logs the record that would be created; False when no domain."""
        if not domain:
            return False
        try:
            logger.info(f"DNS record would be created for {name} -> {domain}")
            return True
        except Exception as e:
            logger.error(f"Error creating DNS record: {e}")
            return False


class MultiCloudOrchestrator:
    """Configure providers, deploy instances, aggregate resources and monitor health."""

    # Simplified per-instance cost figures keyed by instance size. Units are not stated
    # in the source — presumably hourly USD; confirm. Unknown sizes use the default.
    _INSTANCE_COST_ESTIMATES = {
        "t2.micro": 0.011,
        "t2.small": 0.018,
        "t2.medium": 0.036,
        "t2.large": 0.072,
        "c2.large": 0.144,
        "m5.large": 0.192,
        "s-1vcpu-1gb": 0.010,
        "s-2vcpu-2gb": 0.020,
        "s-4vcpu-4gb": 0.040,
        "s-8vcpu-8gb": 0.080,
        "s-16vcpu-64gb": 1.280,
        "s-32vcpu-128gb": 2.560,
        "m6-16vcpu-64gb": 1.920,
        "c-32vcpu-128gb": 3.840,
        "m-32vcpu-16gb": 3.840,
    }
    _DEFAULT_INSTANCE_COST = 0.01

    def __init__(self):
        self.cloud_providers = []
        self.active_deployments = []
        self.is_running = False
        self.deployment_history = []
        self.redis = None
        # asyncio.Task running the monitoring loop while monitoring is active.
        self.monitoring_task = None

    async def initialize_providers(self, redis_client=None):
        """Register providers enabled via ``<NAME>_ENABLED`` env vars (default "true") and test them.

        Returns True when configuration completed (individual connection failures
        are logged, not fatal) and False if configuration itself raised.
        """
        try:
            # Configure AWS
            if os.getenv("AWS_ENABLED", "true").lower() == "true":
                aws_creds = {
                    "aws_access_key_id": os.getenv("AWS_ACCESS_KEY_ID"),
                    "aws_secret_access_key": os.getenv("AWS_SECRET_ACCESS_KEY"),
                    "aws_region": os.getenv("AWS_REGION", "us-east-1"),
                }
                aws_provider = MultiCloudProvider(CloudProvider.AWS, os.getenv("AWS_REGION", "us-east-1"), aws_creds)
                self.cloud_providers.append(aws_provider)
                logger.info("AWS provider configured")

            # Configure GCP
            if os.getenv("GCP_ENABLED", "true").lower() == "true":
                gcp_creds = {
                    "gcp_credentials": os.getenv("GCP_CREDENTIALS"),
                    "gcp_project_id": os.getenv("GCP_PROJECT_ID"),
                    "gcp_region": os.getenv("GCP_REGION", "us-central1"),
                }
                gcp_provider = MultiCloudProvider(CloudProvider.GCP, os.getenv("GCP_REGION", "us-central1"), gcp_creds)
                self.cloud_providers.append(gcp_provider)
                logger.info("GCP provider configured")

            # Configure Azure
            if os.getenv("AZURE_ENABLED", "true").lower() == "true":
                azure_creds = {
                    "azure_client_id": os.getenv("AZURE_CLIENT_ID"),
                    "azure_client_secret": os.getenv("AZURE_CLIENT_SECRET"),
                    "azure_subscription_id": os.getenv("AZURE_SUBSCRIPTION_ID"),
                    "azure_tenant_id": os.getenv("AZURE_TENANT_ID"),
                    "azure_resource_group": os.getenv("AZURE_RESOURCE_GROUP"),
                }
                azure_provider = MultiCloudProvider(
                    CloudProvider.AZURE, os.getenv("AZURE_REGION", "eastus"), azure_creds
                )
                self.cloud_providers.append(azure_provider)
                logger.info("Azure provider configured")

            # Configure DigitalOcean
            if os.getenv("DIGITAL_OCEAN_ENABLED", "true").lower() == "true":
                do_creds = {
                    "digitalocean_token": os.getenv("DIGITAL_OCEAN_TOKEN"),
                    "digitalocean_region": os.getenv("DIGITAL_OCEAN_REGION", "nyc3"),
                }
                do_provider = MultiCloudProvider(
                    CloudProvider.DIGITAL_OCEAN, os.getenv("DIGITAL_OCEAN_REGION", "nyc3"), do_creds
                )
                self.cloud_providers.append(do_provider)
                logger.info("DigitalOcean provider configured")

            self.redis = redis_client

            # Test connections
            for provider in self.cloud_providers:
                connected = await provider.test_connection()
                provider.is_connected = connected
                if connected:
                    logger.info(f"Connected to {provider.provider.value}")
                else:
                    logger.error(f"Failed to connect to {provider.provider.value}")

            logger.info(f"Multi-cloud setup complete: {len(self.cloud_providers)} providers configured")
            return True

        except Exception as e:
            logger.error(f"Error initializing cloud providers: {e}")
            return False

    async def deploy_multi_cloud_instance(
        self, provider: CloudProvider, instance_config: dict[str, Any]
    ) -> dict[str, Any]:
        """Deploy to the first connected provider of type ``provider`` and record the deployment.

        Returns the provider's deploy result, or ``{"error": ...}`` when no
        connected provider of that type exists or the deployment raised.
        """
        # Separate name so the requested CloudProvider stays available for messages.
        target = next((p for p in self.cloud_providers if p.provider == provider and p.is_connected), None)
        if target is None:
            return {"error": f"No available {provider.value} provider"}

        try:
            result = await target.deploy_instance(instance_config)
        except Exception as e:
            logger.error(f"Error deploying to {provider.value}: {e}")
            return {"error": str(e)}

        deployment = {
            "provider": target.provider.value,
            "provider_region": target.region,
            "instance_config": instance_config,
            "result": result,
            "timestamp": datetime.utcnow().isoformat(),
        }
        self.active_deployments.append(deployment)
        self.deployment_history.append(deployment)
        # Persistence is best-effort: a Redis failure must not turn a real deployment into an error.
        await self._persist_deployment(deployment)
        return result

    async def _persist_deployment(self, deployment: dict[str, Any]) -> None:
        """Push a deployment record onto the Redis list ``deployment_history`` (if Redis is set)."""
        if not self.redis:
            return
        try:
            # default=str deliberately stringifies SDK values (e.g. datetimes in error "details").
            await self.redis.lpush("deployment_history", json.dumps(deployment, default=str))
            # LTRIM bounds are inclusive: keeps indices 0..1000 (up to 1001 records).
            await self.redis.ltrim("deployment_history", 0, 1000)
        except Exception as e:
            logger.error(f"Error storing deployment history in Redis: {e}")

    @classmethod
    def _estimate_instance_cost(cls, instance: dict[str, Any]) -> float:
        """Look up a simplified cost for one instance summary from any provider."""
        # AWS, GCP, Azure and DigitalOcean summaries name the size field differently.
        for key in ("instance_type", "machine_type", "vm_size", "size"):
            size = instance.get(key)
            if size:
                # GCP machine_type values are resource URLs; the size name is the last segment.
                return cls._INSTANCE_COST_ESTIMATES.get(str(size).rsplit("/", 1)[-1], cls._DEFAULT_INSTANCE_COST)
        return cls._DEFAULT_INSTANCE_COST

    async def get_all_resources(self) -> dict[str, Any]:
        """Collect resources from every connected provider.

        Returns:
            ``{"providers": [per-provider snapshot or {"provider", "error"}],
            "total_instances": int, "total_cost_estimate": float}``. Each
            snapshot gains a ``cost_estimate`` key. The legacy ``cost_estimate``
            / ``resources`` top-level keys describe the last provider processed.
        """
        resources = {"providers": [], "total_instances": 0, "total_cost_estimate": 0}

        for provider in self.cloud_providers:
            if not provider.is_connected:
                continue
            name = provider.provider.value
            try:
                provider_resources = await provider.get_resources()
                provider_resources["provider"] = name
                instances = provider_resources.get("instances", [])
                cost_estimate = sum(self._estimate_instance_cost(instance) for instance in instances)
                provider_resources["cost_estimate"] = cost_estimate

                resources["providers"].append(provider_resources)
                resources["total_instances"] += len(instances)
                resources["total_cost_estimate"] += cost_estimate
                # Legacy keys, kept for backward compatibility.
                resources["cost_estimate"] = cost_estimate
                resources["resources"] = provider_resources
            except Exception as e:
                logger.error(f"Error getting resources from {name}: {e}")
                resources["providers"].append({"provider": name, "error": str(e)})

        return resources

    async def health_check_all_providers(self) -> dict[str, Any]:
        """Run test_connection() on every provider and summarise the results.

        Each provider's ``is_connected`` flag is updated from its result so the
        monitoring loop and deploy_multi_cloud_instance() see current health.
        Each per-provider entry is stored under ``provider_status[name]`` and
        mirrored at top level under ``name`` (legacy layout).
        """
        health_status = {
            "total_providers": len(self.cloud_providers),
            "healthy_providers": 0,
            "unhealthy_providers": 0,
            "provider_status": {},
        }

        for provider in self.cloud_providers:
            name = provider.provider.value
            try:
                is_healthy = bool(await provider.test_connection())
                entry = {"connected": is_healthy, "last_check": datetime.utcnow().isoformat()}
            except Exception as e:
                is_healthy = False
                entry = {"connected": False, "error": str(e), "last_check": datetime.utcnow().isoformat()}

            provider.is_connected = is_healthy
            health_status["provider_status"][name] = entry
            health_status[name] = entry
            if is_healthy:
                health_status["healthy_providers"] += 1
            else:
                health_status["unhealthy_providers"] += 1

        return health_status

    async def start_monitoring(self):
        """Start a background task that health-checks providers every 5 minutes."""
        if self.monitoring_task is not None and not self.monitoring_task.done():
            logger.warning("Multi-cloud monitoring already running")
            return

        self.is_running = True
        logger.info("Multi-cloud monitoring started")

        async def monitoring_loop():
            while self.is_running:
                try:
                    health_status = await self.health_check_all_providers()

                    total_providers = health_status["total_providers"]
                    healthy_providers = health_status["healthy_providers"]

                    if healthy_providers < total_providers:
                        logger.warning(f"Only {healthy_providers}/{total_providers} cloud providers are healthy")
                    else:
                        logger.info(f"All {healthy_providers}/{total_providers} cloud providers are healthy")

                    # Corrective action: one immediate retry for providers that failed the check
                    # (test_connection rebuilds a missing client via _ensure_client).
                    for provider in self.cloud_providers:
                        if not provider.is_connected:
                            logger.error(f"Attempting to reconnect to {provider.provider.value}")
                            provider.is_connected = await provider.test_connection()

                    await asyncio.sleep(300)  # Check every 5 minutes

                except Exception as e:
                    logger.error(f"Monitoring loop error: {e}")
                    await asyncio.sleep(60)

        self.monitoring_task = asyncio.create_task(monitoring_loop())
        logger.info("Multi-cloud monitoring task started")

    async def stop_monitoring(self):
        """Stop the monitoring loop and wait for its task to finish cancelling."""
        self.is_running = False
        task, self.monitoring_task = self.monitoring_task, None
        if task is not None:
            task.cancel()
            # return_exceptions=True absorbs the CancelledError (or any earlier failure).
            await asyncio.gather(task, return_exceptions=True)
        logger.info("Multi-cloud monitoring stopped")

    async def graceful_shutdown(self):
        """Gracefully shutdown multi-cloud orchestration."""
        self.is_running = False
        await self.stop_monitoring()
        logger.info("Multi-cloud orchestration gracefully shutdown")


# Global multi-cloud instance
multi_cloud_orchestrator = MultiCloudOrchestrator()


# Initialization function
async def initialize_multi_cloud_infrastructure(redis_client=None):
    """Create, initialise and (on success) start monitoring a new orchestrator.

    NOTE(review): this builds a fresh MultiCloudOrchestrator rather than
    initialising the module-level ``multi_cloud_orchestrator``; confirm which
    instance callers are meant to use.
    """
    orchestrator = MultiCloudOrchestrator()
    success = await orchestrator.initialize_providers(redis_client)

    if success:
        await orchestrator.start_monitoring()
        logger.info("Multi-cloud infrastructure initialized")

    return orchestrator