"""AMOP: Adaptive Model Optimization Pipeline.

A Gradio application that takes a Hugging Face Hub model ID, optionally prunes
the model, converts it to ONNX or GGUF with quantization, and uploads the
result (plus a generated model card) back to the Hub.
"""

import logging
import os
import shutil
import subprocess
import tempfile
import time
from datetime import datetime

import gradio as gr
import torch
import torch.nn.utils.prune as prune
from huggingface_hub import HfApi
from optimum.onnxruntime import ORTQuantizer
from optimum.onnxruntime.configuration import AutoQuantizationConfig
from transformers import AutoConfig, AutoModel, AutoTokenizer

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

HF_TOKEN = os.getenv("HF_TOKEN")
if not HF_TOKEN:
    logging.warning("HF_TOKEN environment variable not set. Packaging and uploading will fail.")

api = HfApi()
OUTPUT_DIR = "optimized_models"
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Substrings of `config.model_type` that mark a model as a decoder-style LLM.
_LLM_ARCHITECTURE_HINTS = ("llama", "gpt", "mistral", "gemma")

# Fixed prefix of the message `stage_5_package_and_upload` returns on success.
# `run_amop_pipeline` uses it to tell a real upload from a skipped/failed one
# and to recover the repo id, without changing stage 5's return shape.
_UPLOAD_SUCCESS_PREFIX = "Success! Your optimized model is available at: huggingface.co/"


def stage_1_analyze_model(model_id: str):
    """Fetch the model config and recommend an optimization pipeline.

    Args:
        model_id: Hugging Face Hub model ID to inspect.

    Returns:
        A 3-tuple ``(log_text, markdown_report, accordion_update)``. On
        failure the log carries the error, the report is a generic hint, and
        the accordion update keeps the configuration section closed.
    """
    log_stream = "[STAGE 1] Analyzing model...\n"
    try:
        config = AutoConfig.from_pretrained(model_id, trust_remote_code=True, token=HF_TOKEN)
        model_type = config.model_type
        analysis_report = (
            "\n### Model Analysis Report\n"
            f"- **Model ID:** `{model_id}`\n"
            f"- **Architecture:** `{model_type}`\n"
        )
        if any(hint in model_type for hint in _LLM_ARCHITECTURE_HINTS):
            recommendation = "**Recommendation:** This is a Large Language Model (LLM). For the best CPU performance and community support, the **GGUF Pipeline** is highly recommended. The ONNX pipeline is a viable alternative."
        else:
            recommendation = "**Recommendation:** This is likely an encoder model. The **ONNX Pipeline** is recommended. Pruning may offer size reduction, but its impact on performance can vary."
        log_stream += f"Analysis complete. Architecture: {model_type}.\n"
        return log_stream, analysis_report + "\n" + recommendation, gr.Accordion(open=True)
    except Exception as e:
        # UI boundary: report any failure in the log box instead of crashing the handler.
        error_msg = f"Failed to analyze model '{model_id}'. Error: {e}"
        logging.error(error_msg)
        return (
            log_stream + error_msg,
            "Could not analyze model. Please check the model ID and try again.",
            gr.Accordion(open=False),
        )


def stage_2_prune_model(model, prune_percentage: float):
    """Apply L1 unstructured pruning to every ``torch.nn.Linear`` in ``model``.

    Args:
        model: The model to prune; it is modified in place and also returned.
        prune_percentage: Share of weights to prune per layer, in percent (0-100).

    Returns:
        A 2-tuple ``(model, log_text)``.
    """
    if prune_percentage == 0:
        return model, "Skipped pruning as percentage was 0."

    log_stream = "[STAGE 2] Pruning model...\n"
    amount = prune_percentage / 100.0
    for _name, module in model.named_modules():
        if isinstance(module, torch.nn.Linear):
            prune.l1_unstructured(module, name='weight', amount=amount)
            # Drop the pruning re-parametrization so the pruned tensor is
            # stored as the plain `weight` before the model is saved.
            prune.remove(module, 'weight')
    log_stream += f"Pruning complete with {prune_percentage}% target.\n"
    return model, log_stream


def stage_3_4_onnx_quantize(model_path: str, calibration_data_path: str):
    """Export a local model to ONNX and quantize it.

    Args:
        model_path: Directory containing the saved model and tokenizer.
        calibration_data_path: Path to a text file used for static
            quantization; falsy to use dynamic quantization instead.

    Returns:
        A 2-tuple ``(quantized_model_dir, log_text)``.

    Raises:
        RuntimeError: If the ONNX export subprocess or the quantization fails.
    """
    log_stream = "[STAGE 3 & 4] Converting to ONNX and Quantizing...\n"
    run_id = datetime.now().strftime("%Y%m%d-%H%M%S")
    model_name = os.path.basename(model_path)
    onnx_path = os.path.join(OUTPUT_DIR, f"{model_name}-{run_id}-onnx")

    try:
        log_stream += "Executing `optimum-cli export onnx` via subprocess...\n"
        export_command = [
            "optimum-cli", "export", "onnx",
            "--model", model_path,
            "--trust-remote-code",
            onnx_path,
        ]
        process = subprocess.run(export_command, check=True, capture_output=True, text=True)
        log_stream += process.stdout
        if process.stderr:
            log_stream += f"[STDERR]\n{process.stderr}\n"
        log_stream += f"Successfully exported base model to ONNX at: {onnx_path}\n"
    except subprocess.CalledProcessError as e:
        error_msg = f"Failed during `optimum-cli export onnx`. Error:\n{e.stderr}"
        logging.error(error_msg)
        raise RuntimeError(error_msg) from e

    try:
        quantizer = ORTQuantizer.from_pretrained(onnx_path)
        if calibration_data_path:
            log_stream += "Performing STATIC quantization with user-provided calibration data.\n"
            dqconfig = AutoQuantizationConfig.avx512_vnni(is_static=True, per_channel=False)
            # NOTE(review): `dataset_args`/`dataset_num_proc` below and
            # `calibration_dataset=` on `quantize` do not look like documented
            # ORTQuantizer keywords (static quantization normally goes through
            # `fit` + `calibration_tensors_range`) - confirm against the
            # installed optimum version.
            calibration_dataset = quantizer.get_calibration_dataset(
                "text",
                dataset_args={"path": calibration_data_path, "split": "train"},
                num_samples=100,
                dataset_num_proc=1,
            )
            quantized_path = os.path.join(onnx_path, "quantized-static")
            quantizer.quantize(
                save_dir=quantized_path,
                quantization_config=dqconfig,
                calibration_dataset=calibration_dataset,
            )
        else:
            log_stream += "Performing DYNAMIC quantization.\n"
            dqconfig = AutoQuantizationConfig.avx512_vnni(is_static=False, per_channel=False)
            quantized_path = os.path.join(onnx_path, "quantized-dynamic")
            quantizer.quantize(save_dir=quantized_path, quantization_config=dqconfig)
        log_stream += f"Successfully quantized model to: {quantized_path}\n"
        return quantized_path, log_stream
    except Exception as e:
        error_msg = f"Failed during ONNX quantization step. Error: {e}"
        logging.error(error_msg, exc_info=True)
        raise RuntimeError(error_msg) from e


def stage_3_4_gguf_quantize(model_id: str, quantization_strategy: str):
    """Export a Hub model to a quantized GGUF file.

    Args:
        model_id: Hugging Face Hub model ID to convert.
        quantization_strategy: Value passed to ``--quantization_strategy``.

    Returns:
        A 2-tuple ``(gguf_output_dir, log_text)``.

    Raises:
        RuntimeError: If the export subprocess exits with a non-zero status.
    """
    log_stream = f"[STAGE 3 & 4] Converting to GGUF with '{quantization_strategy}' quantization...\n"
    run_id = datetime.now().strftime("%Y%m%d-%H%M%S")
    model_name = model_id.replace('/', '_')
    gguf_path = os.path.join(OUTPUT_DIR, f"{model_name}-{run_id}-gguf")
    os.makedirs(gguf_path, exist_ok=True)
    output_file = os.path.join(gguf_path, "model.gguf")

    try:
        log_stream += "Executing `optimum-cli export gguf` via subprocess...\n"
        # NOTE(review): confirm that the installed optimum-cli actually
        # provides an `export gguf` subcommand with these flags.
        export_command = [
            "optimum-cli", "export", "gguf",
            "--model", model_id,
            "--quantization_strategy", quantization_strategy,
            "--trust-remote-code",
            output_file,
        ]
        process = subprocess.run(export_command, check=True, capture_output=True, text=True)
        log_stream += process.stdout
        if process.stderr:
            log_stream += f"[STDERR]\n{process.stderr}\n"
        log_stream += f"Successfully exported and quantized model to GGUF at: {gguf_path}\n"
        return gguf_path, log_stream
    except subprocess.CalledProcessError as e:
        error_msg = f"Failed during `optimum-cli export gguf`. Error:\n{e.stderr}"
        logging.error(error_msg)
        raise RuntimeError(error_msg) from e


def stage_5_package_and_upload(model_id: str, optimized_model_path: str, pipeline_log: str, options: dict):
    """Write a model card next to the optimized model and upload the folder.

    Args:
        model_id: Original Hub model ID (used for naming and the tokenizer).
        optimized_model_path: Local directory holding the optimized model.
        pipeline_log: Log text embedded into the model card.
        options: Must contain ``'pipeline_type'`` (``"ONNX"`` or ``"GGUF"``);
            may contain ``'prune'``, ``'prune_percent'`` and ``'quant_type'``.

    Returns:
        A 2-tuple ``(final_message, log_text)``. This function does not raise:
        a missing token yields a "Skipping upload" message and any failure
        yields a message starting with ``"Error: "``.
    """
    log_stream = "[STAGE 5] Packaging and Uploading...\n"
    if not HF_TOKEN:
        return "Skipping upload: HF_TOKEN not found.", log_stream + "Skipping upload: HF_TOKEN not found."

    try:
        repo_name = f"{model_id.split('/')[-1]}-amop-cpu-{options['pipeline_type'].lower()}"
        repo_url = api.create_repo(repo_id=repo_name, exist_ok=True, token=HF_TOKEN)

        if options['pipeline_type'] == "GGUF":
            template_file = "model_card_template_gguf.md"
        else:
            template_file = "model_card_template.md"
        with open(template_file, "r", encoding="utf-8") as f:
            template_content = f.read()

        model_card_content = template_content.format(
            repo_name=repo_name,
            model_id=model_id,
            optimization_date=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            pruning_status="Enabled" if options.get('prune', False) else "Disabled",
            pruning_percent=options.get('prune_percent', 0),
            quant_type=options.get('quant_type', 'N/A'),
            repo_id=repo_url.repo_id,
            pipeline_log=pipeline_log,
        )
        readme_path = os.path.join(optimized_model_path, "README.md")
        with open(readme_path, "w", encoding="utf-8") as f:
            f.write(model_card_content)

        if options['pipeline_type'] == "ONNX":
            # Pass the token like stage 1 does so private/gated models work too.
            tokenizer = AutoTokenizer.from_pretrained(model_id, trust_remote_code=True, token=HF_TOKEN)
            tokenizer.save_pretrained(optimized_model_path)

        api.upload_folder(
            folder_path=optimized_model_path,
            repo_id=repo_url.repo_id,
            repo_type="model",
            token=HF_TOKEN,
        )
        final_message = f"{_UPLOAD_SUCCESS_PREFIX}{repo_url.repo_id}"
        log_stream += "Upload complete.\n"
        return final_message, log_stream
    except Exception as e:
        error_msg = f"Failed to upload to the Hub. Error: {e}"
        logging.error(error_msg, exc_info=True)
        return f"Error: {error_msg}", log_stream + error_msg


def _final_ui_update(status: str, message_md: str, full_log: str) -> dict:
    """Build the last UI update of a run: status, logs, message, re-enabled buttons."""
    return {
        final_output: gr.Label(value=status, label="Status"),
        log_output: full_log,
        success_box: gr.Markdown(message_md, visible=True),
        run_button: gr.Button(interactive=True, value="Run Optimization Pipeline", variant="primary"),
        analyze_button: gr.Button(interactive=True, value="Analyze Model"),
    }


def run_amop_pipeline(model_id: str, pipeline_type: str, do_prune: bool, prune_percent: float,
                      onnx_quant_type: str, calibration_file, gguf_quant_type: str):
    """Run the selected optimization pipeline, streaming UI updates.

    This is a generator used as a Gradio event handler: each yielded dict maps
    UI components to their new values.

    Args:
        model_id: Hugging Face Hub model ID to optimize.
        pipeline_type: ``"ONNX"`` or ``"GGUF"``.
        do_prune: Whether to prune before ONNX conversion.
        prune_percent: Pruning percentage (0-100), used when ``do_prune``.
        onnx_quant_type: ``"Dynamic"`` or ``"Static"``.
        calibration_file: Uploaded calibration file; either a path string or
            an object with a ``.name`` path attribute, or ``None``.
        gguf_quant_type: GGUF quantization strategy name.

    Yields:
        Dicts of component updates. The final one reports SUCCESS only if the
        upload really happened, a "completed, upload skipped" state when no
        HF_TOKEN is configured, and ERROR otherwise.
    """
    if not model_id:
        yield {log_output: "Please enter a Model ID.", final_output: gr.Label(value="Idle", label="Status")}
        return

    initial_log = f"[START] AMOP {pipeline_type} Pipeline Initiated.\n"
    yield {
        run_button: gr.Button(interactive=False, value="🚀 Running..."),
        analyze_button: gr.Button(interactive=False),
        final_output: gr.Label(value={"label": f"RUNNING ({pipeline_type})"}, show_label=True),
        log_output: initial_log,
    }

    full_log = initial_log
    temp_model_dir = None
    try:
        if pipeline_type == "ONNX":
            full_log += "Loading base model for pruning...\n"
            yield {final_output: gr.Label(value="Loading model (1/5)"), log_output: full_log}
            model = AutoModel.from_pretrained(model_id, trust_remote_code=True, token=HF_TOKEN)
            tokenizer = AutoTokenizer.from_pretrained(model_id, trust_remote_code=True, token=HF_TOKEN)
            full_log += f"Successfully loaded base model '{model_id}'.\n"

            yield {final_output: gr.Label(value="Pruning model (2/5)"), log_output: full_log}
            if do_prune:
                model, log = stage_2_prune_model(model, prune_percent)
                full_log += log
            else:
                full_log += "[STAGE 2] Pruning skipped by user.\n"

            temp_model_dir = tempfile.mkdtemp()
            model.save_pretrained(temp_model_dir)
            tokenizer.save_pretrained(temp_model_dir)
            full_log += f"Saved intermediate model to temporary directory: {temp_model_dir}\n"

            yield {final_output: gr.Label(value="Converting to ONNX (3/5)"), log_output: full_log}
            calib_path = None
            effective_quant_type = onnx_quant_type
            if onnx_quant_type == "Static":
                if calibration_file:
                    # gr.File may hand over a plain path string or a
                    # tempfile-like object exposing the path as `.name`.
                    calib_path = getattr(calibration_file, "name", calibration_file)
                else:
                    # Be explicit rather than silently recording "Static".
                    full_log += "Static quantization requested without calibration data; falling back to dynamic quantization.\n"
                    effective_quant_type = "Dynamic"
            optimized_path, log = stage_3_4_onnx_quantize(temp_model_dir, calib_path)
            full_log += log
            options = {
                'pipeline_type': 'ONNX',
                'prune': do_prune,
                'prune_percent': prune_percent,
                'quant_type': effective_quant_type,
            }
        elif pipeline_type == "GGUF":
            full_log += "[STAGE 1 & 2] Loading and Pruning are skipped for GGUF pipeline.\n"
            yield {final_output: gr.Label(value="Converting to GGUF (3/5)"), log_output: full_log}
            optimized_path, log = stage_3_4_gguf_quantize(model_id, gguf_quant_type)
            full_log += log
            options = {'pipeline_type': 'GGUF', 'quant_type': gguf_quant_type}
        else:
            raise ValueError("Invalid pipeline type selected.")

        yield {final_output: gr.Label(value="Packaging & Uploading (4/5)"), log_output: full_log}
        final_message, log = stage_5_package_and_upload(model_id, optimized_path, full_log, options)
        full_log += log

        if final_message.startswith(_UPLOAD_SUCCESS_PREFIX):
            # Use the repo id stage 5 actually uploaded to for the link.
            repo_id_for_link = final_message[len(_UPLOAD_SUCCESS_PREFIX):]
            yield _final_ui_update(
                "SUCCESS",
                f"✅ **Success!** Your optimized model is available here: [{repo_id_for_link}](https://huggingface.co/{repo_id_for_link})",
                full_log,
            )
        elif not HF_TOKEN:
            # Stage 5 skips the upload when no token is configured; the
            # optimized model still exists locally, so this is not an error.
            yield _final_ui_update(
                "COMPLETED (upload skipped)",
                f"⚠️ **Optimization finished, but the upload was skipped.** {final_message} The optimized model was saved locally at `{optimized_path}`.",
                full_log,
            )
        else:
            # Stage 5 reports failures through its message instead of raising;
            # surface it through the common error path below.
            raise RuntimeError(final_message)
    except Exception as e:
        logging.error(f"AMOP Pipeline failed. Error: {e}", exc_info=True)
        full_log += f"\n[ERROR] Pipeline failed: {e}"
        yield _final_ui_update(
            "ERROR",
            "❌ **An error occurred.** Check the logs for details.",
            full_log,
        )
    finally:
        if temp_model_dir and os.path.exists(temp_model_dir):
            shutil.rmtree(temp_model_dir)
            logging.info(f"Cleaned up temporary directory: {temp_model_dir}")


with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 🚀 AMOP: Adaptive Model Optimization Pipeline")
    gr.Markdown("Turn any Hugging Face Hub model into a CPU-optimized version using ONNX or GGUF.")
    if not HF_TOKEN:
        # Render the notice as page content: gr.Warning is a toast meant to be
        # triggered from inside an event handler, not while building the layout.
        gr.Markdown("⚠️ **Warning:** You have not set your HF_TOKEN in the Space secrets! The final 'upload' step will be skipped. Please add a secret with the key `HF_TOKEN` and your Hugging Face write token as the value.")

    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("### 1. Select a Model")
            model_id_input = gr.Textbox(
                label="Hugging Face Model ID",
                placeholder="e.g., gpt2, meta-llama/Llama-2-7b-chat-hf",
            )
            analyze_button = gr.Button("🔍 Analyze Model", variant="secondary")

            with gr.Accordion("⚙️ 2. Configure Optimization", open=False) as optimization_accordion:
                analysis_report_output = gr.Markdown()
                pipeline_type_radio = gr.Radio(
                    ["ONNX", "GGUF"],
                    label="Select Optimization Pipeline",
                    info="GGUF is recommended for LLMs, ONNX for others.",
                )

                with gr.Group(visible=False) as onnx_options:
                    gr.Markdown("#### ONNX Pipeline Options")
                    prune_checkbox = gr.Checkbox(
                        label="Enable Pruning",
                        value=False,
                        info="Removes redundant weights. Applied before ONNX conversion.",
                    )
                    prune_slider = gr.Slider(minimum=0, maximum=90, value=20, step=5, label="Pruning Percentage (%)")
                    onnx_quant_radio = gr.Radio(
                        ["Dynamic", "Static"],
                        label="ONNX Quantization Type",
                        value="Dynamic",
                        info="Static may offer better performance but requires calibration data.",
                    )
                    calibration_file_upload = gr.File(
                        label="Upload Calibration Data (.txt)",
                        visible=False,
                        file_types=['.txt'],
                    )

                with gr.Group(visible=False) as gguf_options:
                    gr.Markdown("#### GGUF Pipeline Options")
                    gguf_quant_dropdown = gr.Dropdown(
                        ["q4_k_m", "q5_k_m", "q8_0", "f16"],
                        label="GGUF Quantization Strategy",
                        value="q4_k_m",
                        info="q4_k_m is a good balance of size and quality.",
                    )

                # NOTE(review): placement inside the accordion is reconstructed
                # from a whitespace-collapsed source - confirm intended nesting.
                run_button = gr.Button("🚀 Run Optimization Pipeline", variant="primary")

        with gr.Column(scale=2):
            gr.Markdown("### Pipeline Status & Logs")
            final_output = gr.Label(value="Idle", label="Status", show_label=True)
            success_box = gr.Markdown(visible=False)
            log_output = gr.Textbox(label="Live Logs", lines=20, interactive=False, max_lines=20)

    def update_ui_for_pipeline(pipeline_type):
        """Show only the option group that matches the selected pipeline."""
        return {
            onnx_options: gr.Group(visible=pipeline_type == "ONNX"),
            gguf_options: gr.Group(visible=pipeline_type == "GGUF"),
        }

    def update_ui_for_quant_type(quant_type):
        """Show the calibration upload only for static quantization."""
        return gr.File(visible=quant_type == "Static")

    pipeline_type_radio.change(
        fn=update_ui_for_pipeline,
        inputs=pipeline_type_radio,
        outputs=[onnx_options, gguf_options],
    )
    onnx_quant_radio.change(
        fn=update_ui_for_quant_type,
        inputs=onnx_quant_radio,
        outputs=[calibration_file_upload],
    )
    analyze_button.click(
        fn=stage_1_analyze_model,
        inputs=[model_id_input],
        outputs=[log_output, analysis_report_output, optimization_accordion],
    )
    run_button.click(
        fn=run_amop_pipeline,
        inputs=[
            model_id_input,
            pipeline_type_radio,
            prune_checkbox,
            prune_slider,
            onnx_quant_radio,
            calibration_file_upload,
            gguf_quant_dropdown,
        ],
        outputs=[run_button, analyze_button, final_output, log_output, success_box],
    )

if __name__ == "__main__":
    demo.launch(debug=True)