"""Data models, predefined machine configurations and a thin client for the Hive Compute API."""

import logging
from enum import Enum
from typing import Any, Dict, List, Optional, Union

import requests
from pydantic import BaseModel, Field

from constant import Constants


class InstanceStatus(Enum):
    """Lifecycle states of an instance; values are the codes found in the `status` payload field."""

    CREATED = 0
    DEPLOYED = 1
    STARTING = 2
    RUNNING = 3
    ERRORED = 4
    TERMINATING = 5
    TERMINATED = 6
    STOPPING = 7
    STOPPED = 8


class Timestamp(BaseModel):
    """Point in time expressed as a `seconds` / `nanos` pair."""

    seconds: int
    nanos: int


class GPUInfo(BaseModel):
    """A single GPU attached to an instance."""

    model: str


class PortInfo(BaseModel):
    """Mapping of one exposed container port to a node port."""

    protocol: str
    container_port: int
    node_port: int


class InstanceSpending(BaseModel):
    """Spending figures reported for one instance."""

    instance_id: str
    hourly_price: float
    total_spend: float


class InstanceInfo(BaseModel):
    """Description of a compute instance as returned by the GET endpoints.

    Every field is optional so that partial payloads still validate.
    """

    id: Optional[str] = None
    deployment_id: Optional[str] = None
    name: Optional[str] = None
    user_id: Optional[str] = None
    container_image: Optional[str] = None
    status: Optional[InstanceStatus] = None
    status_string: Optional[str] = None
    additional_info: Optional[str] = None
    type: Optional[int] = None
    created_at: Optional[Timestamp] = None
    updated_at: Optional[Timestamp] = None
    ready_at: Optional[Timestamp] = None
    stopped_at: Optional[Timestamp] = None
    cpu: Optional[int] = None
    memory: Optional[int] = None
    gpu: Optional[List[GPUInfo]] = None
    disk: Optional[int] = None
    bandwidth: Optional[int] = None
    ssh_key_id: Optional[str] = None
    location: Optional[str] = None
    ports: Optional[Dict[str, PortInfo]] = None
    hive_environment_variables: Optional[Dict[str, Any]] = None
    environment_variables: Optional[Dict[str, Any]] = None
    runtime: Optional[int] = None
    spending: Optional[InstanceSpending] = None

    def __init__(self, **data):
        """Validate `data` and derive `status_string` from `status` when it was not supplied."""
        super().__init__(**data)
        if self.status_string is None and isinstance(self.status, InstanceStatus):
            self.status_string = self.status.name


# Predefined GPU machine shapes (per the original note: based on the UI tables).
GPU_CONFIGS = {
    "1x RTX 4090": {
        "gpu": ["RTX 4090"],
        "cpu": 8,
        "memory": 48,
        "disk": 250,
        "bandwidth": 1000,
    },
    "2x RTX 4090": {
        "gpu": ["RTX 4090", "RTX 4090"],
        "cpu": 16,
        "memory": 96,
        "disk": 500,
        "bandwidth": 1000,
    },
    "4x RTX 4090": {
        "gpu": ["RTX 4090", "RTX 4090", "RTX 4090", "RTX 4090"],
        "cpu": 32,
        "memory": 192,
        "disk": 1000,
        "bandwidth": 1000,
    },
    "8x RTX 4090": {
        "gpu": [
            "RTX 4090", "RTX 4090", "RTX 4090", "RTX 4090",
            "RTX 4090", "RTX 4090", "RTX 4090", "RTX 4090",
        ],
        "cpu": 64,
        "memory": 384,
        "disk": 2000,
        "bandwidth": 1000,
    },
    "1x RTX 5090": {
        "gpu": ["RTX 5090"],
        "cpu": 8,
        "memory": 48,
        "disk": 250,
        "bandwidth": 1000,
    },
    "2x RTX 5090": {
        "gpu": ["RTX 5090", "RTX 5090"],
        "cpu": 16,
        "memory": 96,
        "disk": 500,
        "bandwidth": 1000,
    },
    "4x RTX 5090": {
        "gpu": ["RTX 5090", "RTX 5090", "RTX 5090", "RTX 5090"],
        "cpu": 32,
        "memory": 192,
        "disk": 1000,
        "bandwidth": 1000,
    },
    "8x RTX 5090": {
        "gpu": [
            "RTX 5090", "RTX 5090", "RTX 5090", "RTX 5090",
            "RTX 5090", "RTX 5090", "RTX 5090", "RTX 5090",
        ],
        "cpu": 64,
        "memory": 384,
        "disk": 2000,
        "bandwidth": 1000,
    },
}

# Predefined CPU-only machine shapes; an empty "gpu" list marks a non-GPU instance.
VCPU_CONFIGS = {
    "2vCPU": {"gpu": [], "cpu": 2, "memory": 4, "disk": 50, "bandwidth": 250},
    "4vCPU": {"gpu": [], "cpu": 4, "memory": 8, "disk": 100, "bandwidth": 250},
    "8vCPU": {"gpu": [], "cpu": 8, "memory": 16, "disk": 200, "bandwidth": 500},
    "16vCPU": {"gpu": [], "cpu": 16, "memory": 32, "disk": 400, "bandwidth": 1000},
    "32vCPU": {"gpu": [], "cpu": 32, "memory": 64, "disk": 800, "bandwidth": 1000},
}

# GPU models offered per location (location keys use the lowercase API format).
LOCATION_GPU_MAP = {
    "france": ["RTX 4090"],
    "uae": ["RTX 4090"],
    "texas": ["RTX 5090"],
    "uae-2": ["RTX 5090"],
}

# Default `timeout` handed to `requests`, so a stalled connection cannot block forever.
DEFAULT_TIMEOUT_SECONDS = 60.0


class HiveComputeAPI:
    """A wrapper class that provides methods to interact with the Hive Compute API."""

    def __init__(
        self,
        base_url: str = Constants.HIVE_COMPUTE_BASE_API_URL,
        token: str = Constants.HIVE_COMPUTE_DEFAULT_API_TOKEN,
        timeout: Optional[float] = DEFAULT_TIMEOUT_SECONDS,
    ):
        """
        Initializes the HiveComputeAPI handler.

        Args:
            base_url (str): The base URL of the Hive Compute API. Leading and
                trailing slashes are stripped.
            token (str): The authentication token, sent as a Bearer token.
            timeout (float, optional): Value passed as `timeout` to every HTTP
                request. Pass None to wait indefinitely (the previous behaviour).
        """
        self.base_url = base_url.strip("/")
        self.token = token
        self.timeout = timeout
        self.logger = logging.getLogger(__name__)

    def __fetch_instance_structure(self, instance_json) -> InstanceInfo:
        """
        Builds an InstanceInfo from one raw instance payload.

        The payload is copied first, so the caller's dict is never modified.

        Args:
            instance_json: Raw instance payload; anything that is not a dict
                yields an empty InstanceInfo.

        Returns:
            InstanceInfo: The validated instance description.
        """
        if not isinstance(instance_json, dict):
            return InstanceInfo()

        data = dict(instance_json)

        if "status" in data and not isinstance(data["status"], InstanceStatus):
            raw_status = data["status"]
            try:
                data["status"] = InstanceStatus(raw_status)
            except (ValueError, TypeError):
                # Best effort: unknown codes fall back to CREATED rather than failing the call.
                self.logger.warning(
                    "Unknown instance status %r; falling back to CREATED", raw_status
                )
                data["status"] = InstanceStatus.CREATED

        for field_name in ("created_at", "updated_at", "ready_at", "stopped_at"):
            value = data.get(field_name)
            if isinstance(value, Timestamp):
                continue  # already converted, keep it
            data[field_name] = Timestamp(**value) if isinstance(value, dict) else None

        if "gpu" in data:
            # The key may be present with a null / non-list value; treat that as "no GPUs".
            raw_gpus = data["gpu"] if isinstance(data["gpu"], (list, tuple)) else []
            data["gpu"] = [GPUInfo(**gpu) for gpu in raw_gpus if isinstance(gpu, dict)]

        if "ports" in data:
            raw_ports = data["ports"] if isinstance(data["ports"], dict) else {}
            data["ports"] = {
                key: PortInfo(**port)
                for key, port in raw_ports.items()
                if isinstance(port, dict)
            }

        return InstanceInfo(**data)

    def get_all_instances(self) -> List[InstanceInfo]:
        """
        Fetches all compute instances for the authenticated user.

        Returns:
            List[InstanceInfo]: The user's compute instances, each with its
            `spending` attached when the response carries an entry for its id.
            An empty list is returned when the request fails or the response
            body is not a JSON object.
        """
        # Only the network call and JSON decoding are guarded; model validation
        # errors raised further down still propagate to the caller.
        try:
            response = requests.get(
                f"{self.base_url}/instances",
                headers={"Authorization": f"Bearer {self.token}"},
                timeout=self.timeout,
            )
            response.raise_for_status()
            response_json = response.json()
        except (requests.RequestException, ValueError) as e:
            self.logger.error(f"Failed to fetch instances: {e}")
            return []

        if not isinstance(response_json, dict):
            self.logger.error("Failed to fetch instances: unexpected response format")
            return []

        spending_map = response_json.get("spending")
        if not isinstance(spending_map, dict):
            spending_map = {}

        instances = []
        for inst in response_json.get("instances") or []:
            if not isinstance(inst, dict):
                self.logger.warning("Skipping malformed instance entry: %r", inst)
                continue
            inst_struct = self.__fetch_instance_structure(inst)
            spend = spending_map.get(inst.get("id"))
            if isinstance(spend, dict) and spend:
                inst_struct.spending = InstanceSpending(**spend)
            instances.append(inst_struct)
        return instances

    def create_instance(
        self,
        name: str = "default",
        location: str = "uae",
        config: str = "1x RTX 4090",
        container_image: str = "Dockerfile.vulkan",
        tcp_ports: Optional[List[int]] = None,
        https_ports: Optional[List[int]] = None,
        udp_ports: Optional[List[int]] = None,
        launch_jupyter_notebook: bool = False,
        instance_type: int = 0,
        custom_config: Optional[Dict[str, Any]] = None,
    ) -> Optional[Dict[str, Any]]:
        """
        Creates a new compute instance using predefined configurations or custom settings.

        Args:
            name (str): Name of the instance. Defaults to "default".
            location (str): Location where the instance will be deployed, in the
                lowercase API format. Defaults to "uae".
                Valid locations: france, uae, texas, uae-2
            config (str): Predefined configuration. Options:
                GPU configs: "1x RTX 4090", "2x RTX 4090", "4x RTX 4090", "8x RTX 4090",
                             "1x RTX 5090", "2x RTX 5090", "4x RTX 5090", "8x RTX 5090"
                vCPU configs: "2vCPU", "4vCPU", "8vCPU", "16vCPU", "32vCPU"
                Defaults to "1x RTX 4090".
            container_image (str): Docker container image to use. Defaults to "Dockerfile.vulkan".
            tcp_ports (List[int], optional): List of TCP ports to expose. Defaults to none.
            https_ports (List[int], optional): List of HTTPS ports to expose.
                Defaults to [8888] when omitted.
            udp_ports (List[int], optional): List of UDP ports to expose. Defaults to none.
            launch_jupyter_notebook (bool): Whether to launch Jupyter notebook. Defaults to False.
            instance_type (int): Type of instance. Defaults to 0.
            custom_config (Dict[str, Any], optional): Values overriding the chosen
                predefined configuration. Keys: cpu, memory, disk, bandwidth, gpu

        Returns:
            Optional[Dict[str, Any]]: A dictionary with 'id' and 'status' keys if
            successful, None if the request fails or the response is not usable.

        Raises:
            ValueError: If the configuration or location is invalid, or a requested
                GPU type is not available in the location.
        """
        all_configs = {**GPU_CONFIGS, **VCPU_CONFIGS}

        if config not in all_configs:
            available_configs = list(all_configs.keys())
            raise ValueError(
                f"Invalid config: {config}. Available configs: {available_configs}"
            )

        # Shallow copy so overrides never leak into the module-level tables.
        instance_config = all_configs[config].copy()
        if custom_config:
            instance_config.update(custom_config)

        if location not in LOCATION_GPU_MAP:
            raise ValueError(
                f"Invalid location: {location}. Valid locations: {list(LOCATION_GPU_MAP.keys())}"
            )

        # CPU-only configs carry an empty GPU list and skip this check. Every
        # requested GPU is checked, since custom_config may supply a mixed list.
        requested_gpus = instance_config["gpu"]
        if requested_gpus:
            for gpu_type in requested_gpus:
                if gpu_type not in LOCATION_GPU_MAP[location]:
                    raise ValueError(
                        f"GPU type '{gpu_type}' not available in location '{location}'. "
                        f"Available GPUs: {LOCATION_GPU_MAP[location]}"
                    )

        payload = {
            "bandwidth": instance_config["bandwidth"],
            "container_image": container_image,
            "cpu": instance_config["cpu"],
            "disk": instance_config["disk"],
            "gpu": instance_config["gpu"],
            "https_ports": https_ports if https_ports is not None else [8888],
            "launch_jupyter_notebook": launch_jupyter_notebook,
            "location": location,
            "memory": instance_config["memory"],
            "name": name,
            "tcp_ports": tcp_ports if tcp_ports is not None else [],
            "type": instance_type,
            "udp_ports": udp_ports if udp_ports is not None else [],
        }

        self.logger.info("Creating instance with payload: %s", payload)

        try:
            response = requests.post(
                f"{self.base_url}/instances/instance",
                headers={
                    "Authorization": f"Bearer {self.token}",
                    "Content-Type": "application/json",
                },
                json=payload,
                timeout=self.timeout,
            )
            self.logger.info("Response status code: %s", response.status_code)
            response.raise_for_status()
            response_data = response.json()
        except (requests.RequestException, ValueError) as e:
            self.logger.error(f"Failed to create instance: {e}")
            # HTTP errors carry the server's response; log its body to aid debugging.
            error_response = getattr(e, "response", None)
            if error_response is not None:
                self.logger.error(f"Response content: {error_response.text}")
            return None

        if not isinstance(response_data, dict):
            self.logger.error("Failed to create instance: unexpected response format")
            return None

        instance_data = response_data.get("instance")
        if not isinstance(instance_data, dict):
            instance_data = {}

        return {
            "id": instance_data.get("id"),
            "status": instance_data.get("status"),
        }

    def get_available_locations(self, gpu_type: Optional[str] = None) -> List[str]:
        """
        Get available locations, optionally filtered by GPU type.

        Args:
            gpu_type (str, optional): GPU model to filter locations by.

        Returns:
            List[str]: Locations offering `gpu_type`, or every known location
            when no GPU type is given.
        """
        if gpu_type:
            return [loc for loc, gpus in LOCATION_GPU_MAP.items() if gpu_type in gpus]
        return list(LOCATION_GPU_MAP.keys())