# Uploaded by truthtaicom using huggingface_hub.
# Commit 4b0794d (verified).
from collections import defaultdict
from collections.abc import Callable
from threading import Lock
from loguru import logger
from langflow.services.base import Service
from langflow.services.settings.service import SettingsService
class StateService(Service):
    """Interface for services that keep keyed state per run and notify observers.

    Every method defined here raises ``NotImplementedError``; a concrete
    service is expected to override the ones it supports.
    """

    name = "state_service"

    def append_state(self, key, new_state, run_id: str) -> None:
        """Append ``new_state`` to the state stored under ``key`` for ``run_id``.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

    def update_state(self, key, new_state, run_id: str) -> None:
        """Replace the state stored under ``key`` for ``run_id`` with ``new_state``.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

    def get_state(self, key, run_id: str):
        """Return the state stored under ``key`` for ``run_id``.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

    def subscribe(self, key, observer: Callable) -> None:
        """Register ``observer`` to be notified about changes to ``key``.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError

    def notify_observers(self, key, new_state) -> None:
        """Tell the observers of ``key`` that its state is now ``new_state``.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError
class InMemoryStateService(StateService):
    """State service that keeps all state in process memory.

    State is stored as ``states[run_id][key]``; observers are registered per
    ``key`` and are therefore shared by every run. ``lock`` guards mutations
    of both mappings. Observer callbacks are invoked only after ``lock`` has
    been released, so a callback may call back into this service (for
    example ``get_state``) without deadlocking on the non-reentrant lock.
    """

    def __init__(self, settings_service: SettingsService):
        """Create an empty store.

        Args:
            settings_service: Kept on the instance as ``settings_service``;
                this class does not read it anywhere else.
        """
        self.settings_service = settings_service
        self.states: dict = {}
        self.observers: dict = defaultdict(list)
        self.lock = Lock()

    def append_state(self, key, new_state, run_id: str) -> None:
        """Append ``new_state`` to the list stored under ``key`` for ``run_id``.

        A missing entry starts as an empty list. An existing value that is
        not a list is first wrapped in a single-element list. Append
        observers of ``key`` are notified afterwards.
        """
        with self.lock:
            run_states = self.states.setdefault(run_id, {})
            if key not in run_states:
                run_states[key] = []
            elif not isinstance(run_states[key], list):
                run_states[key] = [run_states[key]]
            run_states[key].append(new_state)
        # Notify outside the lock: ``Lock`` is not reentrant, so an observer
        # that touched this service while we still held it would hang.
        self.notify_append_observers(key, new_state)

    def update_state(self, key, new_state, run_id: str) -> None:
        """Overwrite the value stored under ``key`` for ``run_id``.

        Observers of ``key`` are notified afterwards; an exception raised by
        an observer propagates to the caller (the state stays updated).
        """
        with self.lock:
            self.states.setdefault(run_id, {})[key] = new_state
        # Same reasoning as in ``append_state``: never call observers while
        # holding the non-reentrant lock.
        self.notify_observers(key, new_state)

    def get_state(self, key, run_id: str):
        """Return the value stored under ``key`` for ``run_id``.

        Returns:
            The stored value, or ``""`` when either ``run_id`` or ``key`` is
            unknown.
        """
        with self.lock:
            return self.states.get(run_id, {}).get(key, "")

    def subscribe(self, key, observer: Callable) -> None:
        """Register ``observer`` for ``key``; registering it twice is a no-op.

        Observers are called as ``observer(key, new_state, append=<bool>)``.
        """
        with self.lock:
            if observer not in self.observers[key]:
                self.observers[key].append(observer)

    def notify_observers(self, key, new_state) -> None:
        """Call every observer of ``key`` with ``append=False``.

        Exceptions raised by an observer are not caught here: the first one
        propagates and the remaining observers are skipped.
        """
        # ``.get`` avoids creating an empty defaultdict entry for keys nobody
        # subscribed to; the tuple copy keeps the loop stable if an observer
        # subscribes another callback while being notified.
        for callback in tuple(self.observers.get(key, ())):
            callback(key, new_state, append=False)

    def notify_append_observers(self, key, new_state) -> None:
        """Call every observer of ``key`` with ``append=True``.

        Unlike ``notify_observers``, a failing observer is logged and the
        remaining observers are still called.
        """
        # See ``notify_observers`` for why ``.get`` and a snapshot are used.
        for callback in tuple(self.observers.get(key, ())):
            try:
                callback(key, new_state, append=True)
            except Exception:  # noqa: BLE001
                logger.exception(f"Error in observer {callback} for key {key}")
                logger.warning("Callbacks not implemented yet")