|
|
import ast |
|
|
import contextlib |
|
|
import re |
|
|
import traceback |
|
|
from typing import Any |
|
|
from uuid import UUID |
|
|
|
|
|
from fastapi import HTTPException |
|
|
from loguru import logger |
|
|
from pydantic import BaseModel |
|
|
|
|
|
from langflow.custom import CustomComponent |
|
|
from langflow.custom.custom_component.component import Component |
|
|
from langflow.custom.directory_reader.utils import ( |
|
|
abuild_custom_component_list_from_path, |
|
|
build_custom_component_list_from_path, |
|
|
merge_nested_dicts_with_renaming, |
|
|
) |
|
|
from langflow.custom.eval import eval_custom_component_code |
|
|
from langflow.custom.schema import MissingDefault |
|
|
from langflow.field_typing.range_spec import RangeSpec |
|
|
from langflow.helpers.custom import format_type |
|
|
from langflow.schema import dotdict |
|
|
from langflow.template.field.base import Input |
|
|
from langflow.template.frontend_node.custom_components import ComponentFrontendNode, CustomComponentFrontendNode |
|
|
from langflow.type_extraction.type_extraction import extract_inner_type |
|
|
from langflow.utils import validate |
|
|
from langflow.utils.util import get_base_classes |
|
|
|
|
|
|
|
|
class UpdateBuildConfigError(Exception):
    """Raised when a component's ``update_build_config`` hook fails."""
|
|
|
|
|
|
|
|
def add_output_types(frontend_node: CustomComponentFrontendNode, return_types: list[str]) -> None:
    """Register a display name for every return type on the frontend node.

    Parameters:
        frontend_node: Node whose ``add_output_type`` is called once per return type.
        return_types: Return types to register, processed in order.

    Raises:
        HTTPException: With status 400 as soon as a ``None`` return type is met.
    """

    def _display_name(candidate):
        # ``str`` is shown as "Text"; anything else uses its own name, then its class name.
        if candidate is str:
            return "Text"
        if hasattr(candidate, "__name__"):
            return candidate.__name__
        if hasattr(candidate, "__class__"):
            return candidate.__class__.__name__
        return str(candidate)

    for candidate in return_types:
        if candidate is None:
            raise HTTPException(
                status_code=400,
                detail={
                    "error": ("Invalid return type. Please check your code and try again."),
                    "traceback": traceback.format_exc(),
                },
            )
        frontend_node.add_output_type(_display_name(candidate))
|
|
|
|
|
|
|
|
def reorder_fields(frontend_node: CustomComponentFrontendNode, field_order: list[str]) -> None:
    """Reorder the template fields of the frontend node according to ``field_order``.

    Fields named in ``field_order`` come first, in that order; every other field
    follows in its current relative order. Names in ``field_order`` that match no
    field are ignored. Nothing happens when ``field_order`` is empty.

    Parameters:
        frontend_node: Node whose ``template.fields`` and ``field_order`` are updated in place.
        field_order: Desired order of field names.
    """
    if not field_order:
        return

    current_fields = frontend_node.template.fields
    fields_by_name = {field.name: field for field in current_fields}

    # dict.fromkeys drops repeated names while keeping first-seen order, so a
    # duplicate entry in field_order cannot place the same field twice.
    ordered_names = [name for name in dict.fromkeys(field_order) if name in fields_by_name]
    placed_names = set(ordered_names)

    reordered_fields = [fields_by_name[name] for name in ordered_names]
    # Fields that were not mentioned keep their relative order at the end.
    reordered_fields.extend(field for field in current_fields if field.name not in placed_names)

    frontend_node.template.fields = reordered_fields
    frontend_node.field_order = field_order
|
|
|
|
|
|
|
|
def add_base_classes(frontend_node: CustomComponentFrontendNode, return_types: list[str]) -> None:
    """Add the base classes of every return type to the frontend node.

    Parameters:
        frontend_node: Node whose ``add_base_class`` is called for each base class.
        return_types: Return types to inspect, processed in order.

    Raises:
        HTTPException: With status 400 as soon as a ``None`` return type is met.
    """
    for current_type in return_types:
        if current_type is None:
            raise HTTPException(
                status_code=400,
                detail={
                    "error": ("Invalid return type. Please check your code and try again."),
                    "traceback": traceback.format_exc(),
                },
            )

        class_names = get_base_classes(current_type)
        # ``str`` is additionally exposed under the "Text" name.
        if current_type is str:
            class_names.append("Text")

        for class_name in class_names:
            frontend_node.add_base_class(class_name)
|
|
|
|
|
|
|
|
def extract_type_from_optional(field_type):
    """Unwrap the inner type of a string formatted as "Optional[<type>]".

    Parameters:
        field_type (str): The string from which to extract the type.

    Returns:
        str: The text between the first "[" and the final "]" when the string
        mentions "optional" (case-insensitive) and ends with a bracketed part;
        otherwise the input string unchanged.
    """
    if "optional" in field_type.lower():
        bracketed = re.search(r"\[(.*?)\]$", field_type)
        if bracketed:
            return bracketed[1]
    return field_type
|
|
|
|
|
|
|
|
def get_field_properties(extra_field):
    """Derive name, type, default value and required flag from a field description.

    Parameters:
        extra_field: Mapping with a mandatory "name" key and optional "type"
            (defaults to "str") and "default" (defaults to "") keys.

    Returns:
        tuple: ``(field_name, field_type, field_value, field_required)``.
    """
    name = extra_field["name"]
    declared_type = extra_field.get("type", "str")
    default = extra_field.get("default", "")

    # A field is required only when its type does not mention "optional"
    # and it carries no default value (signalled by MissingDefault).
    lacks_default = isinstance(default, MissingDefault)
    is_required = "optional" not in declared_type.lower() and lacks_default
    value = None if lacks_default else default

    if not is_required:
        declared_type = extract_type_from_optional(declared_type)

    if value is not None:
        # Best effort: turn a literal's text into the literal; keep the value as is on failure.
        with contextlib.suppress(Exception):
            value = ast.literal_eval(value)

    return name, declared_type, value, is_required
|
|
|
|
|
|
|
|
def process_type(field_type: str):
    """Normalize a field type string.

    Types starting with "list" or "List" are reduced with ``extract_inner_type``.
    "prompt" and "code", in any letter case, are returned lowercased. Every other
    type is returned unchanged.
    """
    if field_type.startswith(("list", "List")):
        return extract_inner_type(field_type)

    lowered = field_type.lower()
    return lowered if lowered in {"prompt", "code"} else field_type
|
|
|
|
|
|
|
|
def add_new_custom_field(
    *,
    frontend_node: CustomComponentFrontendNode,
    field_name: str,
    field_type: str,
    field_value: Any,
    field_required: bool,
    field_config: dict,
):
    """Build an ``Input`` from the given properties and upsert it into the node template.

    Values found in ``field_config`` ("display_name", "value", "advanced",
    "required", "placeholder") take precedence over the corresponding arguments.
    ``field_config`` is modified in place: consumed keys are popped and
    "is_list" (plus the secret-related keys for "SecretStr") is written.

    Parameters:
        frontend_node: Node that receives the new field.
        field_name: Name of the field.
        field_type: Declared type; when empty, "type" or "field_type" from the config is used.
        field_value: Default value, unless the config provides "value".
        field_required: Required flag, unless the config provides "required".
        field_config: Extra configuration for the field.

    Returns:
        The same ``frontend_node`` that was passed in.
    """
    display_name = field_config.pop("display_name", None)

    if not field_type:
        # Fall back to the config, preferring "type" over "field_type".
        for type_key in ("type", "field_type"):
            if field_config.get(type_key) is not None:
                field_type = field_config.pop(type_key)
                break

    # Remember list-ness before process_type possibly strips the list wrapper.
    field_contains_list = "list" in field_type.lower()
    field_type = process_type(field_type)
    field_value = field_config.pop("value", field_value)
    field_advanced = field_config.pop("advanced", False)

    if field_type == "Dict":
        field_type = "dict"
    elif field_type == "bool":
        if field_value is None:
            field_value = False
    elif field_type == "SecretStr":
        field_config.update(password=True, load_from_db=True, input_types=["Text"])

    # A list of options also marks the field as a list.
    has_options_list = isinstance(field_config.get("options"), list)
    field_config["is_list"] = has_options_list or field_config.get("list", False) or field_contains_list

    if "name" in field_config:
        logger.warning("The 'name' key in field_config is used to build the object and can't be changed.")
    required = field_config.pop("required", field_required)
    placeholder = field_config.pop("placeholder", "")

    new_field = Input(
        name=field_name,
        field_type=field_type,
        value=field_value,
        show=True,
        required=required,
        advanced=field_advanced,
        placeholder=placeholder,
        display_name=display_name,
        **sanitize_field_config(field_config),
    )
    frontend_node.template.upsert_field(field_name, new_field)

    custom_fields = frontend_node.custom_fields
    if isinstance(custom_fields, dict):
        custom_fields[field_name] = None

    return frontend_node
|
|
|
|
|
|
|
|
def add_extra_fields(frontend_node, field_config, function_args) -> None:
    """Add a template field for each entrypoint argument, plus config-only fields for ``**kwargs``.

    Every named argument other than "self", "kwargs" and "args" becomes a field,
    using the matching entry of ``field_config`` (if any) as its configuration.
    When the function accepts "kwargs" and ``field_config`` holds keys that are
    not argument names, the remaining config entries that carry a "name" key
    (except the "code" entry) are added as fields too.

    Parameters:
        frontend_node: Node that receives the fields.
        field_config: Mapping of field name to field configuration; it is copied, not modified.
        function_args: Sequence of argument descriptions (mappings); entries
            without a "name" key are ignored.
    """
    if not function_args:
        return

    remaining_config = field_config.copy()
    # Entries without a "name" are skipped by the loop below, so they must not
    # break the collection of argument names either.
    arg_names = {arg["name"] for arg in function_args if "name" in arg}

    for extra_field in function_args:
        if "name" not in extra_field or extra_field["name"] in {"self", "kwargs", "args"}:
            continue

        field_name, field_type, field_value, field_required = get_field_properties(extra_field)
        frontend_node = add_new_custom_field(
            frontend_node=frontend_node,
            field_name=field_name,
            field_type=field_type,
            field_value=field_value,
            field_required=field_required,
            field_config=remaining_config.pop(field_name, {}),
        )

    # Config-only fields are added only if the function takes **kwargs and
    # the config names something that is not an explicit argument.
    if "kwargs" not in arg_names or all(key in arg_names for key in field_config):
        return

    for config_key, config in remaining_config.items():
        if "name" not in config or config_key == "code":
            continue
        config_ = config.model_dump() if isinstance(config, BaseModel) else config
        field_name_, field_type, field_value, field_required = get_field_properties(extra_field=config_)
        frontend_node = add_new_custom_field(
            frontend_node=frontend_node,
            field_name=field_name_,
            field_type=field_type,
            field_value=field_value,
            field_required=field_required,
            field_config=config_,
        )
|
|
|
|
|
|
|
|
def get_field_dict(field: Input | dict):
    """Return a dict-like view of a field.

    An ``Input`` is dumped by alias without ``None`` values and wrapped in a
    ``dotdict``; any other value is returned as is.
    """
    if not isinstance(field, Input):
        return field
    dumped = field.model_dump(by_alias=True, exclude_none=True)
    return dotdict(dumped)
|
|
|
|
|
|
|
|
def run_build_inputs(
    custom_component: Component,
):
    """Call ``build_inputs`` on the component and return its result.

    Raises:
        HTTPException: With status 500 and the error text as detail when
            ``build_inputs`` raises.
    """
    try:
        built_inputs = custom_component.build_inputs()
    except Exception as exc:
        logger.exception("Error running build inputs")
        raise HTTPException(status_code=500, detail=str(exc)) from exc
    else:
        return built_inputs
|
|
|
|
|
|
|
|
def get_component_instance(custom_component: CustomComponent, user_id: str | UUID | None = None):
    """Evaluate the component's code and return an instance of the resulting class.

    Parameters:
        custom_component: Component whose ``_code`` holds the source to evaluate.
        user_id: Passed to the new instance as ``_user_id``.

    Returns:
        An instance of the evaluated class, created with ``_user_id`` and ``_code``.

    Raises:
        HTTPException: With status 400 when the code is ``None``, is not a
            string, or cannot be evaluated.
        Exception: Any error raised while instantiating the class is logged and re-raised.
    """
    code = custom_component._code

    # Reject unusable code up front.
    problem = None
    if code is None:
        problem = "Code is None"
    elif not isinstance(code, str):
        problem = "Invalid code type"

    if problem is not None:
        msg = f"Invalid type conversion: {problem}. Please check your code and try again."
        logger.error(msg)
        raise HTTPException(
            status_code=400,
            detail={"error": msg},
        )

    try:
        component_class = eval_custom_component_code(code)
    except Exception as exc:
        logger.exception("Error while evaluating custom component code")
        raise HTTPException(
            status_code=400,
            detail={
                "error": ("Invalid type conversion. Please check your code and try again."),
                "traceback": traceback.format_exc(),
            },
        ) from exc

    try:
        return component_class(_user_id=user_id, _code=code)
    except Exception as exc:
        logger.exception("Error while instantiating custom component")
        # Surface a traceback carried in the exception's detail, when present.
        if hasattr(exc, "detail") and "traceback" in exc.detail:
            logger.error(exc.detail["traceback"])
        raise
|
|
|
|
|
|
|
|
def run_build_config(
    custom_component: CustomComponent,
    user_id: str | UUID | None = None,
) -> tuple[dict, CustomComponent]:
    """Build the field configuration for a custom component.

    The component's code is evaluated, the resulting class is instantiated with
    ``_user_id`` and its ``build_config`` is called. Each entry of the result is
    converted with ``get_field_dict``, and a ``RangeSpec`` stored under
    "rangeSpec" is replaced by its ``model_dump()``.

    Returns:
        tuple: ``(build_config, custom_instance)``.

    Raises:
        HTTPException: With status 400 when the code is ``None``, is not a
            string, or cannot be evaluated.
        Exception: Any error raised while building the config is logged and re-raised.
    """
    code = custom_component._code

    # Reject unusable code up front.
    problem = None
    if code is None:
        problem = "Code is None"
    elif not isinstance(code, str):
        problem = "Invalid code type"

    if problem is not None:
        msg = f"Invalid type conversion: {problem}. Please check your code and try again."
        logger.error(msg)
        raise HTTPException(
            status_code=400,
            detail={"error": msg},
        )

    try:
        custom_class = eval_custom_component_code(code)
    except Exception as exc:
        logger.exception("Error while evaluating custom component code")
        raise HTTPException(
            status_code=400,
            detail={
                "error": ("Invalid type conversion. Please check your code and try again."),
                "traceback": traceback.format_exc(),
            },
        ) from exc

    try:
        custom_instance = custom_class(_user_id=user_id)
        build_config: dict = custom_instance.build_config()

        # Iterate over a snapshot because entries are reassigned inside the loop.
        for field_name, field in list(build_config.items()):
            # An entry may be an Input or a plain dict.
            field_dict = get_field_dict(field)
            if "rangeSpec" in field_dict and isinstance(field_dict["rangeSpec"], RangeSpec):
                field_dict["rangeSpec"] = field_dict["rangeSpec"].model_dump()
            build_config[field_name] = field_dict
    except Exception as exc:
        logger.exception("Error while building field config")
        # Surface a traceback carried in the exception's detail, when present.
        if hasattr(exc, "detail") and "traceback" in exc.detail:
            logger.error(exc.detail["traceback"])
        raise

    return build_config, custom_instance
|
|
|
|
|
|
|
|
def add_code_field(frontend_node: CustomComponentFrontendNode, raw_code):
    """Add the "code" field holding ``raw_code`` to the node template.

    Returns:
        The same ``frontend_node`` that was passed in.
    """
    frontend_node.template.add_field(
        Input(
            name="code",
            field_type="code",
            value=raw_code,
            dynamic=True,
            required=True,
            advanced=True,
            multiline=True,
            password=False,
            is_list=False,
            placeholder="",
        )
    )
    return frontend_node
|
|
|
|
|
|
|
|
def build_custom_component_template_from_inputs(
    custom_component: Component | CustomComponent, user_id: str | UUID | None = None
):
    """Build the template of a component that declares its fields through inputs.

    Returns:
        tuple: ``(template_dict, component_instance)`` where ``template_dict``
        is ``frontend_node.to_dict(keep_name=False)``.
    """
    cc_instance = get_component_instance(custom_component, user_id=user_id)
    template_config = cc_instance.get_template_config(cc_instance)
    node = add_code_field(ComponentFrontendNode.from_inputs(**template_config), custom_component._code)

    # Outputs without types get them from the return type of their method.
    for output in node.outputs:
        if not output.types:
            method_types = cc_instance.get_method_return_type(output.method)
            output.add_types([format_type(method_type) for method_type in method_types])
            output.set_selected()

    node.validate_component()
    node.set_base_classes_from_outputs()
    reorder_fields(node, cc_instance._get_field_order())

    return node.to_dict(keep_name=False), cc_instance
|
|
|
|
|
|
|
|
def build_custom_component_template(
    custom_component: CustomComponent,
    user_id: str | UUID | None = None,
) -> tuple[dict[str, Any], CustomComponent | Component]:
    """Build a custom component template.

    Components whose ``template_config`` contains "inputs" are delegated to
    ``build_custom_component_template_from_inputs``. Otherwise the template is
    assembled from the build config and the entrypoint arguments.

    Returns:
        tuple: ``(template_dict, component_instance)``.

    Raises:
        HTTPException: With status 400 when the component has no
            ``template_config`` or when building fails. An ``HTTPException``
            raised while building is propagated unchanged.
    """
    try:
        has_template_config = hasattr(custom_component, "template_config")
    except Exception as exc:
        raise HTTPException(
            status_code=400,
            detail={
                "error": (f"Error building Component: {exc}"),
                "traceback": traceback.format_exc(),
            },
        ) from exc

    if not has_template_config:
        raise HTTPException(
            status_code=400,
            detail={
                "error": ("Error building Component. Please check if you are importing Component correctly."),
            },
        )

    try:
        if "inputs" in custom_component.template_config:
            return build_custom_component_template_from_inputs(custom_component, user_id=user_id)

        node = CustomComponentFrontendNode(**custom_component.template_config)
        field_config, custom_instance = run_build_config(custom_component, user_id=user_id)

        add_extra_fields(node, field_config, custom_component.get_function_entrypoint_args)
        node = add_code_field(node, custom_component._code)

        add_base_classes(node, custom_component._get_function_entrypoint_return_type)
        add_output_types(node, custom_component._get_function_entrypoint_return_type)

        reorder_fields(node, custom_instance._get_field_order())

        return node.to_dict(keep_name=False), custom_instance
    except HTTPException:
        # Already in the shape callers expect; do not wrap again.
        raise
    except Exception as exc:
        raise HTTPException(
            status_code=400,
            detail={
                "error": (f"Error building Component: {exc}"),
                "traceback": traceback.format_exc(),
            },
        ) from exc
|
|
|
|
|
|
|
|
def create_component_template(component):
    """Create a template for a component described by a mapping.

    Parameters:
        component: Mapping with "code" and "output_types" keys.

    Returns:
        tuple: ``(component_template, component_instance)``. When the built
        template has no output types, the ones given in ``component`` are used.
    """
    code = component["code"]
    fallback_output_types = component["output_types"]

    template, instance = build_custom_component_template(Component(_code=code))

    if fallback_output_types and not template["output_types"]:
        template["output_types"] = fallback_output_types

    return template, instance
|
|
|
|
|
|
|
|
def build_custom_components(components_paths: list[str]):
    """Build custom components from the specified paths.

    Each distinct path (compared by its string form) is loaded once and the
    results are merged with ``merge_nested_dicts_with_renaming``.

    Returns:
        dict: The merged components, or an empty dict when no paths are given.
    """
    if not components_paths:
        return {}

    logger.info(f"Building custom components from {components_paths}")
    merged: dict = {}
    seen_paths = set()

    for raw_path in components_paths:
        path_key = str(raw_path)
        if path_key in seen_paths:
            continue
        seen_paths.add(path_key)

        custom_component_dict = build_custom_component_list_from_path(path_key)
        if not custom_component_dict:
            continue

        # Only the first category is reported in the log line.
        category = next(iter(custom_component_dict))
        logger.info(f"Loading {len(custom_component_dict[category])} component(s) from category {category}")
        merged = merge_nested_dicts_with_renaming(merged, custom_component_dict)

    return merged
|
|
|
|
|
|
|
|
async def abuild_custom_components(components_paths: list[str]):
    """Build custom components from the specified paths, loading each path asynchronously.

    Each distinct path (compared by its string form) is loaded once and the
    results are merged with ``merge_nested_dicts_with_renaming``.

    Returns:
        dict: The merged components, or an empty dict when no paths are given.
    """
    if not components_paths:
        return {}

    logger.info(f"Building custom components from {components_paths}")
    merged: dict = {}
    seen_paths = set()

    for raw_path in components_paths:
        path_key = str(raw_path)
        if path_key in seen_paths:
            continue
        seen_paths.add(path_key)

        custom_component_dict = await abuild_custom_component_list_from_path(path_key)
        if not custom_component_dict:
            continue

        # Only the first category is reported in the log line.
        category = next(iter(custom_component_dict))
        logger.info(f"Loading {len(custom_component_dict[category])} component(s) from category {category}")
        merged = merge_nested_dicts_with_renaming(merged, custom_component_dict)

    return merged
|
|
|
|
|
|
|
|
def update_field_dict(
    custom_component_instance: "CustomComponent",
    field_dict: dict,
    build_config: dict,
    *,
    update_field: str | None = None,
    update_field_value: Any | None = None,
    call: bool = False,
):
    """Run the component's ``update_build_config`` hook for a refreshable field.

    The hook is called only when ``call`` is true and the field has a truthy
    "real_time_refresh" or "refresh_button" entry. In that case ``build_config``
    is wrapped in a ``dotdict``, handed to the hook, and the wrapped config is
    returned. Otherwise ``build_config`` is returned untouched.

    Parameters:
        custom_component_instance: Component whose ``update_build_config`` is called.
        field_dict: Configuration of the field being inspected.
        build_config: Current build configuration.
        update_field: Name of the field that changed; passed to the hook as ``field_name``.
        update_field_value: New value of that field; passed to the hook as ``field_value``.
        call: Whether the hook may be invoked at all.

    Returns:
        The build configuration (a ``dotdict`` when the hook was run).

    Raises:
        UpdateBuildConfigError: When the hook raises.
    """
    refresh_enabled = ("real_time_refresh" in field_dict or "refresh_button" in field_dict) and any(
        (
            field_dict.get("real_time_refresh", False),
            field_dict.get("refresh_button", False),
        )
    )
    if not (refresh_enabled and call):
        return build_config

    try:
        dd_build_config = dotdict(build_config)
        # The field *name* goes to field_name and its *value* to field_value.
        custom_component_instance.update_build_config(
            build_config=dd_build_config,
            field_value=update_field_value,
            field_name=update_field,
        )
    except Exception as exc:
        msg = f"Error while running update_build_config: {exc}"
        logger.exception(msg)
        raise UpdateBuildConfigError(msg) from exc

    return dd_build_config
|
|
|
|
|
|
|
|
def sanitize_field_config(field_config: dict | Input):
    """Strip the keys that are passed to ``Input`` explicitly from a field config.

    An ``Input`` is first converted with ``to_dict()``; a dict is modified in
    place and returned.

    Returns:
        dict: The config without "name", "field_type", "value", "required",
        "placeholder", "display_name", "advanced", "show" and "type".
    """
    field_dict = field_config.to_dict() if isinstance(field_config, Input) else field_config

    reserved_keys = (
        "name",
        "field_type",
        "value",
        "required",
        "placeholder",
        "display_name",
        "advanced",
        "show",
        "type",
    )
    for reserved_key in reserved_keys:
        field_dict.pop(reserved_key, None)

    return field_dict
|
|
|
|
|
|
|
|
def build_component(component):
    """Build a single component.

    Returns:
        tuple: ``(component_name, component_template)``.
    """
    template, instance = create_component_template(component)
    return get_instance_name(instance), template
|
|
|
|
|
|
|
|
def get_function(code):
    """Create the function defined in ``code``.

    The function name is taken from ``validate.extract_function_name`` and the
    function itself is built by ``validate.create_function``.
    """
    name = validate.extract_function_name(code)
    function = validate.create_function(code, name)
    return function
|
|
|
|
|
|
|
|
def get_instance_name(instance):
    """Return the instance's non-empty ``name`` attribute, else its class name."""
    if hasattr(instance, "name") and instance.name:
        return instance.name
    return instance.__class__.__name__
|
|
|