File size: 19,337 Bytes
0f07ba7 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 |
"""
Dynamic Diffusers Pipeline Loader
This module provides dynamic discovery and loading of diffusers pipelines at runtime,
eliminating the need for per-pipeline conditional statements. New pipelines added to
diffusers become available automatically without code changes.
The module also supports discovering other diffusers classes like schedulers, models,
and other components, making it a generic solution for dynamic class loading.
Usage:
from diffusers_dynamic_loader import load_diffusers_pipeline, get_available_pipelines
# Load by class name
pipe = load_diffusers_pipeline(class_name="StableDiffusionPipeline", model_id="...", torch_dtype=torch.float16)
# Load by task alias
pipe = load_diffusers_pipeline(task="text-to-image", model_id="...", torch_dtype=torch.float16)
# Load using model_id (infers from HuggingFace Hub if possible)
pipe = load_diffusers_pipeline(model_id="runwayml/stable-diffusion-v1-5", torch_dtype=torch.float16)
# Get list of available pipelines
available = get_available_pipelines()
# Discover other diffusers classes (schedulers, models, etc.)
schedulers = discover_diffusers_classes("SchedulerMixin")
models = discover_diffusers_classes("ModelMixin")
"""
import importlib
import re
import sys
from typing import Any, Dict, List, Optional, Tuple, Type
# Global cache for discovered pipelines - computed once per process
_pipeline_registry: Optional[Dict[str, Type]] = None
_task_aliases: Optional[Dict[str, List[str]]] = None
# Global cache for other discovered class types
_class_registries: Dict[str, Dict[str, Type]] = {}
def _camel_to_kebab(name: str) -> str:
"""
Convert CamelCase to kebab-case.
Examples:
StableDiffusionPipeline -> stable-diffusion-pipeline
StableDiffusionXLImg2ImgPipeline -> stable-diffusion-xl-img-2-img-pipeline
"""
# Insert hyphen before uppercase letters (but not at the start)
s1 = re.sub('(.)([A-Z][a-z]+)', r'\1-\2', name)
# Insert hyphen before uppercase letters following lowercase letters or numbers
s2 = re.sub('([a-z0-9])([A-Z])', r'\1-\2', s1)
return s2.lower()
def _extract_task_keywords(class_name: str) -> List[str]:
"""
Extract task-related keywords from a pipeline class name.
This function derives useful task aliases from the class name without
hardcoding per-pipeline branches.
Returns a list of potential task aliases for this pipeline.
"""
aliases = []
name_lower = class_name.lower()
# Direct task mappings based on common patterns in class names
task_patterns = {
'text2image': ['text-to-image', 'txt2img', 'text2image'],
'texttoimage': ['text-to-image', 'txt2img', 'text2image'],
'txt2img': ['text-to-image', 'txt2img', 'text2image'],
'img2img': ['image-to-image', 'img2img', 'image2image'],
'image2image': ['image-to-image', 'img2img', 'image2image'],
'imagetoimage': ['image-to-image', 'img2img', 'image2image'],
'img2video': ['image-to-video', 'img2vid', 'img2video'],
'imagetovideo': ['image-to-video', 'img2vid', 'img2video'],
'text2video': ['text-to-video', 'txt2vid', 'text2video'],
'texttovideo': ['text-to-video', 'txt2vid', 'text2video'],
'inpaint': ['inpainting', 'inpaint'],
'depth2img': ['depth-to-image', 'depth2img'],
'depthtoimage': ['depth-to-image', 'depth2img'],
'controlnet': ['controlnet', 'control-net'],
'upscale': ['upscaling', 'upscale', 'super-resolution'],
'superresolution': ['upscaling', 'upscale', 'super-resolution'],
}
# Check for each pattern in the class name
for pattern, task_aliases in task_patterns.items():
if pattern in name_lower:
aliases.extend(task_aliases)
# Also detect general pipeline types from the class name structure
# E.g., StableDiffusionPipeline -> stable-diffusion, flux -> flux
# Remove "Pipeline" suffix and convert to kebab case
if class_name.endswith('Pipeline'):
base_name = class_name[:-8] # Remove "Pipeline"
kebab_name = _camel_to_kebab(base_name)
aliases.append(kebab_name)
# Extract model family name (e.g., "stable-diffusion" from "stable-diffusion-xl-img-2-img")
parts = kebab_name.split('-')
if len(parts) >= 2:
# Try the first two words as a family name
family = '-'.join(parts[:2])
if family not in aliases:
aliases.append(family)
# If no specific task pattern matched but class contains "Pipeline", add "text-to-image" as default
# since most diffusion pipelines support text-to-image generation
if 'text-to-image' not in aliases and 'image-to-image' not in aliases:
# Only add for pipelines that seem to be generation pipelines (not schedulers, etc.)
if 'pipeline' in name_lower and not any(x in name_lower for x in ['scheduler', 'processor', 'encoder']):
# Don't automatically add - let it be explicit
pass
return list(set(aliases)) # Remove duplicates
def discover_diffusers_classes(
base_class_name: str,
include_base: bool = True
) -> Dict[str, Type]:
"""
Discover all subclasses of a given base class from diffusers.
This function provides a generic way to discover any type of diffusers class,
not just pipelines. It can be used to discover schedulers, models, processors,
and other components.
Args:
base_class_name: Name of the base class to search for subclasses
(e.g., "DiffusionPipeline", "SchedulerMixin", "ModelMixin")
include_base: Whether to include the base class itself in results
Returns:
Dict mapping class names to class objects
Examples:
# Discover all pipeline classes
pipelines = discover_diffusers_classes("DiffusionPipeline")
# Discover all scheduler classes
schedulers = discover_diffusers_classes("SchedulerMixin")
# Discover all model classes
models = discover_diffusers_classes("ModelMixin")
# Discover AutoPipeline classes
auto_pipelines = discover_diffusers_classes("AutoPipelineForText2Image")
"""
global _class_registries
# Check cache first
if base_class_name in _class_registries:
return _class_registries[base_class_name]
import diffusers
# Try to get the base class from diffusers
base_class = None
try:
base_class = getattr(diffusers, base_class_name)
except AttributeError:
# Try to find in submodules
for submodule in ['schedulers', 'models', 'pipelines']:
try:
module = importlib.import_module(f'diffusers.{submodule}')
if hasattr(module, base_class_name):
base_class = getattr(module, base_class_name)
break
except (ImportError, ModuleNotFoundError):
continue
if base_class is None:
raise ValueError(f"Could not find base class '{base_class_name}' in diffusers")
registry: Dict[str, Type] = {}
# Include base class if requested
if include_base:
registry[base_class_name] = base_class
# Scan diffusers module for subclasses
for attr_name in dir(diffusers):
try:
attr = getattr(diffusers, attr_name)
if (isinstance(attr, type) and
issubclass(attr, base_class) and
(include_base or attr is not base_class)):
registry[attr_name] = attr
except (ImportError, AttributeError, TypeError, RuntimeError, ModuleNotFoundError):
continue
# Cache the results
_class_registries[base_class_name] = registry
return registry
def get_available_classes(base_class_name: str) -> List[str]:
"""
Get a sorted list of all discovered class names for a given base class.
Args:
base_class_name: Name of the base class (e.g., "SchedulerMixin")
Returns:
Sorted list of discovered class names
"""
return sorted(discover_diffusers_classes(base_class_name).keys())
def _discover_pipelines() -> Tuple[Dict[str, Type], Dict[str, List[str]]]:
"""
Discover all subclasses of DiffusionPipeline from diffusers.
This function uses the generic discover_diffusers_classes() internally
and adds pipeline-specific task alias generation. It also includes
AutoPipeline classes which are special utility classes for automatic
pipeline selection.
Returns:
A tuple of (pipeline_registry, task_aliases) where:
- pipeline_registry: Dict mapping class names to class objects
- task_aliases: Dict mapping task aliases to lists of class names
"""
# Use the generic discovery function
pipeline_registry = discover_diffusers_classes("DiffusionPipeline", include_base=True)
# Also add AutoPipeline classes - these are special utility classes that are
# NOT subclasses of DiffusionPipeline but are commonly used
import diffusers
auto_pipeline_classes = [
"AutoPipelineForText2Image",
"AutoPipelineForImage2Image",
"AutoPipelineForInpainting",
]
for cls_name in auto_pipeline_classes:
try:
cls = getattr(diffusers, cls_name)
if cls is not None:
pipeline_registry[cls_name] = cls
except AttributeError:
# Class not available in this version of diffusers
pass
# Generate task aliases for pipelines
task_aliases: Dict[str, List[str]] = {}
for attr_name in pipeline_registry:
if attr_name == "DiffusionPipeline":
continue # Skip base class for alias generation
aliases = _extract_task_keywords(attr_name)
for alias in aliases:
if alias not in task_aliases:
task_aliases[alias] = []
if attr_name not in task_aliases[alias]:
task_aliases[alias].append(attr_name)
return pipeline_registry, task_aliases
def get_pipeline_registry() -> Dict[str, Type]:
"""
Get the cached pipeline registry.
Returns a dictionary mapping pipeline class names to their class objects.
The registry is built on first access and cached for subsequent calls.
"""
global _pipeline_registry, _task_aliases
if _pipeline_registry is None:
_pipeline_registry, _task_aliases = _discover_pipelines()
return _pipeline_registry
def get_task_aliases() -> Dict[str, List[str]]:
"""
Get the cached task aliases dictionary.
Returns a dictionary mapping task aliases (e.g., "text-to-image") to
lists of pipeline class names that support that task.
"""
global _pipeline_registry, _task_aliases
if _task_aliases is None:
_pipeline_registry, _task_aliases = _discover_pipelines()
return _task_aliases
def get_available_pipelines() -> List[str]:
"""
Get a sorted list of all discovered pipeline class names.
Returns:
List of pipeline class names available for loading.
"""
return sorted(get_pipeline_registry().keys())
def get_available_tasks() -> List[str]:
"""
Get a sorted list of all available task aliases.
Returns:
List of task aliases (e.g., ["text-to-image", "image-to-image", ...])
"""
return sorted(get_task_aliases().keys())
def resolve_pipeline_class(
class_name: Optional[str] = None,
task: Optional[str] = None,
model_id: Optional[str] = None
) -> Type:
"""
Resolve a pipeline class from class_name, task, or model_id.
Priority:
1. If class_name is provided, look it up directly
2. If task is provided, resolve through task aliases
3. If model_id is provided, try to infer from HuggingFace Hub
Args:
class_name: Exact pipeline class name (e.g., "StableDiffusionPipeline")
task: Task alias (e.g., "text-to-image", "img2img")
model_id: HuggingFace model ID (e.g., "runwayml/stable-diffusion-v1-5")
Returns:
The resolved pipeline class.
Raises:
ValueError: If no pipeline could be resolved.
"""
registry = get_pipeline_registry()
aliases = get_task_aliases()
# 1. Direct class name lookup
if class_name:
if class_name in registry:
return registry[class_name]
# Try case-insensitive match
for name, cls in registry.items():
if name.lower() == class_name.lower():
return cls
raise ValueError(
f"Unknown pipeline class '{class_name}'. "
f"Available pipelines: {', '.join(sorted(registry.keys())[:20])}..."
)
# 2. Task alias lookup
if task:
task_lower = task.lower().replace('_', '-')
if task_lower in aliases:
# Return the first matching pipeline for this task
matching_classes = aliases[task_lower]
if matching_classes:
return registry[matching_classes[0]]
# Try partial matching
for alias, classes in aliases.items():
if task_lower in alias or alias in task_lower:
if classes:
return registry[classes[0]]
raise ValueError(
f"Unknown task '{task}'. "
f"Available tasks: {', '.join(sorted(aliases.keys())[:20])}..."
)
# 3. Try to infer from HuggingFace Hub
if model_id:
try:
from huggingface_hub import model_info
info = model_info(model_id)
# Check pipeline_tag
if hasattr(info, 'pipeline_tag') and info.pipeline_tag:
tag = info.pipeline_tag.lower().replace('_', '-')
if tag in aliases:
matching_classes = aliases[tag]
if matching_classes:
return registry[matching_classes[0]]
# Check model card for hints
if hasattr(info, 'cardData') and info.cardData:
card = info.cardData
if 'pipeline_tag' in card:
tag = card['pipeline_tag'].lower().replace('_', '-')
if tag in aliases:
matching_classes = aliases[tag]
if matching_classes:
return registry[matching_classes[0]]
except ImportError:
# huggingface_hub not available
pass
except (KeyError, AttributeError, ValueError, OSError):
# Model info lookup failed - common cases:
# - KeyError: Missing keys in model card
# - AttributeError: Missing attributes on model info
# - ValueError: Invalid model data
# - OSError: Network or file access issues
pass
# Fallback: use DiffusionPipeline.from_pretrained which auto-detects
# DiffusionPipeline is always added to registry in _discover_pipelines (line 132)
# but use .get() with import fallback for extra safety
from diffusers import DiffusionPipeline
return registry.get('DiffusionPipeline', DiffusionPipeline)
raise ValueError(
"Must provide at least one of: class_name, task, or model_id. "
f"Available pipelines: {', '.join(sorted(registry.keys())[:20])}... "
f"Available tasks: {', '.join(sorted(aliases.keys())[:20])}..."
)
def load_diffusers_pipeline(
class_name: Optional[str] = None,
task: Optional[str] = None,
model_id: Optional[str] = None,
from_single_file: bool = False,
**kwargs
) -> Any:
"""
Load a diffusers pipeline dynamically.
This function resolves the appropriate pipeline class based on the provided
parameters and instantiates it with the given kwargs.
Args:
class_name: Exact pipeline class name (e.g., "StableDiffusionPipeline")
task: Task alias (e.g., "text-to-image", "img2img")
model_id: HuggingFace model ID or local path
from_single_file: If True, use from_single_file() instead of from_pretrained()
**kwargs: Additional arguments passed to from_pretrained() or from_single_file()
Returns:
An instantiated pipeline object.
Raises:
ValueError: If no pipeline could be resolved.
Exception: If pipeline loading fails.
Examples:
# Load by class name
pipe = load_diffusers_pipeline(
class_name="StableDiffusionPipeline",
model_id="runwayml/stable-diffusion-v1-5",
torch_dtype=torch.float16
)
# Load by task
pipe = load_diffusers_pipeline(
task="text-to-image",
model_id="runwayml/stable-diffusion-v1-5",
torch_dtype=torch.float16
)
# Load from single file
pipe = load_diffusers_pipeline(
class_name="StableDiffusionPipeline",
model_id="/path/to/model.safetensors",
from_single_file=True,
torch_dtype=torch.float16
)
"""
# Resolve the pipeline class
pipeline_class = resolve_pipeline_class(
class_name=class_name,
task=task,
model_id=model_id
)
# If no model_id provided but we have a class, we can't load
if model_id is None:
raise ValueError("model_id is required to load a pipeline")
# Load the pipeline
try:
if from_single_file:
# Check if the class has from_single_file method
if hasattr(pipeline_class, 'from_single_file'):
return pipeline_class.from_single_file(model_id, **kwargs)
else:
raise ValueError(
f"Pipeline class {pipeline_class.__name__} does not support from_single_file(). "
f"Use from_pretrained() instead."
)
else:
return pipeline_class.from_pretrained(model_id, **kwargs)
except Exception as e:
# Provide helpful error message
available = get_available_pipelines()
raise RuntimeError(
f"Failed to load pipeline '{pipeline_class.__name__}' from '{model_id}': {e}\n"
f"Available pipelines: {', '.join(available[:20])}..."
) from e
def get_pipeline_info(class_name: str) -> Dict[str, Any]:
"""
Get information about a specific pipeline class.
Args:
class_name: The pipeline class name
Returns:
Dictionary with pipeline information including:
- name: Class name
- aliases: List of task aliases
- supports_single_file: Whether from_single_file() is available
- docstring: Class docstring (if available)
"""
registry = get_pipeline_registry()
aliases = get_task_aliases()
if class_name not in registry:
raise ValueError(f"Unknown pipeline: {class_name}")
cls = registry[class_name]
# Find all aliases for this pipeline
pipeline_aliases = []
for alias, classes in aliases.items():
if class_name in classes:
pipeline_aliases.append(alias)
return {
'name': class_name,
'aliases': pipeline_aliases,
'supports_single_file': hasattr(cls, 'from_single_file'),
'docstring': cls.__doc__[:200] if cls.__doc__ else None
}
|