|
|
from __future__ import annotations |
|
|
|
|
|
import inspect |
|
|
import os |
|
|
import warnings |
|
|
from typing import TYPE_CHECKING, Any |
|
|
|
|
|
import orjson |
|
|
from loguru import logger |
|
|
from pydantic import PydanticDeprecatedSince20 |
|
|
|
|
|
from langflow.custom.eval import eval_custom_component_code |
|
|
from langflow.schema import Data |
|
|
from langflow.schema.artifact import get_artifact_type, post_process_raw |
|
|
from langflow.services.deps import get_tracing_service |
|
|
|
|
|
if TYPE_CHECKING: |
|
|
from langflow.custom import Component, CustomComponent |
|
|
from langflow.events.event_manager import EventManager |
|
|
from langflow.graph.vertex.base import Vertex |
|
|
|
|
|
|
|
|
def instantiate_class( |
|
|
vertex: Vertex, |
|
|
user_id=None, |
|
|
event_manager: EventManager | None = None, |
|
|
) -> Any: |
|
|
"""Instantiate class from module type and key, and params.""" |
|
|
vertex_type = vertex.vertex_type |
|
|
base_type = vertex.base_type |
|
|
logger.debug(f"Instantiating {vertex_type} of type {base_type}") |
|
|
|
|
|
if not base_type: |
|
|
msg = "No base type provided for vertex" |
|
|
raise ValueError(msg) |
|
|
|
|
|
custom_params = get_params(vertex.params) |
|
|
code = custom_params.pop("code") |
|
|
class_object: type[CustomComponent | Component] = eval_custom_component_code(code) |
|
|
custom_component: CustomComponent | Component = class_object( |
|
|
_user_id=user_id, |
|
|
_parameters=custom_params, |
|
|
_vertex=vertex, |
|
|
_tracing_service=get_tracing_service(), |
|
|
_id=vertex.id, |
|
|
) |
|
|
if hasattr(custom_component, "set_event_manager"): |
|
|
custom_component.set_event_manager(event_manager) |
|
|
return custom_component, custom_params |
|
|
|
|
|
|
|
|
async def get_instance_results( |
|
|
custom_component, |
|
|
custom_params: dict, |
|
|
vertex: Vertex, |
|
|
*, |
|
|
fallback_to_env_vars: bool = False, |
|
|
base_type: str = "component", |
|
|
): |
|
|
custom_params = update_params_with_load_from_db_fields( |
|
|
custom_component, custom_params, vertex.load_from_db_fields, fallback_to_env_vars=fallback_to_env_vars |
|
|
) |
|
|
with warnings.catch_warnings(): |
|
|
warnings.filterwarnings("ignore", category=PydanticDeprecatedSince20) |
|
|
if base_type == "custom_components": |
|
|
return await build_custom_component(params=custom_params, custom_component=custom_component) |
|
|
if base_type == "component": |
|
|
return await build_component(params=custom_params, custom_component=custom_component) |
|
|
msg = f"Base type {base_type} not found." |
|
|
raise ValueError(msg) |
|
|
|
|
|
|
|
|
def get_params(vertex_params): |
|
|
params = vertex_params |
|
|
params = convert_params_to_sets(params) |
|
|
params = convert_kwargs(params) |
|
|
return params.copy() |
|
|
|
|
|
|
|
|
def convert_params_to_sets(params): |
|
|
"""Convert certain params to sets.""" |
|
|
if "allowed_special" in params: |
|
|
params["allowed_special"] = set(params["allowed_special"]) |
|
|
if "disallowed_special" in params: |
|
|
params["disallowed_special"] = set(params["disallowed_special"]) |
|
|
return params |
|
|
|
|
|
|
|
|
def convert_kwargs(params): |
|
|
|
|
|
items_to_remove = [] |
|
|
for key, value in params.items(): |
|
|
if ("kwargs" in key or "config" in key) and isinstance(value, str): |
|
|
try: |
|
|
params[key] = orjson.loads(value) |
|
|
except orjson.JSONDecodeError: |
|
|
items_to_remove.append(key) |
|
|
|
|
|
|
|
|
for key in items_to_remove: |
|
|
params.pop(key, None) |
|
|
|
|
|
return params |
|
|
|
|
|
|
|
|
def update_params_with_load_from_db_fields( |
|
|
custom_component: CustomComponent, |
|
|
params, |
|
|
load_from_db_fields, |
|
|
*, |
|
|
fallback_to_env_vars=False, |
|
|
): |
|
|
for field in load_from_db_fields: |
|
|
if field not in params: |
|
|
continue |
|
|
|
|
|
try: |
|
|
key = custom_component.variables(params[field], field) |
|
|
except ValueError as e: |
|
|
if any(reason in str(e) for reason in ["User id is not set", "variable not found."]): |
|
|
raise |
|
|
logger.debug(str(e)) |
|
|
key = None |
|
|
|
|
|
if fallback_to_env_vars and key is None: |
|
|
key = os.getenv(params[field]) |
|
|
if key: |
|
|
logger.info(f"Using environment variable {params[field]} for {field}") |
|
|
else: |
|
|
logger.error(f"Environment variable {params[field]} is not set.") |
|
|
|
|
|
params[field] = key if key is not None else None |
|
|
if key is None: |
|
|
logger.warning(f"Could not get value for {field}. Setting it to None.") |
|
|
|
|
|
return params |
|
|
|
|
|
|
|
|
async def build_component( |
|
|
params: dict, |
|
|
custom_component: Component, |
|
|
): |
|
|
|
|
|
custom_component.set_attributes(params) |
|
|
build_results, artifacts = await custom_component.build_results() |
|
|
|
|
|
return custom_component, build_results, artifacts |
|
|
|
|
|
|
|
|
async def build_custom_component(params: dict, custom_component: CustomComponent): |
|
|
if "retriever" in params and hasattr(params["retriever"], "as_retriever"): |
|
|
params["retriever"] = params["retriever"].as_retriever() |
|
|
|
|
|
|
|
|
is_async = inspect.iscoroutinefunction(custom_component.build) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if is_async: |
|
|
|
|
|
build_result = await custom_component.build(**params) |
|
|
else: |
|
|
|
|
|
build_result = custom_component.build(**params) |
|
|
custom_repr = custom_component.custom_repr() |
|
|
if custom_repr is None and isinstance(build_result, dict | Data | str): |
|
|
custom_repr = build_result |
|
|
if not isinstance(custom_repr, str): |
|
|
custom_repr = str(custom_repr) |
|
|
raw = custom_component.repr_value |
|
|
if hasattr(raw, "data") and raw is not None: |
|
|
raw = raw.data |
|
|
|
|
|
elif hasattr(raw, "model_dump") and raw is not None: |
|
|
raw = raw.model_dump() |
|
|
if raw is None and isinstance(build_result, dict | Data | str): |
|
|
raw = build_result.data if isinstance(build_result, Data) else build_result |
|
|
|
|
|
artifact_type = get_artifact_type(custom_component.repr_value or raw, build_result) |
|
|
raw = post_process_raw(raw, artifact_type) |
|
|
artifact = {"repr": custom_repr, "raw": raw, "type": artifact_type} |
|
|
|
|
|
if custom_component._vertex is not None: |
|
|
custom_component._artifacts = {custom_component._vertex.outputs[0].get("name"): artifact} |
|
|
custom_component._results = {custom_component._vertex.outputs[0].get("name"): build_result} |
|
|
return custom_component, build_result, artifact |
|
|
|
|
|
msg = "Custom component does not have a vertex" |
|
|
raise ValueError(msg) |
|
|
|