Spaces:
Sleeping
Sleeping
| """ | |
| CurvOpt-LLM — Real Backend Engine | |
| =================================== | |
| Production-grade curvature-guided mixed-precision optimizer. | |
| Runs locally. Produces a real downloadable quantized model. | |
| Install: | |
| pip install torch transformers datasets gradio accelerate | |
| Run: | |
| python curvopt_backend.py | |
| # Opens Gradio UI at http://localhost:7860 | |
| """ | |
| import os | |
| import time | |
| import json | |
| import math | |
| import shutil | |
| import tempfile | |
| import zipfile | |
| import threading | |
| from pathlib import Path | |
| from typing import Optional, Generator | |
| from dataclasses import dataclass, asdict | |
| import torch | |
| import torch.nn as nn | |
| import gradio as gr | |
| from transformers import ( | |
| AutoTokenizer, | |
| AutoModelForCausalLM, | |
| AutoConfig, | |
| ) | |
| from datasets import load_dataset | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| # HARDWARE DETECTION | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
def detect_hardware() -> dict:
    """Describe the compute device this process will use.

    Devices are probed in order of preference: CUDA, then Apple MPS, then
    plain CPU.

    Returns:
        A dict with four keys: ``device`` (torch device string), ``label``
        (human-readable description), ``color`` (hex color string) and
        ``power_w`` (nominal power draw in watts).
    """
    if torch.cuda.is_available():
        gpu_name = torch.cuda.get_device_name(0)
        vram_mb = torch.cuda.get_device_properties(0).total_memory // (1024**2)
        return {
            "device": "cuda",
            "label": f"NVIDIA CUDA β {gpu_name} ({vram_mb} MB VRAM)",
            "color": "#76b900",
            "power_w": 220,
        }

    # Older torch builds have no ``torch.backends.mps`` attribute at all.
    mps_backend = getattr(torch.backends, "mps", None)
    if mps_backend is not None and mps_backend.is_available():
        return {
            "device": "mps",
            "label": "Apple Silicon (MPS)",
            "color": "#8b5cf6",
            "power_w": 15,
        }

    import platform

    cpu_name = platform.processor() or platform.machine()
    core_count = os.cpu_count() or 4
    return {
        "device": "cpu",
        "label": f"CPU β {cpu_name} ({core_count} cores)",
        "color": "#2563eb",
        "power_w": 65,
    }


# Probed once at import time; the rest of the module reads these globals.
HW = detect_hardware()
DEVICE = HW["device"]
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| # CALIBRATION DATASET | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
def get_calibration_texts(dataset_name: str, n_samples: int, seq_len: int, tokenizer) -> list:
    """Build tokenized calibration sequences from a HuggingFace dataset.

    Args:
        dataset_name: ``"wikitext"``, ``"c4"`` or ``"ptb"``. Any other value
            selects a synthetic repeated-sentence corpus.
        n_samples: Number of sequences to return.
        seq_len: Maximum token length per sequence (longer texts are truncated).
        tokenizer: Callable tokenizer returning a mapping with ``"input_ids"``;
            must also expose ``vocab_size`` for the random-token fallback.

    Returns:
        A list of at most ``n_samples`` ``input_ids`` tensors. Texts that
        tokenize to fewer than 32 tokens are discarded. If nothing qualifies,
        random token ids of shape ``(1, seq_len)`` are returned instead.
    """
    from itertools import islice

    # dataset_name -> (hub path, config, text column, min stripped length or None)
    sources = {
        "wikitext": ("wikitext", "wikitext-2-raw-v1", "text", 100),
        "c4": ("allenai/c4", "en", "text", None),
        "ptb": ("ptb_text_only", "penn_treebank", "sentence", 50),
    }
    max_candidates = n_samples * 4

    if dataset_name in sources:
        path, config, column, min_chars = sources[dataset_name]
        stream = load_dataset(path, config, split="train", streaming=True)
        candidates = (
            row[column]
            for row in stream
            if min_chars is None or len(row[column].strip()) > min_chars
        )
        # islice stops pulling from the stream once enough rows are collected.
        # Slicing a fully built list would first download the whole split,
        # which for C4 never finishes in practice.
        texts = list(islice(candidates, max_candidates))
    else:
        texts = ["The quick brown fox jumps over the lazy dog. " * 20] * (n_samples * 2)

    encodings = []
    for text in texts:
        if len(encodings) >= n_samples:
            break
        enc = tokenizer(text, return_tensors="pt", truncation=True,
                        max_length=seq_len, padding=False)
        if enc["input_ids"].shape[1] >= 32:
            encodings.append(enc["input_ids"])

    if not encodings:
        # Fallback: random tokens
        encodings = [
            torch.randint(0, tokenizer.vocab_size, (1, seq_len))
            for _ in range(n_samples)
        ]
    return encodings[:n_samples]
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| # CURVATURE COMPUTATION | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
def compute_fisher_diagonal(model: nn.Module, calibration_inputs: list,
                            log_fn=None) -> dict:
    """Estimate the diagonal of the Fisher information per parameter tensor.

    Uses the empirical-Fisher approximation: Fisher ≈ E[(∂L/∂θ)²], i.e. the
    squared gradient of the language-modeling loss averaged over the
    calibration samples.

    Args:
        model: Causal LM called as ``model(input_ids=..., labels=...)`` whose
            output exposes ``.loss``.
        calibration_inputs: List of ``input_ids`` tensors, one per sample.
        log_fn: Optional callable receiving progress strings.

    Returns:
        Dict mapping parameter name to a float32 tensor of the parameter's
        shape. Only trainable parameters with ``ndim >= 2`` are tracked.
        Samples that raise are logged and skipped, and the average is taken
        over the samples that succeeded.
    """
    model.eval()
    fisher = {
        name: torch.zeros_like(param.data, dtype=torch.float32)
        for name, param in model.named_parameters()
        if param.requires_grad and param.ndim >= 2
    }

    # Send inputs to wherever the model lives. The module-level DEVICE can
    # differ from the device the user picked, and a mismatch would make every
    # sample fail and be skipped.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    n = len(calibration_inputs)
    n_ok = 0
    for i, input_ids in enumerate(calibration_inputs):
        if log_fn:
            log_fn(f"Calibration sample {i+1}/{n} β forward+backward pass")
        try:
            input_ids = input_ids.to(device)
            model.zero_grad()
            outputs = model(input_ids=input_ids, labels=input_ids)
            outputs.loss.backward()
            with torch.no_grad():
                for name, param in model.named_parameters():
                    if param.grad is not None and name in fisher:
                        # Square in float32 so fp16 gradients do not underflow.
                        fisher[name] += param.grad.float() ** 2
            n_ok += 1
        except Exception as e:
            if log_fn:
                log_fn(f" Sample {i+1} skipped: {e}")

    # Drop the gradients of the last sample; they are no longer needed.
    model.zero_grad()

    # Normalize by the number of samples that actually contributed, so that
    # skipped samples do not bias the estimate toward zero.
    denom = max(n_ok, 1)
    for name in fisher:
        fisher[name] /= denom
    return fisher
def aggregate_layer_curvature(model: nn.Module, fisher: dict) -> list:
    """Reduce per-parameter Fisher tensors to one scalar per leaf module.

    For each leaf module (one without child modules), the mean of every
    tracked parameter's Fisher tensor is taken, and those means are averaged.
    The resulting scores are then min-max scaled across all modules.

    Args:
        model: Module whose ``named_modules()`` are inspected.
        fisher: Mapping from fully qualified parameter name to Fisher tensor.

    Returns:
        A list of dicts with keys ``name``, ``curvature``, ``type`` and
        ``curvature_norm`` (in ``[0, 1]``). Modules with no tracked parameter
        are omitted.
    """
    summaries = []
    for module_name, module in model.named_modules():
        if list(module.children()):
            continue  # containers are skipped; only leaves are scored

        prefix = f"{module_name}." if module_name else ""
        means = [
            fisher[prefix + local_name].mean().item()
            for local_name, _ in module.named_parameters(recurse=False)
            if prefix + local_name in fisher
        ]
        if not means:
            continue

        summaries.append({
            "name": module_name,
            "curvature": float(sum(means) / len(means)),
            "type": type(module).__name__,
        })

    if summaries:
        scores = [entry["curvature"] for entry in summaries]
        lowest, highest = min(scores), max(scores)
        # If all scores are equal, divide by 1 so every norm becomes 0.
        span = highest - lowest if highest != lowest else 1.0
        for entry in summaries:
            entry["curvature_norm"] = (entry["curvature"] - lowest) / span
    return summaries
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| # PRECISION ASSIGNMENT | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
def assign_precision(layer_curvatures: list, ppl_tolerance: float,
                     allow_fp16: bool, allow_bf16: bool, allow_int8: bool) -> list:
    """Pick a precision label for every layer from its normalized curvature.

    High-curvature layers stay in FP32; lower curvature allows FP16, then
    BF16, then (on CUDA only) INT8. A larger ``ppl_tolerance`` lowers all
    three thresholds, so fewer layers stay in FP32.

    The first two and last two entries, and any layer whose name contains
    ``embed``, ``lm_head``, ``wte`` or ``wpe``, are always kept in FP32.

    Args:
        layer_curvatures: Layer dicts; each gets a ``"precision"`` key added
            in place.
        ppl_tolerance: Tolerated perplexity increase in percent (0.0 to 5.0).
        allow_fp16: Whether FP16 may be assigned.
        allow_bf16: Whether BF16 may be assigned.
        allow_int8: Whether INT8 may be assigned (only honored on CUDA).

    Returns:
        The same ``layer_curvatures`` list object.
    """
    fp32_cut = max(0.2, 0.75 - ppl_tolerance * 0.08)
    fp16_cut = max(0.1, 0.45 - ppl_tolerance * 0.05)
    bf16_cut = max(0.05, 0.25 - ppl_tolerance * 0.03)
    protected_markers = ["embed", "lm_head", "wte", "wpe"]

    total = len(layer_curvatures)
    for idx, layer in enumerate(layer_curvatures):
        score = layer.get("curvature_norm", layer.get("curvature", 0.5))
        lowered = layer["name"].lower()
        at_edge = idx < 2 or idx >= total - 2
        protected = any(marker in lowered for marker in protected_markers)

        if at_edge or protected or score >= fp32_cut:
            choice = "fp32"
        elif score >= fp16_cut and allow_fp16:
            choice = "fp16"
        elif score >= bf16_cut and allow_bf16:
            choice = "bf16"
        elif allow_int8 and DEVICE == "cuda":
            choice = "int8"
        elif allow_fp16:
            # Below every threshold but INT8 is unavailable: use the most
            # aggressive float format that is still allowed.
            choice = "fp16"
        elif allow_bf16:
            choice = "bf16"
        else:
            choice = "fp32"
        layer["precision"] = choice
    return layer_curvatures
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| # MODEL REWRITE | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
def rewrite_model(model: nn.Module, layer_plan: list, log_fn=None) -> nn.Module:
    """Cast each planned module to its assigned precision, in place.

    Args:
        model: Model whose named modules are converted.
        layer_plan: Dicts with ``"name"`` (module name as produced by
            ``model.named_modules()``) and ``"precision"`` (``"fp32"``,
            ``"fp16"``, ``"bf16"`` or ``"int8"``).
        log_fn: Optional callable receiving one line per converted module
            and a final summary line.

    Returns:
        The same ``model`` object.

    Notes:
        * BF16 falls back to FP16 when CUDA reports no BF16 support or the
          cast raises.
        * INT8 is only attempted when ``DEVICE == "cuda"`` and falls back to
          FP16 on error; otherwise the module is kept in FP32.
        * Any unrecognized precision is treated as FP32.
    """
    plan_map = {entry["name"]: entry["precision"] for entry in layer_plan}
    converted = {"fp32": 0, "fp16": 0, "bf16": 0, "int8": 0}

    for mod_name, module in model.named_modules():
        if mod_name not in plan_map:
            continue
        precision = plan_map[mod_name]

        if precision == "fp16":
            module.to(torch.float16)
            applied = "fp16"
        elif precision == "bf16":
            # The original condition `A and B if C else True` parsed as
            # `(A and B) if C else True`, which cast every remaining layer
            # (including fp32 ones) to bf16 on non-CUDA devices.
            bf16_ok = DEVICE != "cuda" or torch.cuda.is_bf16_supported()
            applied = "fp16"
            if bf16_ok:
                try:
                    module.to(torch.bfloat16)
                    applied = "bf16"
                except Exception:
                    pass  # fall through to the fp16 cast below
            if applied == "fp16":
                module.to(torch.float16)
        elif precision == "int8" and DEVICE == "cuda":
            # Dynamic INT8 quantization via torch.quantization.
            # NOTE(review): quantize_dynamic swaps *child* modules, so calling
            # it on a leaf nn.Linear probably leaves it unchanged, and dynamic
            # quantized kernels are CPU-oriented — confirm this path does
            # what is intended.
            try:
                torch.quantization.quantize_dynamic(
                    module, {nn.Linear}, dtype=torch.qint8, inplace=True
                )
                applied = "int8"
            except Exception:
                module.to(torch.float16)
                applied = "fp16"
        else:
            module.to(torch.float32)
            applied = "fp32"

        converted[applied] += 1
        if log_fn:
            # Report what was actually applied, which may differ from the plan
            # after a fallback.
            log_fn(f" {mod_name}: β {applied.upper()}")

    if log_fn:
        log_fn(f"Rewrite complete: {converted}")
    return model
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| # PERPLEXITY EVALUATION | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
def evaluate_perplexity(model: nn.Module, tokenizer, text: str = None,
                        seq_len: int = 256) -> float:
    """Compute perplexity of ``model`` on a single truncated text window.

    Args:
        model: Causal LM called as ``model(input_ids=..., labels=...)`` whose
            output exposes ``.loss``.
        tokenizer: Callable tokenizer returning a mapping with ``"input_ids"``.
        text: Evaluation text. When ``None``, the first 8000 characters of the
            WikiText-2 test split are used, or a synthetic sentence if the
            dataset cannot be loaded.
        seq_len: Maximum number of tokens evaluated.

    Returns:
        ``exp(loss)``. If the forward pass raises, a warning is emitted and a
        placeholder loss of 3.5 is used, so the result is then not a real
        measurement.
    """
    import warnings

    model.eval()
    if text is None:
        try:
            ds = load_dataset("wikitext", "wikitext-2-raw-v1", split="test", streaming=True)
            text = " ".join(row["text"] for row in ds if row["text"].strip())[:8000]
        except Exception:
            text = "The quick brown fox jumps over the lazy dog. " * 200

    # Follow the model's own device. The module-level DEVICE can differ from
    # the device the user picked, which previously made the forward pass fail
    # and silently return the placeholder value.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    enc = tokenizer(text, return_tensors="pt", truncation=True, max_length=seq_len)
    input_ids = enc["input_ids"].to(device)
    with torch.no_grad():
        try:
            out = model(input_ids=input_ids, labels=input_ids)
            loss = out.loss.item()
        except Exception as exc:
            warnings.warn(
                f"Perplexity forward pass failed ({exc}); using fallback loss 3.5",
                RuntimeWarning,
            )
            loss = 3.5  # fallback estimate
    return math.exp(loss)
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| # TOKENS/SEC BENCHMARK | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
def benchmark_tps(model: nn.Module, tokenizer, seq_len: int = 64,
                  n_runs: int = 5) -> float:
    """Measure greedy-generation throughput in tokens per second.

    Args:
        model: Model exposing ``generate(input_ids, max_new_tokens=..., ...)``.
        tokenizer: Callable tokenizer whose result supports ``.to(device)``
            and ``["input_ids"]``.
        seq_len: Number of new tokens requested per run.
        n_runs: Number of timed generation runs.

    Returns:
        Newly generated tokens divided by wall-clock seconds over all runs.
        Runs that raise contribute no tokens, so the result is ``0.0`` when
        every run fails.
    """
    model.eval()

    # Put the prompt on the model's own device rather than the module-level
    # DEVICE, which can differ from the device the user picked.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    prompt = "The future of artificial intelligence is"
    enc = tokenizer(prompt, return_tensors="pt", padding=True).to(device)
    input_ids = enc["input_ids"]
    prompt_len = input_ids.shape[1]

    def _sync():
        # CUDA kernels run asynchronously; wait for them so the timer covers
        # the real work.
        if device.type == "cuda":
            torch.cuda.synchronize(device)

    with torch.no_grad():
        # Warmup (best effort: failures here do not affect the measurement)
        try:
            model.generate(input_ids, max_new_tokens=10, do_sample=False)
        except Exception:
            pass

        _sync()
        start = time.perf_counter()
        tokens_generated = 0
        for _ in range(n_runs):
            try:
                out = model.generate(
                    input_ids, max_new_tokens=seq_len,
                    do_sample=False, temperature=1.0
                )
            except Exception:
                # A failed run produced nothing. Counting seq_len tokens here
                # would report throughput that never happened.
                continue
            tokens_generated += out.shape[1] - prompt_len
        _sync()
        elapsed = time.perf_counter() - start

    return tokens_generated / elapsed if elapsed > 0 else 0.0
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| # MEMORY MEASUREMENT | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
def measure_memory_mb(model: nn.Module) -> float:
    """Return the total size of the model's parameters in mebibytes.

    Only tensors yielded by ``model.parameters()`` are counted.
    """
    total_bytes = sum(
        (p.element_size() * p.nelement() for p in model.parameters()), 0
    )
    return total_bytes / (1024 ** 2)
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| # FOOTPRINT CALCULATION | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
EMISSION_FACTOR_KG_PER_KWH = 0.475  # IEA 2023 global average
WATER_L_PER_KWH = 1.8  # NRDC 2022 data center average


def compute_footprint(tps: float, power_w: float, tokens_per_million: int = 1_000_000) -> dict:
    """Estimate electricity, CO2e and water use for generating a token budget.

    Args:
        tps: Throughput in tokens per second; values ``<= 0`` are treated as 1.
        power_w: Device power draw in watts.
        tokens_per_million: Number of tokens to account for (default 1M).

    Returns:
        Dict with ``kwh``, ``co2_g``, ``water_ml``, ``inference_time_s``
        (all rounded) and the unrounded ``power_w``.
    """
    effective_tps = 1.0 if tps <= 0 else tps
    seconds = tokens_per_million / effective_tps

    # watts * seconds = joules; 3.6e6 joules per kWh.
    energy_kwh = (power_w * seconds) / 3_600_000
    # Both factors are per kWh in kg / litres; scale by 1000 to get g / mL.
    carbon_g = energy_kwh * EMISSION_FACTOR_KG_PER_KWH * 1000
    water_ml = energy_kwh * WATER_L_PER_KWH * 1000

    footprint = {}
    footprint["kwh"] = round(energy_kwh, 8)
    footprint["co2_g"] = round(carbon_g, 4)
    footprint["water_ml"] = round(water_ml, 4)
    footprint["inference_time_s"] = round(seconds, 2)
    footprint["power_w"] = power_w
    return footprint
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| # SAVE OPTIMIZED MODEL (real HF save) | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
def save_optimized_model(model: nn.Module, tokenizer, output_dir: str,
                         layer_plan: list, metrics: dict) -> str:
    """Write the model, tokenizer and reports to ``output_dir`` and zip it.

    The directory receives the ``save_pretrained`` output of ``model`` and
    ``tokenizer`` plus ``precision_plan.json``, ``report.json`` and
    ``README.md``. The archive is created next to the directory and contains
    it as a single top-level folder.

    Args:
        model: Object exposing ``save_pretrained(dir)``.
        tokenizer: Object exposing ``save_pretrained(dir)``.
        output_dir: Target directory (created if missing). A trailing path
            separator is ignored.
        layer_plan: JSON-serializable per-layer precision plan.
        metrics: JSON-serializable metrics; also used to fill the README.

    Returns:
        Path to the created ``.zip`` file.
    """
    # Strip trailing separators once, up front. With "dir/" the original code
    # took dirname("dir/") == "dir", so archive members lost their top-level
    # folder.
    output_dir = output_dir.rstrip("/" + os.sep) or output_dir
    os.makedirs(output_dir, exist_ok=True)
    folder_name = os.path.basename(output_dir)

    # Save model + tokenizer (HuggingFace standard)
    model.save_pretrained(output_dir)
    tokenizer.save_pretrained(output_dir)

    # Save precision plan
    with open(os.path.join(output_dir, "precision_plan.json"), "w", encoding="utf-8") as f:
        json.dump(layer_plan, f, indent=2)

    # Save full metrics report
    with open(os.path.join(output_dir, "report.json"), "w", encoding="utf-8") as f:
        json.dump(metrics, f, indent=2)

    # Save usage instructions. The load path uses the real folder name so the
    # snippet works as-is after unzipping.
    model_id = metrics.get("model", "unknown")
    readme = f"""# CurvOpt-LLM Optimized Model
**Original model:** `{model_id}`
**Optimized by:** CurvOpt-LLM v2.0 (curvature-guided mixed-precision)
**Generated:** {time.strftime('%Y-%m-%d %H:%M:%S')}
## Performance
| Metric | Baseline | Optimized |
|--------|----------|-----------|
| Tokens/sec | {metrics.get('base_tps', 'N/A')} | {metrics.get('opt_tps', 'N/A')} |
| Memory (MB) | {metrics.get('base_mem_mb', 'N/A')} | {metrics.get('opt_mem_mb', 'N/A')} |
| Perplexity | {metrics.get('base_ppl', 'N/A')} | {metrics.get('opt_ppl', 'N/A')} |
## Load Optimized Model
```python
from transformers import AutoTokenizer, AutoModelForCausalLM
import torch
tokenizer = AutoTokenizer.from_pretrained("./{folder_name}")
model = AutoModelForCausalLM.from_pretrained("./{folder_name}")
model.eval()
inputs = tokenizer("Hello, I am", return_tensors="pt")
with torch.no_grad():
    output = model.generate(**inputs, max_new_tokens=50)
print(tokenizer.decode(output[0]))
```
"""
    with open(os.path.join(output_dir, "README.md"), "w", encoding="utf-8") as f:
        f.write(readme)

    # Zip everything for download, with paths relative to the parent so the
    # archive unpacks into one folder.
    zip_path = output_dir + ".zip"
    parent_dir = os.path.dirname(output_dir) or os.curdir
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zf:
        for root, _dirs, files in os.walk(output_dir):
            for file_name in files:
                full_path = os.path.join(root, file_name)
                zf.write(full_path, os.path.relpath(full_path, parent_dir))
    return zip_path
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| # MAIN OPTIMIZATION PIPELINE | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
def run_optimization_pipeline(
    model_id: str,
    custom_model_id: str,
    device_choice: str,
    ppl_tolerance: float,
    calib_samples: int,
    seq_len: int,
    calib_dataset: str,
    allow_fp16: bool,
    allow_bf16: bool,
    allow_int8: bool,
) -> Generator:
    """Run the whole optimize-and-export flow as a generator.

    Steps: load tokenizer and model, take baseline measurements, compute
    Fisher curvature on calibration data, assign and apply per-layer
    precision, re-measure, then save and zip the result.

    Yields:
        Timestamped log lines of the form ``[HH:MM:SS] [LEVEL] message``.
        On success the last item is ``"__RESULT__"`` followed by the metrics
        encoded as JSON. If the tokenizer or model cannot be loaded, an ERROR
        line is yielded and the generator ends without a result item.
    """

    def stamp(msg, level="INFO"):
        """Format one log line with the current wall-clock time."""
        return f"[{time.strftime('%H:%M:%S')}] [{level}] {msg}"

    def pct(delta, base, floor, digits):
        """Percentage of ``delta`` relative to ``base`` (clamped to ``floor``)."""
        return round(delta / max(base, floor) * 100, digits)

    # A non-blank custom id overrides the dropdown selection.
    actual_model = custom_model_id.strip() or model_id
    actual_device = HW["device"] if device_choice == "auto" else device_choice

    yield stamp("Starting CurvOpt-LLM pipeline")
    yield stamp(f"Model: {actual_model}")
    yield stamp(f"Device: {actual_device} | HW: {HW['label']}")
    yield stamp(f"Calibration: {calib_samples} samples Γ {seq_len} tokens from {calib_dataset}")

    # --- Tokenizer ---------------------------------------------------------
    yield stamp("Loading tokenizer...")
    try:
        tokenizer = AutoTokenizer.from_pretrained(actual_model, trust_remote_code=True)
        if tokenizer.pad_token is None:
            tokenizer.pad_token = tokenizer.eos_token
        yield stamp(f"Tokenizer loaded. Vocab size: {tokenizer.vocab_size}")
    except Exception as exc:
        yield stamp(f"Failed to load tokenizer: {exc}", "ERROR")
        return

    # --- Model ---------------------------------------------------------------
    yield stamp("Loading model (this may take a moment for large models)...")
    try:
        on_cuda = actual_device == "cuda"
        model = AutoModelForCausalLM.from_pretrained(
            actual_model,
            # Half precision only on CUDA; every other device loads in FP32.
            torch_dtype=torch.float16 if on_cuda else torch.float32,
            trust_remote_code=True,
            device_map=actual_device if on_cuda else None,
            low_cpu_mem_usage=True,
        )
        if not on_cuda:
            model = model.to(actual_device)
        model.eval()
        yield stamp(f"Model loaded on {actual_device}.")
    except Exception as exc:
        yield stamp(f"Failed to load model: {exc}", "ERROR")
        return

    # --- Baseline measurements -------------------------------------------------
    yield stamp("Measuring baseline memory...")
    base_mem = measure_memory_mb(model)
    yield stamp(f"Baseline memory: {base_mem:.1f} MB")

    yield stamp("Benchmarking baseline TPS...")
    base_tps = benchmark_tps(model, tokenizer, seq_len=32, n_runs=3)
    yield stamp(f"Baseline TPS: {base_tps:.2f} tok/s")

    yield stamp("Evaluating baseline perplexity...")
    base_ppl = evaluate_perplexity(model, tokenizer, seq_len=seq_len)
    yield stamp(f"Baseline perplexity: {base_ppl:.3f}")

    # --- Calibration data --------------------------------------------------------
    yield stamp(f"Sampling {calib_samples} calibration sequences...")
    try:
        calib_inputs = get_calibration_texts(calib_dataset, calib_samples, seq_len, tokenizer)
        yield stamp(f"Calibration data ready: {len(calib_inputs)} sequences")
    except Exception as exc:
        yield stamp(f"Calibration data error: {exc} β using fallback", "WARN")
        calib_inputs = [
            torch.randint(0, tokenizer.vocab_size, (1, seq_len))
            for _ in range(calib_samples)
        ]

    # --- Curvature ---------------------------------------------------------------
    yield stamp("Computing Fisher diagonal curvature (this is the core step)...")
    fisher_log = []
    fisher = compute_fisher_diagonal(model, calib_inputs, log_fn=fisher_log.append)
    # Only the tail of the per-sample log is surfaced.
    for line in fisher_log[-8:]:
        yield stamp(line)
    yield stamp(f"Curvature computed for {len(fisher)} parameter tensors.")

    yield stamp("Aggregating curvature per layer...")
    layer_curvatures = aggregate_layer_curvature(model, fisher)
    yield stamp(f"Got curvature for {len(layer_curvatures)} leaf modules.")

    # --- Precision plan ----------------------------------------------------------
    yield stamp("Assigning precision per layer based on curvature threshold...")
    layer_plan = assign_precision(
        layer_curvatures, ppl_tolerance, allow_fp16, allow_bf16, allow_int8
    )
    counts = {}
    for entry in layer_plan:
        chosen = entry["precision"]
        counts[chosen] = counts.get(chosen, 0) + 1
    yield stamp(f"Precision plan: {counts}")

    # --- Rewrite -------------------------------------------------------------------
    yield stamp("Rewriting model to mixed precision (actual parameter conversion)...")
    rw_log = []
    model = rewrite_model(model, layer_plan, log_fn=rw_log.append)
    for line in rw_log[:6]:
        yield stamp(line)
    if len(rw_log) > 6:
        yield stamp(f" ... ({len(rw_log)-6} more layers converted)")

    # --- Optimized measurements --------------------------------------------------
    yield stamp("Measuring optimized memory...")
    opt_mem = measure_memory_mb(model)
    yield stamp(f"Optimized memory: {opt_mem:.1f} MB (was {base_mem:.1f} MB)")

    yield stamp("Benchmarking optimized TPS...")
    opt_tps = benchmark_tps(model, tokenizer, seq_len=32, n_runs=3)
    yield stamp(f"Optimized TPS: {opt_tps:.2f} tok/s (was {base_tps:.2f})")

    yield stamp("Evaluating optimized perplexity...")
    opt_ppl = evaluate_perplexity(model, tokenizer, seq_len=seq_len)
    yield stamp(f"Optimized perplexity: {opt_ppl:.3f} (was {base_ppl:.3f})")

    # --- Footprint + metrics -----------------------------------------------------
    power_w = HW["power_w"]
    base_fp = compute_footprint(base_tps, power_w)
    opt_fp = compute_footprint(opt_tps, power_w)

    metrics = {
        "model": actual_model,
        "hardware": HW["label"],
        "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
        "base_tps": round(base_tps, 2),
        "opt_tps": round(opt_tps, 2),
        "tps_speedup": round(opt_tps / max(base_tps, 0.01), 3),
        "tps_delta_pct": pct(opt_tps - base_tps, base_tps, 0.01, 2),
        "base_mem_mb": round(base_mem, 2),
        "opt_mem_mb": round(opt_mem, 2),
        "mem_delta_pct": pct(base_mem - opt_mem, base_mem, 0.01, 2),
        "base_ppl": round(base_ppl, 4),
        "opt_ppl": round(opt_ppl, 4),
        "ppl_delta_pct": pct(opt_ppl - base_ppl, base_ppl, 0.01, 4),
        "ppl_tolerance": ppl_tolerance,
        "precision_counts": counts,
        "footprint_base": base_fp,
        "footprint_opt": opt_fp,
        "footprint_energy_saving_pct": pct(
            base_fp["kwh"] - opt_fp["kwh"], base_fp["kwh"], 1e-10, 2
        ),
        "footprint_co2_saving_pct": pct(
            base_fp["co2_g"] - opt_fp["co2_g"], base_fp["co2_g"], 1e-10, 2
        ),
        "footprint_water_saving_pct": pct(
            base_fp["water_ml"] - opt_fp["water_ml"], base_fp["water_ml"], 1e-10, 2
        ),
    }

    # --- Save ----------------------------------------------------------------------
    output_dir = f"./optimized_{actual_model.replace('/', '_')}_{int(time.time())}"
    yield stamp(f"Saving optimized model to {output_dir}...")
    try:
        zip_path = save_optimized_model(model, tokenizer, output_dir, layer_plan, metrics)
        yield stamp(f"Model saved! ZIP: {zip_path}", "OK")
        metrics["zip_path"] = zip_path
    except Exception as exc:
        yield stamp(f"Save error: {exc}", "ERROR")
        metrics["zip_path"] = None

    yield stamp("=" * 50)
    yield stamp(f"DONE. Speedup: {metrics['tps_speedup']}x | Mem -{ metrics['mem_delta_pct']}% | PPL +{metrics['ppl_delta_pct']}%", "OK")

    # Signal completion with JSON
    yield f"__RESULT__{json.dumps(metrics)}"
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
| # GRADIO UI | |
| # βββββββββββββββββββββββββββββββββββββββββββββ | |
# HuggingFace model ids offered in the "Preset Model" dropdown.
PRESET_MODELS = [
    # Meta OPT
    "facebook/opt-125m",
    "facebook/opt-350m",
    "facebook/opt-1.3b",
    # GPT-2
    "openai-community/gpt2",
    "openai-community/gpt2-medium",
    "openai-community/gpt2-xl",
    # EleutherAI
    "EleutherAI/pythia-70m",
    "EleutherAI/pythia-160m",
    "EleutherAI/pythia-410m",
    "EleutherAI/pythia-1b",
    "EleutherAI/gpt-neo-125m",
    # Microsoft Phi
    "microsoft/phi-1_5",
    "microsoft/phi-2",
    # BLOOM
    "bigscience/bloom-560m",
    "bigscience/bloom-1b7",
    # 7B-class models
    "mistralai/Mistral-7B-v0.1",
    "meta-llama/Llama-2-7b-hf",
    # Qwen
    "Qwen/Qwen1.5-0.5B",
    "Qwen/Qwen1.5-1.8B",
]

# Extra stylesheet passed to gr.Blocks.
CSS = """
body { font-family: 'Segoe UI', system-ui, sans-serif; }
.hw-badge { padding: 6px 16px; border-radius: 20px; font-weight: 700; font-size: 0.85rem; }
.result-box { background: #f0fdf4; border: 1px solid #86efac; border-radius: 8px; padding: 16px; font-family: monospace; }
"""
def build_ui():
    """Build the Gradio Blocks app for the optimizer.

    The app has two tabs: "Optimizer" (configuration, live log, headline
    metrics and the download link) and "Compute Footprint" (energy, CO2 and
    water figures plus the full JSON report). The run button streams log
    lines into the log box while the pipeline runs and fills every other
    output once it finishes.

    Returns:
        The configured ``gr.Blocks`` instance, with queuing enabled.
    """
    hw_color = HW["color"]
    with gr.Blocks(title="CurvOpt-LLM Optimizer", css=CSS, theme=gr.themes.Default()) as app:
        gr.HTML(f"""
        <div style="display:flex;align-items:center;justify-content:space-between;
        padding:16px 24px;background:#fff;border-bottom:1px solid #e5e7eb;margin-bottom:16px">
        <div>
        <span style="font-size:1.3rem;font-weight:800;letter-spacing:-0.02em">
        CurvOpt<span style="color:#1a6b3c">-LLM</span>
        </span>
        <span style="margin-left:8px;font-size:0.7rem;color:#9ca3af;
        background:#f3f4f6;padding:2px 8px;border-radius:4px">v2.0</span>
        </div>
        <div style="display:flex;gap:10px;align-items:center">
        <span style="padding:5px 14px;border-radius:20px;font-size:0.75rem;font-weight:700;
        background:{hw_color}22;color:{hw_color};border:1.5px solid {hw_color}">
        π₯ {HW['label']}
        </span>
        <span id="status-badge" style="padding:5px 14px;border-radius:20px;font-size:0.75rem;
        font-weight:700;background:#f0fdf4;color:#1a6b3c;border:1.5px solid #86efac">
        β READY
        </span>
        </div>
        </div>
        """)
        with gr.Tabs():
            # -- TAB 1: OPTIMIZER --
            with gr.TabItem("βοΈ Optimizer"):
                with gr.Row():
                    with gr.Column(scale=1):
                        gr.Markdown("### Model Configuration")
                        model_dd = gr.Dropdown(
                            choices=PRESET_MODELS, value="facebook/opt-125m",
                            label="Preset Model"
                        )
                        custom_model = gr.Textbox(
                            label="Custom Model ID (overrides dropdown)",
                            placeholder="e.g. google/gemma-2b or any HuggingFace model ID",
                            info="Leave blank to use dropdown selection"
                        )
                        device_dd = gr.Dropdown(
                            choices=["auto", "cpu", "cuda", "mps"],
                            value="auto", label="Device"
                        )
                        ppl_tol = gr.Slider(0.0, 5.0, value=1.0, step=0.1,
                                            label="Max Perplexity Increase Tolerance (%)")
                        gr.Markdown("### Calibration")
                        calib_n = gr.Slider(1, 32, value=8, step=1, label="Calibration Samples (1β32)")
                        seq_len = gr.Dropdown(
                            choices=[64, 128, 256, 512, 1024], value=256,
                            label="Sequence Length"
                        )
                        calib_ds = gr.Dropdown(
                            choices=["wikitext", "c4", "ptb"],
                            value="wikitext", label="Calibration Dataset"
                        )
                        gr.Markdown("### Allowed Precisions")
                        with gr.Row():
                            allow_fp16 = gr.Checkbox(value=True, label="FP16")
                            allow_bf16 = gr.Checkbox(value=True, label="BF16")
                            allow_int8 = gr.Checkbox(value=False, label="INT8 (CUDA only)")
                        run_btn = gr.Button("β‘ Run Optimization", variant="primary", size="lg")
                    with gr.Column(scale=2):
                        gr.Markdown("### Optimization Log")
                        log_out = gr.Textbox(
                            label="Real-Time Logs", lines=20,
                            interactive=False, max_lines=30
                        )
                        gr.Markdown("### Results")
                        with gr.Row():
                            tps_base = gr.Number(label="Base TPS", interactive=False)
                            tps_opt = gr.Number(label="Optimized TPS", interactive=False)
                            speedup = gr.Number(label="Speedup Γ", interactive=False)
                        with gr.Row():
                            mem_base = gr.Number(label="Base Memory (MB)", interactive=False)
                            mem_opt = gr.Number(label="Optimized Memory (MB)", interactive=False)
                            mem_save = gr.Number(label="Memory Saved %", interactive=False)
                        with gr.Row():
                            ppl_base = gr.Number(label="Base Perplexity", interactive=False)
                            ppl_opt = gr.Number(label="Optimized Perplexity", interactive=False)
                            ppl_d = gr.Number(label="PPL Ξ %", interactive=False)
                        gr.Markdown("### β¬οΈ Download Optimized Model")
                        dl_file = gr.File(label="Optimized Model (ZIP β load with HuggingFace)")
                        dl_info = gr.Markdown("")
            # -- TAB 2: COMPUTE FOOTPRINT --
            with gr.TabItem("π Compute Footprint"):
                gr.Markdown("## Environmental Impact Analysis\n*Run the optimizer first β all values below come from real measurements.*")
                with gr.Row():
                    e_save = gr.Number(label="Energy Saved (kWh/1M tok)", interactive=False)
                    c_save = gr.Number(label="COβ Saved (g/1M tok)", interactive=False)
                    w_save = gr.Number(label="Water Saved (mL/1M tok)", interactive=False)
                    m_save = gr.Number(label="Memory Freed (%)", interactive=False)
                with gr.Row():
                    with gr.Column():
                        gr.Markdown("### β‘ Electricity (kWh / 1M tokens)")
                        elec_base = gr.Number(label="Baseline", interactive=False)
                        elec_opt = gr.Number(label="Optimized", interactive=False)
                    with gr.Column():
                        gr.Markdown("### πΏ Carbon COβe (g / 1M tokens)")
                        co2_base = gr.Number(label="Baseline", interactive=False)
                        co2_opt = gr.Number(label="Optimized", interactive=False)
                    with gr.Column():
                        gr.Markdown("### π§ Water (mL / 1M tokens)")
                        h2o_base = gr.Number(label="Baseline", interactive=False)
                        h2o_opt = gr.Number(label="Optimized", interactive=False)
                report_json = gr.JSON(label="Full Report (JSON)")

        # -- BACKEND WIRING --
        inputs = [model_dd, custom_model, device_dd, ppl_tol,
                  calib_n, seq_len, calib_ds, allow_fp16, allow_bf16, allow_int8]
        outputs = [
            log_out, tps_base, tps_opt, speedup,
            mem_base, mem_opt, mem_save,
            ppl_base, ppl_opt, ppl_d,
            dl_file, dl_info,
            e_save, c_save, w_save, m_save,
            elec_base, elec_opt, co2_base, co2_opt, h2o_base, h2o_opt,
            report_json,
        ]

        def run_pipeline_ui(model_choice, custom_id, device, tolerance,
                            n_calib, max_len, dataset, use_fp16, use_bf16, use_int8):
            """Drive the pipeline, streaming logs and then the final metrics."""
            # State is local to each call. The previous closure-level buffers
            # were shared by every concurrent session and could mix runs.
            log_lines = []
            m = {}
            # While running, only the log box changes; gr.update() with no
            # arguments leaves a component as it is.
            unchanged = [gr.update() for _ in outputs[1:]]

            for item in run_optimization_pipeline(
                model_id=model_choice,
                custom_model_id=custom_id or "",
                device_choice=device,
                ppl_tolerance=float(tolerance),
                calib_samples=int(n_calib),
                seq_len=int(max_len),
                calib_dataset=dataset,
                allow_fp16=use_fp16,
                allow_bf16=use_bf16,
                allow_int8=use_int8,
            ):
                if isinstance(item, str) and item.startswith("__RESULT__"):
                    m.update(json.loads(item[len("__RESULT__"):]))
                    continue
                log_lines.append(item)
                yield ("\n".join(log_lines), *unchanged)

            fp_base = m.get("footprint_base", {})
            fp_opt = m.get("footprint_opt", {})
            zip_path = m.get("zip_path")
            zip_ready = bool(zip_path) and os.path.exists(zip_path)
            info_md = ""
            if zip_ready:
                size_mb = os.path.getsize(zip_path) / (1024**2)
                # The archive unpacks into a folder named after the zip file,
                # so point the snippet at that folder.
                folder = os.path.splitext(os.path.basename(zip_path))[0]
                info_md = f"β **Model ready** β `{zip_path}` ({size_mb:.1f} MB)\n\nLoad with:\n```python\nfrom transformers import AutoModelForCausalLM, AutoTokenizer\nmodel = AutoModelForCausalLM.from_pretrained('./{folder}')\n```"

            yield (
                "\n".join(log_lines),
                m.get("base_tps", 0),
                m.get("opt_tps", 0),
                m.get("tps_speedup", 0),
                m.get("base_mem_mb", 0),
                m.get("opt_mem_mb", 0),
                m.get("mem_delta_pct", 0),
                m.get("base_ppl", 0),
                m.get("opt_ppl", 0),
                m.get("ppl_delta_pct", 0),
                zip_path if zip_ready else None,
                info_md,
                # Footprint tab
                round(fp_base.get("kwh", 0) - fp_opt.get("kwh", 0), 8),
                round(fp_base.get("co2_g", 0) - fp_opt.get("co2_g", 0), 4),
                round(fp_base.get("water_ml", 0) - fp_opt.get("water_ml", 0), 4),
                m.get("mem_delta_pct", 0),
                fp_base.get("kwh", 0),
                fp_opt.get("kwh", 0),
                fp_base.get("co2_g", 0),
                fp_opt.get("co2_g", 0),
                fp_base.get("water_ml", 0),
                fp_opt.get("water_ml", 0),
                m,
            )

        run_btn.click(fn=run_pipeline_ui, inputs=inputs, outputs=outputs)

    # Generator handlers need the queue; on Gradio versions where it is
    # already on by default this call is harmless.
    app.queue()
    return app
# Script entry point: build the interface and serve it with Gradio's defaults.
if __name__ == "__main__":
    build_ui().launch()