velai / dataflow /nodes_base.py
cansik's picture
Upload folder via script
3025bb3 verified
from __future__ import annotations
from dataclasses import dataclass, field
from typing import Iterable, Callable
from .enums import NodeKind, DataPortState
from .ports import PortSchema, PortState
@dataclass(slots=True)
class NodeType:
"""Static node type metadata."""
kind: NodeKind
display_name: str
inputs: list[PortSchema]
outputs: list[PortSchema]
@dataclass(slots=True)
class NodeInstance:
"""Runtime node instance."""
node_id: str
node_type: NodeType
auto_process: bool = False
x: float = 0
y: float = 0
inputs: dict[str, PortState] = field(default_factory=dict)
outputs: dict[str, PortState] = field(default_factory=dict)
on_process: Callable[["NodeInstance"], None] | None = None
def __post_init__(self) -> None:
if not self.inputs:
self.inputs = {p.name: PortState(p) for p in self.node_type.inputs}
if not self.outputs:
self.outputs = {p.name: PortState(p) for p in self.node_type.outputs}
def all_inputs(self) -> Iterable[PortState]:
return self.inputs.values()
def all_outputs(self) -> Iterable[PortState]:
return self.outputs.values()
def mark_dirty(self) -> None:
for p in self.all_outputs():
p.state = DataPortState.DIRTY
async def process(self) -> None:
if self.on_process:
# check if on_process is async
import inspect
if inspect.iscoroutinefunction(self.on_process):
await self.on_process(self)
else:
self.on_process(self)
else:
pass
def reset_node(self) -> None:
pass