|
|
from __future__ import annotations |
|
|
|
|
|
import ast |
|
|
import asyncio |
|
|
import inspect |
|
|
import os |
|
|
import traceback |
|
|
import types |
|
|
from collections.abc import AsyncIterator, Callable, Iterator, Mapping |
|
|
from enum import Enum |
|
|
from typing import TYPE_CHECKING, Any |
|
|
|
|
|
import pandas as pd |
|
|
from loguru import logger |
|
|
|
|
|
from langflow.exceptions.component import ComponentBuildError |
|
|
from langflow.graph.schema import INPUT_COMPONENTS, OUTPUT_COMPONENTS, InterfaceComponentTypes, ResultData |
|
|
from langflow.graph.utils import UnbuiltObject, UnbuiltResult, log_transaction |
|
|
from langflow.interface import initialize |
|
|
from langflow.interface.listing import lazy_load_dict |
|
|
from langflow.schema.artifact import ArtifactType |
|
|
from langflow.schema.data import Data |
|
|
from langflow.schema.message import Message |
|
|
from langflow.schema.schema import INPUT_FIELD_NAME, OutputValue, build_output_logs |
|
|
from langflow.services.deps import get_storage_service |
|
|
from langflow.utils.constants import DIRECT_TYPES |
|
|
from langflow.utils.schemas import ChatOutputResponse |
|
|
from langflow.utils.util import sync_to_async, unescape_string |
|
|
|
|
|
if TYPE_CHECKING: |
|
|
from uuid import UUID |
|
|
|
|
|
from langflow.custom import Component |
|
|
from langflow.events.event_manager import EventManager |
|
|
from langflow.graph.edge.base import CycleEdge, Edge |
|
|
from langflow.graph.graph.base import Graph |
|
|
from langflow.graph.vertex.schema import NodeData |
|
|
from langflow.services.tracing.schema import Log |
|
|
|
|
|
|
|
|
class VertexStates(str, Enum):
    """Lifecycle states a vertex can be in: active, inactive, or errored.

    The enum mixes in ``str`` so each member compares equal to its own name,
    and members can be looked up by name (``VertexStates["ACTIVE"]``).
    """

    # The vertex takes part in the build.
    ACTIVE = "ACTIVE"
    # The vertex is switched off.
    INACTIVE = "INACTIVE"
    # The vertex is in an error state.
    ERROR = "ERROR"
|
|
|
|
|
|
|
|
class Vertex:
    """A single node of a flow graph.

    A vertex keeps the raw node payload (``full_data`` / ``data``), the
    parameters resolved from the node template and from incoming edges
    (``params`` / ``raw_params``), the component instance used to build it and
    the outcome of the build (``built_object``, ``built_result``, ``artifacts``,
    ``result``).
    """

    def __init__(
        self,
        data: NodeData,
        graph: Graph,
        *,
        base_type: str | None = None,
        is_task: bool = False,
        params: dict | None = None,
    ) -> None:
        """Initialise the vertex from its node payload.

        Args:
            data: Node payload; must contain ``"id"`` and ``"data"``.
            graph: Graph that owns this vertex.
            base_type: Optional base type; when ``None`` it is looked up in
                ``parse_data``.
            is_task: Stored as ``self.is_task``.
            params: Initial parameters; defaults to an empty dict.
        """
        self._lock = asyncio.Lock()
        self.will_stream = False
        self.updated_raw_params = False
        self.id: str = data["id"]
        self.base_name = self.id.split("-")[0]
        self.is_state = False
        # Input/output detection is a substring match of known component names on the id.
        self.is_input = any(input_component_name in self.id for input_component_name in INPUT_COMPONENTS)
        self.is_output = any(output_component_name in self.id for output_component_name in OUTPUT_COMPONENTS)
        self.has_session_id = None
        self.custom_component = None
        self.has_external_input = False
        self.has_external_output = False
        self.graph = graph
        self.full_data = data.copy()
        self.base_type: str | None = base_type
        self.outputs: list[dict] = []
        # parse_data reads id, full_data, base_type, is_input and is_output set above.
        self.parse_data()
        self.built_object: Any = UnbuiltObject()
        self.built_result: Any = None
        self.built = False
        self._successors_ids: list[str] | None = None
        self.artifacts: dict[str, Any] = {}
        self.artifacts_raw: dict[str, Any] = {}
        self.artifacts_type: dict[str, str] = {}
        self.steps: list[Callable] = [self._build]
        self.steps_ran: list[Callable] = []
        self.task_id: str | None = None
        self.is_task = is_task
        self.params = params or {}
        self.parent_node_id: str | None = self.full_data.get("parent_node_id")
        self.load_from_db_fields: list[str] = []
        self.parent_is_top_level = False
        self.layer = None
        self.result: ResultData | None = None
        self.results: dict[str, Any] = {}
        self.outputs_logs: dict[str, OutputValue] = {}
        self.logs: dict[str, list[Log]] = {}
        self.has_cycle_edges = False
        try:
            self.is_interface_component = self.vertex_type in InterfaceComponentTypes
        except ValueError:
            self.is_interface_component = False

        self.use_result = False
        self.build_times: list[float] = []
        self.state = VertexStates.ACTIVE
        # Strong references to in-flight logging tasks; see _log_transaction_async.
        self.log_transaction_tasks: set[asyncio.Task] = set()

    def set_input_value(self, name: str, value: Any) -> None:
        """Forward an input value to the component instance.

        Raises:
            ValueError: If no component instance is attached to this vertex.
        """
        if self.custom_component is None:
            msg = f"Vertex {self.id} does not have a component instance."
            raise ValueError(msg)
        self.custom_component._set_input_value(name, value)

    def to_data(self):
        """Return the node payload stored at construction time (``full_data``)."""
        return self.full_data

    def add_component_instance(self, component_instance: Component) -> None:
        """Attach ``component_instance`` to this vertex and register the vertex on it."""
        component_instance.set_vertex(self)
        self.custom_component = component_instance

    def add_result(self, name: str, result: Any) -> None:
        """Store ``result`` under ``name`` in ``self.results``."""
        self.results[name] = result

    def update_graph_state(self, key, new_state, *, append: bool) -> None:
        """Append to or replace the graph state at ``key``, passing this vertex id as caller."""
        if append:
            self.graph.append_state(key, new_state, caller=self.id)
        else:
            self.graph.update_state(key, new_state, caller=self.id)

    def set_state(self, state: str) -> None:
        """Set the vertex state by enum member name and sync the graph's inactive set.

        Raises:
            KeyError: If ``state`` is not the name of a ``VertexStates`` member.
        """
        self.state = VertexStates[state]
        if self.state == VertexStates.INACTIVE and self.graph.in_degree_map[self.id] <= 1:
            # Only vertices with at most one incoming edge are recorded as inactivated.
            self.graph.inactivated_vertices.add(self.id)
        elif self.state == VertexStates.ACTIVE and self.id in self.graph.inactivated_vertices:
            self.graph.inactivated_vertices.remove(self.id)

    def is_active(self):
        """Return True when the vertex state is ``VertexStates.ACTIVE``."""
        return self.state == VertexStates.ACTIVE

    @property
    def avg_build_time(self):
        """Mean of the recorded build times, or 0 when none were recorded."""
        return sum(self.build_times) / len(self.build_times) if self.build_times else 0

    def add_build_time(self, time) -> None:
        """Record one build duration."""
        self.build_times.append(time)

    def set_result(self, result: ResultData) -> None:
        """Store the final ``ResultData`` of the build."""
        self.result = result

    def get_built_result(self):
        """Return the build outcome in the shape consumed by ``finalize_build``.

        For interface components that have been built, the built object itself
        is returned (or its ``content`` attribute when it is neither a dict nor
        a str and has one). Otherwise the built result is returned as a dict:
        ``{}`` while unbuilt, the dict itself, or ``{"result": value}``.
        """
        if self.is_interface_component and not isinstance(self.built_object, UnbuiltObject):
            result = self.built_object
            if not isinstance(result, dict | str) and hasattr(result, "content"):
                return result.content
            return result
        if isinstance(self.built_object, str):
            self.built_result = self.built_object

        if isinstance(self.built_result, UnbuiltResult):
            return {}
        return self.built_result if isinstance(self.built_result, dict) else {"result": self.built_result}

    def set_artifacts(self) -> None:
        """No-op hook invoked by ``finalize_build`` (presumably overridden by subclasses)."""

    @property
    def edges(self) -> list[CycleEdge]:
        """Edges of this vertex as reported by the graph."""
        return self.graph.get_vertex_edges(self.id)

    @property
    def outgoing_edges(self) -> list[CycleEdge]:
        """Edges whose source is this vertex."""
        return [edge for edge in self.edges if edge.source_id == self.id]

    @property
    def incoming_edges(self) -> list[CycleEdge]:
        """Edges whose target is this vertex."""
        return [edge for edge in self.edges if edge.target_id == self.id]

    @property
    def edges_source_names(self) -> set[str | None]:
        """Set of ``source_handle.name`` values over all edges of this vertex."""
        return {edge.source_handle.name for edge in self.edges}

    @property
    def predecessors(self) -> list[Vertex]:
        """Predecessor vertices as reported by the graph."""
        return self.graph.get_predecessors(self)

    @property
    def successors(self) -> list[Vertex]:
        """Successor vertices as reported by the graph."""
        return self.graph.get_successors(self)

    @property
    def successors_ids(self) -> list[str]:
        """Successor ids from the graph's ``successor_map`` (empty list when absent)."""
        return self.graph.successor_map.get(self.id, [])

    def __getstate__(self):
        """Return a copy of the instance dict suitable for pickling.

        The asyncio lock is dropped and the "unbuilt" sentinels are replaced
        by ``None``; ``__setstate__`` restores both.
        """
        state = self.__dict__.copy()
        state["_lock"] = None
        state["built_object"] = None if isinstance(self.built_object, UnbuiltObject) else self.built_object
        state["built_result"] = None if isinstance(self.built_result, UnbuiltResult) else self.built_result
        return state

    def __setstate__(self, state):
        """Restore the instance from ``state``, recreating the lock and sentinels.

        Only ``None`` is mapped back to the "unbuilt" sentinels. A truthiness
        test would also discard legitimate falsy values (``""``, ``0``, ``[]``)
        and raises for objects whose truth value is ambiguous, such as a
        pandas DataFrame.
        """
        self.__dict__.update(state)
        self._lock = asyncio.Lock()
        built_object = state.get("built_object")
        self.built_object = UnbuiltObject() if built_object is None else built_object
        built_result = state.get("built_result")
        self.built_result = UnbuiltResult() if built_result is None else built_result

    def set_top_level(self, top_level_vertices: list[str]) -> None:
        """Record whether this vertex's parent node id is among ``top_level_vertices``."""
        self.parent_is_top_level = self.parent_node_id in top_level_vertices

    def parse_data(self) -> None:
        """Derive vertex attributes from ``self.full_data["data"]``.

        Sets ``data``, ``display_name``, ``outputs``, ``icon``, ``description``,
        ``frozen``, ``is_input``, ``is_output``, ``has_session_id``,
        ``required_inputs``, ``optional_inputs``, ``vertex_type`` and, when it
        was not supplied, ``base_type``.

        Raises:
            ValueError: If the template type is ``"Component"`` and the node
                has no ``"outputs"`` entry.
        """
        self.data = self.full_data["data"]
        node = self.data["node"]
        # Resolved first: the error message below needs it, and this method runs
        # from __init__ before any other code has assigned display_name.
        self.display_name: str = node.get("display_name", self.id.split("-")[0])
        if node["template"]["_type"] == "Component":
            if "outputs" not in node:
                msg = f"Outputs not found for {self.display_name}"
                raise ValueError(msg)
            self.outputs = node["outputs"]
        else:
            self.outputs = node.get("outputs", [])
            self.output = node["base_classes"]

        self.icon: str = node.get("icon", self.id.split("-")[0])

        self.description: str = node.get("description", "")
        self.frozen: bool = node.get("frozen", False)

        # An explicit flag on the node wins; otherwise keep the id-based guess from __init__.
        self.is_input = node.get("is_input") or self.is_input
        self.is_output = node.get("is_output") or self.is_output
        template_dicts = {key: value for key, value in node["template"].items() if isinstance(value, dict)}

        self.has_session_id = "session_id" in template_dicts

        self.required_inputs: list[str] = []
        self.optional_inputs: list[str] = []
        for value_dict in template_dicts.values():
            list_to_append = self.required_inputs if value_dict.get("required") else self.optional_inputs

            if "type" in value_dict:
                list_to_append.append(value_dict["type"])
            if "input_types" in value_dict:
                list_to_append.extend(value_dict["input_types"])

        template_dict = node["template"]
        # Use the node type unless an output advertises "Tool" and the template
        # type is not all-lowercase; in that case use the template type.
        self.vertex_type = (
            self.data["type"]
            if "Tool" not in [type_ for out in self.outputs for type_ in out["types"]]
            or template_dict["_type"].islower()
            else template_dict["_type"]
        )

        if self.base_type is None:
            for base_type, value in lazy_load_dict.all_types_dict.items():
                if self.vertex_type in value:
                    self.base_type = base_type
                    break

    def get_value_from_template_dict(self, key: str):
        """Return the ``"value"`` entry of template field ``key``.

        Raises:
            ValueError: If ``key`` is not a field of the node template.
        """
        template_dict = self.data.get("node", {}).get("template", {})
        if key not in template_dict:
            msg = f"Key {key} not found in template dict"
            raise ValueError(msg)
        return template_dict.get(key, {}).get("value")

    def get_task(self):
        """Return the Celery ``AsyncResult`` for ``self.task_id``."""
        # Imported here so Celery is only needed when this method is called.
        from celery.result import AsyncResult

        return AsyncResult(self.task_id)

    def _set_params_from_normal_edge(self, params: dict, edge: Edge, template_dict: dict):
        """Bind the source vertex of ``edge`` to the parameter the edge targets.

        Nothing is bound unless the targeted parameter is a template field and
        the edge targets this vertex. List fields accumulate source vertices.
        A field whose current value is a dict with exactly one key becomes a
        dict mapping that key to the source vertex. Any other field is set to
        the source vertex.

        Returns:
            The (mutated) ``params`` dict.
        """
        param_key = edge.target_param

        if param_key in template_dict and edge.target_id == self.id:
            if template_dict[param_key].get("list"):
                if param_key not in params:
                    params[param_key] = []
                params[param_key].append(self.graph.get_vertex(edge.source_id))
            elif edge.target_id == self.id:
                if isinstance(template_dict[param_key].get("value"), dict):
                    param_dict = template_dict[param_key]["value"]
                    if not param_dict or len(param_dict) != 1:
                        params[param_key] = self.graph.get_vertex(edge.source_id)
                    else:
                        params[param_key] = {key: self.graph.get_vertex(edge.source_id) for key in param_dict}

                else:
                    params[param_key] = self.graph.get_vertex(edge.source_id)
        return params

    def build_params(self) -> None:
        """Compute ``params``, ``raw_params`` and ``load_from_db_fields``.

        Parameters bound through edges are collected first. The remaining
        template fields are then converted according to their ``"type"``.
        When ``updated_raw_params`` is set, the flag is cleared and the
        existing parameters are kept untouched.

        Raises:
            ValueError: If the vertex has no graph, or a ``"table"`` field
                holds something other than a list of dicts.
        """
        if self.graph is None:
            msg = "Graph not found"
            raise ValueError(msg)

        if self.updated_raw_params:
            self.updated_raw_params = False
            return

        template_dict = {key: value for key, value in self.data["node"]["template"].items() if isinstance(value, dict)}
        params: dict = {}

        for edge in self.edges:
            if not hasattr(edge, "target_param"):
                continue
            params = self._set_params_from_normal_edge(params, edge, template_dict)

        load_from_db_fields = []
        for field_name, field in template_dict.items():
            # Edge-bound parameters take precedence over template values.
            if field_name in params:
                continue

            # Hidden fields are skipped, except "code" which is used even when not shown.
            if field_name == "_type" or (not field.get("show") and field_name != "code"):
                continue

            if field.get("type") == "file":
                if file_path := field.get("file_path"):
                    storage_service = get_storage_service()
                    try:
                        flow_id, file_name = os.path.split(file_path)
                        full_path = storage_service.build_full_path(flow_id, file_name)
                    except ValueError as e:
                        if "too many values to unpack" in str(e):
                            full_path = file_path
                        else:
                            raise
                    params[field_name] = full_path
                elif field.get("required"):
                    field_display_name = field.get("display_name")
                    logger.warning(
                        f"File path not found for {field_display_name} in component {self.display_name}. "
                        "Setting to None."
                    )
                    params[field_name] = None
                elif field.get("list"):
                    # .get: a file field without a "list" key must not raise KeyError.
                    params[field_name] = []
                else:
                    params[field_name] = None

            elif field.get("type") in DIRECT_TYPES and params.get(field_name) is None:
                val = field.get("value")
                if field.get("type") == "code":
                    try:
                        if field_name == "code":
                            params[field_name] = val
                        else:
                            params[field_name] = ast.literal_eval(val) if val else None
                    except Exception:
                        # Best effort: fall back to the raw value when it is not a literal.
                        logger.debug(f"Error evaluating code for {field_name}")
                        params[field_name] = val
                elif field.get("type") in {"dict", "NestedDict"}:
                    # A list of dicts is merged into a single dict (later items win).
                    if isinstance(val, list):
                        params[field_name] = {k: v for item in val for k, v in item.items()}
                    elif isinstance(val, dict):
                        params[field_name] = val
                elif field.get("type") == "int" and val is not None:
                    try:
                        params[field_name] = int(val)
                    except ValueError:
                        params[field_name] = val
                elif field.get("type") == "float" and val is not None:
                    # Keep the converted float; fall back to the raw value only on failure.
                    try:
                        params[field_name] = float(val)
                    except ValueError:
                        params[field_name] = val
                elif field.get("type") == "str" and val is not None:
                    if isinstance(val, list):
                        params[field_name] = [unescape_string(v) for v in val]
                    elif isinstance(val, str):
                        params[field_name] = unescape_string(val)
                    elif isinstance(val, Data):
                        params[field_name] = unescape_string(val.get_text())
                elif field.get("type") == "bool" and val is not None:
                    if isinstance(val, bool):
                        params[field_name] = val
                    elif isinstance(val, str):
                        # NOTE(review): any non-empty string (including "false") becomes True.
                        params[field_name] = bool(val)
                elif field.get("type") == "table" and val is not None:
                    if isinstance(val, list) and all(isinstance(item, dict) for item in val):
                        params[field_name] = pd.DataFrame(val)
                    else:
                        msg = f"Invalid value type {type(val)} for field {field_name}"
                        raise ValueError(msg)
                elif val is not None and val != "":
                    params[field_name] = val

                if field.get("load_from_db"):
                    load_from_db_fields.append(field_name)

                # Optional fields without a value fall back to their default or are dropped.
                if not field.get("required") and params.get(field_name) is None:
                    if field.get("default"):
                        params[field_name] = field.get("default")
                    else:
                        params.pop(field_name, None)

        self.params = params
        self.load_from_db_fields = load_from_db_fields
        self.raw_params = params.copy()

    def update_raw_params(self, new_params: Mapping[str, str | list[str]], *, overwrite: bool = False) -> None:
        """Update the raw parameters of the vertex with the given new parameters.

        Nothing happens when ``new_params`` is empty or when any of its keys is
        currently bound to a ``Vertex`` in ``raw_params``. The caller's mapping
        is never modified.

        Args:
            new_params: The new parameters to apply.
            overwrite: When False (default) only keys already present in
                ``raw_params`` are applied and unknown keys are ignored. When
                True all keys are applied.
        """
        if not new_params:
            return
        if any(isinstance(self.raw_params.get(key), Vertex) for key in new_params):
            return
        if overwrite:
            accepted = dict(new_params)
        else:
            # Filter into a new dict instead of popping from the caller's mapping.
            accepted = {key: value for key, value in new_params.items() if key in self.raw_params}
        self.raw_params.update(accepted)
        self.params = self.raw_params.copy()
        self.updated_raw_params = True

    def instantiate_component(self, user_id=None) -> None:
        """Create the component instance for this vertex if it does not have one yet."""
        if not self.custom_component:
            self.custom_component, _ = initialize.loading.instantiate_class(
                user_id=user_id,
                vertex=self,
            )

    async def _build(
        self,
        fallback_to_env_vars,
        user_id=None,
        event_manager: EventManager | None = None,
    ) -> None:
        """Initiate the build process.

        Resolves vertex-valued parameters, obtains (or reuses) the component
        instance, builds the results, validates the built object and marks the
        vertex as built.

        Raises:
            ValueError: If ``base_type`` is unknown.
        """
        logger.debug(f"Building {self.display_name}")
        await self._build_each_vertex_in_params_dict()

        if self.base_type is None:
            msg = f"Base type for vertex {self.display_name} not found"
            raise ValueError(msg)

        if not self.custom_component:
            custom_component, custom_params = initialize.loading.instantiate_class(
                user_id=user_id, vertex=self, event_manager=event_manager
            )
        else:
            custom_component = self.custom_component
            if hasattr(self.custom_component, "set_event_manager"):
                self.custom_component.set_event_manager(event_manager)
            custom_params = initialize.loading.get_params(self.params)

        await self._build_results(
            custom_component=custom_component,
            custom_params=custom_params,
            fallback_to_env_vars=fallback_to_env_vars,
            base_type=self.base_type,
        )

        self._validate_built_object()

        self.built = True

    def extract_messages_from_artifacts(self, artifacts: dict[str, Any]) -> list[dict]:
        """Extracts messages from the artifacts.

        Args:
            artifacts: The artifacts to extract messages from.

        Returns:
            A list with one dumped ``ChatOutputResponse`` dict, or an empty
            list when a ``KeyError`` occurs (e.g. ``artifacts`` has no
            ``"text"`` entry).
        """
        try:
            text = artifacts["text"]
            sender = artifacts.get("sender")
            sender_name = artifacts.get("sender_name")
            session_id = artifacts.get("session_id")
            stream_url = artifacts.get("stream_url")
            # "files" may be missing or explicitly None; both mean "no files".
            raw_files = artifacts.get("files") or []
            files = [{"path": file} if isinstance(file, str) else file for file in raw_files]
            component_id = self.id
            type_ = self.artifacts_type

            if isinstance(sender_name, Data | Message):
                sender_name = sender_name.get_text()

            messages = [
                ChatOutputResponse(
                    message=text,
                    sender=sender,
                    sender_name=sender_name,
                    session_id=session_id,
                    stream_url=stream_url,
                    files=files,
                    component_id=component_id,
                    type=type_,
                ).model_dump(exclude_none=True)
            ]
        except KeyError:
            messages = []

        return messages

    def finalize_build(self) -> None:
        """Assemble the ``ResultData`` of the build and store it via ``set_result``."""
        result_dict = self.get_built_result()

        self.set_artifacts()
        artifacts = self.artifacts_raw
        messages = self.extract_messages_from_artifacts(artifacts) if isinstance(artifacts, dict) else []
        result_dict = ResultData(
            results=result_dict,
            artifacts=artifacts,
            outputs=self.outputs_logs,
            logs=self.logs,
            messages=messages,
            component_display_name=self.display_name,
            component_id=self.id,
        )
        self.set_result(result_dict)

    async def _build_each_vertex_in_params_dict(self) -> None:
        """Iterates over each vertex in the params dictionary and builds it.

        Values in ``raw_params`` that are vertices, lists of vertices or dicts
        are resolved into ``params``. Other values are copied into ``params``
        when missing there or when the raw params were updated.
        """
        for key, value in self.raw_params.items():
            if self._is_vertex(value):
                if value == self:
                    # A vertex cannot consume its own result; drop the parameter.
                    del self.params[key]
                    continue
                await self._build_vertex_and_update_params(
                    key,
                    value,
                )
            elif isinstance(value, list) and self._is_list_of_vertices(value):
                await self._build_list_of_vertices_and_update_params(key, value)
            elif isinstance(value, dict):
                await self._build_dict_and_update_params(
                    key,
                    value,
                )
            elif key not in self.params or self.updated_raw_params:
                self.params[key] = value

    async def _build_dict_and_update_params(
        self,
        key,
        vertices_dict: dict[str, Vertex],
    ) -> None:
        """Iterates over a dictionary of vertices, builds each and updates the params dictionary."""
        for sub_key, value in vertices_dict.items():
            if not self._is_vertex(value):
                self.params[key][sub_key] = value
            else:
                result = await value.get_result(self, target_handle_name=key)
                self.params[key][sub_key] = result

    def _is_vertex(self, value):
        """Checks if the provided value is an instance of Vertex."""
        return isinstance(value, Vertex)

    def _is_list_of_vertices(self, value):
        """Checks if the provided value is a list of Vertex instances (True for an empty list)."""
        return all(self._is_vertex(vertex) for vertex in value)

    async def get_result(self, requester: Vertex, target_handle_name: str | None = None) -> Any:
        """Retrieves the result of the vertex.

        This is a read-only method so it raises an error if the vertex has not been built yet.
        The vertex lock is held while the result is read.

        Returns:
            The result of the vertex.
        """
        async with self._lock:
            return await self._get_result(requester, target_handle_name)

    def _log_transaction_async(
        self, flow_id: str | UUID, source: Vertex, status, target: Vertex | None = None, error=None
    ) -> None:
        """Schedule ``log_transaction`` as a background task on the running loop."""
        task = asyncio.create_task(log_transaction(flow_id, source, status, target, error))
        # Keep a strong reference until the task finishes so it is not garbage collected.
        self.log_transaction_tasks.add(task)
        task.add_done_callback(self.log_transaction_tasks.discard)

    async def _get_result(
        self,
        requester: Vertex,
        target_handle_name: str | None = None,
    ) -> Any:
        """Retrieves the result of the built component.

        If the component has not been built yet, a ValueError is raised.
        A transaction is logged in both cases when the graph has a flow id.

        Returns:
            The built result if use_result is True, else the built object.
        """
        flow_id = self.graph.flow_id
        if not self.built:
            if flow_id:
                self._log_transaction_async(str(flow_id), source=self, target=requester, status="error")
            msg = f"Component {self.display_name} has not been built yet"
            raise ValueError(msg)

        result = self.built_result if self.use_result else self.built_object
        if flow_id:
            self._log_transaction_async(str(flow_id), source=self, target=requester, status="success")
        return result

    async def _build_vertex_and_update_params(self, key, vertex: Vertex) -> None:
        """Builds a given vertex and updates the params dictionary accordingly."""
        result = await vertex.get_result(self, target_handle_name=key)
        self._handle_func(key, result)
        if isinstance(result, list):
            self._extend_params_list_with_result(key, result)
        self.params[key] = result

    async def _build_list_of_vertices_and_update_params(
        self,
        key,
        vertices: list[Vertex],
    ) -> None:
        """Iterates over a list of vertices, builds each and updates the params dictionary.

        List results are flattened into ``params[key]``; other results are appended.

        Raises:
            ValueError: If appending to ``params[key]`` fails with ``AttributeError``.
        """
        self.params[key] = []
        for vertex in vertices:
            result = await vertex.get_result(self, target_handle_name=key)

            # Defensive: make sure the accumulator is a list before extending it.
            if not isinstance(self.params[key], list):
                self.params[key] = [self.params[key]]

            if isinstance(result, list):
                self.params[key].extend(result)
            else:
                try:
                    if self.params[key] == result:
                        continue

                    self.params[key].append(result)
                except AttributeError as e:
                    logger.exception(e)
                    # Separator added so the two sentences no longer run together.
                    msg = (
                        f"Params {key} ({self.params[key]}) is not a list and cannot be extended with {result}. "
                        f"Error building Component {self.display_name}: \n\n{e}"
                    )
                    raise ValueError(msg) from e

    def _handle_func(self, key, result) -> None:
        """Handles 'func' key by checking if the result is a function and setting it as coroutine."""
        if key == "func":
            if not isinstance(result, types.FunctionType):
                # NOTE(review): the rebound ``result`` is not used after this branch — confirm intent.
                if hasattr(result, "run"):
                    result = result.run
                elif hasattr(result, "get_function"):
                    result = result.get_function()
            elif inspect.iscoroutinefunction(result):
                self.params["coroutine"] = result
            else:
                self.params["coroutine"] = sync_to_async(result)

    def _extend_params_list_with_result(self, key, result) -> None:
        """Extends a list in the params dictionary with the given result if it exists."""
        if isinstance(self.params[key], list):
            self.params[key].extend(result)

    async def _build_results(
        self, custom_component, custom_params, base_type: str, *, fallback_to_env_vars=False
    ) -> None:
        """Run the component and record its output logs, built object and artifacts.

        Raises:
            ComponentBuildError: Wraps any exception raised while building,
                together with the formatted traceback.
        """
        try:
            result = await initialize.loading.get_instance_results(
                custom_component=custom_component,
                custom_params=custom_params,
                vertex=self,
                fallback_to_env_vars=fallback_to_env_vars,
                base_type=base_type,
            )

            self.outputs_logs = build_output_logs(self, result)

            self._update_built_object_and_artifacts(result)
        except Exception as exc:
            tb = traceback.format_exc()
            logger.exception(exc)
            msg = f"Error building Component {self.display_name}: \n\n{exc}"
            raise ComponentBuildError(msg, tb) from exc

    def _update_built_object_and_artifacts(self, result: Any | tuple[Any, dict] | tuple[Component, Any, dict]) -> None:
        """Updates the built object and its artifacts.

        A 2-tuple is ``(built_object, artifacts)``. A 3-tuple is
        ``(component, built_object, artifacts)`` and additionally refreshes
        logs and the raw/typed artifacts, keyed by the first output's name.
        Any other value is stored as the built object.
        """
        if isinstance(result, tuple):
            if len(result) == 2:
                self.built_object, self.artifacts = result
            elif len(result) == 3:
                self.custom_component, self.built_object, self.artifacts = result
                self.logs = self.custom_component._output_logs
                self.artifacts_raw = self.artifacts.get("raw", None)
                self.artifacts_type = {
                    self.outputs[0]["name"]: self.artifacts.get("type", None) or ArtifactType.UNKNOWN.value
                }
                self.artifacts = {self.outputs[0]["name"]: self.artifacts}
        else:
            self.built_object = result

    def _validate_built_object(self) -> None:
        """Validate the object produced by the build.

        A ``None`` built object only logs a warning.

        Raises:
            TypeError: If the built object is still the ``UnbuiltObject`` sentinel.
            ValueError: If the built object is an iterator or async iterator
                and the vertex display name is ``"Text Output"``.
        """
        if isinstance(self.built_object, UnbuiltObject):
            msg = f"{self.display_name}: {self.built_object_repr()}"
            raise TypeError(msg)
        if self.built_object is None:
            message = f"{self.display_name} returned None."
            if self.base_type == "custom_components":
                message += " Make sure your build method returns a component."

            logger.warning(message)
        elif isinstance(self.built_object, Iterator | AsyncIterator):
            if self.display_name == "Text Output":
                msg = f"You are trying to stream to a {self.display_name}. Try using a Chat Output instead."
                raise ValueError(msg)

    def _reset(self) -> None:
        """Clear the build state and recompute the parameters."""
        self.built = False
        self.built_object = UnbuiltObject()
        self.built_result = UnbuiltResult()
        self.artifacts = {}
        self.steps_ran = []
        self.build_params()

    def _is_chat_input(self) -> bool:
        """Return False (presumably overridden by chat-input vertex subclasses)."""
        return False

    def build_inactive(self) -> None:
        """Mark the vertex as built with ``None`` as built object and built result."""
        self.built = True
        self.built_object = None
        self.built_result = None

    async def build(
        self,
        user_id=None,
        inputs: dict[str, Any] | None = None,
        files: list[str] | None = None,
        requester: Vertex | None = None,
        event_manager: EventManager | None = None,
        **kwargs,
    ) -> Any:
        """Build the vertex and return the result for ``requester``.

        Under the vertex lock: an inactive vertex is marked built and ``None``
        is returned. An already built vertex (frozen, or with a requester)
        returns its existing result. Otherwise the vertex is reset, session
        and chat inputs are injected into the raw params, the pending steps
        are run and the build is finalised.

        Args:
            user_id: Forwarded to each build step.
            inputs: Optional run inputs; ``"session"`` and ``"input_value"``
                are read from it.
            files: Optional file paths for chat-input vertices.
            requester: Vertex asking for the result, if any.
            event_manager: Forwarded to each build step.
            **kwargs: Forwarded to each build step.

        Returns:
            See ``get_requester_result``.
        """
        async with self._lock:
            if self.state == VertexStates.INACTIVE:
                self.build_inactive()
                return None

            if self.frozen and self.built:
                return await self.get_requester_result(requester)
            if self.built and requester is not None:
                # Already built: only hand the result to the requester.
                return await self.get_requester_result(requester)
            self._reset()

            if inputs is not None and "session" in inputs and inputs["session"] is not None and self.has_session_id:
                session_id_value = self.get_value_from_template_dict("session_id")
                # Only fill in the session id when the template leaves it empty.
                if session_id_value == "":
                    self.update_raw_params({"session_id": inputs["session"]}, overwrite=True)
            if self._is_chat_input() and (inputs or files):
                chat_input = {}
                if (
                    inputs
                    and isinstance(inputs, dict)
                    and "input_value" in inputs
                    and inputs.get("input_value") is not None
                ):
                    chat_input.update({"input_value": inputs.get(INPUT_FIELD_NAME, "")})
                if files:
                    chat_input.update({"files": files})

                self.update_raw_params(chat_input, overwrite=True)

            for step in self.steps:
                if step not in self.steps_ran:
                    await step(user_id=user_id, event_manager=event_manager, **kwargs)
                    self.steps_ran.append(step)

            self.finalize_build()

        return await self.get_requester_result(requester)

    async def get_requester_result(self, requester: Vertex | None):
        """Return the result as seen by ``requester``.

        Without a requester the built object is returned. Otherwise the result
        comes from the first edge of this vertex that targets the requester,
        or ``None`` when there is no such edge.
        """
        if requester is None:
            return self.built_object

        requester_edge = next((edge for edge in self.edges if edge.target_id == requester.id), None)

        return (
            None
            if requester_edge is None
            else await requester_edge.get_result_from_source(source=self, target=requester)
        )

    def add_edge(self, edge: CycleEdge) -> None:
        """Append ``edge`` to this vertex's edges unless it is already present."""
        # NOTE(review): ``edges`` is a property backed by graph.get_vertex_edges; the append
        # only persists if that call returns the stored list — confirm.
        if edge not in self.edges:
            self.edges.append(edge)

    def __repr__(self) -> str:
        """Return a debug representation with display name, id and node data."""
        return f"Vertex(display_name={self.display_name}, id={self.id}, data={self.data})"

    def __eq__(self, /, other: object) -> bool:
        """Two vertices are equal when both their ``id`` and ``data`` are equal."""
        try:
            if not isinstance(other, Vertex):
                return False
            ids_are_equal = self.id == other.id
            data_are_equal = self.data == other.data
        except AttributeError:
            # A partially initialised vertex is never considered equal.
            return False
        else:
            return ids_are_equal and data_are_equal

    def __hash__(self) -> int:
        """Return an identity-based hash."""
        # NOTE(review): identity hash while __eq__ compares id and data, so two equal
        # vertices may hash differently — confirm this is intended.
        return id(self)

    def built_object_repr(self) -> str:
        """Return a short human-readable status derived from ``built_object``."""
        return "Built successfully ✨" if self.built_object is not None else "Failed to build 😵💫"

    def apply_on_outputs(self, func: Callable[[Any], Any]) -> None:
        """Applies a function to the outputs of the vertex.

        Does nothing when there is no component instance or it has no outputs.
        """
        if not self.custom_component or not self.custom_component.outputs:
            return

        # Plain loop: func is called for its side effects, no result list is needed.
        for output in self.custom_component._outputs_map.values():
            func(output)
|
|
|