|
|
from __future__ import annotations |
|
|
|
|
|
from collections.abc import Callable, Sequence |
|
|
from pathlib import Path |
|
|
from typing import TYPE_CHECKING, Any, ClassVar |
|
|
|
|
|
import yaml |
|
|
from cachetools import TTLCache |
|
|
from langchain_core.documents import Document |
|
|
from pydantic import BaseModel |
|
|
|
|
|
from langflow.custom.custom_component.base_component import BaseComponent |
|
|
from langflow.helpers.flow import list_flows, load_flow, run_flow |
|
|
from langflow.schema import Data |
|
|
from langflow.services.deps import get_storage_service, get_variable_service, session_scope |
|
|
from langflow.services.storage.service import StorageService |
|
|
from langflow.template.utils import update_frontend_node_with_template_values |
|
|
from langflow.type_extraction.type_extraction import post_process_type |
|
|
from langflow.utils import validate |
|
|
|
|
|
if TYPE_CHECKING: |
|
|
from langchain.callbacks.base import BaseCallbackHandler |
|
|
|
|
|
from langflow.graph.graph.base import Graph |
|
|
from langflow.graph.vertex.base import Vertex |
|
|
from langflow.schema.dotdict import dotdict |
|
|
from langflow.schema.schema import OutputValue |
|
|
from langflow.services.storage.service import StorageService |
|
|
from langflow.services.tracing.schema import Log |
|
|
from langflow.services.tracing.service import TracingService |
|
|
|
|
|
|
|
|
class CustomComponent(BaseComponent):
    """Represents a custom component in Langflow.

    Attributes:
        name (Optional[str]): This attribute helps the frontend apply styles to known components.
        display_name (Optional[str]): The display name of the custom component.
        description (Optional[str]): The description of the custom component.
        code (Optional[str]): The code of the custom component.
        field_config (dict): The field configuration of the custom component.
        _code_class_base_inheritance (ClassVar[str]): The base class name for the custom component.
        function_entrypoint_name (ClassVar[str]): The name of the function entrypoint for the custom component.
        function (Optional[Callable]): The function associated with the custom component.
        repr_value (Optional[Any]): The representation value of the custom component.
        user_id (Optional[Union[UUID, str]]): The user ID associated with the custom component.
        status (Optional[Any]): The status of the custom component.
        _tree (Optional[dict]): The code tree of the custom component.
    """

    name: str | None = None
    """The name of the component, used to apply styles. Defaults to None."""
    display_name: str | None = None
    """The display name of the component. Defaults to None."""
    description: str | None = None
    """The description of the component. Defaults to None."""
    icon: str | None = None
    """The icon of the component. It should be an emoji. Defaults to None."""
    is_input: bool | None = None
    """The input state of the component. Defaults to None.
    If True, the component must have a field named 'input_value'."""
    add_tool_output: bool | None = False
    """Indicates whether the component will be treated as a tool. Defaults to False."""
    is_output: bool | None = None
    """The output state of the component. Defaults to None.
    If True, the component must have a field named 'input_value'."""
    field_config: dict = {}
    """The field configuration of the component. Defaults to an empty dictionary."""
    field_order: list[str] | None = None
    """The field order of the component. Defaults to None, in which case the keys of `field_config` are used."""
    frozen: bool | None = False
    """The default frozen state of the component. Defaults to False."""
    build_parameters: dict | None = None
    """The build parameters of the component. Defaults to None."""
    _vertex: Vertex | None = None
    """The graph vertex this component is attached to. Defaults to None."""
    _code_class_base_inheritance: ClassVar[str] = "CustomComponent"
    function_entrypoint_name: ClassVar[str] = "build"
    function: Callable | None = None
    repr_value: Any | None = ""
    status: Any | None = None
    """The status of the component. This is displayed on the frontend. Defaults to None."""
    _flows_data: list[Data] | None = None
    _outputs: list[OutputValue] = []
    _logs: list[Log] = []
    _output_logs: dict[str, list[Log] | Log] = {}
    _tracing_service: TracingService | None = None
    _tree: dict | None = None

    def __init__(self, **data) -> None:
        """Initializes a new instance of the CustomComponent class.

        Args:
            **data: Additional keyword arguments to initialize the custom component.
        """
        self.cache: TTLCache = TTLCache(maxsize=1024, ttl=60)
        self._logs: list[Log] = []
        self._results: dict = {}
        self._artifacts: dict = {}
        # The class-level `_outputs` / `_output_logs` defaults are mutable objects that every
        # instance would otherwise share. Bind fresh containers per instance, but keep any value
        # a subclass assigned before delegating to this initializer.
        instance_attrs = vars(self)
        if "_outputs" not in instance_attrs:
            self._outputs = []
        if "_output_logs" not in instance_attrs:
            self._output_logs = {}
        super().__init__(**data)

    def set_attributes(self, parameters: dict) -> None:
        """Hook called by `set_parameters`; this base implementation does nothing."""

    def set_parameters(self, parameters: dict) -> None:
        """Stores `parameters` on the component and forwards them to `set_attributes`."""
        self._parameters = parameters
        self.set_attributes(self._parameters)

    @property
    def trace_name(self) -> str:
        """Name used for tracing: the display name, followed by the component id when one exists.

        Raises:
            ValueError: If the component has an `_id` attribute but it is None.
        """
        if hasattr(self, "_id") and self._id is None:
            msg = "Component id is not set"
            raise ValueError(msg)
        if hasattr(self, "_id"):
            return f"{self.display_name} ({self._id})"
        return f"{self.display_name}"

    def update_state(self, name: str, value: Any) -> None:
        """Updates the graph state `name` with `value` on behalf of this component's vertex.

        Raises:
            ValueError: If the vertex is not set or the graph fails to update the state.
        """
        if not self._vertex:
            msg = "Vertex is not set"
            raise ValueError(msg)
        try:
            self._vertex.graph.update_state(name=name, record=value, caller=self._vertex.id)
        except Exception as e:
            msg = f"Error updating state: {e}"
            raise ValueError(msg) from e

    def stop(self, output_name: str | None = None) -> None:
        """Marks the branch leaving `output_name` of this component's vertex as INACTIVE.

        Args:
            output_name: The output whose branch should be stopped. May be omitted only when
                the vertex has exactly one output, in which case that output is used.

        Raises:
            ValueError: If no output name can be determined, the vertex is not set, or the
                graph fails to mark the branch.
        """
        if not output_name and self._vertex and len(self._vertex.outputs) == 1:
            output_name = self._vertex.outputs[0]["name"]
        elif not output_name:
            msg = "You must specify an output name to call stop"
            raise ValueError(msg)
        if not self._vertex:
            msg = "Vertex is not set"
            raise ValueError(msg)
        try:
            self.graph.mark_branch(vertex_id=self._vertex.id, output_name=output_name, state="INACTIVE")
        except Exception as e:
            msg = f"Error stopping {self.display_name}: {e}"
            raise ValueError(msg) from e

    def append_state(self, name: str, value: Any) -> None:
        """Appends `value` to the graph state `name` on behalf of this component's vertex.

        Raises:
            ValueError: If the vertex is not set or the graph fails to append the state.
        """
        if not self._vertex:
            msg = "Vertex is not set"
            raise ValueError(msg)
        try:
            self._vertex.graph.append_state(name=name, record=value, caller=self._vertex.id)
        except Exception as e:
            msg = f"Error appending state: {e}"
            raise ValueError(msg) from e

    def get_state(self, name: str):
        """Returns the graph state stored under `name`.

        Raises:
            ValueError: If the vertex is not set or the graph fails to return the state.
        """
        if not self._vertex:
            msg = "Vertex is not set"
            raise ValueError(msg)
        try:
            return self._vertex.graph.get_state(name=name)
        except Exception as e:
            msg = f"Error getting state: {e}"
            raise ValueError(msg) from e

    @staticmethod
    def resolve_path(path: str) -> str:
        """Resolves the path to an absolute path.

        A leading "~" component is expanded to the user's home directory; any other relative
        path is resolved against the current working directory. Empty input is returned as is.
        """
        if not path:
            return path
        path_object = Path(path)

        if path_object.parts and path_object.parts[0] == "~":
            path_object = path_object.expanduser()
        elif path_object.is_relative_to("."):
            path_object = path_object.resolve()
        return str(path_object)

    def get_full_path(self, path: str) -> str:
        """Builds the full storage path for a "<flow_id>/<file_name>" style path.

        Args:
            path: The flow id and the file name, separated by the first "/".

        Returns:
            str: The value returned by the storage service's `build_full_path`.

        Raises:
            ValueError: If `path` does not contain a "/" separator.
        """
        flow_id, separator, file_name = path.partition("/")
        if not separator:
            # Fail with an explicit message instead of an opaque tuple-unpacking error.
            msg = f"Invalid path '{path}': expected the form '<flow_id>/<file_name>'"
            raise ValueError(msg)
        storage_svc: StorageService = get_storage_service()
        return storage_svc.build_full_path(flow_id, file_name)

    @property
    def graph(self):
        """The graph of the vertex this component is attached to (requires `_vertex` to be set)."""
        return self._vertex.graph

    @property
    def user_id(self):
        """The component's own `_user_id` when it is set and truthy, otherwise the graph's user id."""
        if hasattr(self, "_user_id") and self._user_id:
            return self._user_id
        return self.graph.user_id

    @property
    def flow_id(self):
        """The flow id of the graph this component belongs to."""
        return self.graph.flow_id

    @property
    def flow_name(self):
        """The flow name of the graph this component belongs to."""
        return self.graph.flow_name

    def _get_field_order(self):
        """Returns `field_order`, falling back to the keys of `field_config` when it is empty or None."""
        return self.field_order or list(self.field_config.keys())

    def custom_repr(self):
        """Returns the custom representation of the custom component.

        When `repr_value` is still the empty string it is first replaced by `status`.
        Dictionaries are dumped as YAML, strings are returned unchanged and pydantic models
        other than `Data` are converted with `str()`; any other value is returned as is.

        Returns:
            The custom representation of the custom component.
        """
        if self.repr_value == "":
            self.repr_value = self.status
        if isinstance(self.repr_value, dict):
            return yaml.dump(self.repr_value)
        if isinstance(self.repr_value, str):
            return self.repr_value
        if isinstance(self.repr_value, BaseModel) and not isinstance(self.repr_value, Data):
            return str(self.repr_value)
        return self.repr_value

    def build_config(self):
        """Builds the configuration for the custom component.

        Returns:
            dict: The configuration for the custom component.
        """
        return self.field_config

    def update_build_config(
        self,
        build_config: dotdict,
        field_value: Any,
        field_name: str | None = None,
    ):
        """Stores `field_value` as the "value" of `field_name` in `build_config` and returns the config."""
        build_config[field_name]["value"] = field_value
        return build_config

    @property
    def tree(self):
        """Gets the code tree of the custom component.

        Returns:
            dict: The code tree of the custom component.
        """
        return self.get_code_tree(self._code or "")

    def to_data(self, data: Any, *, keys: list[str] | None = None, silent_errors: bool = False) -> list[Data]:
        """Converts input data into a list of Data objects.

        Args:
            data (Any): The input data to be converted. It can be a single item or a sequence of items.
                Supported items are Langchain Documents, pydantic models, strings and dictionaries.
                A string is always treated as a single item, never as a sequence of characters.
            keys (List[str], optional): The keys to copy from each pydantic model item.
                Only used for pydantic models; Documents, strings and dictionaries ignore it.
                Defaults to None, in which case no keys are copied from pydantic models.
            silent_errors (bool, optional): Whether to store an empty string instead of raising
                when one of `keys` is not found in a pydantic model.

        Returns:
            List[Data]: A list of Data objects.

        Raises:
            ValueError: If a requested key is missing from a pydantic model and `silent_errors` is False.
            TypeError: If an item is not of a supported type.
        """
        keys = keys or []
        # `str` is itself a Sequence: without the explicit check a bare string would be split
        # into one Data object per character instead of being converted as a single text.
        if isinstance(data, str) or not isinstance(data, Sequence):
            data = [data]

        data_objects = []
        for item in data:
            if isinstance(item, Document):
                # Build a new dict so the Document's own metadata is not mutated.
                data_dict = {**item.metadata, "text": item.page_content}
            elif isinstance(item, BaseModel):
                model_dump = item.model_dump()
                data_dict = {}
                for key in keys:
                    if silent_errors:
                        data_dict[key] = model_dump.get(key, "")
                    else:
                        try:
                            data_dict[key] = model_dump[key]
                        except KeyError as e:
                            msg = f"Key {key} not found in {item}"
                            raise ValueError(msg) from e
            elif isinstance(item, str):
                data_dict = {"text": item}
            elif isinstance(item, dict):
                data_dict = item.copy()
            else:
                msg = f"Invalid data type: {type(item)}"
                raise TypeError(msg)

            data_objects.append(Data(data=data_dict))

        return data_objects

    def get_method_return_type(self, method_name: str):
        """Returns the post-processed return type of `method_name`, or [] if it has no return annotation."""
        build_method = self.get_method(method_name)
        if not build_method or not build_method.get("has_return"):
            return []
        return_type = build_method["return_type"]

        return self._extract_return_type(return_type)

    def create_references_from_data(self, data: list[Data], *, include_data: bool = False) -> str:
        """Create references from a list of data.

        Args:
            data (List[Data]): The Data objects to reference.
            include_data (bool, optional): Whether to include data in the references. Defaults to False.

        Returns:
            str: A string containing the references in markdown format, or "" when `data` is empty.
        """
        if not data:
            return ""
        entries = []
        for value in data:
            entry = f"- Text: {value.get_text()}"
            if include_data:
                entry += f" Data: {value.data}"
            entries.append(entry + "\n")
        return "---\n" + "".join(entries)

    @property
    def get_function_entrypoint_args(self) -> list:
        """Gets the arguments of the function entrypoint for the custom component.

        Arguments other than `self` that carry no type are given the type "Data".

        Returns:
            list: The arguments of the function entrypoint.
        """
        build_method = self.get_method(self._function_entrypoint_name)
        if not build_method:
            return []

        args = build_method["args"]
        for arg in args:
            if not arg.get("type") and arg.get("name") != "self":
                # Untyped arguments default to Data.
                arg["type"] = "Data"
        return args

    def get_method(self, method_name: str):
        """Gets the description of a method from the first component class found in the code.

        Only classes whose bases include "Component" or "CustomComponent" are considered.

        Returns:
            dict: The entry of the code tree describing the method, or {} when there is no code,
                no component class, or no method named `method_name`.
        """
        if not self._code:
            return {}

        component_classes = [
            cls for cls in self.tree["classes"] if "Component" in cls["bases"] or "CustomComponent" in cls["bases"]
        ]
        if not component_classes:
            return {}

        # Only the first matching class is inspected.
        component_class = component_classes[0]
        build_methods = [method for method in component_class["methods"] if method["name"] == method_name]

        return build_methods[0] if build_methods else {}

    @property
    def _get_function_entrypoint_return_type(self) -> list[Any]:
        """Gets the return type of the function entrypoint for the custom component.

        Returns:
            List[Any]: The return type of the function entrypoint.
        """
        return self.get_method_return_type(self._function_entrypoint_name)

    def _extract_return_type(self, return_type: Any) -> list[Any]:
        """Normalizes a return type by passing it through `post_process_type`."""
        return post_process_type(return_type)

    @property
    def get_main_class_name(self):
        """Gets the main class name of the custom component.

        The main class is the first class in the code tree that lists the expected base class
        among its bases and defines the entrypoint method.

        Returns:
            str: The main class name of the custom component, or "" when none is found.
        """
        if not self._code:
            return ""

        base_name = self._code_class_base_inheritance
        method_name = self._function_entrypoint_name

        for item in self.tree.get("classes", []):
            if base_name in item["bases"] and any(method["name"] == method_name for method in item["methods"]):
                return item["name"]
        return ""

    @property
    def template_config(self):
        """Gets the template configuration for the custom component.

        The configuration is built with `build_template_config` whenever the stored value is falsy.

        Returns:
            dict: The template configuration for the custom component.
        """
        if not self._template_config:
            self._template_config = self.build_template_config()
        return self._template_config

    @property
    def variables(self):
        """Returns a callable that looks up a variable of the current user.

        The callable takes the variable `name` and `field` and delegates to the variable service.

        Returns:
            Callable: The lookup function. It raises ValueError when the user id is not set.
        """

        def get_variable(name: str, field: str):
            if hasattr(self, "_user_id") and not self.user_id:
                msg = f"User id is not set for {self.__class__.__name__}"
                raise ValueError(msg)
            variable_service = get_variable_service()

            with session_scope() as session:
                user_id = self.user_id or ""
                return variable_service.get_variable(user_id=user_id, name=name, field=field, session=session)

        return get_variable

    def list_key_names(self):
        """Lists the names of the variables for the current user.

        Raises:
            ValueError: If the user id is not set.

        Returns:
            List[str]: The names of the variables for the current user.
        """
        if hasattr(self, "_user_id") and not self.user_id:
            msg = f"User id is not set for {self.__class__.__name__}"
            raise ValueError(msg)
        variable_service = get_variable_service()

        with session_scope() as session:
            return variable_service.list_variables_sync(user_id=self.user_id, session=session)

    def index(self, value: int = 0):
        """Returns a function that returns the value at the given index in the iterable.

        Args:
            value (int): The index value.

        Returns:
            Callable: A function that returns the value at the given index, or the iterable
                itself when it is empty.
        """

        def get_index(iterable: list[Any]):
            return iterable[value] if iterable else iterable

        return get_index

    def get_function(self):
        """Gets the function associated with the custom component.

        Returns:
            Callable: The function associated with the custom component.
        """
        return validate.create_function(self._code, self._function_entrypoint_name)

    async def load_flow(self, flow_id: str, tweaks: dict | None = None) -> Graph:
        """Loads the flow `flow_id` for the current user, optionally applying `tweaks`.

        Raises:
            ValueError: If no user id is available.
        """
        if not self.user_id:
            msg = "Session is invalid"
            raise ValueError(msg)
        return await load_flow(user_id=str(self.user_id), flow_id=flow_id, tweaks=tweaks)

    async def run_flow(
        self,
        inputs: dict | list[dict] | None = None,
        flow_id: str | None = None,
        flow_name: str | None = None,
        output_type: str | None = "chat",
        tweaks: dict | None = None,
    ) -> Any:
        """Runs a flow, identified by `flow_id` or `flow_name`, using this component's user id and graph run id."""
        return await run_flow(
            inputs=inputs,
            output_type=output_type,
            flow_id=flow_id,
            flow_name=flow_name,
            tweaks=tweaks,
            user_id=str(self.user_id),
            run_id=self.graph.run_id,
        )

    def list_flows(self) -> list[Data]:
        """Lists the flows of the current user.

        Raises:
            ValueError: If no user id is available or listing the flows fails.
        """
        if not self.user_id:
            msg = "Session is invalid"
            raise ValueError(msg)
        try:
            return list_flows(user_id=str(self.user_id))
        except Exception as e:
            msg = f"Error listing flows: {e}"
            raise ValueError(msg) from e

    def build(self, *args: Any, **kwargs: Any) -> Any:
        """Builds the custom component.

        Args:
            *args: The positional arguments.
            **kwargs: The keyword arguments.

        Returns:
            Any: The result of the build process.

        Raises:
            NotImplementedError: Always; subclasses must provide the implementation.
        """
        raise NotImplementedError

    def post_code_processing(self, new_frontend_node: dict, current_frontend_node: dict):
        """This function is called after the code validation is done."""
        return update_frontend_node_with_template_values(
            frontend_node=new_frontend_node, raw_frontend_node=current_frontend_node
        )

    def get_langchain_callbacks(self) -> list[BaseCallbackHandler]:
        """Returns the tracing service's Langchain callbacks, or an empty list without a tracing service."""
        if self._tracing_service:
            return self._tracing_service.get_langchain_callbacks()
        return []
|
|
|