|
|
from __future__ import annotations |
|
|
|
|
|
import ast |
|
|
import asyncio |
|
|
import inspect |
|
|
from collections.abc import AsyncIterator, Iterator |
|
|
from copy import deepcopy |
|
|
from textwrap import dedent |
|
|
from typing import TYPE_CHECKING, Any, ClassVar, NamedTuple, get_type_hints |
|
|
|
|
|
import nanoid |
|
|
import yaml |
|
|
from langchain_core.tools import StructuredTool |
|
|
from pydantic import BaseModel, ValidationError |
|
|
|
|
|
from langflow.base.tools.constants import ( |
|
|
TOOL_OUTPUT_DISPLAY_NAME, |
|
|
TOOL_OUTPUT_NAME, |
|
|
TOOL_TABLE_SCHEMA, |
|
|
TOOLS_METADATA_INPUT_NAME, |
|
|
) |
|
|
from langflow.custom.tree_visitor import RequiredInputsVisitor |
|
|
from langflow.exceptions.component import StreamingError |
|
|
from langflow.field_typing import Tool |
|
|
from langflow.graph.state.model import create_state_model |
|
|
from langflow.helpers.custom import format_type |
|
|
from langflow.memory import astore_message, aupdate_messages, delete_message |
|
|
from langflow.schema.artifact import get_artifact_type, post_process_raw |
|
|
from langflow.schema.data import Data |
|
|
from langflow.schema.message import ErrorMessage, Message |
|
|
from langflow.schema.properties import Source |
|
|
from langflow.services.tracing.schema import Log |
|
|
from langflow.template.field.base import UNDEFINED, Input, Output |
|
|
from langflow.template.frontend_node.custom_components import ComponentFrontendNode |
|
|
from langflow.utils.async_helpers import run_until_complete |
|
|
from langflow.utils.util import find_closest_match |
|
|
|
|
|
from .custom_component import CustomComponent |
|
|
|
|
|
if TYPE_CHECKING: |
|
|
from collections.abc import Callable |
|
|
|
|
|
from langflow.events.event_manager import EventManager |
|
|
from langflow.graph.edge.schema import EdgeData |
|
|
from langflow.graph.vertex.base import Vertex |
|
|
from langflow.inputs.inputs import InputTypes |
|
|
from langflow.schema import dotdict |
|
|
from langflow.schema.log import LoggableType |
|
|
|
|
|
|
|
|
_ComponentToolkit = None |
|
|
|
|
|
|
|
|
def _get_component_toolkit(): |
|
|
global _ComponentToolkit |
|
|
if _ComponentToolkit is None: |
|
|
from langflow.base.tools.component_tool import ComponentToolkit |
|
|
|
|
|
_ComponentToolkit = ComponentToolkit |
|
|
return _ComponentToolkit |
|
|
|
|
|
|
|
|
BACKWARDS_COMPATIBLE_ATTRIBUTES = ["user_id", "vertex", "tracing_service"] |
|
|
CONFIG_ATTRIBUTES = ["_display_name", "_description", "_icon", "_name", "_metadata"] |
|
|
|
|
|
|
|
|
class PlaceholderGraph(NamedTuple): |
|
|
"""A placeholder graph structure for components, providing backwards compatibility. |
|
|
|
|
|
and enabling component execution without a full graph object. |
|
|
|
|
|
This lightweight structure contains essential information typically found in a complete graph, |
|
|
allowing components to function in isolation or in simplified contexts. |
|
|
|
|
|
Attributes: |
|
|
flow_id (str | None): Unique identifier for the flow, if applicable. |
|
|
user_id (str | None): Identifier of the user associated with the flow, if any. |
|
|
session_id (str | None): Identifier for the current session, if applicable. |
|
|
context (dict): Additional contextual information for the component's execution. |
|
|
flow_name (str | None): Name of the flow, if available. |
|
|
""" |
|
|
|
|
|
flow_id: str | None |
|
|
user_id: str | None |
|
|
session_id: str | None |
|
|
context: dict |
|
|
flow_name: str | None |
|
|
|
|
|
|
|
|
class Component(CustomComponent): |
|
|
inputs: list[InputTypes] = [] |
|
|
outputs: list[Output] = [] |
|
|
code_class_base_inheritance: ClassVar[str] = "Component" |
|
|
_output_logs: dict[str, list[Log]] = {} |
|
|
_current_output: str = "" |
|
|
_metadata: dict = {} |
|
|
_ctx: dict = {} |
|
|
_code: str | None = None |
|
|
_logs: list[Log] = [] |
|
|
|
|
|
def __init__(self, **kwargs) -> None: |
|
|
|
|
|
|
|
|
inputs = {} |
|
|
config = {} |
|
|
for key, value in kwargs.items(): |
|
|
if key.startswith("_"): |
|
|
config[key] = value |
|
|
elif key in CONFIG_ATTRIBUTES: |
|
|
config[key[1:]] = value |
|
|
else: |
|
|
inputs[key] = value |
|
|
self._inputs: dict[str, InputTypes] = {} |
|
|
self._outputs_map: dict[str, Output] = {} |
|
|
self._results: dict[str, Any] = {} |
|
|
self._attributes: dict[str, Any] = {} |
|
|
self._parameters = inputs or {} |
|
|
self._edges: list[EdgeData] = [] |
|
|
self._components: list[Component] = [] |
|
|
self._current_output = "" |
|
|
self._event_manager: EventManager | None = None |
|
|
self._state_model = None |
|
|
self.set_attributes(self._parameters) |
|
|
self._output_logs = {} |
|
|
config = config or {} |
|
|
if "_id" not in config: |
|
|
config |= {"_id": f"{self.__class__.__name__}-{nanoid.generate(size=5)}"} |
|
|
self.__inputs = inputs |
|
|
self.__config = config |
|
|
self._reset_all_output_values() |
|
|
super().__init__(**config) |
|
|
if hasattr(self, "_trace_type"): |
|
|
self.trace_type = self._trace_type |
|
|
if not hasattr(self, "trace_type"): |
|
|
self.trace_type = "chain" |
|
|
if self.inputs is not None: |
|
|
self.map_inputs(self.inputs) |
|
|
if self.outputs is not None: |
|
|
self.map_outputs(self.outputs) |
|
|
|
|
|
self._set_output_types(list(self._outputs_map.values())) |
|
|
self.set_class_code() |
|
|
self._set_output_required_inputs() |
|
|
|
|
|
@property |
|
|
def ctx(self): |
|
|
if not hasattr(self, "graph") or self.graph is None: |
|
|
msg = "Graph not found. Please build the graph first." |
|
|
raise ValueError(msg) |
|
|
return self.graph.context |
|
|
|
|
|
def add_to_ctx(self, key: str, value: Any, *, overwrite: bool = False) -> None: |
|
|
"""Add a key-value pair to the context. |
|
|
|
|
|
Args: |
|
|
key (str): The key to add. |
|
|
value (Any): The value to associate with the key. |
|
|
overwrite (bool, optional): Whether to overwrite the existing value. Defaults to False. |
|
|
|
|
|
Raises: |
|
|
ValueError: If the graph is not built. |
|
|
""" |
|
|
if not hasattr(self, "graph") or self.graph is None: |
|
|
msg = "Graph not found. Please build the graph first." |
|
|
raise ValueError(msg) |
|
|
if key in self.graph.context and not overwrite: |
|
|
msg = f"Key {key} already exists in context. Set overwrite=True to overwrite." |
|
|
raise ValueError(msg) |
|
|
self.graph.context.update({key: value}) |
|
|
|
|
|
def update_ctx(self, value_dict: dict[str, Any]) -> None: |
|
|
"""Update the context with a dictionary of values. |
|
|
|
|
|
Args: |
|
|
value_dict (dict[str, Any]): The dictionary of values to update. |
|
|
|
|
|
Raises: |
|
|
ValueError: If the graph is not built. |
|
|
""" |
|
|
if not hasattr(self, "graph") or self.graph is None: |
|
|
msg = "Graph not found. Please build the graph first." |
|
|
raise ValueError(msg) |
|
|
if not isinstance(value_dict, dict): |
|
|
msg = "Value dict must be a dictionary" |
|
|
raise TypeError(msg) |
|
|
|
|
|
self.graph.context.update(value_dict) |
|
|
|
|
|
def _pre_run_setup(self): |
|
|
pass |
|
|
|
|
|
def set_event_manager(self, event_manager: EventManager | None = None) -> None: |
|
|
self._event_manager = event_manager |
|
|
|
|
|
def _reset_all_output_values(self) -> None: |
|
|
if isinstance(self._outputs_map, dict): |
|
|
for output in self._outputs_map.values(): |
|
|
output.value = UNDEFINED |
|
|
|
|
|
def _build_state_model(self): |
|
|
if self._state_model: |
|
|
return self._state_model |
|
|
name = self.name or self.__class__.__name__ |
|
|
model_name = f"{name}StateModel" |
|
|
fields = {} |
|
|
for output in self._outputs_map.values(): |
|
|
fields[output.name] = getattr(self, output.method) |
|
|
self._state_model = create_state_model(model_name=model_name, **fields) |
|
|
return self._state_model |
|
|
|
|
|
def get_state_model_instance_getter(self): |
|
|
state_model = self._build_state_model() |
|
|
|
|
|
def _instance_getter(_): |
|
|
return state_model() |
|
|
|
|
|
_instance_getter.__annotations__["return"] = state_model |
|
|
return _instance_getter |
|
|
|
|
|
def __deepcopy__(self, memo: dict) -> Component: |
|
|
if id(self) in memo: |
|
|
return memo[id(self)] |
|
|
kwargs = deepcopy(self.__config, memo) |
|
|
kwargs["inputs"] = deepcopy(self.__inputs, memo) |
|
|
new_component = type(self)(**kwargs) |
|
|
new_component._code = self._code |
|
|
new_component._outputs_map = self._outputs_map |
|
|
new_component._inputs = self._inputs |
|
|
new_component._edges = self._edges |
|
|
new_component._components = self._components |
|
|
new_component._parameters = self._parameters |
|
|
new_component._attributes = self._attributes |
|
|
new_component._output_logs = self._output_logs |
|
|
new_component._logs = self._logs |
|
|
memo[id(self)] = new_component |
|
|
return new_component |
|
|
|
|
|
def set_class_code(self) -> None: |
|
|
|
|
|
if self._code: |
|
|
return |
|
|
try: |
|
|
module = inspect.getmodule(self.__class__) |
|
|
if module is None: |
|
|
msg = "Could not find module for class" |
|
|
raise ValueError(msg) |
|
|
class_code = inspect.getsource(module) |
|
|
self._code = class_code |
|
|
except OSError as e: |
|
|
msg = f"Could not find source code for {self.__class__.__name__}" |
|
|
raise ValueError(msg) from e |
|
|
|
|
|
def set(self, **kwargs): |
|
|
"""Connects the component to other components or sets parameters and attributes. |
|
|
|
|
|
Args: |
|
|
**kwargs: Keyword arguments representing the connections, parameters, and attributes. |
|
|
|
|
|
Returns: |
|
|
None |
|
|
|
|
|
Raises: |
|
|
KeyError: If the specified input name does not exist. |
|
|
""" |
|
|
for key, value in kwargs.items(): |
|
|
self._process_connection_or_parameters(key, value) |
|
|
return self |
|
|
|
|
|
def list_inputs(self): |
|
|
"""Returns a list of input names.""" |
|
|
return [_input.name for _input in self.inputs] |
|
|
|
|
|
def list_outputs(self): |
|
|
"""Returns a list of output names.""" |
|
|
return [_output.name for _output in self._outputs_map.values()] |
|
|
|
|
|
async def run(self): |
|
|
"""Executes the component's logic and returns the result. |
|
|
|
|
|
Returns: |
|
|
The result of executing the component's logic. |
|
|
""" |
|
|
return await self._run() |
|
|
|
|
|
def set_vertex(self, vertex: Vertex) -> None: |
|
|
"""Sets the vertex for the component. |
|
|
|
|
|
Args: |
|
|
vertex (Vertex): The vertex to set. |
|
|
|
|
|
Returns: |
|
|
None |
|
|
""" |
|
|
self._vertex = vertex |
|
|
|
|
|
def get_input(self, name: str) -> Any: |
|
|
"""Retrieves the value of the input with the specified name. |
|
|
|
|
|
Args: |
|
|
name (str): The name of the input. |
|
|
|
|
|
Returns: |
|
|
Any: The value of the input. |
|
|
|
|
|
Raises: |
|
|
ValueError: If the input with the specified name is not found. |
|
|
""" |
|
|
if name in self._inputs: |
|
|
return self._inputs[name] |
|
|
msg = f"Input {name} not found in {self.__class__.__name__}" |
|
|
raise ValueError(msg) |
|
|
|
|
|
def get_output(self, name: str) -> Any: |
|
|
"""Retrieves the output with the specified name. |
|
|
|
|
|
Args: |
|
|
name (str): The name of the output to retrieve. |
|
|
|
|
|
Returns: |
|
|
Any: The output value. |
|
|
|
|
|
Raises: |
|
|
ValueError: If the output with the specified name is not found. |
|
|
""" |
|
|
if name in self._outputs_map: |
|
|
return self._outputs_map[name] |
|
|
msg = f"Output {name} not found in {self.__class__.__name__}" |
|
|
raise ValueError(msg) |
|
|
|
|
|
def set_on_output(self, name: str, **kwargs) -> None: |
|
|
output = self.get_output(name) |
|
|
for key, value in kwargs.items(): |
|
|
if not hasattr(output, key): |
|
|
msg = f"Output {name} does not have a method {key}" |
|
|
raise ValueError(msg) |
|
|
setattr(output, key, value) |
|
|
|
|
|
def set_output_value(self, name: str, value: Any) -> None: |
|
|
if name in self._outputs_map: |
|
|
self._outputs_map[name].value = value |
|
|
else: |
|
|
msg = f"Output {name} not found in {self.__class__.__name__}" |
|
|
raise ValueError(msg) |
|
|
|
|
|
def map_outputs(self, outputs: list[Output]) -> None: |
|
|
"""Maps the given list of outputs to the component. |
|
|
|
|
|
Args: |
|
|
outputs (List[Output]): The list of outputs to be mapped. |
|
|
|
|
|
Raises: |
|
|
ValueError: If the output name is None. |
|
|
|
|
|
Returns: |
|
|
None |
|
|
""" |
|
|
for output in outputs: |
|
|
if output.name is None: |
|
|
msg = "Output name cannot be None." |
|
|
raise ValueError(msg) |
|
|
|
|
|
|
|
|
self._outputs_map[output.name] = deepcopy(output) |
|
|
|
|
|
def map_inputs(self, inputs: list[InputTypes]) -> None: |
|
|
"""Maps the given inputs to the component. |
|
|
|
|
|
Args: |
|
|
inputs (List[InputTypes]): A list of InputTypes objects representing the inputs. |
|
|
|
|
|
Raises: |
|
|
ValueError: If the input name is None. |
|
|
|
|
|
""" |
|
|
for input_ in inputs: |
|
|
if input_.name is None: |
|
|
msg = "Input name cannot be None." |
|
|
raise ValueError(msg) |
|
|
self._inputs[input_.name] = deepcopy(input_) |
|
|
|
|
|
def validate(self, params: dict) -> None: |
|
|
"""Validates the component parameters. |
|
|
|
|
|
Args: |
|
|
params (dict): A dictionary containing the component parameters. |
|
|
|
|
|
Raises: |
|
|
ValueError: If the inputs are not valid. |
|
|
ValueError: If the outputs are not valid. |
|
|
""" |
|
|
self._validate_inputs(params) |
|
|
self._validate_outputs() |
|
|
|
|
|
def update_inputs( |
|
|
self, |
|
|
build_config: dotdict, |
|
|
field_value: Any, |
|
|
field_name: str | None = None, |
|
|
): |
|
|
return self.update_build_config(build_config, field_value, field_name) |
|
|
|
|
|
def run_and_validate_update_outputs(self, frontend_node: dict, field_name: str, field_value: Any): |
|
|
frontend_node = self.update_outputs(frontend_node, field_name, field_value) |
|
|
if field_name == "tool_mode" or frontend_node.get("tool_mode"): |
|
|
is_tool_mode = field_value or frontend_node.get("tool_mode") |
|
|
frontend_node["outputs"] = [self._build_tool_output()] if is_tool_mode else frontend_node["outputs"] |
|
|
if is_tool_mode: |
|
|
frontend_node.setdefault("template", {}) |
|
|
frontend_node["template"][TOOLS_METADATA_INPUT_NAME] = self._build_tools_metadata_input().to_dict() |
|
|
elif "template" in frontend_node: |
|
|
frontend_node["template"].pop(TOOLS_METADATA_INPUT_NAME, None) |
|
|
self.tools_metadata = frontend_node.get("template", {}).get(TOOLS_METADATA_INPUT_NAME, {}).get("value") |
|
|
return self._validate_frontend_node(frontend_node) |
|
|
|
|
|
def _validate_frontend_node(self, frontend_node: dict): |
|
|
|
|
|
for index, output in enumerate(frontend_node["outputs"]): |
|
|
if isinstance(output, dict): |
|
|
try: |
|
|
output_ = Output(**output) |
|
|
self._set_output_return_type(output_) |
|
|
output_dict = output_.model_dump() |
|
|
except ValidationError as e: |
|
|
msg = f"Invalid output: {e}" |
|
|
raise ValueError(msg) from e |
|
|
elif isinstance(output, Output): |
|
|
|
|
|
self._set_output_return_type(output) |
|
|
output_dict = output.model_dump() |
|
|
else: |
|
|
msg = f"Invalid output type: {type(output)}" |
|
|
raise TypeError(msg) |
|
|
frontend_node["outputs"][index] = output_dict |
|
|
return frontend_node |
|
|
|
|
|
def update_outputs(self, frontend_node: dict, field_name: str, field_value: Any) -> dict: |
|
|
"""Default implementation for updating outputs based on field changes. |
|
|
|
|
|
Subclasses can override this to modify outputs based on field_name and field_value. |
|
|
""" |
|
|
return frontend_node |
|
|
|
|
|
def _set_output_types(self, outputs: list[Output]) -> None: |
|
|
for output in outputs: |
|
|
self._set_output_return_type(output) |
|
|
|
|
|
def _set_output_return_type(self, output: Output) -> None: |
|
|
if output.method is None: |
|
|
msg = f"Output {output.name} does not have a method" |
|
|
raise ValueError(msg) |
|
|
return_types = self._get_method_return_type(output.method) |
|
|
output.add_types(return_types) |
|
|
output.set_selected() |
|
|
|
|
|
def _set_output_required_inputs(self) -> None: |
|
|
for output in self.outputs: |
|
|
if not output.method: |
|
|
continue |
|
|
method = getattr(self, output.method, None) |
|
|
if not method or not callable(method): |
|
|
continue |
|
|
try: |
|
|
source_code = inspect.getsource(method) |
|
|
ast_tree = ast.parse(dedent(source_code)) |
|
|
except Exception: |
|
|
ast_tree = ast.parse(dedent(self._code or "")) |
|
|
|
|
|
visitor = RequiredInputsVisitor(self._inputs) |
|
|
visitor.visit(ast_tree) |
|
|
output.required_inputs = sorted(visitor.required_inputs) |
|
|
|
|
|
def get_output_by_method(self, method: Callable): |
|
|
|
|
|
|
|
|
output = next((output for output in self._outputs_map.values() if output.method == method.__name__), None) |
|
|
if output is None: |
|
|
method_name = method.__name__ if hasattr(method, "__name__") else str(method) |
|
|
msg = f"Output with method {method_name} not found" |
|
|
raise ValueError(msg) |
|
|
return output |
|
|
|
|
|
def _inherits_from_component(self, method: Callable): |
|
|
|
|
|
|
|
|
return hasattr(method, "__self__") and isinstance(method.__self__, Component) |
|
|
|
|
|
def _method_is_valid_output(self, method: Callable): |
|
|
|
|
|
|
|
|
return ( |
|
|
hasattr(method, "__self__") |
|
|
and isinstance(method.__self__, Component) |
|
|
and method.__self__.get_output_by_method(method) |
|
|
) |
|
|
|
|
|
def _build_error_string_from_matching_pairs(self, matching_pairs: list[tuple[Output, Input]]): |
|
|
text = "" |
|
|
for output, input_ in matching_pairs: |
|
|
text += f"{output.name}[{','.join(output.types)}]->{input_.name}[{','.join(input_.input_types or [])}]\n" |
|
|
return text |
|
|
|
|
|
def _find_matching_output_method(self, input_name: str, value: Component): |
|
|
"""Find the output method from the given component and input name. |
|
|
|
|
|
Find the output method from the given component (`value`) that matches the specified input (`input_name`) |
|
|
in the current component. |
|
|
This method searches through all outputs of the provided component to find outputs whose types match |
|
|
the input types of the specified input in the current component. If exactly one matching output is found, |
|
|
it returns the corresponding method. If multiple matching outputs are found, it raises an error indicating |
|
|
ambiguity. If no matching outputs are found, it raises an error indicating that no suitable output was found. |
|
|
|
|
|
Args: |
|
|
input_name (str): The name of the input in the current component to match. |
|
|
value (Component): The component whose outputs are to be considered. |
|
|
|
|
|
Returns: |
|
|
Callable: The method corresponding to the matching output. |
|
|
|
|
|
Raises: |
|
|
ValueError: If multiple matching outputs are found, if no matching outputs are found, |
|
|
or if the output method is invalid. |
|
|
""" |
|
|
|
|
|
outputs = value._outputs_map.values() |
|
|
|
|
|
matching_pairs = [] |
|
|
|
|
|
input_ = self._inputs[input_name] |
|
|
|
|
|
matching_pairs = [ |
|
|
(output, input_) |
|
|
for output in outputs |
|
|
for output_type in output.types |
|
|
|
|
|
if input_.input_types and output_type in input_.input_types |
|
|
] |
|
|
|
|
|
if len(matching_pairs) > 1: |
|
|
matching_pairs_str = self._build_error_string_from_matching_pairs(matching_pairs) |
|
|
msg = ( |
|
|
f"There are multiple outputs from {value.__class__.__name__} " |
|
|
f"that can connect to inputs in {self.__class__.__name__}: {matching_pairs_str}" |
|
|
) |
|
|
|
|
|
if not matching_pairs: |
|
|
msg = ( |
|
|
f"No matching output from {value.__class__.__name__} found for input '{input_name}' " |
|
|
f"in {self.__class__.__name__}." |
|
|
) |
|
|
raise ValueError(msg) |
|
|
|
|
|
output, input_ = matching_pairs[0] |
|
|
|
|
|
if not isinstance(output.method, str): |
|
|
msg = f"Method {output.method} is not a valid output of {value.__class__.__name__}" |
|
|
raise TypeError(msg) |
|
|
return getattr(value, output.method) |
|
|
|
|
|
def _process_connection_or_parameter(self, key, value) -> None: |
|
|
input_ = self._get_or_create_input(key) |
|
|
|
|
|
if isinstance(value, Component): |
|
|
|
|
|
|
|
|
|
|
|
value = self._find_matching_output_method(key, value) |
|
|
if callable(value) and self._inherits_from_component(value): |
|
|
try: |
|
|
self._method_is_valid_output(value) |
|
|
except ValueError as e: |
|
|
msg = f"Method {value.__name__} is not a valid output of {value.__self__.__class__.__name__}" |
|
|
raise ValueError(msg) from e |
|
|
self._connect_to_component(key, value, input_) |
|
|
else: |
|
|
self._set_parameter_or_attribute(key, value) |
|
|
|
|
|
def _process_connection_or_parameters(self, key, value) -> None: |
|
|
|
|
|
|
|
|
if isinstance(value, list) and not any( |
|
|
isinstance(val, str | int | float | bool | type(None) | Message | Data | StructuredTool) for val in value |
|
|
): |
|
|
for val in value: |
|
|
self._process_connection_or_parameter(key, val) |
|
|
else: |
|
|
self._process_connection_or_parameter(key, value) |
|
|
|
|
|
def _get_or_create_input(self, key): |
|
|
try: |
|
|
return self._inputs[key] |
|
|
except KeyError: |
|
|
input_ = self._get_fallback_input(name=key, display_name=key) |
|
|
self._inputs[key] = input_ |
|
|
self.inputs.append(input_) |
|
|
return input_ |
|
|
|
|
|
def _connect_to_component(self, key, value, input_) -> None: |
|
|
component = value.__self__ |
|
|
self._components.append(component) |
|
|
output = component.get_output_by_method(value) |
|
|
self._add_edge(component, key, output, input_) |
|
|
|
|
|
def _add_edge(self, component, key, output, input_) -> None: |
|
|
self._edges.append( |
|
|
{ |
|
|
"source": component._id, |
|
|
"target": self._id, |
|
|
"data": { |
|
|
"sourceHandle": { |
|
|
"dataType": component.name or component.__class__.__name__, |
|
|
"id": component._id, |
|
|
"name": output.name, |
|
|
"output_types": output.types, |
|
|
}, |
|
|
"targetHandle": { |
|
|
"fieldName": key, |
|
|
"id": self._id, |
|
|
"inputTypes": input_.input_types, |
|
|
"type": input_.field_type, |
|
|
}, |
|
|
}, |
|
|
} |
|
|
) |
|
|
|
|
|
def _set_parameter_or_attribute(self, key, value) -> None: |
|
|
if isinstance(value, Component): |
|
|
methods = ", ".join([f"'{output.method}'" for output in value.outputs]) |
|
|
msg = ( |
|
|
f"You set {value.display_name} as value for `{key}`. " |
|
|
f"You should pass one of the following: {methods}" |
|
|
) |
|
|
raise TypeError(msg) |
|
|
self._set_input_value(key, value) |
|
|
self._parameters[key] = value |
|
|
self._attributes[key] = value |
|
|
|
|
|
def __call__(self, **kwargs): |
|
|
self.set(**kwargs) |
|
|
|
|
|
return run_until_complete(self.run()) |
|
|
|
|
|
async def _run(self): |
|
|
|
|
|
for key, _input in self._inputs.items(): |
|
|
if asyncio.iscoroutinefunction(_input.value): |
|
|
self._inputs[key].value = await _input.value() |
|
|
elif callable(_input.value): |
|
|
self._inputs[key].value = await asyncio.to_thread(_input.value) |
|
|
|
|
|
self.set_attributes({}) |
|
|
|
|
|
return await self.build_results() |
|
|
|
|
|
def __getattr__(self, name: str) -> Any: |
|
|
if "_attributes" in self.__dict__ and name in self.__dict__["_attributes"]: |
|
|
return self.__dict__["_attributes"][name] |
|
|
if "_inputs" in self.__dict__ and name in self.__dict__["_inputs"]: |
|
|
return self.__dict__["_inputs"][name].value |
|
|
if "_outputs_map" in self.__dict__ and name in self.__dict__["_outputs_map"]: |
|
|
return self.__dict__["_outputs_map"][name] |
|
|
if name in BACKWARDS_COMPATIBLE_ATTRIBUTES: |
|
|
return self.__dict__[f"_{name}"] |
|
|
if name.startswith("_") and name[1:] in BACKWARDS_COMPATIBLE_ATTRIBUTES: |
|
|
return self.__dict__[name] |
|
|
if name == "graph": |
|
|
|
|
|
session_id = self._session_id if hasattr(self, "_session_id") else None |
|
|
user_id = self._user_id if hasattr(self, "_user_id") else None |
|
|
flow_name = self._flow_name if hasattr(self, "_flow_name") else None |
|
|
flow_id = self._flow_id if hasattr(self, "_flow_id") else None |
|
|
return PlaceholderGraph( |
|
|
flow_id=flow_id, user_id=str(user_id), session_id=session_id, context={}, flow_name=flow_name |
|
|
) |
|
|
msg = f"{name} not found in {self.__class__.__name__}" |
|
|
raise AttributeError(msg) |
|
|
|
|
|
def _set_input_value(self, name: str, value: Any) -> None: |
|
|
if name in self._inputs: |
|
|
input_value = self._inputs[name].value |
|
|
if isinstance(input_value, Component): |
|
|
methods = ", ".join([f"'{output.method}'" for output in input_value.outputs]) |
|
|
msg = ( |
|
|
f"You set {input_value.display_name} as value for `{name}`. " |
|
|
f"You should pass one of the following: {methods}" |
|
|
) |
|
|
raise ValueError(msg) |
|
|
if callable(input_value) and hasattr(input_value, "__self__"): |
|
|
msg = f"Input {name} is connected to {input_value.__self__.display_name}.{input_value.__name__}" |
|
|
raise ValueError(msg) |
|
|
self._inputs[name].value = value |
|
|
if hasattr(self._inputs[name], "load_from_db"): |
|
|
self._inputs[name].load_from_db = False |
|
|
else: |
|
|
msg = f"Input {name} not found in {self.__class__.__name__}" |
|
|
raise ValueError(msg) |
|
|
|
|
|
def _validate_outputs(self) -> None: |
|
|
|
|
|
pass |
|
|
|
|
|
def _map_parameters_on_frontend_node(self, frontend_node: ComponentFrontendNode) -> None: |
|
|
for name, value in self._parameters.items(): |
|
|
frontend_node.set_field_value_in_template(name, value) |
|
|
|
|
|
def _map_parameters_on_template(self, template: dict) -> None: |
|
|
for name, value in self._parameters.items(): |
|
|
try: |
|
|
template[name]["value"] = value |
|
|
except KeyError as e: |
|
|
close_match = find_closest_match(name, list(template.keys())) |
|
|
if close_match: |
|
|
msg = f"Parameter '{name}' not found in {self.__class__.__name__}. Did you mean '{close_match}'?" |
|
|
raise ValueError(msg) from e |
|
|
msg = f"Parameter {name} not found in {self.__class__.__name__}. " |
|
|
raise ValueError(msg) from e |
|
|
|
|
|
def _get_method_return_type(self, method_name: str) -> list[str]: |
|
|
method = getattr(self, method_name) |
|
|
return_type = get_type_hints(method)["return"] |
|
|
extracted_return_types = self._extract_return_type(return_type) |
|
|
return [format_type(extracted_return_type) for extracted_return_type in extracted_return_types] |
|
|
|
|
|
def _update_template(self, frontend_node: dict): |
|
|
return frontend_node |
|
|
|
|
|
def to_frontend_node(self): |
|
|
|
|
|
|
|
|
|
|
|
field_config = self.get_template_config(self) |
|
|
frontend_node = ComponentFrontendNode.from_inputs(**field_config) |
|
|
for key in self._inputs: |
|
|
frontend_node.set_field_load_from_db_in_template(key, value=False) |
|
|
self._map_parameters_on_frontend_node(frontend_node) |
|
|
|
|
|
frontend_node_dict = frontend_node.to_dict(keep_name=False) |
|
|
frontend_node_dict = self._update_template(frontend_node_dict) |
|
|
self._map_parameters_on_template(frontend_node_dict["template"]) |
|
|
|
|
|
frontend_node = ComponentFrontendNode.from_dict(frontend_node_dict) |
|
|
if not self._code: |
|
|
self.set_class_code() |
|
|
code_field = Input( |
|
|
dynamic=True, |
|
|
required=True, |
|
|
placeholder="", |
|
|
multiline=True, |
|
|
value=self._code, |
|
|
password=False, |
|
|
name="code", |
|
|
advanced=True, |
|
|
field_type="code", |
|
|
is_list=False, |
|
|
) |
|
|
frontend_node.template.add_field(code_field) |
|
|
|
|
|
for output in frontend_node.outputs: |
|
|
if output.types: |
|
|
continue |
|
|
return_types = self._get_method_return_type(output.method) |
|
|
output.add_types(return_types) |
|
|
output.set_selected() |
|
|
|
|
|
frontend_node.validate_component() |
|
|
frontend_node.set_base_classes_from_outputs() |
|
|
return { |
|
|
"data": { |
|
|
"node": frontend_node.to_dict(keep_name=False), |
|
|
"type": self.name or self.__class__.__name__, |
|
|
"id": self._id, |
|
|
}, |
|
|
"id": self._id, |
|
|
} |
|
|
|
|
|
def _validate_inputs(self, params: dict) -> None: |
|
|
|
|
|
for key, value in params.copy().items(): |
|
|
if key not in self._inputs: |
|
|
continue |
|
|
input_ = self._inputs[key] |
|
|
|
|
|
|
|
|
input_.value = value |
|
|
params[input_.name] = input_.value |
|
|
|
|
|
def set_attributes(self, params: dict) -> None: |
|
|
self._validate_inputs(params) |
|
|
attributes = {} |
|
|
for key, value in params.items(): |
|
|
if key in self.__dict__ and value != getattr(self, key): |
|
|
msg = ( |
|
|
f"{self.__class__.__name__} defines an input parameter named '{key}' " |
|
|
f"that is a reserved word and cannot be used." |
|
|
) |
|
|
raise ValueError(msg) |
|
|
attributes[key] = value |
|
|
for key, input_obj in self._inputs.items(): |
|
|
if key not in attributes and key not in self._attributes: |
|
|
attributes[key] = input_obj.value or None |
|
|
self._attributes.update(attributes) |
|
|
|
|
|
def _set_outputs(self, outputs: list[dict]) -> None: |
|
|
self.outputs = [Output(**output) for output in outputs] |
|
|
for output in self.outputs: |
|
|
setattr(self, output.name, output) |
|
|
self._outputs_map[output.name] = output |
|
|
|
|
|
def get_trace_as_inputs(self): |
|
|
predefined_inputs = { |
|
|
input_.name: input_.value |
|
|
for input_ in self.inputs |
|
|
if hasattr(input_, "trace_as_input") and input_.trace_as_input |
|
|
} |
|
|
|
|
|
runtime_inputs = {name: input_.value for name, input_ in self._inputs.items() if hasattr(input_, "value")} |
|
|
return {**predefined_inputs, **runtime_inputs} |
|
|
|
|
|
def get_trace_as_metadata(self): |
|
|
return { |
|
|
input_.name: input_.value |
|
|
for input_ in self.inputs |
|
|
if hasattr(input_, "trace_as_metadata") and input_.trace_as_metadata |
|
|
} |
|
|
|
|
|
async def _build_with_tracing(self): |
|
|
inputs = self.get_trace_as_inputs() |
|
|
metadata = self.get_trace_as_metadata() |
|
|
async with self._tracing_service.trace_context(self, self.trace_name, inputs, metadata): |
|
|
results, artifacts = await self._build_results() |
|
|
self._tracing_service.set_outputs(self.trace_name, results) |
|
|
|
|
|
return results, artifacts |
|
|
|
|
|
async def _build_without_tracing(self): |
|
|
return await self._build_results() |
|
|
|
|
|
async def build_results(self): |
|
|
"""Build the results of the component.""" |
|
|
if hasattr(self, "graph"): |
|
|
session_id = self.graph.session_id |
|
|
elif hasattr(self, "_session_id"): |
|
|
session_id = self._session_id |
|
|
else: |
|
|
session_id = None |
|
|
try: |
|
|
if self._tracing_service: |
|
|
return await self._build_with_tracing() |
|
|
return await self._build_without_tracing() |
|
|
except StreamingError as e: |
|
|
await self.send_error( |
|
|
exception=e.cause, |
|
|
session_id=session_id, |
|
|
trace_name=getattr(self, "trace_name", None), |
|
|
source=e.source, |
|
|
) |
|
|
raise e.cause |
|
|
except Exception as e: |
|
|
await self.send_error( |
|
|
exception=e, |
|
|
session_id=session_id, |
|
|
source=Source(id=self._id, display_name=self.display_name, source=self.display_name), |
|
|
trace_name=getattr(self, "trace_name", None), |
|
|
) |
|
|
raise |
|
|
|
|
|
async def _build_results(self) -> tuple[dict, dict]: |
|
|
results = {} |
|
|
artifacts = {} |
|
|
if hasattr(self, "_pre_run_setup"): |
|
|
self._pre_run_setup() |
|
|
if hasattr(self, "outputs"): |
|
|
if any(getattr(_input, "tool_mode", False) for _input in self.inputs): |
|
|
self._append_tool_to_outputs_map() |
|
|
for output in self._outputs_map.values(): |
|
|
|
|
|
|
|
|
if ( |
|
|
not self._vertex |
|
|
or not self._vertex.outgoing_edges |
|
|
or output.name in self._vertex.edges_source_names |
|
|
): |
|
|
if output.method is None: |
|
|
msg = f"Output {output.name} does not have a method defined." |
|
|
raise ValueError(msg) |
|
|
self._current_output = output.name |
|
|
method: Callable = getattr(self, output.method) |
|
|
if output.cache and output.value != UNDEFINED: |
|
|
results[output.name] = output.value |
|
|
result = output.value |
|
|
else: |
|
|
|
|
|
if inspect.iscoroutinefunction(method): |
|
|
result = await method() |
|
|
else: |
|
|
result = await asyncio.to_thread(method) |
|
|
if ( |
|
|
self._vertex is not None |
|
|
and isinstance(result, Message) |
|
|
and result.flow_id is None |
|
|
and self._vertex.graph.flow_id is not None |
|
|
): |
|
|
result.set_flow_id(self._vertex.graph.flow_id) |
|
|
results[output.name] = result |
|
|
output.value = result |
|
|
|
|
|
custom_repr = self.custom_repr() |
|
|
if custom_repr is None and isinstance(result, dict | Data | str): |
|
|
custom_repr = result |
|
|
if not isinstance(custom_repr, str): |
|
|
custom_repr = str(custom_repr) |
|
|
raw = result |
|
|
if self.status is None: |
|
|
artifact_value = raw |
|
|
else: |
|
|
artifact_value = self.status |
|
|
raw = self.status |
|
|
|
|
|
if hasattr(raw, "data") and raw is not None: |
|
|
raw = raw.data |
|
|
if raw is None: |
|
|
raw = custom_repr |
|
|
|
|
|
elif hasattr(raw, "model_dump") and raw is not None: |
|
|
raw = raw.model_dump() |
|
|
if raw is None and isinstance(result, dict | Data | str): |
|
|
raw = result.data if isinstance(result, Data) else result |
|
|
artifact_type = get_artifact_type(artifact_value, result) |
|
|
raw, artifact_type = post_process_raw(raw, artifact_type) |
|
|
artifact = {"repr": custom_repr, "raw": raw, "type": artifact_type} |
|
|
artifacts[output.name] = artifact |
|
|
self._output_logs[output.name] = self._logs |
|
|
self._logs = [] |
|
|
self._current_output = "" |
|
|
self._artifacts = artifacts |
|
|
self._results = results |
|
|
if self._tracing_service: |
|
|
self._tracing_service.set_outputs(self.trace_name, results) |
|
|
return results, artifacts |
|
|
|
|
|
def custom_repr(self): |
|
|
if self.repr_value == "": |
|
|
self.repr_value = self.status |
|
|
if isinstance(self.repr_value, dict): |
|
|
return yaml.dump(self.repr_value) |
|
|
if isinstance(self.repr_value, str): |
|
|
return self.repr_value |
|
|
if isinstance(self.repr_value, BaseModel) and not isinstance(self.repr_value, Data): |
|
|
return str(self.repr_value) |
|
|
return self.repr_value |
|
|
|
|
|
def build_inputs(self): |
|
|
"""Builds the inputs for the custom component. |
|
|
|
|
|
Returns: |
|
|
List[Input]: The list of inputs. |
|
|
""" |
|
|
|
|
|
|
|
|
self.inputs = self.template_config.get("inputs", []) |
|
|
if not self.inputs: |
|
|
return {} |
|
|
return {_input.name: _input.model_dump(by_alias=True, exclude_none=True) for _input in self.inputs} |
|
|
|
|
|
def _get_field_order(self): |
|
|
try: |
|
|
inputs = self.template_config["inputs"] |
|
|
return [field.name for field in inputs] |
|
|
except KeyError: |
|
|
return [] |
|
|
|
|
|
def build(self, **kwargs) -> None: |
|
|
self.set_attributes(kwargs) |
|
|
|
|
|
def _get_fallback_input(self, **kwargs): |
|
|
return Input(**kwargs) |
|
|
|
|
|
def to_toolkit(self) -> list[Tool]: |
|
|
component_toolkit = _get_component_toolkit() |
|
|
tools = component_toolkit(component=self).get_tools(callbacks=self.get_langchain_callbacks()) |
|
|
if hasattr(self, TOOLS_METADATA_INPUT_NAME): |
|
|
tools = component_toolkit(component=self, metadata=self.tools_metadata).update_tools_metadata(tools=tools) |
|
|
return tools |
|
|
|
|
|
def get_project_name(self): |
|
|
if hasattr(self, "_tracing_service") and self._tracing_service: |
|
|
return self._tracing_service.project_name |
|
|
return "Langflow" |
|
|
|
|
|
def log(self, message: LoggableType | list[LoggableType], name: str | None = None) -> None: |
|
|
"""Logs a message. |
|
|
|
|
|
Args: |
|
|
message (LoggableType | list[LoggableType]): The message to log. |
|
|
name (str, optional): The name of the log. Defaults to None. |
|
|
""" |
|
|
if name is None: |
|
|
name = f"Log {len(self._logs) + 1}" |
|
|
log = Log(message=message, type=get_artifact_type(message), name=name) |
|
|
self._logs.append(log) |
|
|
if self._tracing_service and self._vertex: |
|
|
self._tracing_service.add_log(trace_name=self.trace_name, log=log) |
|
|
if self._event_manager is not None and self._current_output: |
|
|
data = log.model_dump() |
|
|
data["output"] = self._current_output |
|
|
data["component_id"] = self._id |
|
|
self._event_manager.on_log(data=data) |
|
|
|
|
|
def _append_tool_output(self) -> None: |
|
|
if next((output for output in self.outputs if output.name == TOOL_OUTPUT_NAME), None) is None: |
|
|
self.outputs.append( |
|
|
Output( |
|
|
name=TOOL_OUTPUT_NAME, |
|
|
display_name=TOOL_OUTPUT_DISPLAY_NAME, |
|
|
method="to_toolkit", |
|
|
types=["Tool"], |
|
|
) |
|
|
) |
|
|
|
|
|
async def send_message(self, message: Message, id_: str | None = None): |
|
|
if (hasattr(self, "graph") and self.graph.session_id) and (message is not None and not message.session_id): |
|
|
message.session_id = self.graph.session_id |
|
|
stored_message = await self._store_message(message) |
|
|
|
|
|
self._stored_message_id = stored_message.id |
|
|
try: |
|
|
complete_message = "" |
|
|
if ( |
|
|
self._should_stream_message(stored_message, message) |
|
|
and message is not None |
|
|
and isinstance(message.text, AsyncIterator | Iterator) |
|
|
): |
|
|
complete_message = await self._stream_message(message.text, stored_message) |
|
|
stored_message.text = complete_message |
|
|
stored_message = await self._update_stored_message(stored_message) |
|
|
else: |
|
|
|
|
|
self._send_message_event(stored_message, id_=id_) |
|
|
except Exception: |
|
|
|
|
|
await delete_message(stored_message.id) |
|
|
raise |
|
|
self.status = stored_message |
|
|
return stored_message |
|
|
|
|
|
async def _store_message(self, message: Message) -> Message: |
|
|
flow_id = self.graph.flow_id if hasattr(self, "graph") else None |
|
|
messages = await astore_message(message, flow_id=flow_id) |
|
|
if len(messages) != 1: |
|
|
msg = "Only one message can be stored at a time." |
|
|
raise ValueError(msg) |
|
|
|
|
|
return messages[0] |
|
|
|
|
|
def _send_message_event(self, message: Message, id_: str | None = None, category: str | None = None) -> None: |
|
|
if hasattr(self, "_event_manager") and self._event_manager: |
|
|
data_dict = message.data.copy() if hasattr(message, "data") else message.model_dump() |
|
|
if id_ and not data_dict.get("id"): |
|
|
data_dict["id"] = id_ |
|
|
category = category or data_dict.get("category", None) |
|
|
match category: |
|
|
case "error": |
|
|
self._event_manager.on_error(data=data_dict) |
|
|
case "remove_message": |
|
|
self._event_manager.on_remove_message(data={"id": data_dict["id"]}) |
|
|
case _: |
|
|
self._event_manager.on_message(data=data_dict) |
|
|
|
|
|
def _should_stream_message(self, stored_message: Message, original_message: Message) -> bool: |
|
|
return bool( |
|
|
hasattr(self, "_event_manager") |
|
|
and self._event_manager |
|
|
and stored_message.id |
|
|
and not isinstance(original_message.text, str) |
|
|
) |
|
|
|
|
|
async def _update_stored_message(self, stored_message: Message) -> Message: |
|
|
message_tables = await aupdate_messages(stored_message) |
|
|
if len(message_tables) != 1: |
|
|
msg = "Only one message can be updated at a time." |
|
|
raise ValueError(msg) |
|
|
message_table = message_tables[0] |
|
|
return await Message.create(**message_table.model_dump()) |
|
|
|
|
|
async def _stream_message(self, iterator: AsyncIterator | Iterator, message: Message) -> str: |
|
|
if not isinstance(iterator, AsyncIterator | Iterator): |
|
|
msg = "The message must be an iterator or an async iterator." |
|
|
raise TypeError(msg) |
|
|
|
|
|
if isinstance(iterator, AsyncIterator): |
|
|
return await self._handle_async_iterator(iterator, message.id, message) |
|
|
try: |
|
|
complete_message = "" |
|
|
first_chunk = True |
|
|
for chunk in iterator: |
|
|
complete_message = self._process_chunk( |
|
|
chunk.content, complete_message, message.id, message, first_chunk=first_chunk |
|
|
) |
|
|
first_chunk = False |
|
|
except Exception as e: |
|
|
raise StreamingError(cause=e, source=message.properties.source) from e |
|
|
else: |
|
|
return complete_message |
|
|
|
|
|
async def _handle_async_iterator(self, iterator: AsyncIterator, message_id: str, message: Message) -> str: |
|
|
complete_message = "" |
|
|
first_chunk = True |
|
|
async for chunk in iterator: |
|
|
complete_message = self._process_chunk( |
|
|
chunk.content, complete_message, message_id, message, first_chunk=first_chunk |
|
|
) |
|
|
first_chunk = False |
|
|
return complete_message |
|
|
|
|
|
def _process_chunk( |
|
|
self, chunk: str, complete_message: str, message_id: str, message: Message, *, first_chunk: bool = False |
|
|
) -> str: |
|
|
complete_message += chunk |
|
|
if self._event_manager: |
|
|
if first_chunk: |
|
|
|
|
|
msg_copy = message.model_copy() |
|
|
msg_copy.text = complete_message |
|
|
self._send_message_event(msg_copy, id_=message_id) |
|
|
self._event_manager.on_token( |
|
|
data={ |
|
|
"chunk": chunk, |
|
|
"id": str(message_id), |
|
|
} |
|
|
) |
|
|
return complete_message |
|
|
|
|
|
async def send_error( |
|
|
self, |
|
|
exception: Exception, |
|
|
session_id: str, |
|
|
trace_name: str, |
|
|
source: Source, |
|
|
) -> Message: |
|
|
"""Send an error message to the frontend.""" |
|
|
flow_id = self.graph.flow_id if hasattr(self, "graph") else None |
|
|
error_message = ErrorMessage( |
|
|
flow_id=flow_id, |
|
|
exception=exception, |
|
|
session_id=session_id, |
|
|
trace_name=trace_name, |
|
|
source=source, |
|
|
) |
|
|
await self.send_message(error_message) |
|
|
return error_message |
|
|
|
|
|
def _append_tool_to_outputs_map(self): |
|
|
self._outputs_map[TOOL_OUTPUT_NAME] = self._build_tool_output() |
|
|
|
|
|
|
|
|
|
|
|
def _build_tool_output(self) -> Output: |
|
|
return Output(name=TOOL_OUTPUT_NAME, display_name=TOOL_OUTPUT_DISPLAY_NAME, method="to_toolkit", types=["Tool"]) |
|
|
|
|
|
def _build_tools_metadata_input(self): |
|
|
tools = self.to_toolkit() |
|
|
tool_data = ( |
|
|
self.tools_metadata |
|
|
if hasattr(self, TOOLS_METADATA_INPUT_NAME) |
|
|
else [{"name": tool.name, "description": tool.description} for tool in tools] |
|
|
) |
|
|
try: |
|
|
from langflow.io import TableInput |
|
|
except ImportError as e: |
|
|
msg = "Failed to import TableInput from langflow.io" |
|
|
raise ImportError(msg) from e |
|
|
|
|
|
return TableInput( |
|
|
name=TOOLS_METADATA_INPUT_NAME, |
|
|
display_name="Tools Metadata", |
|
|
real_time_refresh=True, |
|
|
table_schema=TOOL_TABLE_SCHEMA, |
|
|
value=tool_data, |
|
|
) |
|
|
|