|
|
import logging |
|
|
import os |
|
|
import subprocess |
|
|
import sys |
|
|
import shutil |
|
|
import re |
|
|
from pathlib import Path |
|
|
from typing import List, Optional, Tuple |
|
|
from dataclasses import dataclass |
|
|
|
|
|
import streamlit as st |
|
|
from huggingface_hub import HfApi, whoami, model_info, hf_hub_download |
|
|
import yaml |
|
|
|
|
|
# Configure the root logger once at import time so INFO-and-above records
# emitted anywhere in the app are shown.
logging.basicConfig(level=logging.INFO)

# Module-scoped logger; used by main() to record unexpected application errors.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
@dataclass
class Config:
    """Application configuration.

    Attributes:
        hf_token: Token used for all Hugging Face Hub calls (the user's token
            when one was supplied, otherwise the system token).
        hf_username: Hub account name under which converted models are uploaded.
        is_using_user_token: True when ``hf_token`` came from the user rather
            than from the ``HF_TOKEN`` environment variable.
        hf_base_url: Base URL used to build links to uploaded models.
        repo_path: Location of the bundled transformers.js checkout.
    """

    hf_token: str
    hf_username: str
    is_using_user_token: bool
    hf_base_url: str = "https://huggingface.co"
    repo_path: Path = Path("./transformers.js")

    @classmethod
    def from_env(cls) -> "Config":
        """Create config from environment variables and secrets.

        The user token is read from ``st.session_state["user_hf_token"]`` and
        takes precedence over the ``HF_TOKEN`` environment variable.

        Returns:
            A populated ``Config``.

        Raises:
            ValueError: If neither a user token nor a system token is available.
        """
        system_token = os.getenv("HF_TOKEN")
        user_token = st.session_state.get("user_hf_token")
        hf_token = user_token or system_token

        # Validate before any Hub call: resolving the username below would
        # otherwise invoke whoami() without a token and surface an unrelated
        # error instead of this explicit one.
        if not hf_token:
            raise ValueError(
                "When the user token is not provided, the system token must be set."
            )

        if user_token:
            hf_username = whoami(token=user_token)["name"]
        else:
            # SPACE_AUTHOR_NAME, when set, avoids a whoami() round trip.
            hf_username = (
                os.getenv("SPACE_AUTHOR_NAME") or whoami(token=system_token)["name"]
            )

        return cls(
            hf_token=hf_token,
            hf_username=hf_username,
            is_using_user_token=bool(user_token),
        )
|
|
|
|
|
|
|
|
class ModelConverter:
    """Handles model conversion and upload operations.

    Conversion is delegated to the ``scripts.convert`` module of the bundled
    transformers.js checkout (run as a subprocess); the resulting folder is
    uploaded to the Hugging Face Hub together with a generated README.
    """

    # Logger named after this module; kept on the class so the helpers below
    # can report best-effort failures instead of swallowing them silently.
    _logger = logging.getLogger(__name__)

    _DOCS_BASE_URL = "https://huggingface.co/docs/transformers.js/api/pipelines"

    # pipeline_tag -> class name used as the anchor in the pipelines docs page.
    _PIPELINE_DOC_CLASSES = {
        "text-classification": "TextClassificationPipeline",
        "token-classification": "TokenClassificationPipeline",
        "question-answering": "QuestionAnsweringPipeline",
        "fill-mask": "FillMaskPipeline",
        "text2text-generation": "Text2TextGenerationPipeline",
        "summarization": "SummarizationPipeline",
        "translation": "TranslationPipeline",
        "text-generation": "TextGenerationPipeline",
        "zero-shot-classification": "ZeroShotClassificationPipeline",
        "feature-extraction": "FeatureExtractionPipeline",
        "image-feature-extraction": "ImageFeatureExtractionPipeline",
        "audio-classification": "AudioClassificationPipeline",
        "zero-shot-audio-classification": "ZeroShotAudioClassificationPipeline",
        "automatic-speech-recognition": "AutomaticSpeechRecognitionPipeline",
        "image-to-text": "ImageToTextPipeline",
        "image-classification": "ImageClassificationPipeline",
        "image-segmentation": "ImageSegmentationPipeline",
        "background-removal": "BackgroundRemovalPipeline",
        "zero-shot-image-classification": "ZeroShotImageClassificationPipeline",
        "object-detection": "ObjectDetectionPipeline",
        "zero-shot-object-detection": "ZeroShotObjectDetectionPipeline",
        "document-question-answering": "DocumentQuestionAnsweringPipeline",
        "text-to-audio": "TextToAudioPipeline",
        "image-to-image": "ImageToImagePipeline",
        "depth-estimation": "DepthEstimationPipeline",
    }

    # pipeline_tag aliases rewritten before being passed as ``--task``.
    _TASK_SYNONYMS = {
        "vqa": "visual-question-answering",
    }

    # Both patterns accept the closing ``---`` either followed by a newline or
    # sitting at the very end of the text (README consisting of frontmatter only,
    # without a trailing newline).
    _STRIP_FRONTMATTER_RE = re.compile(r"^---[\s\S]*?\n---\s*(?:\n|\Z)")
    _FRONTMATTER_RE = re.compile(r"^---\s*\n([\s\S]*?)\n---\s*(?:\n|\Z)")

    def __init__(self, config: "Config"):
        """Store the configuration and create a Hub client bound to its token."""
        self.config = config
        self.api = HfApi(token=config.hf_token)

    def _fetch_original_readme(self, repo_id: str) -> str:
        """Return the text of ``README.md`` from ``repo_id``, or ``""`` on any failure.

        Best-effort: a missing or unreadable README must not abort the upload,
        so every error is logged at debug level and mapped to an empty string.
        """
        try:
            path = hf_hub_download(
                repo_id=repo_id, filename="README.md", token=self.config.hf_token
            )
            # Undecodable bytes are dropped rather than failing the whole read.
            with open(path, "r", encoding="utf-8", errors="ignore") as f:
                return f.read()
        except Exception:
            self._logger.debug(
                "Could not fetch README.md for %s", repo_id, exc_info=True
            )
            return ""

    def _strip_yaml_frontmatter(self, text: str) -> str:
        """Return ``text`` without a leading ``---`` ... ``---`` frontmatter block.

        Text without a recognisable frontmatter block is returned unchanged;
        falsy input yields ``""``.
        """
        if not text:
            return ""
        if text.startswith("---"):
            m = self._STRIP_FRONTMATTER_RE.match(text)
            if m:
                return text[m.end():]
        return text

    def _extract_yaml_frontmatter(self, text: str) -> Tuple[dict, str]:
        """Return (frontmatter_dict, body). If no frontmatter, returns ({}, text).

        Frontmatter that fails to parse, or that does not parse to a mapping,
        is treated as empty metadata; the body is still returned with the
        frontmatter block removed.
        """
        if not text or not text.startswith("---"):
            return {}, text or ""
        m = self._FRONTMATTER_RE.match(text)
        if not m:
            return {}, text
        body = text[m.end():]
        try:
            data = yaml.safe_load(m.group(1))
        except Exception:
            # Best-effort: malformed upstream metadata must not block the upload.
            self._logger.debug("Ignoring unparsable README frontmatter", exc_info=True)
            data = {}
        if not isinstance(data, dict):
            data = {}
        return data, body

    def _pipeline_docs_url(self, pipeline_tag: Optional[str]) -> str:
        """Return the transformers.js docs URL for ``pipeline_tag``.

        Falls back to the generic pipelines page when the tag is empty or has
        no entry in the known-pipelines table.
        """
        cls = self._PIPELINE_DOC_CLASSES.get(pipeline_tag) if pipeline_tag else None
        if not cls:
            return self._DOCS_BASE_URL
        return f"{self._DOCS_BASE_URL}#module_pipelines.{cls}"

    def _map_pipeline_to_task(self, pipeline_tag: Optional[str]) -> Optional[str]:
        """Translate a Hub ``pipeline_tag`` into the ``--task`` value.

        Returns ``None`` for an empty tag; unknown tags are passed through as-is.
        """
        if not pipeline_tag:
            return None
        return self._TASK_SYNONYMS.get(pipeline_tag, pipeline_tag)

    def setup_repository(self) -> None:
        """Ensure the bundled transformers.js repository is present.

        Raises:
            RuntimeError: If ``config.repo_path`` does not exist.
        """
        if not self.config.repo_path.exists():
            raise RuntimeError(
                f"Expected transformers.js repository at {self.config.repo_path} but it was not found."
            )

    def _run_conversion_subprocess(
        self, input_model_id: str, extra_args: Optional[List[str]] = None
    ) -> subprocess.CompletedProcess:
        """Run the conversion subprocess with the given arguments.

        Args:
            input_model_id: Hub model ID passed as ``--model_id``.
            extra_args: Additional command-line flags appended verbatim.

        Returns:
            The completed process with captured, text-decoded stdout/stderr.
            A non-zero exit status is reported via ``returncode``, not raised.
        """
        cmd = [
            sys.executable,
            "-m",
            "scripts.convert",
            "--quantize",
            "--model_id",
            input_model_id,
        ]
        if extra_args:
            cmd.extend(extra_args)

        return subprocess.run(
            cmd,
            cwd=self.config.repo_path,
            capture_output=True,
            text=True,
            # NOTE(review): ``env`` REPLACES the parent environment, so the child
            # sees only these variables. Presumably deliberate, to keep other
            # secrets away from the conversion script (which may run remote
            # code) - confirm before widening it.
            env={
                "HF_TOKEN": self.config.hf_token,
                "TRANSFORMERS_ATTENTION_IMPLEMENTATION": "eager",
                "PYTORCH_SDP_KERNEL": "math",
            },
        )

    def convert_model(
        self,
        input_model_id: str,
        trust_remote_code=False,
        output_attentions=False,
    ) -> Tuple[bool, Optional[str]]:
        """Convert the model to ONNX format.

        Args:
            input_model_id: Hub model ID to convert.
            trust_remote_code: Pass ``--trust_remote_code`` to the converter.
                Only permitted when the user supplied their own token.
            output_attentions: Pass ``--output_attentions`` to the converter.

        Returns:
            ``(success, message)``. ``message`` is the subprocess stderr, or the
            text of the exception that prevented the conversion from running.
            This method does not raise.
        """
        try:
            extra_args: List[str] = []
            if trust_remote_code:
                if not self.config.is_using_user_token:
                    raise ValueError(
                        "Trust Remote Code requires your own HuggingFace token."
                    )
                extra_args.append("--trust_remote_code")

            if output_attentions:
                extra_args.append("--output_attentions")

            # Task inference is optional: when the model info cannot be fetched
            # the converter is simply run without an explicit --task.
            try:
                info = model_info(repo_id=input_model_id, token=self.config.hf_token)
                task = self._map_pipeline_to_task(getattr(info, "pipeline_tag", None))
                if task:
                    extra_args.extend(["--task", task])
            except Exception:
                self._logger.warning(
                    "Could not determine the task for %s; converting without --task",
                    input_model_id,
                    exc_info=True,
                )

            result = self._run_conversion_subprocess(
                input_model_id, extra_args=extra_args or None
            )
            return result.returncode == 0, result.stderr
        except Exception as e:
            return False, str(e)

    def upload_model(self, input_model_id: str, output_model_id: str) -> Optional[str]:
        """Upload the converted model to Hugging Face.

        Creates the target repo if needed, writes the generated README into the
        converted model folder and uploads that folder. The local folder is
        removed afterwards whether or not the upload succeeded.

        Returns:
            ``None`` on success, otherwise the error message.
        """
        model_folder_path = self.config.repo_path / "models" / input_model_id

        try:
            self.api.create_repo(output_model_id, exist_ok=True, private=False)

            # Explicit UTF-8: the README embeds the upstream model card, which may
            # contain characters the platform default encoding cannot represent.
            readme_path = model_folder_path / "README.md"
            readme_path.write_text(
                self.generate_readme(input_model_id), encoding="utf-8"
            )

            self.api.upload_folder(
                folder_path=str(model_folder_path), repo_id=output_model_id
            )
            return None
        except Exception as e:
            return str(e)
        finally:
            shutil.rmtree(model_folder_path, ignore_errors=True)

    def generate_readme(self, imi: str) -> str:
        """Build the README (model card) text for the converted model.

        The upstream frontmatter is kept and extended with ``library_name``,
        ``base_model`` and, when known, ``pipeline_tag``. The body consists of a
        short conversion notice, a link to the matching transformers.js pipeline
        documentation and, if available, the original README body.

        Args:
            imi: Input model ID (``owner/name``) of the original model.

        Returns:
            The complete README text, ending with a newline.
        """
        try:
            info = model_info(repo_id=imi, token=self.config.hf_token)
            pipeline_tag = getattr(info, "pipeline_tag", None)
        except Exception:
            # Best-effort: the README is still generated without a pipeline tag.
            self._logger.debug("Could not fetch model info for %s", imi, exc_info=True)
            pipeline_tag = None

        original_text = self._fetch_original_readme(imi)
        original_meta, original_body = self._extract_yaml_frontmatter(original_text)
        original_body = (
            original_body or self._strip_yaml_frontmatter(original_text)
        ).strip()

        merged_meta = dict(original_meta) if isinstance(original_meta, dict) else {}
        merged_meta["library_name"] = "transformers.js"
        merged_meta["base_model"] = [imi]
        if pipeline_tag is not None:
            merged_meta["pipeline_tag"] = pipeline_tag

        fm_yaml = yaml.safe_dump(merged_meta, sort_keys=False).strip()
        docs_url = self._pipeline_docs_url(pipeline_tag)

        parts: List[str] = [
            f"---\n{fm_yaml}\n---\n\n",
            f"# {imi.split('/')[-1]} (ONNX)\n",
            f"This is an ONNX version of [{imi}](https://huggingface.co/{imi}). "
            "It was automatically converted and uploaded using "
            "[this Hugging Face Space](https://huggingface.co/spaces/onnx-community/convert-to-onnx).",
            "\n## Usage with Transformers.js\n",
        ]
        if pipeline_tag:
            parts.append(
                f"See the pipeline documentation for `{pipeline_tag}`: {docs_url}"
            )
        else:
            parts.append(f"See the pipelines documentation: {docs_url}")

        if original_body:
            parts.append("\n---\n")
            parts.append(original_body)

        return "\n\n".join(parts) + "\n"
|
|
|
|
|
|
|
|
def main():
    """Main application entry point.

    Renders the Streamlit form, converts the requested model and uploads the
    result. Any unexpected exception is logged and shown to the user instead
    of propagating.
    """
    st.write("## Convert a Hugging Face model to ONNX")

    try:
        config = Config.from_env()
        converter = ModelConverter(config)
        converter.setup_repository()

        # Strip surrounding whitespace: a pasted ID with a trailing space would
        # otherwise be treated as a different (non-existent) model.
        input_model_id = (
            st.text_input(
                "Enter the Hugging Face model ID to convert. Example: `EleutherAI/pythia-14m`"
            )
            or ""
        ).strip()

        if not input_model_id:
            return

        # The value is stored under st.session_state["user_hf_token"], which is
        # where Config.from_env() looks for the user token.
        st.text_input(
            "Optional: Your Hugging Face write token. Fill it if you want to upload the model under your account.",
            type="password",
            key="user_hf_token",
        )
        trust_remote_code = st.toggle("Optional: Trust Remote Code.")
        if trust_remote_code:
            st.warning(
                "This option should only be enabled for repositories you trust and in which you have read the code, as it will execute arbitrary code present in the model repository. When this option is enabled, you must use your own Hugging Face write token."
            )

        output_attentions = False
        if "whisper" in input_model_id.lower():
            output_attentions = st.toggle(
                "Whether to output attentions from the Whisper model. This is required for word-level (token) timestamps."
            )

        # Uploading into the source repository is only offered when that
        # repository belongs to the account the upload will run under.
        if config.hf_username == input_model_id.split("/")[0]:
            same_repo = st.checkbox(
                "Upload the ONNX weights to the existing repository"
            )
        else:
            same_repo = False

        model_name = input_model_id.split("/")[-1]
        output_model_id = f"{config.hf_username}/{model_name}"
        if not same_repo:
            output_model_id += "-ONNX"

        output_model_url = f"{config.hf_base_url}/{output_model_id}"

        if not same_repo and converter.api.repo_exists(output_model_id):
            st.write("This model has already been converted! 🎉")
            st.link_button(f"Go to {output_model_id}", output_model_url, type="primary")
            return

        st.write("URL where the model will be converted and uploaded to:")
        st.code(output_model_url, language="plaintext")

        if not st.button(label="Proceed", type="primary"):
            return

        with st.spinner("Converting model..."):
            success, stderr = converter.convert_model(
                input_model_id,
                trust_remote_code=trust_remote_code,
                output_attentions=output_attentions,
            )
            if not success:
                st.error(f"Conversion failed: {stderr}")
                return

            st.success("Conversion successful!")
            st.code(stderr)

        with st.spinner("Uploading model..."):
            error = converter.upload_model(input_model_id, output_model_id)
            if error:
                st.error(f"Upload failed: {error}")
                return

            st.success("Upload successful!")
            st.write("You can now go and view the model on Hugging Face!")
            st.link_button(f"Go to {output_model_id}", output_model_url, type="primary")

    except Exception as e:
        # Top-level boundary: keep the full traceback in the logs and show a
        # short message in the UI.
        logger.exception("Application error")
        st.error(f"An error occurred: {e}")
|
|
|
|
|
|
|
|
# Run the app only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|
|
|