File size: 5,757 Bytes
59f1501 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 |
"""
This module provides callback management functionality for TorchDynamo's compilation process.
It implements a thread-safe system for registering, managing and executing callbacks that run
at the start and end of TorchDynamo compilations. Key features include:
- Registration and deregistration of compilation callbacks
- Thread-safe callback handling with proper locking mechanisms
- Prevention of duplicate callback execution: start/end callbacks fire once per outermost compilation, even when compilations nest or overlap
- Decorator utilities for easy callback registration
- Context manager for controlled callback lifecycle
The module centers around the CompilationCallbackHandler class which maintains separate
lists for start and end callbacks, manages their execution order, and ensures thread-safety.
Utility decorators @on_compile_start and @on_compile_end provide a convenient way to
register compilation hooks.
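Example usage (every callback receives a single CallbackArgs argument):

    @on_compile_start
    def my_start_callback(args: CallbackArgs) -> None:
        print("Starting compilation")

    @on_compile_end
    def my_end_callback(args: CallbackArgs) -> None:
        print("Compilation complete")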
"""
import enum
import threading
from collections.abc import Generator
from contextlib import contextmanager
from dataclasses import dataclass, field # noqa: F811
from typing import Any, Callable
class CallbackTrigger(enum.Enum):
    """
    Kind of event that opens a compilation context.

    Values are assigned by ``enum.auto()``, which numbers members 1, 2, 3, ...
    in declaration order; append new members at the end so existing values
    stay unchanged.
    """

    # The usual path: dynamo tries to trace a frame it has not seen before.
    DYNAMO = enum.auto()
    # Compilation of the backward pass may be postponed until runtime.
    LAZY_BACKWARD = enum.auto()
    # Autotuning performed at runtime by some backends.
    TRITON_AUTOTUNING = enum.auto()
    # Recording of cudagraphs, which also happens at runtime.
    CUDAGRAPH_RECORDING = enum.auto()
@dataclass
class CallbackArgs:
    """
    Argument bundle handed to every compilation start/end callback.
    """

    # The trigger that opened the compilation context.
    callback_trigger: CallbackTrigger
    # Compile id supplied by the caller of ``install_callbacks``.
    compile_id: str
@dataclass
class CompilationCallbackHandler:
    """
    Registry and dispatcher for compilation start/end callbacks.

    Callbacks are stored in two lists and invoked in registration order.
    ``install_callbacks`` counts how many compilation contexts are currently
    open on this handler so that start callbacks run only when the first
    context is entered and end callbacks only when the last one exits.
    """

    start_callbacks: list[Callable[[CallbackArgs], None]] = field(default_factory=list)
    end_callbacks: list[Callable[[CallbackArgs], None]] = field(default_factory=list)

    # Number of ``install_callbacks`` contexts currently open on this handler.
    __pending_callbacks_counter: int = field(default=0, init=False, repr=False)
    # Guards every read and write of the counter above.
    __pending_callbacks_counter_lock: threading.Lock = field(
        default_factory=threading.Lock, init=False, repr=False
    )

    def register_start_callback(
        self, callback: Callable[[CallbackArgs], None]
    ) -> Callable[[CallbackArgs], None]:
        """
        Register a callback function to be called when the compilation starts.

        Args:
        - callback (Callable): The callback function to register.

        Returns:
        - The same callback, so the method can be used as a decorator.
        """
        self.start_callbacks.append(callback)
        return callback

    def register_end_callback(
        self, callback: Callable[[CallbackArgs], None]
    ) -> Callable[[CallbackArgs], None]:
        """
        Register a callback function to be called when the compilation ends.

        Args:
        - callback (Callable): The callback function to register.

        Returns:
        - The same callback, so the method can be used as a decorator.
        """
        self.end_callbacks.append(callback)
        return callback

    def remove_start_callback(self, callback: Callable[[CallbackArgs], None]) -> None:
        """
        Remove a registered start callback function.

        Args:
        - callback (Callable): The callback function to remove.

        Raises:
        - ValueError: If the callback is not currently registered.
        """
        self.start_callbacks.remove(callback)

    def remove_end_callback(self, callback: Callable[[CallbackArgs], None]) -> None:
        """
        Remove a registered end callback function.

        Args:
        - callback (Callable): The callback function to remove.

        Raises:
        - ValueError: If the callback is not currently registered.
        """
        self.end_callbacks.remove(callback)

    def run_start_callbacks(self, args: CallbackArgs) -> None:
        """
        Execute all registered start callbacks, in registration order.

        Args:
        - args (CallbackArgs): The argument object passed to each callback.
        """
        for callback in self.start_callbacks:
            callback(args)

    def run_end_callbacks(self, args: CallbackArgs) -> None:
        """
        Execute all registered end callbacks, in registration order.

        Args:
        - args (CallbackArgs): The argument object passed to each callback.
        """
        for callback in self.end_callbacks:
            callback(args)

    @contextmanager
    def install_callbacks(
        self, trigger: CallbackTrigger, compile_id: str
    ) -> Generator[None, Any, Any]:
        """
        Context manager that brackets a compilation with the registered callbacks.

        Start callbacks run on entry only if no other context is open on this
        handler; end callbacks run on exit only if this is the last open
        context. Nested or overlapping contexts therefore trigger each set of
        callbacks once.

        Callbacks run while the counter lock is held. ``threading.Lock`` is not
        reentrant, so a callback that re-enters ``install_callbacks`` on the
        same handler will block.

        Args:
        - trigger (CallbackTrigger): What initiated this compilation.
        - compile_id (str): Compile id forwarded to the callbacks.
        """
        args = CallbackArgs(trigger, compile_id)

        # The start phase is deliberately outside the try/finally below: if a
        # start callback raises, the counter has not been incremented, so the
        # exit path must not run (it would trip the assertion and mask the
        # original exception, or drive the counter negative under -O).
        with self.__pending_callbacks_counter_lock:
            if self.__pending_callbacks_counter == 0:
                self.run_start_callbacks(args)
            self.__pending_callbacks_counter += 1

        try:
            yield
        finally:
            with self.__pending_callbacks_counter_lock:
                assert self.__pending_callbacks_counter > 0, (
                    "Pending callbacks counter cannot become negative."
                )
                try:
                    if self.__pending_callbacks_counter == 1:
                        self.run_end_callbacks(args)
                finally:
                    # Always release this context's slot, even if an end
                    # callback raised; otherwise the counter would stay above
                    # zero and no later compilation would fire callbacks.
                    self.__pending_callbacks_counter -= 1

    def clear(self) -> None:
        """
        Clear all registered callbacks.

        Both lists are emptied first; the assertion afterwards checks that no
        ``install_callbacks`` context is still open on this handler.
        """
        self.start_callbacks.clear()
        self.end_callbacks.clear()
        assert self.__pending_callbacks_counter == 0
# Module-level handler instance; on_compile_start / on_compile_end register into it.
callback_handler = CompilationCallbackHandler()
def on_compile_start(
    callback: Callable[[CallbackArgs], None],
) -> Callable[[CallbackArgs], None]:
    """
    Register ``callback`` as a compilation-start hook on the module-level handler.

    Meant to be used as a decorator: the callback is handed back unchanged,
    so the decorated name keeps referring to the original function.
    """
    # register_start_callback returns the very callable it was given.
    return callback_handler.register_start_callback(callback)
def on_compile_end(
    callback: Callable[[CallbackArgs], None],
) -> Callable[[CallbackArgs], None]:
    """
    Register ``callback`` as a compilation-end hook on the module-level handler.

    Meant to be used as a decorator: the callback is handed back unchanged,
    so the decorated name keeps referring to the original function.
    """
    # register_end_callback returns the very callable it was given.
    return callback_handler.register_end_callback(callback)
|