| | from collections import defaultdict |
| | from collections.abc import Callable |
| | from threading import Lock |
| |
|
| | from loguru import logger |
| |
|
| | from langflow.services.base import Service |
| | from langflow.services.settings.service import SettingsService |
| |
|
| |
|
class StateService(Service):
    """Interface for services that store per-run state and notify observers.

    Every method here raises ``NotImplementedError``; concrete services are
    expected to override all of them.
    """

    name = "state_service"

    def append_state(self, key, new_state, run_id: str) -> None:
        """Append ``new_state`` to the state kept under ``key`` for ``run_id``.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError

    def update_state(self, key, new_state, run_id: str) -> None:
        """Replace the state kept under ``key`` for ``run_id`` with ``new_state``.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError

    def get_state(self, key, run_id: str):
        """Return the state kept under ``key`` for ``run_id``.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError

    def subscribe(self, key, observer: Callable) -> None:
        """Register ``observer`` to be notified about changes to ``key``.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError

    def notify_observers(self, key, new_state) -> None:
        """Tell the observers of ``key`` about ``new_state``.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError
| |
|
| |
|
class InMemoryStateService(StateService):
    """In-process state store keyed by run id and state key.

    State is held in ``self.states`` as ``{run_id: {key: value}}`` and
    observers in ``self.observers`` as ``{key: [callback, ...]}``. Observers
    are registered per key only; they are not scoped to a run id.

    ``self.lock`` is a plain (non-reentrant) ``threading.Lock``. It guards the
    two dicts only: observer callbacks are always invoked *after* the lock has
    been released, so a callback may safely call back into this service
    (``get_state``, ``subscribe``, ...) without deadlocking.
    """

    def __init__(self, settings_service: SettingsService):
        """Create an empty store.

        Args:
            settings_service: Stored on the instance; not otherwise used by
                this class.
        """
        self.settings_service = settings_service
        self.states: dict = {}
        self.observers: dict = defaultdict(list)
        self.lock = Lock()

    def append_state(self, key, new_state, run_id: str) -> None:
        """Append ``new_state`` to the list stored under ``key`` for ``run_id``.

        A missing entry starts as an empty list. An existing non-list value
        is wrapped into a one-element list before appending. Append-observers
        of ``key`` are notified afterwards.
        """
        with self.lock:
            run_states = self.states.setdefault(run_id, {})
            if key not in run_states:
                run_states[key] = []
            elif not isinstance(run_states[key], list):
                # A scalar (e.g. written by update_state) becomes the first item.
                run_states[key] = [run_states[key]]
            run_states[key].append(new_state)
        # Notify outside the lock: the lock is not reentrant, so an observer
        # that touches this service while it is held would block forever.
        self.notify_append_observers(key, new_state)

    def update_state(self, key, new_state, run_id: str) -> None:
        """Overwrite the value stored under ``key`` for ``run_id``.

        Observers of ``key`` are notified afterwards; an exception raised by
        an observer propagates to the caller (the state is already stored).
        """
        with self.lock:
            self.states.setdefault(run_id, {})[key] = new_state
        # Notify outside the lock (see append_state).
        self.notify_observers(key, new_state)

    def get_state(self, key, run_id: str):
        """Return the value stored under ``key`` for ``run_id``.

        Returns:
            The stored value, or the empty string ``""`` when either the run
            id or the key is unknown.
        """
        with self.lock:
            return self.states.get(run_id, {}).get(key, "")

    def subscribe(self, key, observer: Callable) -> None:
        """Register ``observer`` for ``key``; duplicates are ignored.

        Observers are called as ``observer(key, new_state, append=<bool>)``.
        """
        with self.lock:
            registered = self.observers[key]
            if observer not in registered:
                registered.append(observer)

    def notify_observers(self, key, new_state) -> None:
        """Call every observer of ``key`` with ``append=False``.

        Observer exceptions are not caught here: the first one raised stops
        the loop and propagates to the caller.
        """
        for callback in self._observers_snapshot(key):
            callback(key, new_state, append=False)

    def notify_append_observers(self, key, new_state) -> None:
        """Call every observer of ``key`` with ``append=True``.

        Unlike ``notify_observers``, a failing observer is logged and skipped
        so the remaining observers still run.
        """
        for callback in self._observers_snapshot(key):
            try:
                callback(key, new_state, append=True)
            except Exception:
                # Best-effort delivery: log with traceback and keep going.
                logger.exception(f"Error in observer {callback} for key {key}")
                logger.warning("Callbacks not implemented yet")

    def _observers_snapshot(self, key) -> tuple:
        """Return an immutable copy of the observers currently registered for ``key``.

        Uses ``dict.get`` so that notifying a key nobody subscribed to does not
        insert an empty list into the ``defaultdict``. Copying under the lock
        keeps iteration safe if ``subscribe`` runs during notification.
        """
        with self.lock:
            return tuple(self.observers.get(key, ()))
| |
|