|
|
""" |
|
|
Capacity Estimation Node |
|
|
|
|
|
This node handles the estimation of compute capacity requirements for model deployment. |
|
|
Implements vLLM-style GPU memory estimation, GPU-count sizing, and rental cost estimates.
|
|
|
|
|
Key Features:

- vLLM-based GPU memory estimation (model weights, KV cache, activation peak)

- GPU count and rental cost calculation per supported GPU type

- State management for workflow
|
|
|
|
|
Author: ComputeAgent Team |
|
|
License: Private |
|
|
""" |
|
|
|
|
|
import logging
import math
from typing import Any, Dict, Optional
|
|
|
|
|
logger = logging.getLogger("CapacityEstimation")


# Bytes per parameter for model weights, keyed by normalized dtype string
# (see normalize_dtype). Used to convert a parameter count into a byte count.
DTYPE_FACTOR = {
    # "auto" is treated as a 16-bit dtype (2 bytes/param) — presumably because
    # most HF models default to fp16/bf16; TODO confirm against model sources.
    "auto": 2,
    # 16-bit floating-point formats: 2 bytes per parameter.
    "half": 2,
    "float16": 2,
    "fp16": 2,
    "bfloat16": 2,
    "bf16": 2,
    # 32-bit floating-point formats: 4 bytes per parameter.
    "float": 4,
    "float32": 4,
    "fp32": 4,
    # 8-bit quantized formats: 1 byte per parameter.
    "fp8": 1,
    "fp8_e4m3": 1,
    "fp8_e5m2": 1,
    "f8_e4m3": 1,
    "f8_e5m2": 1,
    "int8": 1,
    # 4-bit quantization: half a byte per parameter.
    "int4": 0.5,
}


# Bytes per KV-cache element, keyed by normalized kv_cache_dtype string.
# "auto" maps to None as a sentinel meaning "follow the model's weight dtype";
# callers must resolve the None before doing arithmetic with the factor.
KV_CACHE_DTYPE_FACTOR = {
    "auto": None,
    "float32": 4,
    "fp32": 4,
    "float16": 2,
    "fp16": 2,
    "bfloat16": 2,
    "bf16": 2,
    "fp8": 1,
    "fp8_e5m2": 1,
    "fp8_e4m3": 1,
    "f8_e4m3": 1,
    "f8_e5m2": 1,
    "int8": 1,
}


# Total VRAM per supported GPU model, in GB.
GPU_SPECS = {
    "RTX 4090": 24,
    "RTX 5090": 32,
}


# Rental price per GPU per hour (rendered as € in the user-facing response).
GPU_PRICING = {
    "RTX 4090": 0.2,
    "RTX 5090": 0.4,
}
|
|
|
|
|
def normalize_dtype(dtype: str) -> str:
    """
    Canonicalize a dtype string for consistent table lookups.

    Args:
        dtype: Raw dtype string (e.g., "F8_E4M3", "BF16", "float16")

    Returns:
        The lowercased dtype string; falsy input (None, "") maps to "auto".
    """
    # Lowercasing is the only normalization needed: the lookup tables key on
    # lowercase names with underscores, which HF dtype strings already use.
    return dtype.lower() if dtype else "auto"
|
|
|
|
|
|
|
|
def get_dtype_factor(dtype: str, default: int = 2) -> float:
    """
    Look up the bytes-per-parameter factor for a given dtype.

    Args:
        dtype: Data type string
        default: Fallback factor when the dtype is not in DTYPE_FACTOR

    Returns:
        Factor (bytes per parameter)
    """
    key = normalize_dtype(dtype)
    if key in DTYPE_FACTOR:
        return DTYPE_FACTOR[key]
    return default
|
|
|
|
|
def estimate_vllm_gpu_memory(
    num_params: int,
    dtype: str = "auto",
    num_hidden_layers: Optional[int] = None,
    hidden_size: Optional[int] = None,
    intermediate_size: Optional[int] = None,
    num_key_value_heads: Optional[int] = None,
    head_dim: Optional[int] = None,
    max_model_len: int = 2048,
    max_num_seqs: int = 256,
    max_num_batched_tokens: int = 2048,
    kv_cache_dtype: str = "auto",
    gpu_memory_utilization: float = 0.9,
    cpu_offload_gb: float = 0.0,
    is_quantized: Optional[bool] = None
) -> float:
    """
    Estimate GPU memory (in GB) required to serve a model with vLLM.

    Two estimation paths:
    1. Full architecture info -> detailed estimate (weights + KV cache +
       activation peak + fixed safety margin).
    2. Only num_params and dtype -> rough estimate (weights + 1.5x activation
       heuristic + fixed safety margin).

    Args:
        num_params: Number of parameters. For quantized models from the HF API,
            this is already in the quantized format and is used directly as
            a byte count.
        dtype: Model weight dtype string (see DTYPE_FACTOR keys).
        num_hidden_layers: Transformer layer count; None triggers rough path.
        hidden_size: Model hidden dimension; None triggers rough path.
        intermediate_size: MLP intermediate dimension; None triggers rough path.
        num_key_value_heads: KV heads (GQA-aware); None triggers rough path.
        head_dim: Per-head dimension; None triggers rough path.
        max_model_len: Maximum sequence length per request.
        max_num_seqs: Maximum number of concurrent sequences.
        max_num_batched_tokens: Token budget per scheduling step.
        kv_cache_dtype: KV-cache dtype; "auto" follows the model dtype.
        gpu_memory_utilization: Fraction of GPU memory vLLM may use.
        cpu_offload_gb: GB of weights offloaded to CPU, subtracted at the end.
        is_quantized: If True, num_params represents quantized size.
            If None, auto-detect from dtype.

    Returns:
        Estimated required GPU memory in GB.
    """
    # Fixed safety margin (GB) for CUDA context / framework overhead.
    constant_margin = 1.5

    dtype_factor = get_dtype_factor(dtype, default=2)

    # Auto-detect quantization from the dtype name when not specified.
    if is_quantized is None:
        quantized_dtypes = ["fp8", "f8_e4m3", "f8_e5m2", "int8", "int4", "fp8_e4m3", "fp8_e5m2"]
        is_quantized = normalize_dtype(dtype) in quantized_dtypes

    # --- Rough path: architecture details are incomplete -------------------
    if None in [num_hidden_layers, hidden_size, intermediate_size, num_key_value_heads, head_dim]:
        if is_quantized:
            # num_params already counts quantized storage bytes.
            model_weight = num_params / 1e9
        else:
            model_weight = (num_params * dtype_factor) / 1e9

        # Heuristic: activations/overheads ~1.5x the weight footprint.
        activation_estimate = model_weight * 1.5

        estimated_gpu_memory = (model_weight + activation_estimate + constant_margin) / gpu_memory_utilization - cpu_offload_gb
        return estimated_gpu_memory

    # --- Detailed path ------------------------------------------------------
    if is_quantized:
        model_weight = num_params / 1e9
    else:
        model_weight = (num_params * dtype_factor) / 1e9

    # BUGFIX: normalize BEFORE the "auto" check. Previously a raw comparison
    # (kv_cache_dtype == "auto") let inputs like "AUTO"/"Auto" fall through to
    # the table lookup, where the existing "auto": None sentinel was returned
    # by .get(..., 2) (the key exists, so the default is never used) and then
    # crashed the arithmetic with a TypeError.
    normalized_kv = normalize_dtype(kv_cache_dtype)
    kv_cache_dtype_factor = KV_CACHE_DTYPE_FACTOR.get(normalized_kv, 2)
    if kv_cache_dtype_factor is None:
        # "auto" follows the model dtype; quantized weights still keep a
        # 16-bit KV cache.
        kv_cache_dtype_factor = 2 if is_quantized else dtype_factor

    # KV cache per sequence: 2 (K and V) * kv_heads * head_dim * layers *
    # bytes-per-element * tokens.
    per_seq_kv_cache_memory = (2 * num_key_value_heads * head_dim * num_hidden_layers *
                               kv_cache_dtype_factor * max_model_len) / 1e9

    # Cap total KV memory by the batched-token budget so long contexts with
    # many sequences are not double counted.
    total_kv_cache_memory = min(
        per_seq_kv_cache_memory * max_num_seqs,
        (2 * num_hidden_layers * hidden_size * kv_cache_dtype_factor * max_num_batched_tokens) / 1e9
    )

    # Peak activation memory heuristic; activations assumed held in 16-bit.
    activation_dtype_factor = 2
    activation_peak_memory = max_model_len * ((18 * hidden_size) + (4 * intermediate_size)) * activation_dtype_factor / 1e9

    required_gpu_memory = (model_weight + total_kv_cache_memory + activation_peak_memory + constant_margin) / gpu_memory_utilization - cpu_offload_gb

    return required_gpu_memory
|
|
|
|
|
|
|
|
def calculate_gpu_requirements(estimated_memory_gb: float) -> Dict[str, Any]:
    """
    Calculate number of GPUs needed and rental costs for each known GPU type.

    Args:
        estimated_memory_gb: Estimated GPU memory requirement in GB

    Returns:
        Dictionary with two keys:
            "gpu_requirements": {gpu_type: num_gpus}
            "cost_estimates": {gpu_type: {"hourly", "daily", "weekly"}}
    """
    gpu_requirements: Dict[str, int] = {}
    cost_estimates: Dict[str, Dict[str, float]] = {}

    for gpu_type, gpu_memory in GPU_SPECS.items():
        # Reserve ~10% of VRAM for framework/communication overhead.
        usable_memory = gpu_memory * 0.9
        # ROBUSTNESS FIX: always require at least one GPU. A zero or negative
        # estimate (possible after heavy cpu_offload_gb subtraction upstream)
        # previously yielded 0 or negative GPUs and zero cost.
        num_gpus = max(1, math.ceil(estimated_memory_gb / usable_memory))

        hourly_cost = num_gpus * GPU_PRICING[gpu_type]
        daily_cost = hourly_cost * 24
        weekly_cost = daily_cost * 7

        gpu_requirements[gpu_type] = num_gpus
        cost_estimates[gpu_type] = {
            "hourly": hourly_cost,
            "daily": daily_cost,
            "weekly": weekly_cost,
        }

    return {
        "gpu_requirements": gpu_requirements,
        "cost_estimates": cost_estimates,
    }
|
|
|
|
|
|
|
|
async def capacity_estimation_node(state: Dict[str, Any]) -> Dict[str, Any]:
    """
    Estimate GPU memory for a model deployment using vLLM-based computation.
    Handles both initial estimation and re-estimation with custom inference config.

    Args:
        state: Workflow state dict. Reads "model_name", "model_info",
            "needs_re_estimation" and, on re-estimation,
            "custom_inference_config".

    Returns:
        The same state dict, mutated in place with estimation results
        ("estimated_gpu_memory", "gpu_requirements", "cost_estimates",
        "capacity_estimation_status", "response", "pending_capacity_approval",
        "current_step") or with error details on failure.

    NOTE(review): declared async but performs no awaits — presumably the
    workflow framework requires async node signatures; confirm.
    NOTE(review): log/response prefixes appear mojibake-encoded (likely
    emoji); preserved byte-for-byte here.
    """
    # Re-estimation is flagged by an earlier approval step; clear the flags
    # immediately so the workflow does not loop back into this node.
    is_re_estimation = state.get("needs_re_estimation", False)
    if is_re_estimation:
        logger.info("π Starting capacity re-estimation with custom inference configuration")
        state["needs_re_estimation"] = False
        state["capacity_approved"] = False
    else:
        logger.info("β‘ Starting capacity estimation node")

    try:
        model_name = state.get("model_name")
        model_info = state.get("model_info")

        # Both pieces of model metadata are required; fail fast otherwise.
        if not model_name or not model_info:
            logger.error("β Missing model information")
            state["capacity_estimation_status"] = "error"
            state["error"] = "Model information required for capacity estimation"
            return state

        dtype = model_info.get("dtype", "auto")
        num_params = model_info.get("num_params", None)

        # Collect everything estimate_vllm_gpu_memory accepts; architecture
        # fields missing from model_info stay None, which selects the
        # rough-estimation path inside the estimator.
        params = {
            "num_params": num_params,
            "dtype": dtype,
            "num_hidden_layers": model_info.get("num_hidden_layers"),
            "hidden_size": model_info.get("hidden_size"),
            "intermediate_size": model_info.get("intermediate_size"),
            "num_key_value_heads": model_info.get("num_key_value_heads"),
            "head_dim": model_info.get("head_dim"),
            "max_model_len": model_info.get("max_model_len", 2048),
            "max_num_seqs": model_info.get("max_num_seqs", 256),
            "max_num_batched_tokens": model_info.get("max_num_batched_tokens", 2048),
            "kv_cache_dtype": model_info.get("kv_cache_dtype", "auto"),
            "gpu_memory_utilization": model_info.get("gpu_memory_utilization", 0.9),
            "cpu_offload_gb": model_info.get("cpu_offload_gb", 0.0)
        }

        estimated_gpu_memory = estimate_vllm_gpu_memory(**params)

        # Translate the memory figure into per-GPU-type counts and costs.
        gpu_data = calculate_gpu_requirements(estimated_gpu_memory)
        gpu_requirements = gpu_data["gpu_requirements"]
        cost_estimates = gpu_data["cost_estimates"]

        # Persist results for downstream workflow nodes.
        state["estimated_gpu_memory"] = estimated_gpu_memory
        state["gpu_requirements"] = gpu_requirements
        state["cost_estimates"] = cost_estimates
        state["capacity_estimation_status"] = "success"

        # NOTE(review): if num_params is falsy this becomes the string
        # "Unknown", but the response f-string below formats it with ":.2f",
        # which raises ValueError (caught by the except and surfaced as an
        # estimation error rather than a partial report).
        model_size_b = num_params / 1e9 if num_params else "Unknown"

        # Build the architecture section only from fields actually present.
        architecture_info = []
        if model_info.get("num_hidden_layers"):
            architecture_info.append(f"**Layers:** {model_info['num_hidden_layers']}")
        if model_info.get("hidden_size"):
            architecture_info.append(f"**Hidden Size:** {model_info['hidden_size']}")
        if model_info.get("num_attention_heads"):
            architecture_info.append(f"**Attention Heads:** {model_info['num_attention_heads']}")
        if model_info.get("num_key_value_heads"):
            architecture_info.append(f"**KV Heads:** {model_info['num_key_value_heads']}")
        if model_info.get("intermediate_size"):
            architecture_info.append(f"**Intermediate Size:** {model_info['intermediate_size']}")
        if model_info.get("max_position_embeddings"):
            architecture_info.append(f"**Max Position Embeddings:** {model_info['max_position_embeddings']}")

        architecture_section = "\n ".join(architecture_info) if architecture_info else "Limited architecture information available"

        # Inference-config summary rendered into the response below.
        inference_config = f"""**Max Model Length:** {params['max_model_len']}
**Max Sequences:** {params['max_num_seqs']}
**Max Batched Tokens:** {params['max_num_batched_tokens']}
**KV Cache dtype:** {params['kv_cache_dtype']}
**GPU Memory Utilization:** {params['gpu_memory_utilization']*100:.0f}%"""

        gpu_req_lines = []
        cost_lines = []

        # Iterate in a fixed display order rather than dict order.
        for gpu_type in ["RTX 4090", "RTX 5090"]:
            if gpu_type in gpu_requirements:
                num_gpus = gpu_requirements[gpu_type]
                gpu_memory = GPU_SPECS[gpu_type]
                costs = cost_estimates[gpu_type]

                gpu_req_lines.append(f"**{gpu_type}** ({gpu_memory}GB): **{num_gpus} GPU{'s' if num_gpus > 1 else ''}**")
                cost_lines.append(f"**{gpu_type}:** β¬{costs['hourly']:.2f}/hour | β¬{costs['daily']:.2f}/day | β¬{costs['weekly']:.2f}/week")

        gpu_requirements_section = "\n ".join(gpu_req_lines)
        cost_section = "\n ".join(cost_lines)

        # Headline and footnote differ between first-pass and re-estimation.
        estimation_title = "**Capacity Re-Estimation Complete**" if is_re_estimation else "**Capacity Estimation Complete**"
        custom_note = "*Note: Re-estimated with custom inference configuration. " if is_re_estimation else "*Note: "

        # NOTE(review): assumes "custom_inference_config" (with "GPU_type" and
        # "location" keys) is present in state whenever is_re_estimation is
        # True; a missing key raises KeyError, caught by the except below.
        GPU_type = state['custom_inference_config']['GPU_type'] if is_re_estimation else model_info.get('GPU_type', 'RTX 4090')
        location = state['custom_inference_config']['location'] if is_re_estimation else model_info.get('location', 'UAE-1')

        # User-facing markdown report.
        state["response"] = f"""
{estimation_title}

**Model Information:**
**Name:** {model_name}
**Parameters:** {model_size_b:.2f}B
**Data Type:** {dtype}

**Architecture Details:**
{architecture_section}

**Inference Configuration:**
{inference_config}

**Estimated GPU Memory Required:** {estimated_gpu_memory:.2f} GB

**GPU Requirements:**
{gpu_requirements_section}

**Cost Estimates:**
{cost_section}

**Selected GPU Type:** {GPU_type}
**Deployment Location:** {location}

{custom_note}This estimation includes model weights, KV cache, activation peak, and a safety margin. Multi-GPU setups account for ~10% overhead for communication.*"""

        logger.info(f"β Estimated GPU memory: {estimated_gpu_memory:.2f} GB")
        logger.info(f"π GPU Requirements: RTX 4090: {gpu_requirements.get('RTX 4090', 'N/A')}, RTX 5090: {gpu_requirements.get('RTX 5090', 'N/A')}")

        # Hand off to the human-approval step of the workflow.
        state["pending_capacity_approval"] = True
        state["needs_re_estimation"] = False
        state["current_step"] = "capacity_estimation_complete"

    except Exception as e:
        # Any failure (bad metadata, formatting error, KeyError above) is
        # reported through the state dict rather than raised to the caller.
        logger.error(f"β Error in capacity estimation: {str(e)}")
        state["capacity_estimation_status"] = "error"
        state["error"] = str(e)
        state["response"] = f"""β **Capacity Estimation Failed**

**Model:** {state.get('model_name', 'Unknown')}
**Error:** {str(e)}

Please check if:
1. The model exists on HuggingFace
2. You have access to the model (if it's gated)
3. Your HuggingFace token is valid"""

    return state