| import contextvars |
| from typing import Optional, NamedTuple |
|
|
class ExecutionContext(NamedTuple):
    """
    Immutable record describing the node that is currently executing.

    Attributes:
        prompt_id: The ID of the prompt (presumably the prompt the executing
            node belongs to -- confirm against callers)
        node_id: The ID of the currently executing node
        list_index: The index in a list being processed (for operations on
            batches/lists); typed as Optional, so it may be None
    """

    prompt_id: str
    node_id: str
    list_index: Optional[int]
|
|
# Context variable holding the ExecutionContext of the node being executed.
# Its default is None, which is what readers see when nothing has been set.
current_executing_context: contextvars.ContextVar[Optional[ExecutionContext]] = (
    contextvars.ContextVar("current_executing_context", default=None)
)
|
|
def get_executing_context() -> Optional[ExecutionContext]:
    """
    Return the execution context that is currently set, if any.

    Returns:
        The value stored in ``current_executing_context``, or None when no
        value has been set.
    """
    active_context = current_executing_context.get(None)
    return active_context
|
|
class CurrentNodeContext:
    """
    Context manager for setting the current executing node context.

    Sets current_executing_context on enter and restores the previous value
    on exit. The same instance may be entered more than once (nested); each
    exit undoes the most recent enter.

    Args:
        prompt_id: The prompt ID stored in the ExecutionContext
        node_id: The node ID stored in the ExecutionContext
        list_index: Optional list index stored in the ExecutionContext
            (defaults to None)

    Attributes:
        context: The ExecutionContext installed while the manager is active
        token: The most recent outstanding token returned by
            current_executing_context.set(), or None when not entered

    Example:
        with CurrentNodeContext(prompt_id="abc", node_id="123", list_index=0):
            # Code that should run with the current node context set
            process_image()
    """

    def __init__(self, prompt_id: str, node_id: str, list_index: Optional[int] = None):
        self.context = ExecutionContext(
            prompt_id=prompt_id,
            node_id=node_id,
            list_index=list_index,
        )
        self.token = None
        # One token per outstanding __enter__. A ContextVar token can only be
        # used for a single reset(), so nested enters each need their own.
        self._tokens = []

    def __enter__(self):
        """Install self.context as the current context and return self."""
        self.token = current_executing_context.set(self.context)
        self._tokens.append(self.token)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Restore the value that was current before the matching __enter__.

        Does nothing when there is no outstanding enter. Returns None, so any
        exception raised inside the ``with`` body propagates.
        """
        if self._tokens:
            # Undo in LIFO order so nested enters unwind correctly.
            current_executing_context.reset(self._tokens.pop())
        # Never leave an already-used token behind: resetting with it again
        # would raise RuntimeError.
        self.token = self._tokens[-1] if self._tokens else None
|
|