"""
Multi-Cloud Infrastructure Provisioning
"""
import asyncio
import json
import logging
import os
import time
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Optional

import boto3
# Module-level logger shared by every class and function in this file.
logger = logging.getLogger(__name__)
class CloudProvider(Enum):
    """Cloud platforms this module can talk to; values are the short provider ids."""

    AWS = "aws"
    GCP = "gcp"
    AZURE = "azure"
    DIGITAL_OCEAN = "digitalocean"
class MultiCloudProvider:
    """Connection to one cloud account in one region.

    SDK clients are built lazily on first use and cached in ``self.client`` as
    a dict keyed by service name. Public coroutines never raise: failures are
    logged and reported as ``False`` or ``{"error": ...}``.

    NOTE(review): apart from the two long waits moved to worker threads, SDK
    calls are synchronous and run on the event loop thread.
    """

    # Seconds between status polls, and the total budget for a new instance
    # to report that it is running.
    _POLL_INTERVAL_SECONDS = 10
    _MAX_WAIT_SECONDS = 600

    # NOTE(review): default images are assumptions — confirm they match the
    # images your projects/subscriptions are allowed to use.
    _DEFAULT_GCP_IMAGE = "projects/debian-cloud/global/images/family/debian-12"
    _DEFAULT_AZURE_IMAGE = {
        "publisher": "Canonical",
        "offer": "0001-com-ubuntu-server-jammy",
        "sku": "22_04-lts-gen2",
        "version": "latest",
    }

    def __init__(self, provider: "CloudProvider", region: str, credentials: dict[str, str]):
        self.provider = provider
        self.region = region
        self.credentials = credentials
        self.client = None  # dict of SDK clients, filled in by _get_client()
        self.resources = {}
        self.is_connected = False

    # ------------------------------------------------------------------
    # Client construction
    # ------------------------------------------------------------------
    def _get_client(self):
        """Build and return the SDK clients for ``self.provider``.

        Returns the client dict, ``{}`` when construction failed, or ``None``
        for an unknown provider.
        """
        builders = {
            CloudProvider.AWS: self._get_aws_client,
            CloudProvider.GCP: self._get_gcp_client,
            CloudProvider.AZURE: self._get_azure_client,
            CloudProvider.DIGITAL_OCEAN: self._get_digital_ocean_client,
        }
        try:
            builder = builders.get(self.provider)
            return builder() if builder else None
        except Exception as e:
            logger.error(f"Error getting {self.provider.value} client: {e}")
            return None

    def _ensure_client(self) -> dict[str, Any]:
        """Return the cached clients, building them on first use.

        Raises:
            RuntimeError: when no client could be built (callers catch this in
                their generic error handling).
        """
        if not self.client and not self._get_client():
            raise RuntimeError(f"No client available for {self.provider.value}")
        return self.client

    def _get_aws_client(self):
        """Get AWS boto3 clients (one per service, sharing a session)."""
        try:
            session = boto3.Session(
                aws_access_key_id=self.credentials.get("aws_access_key_id"),
                aws_secret_access_key=self.credentials.get("aws_secret_access_key"),
                aws_session_token=self.credentials.get("aws_session_token"),
                region_name=self.region,
            )
            services = {
                "ec2": "ec2",
                "s3": "s3",
                "rds": "rds",
                "elb": "elbv2",
                "autoscaling": "autoscaling",
                "cloudformation": "cloudformation",
                "route53": "route53",
                "sts": "sts",  # used by test_connection()
            }
            self.client = {key: session.client(service) for key, service in services.items()}
            return self.client
        except Exception as e:
            logger.error(f"Error creating AWS client: {e}")
            return {}

    def _load_gcp_credentials(self):
        """Turn the configured GCP credential value into an SDK credential.

        Accepts service-account JSON text, a path to a key file, an already
        built credentials object, or nothing (application default credentials).
        """
        from google.oauth2 import service_account

        raw = self.credentials.get("gcp_service_account_json") or self.credentials.get("gcp_credentials")
        if not raw:
            return None
        if not isinstance(raw, str):
            return raw
        if raw.lstrip().startswith("{"):
            return service_account.Credentials.from_service_account_info(json.loads(raw))
        return service_account.Credentials.from_service_account_file(raw)

    def _get_gcp_client(self):
        """Get Google Cloud clients (Compute instances/zones and Storage)."""
        try:
            from google.cloud import compute_v1, storage

            credentials = self._load_gcp_credentials()
            self.client = {
                "compute": compute_v1.InstancesClient(credentials=credentials),
                "zones": compute_v1.ZonesClient(credentials=credentials),
                "storage": storage.Client(
                    credentials=credentials,
                    project=self.credentials.get("gcp_project_id"),
                ),
            }
            return self.client
        except Exception as e:
            logger.error(f"Error creating GCP client: {e}")
            return {}

    def _get_azure_client(self):
        """Get Azure clients (Compute always; Blob storage when an account is configured)."""
        try:
            from azure.identity import ClientSecretCredential
            from azure.mgmt.compute import ComputeManagementClient
            from azure.storage.blob import BlobServiceClient

            credential = ClientSecretCredential(
                tenant_id=self.credentials.get("azure_tenant_id"),
                client_id=self.credentials.get("azure_client_id"),
                client_secret=self.credentials.get("azure_client_secret"),
            )
            clients = {
                "compute": ComputeManagementClient(
                    credential=credential,
                    subscription_id=self.credentials.get("azure_subscription_id"),
                ),
            }
            account = self.credentials.get("azure_storage_account_name")
            if account:
                clients["storage"] = BlobServiceClient(
                    account_url=f"https://{account}.blob.core.windows.net",
                    credential=self.credentials.get("azure_storage_account_key") or credential,
                )
            self.client = clients
            return self.client
        except Exception as e:
            logger.error(f"Error creating Azure client: {e}")
            return {}

    def _get_digital_ocean_client(self):
        """Get DigitalOcean client (one Manager shared under all three keys)."""
        try:
            import digitalocean

            manager = digitalocean.Manager(token=self.credentials.get("digitalocean_token"))
            self.client = {"droplet": manager, "volumes": manager, "spaces": manager}
            return self.client
        except Exception as e:
            logger.error(f"Error creating DigitalOcean client: {e}")
            return {}

    # ------------------------------------------------------------------
    # Pure helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _summarize_aws_instances(response: dict[str, Any]) -> list[dict[str, Any]]:
        """Flatten an EC2 ``describe_instances`` response into summary dicts."""
        return [
            {
                "id": instance.get("InstanceId"),
                "type": instance.get("InstanceType"),
                "state": instance.get("State", {}).get("Name"),
                "public_ip": instance.get("PublicIpAddress") or "",
                "private_ip": instance.get("PrivateIpAddress") or "",
                "instance_type": instance.get("InstanceType"),
                "launch_time": instance.get("LaunchTime"),
            }
            for reservation in (response or {}).get("Reservations", [])
            for instance in reservation.get("Instances", [])
        ]

    @staticmethod
    def _gcp_public_ip(instance) -> str:
        """Return the first external (NAT) IP of a GCE instance, or ``""``."""
        for interface in getattr(instance, "network_interfaces", None) or []:
            for access in getattr(interface, "access_configs", None) or []:
                address = getattr(access, "nat_i_p", "")
                if address:
                    return address
        return ""

    @staticmethod
    def _azure_power_state(statuses) -> Optional[str]:
        """Extract the power state (e.g. ``"running"``) from Azure instance-view statuses."""
        for status in statuses or []:
            code = getattr(status, "code", "") or ""
            if code.startswith("PowerState/"):
                return code.split("/", 1)[1]
        return None

    def _default_gcp_zone(self) -> str:
        """Zone used when the caller does not name one (first zone of the region)."""
        return f"{self.region}-a"

    # ------------------------------------------------------------------
    # Connectivity and inventory
    # ------------------------------------------------------------------
    async def test_connection(self) -> bool:
        """Test cloud provider connection with one cheap authenticated call."""
        try:
            clients = self._ensure_client()
            if self.provider == CloudProvider.AWS:
                clients["sts"].get_caller_identity()
                logger.info("AWS connection test successful")
                return True
            elif self.provider == CloudProvider.GCP:
                # The SDK returns a pager, which has no len(); materialise it.
                zones = list(clients["zones"].list(project=self.credentials.get("gcp_project_id")))
                logger.info(f"GCP connection test successful: {len(zones)} zones available")
                return True
            elif self.provider == CloudProvider.AZURE:
                vm_sizes = list(clients["compute"].virtual_machine_sizes.list(location=self.region))
                logger.info(f"Azure connection test successful: {len(vm_sizes)} VM sizes available")
                return True
            elif self.provider == CloudProvider.DIGITAL_OCEAN:
                droplets = clients["droplet"].get_all_droplets()
                logger.info(f"DigitalOcean connection test successful: {len(droplets)} droplets")
                return True
            else:
                logger.warning(f"Unknown provider: {self.provider.value}")
                return False
        except Exception as e:
            logger.error(f"Connection test failed for {self.provider.value}: {e}")
            return False

    async def get_resources(self) -> dict[str, Any]:
        """Get current resources from cloud provider.

        Returns a dict with ``provider``, ``region`` and ``instances`` (plus
        ``storage`` for AWS), or ``{"error": message}`` on failure. On success
        the result is cached in ``self.resources`` and ``is_connected`` is set.
        """
        try:
            clients = self._ensure_client()
            resources = {"provider": self.provider.value, "region": self.region}
            if self.provider == CloudProvider.AWS:
                resources["instances"] = self._summarize_aws_instances(clients["ec2"].describe_instances())
                buckets = clients["s3"].list_buckets()
                resources["storage"] = [
                    {
                        "name": bucket.get("Name"),
                        "creation_date": bucket.get("CreationDate"),
                        # Only present on newer S3 API responses.
                        "region": bucket.get("BucketRegion", ""),
                    }
                    for bucket in buckets.get("Buckets", [])
                ]
            elif self.provider == CloudProvider.GCP:
                # aggregated_list yields (zone, scoped_list) pairs.
                pages = clients["compute"].aggregated_list(project=self.credentials.get("gcp_project_id"))
                resources["instances"] = [
                    {
                        "id": instance.id,
                        "name": instance.name,
                        "status": instance.status,
                        "machine_type": instance.machine_type,
                        "creation_timestamp": instance.creation_timestamp,
                        "public_ip": self._gcp_public_ip(instance),
                    }
                    for _zone, scoped in pages
                    for instance in (scoped.instances or [])
                ]
            elif self.provider == CloudProvider.AZURE:
                resources["instances"] = [
                    {
                        "id": vm.name,
                        "name": vm.name,
                        # list_all() normally omits the instance view, so this is often None.
                        "power_state": self._azure_power_state(
                            getattr(getattr(vm, "instance_view", None), "statuses", None)
                        ),
                        "location": vm.location,
                        "vm_size": vm.hardware_profile.vm_size,
                        "os_type": vm.storage_profile.os_disk.os_type,
                    }
                    for vm in clients["compute"].virtual_machines.list_all()
                ]
            elif self.provider == CloudProvider.DIGITAL_OCEAN:
                resources["instances"] = [
                    {
                        "id": droplet.id,
                        "name": droplet.name,
                        "status": droplet.status,
                        "region": (droplet.region or {}).get("slug"),
                        "size": droplet.size_slug,
                        "memory": droplet.memory,
                        "vcpus": droplet.vcpus,
                        "disk": droplet.disk,
                    }
                    for droplet in clients["droplet"].get_all_droplets()
                ]
            self.resources = resources
            self.is_connected = True
            return resources
        except Exception as e:
            logger.error(f"Error getting resources from {self.provider.value}: {e}")
            return {"error": str(e)}

    # ------------------------------------------------------------------
    # Deployment
    # ------------------------------------------------------------------
    async def deploy_instance(self, instance_config: dict[str, Any]) -> dict[str, Any]:
        """Deploy instance to cloud provider; returns a result or ``{"error": ...}`` dict."""
        deployers = {
            CloudProvider.AWS: self._deploy_aws_instance,
            CloudProvider.GCP: self._deploy_gcp_instance,
            CloudProvider.AZURE: self._deploy_azure_instance,
            CloudProvider.DIGITAL_OCEAN: self._deploy_digital_ocean_instance,
        }
        try:
            deploy = deployers.get(self.provider)
            if deploy is None:
                return {"error": f"Unsupported provider: {self.provider.value}"}
            return await deploy(instance_config)
        except Exception as e:
            logger.error(f"Error deploying instance to {self.provider.value}: {e}")
            return {"error": str(e)}

    async def _deploy_aws_instance(self, instance_config: dict[str, Any]) -> dict[str, Any]:
        """Deploy AWS EC2 instance.

        ``image_id`` is required. ``key_name``, ``subnet_id``,
        ``security_group_id`` and ``ipv6_address_count`` are sent only when
        given, so the account defaults apply otherwise.
        """
        try:
            ec2_client = self._ensure_client()["ec2"]
            image_id = instance_config.get("image_id")
            if not image_id:
                return {"error": "image_id is required to launch an AWS instance"}
            name = instance_config.get("name", "zenith-instance")
            launch_args = {
                "ImageId": image_id,
                "MinCount": 1,
                "MaxCount": 1,
                "InstanceType": instance_config.get("instance_type", "t2.micro"),
                "TagSpecifications": [
                    {
                        "ResourceType": "instance",
                        "Tags": [
                            {"Key": "Name", "Value": name},
                            {"Key": "Environment", "Value": instance_config.get("environment", "production")},
                        ],
                    }
                ],
            }
            optional = {
                "KeyName": instance_config.get("key_name"),
                "SubnetId": instance_config.get("subnet_id"),
                "Ipv6AddressCount": instance_config.get("ipv6_address_count"),
            }
            launch_args.update({key: value for key, value in optional.items() if value})
            if instance_config.get("security_group_id"):
                launch_args["SecurityGroupIds"] = [instance_config["security_group_id"]]

            response = ec2_client.run_instances(**launch_args)
            launched = (response or {}).get("Instances") or []
            if not launched:
                return {"error": "Failed to create AWS instance", "details": response}
            instance = launched[0]
            instance_id = instance["InstanceId"]

            if not await self._wait_for_instance(instance_id, name):
                logger.warning(f"Instance {name} did not reach running state in time")

            if instance_config.get("associate_elastic_ip", False):
                allocation_id = instance_config.get("elastic_ip_allocation_id")
                if allocation_id:
                    await self._associate_elastic_ip(instance_id, allocation_id)
                else:
                    logger.warning("associate_elastic_ip requested without elastic_ip_allocation_id")
            if instance_config.get("create_dns_record", False):
                await self._create_dns_record(instance_id, name, instance_config.get("domain"))

            return {
                "success": True,
                "instance_id": instance_id,
                "instance_name": name,
                # The launch response rarely carries the public address yet; re-read it.
                "public_ip": self._get_aws_instance_ip(instance_id) or instance.get("PublicIpAddress"),
                "private_ip": instance.get("PrivateIpAddress"),
                "provider": "aws",
                "region": self.region,
            }
        except Exception as e:
            logger.error(f"Error deploying AWS instance: {e}")
            return {"error": str(e)}

    async def _deploy_gcp_instance(self, instance_config: dict[str, Any]) -> dict[str, Any]:
        """Deploy GCP Compute instance with one boot disk and an external IP."""
        try:
            from google.cloud import compute_v1

            compute = self._ensure_client()["compute"]
            project = self.credentials.get("gcp_project_id")
            zone = instance_config.get("zone") or self._default_gcp_zone()
            name = instance_config.get("name", "zenith-instance")
            machine_type = instance_config.get("instance_type", "e2-medium")

            instance_resource = compute_v1.Instance(
                name=name,
                machine_type=f"zones/{zone}/machineTypes/{machine_type}",
                disks=[
                    compute_v1.AttachedDisk(
                        boot=True,
                        auto_delete=True,
                        initialize_params=compute_v1.AttachedDiskInitializeParams(
                            source_image=instance_config.get("image", self._DEFAULT_GCP_IMAGE),
                        ),
                    )
                ],
                network_interfaces=[
                    compute_v1.NetworkInterface(
                        network=instance_config.get("network", "global/networks/default"),
                        access_configs=[compute_v1.AccessConfig(name="External NAT")],
                    )
                ],
                labels={"environment": instance_config.get("environment", "production")},
            )
            operation = compute.insert(project=project, zone=zone, instance_resource=instance_resource)
            # result() blocks until the insert finishes; keep it off the event loop.
            await asyncio.to_thread(operation.result, timeout=self._MAX_WAIT_SECONDS)

            if not await self._wait_for_instance(name, name, zone):
                return {"error": "Failed to create GCP instance", "details": f"{name} is not running"}
            created = compute.get(project=project, zone=zone, instance=name)
            return {
                "success": True,
                "instance_name": name,
                "instance_id": created.id,
                "public_ip": await self._get_gcp_instance_ip(name, zone),
                "provider": "gcp",
                "region": self.region,
            }
        except Exception as e:
            logger.error(f"Error deploying GCP instance: {e}")
            return {"error": str(e)}

    async def _deploy_azure_instance(self, instance_config: dict[str, Any]) -> dict[str, Any]:
        """Deploy Azure VM.

        ``admin_password`` is required (there is deliberately no built-in
        default). NOTE(review): Azure also needs an existing network interface;
        pass its resource id as ``network_interface_id``.
        """
        try:
            compute = self._ensure_client()["compute"]
            resource_group = self.credentials.get("azure_resource_group")
            vm_name = instance_config.get("name", "zenith-vm")
            admin_password = instance_config.get("admin_password")
            if not admin_password:
                return {"error": "admin_password is required to create an Azure VM"}

            parameters = {
                "location": instance_config.get("location", self.region),
                "tags": {"environment": instance_config.get("environment", "production")},
                "hardware_profile": {"vm_size": instance_config.get("instance_type", "Standard_B2s")},
                "storage_profile": {
                    "image_reference": instance_config.get("image_reference", self._DEFAULT_AZURE_IMAGE),
                },
                "os_profile": {
                    "computer_name": vm_name,
                    "admin_username": instance_config.get("admin_username", "zenithadmin"),
                    "admin_password": admin_password,
                },
            }
            nic_id = instance_config.get("network_interface_id")
            if nic_id:
                parameters["network_profile"] = {"network_interfaces": [{"id": nic_id}]}

            poller = compute.virtual_machines.begin_create_or_update(resource_group, vm_name, parameters)
            # result() blocks until provisioning ends; keep it off the event loop.
            vm_instance = await asyncio.to_thread(poller.result)
            if vm_instance.provisioning_state != "Succeeded":
                return {"error": "VM creation failed", "details": str(vm_instance)}

            await self._wait_for_instance(vm_instance.name, vm_name)
            return {
                "success": True,
                "instance_name": vm_name,
                "instance_id": vm_instance.id,
                "public_ip": self._get_azure_vm_ip(vm_instance.name),
                "provider": "azure",
                "region": self.region,
            }
        except Exception as e:
            logger.error(f"Error deploying Azure VM: {e}")
            return {"error": str(e)}

    async def _deploy_digital_ocean_instance(self, instance_config: dict[str, Any]) -> dict[str, Any]:
        """Deploy DigitalOcean Droplet."""
        try:
            import digitalocean

            self._ensure_client()
            name = instance_config.get("name", "zenith-droplet")
            region = instance_config.get("region", self.region)
            environment = instance_config.get("environment", "production")
            droplet = digitalocean.Droplet(
                token=self.credentials.get("digitalocean_token"),
                name=name,
                region=region,
                size_slug=instance_config.get("size", "s-2vcpu-4gb"),
                image=instance_config.get("image", "ubuntu-22-04-x64"),
                tags=[f"environment:{environment}"],  # DigitalOcean tags are plain strings
            )
            droplet.create()
            droplet_id = droplet.id

            await self._wait_for_instance(droplet_id, name)
            if instance_config.get("create_dns_record", False):
                await self._create_digital_ocean_dns(droplet_id, name, instance_config.get("domain"))

            return {
                "success": True,
                "instance_name": name,
                "instance_id": droplet_id,
                "public_ip": self._get_digital_ocean_droplet_ip(droplet_id),
                "provider": "digitalocean",
                "region": region,
            }
        except Exception as e:
            logger.error(f"Error deploying DigitalOcean Droplet: {e}")
            return {"error": str(e)}

    # ------------------------------------------------------------------
    # Waiting and address lookup
    # ------------------------------------------------------------------
    def _instance_is_ready(self, instance_id: str, zone: Optional[str] = None) -> bool:
        """Return whether the provider reports the instance as running/active."""
        clients = self._ensure_client()
        if self.provider == CloudProvider.AWS:
            reservations = clients["ec2"].describe_instances(InstanceIds=[instance_id])["Reservations"]
            return reservations[0]["Instances"][0]["State"]["Name"] == "running"
        if self.provider == CloudProvider.GCP:
            instance = clients["compute"].get(
                project=self.credentials.get("gcp_project_id"),
                zone=zone or self._default_gcp_zone(),
                instance=instance_id,
            )
            return instance.status == "RUNNING"
        if self.provider == CloudProvider.AZURE:
            view = clients["compute"].virtual_machines.instance_view(
                self.credentials.get("azure_resource_group"), instance_id
            )
            return self._azure_power_state(view.statuses) == "running"
        if self.provider == CloudProvider.DIGITAL_OCEAN:
            return clients["droplet"].get_droplet(instance_id).status == "active"
        return False

    async def _wait_for_instance(self, instance_id: str, instance_name: str, zone: Optional[str] = None):
        """Wait for instance to become ready.

        Polls every ``_POLL_INTERVAL_SECONDS`` for up to ``_MAX_WAIT_SECONDS``.
        ``zone`` is only used for GCP. Returns ``True`` once ready, ``False``
        on timeout; polling errors are logged and retried.
        """
        deadline = time.monotonic() + self._MAX_WAIT_SECONDS
        while time.monotonic() < deadline:
            try:
                if self._instance_is_ready(instance_id, zone):
                    if self.provider == CloudProvider.DIGITAL_OCEAN:
                        logger.info(f"Droplet {instance_name} is active")
                    else:
                        logger.info(f"Instance {instance_name} is ready")
                    return True
            except Exception as e:
                logger.error(f"Error waiting for instance: {e}")
            # Sleep on every iteration, including after an error, so a failing
            # API is not hammered in a tight loop.
            await asyncio.sleep(self._POLL_INTERVAL_SECONDS)
        return False

    def _get_aws_instance_ip(self, instance_id: str) -> Optional[str]:
        """Get public IP of AWS instance"""
        try:
            ec2_client = self._ensure_client()["ec2"]
            reservations = ec2_client.describe_instances(InstanceIds=[instance_id])["Reservations"]
            return reservations[0]["Instances"][0].get("PublicIpAddress")
        except Exception as e:
            logger.error(f"Error getting AWS instance IP: {e}")
            return None

    async def _get_gcp_instance_ip(self, instance_name: str, zone: Optional[str] = None) -> Optional[str]:
        """Get public IP of GCP instance, polling until it is running and has one."""
        try:
            compute = self._ensure_client()["compute"]
            project = self.credentials.get("gcp_project_id")
            zone = zone or self._default_gcp_zone()
            for _ in range(self._MAX_WAIT_SECONDS // self._POLL_INTERVAL_SECONDS):
                try:
                    instance = compute.get(project=project, zone=zone, instance=instance_name)
                    if instance.status == "RUNNING":
                        address = self._gcp_public_ip(instance)
                        if address:
                            return address
                except Exception as e:
                    logger.warning(f"Retrying GCP instance lookup: {e}")
                await asyncio.sleep(self._POLL_INTERVAL_SECONDS)
            return None
        except Exception as e:
            logger.error(f"Error getting GCP instance IP: {e}")
            return None

    def _get_azure_vm_ip(self, vm_name: str) -> Optional[str]:
        """Get public IP of Azure VM.

        NOTE(review): resolving the address needs the Azure network management
        client, which this module does not create, so this always returns None.
        """
        logger.warning(f"Public IP lookup for Azure VM {vm_name} is not implemented")
        return None

    def _get_digital_ocean_droplet_ip(self, droplet_id: str) -> Optional[str]:
        """Get public IP of DigitalOcean droplet"""
        try:
            droplet = self._ensure_client()["droplet"].get_droplet(droplet_id)
            if droplet.status == "active":
                # networks is a dict of address lists; each entry is a dict.
                for interface in (droplet.networks or {}).get("v4", []):
                    if interface.get("type", "public") == "public" and interface.get("ip_address"):
                        return interface["ip_address"]
        except Exception as e:
            logger.error(f"Error getting DigitalOcean droplet IP: {e}")
        return None

    # ------------------------------------------------------------------
    # Networking extras
    # ------------------------------------------------------------------
    async def _associate_elastic_ip(self, instance_id: str, allocation_id: str):
        """Associate Elastic IP with AWS instance; returns True on success."""
        try:
            self._ensure_client()["ec2"].associate_address(
                InstanceId=instance_id,
                AllocationId=allocation_id,
            )
            logger.info(f"Elastic IP associated for instance {instance_id}")
            return True
        except Exception as e:
            logger.error(f"Error associating Elastic IP: {e}")
            return False

    async def _create_dns_record(self, resource_id: str, name: str, domain: str = None):
        """Create DNS record for instance (placeholder: only logs the intent)."""
        if not domain:
            return False
        logger.info(f"DNS record would be created for {name} -> {domain}")
        return True

    async def _create_digital_ocean_dns(self, droplet_id: str, name: str, domain: str = None):
        """Create DNS record for DigitalOcean droplet (placeholder: only logs the intent)."""
        if not domain:
            return False
        logger.info(f"DNS record would be created for {name} -> {domain}")
        return True
class MultiCloudOrchestrator:
    """Multi-cloud deployment orchestration.

    Holds the configured providers, records deployments (in memory and, when a
    Redis client is supplied, in the ``deployment_history`` list) and runs a
    periodic health-check task.
    """

    # Rough per-instance cost by size name, used only for coarse estimates.
    # NOTE(review): the unit is not stated anywhere in this file — confirm.
    _COST_BY_SIZE = {
        "t2.micro": 0.011,
        "t2.small": 0.018,
        "t2.medium": 0.036,
        "t2.large": 0.072,
        "c2.large": 0.144,
        "m5.large": 0.192,
        "s-1vcpu-1gb": 0.010,
        "s-2vcpu-2gb": 0.020,
        "s-4vcpu-4gb": 0.040,
        "s-8vcpu-8gb": 0.080,
        "s-16vcpu-64gb": 1.280,
        "s-32vcpu-128gb": 2.560,
        "m6-16vcpu-64gb": 1.920,
        "c-32vcpu-128gb": 3.840,
        "m-32vcpu-16gb": 3.840,
    }
    _DEFAULT_COST = 0.01
    # Each provider reports its instance size under a different key.
    _SIZE_KEYS = ("machine_type", "instance_type", "type", "size", "vm_size")

    def __init__(self):
        self.cloud_providers = []
        self.active_deployments = []
        self.is_running = False
        self.deployment_history = []
        self.redis = None
        self.monitoring_task = None

    @staticmethod
    def _env_enabled(flag: str) -> bool:
        """Return whether the ``*_ENABLED`` environment flag is on (default: on)."""
        return os.getenv(flag, "true").lower() == "true"

    async def initialize_providers(self, redis_client=None):
        """Initialize cloud providers from environment variables.

        Every provider whose ``*_ENABLED`` flag is not ``false`` is configured
        and connection-tested. A provider that fails its test stays registered
        with ``is_connected = False``. Returns ``False`` only on an unexpected
        error.
        """
        try:
            if self._env_enabled("AWS_ENABLED"):
                aws_region = os.getenv("AWS_REGION", "us-east-1")
                aws_creds = {
                    "aws_access_key_id": os.getenv("AWS_ACCESS_KEY_ID"),
                    "aws_secret_access_key": os.getenv("AWS_SECRET_ACCESS_KEY"),
                    "aws_region": aws_region,
                }
                self.cloud_providers.append(MultiCloudProvider(CloudProvider.AWS, aws_region, aws_creds))
                logger.info("AWS provider configured")

            if self._env_enabled("GCP_ENABLED"):
                gcp_region = os.getenv("GCP_REGION", "us-central1")
                gcp_creds = {
                    "gcp_credentials": os.getenv("GCP_CREDENTIALS"),
                    "gcp_project_id": os.getenv("GCP_PROJECT_ID"),
                    "gcp_region": gcp_region,
                }
                self.cloud_providers.append(MultiCloudProvider(CloudProvider.GCP, gcp_region, gcp_creds))
                logger.info("GCP provider configured")

            if self._env_enabled("AZURE_ENABLED"):
                azure_creds = {
                    "azure_client_id": os.getenv("AZURE_CLIENT_ID"),
                    "azure_client_secret": os.getenv("AZURE_CLIENT_SECRET"),
                    "azure_subscription_id": os.getenv("AZURE_SUBSCRIPTION_ID"),
                    "azure_tenant_id": os.getenv("AZURE_TENANT_ID"),
                    "azure_resource_group": os.getenv("AZURE_RESOURCE_GROUP"),
                }
                self.cloud_providers.append(
                    MultiCloudProvider(CloudProvider.AZURE, os.getenv("AZURE_REGION", "eastus"), azure_creds)
                )
                logger.info("Azure provider configured")

            if self._env_enabled("DIGITAL_OCEAN_ENABLED"):
                do_region = os.getenv("DIGITAL_OCEAN_REGION", "nyc3")
                do_creds = {
                    "digitalocean_token": os.getenv("DIGITAL_OCEAN_TOKEN"),
                    "digitalocean_region": do_region,
                }
                self.cloud_providers.append(MultiCloudProvider(CloudProvider.DIGITAL_OCEAN, do_region, do_creds))
                logger.info("DigitalOcean provider configured")

            self.redis = redis_client

            for provider in self.cloud_providers:
                provider.is_connected = await provider.test_connection()
                if provider.is_connected:
                    logger.info(f"Connected to {provider.provider.value}")
                else:
                    logger.error(f"Failed to connect to {provider.provider.value}")

            logger.info(f"Multi-cloud setup complete: {len(self.cloud_providers)} providers configured")
            return True
        except Exception as e:
            logger.error(f"Error initializing cloud providers: {e}")
            return False

    async def deploy_multi_cloud_instance(
        self, provider: "CloudProvider", instance_config: dict[str, Any]
    ) -> dict[str, Any]:
        """Deploy instance to specified cloud provider.

        Every attempt is appended to ``deployment_history`` (and pushed to
        Redis when configured); only successful ones are added to
        ``active_deployments``. Returns the provider's result dict, or
        ``{"error": ...}`` when no connected provider matches or on failure.
        """
        provider_name = getattr(provider, "value", provider)
        try:
            target = next((p for p in self.cloud_providers if p.provider == provider and p.is_connected), None)
            if target is None:
                return {"error": f"No available {provider_name} provider"}

            result = await target.deploy_instance(instance_config)
            deployment = {
                "provider": target.provider.value,
                "provider_region": target.region,
                "instance_config": instance_config,
                "result": result,
                "timestamp": datetime.now(timezone.utc).isoformat(),
            }
            if result.get("success"):
                self.active_deployments.append(deployment)
            self.deployment_history.append(deployment)

            if self.redis:
                # default=str is deliberate: SDK error details can carry
                # datetimes and other objects json cannot encode natively.
                await self.redis.lpush("deployment_history", json.dumps(deployment, default=str))
                await self.redis.ltrim("deployment_history", 0, 1000)
            return result
        except Exception as e:
            logger.error(f"Error deploying to {provider_name}: {e}")
            return {"error": str(e)}

    def _estimate_cost(self, instance: dict[str, Any]) -> float:
        """Look up the cost of one instance summary by whichever size key it carries."""
        for key in self._SIZE_KEYS:
            size = instance.get(key)
            if size:
                # GCP reports the machine type as a URL; keep the last segment.
                return self._COST_BY_SIZE.get(str(size).rsplit("/", 1)[-1], self._DEFAULT_COST)
        return self._DEFAULT_COST

    async def get_all_resources(self) -> dict[str, Any]:
        """Get resources from all connected cloud providers.

        Returns ``{"providers": [...], "total_instances": int,
        "total_cost_estimate": number}``. Each ``providers`` entry holds
        ``provider``, ``region``, ``cost_estimate`` and ``resources``, or
        ``provider`` and ``error`` when that provider failed.
        """
        summary = {"providers": [], "total_instances": 0, "total_cost_estimate": 0}
        for provider in self.cloud_providers:
            if not provider.is_connected:
                continue
            provider_name = provider.provider.value
            try:
                provider_resources = await provider.get_resources()
                if "error" in provider_resources:
                    summary["providers"].append({"provider": provider_name, "error": provider_resources["error"]})
                    continue
                provider_resources["provider"] = provider_name
                instances = provider_resources.get("instances", [])
                cost_estimate = sum(self._estimate_cost(instance) for instance in instances)
                summary["total_instances"] += len(instances)
                summary["total_cost_estimate"] += cost_estimate
                summary["providers"].append(
                    {
                        "provider": provider_name,
                        "region": provider.region,
                        "cost_estimate": cost_estimate,
                        "resources": provider_resources,
                    }
                )
            except Exception as e:
                logger.error(f"Error getting resources from {provider_name}: {e}")
                summary["providers"].append({"provider": provider_name, "error": str(e)})
        return summary

    async def health_check_all_providers(self) -> dict[str, Any]:
        """Health check all cloud providers.

        Also refreshes each provider's ``is_connected`` flag. Returns totals
        plus a ``provider_status`` map keyed by provider id.
        """
        health_status = {
            "total_providers": len(self.cloud_providers),
            "healthy_providers": 0,
            "unhealthy_providers": 0,
            "provider_status": {},
        }
        for provider in self.cloud_providers:
            provider_name = provider.provider.value
            checked_at = datetime.now(timezone.utc).isoformat()
            try:
                is_healthy = await provider.test_connection()
                status = {"connected": is_healthy, "last_check": checked_at}
            except Exception as e:
                is_healthy = False
                status = {"connected": False, "error": str(e), "last_check": checked_at}
            provider.is_connected = is_healthy
            health_status["healthy_providers" if is_healthy else "unhealthy_providers"] += 1
            health_status["provider_status"][provider_name] = status
        return health_status

    async def start_monitoring(self):
        """Start continuous monitoring of all cloud providers.

        Spawns one background task that health-checks every 5 minutes (and
        retries after 60 seconds when the check itself fails). Calling this
        while a task is already running is a no-op.
        """
        if self.monitoring_task is not None and not self.monitoring_task.done():
            logger.info("Multi-cloud monitoring already running")
            return
        self.is_running = True
        logger.info("Multi-cloud monitoring started")

        async def monitoring_loop():
            while self.is_running:
                try:
                    health_status = await self.health_check_all_providers()
                    total_providers = health_status["total_providers"]
                    healthy_providers = health_status["healthy_providers"]
                    if healthy_providers < total_providers:
                        logger.warning(f"Only {healthy_providers}/{total_providers} cloud providers are healthy")
                    else:
                        logger.info(f"All {healthy_providers}/{total_providers} cloud providers are healthy")

                    for provider in self.cloud_providers:
                        if not provider.is_connected:
                            logger.error(f"Attempting to reconnect to {provider.provider.value}")
                            provider.is_connected = await provider.test_connection()
                    await asyncio.sleep(300)  # Check every 5 minutes
                except Exception as e:
                    logger.error(f"Monitoring loop error: {e}")
                    await asyncio.sleep(60)

        self.monitoring_task = asyncio.create_task(monitoring_loop())
        logger.info("Multi-cloud monitoring task started")

    async def stop_monitoring(self):
        """Stop multi-cloud monitoring and wait for the background task to finish."""
        self.is_running = False
        task = getattr(self, "monitoring_task", None)
        self.monitoring_task = None
        if task is not None and not task.done():
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass  # expected: we cancelled it ourselves
        logger.info("Multi-cloud monitoring stopped")

    async def graceful_shutdown(self):
        """Gracefully shutdown multi-cloud orchestration"""
        self.is_running = False
        await self.stop_monitoring()
        logger.info("Multi-cloud orchestration gracefully shutdown")
# Global multi-cloud instance. It is created unconfigured: no providers are
# registered until initialize_providers() is awaited on it.
multi_cloud_orchestrator = MultiCloudOrchestrator()
# Initialization function
async def initialize_multi_cloud_infrastructure(redis_client=None):
    """Initialize multi-cloud infrastructure.

    Builds a fresh MultiCloudOrchestrator (not the module-level instance),
    configures its providers from the environment and, when that succeeds,
    starts background monitoring.

    Args:
        redis_client: optional client handed to the orchestrator for storing
            deployment history.

    Returns:
        The orchestrator, whether or not initialization succeeded.
    """
    orchestrator = MultiCloudOrchestrator()
    success = await orchestrator.initialize_providers(redis_client)
    if success:
        await orchestrator.start_monitoring()
        logger.info("Multi-cloud infrastructure initialized")
    else:
        # Previously a failed setup was silent; the orchestrator is still
        # returned so existing callers keep working.
        logger.warning("Multi-cloud infrastructure initialization failed; monitoring not started")
    return orchestrator