""" Capacity Estimation Node This node handles the estimation of compute capacity requirements for model deployment. Currently minimal implementation - placeholder for future capacity estimation logic. Key Features: - Compute capacity estimation (placeholder) - Resource requirement analysis (placeholder) - State management for workflow Author: ComputeAgent Team License: Private """ import logging import math from typing import Dict, Any logger = logging.getLogger("CapacityEstimation") # Mapping dtype to factor (bytes per parameter) DTYPE_FACTOR = { # Standard PyTorch dtypes "auto": 2, "half": 2, "float16": 2, "fp16": 2, "bfloat16": 2, "bf16": 2, "float": 4, "float32": 4, "fp32": 4, # Quantized dtypes "fp8": 1, "fp8_e4m3": 1, "fp8_e5m2": 1, "f8_e4m3": 1, # HuggingFace naming convention "f8_e5m2": 1, "int8": 1, "int4": 0.5, } KV_CACHE_DTYPE_FACTOR = { "auto": None, # Will be set to model dtype factor "float32": 4, "fp32": 4, "float16": 2, "fp16": 2, "bfloat16": 2, "bf16": 2, "fp8": 1, "fp8_e5m2": 1, "fp8_e4m3": 1, "f8_e4m3": 1, # HuggingFace naming convention "f8_e5m2": 1, "int8": 1, } # GPU specifications (in GB) GPU_SPECS = { "RTX 4090": 24, "RTX 5090": 32, } # GPU pricing (in EUR per hour) GPU_PRICING = { "RTX 4090": 0.2, "RTX 5090": 0.4, } def normalize_dtype(dtype: str) -> str: """ Normalize dtype string to a canonical form for consistent lookup. Args: dtype: Raw dtype string (e.g., "F8_E4M3", "BF16", "float16") Returns: Normalized dtype string in lowercase with underscores """ if not dtype: return "auto" # Convert to lowercase and handle common variations normalized = dtype.lower() # Handle HuggingFace safetensors naming conventions # F8_E4M3 -> f8_e4m3, BF16 -> bf16, etc. return normalized def get_dtype_factor(dtype: str, default: int = 2) -> float: """ Get the bytes-per-parameter factor for a given dtype. Args: dtype: Data type string default: Default factor if dtype not found Returns: Factor (bytes per parameter) """ normalized = normalize_dtype(dtype) return DTYPE_FACTOR.get(normalized, default) def estimate_vllm_gpu_memory( num_params: int, dtype: str = "auto", num_hidden_layers: int = None, hidden_size: int = None, intermediate_size: int = None, num_key_value_heads: int = None, head_dim: int = None, max_model_len: int = 2048, max_num_seqs: int = 256, max_num_batched_tokens: int = 2048, kv_cache_dtype: str = "auto", gpu_memory_utilization: float = 0.9, cpu_offload_gb: float = 0.0, is_quantized: bool = None # NEW: indicate if num_params is already quantized ) -> float: """ Estimate GPU memory for a model. Handles: 1. Full parameter info -> detailed estimation 2. Only num_params and dtype -> rough estimation Returns memory in GB Args: num_params: Number of parameters. For quantized models from HF API, this is already in the quantized format. is_quantized: If True, num_params represents quantized size. If None, auto-detect from dtype. """ constant_margin = 1.5 dtype_factor = get_dtype_factor(dtype, default=2) # Auto-detect if model is quantized if is_quantized is None: quantized_dtypes = ["fp8", "f8_e4m3", "f8_e5m2", "int8", "int4", "fp8_e4m3", "fp8_e5m2"] is_quantized = normalize_dtype(dtype) in quantized_dtypes # Case 1: Only num_params available (simplified) if None in [num_hidden_layers, hidden_size, intermediate_size, num_key_value_heads, head_dim]: if is_quantized: # num_params already represents quantized size # HF API returns parameter count in the quantized dtype # So we DON'T multiply by dtype_factor again model_weight = num_params / 1e9 # Already accounts for quantization else: # For non-quantized models, calculate weight from params model_weight = (num_params * dtype_factor) / 1e9 # Rough activation estimate (typically FP16 regardless of weight dtype) # Activation memory is roughly 1-2x model weight for transformer models activation_estimate = model_weight * 1.5 estimated_gpu_memory = (model_weight + activation_estimate + constant_margin) / gpu_memory_utilization - cpu_offload_gb return estimated_gpu_memory # Case 2: Full info available -> detailed vLLM formula if is_quantized: model_weight = num_params / 1e9 else: model_weight = (num_params * dtype_factor) / 1e9 if kv_cache_dtype == "auto": # For quantized models, KV cache often uses FP16/BF16, not FP8 kv_cache_dtype_factor = 2 if is_quantized else dtype_factor else: normalized_kv = normalize_dtype(kv_cache_dtype) kv_cache_dtype_factor = KV_CACHE_DTYPE_FACTOR.get(normalized_kv, 2) per_seq_kv_cache_memory = (2 * num_key_value_heads * head_dim * num_hidden_layers * kv_cache_dtype_factor * max_model_len) / 1e9 total_kv_cache_memory = min( per_seq_kv_cache_memory * max_num_seqs, (2 * num_hidden_layers * hidden_size * kv_cache_dtype_factor * max_num_batched_tokens) / 1e9 ) # Activations are typically FP16/BF16 even for quantized models activation_dtype_factor = 2 # Assume FP16 activations activation_peak_memory = max_model_len * ((18 * hidden_size) + (4 * intermediate_size)) * activation_dtype_factor / 1e9 required_gpu_memory = (model_weight + total_kv_cache_memory + activation_peak_memory + constant_margin) / gpu_memory_utilization - cpu_offload_gb return required_gpu_memory def calculate_gpu_requirements(estimated_memory_gb: float) -> Dict[str, Any]: """ Calculate number of GPUs needed and costs for different GPU types. Args: estimated_memory_gb: Estimated GPU memory requirement in GB Returns: Dictionary containing GPU requirements and cost information """ gpu_requirements = {} cost_estimates = {} for gpu_type, gpu_memory in GPU_SPECS.items(): # Account for ~10% overhead for communication and fragmentation in multi-GPU setup usable_memory = gpu_memory * 0.9 num_gpus = math.ceil(estimated_memory_gb / usable_memory) # Calculate costs hourly_cost = num_gpus * GPU_PRICING[gpu_type] daily_cost = hourly_cost * 24 weekly_cost = hourly_cost * 24 * 7 gpu_requirements[gpu_type] = num_gpus cost_estimates[gpu_type] = { "hourly": hourly_cost, "daily": daily_cost, "weekly": weekly_cost } return { "gpu_requirements": gpu_requirements, "cost_estimates": cost_estimates } async def capacity_estimation_node(state: Dict[str, Any]) -> Dict[str, Any]: """ Estimate GPU memory for a model deployment using vLLM-based computation. Handles both initial estimation and re-estimation with custom inference config. """ # Check if this is a re-estimation is_re_estimation = state.get("needs_re_estimation", False) if is_re_estimation: logger.info("🔄 Starting capacity re-estimation with custom inference configuration") # Reset the re-estimation flag state["needs_re_estimation"] = False state["capacity_approved"] = False else: logger.info("⚡ Starting capacity estimation node") try: model_name = state.get("model_name") model_info = state.get("model_info") if not model_name or not model_info: logger.error("❌ Missing model information") state["capacity_estimation_status"] = "error" state["error"] = "Model information required for capacity estimation" return state # Extract safetensors info dtype = model_info.get("dtype", "auto") num_params = model_info.get("num_params", None) # Extract required parameters for GPU memory estimation params = { "num_params": num_params, "dtype": dtype, "num_hidden_layers": model_info.get("num_hidden_layers"), "hidden_size": model_info.get("hidden_size"), "intermediate_size": model_info.get("intermediate_size"), "num_key_value_heads": model_info.get("num_key_value_heads"), "head_dim": model_info.get("head_dim"), "max_model_len": model_info.get("max_model_len", 2048), "max_num_seqs": model_info.get("max_num_seqs", 256), "max_num_batched_tokens": model_info.get("max_num_batched_tokens", 2048), "kv_cache_dtype": model_info.get("kv_cache_dtype", "auto"), "gpu_memory_utilization": model_info.get("gpu_memory_utilization", 0.9), "cpu_offload_gb": model_info.get("cpu_offload_gb", 0.0) } estimated_gpu_memory = estimate_vllm_gpu_memory(**params) # Calculate GPU requirements and costs gpu_data = calculate_gpu_requirements(estimated_gpu_memory) gpu_requirements = gpu_data["gpu_requirements"] cost_estimates = gpu_data["cost_estimates"] # Store in state state["estimated_gpu_memory"] = estimated_gpu_memory state["gpu_requirements"] = gpu_requirements state["cost_estimates"] = cost_estimates state["capacity_estimation_status"] = "success" # Build comprehensive response model_size_b = num_params / 1e9 if num_params else "Unknown" # Model architecture details architecture_info = [] if model_info.get("num_hidden_layers"): architecture_info.append(f"**Layers:** {model_info['num_hidden_layers']}") if model_info.get("hidden_size"): architecture_info.append(f"**Hidden Size:** {model_info['hidden_size']}") if model_info.get("num_attention_heads"): architecture_info.append(f"**Attention Heads:** {model_info['num_attention_heads']}") if model_info.get("num_key_value_heads"): architecture_info.append(f"**KV Heads:** {model_info['num_key_value_heads']}") if model_info.get("intermediate_size"): architecture_info.append(f"**Intermediate Size:** {model_info['intermediate_size']}") if model_info.get("max_position_embeddings"): architecture_info.append(f"**Max Position Embeddings:** {model_info['max_position_embeddings']}") architecture_section = "\n ".join(architecture_info) if architecture_info else "Limited architecture information available" # Inference configuration inference_config = f"""**Max Model Length:** {params['max_model_len']} **Max Sequences:** {params['max_num_seqs']} **Max Batched Tokens:** {params['max_num_batched_tokens']} **KV Cache dtype:** {params['kv_cache_dtype']} **GPU Memory Utilization:** {params['gpu_memory_utilization']*100:.0f}%""" # GPU requirements and cost section gpu_req_lines = [] cost_lines = [] # Highlight RTX 4090 and 5090 for gpu_type in ["RTX 4090", "RTX 5090"]: if gpu_type in gpu_requirements: num_gpus = gpu_requirements[gpu_type] gpu_memory = GPU_SPECS[gpu_type] costs = cost_estimates[gpu_type] gpu_req_lines.append(f"**{gpu_type}** ({gpu_memory}GB): **{num_gpus} GPU{'s' if num_gpus > 1 else ''}**") cost_lines.append(f"**{gpu_type}:** €{costs['hourly']:.2f}/hour | €{costs['daily']:.2f}/day | €{costs['weekly']:.2f}/week") gpu_requirements_section = "\n ".join(gpu_req_lines) cost_section = "\n ".join(cost_lines) # Build final response estimation_title = "**Capacity Re-Estimation Complete**" if is_re_estimation else "**Capacity Estimation Complete**" custom_note = "*Note: Re-estimated with custom inference configuration. " if is_re_estimation else "*Note: " GPU_type = state['custom_inference_config']['GPU_type'] if is_re_estimation else model_info.get('GPU_type', 'RTX 4090') location = state['custom_inference_config']['location'] if is_re_estimation else model_info.get('location', 'UAE-1') state["response"] = f""" {estimation_title} **Model Information:** **Name:** {model_name} **Parameters:** {model_size_b:.2f}B **Data Type:** {dtype} **Architecture Details:** {architecture_section} **Inference Configuration:** {inference_config} **Estimated GPU Memory Required:** {estimated_gpu_memory:.2f} GB **GPU Requirements:** {gpu_requirements_section} **Cost Estimates:** {cost_section} **Selected GPU Type:** {GPU_type} **Deployment Location:** {location} {custom_note}This estimation includes model weights, KV cache, activation peak, and a safety margin. Multi-GPU setups account for ~10% overhead for communication.*""" logger.info(f"✅ Estimated GPU memory: {estimated_gpu_memory:.2f} GB") logger.info(f"📊 GPU Requirements: RTX 4090: {gpu_requirements.get('RTX 4090', 'N/A')}, RTX 5090: {gpu_requirements.get('RTX 5090', 'N/A')}") # Prepare state for human approval - set pending capacity approval state["pending_capacity_approval"] = True state["needs_re_estimation"] = False # Reset flag after processing state["current_step"] = "capacity_estimation_complete" except Exception as e: logger.error(f"❌ Error in capacity estimation: {str(e)}") state["capacity_estimation_status"] = "error" state["error"] = str(e) state["response"] = f"""❌ **Capacity Estimation Failed** **Model:** {state.get('model_name', 'Unknown')} **Error:** {str(e)} Please check if: 1. The model exists on HuggingFace 2. You have access to the model (if it's gated) 3. Your HuggingFace token is valid""" return state