import gradio as gr
import torch
import os
import logging
import tempfile
import shutil
import subprocess
from datetime import datetime

from datasets import load_dataset
from huggingface_hub import HfApi
from transformers import AutoConfig, AutoModel, AutoTokenizer
from optimum.onnxruntime import ORTQuantizer
from optimum.onnxruntime.configuration import AutoCalibrationConfig, AutoQuantizationConfig
import torch.nn.utils.prune as prune
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

HF_TOKEN = os.getenv("HF_TOKEN")
if not HF_TOKEN:
    logging.warning("HF_TOKEN environment variable not set. Packaging and uploading will fail.")

api = HfApi()
OUTPUT_DIR = "optimized_models"
os.makedirs(OUTPUT_DIR, exist_ok=True)
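
# The pipeline runs in five stages:
#   1. Analyze the model architecture and recommend a pipeline.
#   2. Optionally prune linear layers (ONNX pipeline only).
#   3/4. Export to ONNX or GGUF and quantize for CPU inference.
#   5. Package the result with a model card and upload it to the Hub.
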
def stage_1_analyze_model(model_id: str):
    log_stream = "[STAGE 1] Analyzing model...\n"
    try:
        config = AutoConfig.from_pretrained(model_id, trust_remote_code=True, token=HF_TOKEN)
        model_type = config.model_type
        analysis_report = f"""
### Model Analysis Report
- **Model ID:** `{model_id}`
- **Architecture:** `{model_type}`
"""
        if any(key in model_type for key in ("llama", "gpt", "mistral", "gemma")):
            recommendation = "**Recommendation:** This is a large language model (LLM). For the best CPU performance and community support, the **GGUF pipeline** is highly recommended. The ONNX pipeline is a viable alternative."
        else:
            recommendation = "**Recommendation:** This is likely an encoder model. The **ONNX pipeline** is recommended. Pruning may reduce size, but its impact on quality varies."
        log_stream += f"Analysis complete. Architecture: {model_type}.\n"
        return log_stream, analysis_report + "\n" + recommendation, gr.Accordion(open=True)
    except Exception as e:
        error_msg = f"Failed to analyze model '{model_id}'. Error: {e}"
        logging.error(error_msg)
        return log_stream + error_msg, "Could not analyze the model. Please check the model ID and try again.", gr.Accordion(open=False)
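
# Note: L1 unstructured pruning zeroes the smallest-magnitude weights in each
# Linear layer. Zeroed weights do not shrink a dense checkpoint by themselves;
# the size win comes from the later quantization step (or a sparsity-aware
# runtime).
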
def stage_2_prune_model(model, prune_percentage: float):
    if prune_percentage <= 0:
        return model, "Skipped pruning as percentage was 0.\n"
    log_stream = "[STAGE 2] Pruning model...\n"
    for name, module in model.named_modules():
        if isinstance(module, torch.nn.Linear):
            prune.l1_unstructured(module, name='weight', amount=prune_percentage / 100.0)
            prune.remove(module, 'weight')  # make the pruning permanent by removing the reparametrization
    log_stream += f"Pruning complete with {prune_percentage}% target.\n"
    return model, log_stream
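
# Stages 3 and 4 (ONNX): export the (possibly pruned) model with
# `optimum-cli export onnx`, then quantize with ORTQuantizer. Dynamic
# quantization needs no data; static quantization first calibrates activation
# ranges on the user-supplied text file.
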
def stage_3_4_onnx_quantize(model_path: str, calibration_data_path: str):
    log_stream = "[STAGE 3 & 4] Converting to ONNX and Quantizing...\n"
    run_id = datetime.now().strftime("%Y%m%d-%H%M%S")
    model_name = os.path.basename(model_path)
    onnx_path = os.path.join(OUTPUT_DIR, f"{model_name}-{run_id}-onnx")
    try:
        log_stream += "Executing `optimum-cli export onnx` via subprocess...\n"
        export_command = [
            "optimum-cli", "export", "onnx",
            "--model", model_path,
            "--trust-remote-code",
            onnx_path,
        ]
        process = subprocess.run(export_command, check=True, capture_output=True, text=True)
        log_stream += process.stdout
        if process.stderr:
            log_stream += f"[STDERR]\n{process.stderr}\n"
        log_stream += f"Successfully exported base model to ONNX at: {onnx_path}\n"
    except subprocess.CalledProcessError as e:
        error_msg = f"Failed during `optimum-cli export onnx`. Error:\n{e.stderr}"
        logging.error(error_msg)
        raise RuntimeError(error_msg)
    try:
        quantizer = ORTQuantizer.from_pretrained(onnx_path)
        if calibration_data_path:
            log_stream += "Performing STATIC quantization with user-provided calibration data.\n"
            dqconfig = AutoQuantizationConfig.avx512_vnni(is_static=True, per_channel=False)
            # Build a calibration dataset from the uploaded .txt file and tokenize
            # it so its features match the exported model's inputs.
            tokenizer = AutoTokenizer.from_pretrained(model_path)
            if tokenizer.pad_token is None:
                tokenizer.pad_token = tokenizer.eos_token  # e.g. GPT-style tokenizers have no pad token
            calibration_dataset = load_dataset("text", data_files=calibration_data_path, split="train")
            calibration_dataset = calibration_dataset.map(
                lambda examples: tokenizer(examples["text"], padding="max_length", truncation=True, max_length=128),
                batched=True,
                remove_columns=["text"],
            )
            # Compute activation ranges on the calibration data, then quantize.
            calibration_config = AutoCalibrationConfig.minmax(calibration_dataset)
            ranges = quantizer.fit(dataset=calibration_dataset, calibration_config=calibration_config)
            quantized_path = os.path.join(onnx_path, "quantized-static")
            quantizer.quantize(save_dir=quantized_path, quantization_config=dqconfig, calibration_tensors_range=ranges)
        else:
            log_stream += "Performing DYNAMIC quantization.\n"
            dqconfig = AutoQuantizationConfig.avx512_vnni(is_static=False, per_channel=False)
            quantized_path = os.path.join(onnx_path, "quantized-dynamic")
            quantizer.quantize(save_dir=quantized_path, quantization_config=dqconfig)
        log_stream += f"Successfully quantized model to: {quantized_path}\n"
        return quantized_path, log_stream
    except Exception as e:
        error_msg = f"Failed during ONNX quantization step. Error: {e}"
        logging.error(error_msg, exc_info=True)
        raise RuntimeError(error_msg)
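
# Stages 3 and 4 (GGUF): export and quantization are delegated to a single
# subprocess call; the strategy string (e.g. q4_k_m) selects the size/quality
# trade-off of the quantized weights.
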
def stage_3_4_gguf_quantize(model_id: str, quantization_strategy: str):
    log_stream = f"[STAGE 3 & 4] Converting to GGUF with '{quantization_strategy}' quantization...\n"
    run_id = datetime.now().strftime("%Y%m%d-%H%M%S")
    model_name = model_id.replace('/', '_')
    gguf_path = os.path.join(OUTPUT_DIR, f"{model_name}-{run_id}-gguf")
    os.makedirs(gguf_path, exist_ok=True)
    output_file = os.path.join(gguf_path, "model.gguf")
    try:
        log_stream += "Executing `optimum-cli export gguf` via subprocess...\n"
        export_command = [
            "optimum-cli", "export", "gguf",
            "--model", model_id,
            "--quantization_strategy", quantization_strategy,
            "--trust-remote-code",
            output_file,
        ]
        process = subprocess.run(export_command, check=True, capture_output=True, text=True)
        log_stream += process.stdout
        if process.stderr:
            log_stream += f"[STDERR]\n{process.stderr}\n"
        log_stream += f"Successfully exported and quantized model to GGUF at: {gguf_path}\n"
        return gguf_path, log_stream
    except subprocess.CalledProcessError as e:
        error_msg = f"Failed during `optimum-cli export gguf`. Error:\n{e.stderr}"
        logging.error(error_msg)
        raise RuntimeError(error_msg)
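
# Stage 5: render the model card from a template, bundle tokenizer files for
# ONNX models, and push everything to a Hub repo under the caller's namespace.
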
def stage_5_package_and_upload(model_id: str, optimized_model_path: str, pipeline_log: str, options: dict):
    log_stream = "[STAGE 5] Packaging and Uploading...\n"
    if not HF_TOKEN:
        return "Skipping upload: HF_TOKEN not found.", log_stream + "Skipping upload: HF_TOKEN not found."
    try:
        repo_name = f"{model_id.split('/')[-1]}-amop-cpu-{options['pipeline_type'].lower()}"
        repo_url = api.create_repo(repo_id=repo_name, exist_ok=True, token=HF_TOKEN)
        if options['pipeline_type'] == "GGUF":
            template_file = "model_card_template_gguf.md"
        else:
            template_file = "model_card_template.md"
        with open(template_file, "r", encoding="utf-8") as f:
            template_content = f.read()
        model_card_content = template_content.format(
            repo_name=repo_name,
            model_id=model_id,
            optimization_date=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            pruning_status="Enabled" if options.get('prune', False) else "Disabled",
            pruning_percent=options.get('prune_percent', 0),
            quant_type=options.get('quant_type', 'N/A'),
            repo_id=repo_url.repo_id,
            pipeline_log=pipeline_log,
        )
        readme_path = os.path.join(optimized_model_path, "README.md")
        with open(readme_path, "w", encoding="utf-8") as f:
            f.write(model_card_content)
        if options['pipeline_type'] == "ONNX":
            tokenizer = AutoTokenizer.from_pretrained(model_id, trust_remote_code=True)
            tokenizer.save_pretrained(optimized_model_path)
        api.upload_folder(folder_path=optimized_model_path, repo_id=repo_url.repo_id, repo_type="model", token=HF_TOKEN)
        final_message = f"Success! Your optimized model is available at: huggingface.co/{repo_url.repo_id}"
        log_stream += "Upload complete.\n"
        return final_message, log_stream
    except Exception as e:
        error_msg = f"Failed to upload to the Hub. Error: {e}"
        logging.error(error_msg, exc_info=True)
        return f"Error: {error_msg}", log_stream + error_msg
def run_amop_pipeline(model_id: str, pipeline_type: str, do_prune: bool, prune_percent: float, onnx_quant_type: str, calibration_file, gguf_quant_type: str):
    if not model_id:
        yield {log_output: "Please enter a Model ID.", final_output: gr.Label(value="Idle", label="Status")}
        return
    initial_log = f"[START] AMOP {pipeline_type} Pipeline Initiated.\n"
    yield {
        run_button: gr.Button(interactive=False, value="🚀 Running..."),
        analyze_button: gr.Button(interactive=False),
        final_output: gr.Label(value=f"RUNNING ({pipeline_type})", show_label=True),
        log_output: initial_log,
    }
    full_log = initial_log
    temp_model_dir = None
    try:
        repo_name_suffix = f"-amop-cpu-{pipeline_type.lower()}"
        repo_id_for_link = f"{api.whoami(token=HF_TOKEN)['name']}/{model_id.split('/')[-1]}{repo_name_suffix}"
        if pipeline_type == "ONNX":
            full_log += "Loading base model for pruning...\n"
            yield {final_output: gr.Label(value="Loading model (1/5)"), log_output: full_log}
            model = AutoModel.from_pretrained(model_id, trust_remote_code=True)
            tokenizer = AutoTokenizer.from_pretrained(model_id, trust_remote_code=True)
            full_log += f"Successfully loaded base model '{model_id}'.\n"
            yield {final_output: gr.Label(value="Pruning model (2/5)"), log_output: full_log}
            if do_prune:
                model, log = stage_2_prune_model(model, prune_percent)
                full_log += log
            else:
                full_log += "[STAGE 2] Pruning skipped by user.\n"
            temp_model_dir = tempfile.mkdtemp()
            model.save_pretrained(temp_model_dir)
            tokenizer.save_pretrained(temp_model_dir)
            full_log += f"Saved intermediate model to temporary directory: {temp_model_dir}\n"
            yield {final_output: gr.Label(value="Converting to ONNX (3/5)"), log_output: full_log}
            calib_path = calibration_file.name if onnx_quant_type == "Static" and calibration_file else None
            optimized_path, log = stage_3_4_onnx_quantize(temp_model_dir, calib_path)
            full_log += log
            options = {'pipeline_type': 'ONNX', 'prune': do_prune, 'prune_percent': prune_percent, 'quant_type': onnx_quant_type}
        elif pipeline_type == "GGUF":
            full_log += "[STAGE 1 & 2] Loading and pruning are skipped for the GGUF pipeline.\n"
            yield {final_output: gr.Label(value="Converting to GGUF (3/5)"), log_output: full_log}
            optimized_path, log = stage_3_4_gguf_quantize(model_id, gguf_quant_type)
            full_log += log
            options = {'pipeline_type': 'GGUF', 'quant_type': gguf_quant_type}
        else:
            raise ValueError("Invalid pipeline type selected.")
        yield {final_output: gr.Label(value="Packaging & Uploading (4/5)"), log_output: full_log}
        final_message, log = stage_5_package_and_upload(model_id, optimized_path, full_log, options)
        full_log += log
        yield {
            final_output: gr.Label(value="SUCCESS", label="Status"),
            log_output: full_log,
            success_box: gr.Markdown(f"✅ **Success!** Your optimized model is available here: [{repo_id_for_link}](https://huggingface.co/{repo_id_for_link})", visible=True),
            run_button: gr.Button(interactive=True, value="Run Optimization Pipeline", variant="primary"),
            analyze_button: gr.Button(interactive=True, value="Analyze Model"),
        }
    except Exception as e:
        logging.error(f"AMOP Pipeline failed. Error: {e}", exc_info=True)
        full_log += f"\n[ERROR] Pipeline failed: {e}"
        yield {
            final_output: gr.Label(value="ERROR", label="Status"),
            log_output: full_log,
            success_box: gr.Markdown("❌ **An error occurred.** Check the logs for details.", visible=True),
            run_button: gr.Button(interactive=True, value="Run Optimization Pipeline", variant="primary"),
            analyze_button: gr.Button(interactive=True, value="Analyze Model"),
        }
    finally:
        if temp_model_dir and os.path.exists(temp_model_dir):
            shutil.rmtree(temp_model_dir)
            logging.info(f"Cleaned up temporary directory: {temp_model_dir}")
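
# --- Gradio UI ---
# Left column: model selection and optimization settings.
# Right column: live status label, success/error banner, and streamed logs.
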
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 🚀 AMOP: Adaptive Model Optimization Pipeline")
    gr.Markdown("Turn any Hugging Face Hub model into a CPU-optimized version using ONNX or GGUF.")
    if not HF_TOKEN:
        gr.Warning("You have not set your HF_TOKEN in the Space secrets! The final 'upload' step will be skipped. Please add a secret with the key `HF_TOKEN` and your Hugging Face write token as the value.")
    with gr.Row():
        with gr.Column(scale=1):
            gr.Markdown("### 1. Select a Model")
            model_id_input = gr.Textbox(
                label="Hugging Face Model ID",
                placeholder="e.g., gpt2, meta-llama/Llama-2-7b-chat-hf",
            )
            analyze_button = gr.Button("🔍 Analyze Model", variant="secondary")
            with gr.Accordion("⚙️ 2. Configure Optimization", open=False) as optimization_accordion:
                analysis_report_output = gr.Markdown()
                pipeline_type_radio = gr.Radio(
                    ["ONNX", "GGUF"], label="Select Optimization Pipeline", info="GGUF is recommended for LLMs, ONNX for others."
                )
                with gr.Group(visible=False) as onnx_options:
                    gr.Markdown("#### ONNX Pipeline Options")
                    prune_checkbox = gr.Checkbox(label="Enable Pruning", value=False, info="Removes redundant weights. Applied before ONNX conversion.")
                    prune_slider = gr.Slider(minimum=0, maximum=90, value=20, step=5, label="Pruning Percentage (%)")
                    onnx_quant_radio = gr.Radio(["Dynamic", "Static"], label="ONNX Quantization Type", value="Dynamic", info="Static may offer better performance but requires calibration data.")
                    calibration_file_upload = gr.File(label="Upload Calibration Data (.txt)", visible=False, file_types=['.txt'])
                with gr.Group(visible=False) as gguf_options:
                    gr.Markdown("#### GGUF Pipeline Options")
                    gguf_quant_dropdown = gr.Dropdown(
                        ["q4_k_m", "q5_k_m", "q8_0", "f16"],
                        label="GGUF Quantization Strategy",
                        value="q4_k_m",
                        info="q4_k_m is a good balance of size and quality.",
                    )
                run_button = gr.Button("🚀 Run Optimization Pipeline", variant="primary")
        with gr.Column(scale=2):
            gr.Markdown("### Pipeline Status & Logs")
            final_output = gr.Label(value="Idle", label="Status", show_label=True)
            success_box = gr.Markdown(visible=False)
            log_output = gr.Textbox(label="Live Logs", lines=20, interactive=False, max_lines=20)

    def update_ui_for_pipeline(pipeline_type):
        return {
            onnx_options: gr.Group(visible=pipeline_type == "ONNX"),
            gguf_options: gr.Group(visible=pipeline_type == "GGUF"),
        }

    def update_ui_for_quant_type(quant_type):
        return gr.File(visible=quant_type == "Static")

    pipeline_type_radio.change(fn=update_ui_for_pipeline, inputs=pipeline_type_radio, outputs=[onnx_options, gguf_options])
    onnx_quant_radio.change(fn=update_ui_for_quant_type, inputs=onnx_quant_radio, outputs=[calibration_file_upload])
    analyze_button.click(
        fn=stage_1_analyze_model,
        inputs=[model_id_input],
        outputs=[log_output, analysis_report_output, optimization_accordion],
    )
    run_button.click(
        fn=run_amop_pipeline,
        inputs=[model_id_input, pipeline_type_radio, prune_checkbox, prune_slider, onnx_quant_radio, calibration_file_upload, gguf_quant_dropdown],
        outputs=[run_button, analyze_button, final_output, log_output, success_box],
    )

if __name__ == "__main__":
    demo.launch(debug=True)