scider / streamlit-client /workflow /observable_list.py
leonardklin's picture
Upload 328 files
978fed5 verified
"""Observable list that fires a callback on every append.
Used to hook into agent ``intermediate_state.append(...)`` calls so that the
Streamlit UI can receive real-time node completion notifications without
modifying any agent code.
"""
from typing import Any, Callable
class ObservableList(list):
"""A list subclass that invokes *on_append* after every ``append`` call."""
def __init__(self, *args: Any, on_append: Callable[[Any], None] | None = None, **kwargs: Any):
super().__init__(*args, **kwargs)
self._on_append = on_append
def append(self, item: Any) -> None:
super().append(item)
if self._on_append is not None:
self._on_append(item)