import gradio as gr
import yaml
import math
import matplotlib.pyplot as plt
import plotly.graph_objects as go
import os
import json
from huggingface_hub import hf_hub_download, HfApi
# --- Configuration & Constants ---
# YAML files holding the GPU hardware database and the model preset table
# (read once at import time by load_hardware_data / load_models_data).
HARDWARE_FILE = "hardware_data.yaml"
MODELS_FILE = "models.yaml"
# Physics Constants: fraction of a GPU's peak spec achievable in practice.
# Applied as multipliers against peak TFLOPS / bandwidth in the latency model.
COMPUTE_EFFICIENCY = 0.45  # fraction of peak FP16 TFLOPS realized
MEMORY_EFFICIENCY = 0.70  # fraction of peak HBM bandwidth realized
INTERCONNECT_EFFICIENCY = 0.65  # fraction of peak NVLink bandwidth realized
# Defaults
ACTIVATION_MEMORY_BUFFER_GB = 0.5  # per-user activation scratch buffer, GB
DEFAULT_GPU_OVERHEAD_PCT = 20  # default memory overhead % (CUDA ctx, fragmentation)
# Embedding Models VRAM Est. (Weights + Runtime Buffer)
# Keys double as UI dropdown labels; values are VRAM estimates in GB.
EMBEDDING_MODELS = {
    "External/API (No Local VRAM)": 0.0,
    "Mini (All-MiniLM-L6) ~0.2GB": 0.2,
    "Standard (MPNet-Base/BGE-Base) ~0.6GB": 0.6,
    "Large (BGE-M3/GTE-Large) ~2.5GB": 2.5,
    "LLM-Based (E5-Mistral-7B) ~16GB": 16.0,
}
# Reranker Models VRAM Est. (Weights + Batch Processing Buffer)
# Keys double as UI dropdown labels; values are VRAM estimates in GB.
RERANKER_MODELS = {
    "None (Skip Reranking)": 0.0,
    "Small (BGE-Reranker-Base) ~0.5GB": 0.5,
    "Large (BGE-Reranker-Large) ~1.5GB": 1.5,
    "LLM-Based (BGE-Reranker-v2-Gemma) ~10GB": 10.0,
}
# --- Data Loading ---
def load_hardware_data():
    """Load the GPU hardware database from HARDWARE_FILE.

    Returns a dict mapping GPU name -> full spec dict, or {} when the file
    is missing, empty, or has no "gpus" section.
    """
    if not os.path.exists(HARDWARE_FILE):
        return {}
    with open(HARDWARE_FILE, "r") as f:
        # safe_load returns None for an empty document; guard like
        # load_models_data does so an empty/partial file yields {} instead
        # of raising TypeError/KeyError on data["gpus"].
        data = yaml.safe_load(f) or {}
    return {gpu["name"]: gpu for gpu in data.get("gpus", [])}
def load_models_data():
    """Return the "models" mapping from MODELS_FILE, or {} if unavailable."""
    if not os.path.exists(MODELS_FILE):
        return {}
    with open(MODELS_FILE, "r") as fh:
        contents = yaml.safe_load(fh)
    # An empty YAML document parses to None; treat it as "no models".
    if not contents:
        return {}
    return contents.get("models", {})
# Module-level caches, populated once at import time.
HARDWARE_DB = load_hardware_data()  # GPU name -> spec dict
MODELS_DB = load_models_data()  # repo id -> preset config dict
# --- Model Analysis ---
class ModelAnalyzer:
    """Resolve a model's architecture config and estimate its parameter counts.

    The config comes from the local MODELS_DB preset table when the repo id is
    listed there, otherwise from the Hugging Face Hub (config.json). When the
    Hub API reports an exact safetensors parameter total, that is preferred
    over the analytic estimate. On any failure ``self.error`` is set to a
    message and downstream attributes may be missing — callers must check
    ``error`` before using the instance.
    """

    def __init__(self, repo_id, hf_token=None):
        # repo_id: HF repository id (or a MODELS_DB key).
        # hf_token: optional token for gated repos; whitespace is stripped.
        self.repo_id = repo_id
        self.config = {}
        self.error = None
        self.api = HfApi(token=hf_token.strip() if hf_token else None)
        # 1. Try to get the exact total parameter count from the Hub API
        #    (safetensors metadata); beats the analytic estimate below.
        self.total_params_safetensors = None
        try:
            model_info = self.api.model_info(repo_id)
            if hasattr(model_info, "safetensors") and model_info.safetensors and "total" in model_info.safetensors:
                self.total_params_safetensors = model_info.safetensors["total"]
        except Exception:
            pass  # Fall back to config-based parameter estimation.
        # 2. Load the architecture config: local preset first, then the Hub.
        if repo_id in MODELS_DB:
            self.config = MODELS_DB[repo_id]
        else:
            try:
                token = hf_token.strip() if hf_token else None
                config_path = hf_hub_download(
                    repo_id=repo_id, filename="config.json", token=token
                )
                with open(config_path, "r") as f:
                    self.config = json.load(f)
            except Exception as e:
                self.error = f"Failed to fetch model: {str(e)}"
                return
        try:
            # Handle nested configs (common in multimodal models, where the
            # text/LLM sub-config carries the transformer hyperparameters).
            if "text_config" in self.config:
                self.llm_config = self.config["text_config"]
            elif "llm_config" in self.config:
                self.llm_config = self.config["llm_config"]
            else:
                self.llm_config = self.config
            # Fallback defaults correspond to a Llama-7B-class architecture.
            self.hidden_size = self.llm_config.get("hidden_size", 4096)
            self.num_layers = self.llm_config.get("num_hidden_layers", 32)
            self.num_heads = self.llm_config.get("num_attention_heads", 32)
            # GQA: KV heads default to full MHA when not specified.
            self.num_kv_heads = self.llm_config.get("num_key_value_heads", self.num_heads)
            self.vocab_size = self.llm_config.get("vocab_size", 32000)
            self.max_context = self.llm_config.get("max_position_embeddings", 4096)
            self.intermediate_size = self.llm_config.get(
                "intermediate_size", self.hidden_size * 4
            )
            # MoE detection (defaults describe a dense model).
            self.is_moe = False
            self.num_experts = 1
            self.active_experts = 1
            # Check for MoE config patterns
            self._detect_moe()
            # Calculate Parameters
            self.calculate_params()
        except Exception as e:
            self.error = f"Error parsing config: {str(e)}"

    def _detect_moe(self):
        """Flag MoE models and record expert counts.

        Heuristic: any "moe"/"expert" hint in the architectures list or in the
        config keys marks the model as MoE; expert counts are then read from
        the common key variants, defaulting to 8 experts / 2 active per token.
        """
        archs = self.config.get("architectures", [])
        keys = set(self.config.keys()) | set(self.llm_config.keys())
        if (
            any("moe" in a.lower() for a in archs)
            or any("moe" in k.lower() for k in keys)
            or any("expert" in k.lower() for k in keys)
        ):
            self.is_moe = True
        if self.is_moe:
            # Model families use different key names; take the first present.
            self.num_experts = (
                self.llm_config.get("num_local_experts")
                or self.llm_config.get("num_experts")
                or self.llm_config.get("n_routed_experts")
                or 8
            )
            self.active_experts = (
                self.llm_config.get("num_experts_per_tok")
                or self.llm_config.get("num_experts_per_token")
                or 2
            )
        elif "notes" in self.config and "moe" in self.config["notes"]:
            # Local MODELS_DB presets may carry MoE info under a "notes" block.
            moe_cfg = self.config["notes"]["moe"]
            self.is_moe = True
            self.num_experts = moe_cfg.get("num_local_experts", 8)
            self.active_experts = moe_cfg.get("num_experts_per_tok", 2)

    def calculate_params(self):
        """Compute ``total_params`` and the MoE-aware ``active_params``.

        Uses the exact safetensors total when the Hub provided one; otherwise
        estimates analytically from the transformer dimensions (embeddings +
        per-layer attention/MLP/norm weights, with the MLP replicated per
        expert for MoE models).
        """
        # If we got exact params from safetensors, use that
        if self.total_params_safetensors:
            self.total_params = self.total_params_safetensors
        else:
            # Fallback calculation from architecture dimensions.
            self.params_embed = self.vocab_size * self.hidden_size
            head_dim = self.hidden_size // self.num_heads
            kv_dim = head_dim * self.num_kv_heads
            # Q proj + K/V projs (GQA-sized) + output proj.
            self.params_attn = (
                (self.hidden_size * self.hidden_size)
                + (self.hidden_size * kv_dim) * 2
                + (self.hidden_size * self.hidden_size)
            )
            # Gated MLP: gate/up/down -> 3 matrices of hidden x intermediate.
            dense_mlp = 3 * self.hidden_size * self.intermediate_size
            if self.is_moe:
                mlp_total = dense_mlp * self.num_experts
            else:
                mlp_total = dense_mlp
            self.params_norm = 2 * self.hidden_size
            self.params_layer_total = (
                self.params_attn + mlp_total + self.params_norm
            )
            self.total_params = self.params_embed + (
                self.num_layers * self.params_layer_total
            )
        # Active Params Calculation (heuristic for MoE): assume 80% of the
        # weights live in experts, of which only active/total are used per
        # token; the remaining 20% (attention, embeddings, norms) always run.
        if self.is_moe:
            expert_param_fraction = 0.8  # 80% of params are in experts
            always_active = self.total_params * (1 - expert_param_fraction)
            expert_params = self.total_params * expert_param_fraction
            expert_ratio = self.active_experts / self.num_experts
            self.active_params = int(
                always_active + (expert_params * expert_ratio)
            )
        else:
            self.active_params = self.total_params
# --- Calculation Engine ---
def calculate_dimensioning(
    model_name_or_repo,
    hf_token,
    gpu_name,
    connectivity_type,
    concurrent_users,
    context_in,
    context_out,
    quantization,
    gpu_overhead_pct,
    rag_enabled,
    rag_model_key,
    reranker_model_key,
):
    """Size a GPU deployment for an LLM inference workload.

    Returns a 9-tuple matching the Gradio outputs: (params summary, total
    VRAM, num GPUs, TTFT, ITL, server name, warnings text, memory chart
    figure, memory text alternative). On any validation failure, returns
    ``error_result(...)`` with the same shape.
    """
    analyzer = ModelAnalyzer(model_name_or_repo, hf_token)
    if analyzer.error:
        return error_result(analyzer.error)
    if gpu_name not in HARDWARE_DB:
        return error_result(f"GPU '{gpu_name}' not found in database.")
    gpu_spec = HARDWARE_DB[gpu_name]
    # --- Interconnect ---
    # Only the NVLink-vs-PCIe distinction matters downstream (it selects the
    # multi-GPU scaling/penalty model); per-link bandwidth itself was never
    # consumed by the latency model, so it is not computed here.
    nvlink_bw = gpu_spec.get("interconnect_bw_gb_s", 0)
    gpu_has_nvlink = nvlink_bw > 0
    if connectivity_type == "NVLink":
        if not gpu_has_nvlink:
            return error_result(f"Error: {gpu_name} does not support NVLink.")
        using_nvlink = True
    elif connectivity_type == "PCIe / Standard":
        using_nvlink = False
    else:  # Auto: use NVLink whenever the GPU offers it.
        using_nvlink = gpu_has_nvlink
    # --- Precision ---
    fp4_supported = gpu_spec.get("fp4_supported", False)
    if quantization == "FP16/BF16":
        bytes_per_param = 2
    elif quantization == "INT8":
        bytes_per_param = 1
    elif quantization == "FP4":
        if not fp4_supported:
            return error_result(f"Error: {gpu_name} does not support FP4.")
        bytes_per_param = 0.5
    else:
        bytes_per_param = 2  # Unknown selection: assume 16-bit weights.
    # --- MEMORY CALCULATION ---
    # Static footprint: model weights (+ optional RAG models).
    mem_weights = analyzer.total_params * bytes_per_param
    mem_rag = 0
    if rag_enabled:
        embed_gb = EMBEDDING_MODELS.get(rag_model_key, 0.6)
        rerank_gb = RERANKER_MODELS.get(reranker_model_key, 0.5)
        mem_rag = (embed_gb + rerank_gb) * (1024**3)
    static_footprint = mem_weights + mem_rag
    # Dynamic footprint: KV cache + activation buffer, per concurrent user.
    head_dim = analyzer.hidden_size // analyzer.num_heads
    total_tokens = context_in + context_out
    kv_bytes = 2  # KV cache held in 16-bit precision
    # Leading 2x accounts for the separate K and V tensors.
    mem_kv_per_user = (
        2
        * analyzer.num_layers
        * analyzer.num_kv_heads
        * head_dim
        * total_tokens
        * kv_bytes
    )
    mem_act_per_user = ACTIVATION_MEMORY_BUFFER_GB * 1024**3
    dynamic_per_user = mem_kv_per_user + mem_act_per_user
    total_dynamic = dynamic_per_user * concurrent_users
    # Total & overhead
    raw_total_mem = static_footprint + total_dynamic
    total_mem_required = raw_total_mem * (1 + gpu_overhead_pct / 100)
    gpu_mem_capacity = gpu_spec["memory_gb"] * (1024**3)
    num_gpus = math.ceil(total_mem_required / gpu_mem_capacity)
    # --- LATENCY CALCULATION ---
    compute_mode = "fp16_tflops_dense"
    single_gpu_flops = (
        gpu_spec.get(compute_mode, 100) * 1e12 * COMPUTE_EFFICIENCY
    )
    if quantization == "FP4":
        single_gpu_flops *= 2.5  # FP4 throughput uplift over FP16
    single_gpu_bw = (
        gpu_spec.get("bandwidth_gb_s", 1000) * 1e9 * MEMORY_EFFICIENCY
    )
    if num_gpus == 1:
        effective_flops = single_gpu_flops
        effective_mem_bw = single_gpu_bw
        ttft_penalty = 2.0
        itl_penalty = 1.0
    elif using_nvlink:
        # NVLink: near-linear scaling of compute and bandwidth.
        effective_flops = single_gpu_flops * num_gpus
        effective_mem_bw = single_gpu_bw * num_gpus
        ttft_penalty = 2.0
        itl_penalty = 1.0
    else:
        # PCIe Bottleneck Logic: compute scales with card count, but the
        # effective bandwidth stays capped at a single card and latency
        # penalties grow with the number of GPUs.
        effective_flops = single_gpu_flops * num_gpus
        effective_mem_bw = single_gpu_bw  # Capped at single card
        n = num_gpus
        ttft_penalty = 1.2 * n * n - n
        itl_penalty = n
    # TTFT (Prefill) = RAG pre-processing + LLM prefill.
    # 1. RAG Processing (Embedding + Reranking), fixed latency estimates.
    t_rag_processing = 0
    if rag_enabled:
        # Base Embedding Latency (Encode Query)
        if "Mini" in rag_model_key:
            t_rag_processing += 0.02
        elif "Large" in rag_model_key:
            t_rag_processing += 0.05
        elif "LLM" in rag_model_key:
            t_rag_processing += 0.15
        else:
            t_rag_processing += 0.03
        # Reranking Latency (Process Documents)
        if "None" not in reranker_model_key:
            if "Small" in reranker_model_key:
                t_rag_processing += 0.15  # 150ms
            elif "Large" in reranker_model_key:
                t_rag_processing += 0.35  # 350ms
            elif "LLM" in reranker_model_key:
                t_rag_processing += 0.80  # 800ms
    # 2. LLM prefill: roofline max of compute time and weight-read time.
    prefill_ops = 2 * analyzer.active_params * context_in * concurrent_users
    t_compute_prefill = (prefill_ops / effective_flops) * ttft_penalty
    t_mem_prefill = mem_weights / effective_mem_bw
    ttft = max(t_compute_prefill, t_mem_prefill) + t_rag_processing
    # ITL (Decode): one token per user per step.
    gen_ops = 2 * analyzer.active_params * concurrent_users
    t_compute_gen = (gen_ops / effective_flops) * itl_penalty
    # Each step streams the weights plus one user's KV/activation state
    # (dynamic_per_user; avoids re-deriving it via total_dynamic / users).
    bytes_per_step = mem_weights + dynamic_per_user
    t_mem_gen = (bytes_per_step / effective_mem_bw) * itl_penalty
    itl = max(t_compute_gen, t_mem_gen)
    # --- Result Formatting ---
    server_name = gpu_spec.get("recommended_server", "Contact Lenovo Support")
    if num_gpus > 8:
        server_name += " (Requires Multi-Node Clustering)"
    warnings = []
    if not using_nvlink and num_gpus > 1:
        warnings.append(
            f"⚠️ No NVLink: Effective Bandwidth capped at {gpu_spec['bandwidth_gb_s']} GB/s. High latency penalty."
        )
    if itl > 0.150:
        warnings.append(
            f"⚠️ High Latency: ITL is {itl * 1000:.0f}ms (>150ms)."
        )
    if t_rag_processing > 0.5:
        warnings.append(
            f"⚠️ High RAG Latency: Reranking is adding {t_rag_processing * 1000:.0f}ms to TTFT."
        )
    if analyzer.is_moe:
        warnings.append(
            f"ℹ️ MoE Model: Active params {analyzer.active_params / 1e9:.1f}B used for compute."
        )
    if rag_enabled:
        warnings.append(
            f"ℹ️ RAG Enabled: Allocating {mem_rag / (1024**3):.1f}GB for Models (Embed+Rerank)."
        )
    # Chart (Per GPU)
    overhead_bytes = raw_total_mem * (gpu_overhead_pct / 100)
    fig = create_mem_chart_per_gpu(
        mem_weights,
        mem_rag,
        total_dynamic,
        overhead_bytes,
        gpu_mem_capacity,
        num_gpus,
    )
    # Textual memory breakdown for accessibility (WCAG 1.1.1 - Text Alternatives)
    w_per_gb = (mem_weights / num_gpus) / (1024**3)
    r_per_gb = (mem_rag / num_gpus) / (1024**3)
    d_per_gb = (total_dynamic / num_gpus) / (1024**3)
    o_per_gb = (overhead_bytes / num_gpus) / (1024**3)
    cap_gb = gpu_mem_capacity / (1024**3)
    used_gb = w_per_gb + r_per_gb + d_per_gb + o_per_gb
    free_gb = max(0, cap_gb - used_gb)
    # Percentages of per-GPU capacity for the text description.
    w_pct = (w_per_gb / cap_gb * 100) if cap_gb > 0 else 0
    r_pct = (r_per_gb / cap_gb * 100) if cap_gb > 0 else 0
    d_pct = (d_per_gb / cap_gb * 100) if cap_gb > 0 else 0
    o_pct = (o_per_gb / cap_gb * 100) if cap_gb > 0 else 0
    free_pct = (free_gb / cap_gb * 100) if cap_gb > 0 else 0
    mem_text_alt = (
        f"Per-GPU Memory Breakdown (Total Capacity: {cap_gb:.0f} GB):\n"
        f"• Weights: {w_per_gb:.1f} GB ({w_pct:.1f}%) - Model parameters stored in memory. Fixed size based on model architecture and quantization.\n"
        f"• RAG Models: {r_per_gb:.1f} GB ({r_pct:.1f}%) - Embedding and reranker models. Only allocated if RAG is enabled.\n"
        f"• Dynamic (KV+Act): {d_per_gb:.1f} GB ({d_pct:.1f}%) - KV cache and activation buffers. Grows with concurrent users, input context length, and output tokens.\n"
        f"• Overhead: {o_per_gb:.1f} GB ({o_pct:.1f}%) - CUDA context, memory fragmentation, and system buffers. Configurable percentage of total memory.\n"
        f"• Free: {free_gb:.1f} GB ({free_pct:.1f}%) - Available memory headroom for additional operations."
    )
    return (
        f"{analyzer.total_params / 1e9:.1f}B (Active: {analyzer.active_params / 1e9:.1f}B)",
        f"{total_mem_required / (1024**3):.1f} GB",
        num_gpus,
        f"{ttft * 1000:.0f} ms",
        f"{itl * 1000:.0f} ms",
        server_name,
        "\n".join(warnings) if warnings else "No warnings.",
        fig,
        mem_text_alt,
    )
def create_mem_chart_per_gpu(
    weights, rag, dynamic, overhead, single_gpu_cap, num_gpus
):
    """Build a per-GPU memory-breakdown donut chart.

    All memory arguments are byte counts totaled across the deployment;
    they are divided evenly across ``num_gpus`` for display. Returns a
    plotly ``go.Figure``.
    """
    # Normalize to a per-GPU view, in GB.
    w_per = (weights / num_gpus) / (1024**3)
    r_per = (rag / num_gpus) / (1024**3)
    d_per = (dynamic / num_gpus) / (1024**3)
    o_per = (overhead / num_gpus) / (1024**3)
    cap_gb = single_gpu_cap / (1024**3)
    used = w_per + r_per + d_per + o_per
    free = max(0, cap_gb - used)
    labels = ["Weights", "RAG Models", "Dynamic (KV+Act)", "Overhead", "Free (Per GPU)"]
    values = [w_per, r_per, d_per, o_per, free]
    # Professional, accessible palette: Blue, Green, Purple, Orange, Gray.
    colors_full = ["#4A90E2", "#10b981", "#8b5cf6", "#f59e0b", "#BDC3C7"]
    # Hide slices below 50MB for a cleaner chart.
    kept = [
        (lbl, val, col)
        for lbl, val, col in zip(labels, values, colors_full)
        if val > 0.05
    ]
    if kept:
        display_labels, display_values, colors = (list(t) for t in zip(*kept))
    else:
        # BUGFIX: the previous fallback paired the full label/value lists
        # with an empty color list, mismatching slice colors.
        display_labels, display_values, colors = labels, values, colors_full
    # Percentages for the hover text.
    total = sum(display_values)
    percentages = [
        (v / total * 100) if total > 0 else 0 for v in display_values
    ]
    # Plotly hover text is HTML, so line breaks must be <br> tags
    # (literal newlines are not rendered in hover labels).
    hover_texts = [
        f"{display_labels[i]}<br>"
        f"Value: {display_values[i]:.1f} GB<br>"
        f"Percentage: {percentages[i]:.1f}%<br>"
        f"Capacity: {cap_gb:.0f} GB"
        for i in range(len(display_labels))
    ]
    # Create donut chart using plotly
    fig = go.Figure(
        data=[
            go.Pie(
                labels=display_labels,
                values=display_values,
                hole=0.5,  # Creates the donut (hole in the middle)
                marker=dict(colors=colors, line=dict(color="#FFFFFF", width=2)),
                textinfo="label+percent",
                textposition="outside",
                hovertemplate="%{hovertext}",
                hovertext=hover_texts,
            )
        ]
    )
    # Update layout for better appearance
    fig.update_layout(
        title={
            "text": f"Per-GPU Memory Usage (Capacity: {cap_gb:.0f} GB)",
            "x": 0.5,
            "xanchor": "center",
            "font": {"size": 16, "family": "Arial, sans-serif"},
        },
        showlegend=False,
        font=dict(family="Arial, sans-serif", size=12),
        margin=dict(l=20, r=20, t=50, b=20),
        height=500,
    )
    return fig
def error_result(msg):
    """Return the standard 9-tuple of UI outputs representing a failed run."""
    # Placeholder figure so the Plot output still renders something sensible.
    placeholder = go.Figure()
    annotation_style = dict(
        text="Error: Unable to generate chart",
        xref="paper",
        yref="paper",
        x=0.5,
        y=0.5,
        showarrow=False,
        font=dict(size=14),
    )
    placeholder.add_annotation(**annotation_style)
    placeholder.update_layout(
        title="Memory Breakdown",
        height=500,
        showlegend=False,
    )
    # Shape mirrors the success path of calculate_dimensioning.
    return (
        "Error",
        "Error",
        0,
        "-",
        "-",
        "Check Inputs",
        f"Error: {msg}",
        placeholder,
        "Memory breakdown not available due to calculation error.",
    )
# --- UI Setup ---
# Custom CSS: apply the native system font stack to every element and
# enable font smoothing for crisper text rendering in the browser.
custom_css = """
* {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', 'Roboto', 'Oxygen', 'Ubuntu', 'Cantarell', 'Fira Sans', 'Droid Sans', 'Helvetica Neue', sans-serif !important;
-webkit-font-smoothing: antialiased;
-moz-osx-font-smoothing: grayscale;
}
"""
# BUGFIX: theme/css are gr.Blocks constructor options, not Blocks.launch()
# parameters — passing them to launch() raises TypeError and the custom CSS
# was never applied. They are now set on the Blocks constructor.
with gr.Blocks(
    title="GPUguesstimator",
    theme=gr.themes.Soft(),
    css=custom_css,
) as demo:
    gr.Markdown(
        """
# GPUguesstimator
Physics-based sizing tool for calculating VRAM requirements, compute capacity, and interconnect bottlenecks for Large Language Model inference.
"""
    )
    with gr.Row():
        # Left column: workload + infrastructure inputs.
        with gr.Column():
            gr.Markdown("## Workload Configuration")
            model_keys = list(MODELS_DB.keys())
            model_dd = gr.Dropdown(
                choices=model_keys + ["Custom"],
                value=model_keys[0] if model_keys else "Custom",
                label="Model Preset",
                info="Select a preset model or choose Custom to enter a HuggingFace repository ID",
            )
            repo_input = gr.Textbox(
                label="HuggingFace Repository ID",
                value=model_keys[0] if model_keys else "",
                placeholder="e.g., meta-llama/Meta-Llama-3-70B-Instruct",
                info="Enter the HuggingFace model repository identifier",
            )
            hf_token = gr.Textbox(
                label="HuggingFace Token (Optional)",
                type="password",
                info="Required for accessing gated models. Leave empty for public models.",
            )
            users = gr.Slider(
                1,
                500,
                value=50,
                step=1,
                label="Concurrent Users",
                info="Number of simultaneous inference requests to handle",
            )
            ctx_in = gr.Slider(
                128,
                128000,
                value=1024,
                step=128,
                label="Input Context Length (Tokens)",
                info="Maximum number of input tokens per request",
            )
            ctx_out = gr.Slider(
                128,
                16384,
                value=256,
                step=128,
                label="Output Tokens (Generation Length)",
                info="Maximum number of tokens to generate per request",
            )
            with gr.Group():
                gr.Markdown("#### Retrieval Augmented Generation (RAG)")
                rag_chk = gr.Checkbox(
                    label="Enable RAG Pipeline", value=False
                )
                with gr.Row():
                    rag_model_dd = gr.Dropdown(
                        choices=list(EMBEDDING_MODELS.keys()),
                        value="Standard (MPNet-Base/BGE-Base) ~0.6GB",
                        label="Embedding Model",
                        interactive=True,
                    )
                    rerank_model_dd = gr.Dropdown(
                        choices=list(RERANKER_MODELS.keys()),
                        value="None (Skip Reranking)",
                        label="Reranker Model",
                        interactive=True,
                    )
            gr.Markdown("## Infrastructure Configuration")
            gpu_keys = list(HARDWARE_DB.keys())
            default_gpu = gpu_keys[0] if gpu_keys else "NVIDIA H100-80GB SXM5"
            gpu_select = gr.Dropdown(
                choices=gpu_keys,
                value=default_gpu,
                label="GPU Model",
                info="Select the GPU model for inference",
            )
            conn_select = gr.Dropdown(
                choices=["Auto", "NVLink", "PCIe / Standard"],
                value="Auto",
                label="Interconnect Type",
                info="Auto uses GPU default, NVLink for high-bandwidth, PCIe for standard connections",
            )
            quant_select = gr.Dropdown(
                choices=["FP16/BF16", "INT8", "FP4"],
                value="FP16/BF16",
                label="Quantization Precision",
                info="Model weight precision: FP16/BF16 (standard), INT8 (8-bit), FP4 (4-bit, requires Blackwell)",
            )
            overhead_slider = gr.Slider(
                0,
                50,
                value=20,
                step=5,
                label="GPU Memory Overhead %",
                info="Additional memory overhead percentage for CUDA context, fragmentation, and system buffers",
            )
            btn = gr.Button("Calculate Sizing", variant="primary", size="lg")
        # Right column: sizing results.
        with gr.Column():
            gr.Markdown("## Sizing Results")
            with gr.Group():
                res_gpus = gr.Number(
                    label="GPUs Required",
                    precision=0,
                    info="Minimum number of GPUs needed to fit the model and workload",
                )
                res_server = gr.Textbox(
                    label="Recommended Lenovo Server",
                    info="Suggested Lenovo server configuration",
                )
                res_vram = gr.Textbox(
                    label="Total VRAM Required",
                    info="Total video memory needed across all GPUs",
                )
                res_params = gr.Textbox(
                    label="Model Parameters",
                    info="Total number of model parameters in billions",
                )
            with gr.Row():
                res_ttft = gr.Textbox(
                    label="TTFT - Time to First Token (Prefill latency)",
                    info="time to process input and generate first token",
                )
                res_itl = gr.Textbox(
                    label="ITL - Inter-Token Latency",
                    info="time between each generated token",
                )
            res_warnings = gr.Textbox(
                label="Analysis Notes and Warnings",
                lines=4,
                info="Important notes, warnings, and recommendations about the configuration",
            )
            plot_output = gr.Plot(label="Per-GPU Memory Breakdown Chart")
            mem_text_alt = gr.Textbox(
                label="Memory Breakdown (Text Description)",
                info="Textual description of memory allocation for screen readers and accessibility",
                lines=6,
            )

    def update_repo(choice):
        # Mirror the preset into the repo textbox; "Custom" clears it so the
        # user can type a repository id.
        return choice if choice != "Custom" else ""

    model_dd.change(update_repo, model_dd, repo_input)
    btn.click(
        calculate_dimensioning,
        inputs=[
            repo_input,
            hf_token,
            gpu_select,
            conn_select,
            users,
            ctx_in,
            ctx_out,
            quant_select,
            overhead_slider,
            rag_chk,
            rag_model_dd,
            rerank_model_dd,
        ],
        outputs=[
            res_params,
            res_vram,
            res_gpus,
            res_ttft,
            res_itl,
            res_server,
            res_warnings,
            plot_output,
            mem_text_alt,
        ],
    )

if __name__ == "__main__":
    demo.launch()