# SPDX-License-Identifier: AGPL-3.0-or-later
# Copyright (C) 2025-2026 Philipp Emanuel Weidmann <pew@worldwidemann.com> + contributors

import gc
import getpass
import os
import random
from dataclasses import dataclass
from importlib.metadata import version
from pathlib import Path
from typing import Any, TypeVar

import numpy as np
import questionary
import torch
from accelerate.utils import (
    is_mlu_available,
    is_musa_available,
    is_sdaa_available,
    is_xpu_available,
)
from datasets import DatasetDict, ReadInstruction, load_dataset, load_from_disk
from datasets.config import DATASET_STATE_JSON_FILENAME
from datasets.download.download_manager import DownloadMode
from datasets.utils.info_utils import VerificationMode
from optuna import Trial
from psutil import Process
from questionary import Choice, Style
from rich.console import Console

from .config import DatasetSpecification, Settings

print = Console(highlight=False).print
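# The assignment above deliberately shadows the built-in print with rich's
# Console.print, so markup tags such as "[red]...[/]" in the messages below
# are rendered as terminal styles instead of being printed literally.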


def set_random_seed(seed: int):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)


def print_memory_usage():
    def p(label: str, size_in_bytes: int):
        print(f"[grey50]{label}: [bold]{size_in_bytes / (1024**3):.2f} GB[/][/]")

    p("Resident system RAM", Process().memory_info().rss)

    if torch.cuda.is_available():
        count = torch.cuda.device_count()
        allocated = sum(torch.cuda.memory_allocated(device) for device in range(count))
        reserved = sum(torch.cuda.memory_reserved(device) for device in range(count))
        p("Allocated GPU VRAM", allocated)
        p("Reserved GPU VRAM", reserved)
    elif is_xpu_available():
        count = torch.xpu.device_count()
        allocated = sum(torch.xpu.memory_allocated(device) for device in range(count))
        reserved = sum(torch.xpu.memory_reserved(device) for device in range(count))
        p("Allocated XPU memory", allocated)
        p("Reserved XPU memory", reserved)
    elif torch.backends.mps.is_available():
        p("Allocated MPS memory", torch.mps.current_allocated_memory())
        p("Driver (reserved) MPS memory", torch.mps.driver_allocated_memory())


def is_notebook() -> bool:
    # Check for specific environment variables (Colab, Kaggle).
    # This is necessary because when running as a subprocess (e.g. !iconoclast),
    # get_ipython() might not be available or might not reflect the notebook environment.
    if os.getenv("COLAB_GPU") or os.getenv("KAGGLE_KERNEL_RUN_TYPE"):
        return True

    # Check IPython shell type (for library usage).
    try:
        from IPython import get_ipython  # ty:ignore[unresolved-import]

        shell = get_ipython()
        if shell is None:
            return False
        shell_name = shell.__class__.__name__
        if shell_name in ["ZMQInteractiveShell", "Shell"]:
            return True
        if "google.colab" in str(shell.__class__):
            return True
        return False
    except (ImportError, NameError, AttributeError):
        return False
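

# The prompt_* helpers below use questionary's interactive widgets in a regular
# terminal, but fall back to plain input()/getpass() prompts when running in a
# notebook, where an interactive TTY is not available.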


def prompt_select(message: str, choices: list[Any]) -> Any:
    if is_notebook():
        print()
        print(message)
        real_choices = []
        for i, choice in enumerate(choices, 1):
            if isinstance(choice, Choice):
                print(f"[{i}] {choice.title}")
                real_choices.append(choice.value)
            else:
                print(f"[{i}] {choice}")
                real_choices.append(choice)
        while True:
            try:
                selection = input("Enter number: ")
                index = int(selection) - 1
                if 0 <= index < len(real_choices):
                    return real_choices[index]
                print(f"[red]Please enter a number between 1 and {len(real_choices)}[/]")
            except ValueError:
                print("[red]Invalid input. Please enter a number.[/]")
    else:
        return questionary.select(
            message,
            choices=choices,
            style=Style([("highlighted", "reverse")]),
        ).ask()


def prompt_text(
    message: str,
    default: str = "",
    qmark: str = "?",
    unsafe: bool = False,
) -> str:
    if is_notebook():
        print()
        result = input(f"{message} [{default}]: " if default else f"{message}: ")
        return result if result else default
    else:
        question = questionary.text(message, default=default, qmark=qmark)
        if unsafe:
            return question.unsafe_ask()
        else:
            return question.ask()


def prompt_path(message: str) -> str:
    if is_notebook():
        return prompt_text(message)
    else:
        return questionary.path(message, only_directories=True).ask()


def prompt_password(message: str) -> str:
    if is_notebook():
        print()
        return getpass.getpass(message)
    else:
        return questionary.password(message).ask()


def format_duration(seconds: float) -> str:
    seconds = round(seconds)
    hours, seconds = divmod(seconds, 3600)
    minutes, seconds = divmod(seconds, 60)
    if hours > 0:
        return f"{hours}h {minutes}m"
    elif minutes > 0:
        return f"{minutes}m {seconds}s"
    else:
        return f"{seconds}s"


@dataclass
class Prompt:
    system: str
    user: str


def load_prompts(
    settings: Settings,
    specification: DatasetSpecification,
) -> list[Prompt]:
    path = specification.dataset
    name = specification.name
    split_str = specification.split

    if os.path.isdir(path):
        if Path(path, DATASET_STATE_JSON_FILENAME).exists():
            # Dataset saved with datasets.save_to_disk; needs special handling.
            # Path should be the subdirectory for a particular split.
            dataset = load_from_disk(path)
            assert not isinstance(dataset, DatasetDict), (
                "Loading dataset dicts is not supported"
            )
            # Parse the split instructions.
            instruction = ReadInstruction.from_spec(split_str)
            # Associate the split with its number of examples (lines).
            split_name = str(dataset.split)
            name2len = {split_name: len(dataset)}
            # Convert the instructions to absolute indices and select the first one.
            abs_instruction = instruction.to_absolute(name2len)[0]
            # Get the dataset by applying the indices.
            dataset = dataset[abs_instruction.from_ : abs_instruction.to]
        else:
            # Path is a local directory.
            download_mode = (
                DownloadMode.FORCE_REDOWNLOAD
                if settings.reload_local_datasets
                else DownloadMode.REUSE_DATASET_IF_EXISTS
            )
            dataset = load_dataset(
                path,
                name=name,
                split=split_str,
                # Don't require the number of examples (lines) per split to be pre-defined.
                verification_mode=VerificationMode.NO_CHECKS,
                download_mode=download_mode,
            )
    else:
        # Probably a repository path; let load_dataset figure it out.
        dataset = load_dataset(path, name=name, split=split_str)

    prompts = list(dataset[specification.column])

    if specification.prefix:
        prompts = [f"{specification.prefix} {prompt}" for prompt in prompts]
    if specification.suffix:
        prompts = [f"{prompt} {specification.suffix}" for prompt in prompts]

    system_prompt = (
        settings.system_prompt
        if specification.system_prompt is None
        else specification.system_prompt
    )

    return [
        Prompt(
            system=system_prompt,
            user=prompt,
        )
        for prompt in prompts
    ]


T = TypeVar("T")


def batchify(items: list[T], batch_size: int) -> list[list[T]]:
    return [items[i : i + batch_size] for i in range(0, len(items), batch_size)]
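

# For example, batchify([1, 2, 3, 4, 5], 2) returns [[1, 2], [3, 4], [5]];
# the final batch may be shorter than batch_size.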


def empty_cache():
    # Collecting garbage is not an idempotent operation: to avoid OOM errors,
    # gc.collect() must be called both before and after emptying the backend cache.
    # This double-collect behavior is retained from the upstream implementation.
    gc.collect()

    if torch.cuda.is_available():
        torch.cuda.empty_cache()
    elif is_xpu_available():
        torch.xpu.empty_cache()
    elif is_mlu_available():
        torch.mlu.empty_cache()  # ty:ignore[unresolved-attribute]
    elif is_sdaa_available():
        torch.sdaa.empty_cache()  # ty:ignore[unresolved-attribute]
    elif is_musa_available():
        torch.musa.empty_cache()  # ty:ignore[unresolved-attribute]
    elif torch.backends.mps.is_available():
        torch.mps.empty_cache()

    gc.collect()


def get_trial_parameters(trial: Trial) -> dict[str, str]:
    params = {}

    component_direction_methods = trial.user_attrs.get("component_direction_methods")

    if isinstance(component_direction_methods, dict):
        component_direction_indices = trial.user_attrs.get(
            "component_direction_indices",
            {},
        )
        component_direction_scopes = trial.user_attrs.get(
            "component_direction_scopes",
            {},
        )
        component_direction_blends = trial.user_attrs.get(
            "component_direction_blends",
            {},
        )
        for component, direction_method in component_direction_methods.items():
            params[f"{component}.direction_method"] = direction_method
            params[f"{component}.direction_blend"] = (
                f"{component_direction_blends.get(component, 0.0):.2f}"
            )
            direction_index = component_direction_indices.get(component)
            direction_scope = component_direction_scopes.get(component, "global")
            params[f"{component}.direction_index"] = (
                "per layer"
                if direction_scope == "per layer" or direction_index is None
                else f"{direction_index:.2f}"
            )
    else:
        params["direction_method"] = trial.user_attrs.get("direction_method", "mean")
        params["direction_blend"] = f"{trial.user_attrs.get('direction_blend', 0.0):.2f}"
        direction_index = trial.user_attrs["direction_index"]
        params["direction_index"] = (
            "per layer" if direction_index is None else f"{direction_index:.2f}"
        )

    for component, parameters in trial.user_attrs["parameters"].items():
        for name, value in parameters.items():
            params[f"{component}.{name}"] = f"{value:.2f}"

    return params


def get_readme_intro(
    settings: Settings,
    trial: Trial,
    base_refusals: int,
    base_overrefusals: int,
    good_prompts: list[Prompt],
    bad_prompts: list[Prompt],
) -> str:
    if Path(settings.model).exists():
        # Hide the path, which may contain private information.
        model_link = "a model"
    else:
        model_link = f"[{settings.model}](https://huggingface.co/{settings.model})"

    return f"""# This is a modified version of {
        model_link
    }, made using Iconoclast v{version("iconoclast-llm")}

## Abliteration parameters

| Parameter | Value |
| :-------- | :---: |
{
        chr(10).join(
            [
                f"| **{name}** | {value} |"
                for name, value in get_trial_parameters(trial).items()
            ]
        )
    }

## Performance

| Metric | This model | Original model ({model_link}) |
| :----- | :--------: | :---------------------------: |
| **KL divergence** | {trial.user_attrs["kl_divergence"]:.4f} | 0 *(by definition)* |
| **Refusals** | {trial.user_attrs["refusals"]}/{len(bad_prompts)} | {base_refusals}/{
        len(bad_prompts)
    } |
| **Overrefusals** | {trial.user_attrs.get("overrefusals", "n/a")}/{len(good_prompts)} | {base_overrefusals}/{len(good_prompts)} |

-----
"""