File size: 19,337 Bytes
0f07ba7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
"""
Dynamic Diffusers Pipeline Loader

This module provides dynamic discovery and loading of diffusers pipelines at runtime,
eliminating the need for per-pipeline conditional statements. New pipelines added to
diffusers become available automatically without code changes.

The module also supports discovering other diffusers classes like schedulers, models,
and other components, making it a generic solution for dynamic class loading.

Usage:
    from diffusers_dynamic_loader import load_diffusers_pipeline, get_available_pipelines

    # Load by class name
    pipe = load_diffusers_pipeline(class_name="StableDiffusionPipeline", model_id="...", torch_dtype=torch.float16)

    # Load by task alias
    pipe = load_diffusers_pipeline(task="text-to-image", model_id="...", torch_dtype=torch.float16)

    # Load using model_id (infers from HuggingFace Hub if possible)
    pipe = load_diffusers_pipeline(model_id="runwayml/stable-diffusion-v1-5", torch_dtype=torch.float16)

    # Get list of available pipelines
    available = get_available_pipelines()

    # Discover other diffusers classes (schedulers, models, etc.)
    schedulers = discover_diffusers_classes("SchedulerMixin")
    models = discover_diffusers_classes("ModelMixin")
"""

import importlib
import re
import sys
from typing import Any, Dict, List, Optional, Tuple, Type


# Global cache for discovered pipelines - computed once per process
_pipeline_registry: Optional[Dict[str, Type]] = None
_task_aliases: Optional[Dict[str, List[str]]] = None

# Global cache for other discovered class types
_class_registries: Dict[str, Dict[str, Type]] = {}


def _camel_to_kebab(name: str) -> str:
    """
    Convert CamelCase to kebab-case.

    Examples:
        StableDiffusionPipeline -> stable-diffusion-pipeline
        StableDiffusionXLImg2ImgPipeline -> stable-diffusion-xl-img-2-img-pipeline
    """
    # Insert hyphen before uppercase letters (but not at the start)
    s1 = re.sub('(.)([A-Z][a-z]+)', r'\1-\2', name)
    # Insert hyphen before uppercase letters following lowercase letters or numbers
    s2 = re.sub('([a-z0-9])([A-Z])', r'\1-\2', s1)
    return s2.lower()


def _extract_task_keywords(class_name: str) -> List[str]:
    """
    Extract task-related keywords from a pipeline class name.

    This function derives useful task aliases from the class name without
    hardcoding per-pipeline branches.

    Returns a list of potential task aliases for this pipeline.
    """
    aliases = []
    name_lower = class_name.lower()

    # Direct task mappings based on common patterns in class names
    task_patterns = {
        'text2image': ['text-to-image', 'txt2img', 'text2image'],
        'texttoimage': ['text-to-image', 'txt2img', 'text2image'],
        'txt2img': ['text-to-image', 'txt2img', 'text2image'],
        'img2img': ['image-to-image', 'img2img', 'image2image'],
        'image2image': ['image-to-image', 'img2img', 'image2image'],
        'imagetoimage': ['image-to-image', 'img2img', 'image2image'],
        'img2video': ['image-to-video', 'img2vid', 'img2video'],
        'imagetovideo': ['image-to-video', 'img2vid', 'img2video'],
        'text2video': ['text-to-video', 'txt2vid', 'text2video'],
        'texttovideo': ['text-to-video', 'txt2vid', 'text2video'],
        'inpaint': ['inpainting', 'inpaint'],
        'depth2img': ['depth-to-image', 'depth2img'],
        'depthtoimage': ['depth-to-image', 'depth2img'],
        'controlnet': ['controlnet', 'control-net'],
        'upscale': ['upscaling', 'upscale', 'super-resolution'],
        'superresolution': ['upscaling', 'upscale', 'super-resolution'],
    }

    # Check for each pattern in the class name
    for pattern, task_aliases in task_patterns.items():
        if pattern in name_lower:
            aliases.extend(task_aliases)

    # Also detect general pipeline types from the class name structure
    # E.g., StableDiffusionPipeline -> stable-diffusion, flux -> flux
    # Remove "Pipeline" suffix and convert to kebab case
    if class_name.endswith('Pipeline'):
        base_name = class_name[:-8]  # Remove "Pipeline"
        kebab_name = _camel_to_kebab(base_name)
        aliases.append(kebab_name)

        # Extract model family name (e.g., "stable-diffusion" from "stable-diffusion-xl-img-2-img")
        parts = kebab_name.split('-')
        if len(parts) >= 2:
            # Try the first two words as a family name
            family = '-'.join(parts[:2])
            if family not in aliases:
                aliases.append(family)

    # If no specific task pattern matched but class contains "Pipeline", add "text-to-image" as default
    # since most diffusion pipelines support text-to-image generation
    if 'text-to-image' not in aliases and 'image-to-image' not in aliases:
        # Only add for pipelines that seem to be generation pipelines (not schedulers, etc.)
        if 'pipeline' in name_lower and not any(x in name_lower for x in ['scheduler', 'processor', 'encoder']):
            # Don't automatically add - let it be explicit
            pass

    return list(set(aliases))  # Remove duplicates


def discover_diffusers_classes(
    base_class_name: str,
    include_base: bool = True
) -> Dict[str, Type]:
    """
    Discover all subclasses of a given base class from diffusers.

    This function provides a generic way to discover any type of diffusers class,
    not just pipelines. It can be used to discover schedulers, models, processors,
    and other components.

    Args:
        base_class_name: Name of the base class to search for subclasses
                        (e.g., "DiffusionPipeline", "SchedulerMixin", "ModelMixin")
        include_base: Whether to include the base class itself in results

    Returns:
        Dict mapping class names to class objects

    Examples:
        # Discover all pipeline classes
        pipelines = discover_diffusers_classes("DiffusionPipeline")

        # Discover all scheduler classes
        schedulers = discover_diffusers_classes("SchedulerMixin")

        # Discover all model classes
        models = discover_diffusers_classes("ModelMixin")

        # Discover AutoPipeline classes
        auto_pipelines = discover_diffusers_classes("AutoPipelineForText2Image")
    """
    global _class_registries

    # Check cache first
    if base_class_name in _class_registries:
        return _class_registries[base_class_name]

    import diffusers

    # Try to get the base class from diffusers
    base_class = None
    try:
        base_class = getattr(diffusers, base_class_name)
    except AttributeError:
        # Try to find in submodules
        for submodule in ['schedulers', 'models', 'pipelines']:
            try:
                module = importlib.import_module(f'diffusers.{submodule}')
                if hasattr(module, base_class_name):
                    base_class = getattr(module, base_class_name)
                    break
            except (ImportError, ModuleNotFoundError):
                continue

    if base_class is None:
        raise ValueError(f"Could not find base class '{base_class_name}' in diffusers")

    registry: Dict[str, Type] = {}

    # Include base class if requested
    if include_base:
        registry[base_class_name] = base_class

    # Scan diffusers module for subclasses
    for attr_name in dir(diffusers):
        try:
            attr = getattr(diffusers, attr_name)
            if (isinstance(attr, type) and
                issubclass(attr, base_class) and
                (include_base or attr is not base_class)):
                registry[attr_name] = attr
        except (ImportError, AttributeError, TypeError, RuntimeError, ModuleNotFoundError):
            continue

    # Cache the results
    _class_registries[base_class_name] = registry
    return registry


def get_available_classes(base_class_name: str) -> List[str]:
    """
    Get a sorted list of all discovered class names for a given base class.

    Args:
        base_class_name: Name of the base class (e.g., "SchedulerMixin")

    Returns:
        Sorted list of discovered class names
    """
    return sorted(discover_diffusers_classes(base_class_name).keys())


def _discover_pipelines() -> Tuple[Dict[str, Type], Dict[str, List[str]]]:
    """
    Discover all subclasses of DiffusionPipeline from diffusers.

    This function uses the generic discover_diffusers_classes() internally
    and adds pipeline-specific task alias generation. It also includes
    AutoPipeline classes which are special utility classes for automatic
    pipeline selection.

    Returns:
        A tuple of (pipeline_registry, task_aliases) where:
        - pipeline_registry: Dict mapping class names to class objects
        - task_aliases: Dict mapping task aliases to lists of class names
    """
    # Use the generic discovery function
    pipeline_registry = discover_diffusers_classes("DiffusionPipeline", include_base=True)

    # Also add AutoPipeline classes - these are special utility classes that are
    # NOT subclasses of DiffusionPipeline but are commonly used
    import diffusers
    auto_pipeline_classes = [
        "AutoPipelineForText2Image",
        "AutoPipelineForImage2Image",
        "AutoPipelineForInpainting",
    ]
    for cls_name in auto_pipeline_classes:
        try:
            cls = getattr(diffusers, cls_name)
            if cls is not None:
                pipeline_registry[cls_name] = cls
        except AttributeError:
            # Class not available in this version of diffusers
            pass

    # Generate task aliases for pipelines
    task_aliases: Dict[str, List[str]] = {}
    for attr_name in pipeline_registry:
        if attr_name == "DiffusionPipeline":
            continue  # Skip base class for alias generation

        aliases = _extract_task_keywords(attr_name)
        for alias in aliases:
            if alias not in task_aliases:
                task_aliases[alias] = []
            if attr_name not in task_aliases[alias]:
                task_aliases[alias].append(attr_name)

    return pipeline_registry, task_aliases


def get_pipeline_registry() -> Dict[str, Type]:
    """
    Get the cached pipeline registry.

    Returns a dictionary mapping pipeline class names to their class objects.
    The registry is built on first access and cached for subsequent calls.
    """
    global _pipeline_registry, _task_aliases
    if _pipeline_registry is None:
        _pipeline_registry, _task_aliases = _discover_pipelines()
    return _pipeline_registry


def get_task_aliases() -> Dict[str, List[str]]:
    """
    Get the cached task aliases dictionary.

    Returns a dictionary mapping task aliases (e.g., "text-to-image") to
    lists of pipeline class names that support that task.
    """
    global _pipeline_registry, _task_aliases
    if _task_aliases is None:
        _pipeline_registry, _task_aliases = _discover_pipelines()
    return _task_aliases


def get_available_pipelines() -> List[str]:
    """
    Get a sorted list of all discovered pipeline class names.

    Returns:
        List of pipeline class names available for loading.
    """
    return sorted(get_pipeline_registry().keys())


def get_available_tasks() -> List[str]:
    """
    Get a sorted list of all available task aliases.

    Returns:
        List of task aliases (e.g., ["text-to-image", "image-to-image", ...])
    """
    return sorted(get_task_aliases().keys())


def resolve_pipeline_class(
    class_name: Optional[str] = None,
    task: Optional[str] = None,
    model_id: Optional[str] = None
) -> Type:
    """
    Resolve a pipeline class from class_name, task, or model_id.

    Priority:
    1. If class_name is provided, look it up directly
    2. If task is provided, resolve through task aliases
    3. If model_id is provided, try to infer from HuggingFace Hub

    Args:
        class_name: Exact pipeline class name (e.g., "StableDiffusionPipeline")
        task: Task alias (e.g., "text-to-image", "img2img")
        model_id: HuggingFace model ID (e.g., "runwayml/stable-diffusion-v1-5")

    Returns:
        The resolved pipeline class.

    Raises:
        ValueError: If no pipeline could be resolved.
    """
    registry = get_pipeline_registry()
    aliases = get_task_aliases()

    # 1. Direct class name lookup
    if class_name:
        if class_name in registry:
            return registry[class_name]
        # Try case-insensitive match
        for name, cls in registry.items():
            if name.lower() == class_name.lower():
                return cls
        raise ValueError(
            f"Unknown pipeline class '{class_name}'. "
            f"Available pipelines: {', '.join(sorted(registry.keys())[:20])}..."
        )

    # 2. Task alias lookup
    if task:
        task_lower = task.lower().replace('_', '-')
        if task_lower in aliases:
            # Return the first matching pipeline for this task
            matching_classes = aliases[task_lower]
            if matching_classes:
                return registry[matching_classes[0]]

        # Try partial matching
        for alias, classes in aliases.items():
            if task_lower in alias or alias in task_lower:
                if classes:
                    return registry[classes[0]]

        raise ValueError(
            f"Unknown task '{task}'. "
            f"Available tasks: {', '.join(sorted(aliases.keys())[:20])}..."
        )

    # 3. Try to infer from HuggingFace Hub
    if model_id:
        try:
            from huggingface_hub import model_info
            info = model_info(model_id)

            # Check pipeline_tag
            if hasattr(info, 'pipeline_tag') and info.pipeline_tag:
                tag = info.pipeline_tag.lower().replace('_', '-')
                if tag in aliases:
                    matching_classes = aliases[tag]
                    if matching_classes:
                        return registry[matching_classes[0]]

            # Check model card for hints
            if hasattr(info, 'cardData') and info.cardData:
                card = info.cardData
                if 'pipeline_tag' in card:
                    tag = card['pipeline_tag'].lower().replace('_', '-')
                    if tag in aliases:
                        matching_classes = aliases[tag]
                        if matching_classes:
                            return registry[matching_classes[0]]

        except ImportError:
            # huggingface_hub not available
            pass
        except (KeyError, AttributeError, ValueError, OSError):
            # Model info lookup failed - common cases:
            # - KeyError: Missing keys in model card
            # - AttributeError: Missing attributes on model info
            # - ValueError: Invalid model data
            # - OSError: Network or file access issues
            pass

        # Fallback: use DiffusionPipeline.from_pretrained which auto-detects
        # DiffusionPipeline is always added to registry in _discover_pipelines (line 132)
        # but use .get() with import fallback for extra safety
        from diffusers import DiffusionPipeline
        return registry.get('DiffusionPipeline', DiffusionPipeline)

    raise ValueError(
        "Must provide at least one of: class_name, task, or model_id. "
        f"Available pipelines: {', '.join(sorted(registry.keys())[:20])}... "
        f"Available tasks: {', '.join(sorted(aliases.keys())[:20])}..."
    )


def load_diffusers_pipeline(
    class_name: Optional[str] = None,
    task: Optional[str] = None,
    model_id: Optional[str] = None,
    from_single_file: bool = False,
    **kwargs
) -> Any:
    """
    Load a diffusers pipeline dynamically.

    This function resolves the appropriate pipeline class based on the provided
    parameters and instantiates it with the given kwargs.

    Args:
        class_name: Exact pipeline class name (e.g., "StableDiffusionPipeline")
        task: Task alias (e.g., "text-to-image", "img2img")
        model_id: HuggingFace model ID or local path
        from_single_file: If True, use from_single_file() instead of from_pretrained()
        **kwargs: Additional arguments passed to from_pretrained() or from_single_file()

    Returns:
        An instantiated pipeline object.

    Raises:
        ValueError: If no pipeline could be resolved.
        Exception: If pipeline loading fails.

    Examples:
        # Load by class name
        pipe = load_diffusers_pipeline(
            class_name="StableDiffusionPipeline",
            model_id="runwayml/stable-diffusion-v1-5",
            torch_dtype=torch.float16
        )

        # Load by task
        pipe = load_diffusers_pipeline(
            task="text-to-image",
            model_id="runwayml/stable-diffusion-v1-5",
            torch_dtype=torch.float16
        )

        # Load from single file
        pipe = load_diffusers_pipeline(
            class_name="StableDiffusionPipeline",
            model_id="/path/to/model.safetensors",
            from_single_file=True,
            torch_dtype=torch.float16
        )
    """
    # Resolve the pipeline class
    pipeline_class = resolve_pipeline_class(
        class_name=class_name,
        task=task,
        model_id=model_id
    )

    # If no model_id provided but we have a class, we can't load
    if model_id is None:
        raise ValueError("model_id is required to load a pipeline")

    # Load the pipeline
    try:
        if from_single_file:
            # Check if the class has from_single_file method
            if hasattr(pipeline_class, 'from_single_file'):
                return pipeline_class.from_single_file(model_id, **kwargs)
            else:
                raise ValueError(
                    f"Pipeline class {pipeline_class.__name__} does not support from_single_file(). "
                    f"Use from_pretrained() instead."
                )
        else:
            return pipeline_class.from_pretrained(model_id, **kwargs)

    except Exception as e:
        # Provide helpful error message
        available = get_available_pipelines()
        raise RuntimeError(
            f"Failed to load pipeline '{pipeline_class.__name__}' from '{model_id}': {e}\n"
            f"Available pipelines: {', '.join(available[:20])}..."
        ) from e


def get_pipeline_info(class_name: str) -> Dict[str, Any]:
    """
    Get information about a specific pipeline class.

    Args:
        class_name: The pipeline class name

    Returns:
        Dictionary with pipeline information including:
        - name: Class name
        - aliases: List of task aliases
        - supports_single_file: Whether from_single_file() is available
        - docstring: Class docstring (if available)
    """
    registry = get_pipeline_registry()
    aliases = get_task_aliases()

    if class_name not in registry:
        raise ValueError(f"Unknown pipeline: {class_name}")

    cls = registry[class_name]

    # Find all aliases for this pipeline
    pipeline_aliases = []
    for alias, classes in aliases.items():
        if class_name in classes:
            pipeline_aliases.append(alias)

    return {
        'name': class_name,
        'aliases': pipeline_aliases,
        'supports_single_file': hasattr(cls, 'from_single_file'),
        'docstring': cls.__doc__[:200] if cls.__doc__ else None
    }