|
|
from typing import Any |
|
|
|
|
|
from loguru import logger |
|
|
|
|
|
from langflow.base.flow_processing.utils import build_data_from_result_data |
|
|
from langflow.custom import Component |
|
|
from langflow.graph.graph.base import Graph |
|
|
from langflow.graph.vertex.base import Vertex |
|
|
from langflow.helpers.flow import get_flow_inputs |
|
|
from langflow.io import DropdownInput, Output |
|
|
from langflow.schema import Data, dotdict |
|
|
|
|
|
|
|
|
class SubFlowComponent(Component): |
|
|
display_name = "Sub Flow" |
|
|
description = "Generates a Component from a Flow, with all of its inputs, and " |
|
|
name = "SubFlow" |
|
|
beta: bool = True |
|
|
icon = "Workflow" |
|
|
|
|
|
def get_flow_names(self) -> list[str]: |
|
|
flow_data = self.list_flows() |
|
|
return [flow_data.data["name"] for flow_data in flow_data] |
|
|
|
|
|
def get_flow(self, flow_name: str) -> Data | None: |
|
|
flow_datas = self.list_flows() |
|
|
for flow_data in flow_datas: |
|
|
if flow_data.data["name"] == flow_name: |
|
|
return flow_data |
|
|
return None |
|
|
|
|
|
def update_build_config(self, build_config: dotdict, field_value: Any, field_name: str | None = None): |
|
|
if field_name == "flow_name": |
|
|
build_config["flow_name"]["options"] = self.get_flow_names() |
|
|
|
|
|
for key in list(build_config.keys()): |
|
|
if key not in [x.name for x in self.inputs] + ["code", "_type", "get_final_results_only"]: |
|
|
del build_config[key] |
|
|
if field_value is not None and field_name == "flow_name": |
|
|
try: |
|
|
flow_data = self.get_flow(field_value) |
|
|
except Exception: |
|
|
logger.exception(f"Error getting flow {field_value}") |
|
|
else: |
|
|
if not flow_data: |
|
|
msg = f"Flow {field_value} not found." |
|
|
logger.error(msg) |
|
|
else: |
|
|
try: |
|
|
graph = Graph.from_payload(flow_data.data["data"]) |
|
|
|
|
|
inputs = get_flow_inputs(graph) |
|
|
|
|
|
build_config = self.add_inputs_to_build_config(inputs, build_config) |
|
|
except Exception: |
|
|
logger.exception(f"Error building graph for flow {field_value}") |
|
|
|
|
|
return build_config |
|
|
|
|
|
def add_inputs_to_build_config(self, inputs_vertex: list[Vertex], build_config: dotdict): |
|
|
new_fields: list[dotdict] = [] |
|
|
|
|
|
for vertex in inputs_vertex: |
|
|
new_vertex_inputs = [] |
|
|
field_template = vertex.data["node"]["template"] |
|
|
for inp in field_template: |
|
|
if inp not in {"code", "_type"}: |
|
|
field_template[inp]["display_name"] = ( |
|
|
vertex.display_name + " - " + field_template[inp]["display_name"] |
|
|
) |
|
|
field_template[inp]["name"] = vertex.id + "|" + inp |
|
|
new_vertex_inputs.append(field_template[inp]) |
|
|
new_fields += new_vertex_inputs |
|
|
for field in new_fields: |
|
|
build_config[field["name"]] = field |
|
|
return build_config |
|
|
|
|
|
inputs = [ |
|
|
DropdownInput( |
|
|
name="flow_name", |
|
|
display_name="Flow Name", |
|
|
info="The name of the flow to run.", |
|
|
options=[], |
|
|
refresh_button=True, |
|
|
real_time_refresh=True, |
|
|
), |
|
|
] |
|
|
|
|
|
outputs = [Output(name="flow_outputs", display_name="Flow Outputs", method="generate_results")] |
|
|
|
|
|
async def generate_results(self) -> list[Data]: |
|
|
tweaks: dict = {} |
|
|
for field in self._attributes: |
|
|
if field != "flow_name" and "|" in field: |
|
|
[node, name] = field.split("|") |
|
|
if node not in tweaks: |
|
|
tweaks[node] = {} |
|
|
tweaks[node][name] = self._attributes[field] |
|
|
flow_name = self._attributes.get("flow_name") |
|
|
run_outputs = await self.run_flow( |
|
|
tweaks=tweaks, |
|
|
flow_name=flow_name, |
|
|
output_type="all", |
|
|
) |
|
|
data: list[Data] = [] |
|
|
if not run_outputs: |
|
|
return data |
|
|
run_output = run_outputs[0] |
|
|
|
|
|
if run_output is not None: |
|
|
for output in run_output.outputs: |
|
|
if output: |
|
|
data.extend(build_data_from_result_data(output)) |
|
|
return data |
|
|
|