|
|
from typing import TYPE_CHECKING, Any |
|
|
|
|
|
from loguru import logger |
|
|
|
|
|
from langflow.base.flow_processing.utils import build_data_from_result_data |
|
|
from langflow.custom import CustomComponent |
|
|
from langflow.graph.graph.base import Graph |
|
|
from langflow.graph.vertex.base import Vertex |
|
|
from langflow.helpers.flow import get_flow_inputs |
|
|
from langflow.schema import Data |
|
|
from langflow.schema.dotdict import dotdict |
|
|
from langflow.template.field.base import Input |
|
|
|
|
|
if TYPE_CHECKING: |
|
|
from langflow.graph.schema import RunOutputs |
|
|
|
|
|
|
|
|
class SubFlowComponent(CustomComponent): |
|
|
display_name = "Sub Flow" |
|
|
description = ( |
|
|
"Dynamically Generates a Component from a Flow. The output is a list of data with keys 'result' and 'message'." |
|
|
) |
|
|
beta: bool = True |
|
|
field_order = ["flow_name"] |
|
|
name = "SubFlow" |
|
|
|
|
|
def get_flow_names(self) -> list[str]: |
|
|
flow_datas = self.list_flows() |
|
|
return [flow_data.data["name"] for flow_data in flow_datas] |
|
|
|
|
|
def get_flow(self, flow_name: str) -> Data | None: |
|
|
flow_datas = self.list_flows() |
|
|
for flow_data in flow_datas: |
|
|
if flow_data.data["name"] == flow_name: |
|
|
return flow_data |
|
|
return None |
|
|
|
|
|
def update_build_config(self, build_config: dotdict, field_value: Any, field_name: str | None = None): |
|
|
logger.debug(f"Updating build config with field value {field_value} and field name {field_name}") |
|
|
if field_name == "flow_name": |
|
|
build_config["flow_name"]["options"] = self.get_flow_names() |
|
|
|
|
|
for key in list(build_config.keys()): |
|
|
if key not in {*self.field_order, "code", "_type", "get_final_results_only"}: |
|
|
del build_config[key] |
|
|
if field_value is not None and field_name == "flow_name": |
|
|
try: |
|
|
flow_data = self.get_flow(field_value) |
|
|
except Exception: |
|
|
logger.exception(f"Error getting flow {field_value}") |
|
|
else: |
|
|
if not flow_data: |
|
|
msg = f"Flow {field_value} not found." |
|
|
logger.error(msg) |
|
|
else: |
|
|
try: |
|
|
graph = Graph.from_payload(flow_data.data["data"]) |
|
|
|
|
|
inputs = get_flow_inputs(graph) |
|
|
|
|
|
build_config = self.add_inputs_to_build_config(inputs, build_config) |
|
|
except Exception: |
|
|
logger.exception(f"Error building graph for flow {field_value}") |
|
|
|
|
|
return build_config |
|
|
|
|
|
def add_inputs_to_build_config(self, inputs: list[Vertex], build_config: dotdict): |
|
|
new_fields: list[Input] = [] |
|
|
for vertex in inputs: |
|
|
field = Input( |
|
|
display_name=vertex.display_name, |
|
|
name=vertex.id, |
|
|
info=vertex.description, |
|
|
field_type="str", |
|
|
value=None, |
|
|
) |
|
|
new_fields.append(field) |
|
|
logger.debug(new_fields) |
|
|
for field in new_fields: |
|
|
build_config[field.name] = field.to_dict() |
|
|
return build_config |
|
|
|
|
|
def build_config(self): |
|
|
return { |
|
|
"input_value": { |
|
|
"display_name": "Input Value", |
|
|
"multiline": True, |
|
|
}, |
|
|
"flow_name": { |
|
|
"display_name": "Flow Name", |
|
|
"info": "The name of the flow to run.", |
|
|
"options": [], |
|
|
"real_time_refresh": True, |
|
|
"refresh_button": True, |
|
|
}, |
|
|
"tweaks": { |
|
|
"display_name": "Tweaks", |
|
|
"info": "Tweaks to apply to the flow.", |
|
|
}, |
|
|
"get_final_results_only": { |
|
|
"display_name": "Get Final Results Only", |
|
|
"info": "If False, the output will contain all outputs from the flow.", |
|
|
"advanced": True, |
|
|
}, |
|
|
} |
|
|
|
|
|
async def build(self, flow_name: str, **kwargs) -> list[Data]: |
|
|
tweaks = {key: {"input_value": value} for key, value in kwargs.items()} |
|
|
run_outputs: list[RunOutputs | None] = await self.run_flow( |
|
|
tweaks=tweaks, |
|
|
flow_name=flow_name, |
|
|
) |
|
|
if not run_outputs: |
|
|
return [] |
|
|
run_output = run_outputs[0] |
|
|
|
|
|
data = [] |
|
|
if run_output is not None: |
|
|
for output in run_output.outputs: |
|
|
if output: |
|
|
data.extend(build_data_from_result_data(output)) |
|
|
|
|
|
self.status = data |
|
|
logger.debug(data) |
|
|
return data |
|
|
|