Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| from typing import Tuple | |
| from easydict import EasyDict | |
| import yaml | |
| import subprocess | |
| from enum import Enum, unique | |
| from ding.interaction.base import split_http_address | |
| from .default_helper import one_time_warning | |
# Fallbacks used by ``get_operator_server_kwargs`` when neither the config nor
# the corresponding environment variable provides a value.
DEFAULT_NAMESPACE = 'default'
DEFAULT_POD_NAME = 'dijob-example-coordinator'
DEFAULT_API_VERSION = '/v1alpha1'

# Default ports, one per component role.
DEFAULT_K8S_COLLECTOR_PORT = 22270
DEFAULT_K8S_LEARNER_PORT = 22271
DEFAULT_K8S_AGGREGATOR_SLAVE_PORT = 22272
DEFAULT_K8S_COORDINATOR_PORT = 22273
# NOTE(review): same value as DEFAULT_K8S_COORDINATOR_PORT above -- confirm this
# overlap is intentional before changing either one.
DEFAULT_K8S_AGGREGATOR_MASTER_PORT = 22273
def get_operator_server_kwargs(cfg: EasyDict) -> dict:
    """
    Overview:
        Get kwarg dict describing the operator server from the config and the environment variables.
    Arguments:
        - cfg (:obj:`EasyDict`): System config. ``system_addr`` and ``api_version`` are read from it when set.
    Returns:
        - result (:obj:`dict`): Containing ``api_version``, ``namespace``, ``name``, ``port``, ``host``.
    Raises:
        - AssertionError: If the server address is given neither by ``cfg.system_addr`` nor by the \
            ``KUBERNETES_SERVER_URL`` environment variable.
    """
    namespace = os.environ.get('KUBERNETES_POD_NAMESPACE', DEFAULT_NAMESPACE)
    name = os.environ.get('KUBERNETES_POD_NAME', DEFAULT_POD_NAME)
    # The config value takes precedence over the environment variable.
    url = cfg.get('system_addr', None) or os.environ.get('KUBERNETES_SERVER_URL', None)
    if not url:
        # Raised explicitly (rather than via ``assert``) so the check still runs under ``python -O``,
        # while callers keep seeing the same exception type as before.
        raise AssertionError('please set environment variable KUBERNETES_SERVER_URL in Kubenetes platform.')
    api_version = cfg.get('api_version', None) \
        or os.environ.get('KUBERNETES_SERVER_API_VERSION', DEFAULT_API_VERSION)
    try:
        # Plain ``host:port`` form.
        parts = url.split(":")
        host, port = parts[0], int(parts[1])
    except (IndexError, ValueError):
        # No ``:`` at all, or the second field is not an integer (e.g. ``http://host:port``):
        # delegate to the HTTP address parser and use its first two fields.
        host, port, _, _ = split_http_address(url)
    return {
        'api_version': api_version,
        'namespace': namespace,
        'name': name,
        'host': host,
        'port': port,
    }
def exist_operator_server() -> bool:
    """
    Overview:
        Tell whether an operator server address has been provided through the environment.
    Returns:
        - exists (:obj:`bool`): ``True`` if ``KUBERNETES_SERVER_URL`` is present in ``os.environ``, \
            ``False`` otherwise.
    """
    server_url = os.environ.get('KUBERNETES_SERVER_URL')
    return server_url is not None
def pod_exec_command(kubeconfig: str, name: str, namespace: str, cmd: str) -> Tuple[int, str]:
    """
    Overview:
        Execute command in pod
    Arguments:
        - kubeconfig (:obj:`str`) The path of kubeconfig file
        - name (:obj:`str`) The name of pod
        - namespace (:obj:`str`) The namespace of pod
        - cmd (:obj:`str`) The command, executed in the pod as ``/bin/sh -c <cmd>``
    Returns:
        - code (:obj:`int`) ``-1`` if the pod cannot be read, does not exist or is not in the ``Running`` \
            phase; otherwise the ``code`` field of the parsed command output
        - message (:obj:`str`) A description of the failure, or the ``message`` field of the parsed \
            command output
    """
    try:
        from kubernetes import config
        from kubernetes.client import CoreV1Api
        from kubernetes.client.rest import ApiException
        from kubernetes.stream import stream
    except ModuleNotFoundError:
        # ``sys.exit`` instead of the builtin ``exit``: the latter is injected by the ``site`` module
        # and is not guaranteed to exist (e.g. ``python -S`` or frozen interpreters).
        import sys
        one_time_warning("You have not installed kubernetes package! Please try 'pip install DI-engine[k8s]'.")
        sys.exit(-1)

    config.load_kube_config(config_file=kubeconfig)
    core_v1 = CoreV1Api()
    resp = None
    try:
        resp = core_v1.read_namespaced_pod(name=name, namespace=namespace)
    except ApiException as e:
        # A 404 leaves ``resp`` as None and is reported below as a missing pod.
        if e.status != 404:
            return -1, "Unknown error: %s" % e
    if not resp:
        return -1, f"Pod {name} does not exist."
    if resp.status.phase != 'Running':
        return -1, f"Pod {name} is not in Running."

    exec_command = ['/bin/sh', '-c', cmd]
    resp = stream(
        core_v1.connect_get_namespaced_pod_exec,
        name,
        namespace,
        command=exec_command,
        stderr=False,
        stdin=False,
        stdout=True,
        tty=False
    )
    # Rewrite the returned text so that ``json.loads`` accepts it: single quotes become double quotes,
    # ``None``/``False``/``True`` become ``null``/``0``/``1``, and the quotes around one specific
    # regex-valued string are escaped.
    # NOTE(review): presumably the output is a Python-dict style repr -- confirm against the command run.
    resp = resp.replace("\'", "\"") \
        .replace('None', 'null') \
        .replace(': False', ': 0') \
        .replace(': True', ': 1') \
        .replace('"^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$"', '\\"^([+-]?[0-9.]+)([eEinumkKMGTP]*[-+]?[0-9]*)$\\"')
    resp = json.loads(resp)
    return resp['code'], resp['message']
class K8sType(Enum):
    """
    Overview:
        The kinds of cluster that ``K8sLauncher`` can be configured with.
    """
    # Chosen by ``type: local`` in the launcher config; the launcher's cluster operations
    # return without doing anything for this type.
    Local = 1
    # Chosen by ``type: k3s`` in the launcher config; the cluster is driven through ``k3d``.
    K3s = 2
class K8sLauncher(object):
    """
    Overview:
        Object to manage the K8s cluster. Only the ``k3s`` type is acted on, through the ``k3d`` command
        line tool; for the ``local`` type every cluster operation returns without doing anything.
    Interfaces:
        ``__init__``, ``_load``, ``create_cluster``, ``_check_k3d_tools``, ``delete_cluster``, ``preload_images``
    """

    def __init__(self, config_path: str) -> None:
        """
        Overview:
            Initialize the K8sLauncher object.
        Arguments:
            - config_path (:obj:`str`): The path of the config file.
        """
        # Defaults, overridden by whatever the config file provides.
        self.name = None
        self.servers = 1
        self.agents = 0
        self.type = K8sType.Local
        self._images = []

        self._load(config_path)
        self._check_k3d_tools()

    def _load(self, config_path: str) -> None:
        """
        Overview:
            Load the config file. Keys that are missing or falsy keep their current value.
        Arguments:
            - config_path (:obj:`str`): The path of the config file.
        Raises:
            - TypeError: If ``servers``/``agents`` is not an int or ``preload_images`` is not a list.
            - ValueError: If ``type`` is neither ``k3s`` nor ``local``.
        """
        with open(config_path, 'r') as f:
            data = yaml.safe_load(f)
        # An empty YAML document is loaded as None; treat it as "no overrides".
        if data is None:
            data = {}

        self.name = data.get('name') or self.name

        servers = data.get('servers')
        if servers:
            # Exact type check on purpose: ``isinstance`` would also accept YAML booleans.
            if type(servers) is not int:
                raise TypeError(f"servers' type is expected int, actual {type(servers)}")
            self.servers = servers

        agents = data.get('agents')
        if agents:
            if type(agents) is not int:
                raise TypeError(f"agents' type is expected int, actual {type(agents)}")
            self.agents = agents

        cluster_type = data.get('type')
        if cluster_type:
            if cluster_type == 'k3s':
                self.type = K8sType.K3s
            elif cluster_type == 'local':
                self.type = K8sType.Local
            else:
                raise ValueError(f"no type found for {cluster_type}")

        images = data.get('preload_images')
        if images:
            if type(images) is not list:
                raise TypeError(f"preload_images' type is expected list, actual {type(images)}")
            self._images = images

    def _check_k3d_tools(self) -> None:
        """
        Overview:
            Check if the k3d tools exist.
        Raises:
            - FileNotFoundError: If the cluster type is k3s and no ``k3d`` executable is found on ``PATH``.
        """
        if self.type != K8sType.K3s:
            return
        # ``shutil.which`` does the PATH lookup in-process, without depending on a ``which`` binary.
        import shutil
        if shutil.which('k3d') is None:
            raise FileNotFoundError(
                "No k3d tools found, please install by executing ./ding/scripts/install-k8s-tools.sh"
            )

    @staticmethod
    def _run_k3d(args: list) -> str:
        """
        Overview:
            Run a command, wait for it to finish and return what it wrote to stderr.
        Arguments:
            - args (:obj:`list`): The command line to execute.
        Returns:
            - err_str (:obj:`str`): The stderr output, decoded as UTF-8 and stripped.
        """
        proc = subprocess.run(args, stderr=subprocess.PIPE)
        return proc.stderr.decode('utf-8').strip()

    def create_cluster(self) -> None:
        """
        Overview:
            Create the k8s cluster, then preload the images listed in the config.
        Raises:
            - RuntimeError: If ``k3d`` reports an error other than the cluster already existing.
        """
        print('Creating k8s cluster...')
        if self.type != K8sType.K3s:
            return
        args = ['k3d', 'cluster', 'create', f'{self.name}', f'--servers={self.servers}', f'--agents={self.agents}']
        err_str = self._run_k3d(args)
        # Failure is detected from stderr: empty output or output containing 'WARN' is accepted.
        if err_str != '' and 'WARN' not in err_str:
            if 'already exists' in err_str:
                print('K8s cluster already exists')
            else:
                raise RuntimeError(f'Failed to create cluster {self.name}: {err_str}')

        # preload images
        self.preload_images(self._images)

    def delete_cluster(self) -> None:
        """
        Overview:
            Delete the k8s cluster.
        Raises:
            - RuntimeError: If ``k3d`` reports an error other than the cluster not being found.
        """
        print('Deleting k8s cluster...')
        if self.type != K8sType.K3s:
            return
        args = ['k3d', 'cluster', 'delete', f'{self.name}']
        err_str = self._run_k3d(args)
        if err_str != '' and 'WARN' not in err_str and \
                'NotFound' not in err_str:
            raise RuntimeError(f'Failed to delete cluster {self.name}: {err_str}')

    def preload_images(self, images: list) -> None:
        """
        Overview:
            Preload images into the cluster with ``k3d image import``.
        Arguments:
            - images (:obj:`list`): The images to import; nothing is done when it is empty or ``None``.
        Raises:
            - RuntimeError: If ``k3d`` reports an error.
        """
        if self.type != K8sType.K3s or not images:
            return
        args = ['k3d', 'image', 'import', f'--cluster={self.name}', *images]
        err_str = self._run_k3d(args)
        if err_str != '' and 'WARN' not in err_str:
            raise RuntimeError(f'Failed to preload images: {err_str}')