Spaces:
Running
Running
| from collections import defaultdict | |
| from collections.abc import Callable | |
| from threading import Lock | |
| from loguru import logger | |
| from langflow.services.base import Service | |
| from langflow.services.settings.service import SettingsService | |
| class StateService(Service): | |
| name = "state_service" | |
| def append_state(self, key, new_state, run_id: str) -> None: | |
| raise NotImplementedError | |
| def update_state(self, key, new_state, run_id: str) -> None: | |
| raise NotImplementedError | |
| def get_state(self, key, run_id: str): | |
| raise NotImplementedError | |
| def subscribe(self, key, observer: Callable) -> None: | |
| raise NotImplementedError | |
| def notify_observers(self, key, new_state) -> None: | |
| raise NotImplementedError | |
| class InMemoryStateService(StateService): | |
| def __init__(self, settings_service: SettingsService): | |
| self.settings_service = settings_service | |
| self.states: dict = {} | |
| self.observers: dict = defaultdict(list) | |
| self.lock = Lock() | |
| def append_state(self, key, new_state, run_id: str) -> None: | |
| with self.lock: | |
| if run_id not in self.states: | |
| self.states[run_id] = {} | |
| if key not in self.states[run_id]: | |
| self.states[run_id][key] = [] | |
| elif not isinstance(self.states[run_id][key], list): | |
| self.states[run_id][key] = [self.states[run_id][key]] | |
| self.states[run_id][key].append(new_state) | |
| self.notify_append_observers(key, new_state) | |
| def update_state(self, key, new_state, run_id: str) -> None: | |
| with self.lock: | |
| if run_id not in self.states: | |
| self.states[run_id] = {} | |
| self.states[run_id][key] = new_state | |
| self.notify_observers(key, new_state) | |
| def get_state(self, key, run_id: str): | |
| with self.lock: | |
| return self.states.get(run_id, {}).get(key, "") | |
| def subscribe(self, key, observer: Callable) -> None: | |
| with self.lock: | |
| if observer not in self.observers[key]: | |
| self.observers[key].append(observer) | |
| def notify_observers(self, key, new_state) -> None: | |
| for callback in self.observers[key]: | |
| callback(key, new_state, append=False) | |
| def notify_append_observers(self, key, new_state) -> None: | |
| for callback in self.observers[key]: | |
| try: | |
| callback(key, new_state, append=True) | |
| except Exception: # noqa: BLE001 | |
| logger.exception(f"Error in observer {callback} for key {key}") | |
| logger.warning("Callbacks not implemented yet") | |