"""
Capacity Estimation Node

This node estimates the GPU memory required to deploy a model and translates
that estimate into GPU counts and rental costs for each supported GPU type.

Key Features:
- GPU memory estimation (model weights, KV cache, activations, safety margin)
- GPU count and cost calculation per supported GPU type
- State management for workflow

Author: ComputeAgent Team
License: Private
"""
import logging
import math
from typing import Dict, Any
# Module-level logger; all records from this file are emitted under the fixed
# logger name "CapacityEstimation".
logger = logging.getLogger("CapacityEstimation")
# Bytes occupied by one model parameter, keyed by weight dtype name.
# Aliases that share a width are grouped together.
DTYPE_FACTOR = {
    # 16-bit PyTorch dtypes ("auto" is sized as 16-bit)
    **dict.fromkeys(("auto", "half", "float16", "fp16", "bfloat16", "bf16"), 2),
    # 32-bit PyTorch dtypes
    **dict.fromkeys(("float", "float32", "fp32"), 4),
    # 8-bit quantized dtypes; the "f8_*" spellings follow the HuggingFace naming convention
    **dict.fromkeys(("fp8", "fp8_e4m3", "fp8_e5m2", "f8_e4m3", "f8_e5m2", "int8"), 1),
    # 4-bit quantized dtype: half a byte per parameter
    "int4": 0.5,
}
# Bytes per KV-cache element, keyed by KV-cache dtype name.
KV_CACHE_DTYPE_FACTOR = {
    # Sentinel: "auto" has no width of its own; the estimator substitutes one
    # derived from the model dtype.
    "auto": None,
    # 32-bit
    **dict.fromkeys(("float32", "fp32"), 4),
    # 16-bit
    **dict.fromkeys(("float16", "fp16", "bfloat16", "bf16"), 2),
    # 8-bit; the "f8_*" spellings follow the HuggingFace naming convention
    **dict.fromkeys(("fp8", "fp8_e5m2", "fp8_e4m3", "f8_e4m3", "f8_e5m2", "int8"), 1),
}
# Memory capacity of each supported GPU model, in GB.
GPU_SPECS = {"RTX 4090": 24, "RTX 5090": 32}

# Rental price of each supported GPU model, in EUR per hour.
# Keys are expected to mirror GPU_SPECS.
GPU_PRICING = {"RTX 4090": 0.2, "RTX 5090": 0.4}
def normalize_dtype(dtype: str) -> str:
    """
    Normalize a dtype name to a canonical form for consistent table lookup.

    The canonical form is lowercase, has no surrounding whitespace, no
    ``torch.`` qualifier, and uses underscores instead of hyphens, e.g.
    ``"F8_E4M3" -> "f8_e4m3"``, ``" BF16 " -> "bf16"``,
    ``"torch.float16" -> "float16"``, ``"FP8-E4M3" -> "fp8_e4m3"``.

    Args:
        dtype: Raw dtype name. Non-string values are converted with ``str()``
            before normalization.

    Returns:
        The normalized dtype name, or ``"auto"`` when ``dtype`` is falsy
        (``None``, ``""``) or contains only whitespace.
    """
    if not dtype:
        return "auto"

    # str() keeps dtype objects passed by mistake from raising AttributeError.
    normalized = str(dtype).strip().lower()

    # Framework-qualified spelling ("torch.float16") -> bare name ("float16").
    torch_prefix = "torch."
    if normalized.startswith(torch_prefix):
        normalized = normalized[len(torch_prefix):]

    # The lookup tables are keyed with underscores (e.g. "fp8_e4m3").
    normalized = normalized.replace("-", "_")

    return normalized or "auto"
def get_dtype_factor(dtype: str, default: int = 2) -> float:
    """
    Look up how many bytes one parameter occupies for the given dtype.

    The dtype name is normalized with ``normalize_dtype`` before it is looked
    up in ``DTYPE_FACTOR``.

    Args:
        dtype: Data type name.
        default: Value returned when the normalized name is not a key of
            ``DTYPE_FACTOR``.

    Returns:
        Bytes per parameter for ``dtype``, or ``default`` for unknown names.
    """
    canonical = normalize_dtype(dtype)
    if canonical in DTYPE_FACTOR:
        return DTYPE_FACTOR[canonical]
    return default
def estimate_vllm_gpu_memory(
    num_params: int,
    dtype: str = "auto",
    num_hidden_layers: int = None,
    hidden_size: int = None,
    intermediate_size: int = None,
    num_key_value_heads: int = None,
    head_dim: int = None,
    max_model_len: int = 2048,
    max_num_seqs: int = 256,
    max_num_batched_tokens: int = 2048,
    kv_cache_dtype: str = "auto",
    gpu_memory_utilization: float = 0.9,
    cpu_offload_gb: float = 0.0,
    is_quantized: bool = None,
) -> float:
    """
    Estimate the GPU memory (in GB) needed to serve a model.

    Two estimation modes are supported:

    1. Detailed - used when ``num_hidden_layers``, ``hidden_size``,
       ``intermediate_size``, ``num_key_value_heads`` and ``head_dim`` are all
       given: weights + KV cache + peak activations + a fixed margin.
    2. Rough - used when any of those is ``None``: weights + activations
       approximated as 1.5x the weights + a fixed margin.

    In both modes the sum is divided by ``gpu_memory_utilization``, then
    ``cpu_offload_gb`` is subtracted, and the result is floored at 0.

    Args:
        num_params: Number of parameters. For quantized models this is taken
            as the already-quantized size (see ``is_quantized``).
        dtype: Weight dtype name; unknown names are sized at 2 bytes/parameter.
        num_hidden_layers: Number of transformer layers.
        hidden_size: Model hidden dimension.
        intermediate_size: Feed-forward intermediate dimension.
        num_key_value_heads: Number of key/value heads.
        head_dim: Dimension of each attention head.
        max_model_len: Maximum sequence length used for KV cache and
            activation sizing.
        max_num_seqs: Maximum number of concurrent sequences.
        max_num_batched_tokens: Maximum number of tokens per batch.
        kv_cache_dtype: KV-cache dtype name. ``"auto"`` (in any case), ``None``
            or ``""`` derive the width from the model: 2 bytes for quantized
            models, otherwise the weight dtype width. Unknown names use 2.
        gpu_memory_utilization: Fraction of GPU memory that may be used; the
            requirement is divided by this value.
        cpu_offload_gb: Amount (GB) offloaded to CPU, subtracted from the total.
        is_quantized: If True, ``num_params`` is used directly as the weight
            size. If None, it is auto-detected from ``dtype``.

    Returns:
        Estimated GPU memory in GB (never negative).

    Raises:
        TypeError: If ``num_params`` is None.
    """
    if num_params is None:
        # Fail with a readable message instead of an opaque operand error.
        raise TypeError("num_params is required to estimate GPU memory")

    constant_margin = 1.5  # fixed safety margin, in GB
    dtype_factor = get_dtype_factor(dtype, default=2)

    # Auto-detect quantization from the weight dtype when not stated.
    if is_quantized is None:
        quantized_dtypes = {
            "fp8", "f8_e4m3", "f8_e5m2", "int8", "int4", "fp8_e4m3", "fp8_e5m2",
        }
        is_quantized = normalize_dtype(dtype) in quantized_dtypes

    # Weight memory is the same in both modes.
    if is_quantized:
        # num_params is treated as already expressed in the quantized format,
        # so the dtype factor is deliberately not applied again.
        # NOTE(review): this sizes every quantized dtype (including int4) at
        # 1 byte per parameter - confirm this matches the upstream counts.
        model_weight = num_params / 1e9
    else:
        model_weight = (num_params * dtype_factor) / 1e9

    architecture = (
        num_hidden_layers,
        hidden_size,
        intermediate_size,
        num_key_value_heads,
        head_dim,
    )

    # Mode 2 (rough): architecture details are incomplete.
    if any(value is None for value in architecture):
        activation_estimate = model_weight * 1.5
        estimated_gpu_memory = (
            (model_weight + activation_estimate + constant_margin)
            / gpu_memory_utilization
            - cpu_offload_gb
        )
        # A large cpu_offload_gb must not produce a negative requirement.
        return max(0.0, estimated_gpu_memory)

    # Mode 1 (detailed). Normalize BEFORE testing for "auto": the table maps
    # "auto" to a None sentinel, which must never reach the arithmetic below.
    normalized_kv = normalize_dtype(kv_cache_dtype)
    kv_cache_dtype_factor = KV_CACHE_DTYPE_FACTOR.get(normalized_kv, 2)
    if normalized_kv == "auto" or kv_cache_dtype_factor is None:
        # Quantized models are sized with a 16-bit KV cache.
        kv_cache_dtype_factor = 2 if is_quantized else dtype_factor

    # Leading 2 = one K and one V tensor per layer.
    per_seq_kv_cache_memory = (
        2 * num_key_value_heads * head_dim * num_hidden_layers
        * kv_cache_dtype_factor * max_model_len
    ) / 1e9

    # The KV cache is bounded by both the sequence budget and the token budget.
    total_kv_cache_memory = min(
        per_seq_kv_cache_memory * max_num_seqs,
        (2 * num_hidden_layers * hidden_size * kv_cache_dtype_factor
         * max_num_batched_tokens) / 1e9,
    )

    # Activations are sized at 2 bytes per element regardless of weight dtype.
    activation_dtype_factor = 2
    activation_peak_memory = (
        max_model_len
        * ((18 * hidden_size) + (4 * intermediate_size))
        * activation_dtype_factor
        / 1e9
    )

    required_gpu_memory = (
        (model_weight + total_kv_cache_memory + activation_peak_memory + constant_margin)
        / gpu_memory_utilization
        - cpu_offload_gb
    )
    return max(0.0, required_gpu_memory)
def calculate_gpu_requirements(estimated_memory_gb: float) -> Dict[str, Any]:
    """
    Calculate how many GPUs are needed, and what they cost, per GPU type.

    For every entry in ``GPU_SPECS`` the usable memory is taken as 90% of the
    card's capacity, and the GPU count is the estimate divided by that usable
    memory, rounded up. At least one GPU is always reported, so a zero or
    negative estimate cannot yield zero or negative GPU counts and costs.

    Args:
        estimated_memory_gb: Estimated GPU memory requirement in GB.

    Returns:
        Dictionary with two keys:
        - ``"gpu_requirements"``: GPU type -> number of GPUs.
        - ``"cost_estimates"``: GPU type -> dict with ``"hourly"``,
          ``"daily"`` and ``"weekly"`` costs (EUR).

    Raises:
        KeyError: If a GPU type in ``GPU_SPECS`` has no ``GPU_PRICING`` entry.
    """
    gpu_requirements = {}
    cost_estimates = {}

    for gpu_type, gpu_memory in GPU_SPECS.items():
        # Reserve ~10% of each card for communication and fragmentation
        # overhead; applied regardless of the resulting GPU count.
        usable_memory = gpu_memory * 0.9

        # A deployment always occupies at least one GPU.
        num_gpus = max(1, math.ceil(estimated_memory_gb / usable_memory))

        hourly_cost = num_gpus * GPU_PRICING[gpu_type]

        gpu_requirements[gpu_type] = num_gpus
        cost_estimates[gpu_type] = {
            "hourly": hourly_cost,
            "daily": hourly_cost * 24,
            "weekly": hourly_cost * 24 * 7,
        }

    return {
        "gpu_requirements": gpu_requirements,
        "cost_estimates": cost_estimates,
    }
async def capacity_estimation_node(state: Dict[str, Any]) -> Dict[str, Any]:
    """
    Estimate GPU memory, GPU counts and costs for a model deployment.

    Handles both the initial estimation and a re-estimation (when
    ``state["needs_re_estimation"]`` is truthy). The state dict is mutated in
    place and also returned.

    State keys read:
        ``needs_re_estimation``, ``model_name``, ``model_info``,
        ``custom_inference_config`` (re-estimation only).

    State keys written:
        On success: ``estimated_gpu_memory``, ``gpu_requirements``,
        ``cost_estimates``, ``capacity_estimation_status`` ("success"),
        ``response``, ``pending_capacity_approval``, ``needs_re_estimation``,
        ``current_step``. On re-estimation ``capacity_approved`` is reset too.
        On failure: ``capacity_estimation_status`` ("error"), ``error`` and,
        for exceptions, ``response``.

    Args:
        state: Workflow state dictionary.

    Returns:
        The same ``state`` dictionary, updated.
    """
    is_re_estimation = state.get("needs_re_estimation", False)
    if is_re_estimation:
        logger.info("🔄 Starting capacity re-estimation with custom inference configuration")
        # Consume the flag and invalidate any earlier approval.
        state["needs_re_estimation"] = False
        state["capacity_approved"] = False
    else:
        logger.info("⚡ Starting capacity estimation node")

    try:
        model_name = state.get("model_name")
        model_info = state.get("model_info")
        if not model_name or not model_info:
            logger.error("❌ Missing model information")
            state["capacity_estimation_status"] = "error"
            state["error"] = "Model information required for capacity estimation"
            return state

        dtype = model_info.get("dtype", "auto")
        num_params = model_info.get("num_params", None)
        if num_params is None:
            # Report the real cause instead of an operand TypeError raised
            # deep inside the estimator.
            raise ValueError("Model parameter count (num_params) is missing from model_info")

        # NOTE(review): inference settings are read from model_info even on
        # re-estimation; presumably an upstream step merges the custom
        # inference config into model_info - confirm against the caller.
        params = {
            "num_params": num_params,
            "dtype": dtype,
            "num_hidden_layers": model_info.get("num_hidden_layers"),
            "hidden_size": model_info.get("hidden_size"),
            "intermediate_size": model_info.get("intermediate_size"),
            "num_key_value_heads": model_info.get("num_key_value_heads"),
            "head_dim": model_info.get("head_dim"),
            "max_model_len": model_info.get("max_model_len", 2048),
            "max_num_seqs": model_info.get("max_num_seqs", 256),
            "max_num_batched_tokens": model_info.get("max_num_batched_tokens", 2048),
            "kv_cache_dtype": model_info.get("kv_cache_dtype", "auto"),
            "gpu_memory_utilization": model_info.get("gpu_memory_utilization", 0.9),
            "cpu_offload_gb": model_info.get("cpu_offload_gb", 0.0),
        }
        estimated_gpu_memory = estimate_vllm_gpu_memory(**params)

        gpu_data = calculate_gpu_requirements(estimated_gpu_memory)
        gpu_requirements = gpu_data["gpu_requirements"]
        cost_estimates = gpu_data["cost_estimates"]

        state["estimated_gpu_memory"] = estimated_gpu_memory
        state["gpu_requirements"] = gpu_requirements
        state["cost_estimates"] = cost_estimates
        state["capacity_estimation_status"] = "success"

        # Format the size up front: applying ":.2f" to the "Unknown"
        # placeholder string would raise ValueError.
        parameters_display = f"{num_params / 1e9:.2f}B" if num_params else "Unknown"

        # Architecture details: list only the fields that are present.
        architecture_fields = [
            ("Layers", "num_hidden_layers"),
            ("Hidden Size", "hidden_size"),
            ("Attention Heads", "num_attention_heads"),
            ("KV Heads", "num_key_value_heads"),
            ("Intermediate Size", "intermediate_size"),
            ("Max Position Embeddings", "max_position_embeddings"),
        ]
        architecture_info = [
            f"**{label}:** {model_info[key]}"
            for label, key in architecture_fields
            if model_info.get(key)
        ]
        architecture_section = (
            "\n ".join(architecture_info)
            if architecture_info
            else "Limited architecture information available"
        )

        inference_config = "\n".join([
            f"**Max Model Length:** {params['max_model_len']}",
            f"**Max Sequences:** {params['max_num_seqs']}",
            f"**Max Batched Tokens:** {params['max_num_batched_tokens']}",
            f"**KV Cache dtype:** {params['kv_cache_dtype']}",
            f"**GPU Memory Utilization:** {params['gpu_memory_utilization']*100:.0f}%",
        ])

        # GPU requirement and cost lines for the highlighted GPU types.
        gpu_req_lines = []
        cost_lines = []
        for gpu_type in ["RTX 4090", "RTX 5090"]:
            if gpu_type in gpu_requirements:
                num_gpus = gpu_requirements[gpu_type]
                gpu_memory = GPU_SPECS[gpu_type]
                costs = cost_estimates[gpu_type]
                gpu_req_lines.append(
                    f"**{gpu_type}** ({gpu_memory}GB): **{num_gpus} GPU{'s' if num_gpus > 1 else ''}**"
                )
                cost_lines.append(
                    f"**{gpu_type}:** €{costs['hourly']:.2f}/hour | €{costs['daily']:.2f}/day | €{costs['weekly']:.2f}/week"
                )
        gpu_requirements_section = "\n ".join(gpu_req_lines)
        cost_section = "\n ".join(cost_lines)

        estimation_title = (
            "**Capacity Re-Estimation Complete**"
            if is_re_estimation
            else "**Capacity Estimation Complete**"
        )
        custom_note = (
            "*Note: Re-estimated with custom inference configuration. "
            if is_re_estimation
            else "*Note: "
        )

        # On re-estimation prefer the custom config, but fall back to the
        # model_info values/defaults rather than failing with KeyError when
        # the config or one of its keys is absent.
        default_gpu_type = model_info.get("GPU_type", "RTX 4090")
        default_location = model_info.get("location", "UAE-1")
        if is_re_estimation:
            custom_config = state.get("custom_inference_config") or {}
            selected_gpu_type = custom_config.get("GPU_type", default_gpu_type)
            location = custom_config.get("location", default_location)
        else:
            selected_gpu_type = default_gpu_type
            location = default_location

        # The leading empty item keeps the response starting with a newline.
        state["response"] = "\n".join([
            "",
            f"{estimation_title}",
            "**Model Information:**",
            f"**Name:** {model_name}",
            f"**Parameters:** {parameters_display}",
            f"**Data Type:** {dtype}",
            "**Architecture Details:**",
            f"{architecture_section}",
            "**Inference Configuration:**",
            f"{inference_config}",
            f"**Estimated GPU Memory Required:** {estimated_gpu_memory:.2f} GB",
            "**GPU Requirements:**",
            f"{gpu_requirements_section}",
            "**Cost Estimates:**",
            f"{cost_section}",
            f"**Selected GPU Type:** {selected_gpu_type}",
            f"**Deployment Location:** {location}",
            f"{custom_note}This estimation includes model weights, KV cache, activation peak, and a safety margin. Multi-GPU setups account for ~10% overhead for communication.*",
        ])

        logger.info("✅ Estimated GPU memory: %.2f GB", estimated_gpu_memory)
        logger.info(
            "📊 GPU Requirements: RTX 4090: %s, RTX 5090: %s",
            gpu_requirements.get("RTX 4090", "N/A"),
            gpu_requirements.get("RTX 5090", "N/A"),
        )

        # Hand over to the human-approval step.
        state["pending_capacity_approval"] = True
        state["needs_re_estimation"] = False  # Reset flag after processing
        state["current_step"] = "capacity_estimation_complete"

    except Exception as e:
        # Top-level boundary of the node: record the failure in the state
        # instead of propagating; logger.exception keeps the traceback.
        logger.exception("❌ Error in capacity estimation: %s", e)
        state["capacity_estimation_status"] = "error"
        state["error"] = str(e)
        state["response"] = "\n".join([
            "❌ **Capacity Estimation Failed**",
            f"**Model:** {state.get('model_name', 'Unknown')}",
            f"**Error:** {str(e)}",
            "Please check if:",
            "1. The model exists on HuggingFace",
            "2. You have access to the model (if it's gated)",
            "3. Your HuggingFace token is valid",
        ])

    return state