Spaces:
Sleeping
Sleeping
| """Observable list that fires a callback on every append. | |
| Used to hook into agent ``intermediate_state.append(...)`` calls so that the | |
| Streamlit UI can receive real-time node completion notifications without | |
| modifying any agent code. | |
| """ | |
| from typing import Any, Callable | |
| class ObservableList(list): | |
| """A list subclass that invokes *on_append* after every ``append`` call.""" | |
| def __init__(self, *args: Any, on_append: Callable[[Any], None] | None = None, **kwargs: Any): | |
| super().__init__(*args, **kwargs) | |
| self._on_append = on_append | |
| def append(self, item: Any) -> None: | |
| super().append(item) | |
| if self._on_append is not None: | |
| self._on_append(item) | |