# NOTE: The following lines are a scraping artifact (GitHub commit header),
# not part of the module; kept only as a comment so the file stays parseable.
# carraraig's picture / Hello / 8816dfd
"""
Capacity Estimation Node
This node handles the estimation of compute capacity requirements for model deployment.
Currently minimal implementation - placeholder for future capacity estimation logic.
Key Features:
- Compute capacity estimation (placeholder)
- Resource requirement analysis (placeholder)
- State management for workflow
Author: ComputeAgent Team
License: Private
"""
import logging
import math
from typing import Dict, Any
logger = logging.getLogger("CapacityEstimation")
# Bytes of storage per parameter, keyed by (normalized) dtype string.
DTYPE_FACTOR = {
    # 16-bit PyTorch dtypes ("auto" defaults to half precision)
    "auto": 2,
    "half": 2,
    "float16": 2,
    "fp16": 2,
    "bfloat16": 2,
    "bf16": 2,
    # 32-bit floats
    "float": 4,
    "float32": 4,
    "fp32": 4,
    # 8-bit quantized formats (incl. HuggingFace "f8_*" spellings)
    "fp8": 1,
    "fp8_e4m3": 1,
    "fp8_e5m2": 1,
    "f8_e4m3": 1,
    "f8_e5m2": 1,
    "int8": 1,
    # 4-bit quantization packs two parameters per byte
    "int4": 0.5,
}
# Bytes per KV-cache element, keyed by (normalized) dtype string.
# "auto" is a sentinel: callers substitute the model's own dtype factor.
KV_CACHE_DTYPE_FACTOR = {
    "auto": None,
    # 32-bit floats
    "float32": 4,
    "fp32": 4,
    # 16-bit floats
    "float16": 2,
    "fp16": 2,
    "bfloat16": 2,
    "bf16": 2,
    # 8-bit formats (incl. HuggingFace "f8_*" spellings)
    "fp8": 1,
    "fp8_e5m2": 1,
    "fp8_e4m3": 1,
    "f8_e4m3": 1,
    "f8_e5m2": 1,
    "int8": 1,
}
# VRAM capacity per supported GPU model, in GB.
GPU_SPECS = {
    "RTX 4090": 24,
    "RTX 5090": 32,
}

# Rental rate per supported GPU model, in EUR per hour.
GPU_PRICING = {
    "RTX 4090": 0.2,
    "RTX 5090": 0.4,
}
def normalize_dtype(dtype: str) -> str:
    """
    Canonicalize a dtype string for consistent table lookups.

    Falsy input (empty string or None) maps to "auto"; anything else is
    simply lowercased, which already covers HuggingFace safetensors
    spellings such as "F8_E4M3" -> "f8_e4m3" and "BF16" -> "bf16".

    Args:
        dtype: Raw dtype string (e.g., "F8_E4M3", "BF16", "float16")

    Returns:
        Normalized lowercase dtype string.
    """
    return dtype.lower() if dtype else "auto"
def get_dtype_factor(dtype: str, default: int = 2) -> float:
    """
    Look up the bytes-per-parameter factor for a dtype.

    Args:
        dtype: Data type string (any case; normalized internally).
        default: Factor returned when the dtype is unknown.

    Returns:
        Bytes per parameter for the dtype.
    """
    key = normalize_dtype(dtype)
    return DTYPE_FACTOR.get(key, default)
def estimate_vllm_gpu_memory(
    num_params: int,
    dtype: str = "auto",
    num_hidden_layers: int = None,
    hidden_size: int = None,
    intermediate_size: int = None,
    num_key_value_heads: int = None,
    head_dim: int = None,
    max_model_len: int = 2048,
    max_num_seqs: int = 256,
    max_num_batched_tokens: int = 2048,
    kv_cache_dtype: str = "auto",
    gpu_memory_utilization: float = 0.9,
    cpu_offload_gb: float = 0.0,
    is_quantized: bool = None  # indicate if num_params is already quantized
) -> float:
    """
    Estimate GPU memory (in GB) needed to serve a model with vLLM.

    Two estimation paths:
    1. Full architecture info available -> detailed formula including KV cache
       and activation peak memory.
    2. Only num_params/dtype available -> rough weights + activation estimate.

    Args:
        num_params: Number of parameters. For quantized models from the HF API
            this already reflects the quantized storage format.
        dtype: Model weight dtype string (any case; normalized internally).
        num_hidden_layers, hidden_size, intermediate_size,
        num_key_value_heads, head_dim: Architecture details; if any is None
            the rough path is used.
        max_model_len, max_num_seqs, max_num_batched_tokens: vLLM scheduling
            limits that bound the KV cache size.
        kv_cache_dtype: KV cache dtype; "auto" follows the model dtype
            (FP16 for quantized models).
        gpu_memory_utilization: Fraction of GPU memory vLLM may use.
        cpu_offload_gb: GB of weights offloaded to CPU, subtracted at the end.
        is_quantized: If True, num_params represents quantized size.
            If None, auto-detected from dtype.

    Returns:
        Estimated required GPU memory in GB.
    """
    constant_margin = 1.5  # GB safety margin (CUDA context, fragmentation)
    dtype_factor = get_dtype_factor(dtype, default=2)

    # Auto-detect quantization from the weight dtype when not specified.
    if is_quantized is None:
        quantized_dtypes = {"fp8", "fp8_e4m3", "fp8_e5m2",
                            "f8_e4m3", "f8_e5m2", "int8", "int4"}
        is_quantized = normalize_dtype(dtype) in quantized_dtypes

    # Model weights in GB. The HF API reports quantized models' parameter
    # counts in the quantized dtype already, so no dtype_factor multiply then.
    if is_quantized:
        model_weight = num_params / 1e9
    else:
        model_weight = (num_params * dtype_factor) / 1e9

    # Case 1: architecture details incomplete -> rough estimate.
    arch_fields = (num_hidden_layers, hidden_size, intermediate_size,
                   num_key_value_heads, head_dim)
    if any(field is None for field in arch_fields):
        # Activation memory is roughly 1-2x model weight for transformers
        # (typically FP16 regardless of weight dtype).
        activation_estimate = model_weight * 1.5
        return (model_weight + activation_estimate + constant_margin) / gpu_memory_utilization - cpu_offload_gb

    # Case 2: full info available -> detailed vLLM formula.
    # BUG FIX: normalize BEFORE the "auto" check. Previously "AUTO"/"Auto"
    # slipped past the exact-string comparison into the table lookup, which
    # returned the None sentinel stored under "auto" and crashed the
    # KV-cache arithmetic below with a TypeError.
    normalized_kv = normalize_dtype(kv_cache_dtype)
    if normalized_kv == "auto":
        # Quantized models typically keep an FP16/BF16 KV cache, not FP8.
        kv_cache_dtype_factor = 2 if is_quantized else dtype_factor
    else:
        kv_cache_dtype_factor = KV_CACHE_DTYPE_FACTOR.get(normalized_kv, 2)

    # KV cache per sequence: 2 (K and V) * kv_heads * head_dim * layers *
    # bytes-per-element * tokens.
    per_seq_kv_cache_memory = (2 * num_key_value_heads * head_dim * num_hidden_layers *
                               kv_cache_dtype_factor * max_model_len) / 1e9
    # Total KV cache is capped by the batched-token budget.
    total_kv_cache_memory = min(
        per_seq_kv_cache_memory * max_num_seqs,
        (2 * num_hidden_layers * hidden_size * kv_cache_dtype_factor * max_num_batched_tokens) / 1e9
    )

    # Activations are typically FP16/BF16 even for quantized models.
    activation_dtype_factor = 2
    activation_peak_memory = max_model_len * ((18 * hidden_size) + (4 * intermediate_size)) * activation_dtype_factor / 1e9

    return (model_weight + total_kv_cache_memory + activation_peak_memory + constant_margin) / gpu_memory_utilization - cpu_offload_gb
def calculate_gpu_requirements(estimated_memory_gb: float) -> Dict[str, Any]:
    """
    Compute the GPU count and rental costs per supported GPU type.

    Args:
        estimated_memory_gb: Estimated GPU memory requirement in GB.

    Returns:
        Dict with "gpu_requirements" (GPU type -> count) and
        "cost_estimates" (GPU type -> hourly/daily/weekly EUR costs).
    """
    gpu_requirements: Dict[str, int] = {}
    cost_estimates: Dict[str, Dict[str, float]] = {}

    for gpu_type, total_vram in GPU_SPECS.items():
        # Reserve ~10% of VRAM for multi-GPU communication and fragmentation.
        effective_vram = total_vram * 0.9
        gpu_count = math.ceil(estimated_memory_gb / effective_vram)

        hourly = gpu_count * GPU_PRICING[gpu_type]
        gpu_requirements[gpu_type] = gpu_count
        cost_estimates[gpu_type] = {
            "hourly": hourly,
            "daily": hourly * 24,
            "weekly": hourly * 24 * 7,
        }

    return {
        "gpu_requirements": gpu_requirements,
        "cost_estimates": cost_estimates,
    }
async def capacity_estimation_node(state: Dict[str, Any]) -> Dict[str, Any]:
    """
    Estimate GPU memory for a model deployment using vLLM-based computation.
    Handles both initial estimation and re-estimation with custom inference config.

    Args:
        state: Workflow state dict. Reads "model_name", "model_info",
            "needs_re_estimation" and, on re-estimation,
            "custom_inference_config" (GPU_type / location).

    Returns:
        The same state dict, updated with estimation results, status flags,
        and a Markdown "response" message.
    """
    # Check if this is a re-estimation
    is_re_estimation = state.get("needs_re_estimation", False)
    if is_re_estimation:
        logger.info("πŸ”„ Starting capacity re-estimation with custom inference configuration")
        # Reset the re-estimation flag
        state["needs_re_estimation"] = False
        state["capacity_approved"] = False
    else:
        logger.info("⚑ Starting capacity estimation node")

    try:
        model_name = state.get("model_name")
        model_info = state.get("model_info")
        if not model_name or not model_info:
            logger.error("❌ Missing model information")
            state["capacity_estimation_status"] = "error"
            state["error"] = "Model information required for capacity estimation"
            return state

        # Extract safetensors info
        dtype = model_info.get("dtype", "auto")
        num_params = model_info.get("num_params", None)

        # Extract required parameters for GPU memory estimation
        params = {
            "num_params": num_params,
            "dtype": dtype,
            "num_hidden_layers": model_info.get("num_hidden_layers"),
            "hidden_size": model_info.get("hidden_size"),
            "intermediate_size": model_info.get("intermediate_size"),
            "num_key_value_heads": model_info.get("num_key_value_heads"),
            "head_dim": model_info.get("head_dim"),
            "max_model_len": model_info.get("max_model_len", 2048),
            "max_num_seqs": model_info.get("max_num_seqs", 256),
            "max_num_batched_tokens": model_info.get("max_num_batched_tokens", 2048),
            "kv_cache_dtype": model_info.get("kv_cache_dtype", "auto"),
            "gpu_memory_utilization": model_info.get("gpu_memory_utilization", 0.9),
            "cpu_offload_gb": model_info.get("cpu_offload_gb", 0.0)
        }

        estimated_gpu_memory = estimate_vllm_gpu_memory(**params)

        # Calculate GPU requirements and costs
        gpu_data = calculate_gpu_requirements(estimated_gpu_memory)
        gpu_requirements = gpu_data["gpu_requirements"]
        cost_estimates = gpu_data["cost_estimates"]

        # Store in state
        state["estimated_gpu_memory"] = estimated_gpu_memory
        state["gpu_requirements"] = gpu_requirements
        state["cost_estimates"] = cost_estimates
        state["capacity_estimation_status"] = "success"

        # BUG FIX: the old code set model_size_b to the string "Unknown" when
        # num_params was missing, then formatted it with ":.2f" in the
        # response f-string, raising ValueError. Pre-render the display value.
        model_size_display = f"{num_params / 1e9:.2f}B" if num_params else "Unknown"

        # Model architecture details (only the fields that are present)
        architecture_info = []
        if model_info.get("num_hidden_layers"):
            architecture_info.append(f"**Layers:** {model_info['num_hidden_layers']}")
        if model_info.get("hidden_size"):
            architecture_info.append(f"**Hidden Size:** {model_info['hidden_size']}")
        if model_info.get("num_attention_heads"):
            architecture_info.append(f"**Attention Heads:** {model_info['num_attention_heads']}")
        if model_info.get("num_key_value_heads"):
            architecture_info.append(f"**KV Heads:** {model_info['num_key_value_heads']}")
        if model_info.get("intermediate_size"):
            architecture_info.append(f"**Intermediate Size:** {model_info['intermediate_size']}")
        if model_info.get("max_position_embeddings"):
            architecture_info.append(f"**Max Position Embeddings:** {model_info['max_position_embeddings']}")
        architecture_section = "\n ".join(architecture_info) if architecture_info else "Limited architecture information available"

        # Inference configuration
        inference_config = f"""**Max Model Length:** {params['max_model_len']}
**Max Sequences:** {params['max_num_seqs']}
**Max Batched Tokens:** {params['max_num_batched_tokens']}
**KV Cache dtype:** {params['kv_cache_dtype']}
**GPU Memory Utilization:** {params['gpu_memory_utilization']*100:.0f}%"""

        # GPU requirements and cost section
        gpu_req_lines = []
        cost_lines = []
        # Highlight RTX 4090 and 5090
        for gpu_type in ["RTX 4090", "RTX 5090"]:
            if gpu_type in gpu_requirements:
                num_gpus = gpu_requirements[gpu_type]
                gpu_memory = GPU_SPECS[gpu_type]
                costs = cost_estimates[gpu_type]
                gpu_req_lines.append(f"**{gpu_type}** ({gpu_memory}GB): **{num_gpus} GPU{'s' if num_gpus > 1 else ''}**")
                cost_lines.append(f"**{gpu_type}:** €{costs['hourly']:.2f}/hour | €{costs['daily']:.2f}/day | €{costs['weekly']:.2f}/week")
        gpu_requirements_section = "\n ".join(gpu_req_lines)
        cost_section = "\n ".join(cost_lines)

        # Build final response
        estimation_title = "**Capacity Re-Estimation Complete**" if is_re_estimation else "**Capacity Estimation Complete**"
        custom_note = "*Note: Re-estimated with custom inference configuration. " if is_re_estimation else "*Note: "
        # On re-estimation the user-selected values win; otherwise fall back
        # to model_info defaults.
        GPU_type = state['custom_inference_config']['GPU_type'] if is_re_estimation else model_info.get('GPU_type', 'RTX 4090')
        location = state['custom_inference_config']['location'] if is_re_estimation else model_info.get('location', 'UAE-1')

        state["response"] = f"""
{estimation_title}
**Model Information:**
**Name:** {model_name}
**Parameters:** {model_size_display}
**Data Type:** {dtype}
**Architecture Details:**
{architecture_section}
**Inference Configuration:**
{inference_config}
**Estimated GPU Memory Required:** {estimated_gpu_memory:.2f} GB
**GPU Requirements:**
{gpu_requirements_section}
**Cost Estimates:**
{cost_section}
**Selected GPU Type:** {GPU_type}
**Deployment Location:** {location}
{custom_note}This estimation includes model weights, KV cache, activation peak, and a safety margin. Multi-GPU setups account for ~10% overhead for communication.*"""

        logger.info(f"βœ… Estimated GPU memory: {estimated_gpu_memory:.2f} GB")
        logger.info(f"πŸ“Š GPU Requirements: RTX 4090: {gpu_requirements.get('RTX 4090', 'N/A')}, RTX 5090: {gpu_requirements.get('RTX 5090', 'N/A')}")

        # Prepare state for human approval - set pending capacity approval
        state["pending_capacity_approval"] = True
        state["needs_re_estimation"] = False  # Reset flag after processing
        state["current_step"] = "capacity_estimation_complete"

    except Exception as e:
        logger.error(f"❌ Error in capacity estimation: {str(e)}")
        state["capacity_estimation_status"] = "error"
        state["error"] = str(e)
        state["response"] = f"""❌ **Capacity Estimation Failed**
**Model:** {state.get('model_name', 'Unknown')}
**Error:** {str(e)}
Please check if:
1. The model exists on HuggingFace
2. You have access to the model (if it's gated)
3. Your HuggingFace token is valid"""

    return state