AMOP / app.py
broadfield-dev's picture
Update app.py
6e5122c verified
raw
history blame
16.9 kB
import gradio as gr
import torch
import os
import logging
import time
import tempfile
import shutil
import subprocess
from datetime import datetime
from huggingface_hub import HfApi
from transformers import AutoConfig, AutoModel, AutoTokenizer
from optimum.onnxruntime import ORTQuantizer
from optimum.onnxruntime.configuration import AutoQuantizationConfig
import torch.nn.utils.prune as prune
# Process-wide logging: timestamped lines at INFO level and above.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
)

# Directory that receives every exported/quantized artifact; created eagerly.
OUTPUT_DIR = "optimized_models"
os.makedirs(OUTPUT_DIR, exist_ok=True)

# Hub token taken from the environment. The upload stage checks this value
# and skips uploading when it is missing.
HF_TOKEN = os.environ.get("HF_TOKEN")
if not HF_TOKEN:
    logging.warning("HF_TOKEN environment variable not set. Packaging and uploading will fail.")

# Shared Hub client used by the packaging stage and the pipeline driver.
api = HfApi()
def stage_1_analyze_model(model_id: str):
    """Inspect a model's config and suggest an optimization pipeline.

    Loads the configuration for ``model_id`` with ``AutoConfig`` and reads its
    ``model_type``. Architectures whose type contains ``llama``, ``gpt``,
    ``mistral`` or ``gemma`` get the GGUF recommendation; everything else gets
    the ONNX recommendation.

    Returns:
        A 3-tuple ``(log_text, markdown_report, accordion_update)``. The
        accordion is opened on success and kept closed when loading the
        config raises.
    """
    log_stream = "[STAGE 1] Analyzing model...\n"
    try:
        model_type = AutoConfig.from_pretrained(
            model_id, trust_remote_code=True, token=HF_TOKEN
        ).model_type

        analysis_report = (
            "\n"
            "### Model Analysis Report\n"
            f"- **Model ID:** `{model_id}`\n"
            f"- **Architecture:** `{model_type}`\n"
        )

        llm_markers = ('llama', 'gpt', 'mistral', 'gemma')
        if any(marker in model_type for marker in llm_markers):
            recommendation = "**Recommendation:** This is a Large Language Model (LLM). For the best CPU performance and community support, the **GGUF Pipeline** is highly recommended. The ONNX pipeline is a viable alternative."
        else:
            recommendation = "**Recommendation:** This is likely an encoder model. The **ONNX Pipeline** is recommended. Pruning may offer size reduction, but its impact on performance can vary."

        log_stream += f"Analysis complete. Architecture: {model_type}.\n"
        full_report = analysis_report + "\n" + recommendation
        return log_stream, full_report, gr.Accordion(open=True)
    except Exception as e:
        # Any failure (bad id, network, auth) is reported in the log pane and
        # the configuration accordion stays closed.
        error_msg = f"Failed to analyze model '{model_id}'. Error: {e}"
        logging.error(error_msg)
        user_hint = "Could not analyze model. Please check the model ID and try again."
        return log_stream + error_msg, user_hint, gr.Accordion(open=False)
def stage_2_prune_model(model, prune_percentage: float):
    """Apply L1 unstructured pruning to every ``torch.nn.Linear`` in ``model``.

    Args:
        model: Module whose sub-modules are scanned (via ``model.modules()``).
        prune_percentage: Share of each Linear layer's weights to zero out,
            expressed in percent (e.g. ``20`` for 20%). ``0`` skips pruning.

    Returns:
        ``(model, log_text)``. The model is modified in place and returned for
        convenience. ``log_text`` always ends with a newline so callers can
        append further log lines directly.
    """
    if prune_percentage == 0:
        # Newline-terminated (and stage-tagged) so the caller's next log line
        # does not get glued onto this message.
        return model, "[STAGE 2] Skipped pruning as percentage was 0.\n"

    amount = prune_percentage / 100.0
    for module in model.modules():
        if isinstance(module, torch.nn.Linear):
            prune.l1_unstructured(module, name='weight', amount=amount)
            # Make the pruning permanent: drops the mask/reparametrization and
            # leaves a plain `weight` parameter with the zeros baked in.
            prune.remove(module, 'weight')

    log_stream = (
        "[STAGE 2] Pruning model...\n"
        f"Pruning complete with {prune_percentage}% target.\n"
    )
    return model, log_stream
def stage_3_4_onnx_quantize(model_path: str, calibration_data_path: str):
    """Export a model to ONNX with ``optimum-cli`` and quantize the result.

    Args:
        model_path: Value passed to ``optimum-cli export onnx --model``. Its
            basename is also used to name the output directory.
        calibration_data_path: When truthy, static quantization is attempted
            with this path as calibration data; otherwise dynamic quantization
            is performed.

    Returns:
        ``(quantized_path, log_text)`` where ``quantized_path`` is the
        ``quantized-static`` or ``quantized-dynamic`` sub-directory of the
        ONNX export directory.

    Raises:
        RuntimeError: If the export command exits non-zero or the quantization
            step raises. The original exception is chained as ``__cause__``.
    """
    log_stream = "[STAGE 3 & 4] Converting to ONNX and Quantizing...\n"
    run_id = datetime.now().strftime("%Y%m%d-%H%M%S")
    model_name = os.path.basename(model_path)
    onnx_path = os.path.join(OUTPUT_DIR, f"{model_name}-{run_id}-onnx")

    try:
        log_stream += "Executing `optimum-cli export onnx` via subprocess...\n"
        export_command = [
            "optimum-cli", "export", "onnx",
            "--model", model_path,
            "--trust-remote-code",
            onnx_path,
        ]
        # check=True turns a non-zero exit status into CalledProcessError.
        process = subprocess.run(export_command, check=True, capture_output=True, text=True)
        log_stream += process.stdout
        if process.stderr:
            log_stream += f"[STDERR]\n{process.stderr}\n"
        log_stream += f"Successfully exported base model to ONNX at: {onnx_path}\n"
    except subprocess.CalledProcessError as e:
        error_msg = f"Failed during `optimum-cli export onnx`. Error:\n{e.stderr}"
        logging.error(error_msg)
        # Chain explicitly so the traceback shows the failing command as the cause.
        raise RuntimeError(error_msg) from e

    try:
        quantizer = ORTQuantizer.from_pretrained(onnx_path)
        if calibration_data_path:
            log_stream += "Performing STATIC quantization with user-provided calibration data.\n"
            dqconfig = AutoQuantizationConfig.avx512_vnni(is_static=True, per_channel=False)
            # NOTE(review): confirm these keyword arguments against the
            # installed optimum version's ORTQuantizer.get_calibration_dataset
            # signature; a mismatch surfaces below as a RuntimeError.
            calibration_dataset = quantizer.get_calibration_dataset(
                "text",
                dataset_args={"path": calibration_data_path, "split": "train"},
                num_samples=100,
                dataset_num_proc=1,
            )
            quantized_path = os.path.join(onnx_path, "quantized-static")
            quantizer.quantize(
                save_dir=quantized_path,
                quantization_config=dqconfig,
                calibration_dataset=calibration_dataset,
            )
        else:
            log_stream += "Performing DYNAMIC quantization.\n"
            dqconfig = AutoQuantizationConfig.avx512_vnni(is_static=False, per_channel=False)
            quantized_path = os.path.join(onnx_path, "quantized-dynamic")
            quantizer.quantize(save_dir=quantized_path, quantization_config=dqconfig)
        log_stream += f"Successfully quantized model to: {quantized_path}\n"
        return quantized_path, log_stream
    except Exception as e:
        error_msg = f"Failed during ONNX quantization step. Error: {e}"
        logging.error(error_msg, exc_info=True)
        raise RuntimeError(error_msg) from e
def stage_3_4_gguf_quantize(model_id: str, quantization_strategy: str):
    """Export and quantize a Hub model to GGUF through ``optimum-cli``.

    Args:
        model_id: Model identifier passed to ``--model``. Slashes are replaced
            with underscores when naming the output directory.
        quantization_strategy: Value passed to ``--quantization_strategy``.

    Returns:
        ``(gguf_path, log_text)`` where ``gguf_path`` is the output directory
        created for this run (the command is asked to write ``model.gguf``
        inside it).

    Raises:
        RuntimeError: If the export command exits non-zero. The original
            ``CalledProcessError`` is chained as ``__cause__``.
    """
    log_stream = f"[STAGE 3 & 4] Converting to GGUF with '{quantization_strategy}' quantization...\n"
    run_id = datetime.now().strftime("%Y%m%d-%H%M%S")
    model_name = model_id.replace('/', '_')
    gguf_path = os.path.join(OUTPUT_DIR, f"{model_name}-{run_id}-gguf")
    os.makedirs(gguf_path, exist_ok=True)
    output_file = os.path.join(gguf_path, "model.gguf")

    try:
        log_stream += "Executing `optimum-cli export gguf` via subprocess...\n"
        # NOTE(review): confirm the installed optimum-cli actually provides an
        # `export gguf` subcommand with these flags.
        export_command = [
            "optimum-cli", "export", "gguf",
            "--model", model_id,
            "--quantization_strategy", quantization_strategy,
            "--trust-remote-code",
            output_file,
        ]
        # check=True turns a non-zero exit status into CalledProcessError.
        process = subprocess.run(export_command, check=True, capture_output=True, text=True)
        log_stream += process.stdout
        if process.stderr:
            log_stream += f"[STDERR]\n{process.stderr}\n"
        log_stream += f"Successfully exported and quantized model to GGUF at: {gguf_path}\n"
        return gguf_path, log_stream
    except subprocess.CalledProcessError as e:
        error_msg = f"Failed during `optimum-cli export gguf`. Error:\n{e.stderr}"
        logging.error(error_msg)
        # Chain explicitly so the traceback shows the failing command as the cause.
        raise RuntimeError(error_msg) from e
def stage_5_package_and_upload(model_id: str, optimized_model_path: str, pipeline_log: str, options: dict):
    """Write a model card next to the optimized model and push the folder to the Hub.

    Args:
        model_id: Source model id; its last path component names the target repo.
        optimized_model_path: Local folder that is uploaded (README.md is
            written into it first; for the ONNX pipeline the tokenizer is
            saved there too).
        pipeline_log: Log text substituted into the model-card template.
        options: Must contain ``'pipeline_type'``; ``'prune'``,
            ``'prune_percent'`` and ``'quant_type'`` are optional.

    Returns:
        ``(final_message, log_text)``. Failures are not raised: the message
        starts with ``"Error: "`` on failure and with ``"Skipping upload"``
        when no ``HF_TOKEN`` is configured.
    """
    log_stream = "[STAGE 5] Packaging and Uploading...\n"

    if not HF_TOKEN:
        skip_note = "Skipping upload: HF_TOKEN not found."
        return skip_note, log_stream + skip_note

    try:
        pipeline_kind = options['pipeline_type']
        base_name = model_id.split('/')[-1]
        repo_name = f"{base_name}-amop-cpu-{pipeline_kind.lower()}"
        repo_url = api.create_repo(repo_id=repo_name, exist_ok=True, token=HF_TOKEN)

        # GGUF uploads use their own card template; everything else shares one.
        template_file = (
            "model_card_template_gguf.md"
            if pipeline_kind == "GGUF"
            else "model_card_template.md"
        )
        with open(template_file, "r", encoding="utf-8") as template_handle:
            template_content = template_handle.read()

        model_card_content = template_content.format(
            repo_name=repo_name,
            model_id=model_id,
            optimization_date=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            pruning_status="Enabled" if options.get('prune', False) else "Disabled",
            pruning_percent=options.get('prune_percent', 0),
            quant_type=options.get('quant_type', 'N/A'),
            repo_id=repo_url.repo_id,
            pipeline_log=pipeline_log,
        )

        readme_path = os.path.join(optimized_model_path, "README.md")
        with open(readme_path, "w", encoding="utf-8") as readme_handle:
            readme_handle.write(model_card_content)

        if pipeline_kind == "ONNX":
            # Ship the tokenizer alongside the ONNX weights in the uploaded folder.
            AutoTokenizer.from_pretrained(
                model_id, trust_remote_code=True
            ).save_pretrained(optimized_model_path)

        api.upload_folder(
            folder_path=optimized_model_path,
            repo_id=repo_url.repo_id,
            repo_type="model",
            token=HF_TOKEN,
        )
        log_stream += "Upload complete.\n"
        final_message = f"Success! Your optimized model is available at: huggingface.co/{repo_url.repo_id}"
        return final_message, log_stream
    except Exception as e:
        error_msg = f"Failed to upload to the Hub. Error: {e}"
        logging.error(error_msg, exc_info=True)
        return f"Error: {error_msg}", log_stream + error_msg
def run_amop_pipeline(model_id: str, pipeline_type: str, do_prune: bool, prune_percent: float, onnx_quant_type: str, calibration_file, gguf_quant_type: str):
    """Run the selected optimization pipeline, streaming UI updates to Gradio.

    This is a generator event handler: each ``yield`` is a dict mapping
    Gradio components (module-level globals such as ``log_output`` and
    ``final_output``) to their updated values.

    Args:
        model_id: Hub model id; an empty value aborts with a prompt.
        pipeline_type: ``"ONNX"`` or ``"GGUF"``; anything else is an error.
        do_prune: Whether to prune before ONNX export (ONNX pipeline only).
        prune_percent: Pruning percentage forwarded to ``stage_2_prune_model``.
        onnx_quant_type: ``"Static"`` enables use of ``calibration_file``.
        calibration_file: Uploaded calibration file (object with ``.name`` or
            a plain path), or ``None``.
        gguf_quant_type: Quantization strategy for the GGUF pipeline.

    The final status reflects what the packaging stage actually reported:
    success, upload skipped (no token), or error. Any temporary model
    directory is removed when the generator finishes.
    """
    if not model_id:
        yield {log_output: "Please enter a Model ID.", final_output: gr.Label(value="Idle", label="Status")}
        return

    def _reenabled_buttons():
        """Updates that make both action buttons clickable again."""
        return {
            run_button: gr.Button(interactive=True, value="Run Optimization Pipeline", variant="primary"),
            analyze_button: gr.Button(interactive=True, value="Analyze Model"),
        }

    initial_log = f"[START] AMOP {pipeline_type} Pipeline Initiated.\n"
    yield {
        run_button: gr.Button(interactive=False, value="🚀 Running..."),
        analyze_button: gr.Button(interactive=False),
        final_output: gr.Label(value={"label": f"RUNNING ({pipeline_type})"}, show_label=True),
        log_output: initial_log,
        # Hide the result banner left over from a previous run.
        success_box: gr.Markdown(visible=False),
    }

    full_log = initial_log
    temp_model_dir = None
    try:
        if pipeline_type == "ONNX":
            full_log += "Loading base model for pruning...\n"
            yield {final_output: gr.Label(value="Loading model (1/5)"), log_output: full_log}
            model = AutoModel.from_pretrained(model_id, trust_remote_code=True)
            tokenizer = AutoTokenizer.from_pretrained(model_id, trust_remote_code=True)
            full_log += f"Successfully loaded base model '{model_id}'.\n"

            yield {final_output: gr.Label(value="Pruning model (2/5)"), log_output: full_log}
            if do_prune:
                model, log = stage_2_prune_model(model, prune_percent)
                full_log += log
            else:
                full_log += "[STAGE 2] Pruning skipped by user.\n"

            # The (possibly pruned) model is written to disk so the export
            # command can read it; the directory is removed in `finally`.
            temp_model_dir = tempfile.mkdtemp()
            model.save_pretrained(temp_model_dir)
            tokenizer.save_pretrained(temp_model_dir)
            full_log += f"Saved intermediate model to temporary directory: {temp_model_dir}\n"

            yield {final_output: gr.Label(value="Converting to ONNX (3/5)"), log_output: full_log}
            calib_path = None
            if onnx_quant_type == "Static" and calibration_file:
                # Accept both file-like upload objects and plain path strings.
                calib_path = getattr(calibration_file, "name", calibration_file)
            optimized_path, log = stage_3_4_onnx_quantize(temp_model_dir, calib_path)
            full_log += log
            options = {'pipeline_type': 'ONNX', 'prune': do_prune, 'prune_percent': prune_percent, 'quant_type': onnx_quant_type}
        elif pipeline_type == "GGUF":
            full_log += "[STAGE 1 & 2] Loading and Pruning are skipped for GGUF pipeline.\n"
            yield {final_output: gr.Label(value="Converting to GGUF (3/5)"), log_output: full_log}
            optimized_path, log = stage_3_4_gguf_quantize(model_id, gguf_quant_type)
            full_log += log
            options = {'pipeline_type': 'GGUF', 'quant_type': gguf_quant_type}
        else:
            raise ValueError("Invalid pipeline type selected.")

        yield {final_output: gr.Label(value="Packaging & Uploading (4/5)"), log_output: full_log}
        final_message, log = stage_5_package_and_upload(model_id, optimized_path, full_log, options)
        full_log += log

        # The packaging stage reports failure through its message instead of
        # raising, so the outcome has to be read from `final_message`.
        if final_message.startswith("Success"):
            # The success message ends with "huggingface.co/<repo_id>".
            repo_id_for_link = final_message.partition("huggingface.co/")[2].strip()
            if repo_id_for_link:
                banner = f"✅ **Success!** Your optimized model is available here: [{repo_id_for_link}](https://huggingface.co/{repo_id_for_link})"
            else:
                banner = f"✅ {final_message}"
            yield {
                final_output: gr.Label(value="SUCCESS", label="Status"),
                log_output: full_log,
                success_box: gr.Markdown(banner, visible=True),
                **_reenabled_buttons(),
            }
        elif final_message.startswith("Skipping"):
            # No token configured: optimization finished but nothing was uploaded.
            yield {
                final_output: gr.Label(value="DONE (upload skipped)", label="Status"),
                log_output: full_log,
                success_box: gr.Markdown(
                    f"⚠️ **Optimization finished, but the upload was skipped.** {final_message} "
                    f"The optimized files were written to `{optimized_path}`.",
                    visible=True,
                ),
                **_reenabled_buttons(),
            }
        else:
            # Upload failed; the stage already logged the details.
            yield {
                final_output: gr.Label(value="ERROR", label="Status"),
                log_output: full_log,
                success_box: gr.Markdown("❌ **An error occurred.** Check the logs for details.", visible=True),
                **_reenabled_buttons(),
            }
    except Exception as e:
        logging.error(f"AMOP Pipeline failed. Error: {e}", exc_info=True)
        full_log += f"\n[ERROR] Pipeline failed: {e}"
        yield {
            final_output: gr.Label(value="ERROR", label="Status"),
            log_output: full_log,
            success_box: gr.Markdown("❌ **An error occurred.** Check the logs for details.", visible=True),
            **_reenabled_buttons(),
        }
    finally:
        if temp_model_dir and os.path.exists(temp_model_dir):
            shutil.rmtree(temp_model_dir)
            logging.info(f"Cleaned up temporary directory: {temp_model_dir}")
# --- Gradio UI ---------------------------------------------------------------
# The component variables created here (run_button, analyze_button,
# final_output, success_box, log_output, ...) are module-level names that
# run_amop_pipeline uses as keys in the update dicts it yields.
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 🚀 AMOP: Adaptive Model Optimization Pipeline")
    gr.Markdown("Turn any Hugging Face Hub model into a CPU-optimized version using ONNX or GGUF.")
    if not HF_TOKEN:
        # Rendered as page content: gr.Warning only shows a toast when it is
        # called from inside an event handler, so at build time it would not
        # be visible to the user.
        gr.Markdown("⚠️ **Warning:** You have not set your HF_TOKEN in the Space secrets! The final 'upload' step will be skipped. Please add a secret with the key `HF_TOKEN` and your Hugging Face write token as the value.")

    with gr.Row():
        # Left column: model selection and optimization settings.
        with gr.Column(scale=1):
            gr.Markdown("### 1. Select a Model")
            model_id_input = gr.Textbox(
                label="Hugging Face Model ID",
                placeholder="e.g., gpt2, meta-llama/Llama-2-7b-chat-hf",
            )
            analyze_button = gr.Button("🔍 Analyze Model", variant="secondary")

            with gr.Accordion("⚙️ 2. Configure Optimization", open=False) as optimization_accordion:
                analysis_report_output = gr.Markdown()
                pipeline_type_radio = gr.Radio(
                    ["ONNX", "GGUF"],
                    label="Select Optimization Pipeline",
                    info="GGUF is recommended for LLMs, ONNX for others.",
                )

                # Shown only while the ONNX pipeline is selected.
                with gr.Group(visible=False) as onnx_options:
                    gr.Markdown("#### ONNX Pipeline Options")
                    prune_checkbox = gr.Checkbox(label="Enable Pruning", value=False, info="Removes redundant weights. Applied before ONNX conversion.")
                    prune_slider = gr.Slider(minimum=0, maximum=90, value=20, step=5, label="Pruning Percentage (%)")
                    onnx_quant_radio = gr.Radio(["Dynamic", "Static"], label="ONNX Quantization Type", value="Dynamic", info="Static may offer better performance but requires calibration data.")
                    calibration_file_upload = gr.File(label="Upload Calibration Data (.txt)", visible=False, file_types=['.txt'])

                # Shown only while the GGUF pipeline is selected.
                with gr.Group(visible=False) as gguf_options:
                    gr.Markdown("#### GGUF Pipeline Options")
                    gguf_quant_dropdown = gr.Dropdown(
                        ["q4_k_m", "q5_k_m", "q8_0", "f16"],
                        label="GGUF Quantization Strategy",
                        value="q4_k_m",
                        info="q4_k_m is a good balance of size and quality.",
                    )

                run_button = gr.Button("🚀 Run Optimization Pipeline", variant="primary")

        # Right column: status label, result banner and live log.
        with gr.Column(scale=2):
            gr.Markdown("### Pipeline Status & Logs")
            final_output = gr.Label(value="Idle", label="Status", show_label=True)
            success_box = gr.Markdown(visible=False)
            log_output = gr.Textbox(label="Live Logs", lines=20, interactive=False, max_lines=20)

    def update_ui_for_pipeline(pipeline_type):
        """Show the option group that matches the selected pipeline."""
        return {
            onnx_options: gr.Group(visible=pipeline_type == "ONNX"),
            gguf_options: gr.Group(visible=pipeline_type == "GGUF"),
        }

    def update_ui_for_quant_type(quant_type):
        """Reveal the calibration upload only for static quantization."""
        return gr.File(visible=quant_type == "Static")

    # Event wiring.
    pipeline_type_radio.change(fn=update_ui_for_pipeline, inputs=pipeline_type_radio, outputs=[onnx_options, gguf_options])
    onnx_quant_radio.change(fn=update_ui_for_quant_type, inputs=onnx_quant_radio, outputs=[calibration_file_upload])
    analyze_button.click(
        fn=stage_1_analyze_model,
        inputs=[model_id_input],
        outputs=[log_output, analysis_report_output, optimization_accordion],
    )
    run_button.click(
        fn=run_amop_pipeline,
        inputs=[model_id_input, pipeline_type_radio, prune_checkbox, prune_slider, onnx_quant_radio, calibration_file_upload, gguf_quant_dropdown],
        outputs=[run_button, analyze_button, final_output, log_output, success_box],
    )

if __name__ == "__main__":
    demo.launch(debug=True)