|
|
from __future__ import annotations |
|
|
|
|
|
from typing import TYPE_CHECKING, Any, cast |
|
|
from uuid import UUID |
|
|
|
|
|
from fastapi import HTTPException |
|
|
from pydantic.v1 import BaseModel, Field, create_model |
|
|
from sqlmodel import select |
|
|
|
|
|
from langflow.schema.schema import INPUT_FIELD_NAME |
|
|
from langflow.services.database.models.flow import Flow |
|
|
from langflow.services.database.models.flow.model import FlowRead |
|
|
from langflow.services.deps import async_session_scope, get_settings_service, session_scope |
|
|
|
|
|
if TYPE_CHECKING: |
|
|
from collections.abc import Awaitable, Callable |
|
|
|
|
|
from langflow.graph.graph.base import Graph |
|
|
from langflow.graph.schema import RunOutputs |
|
|
from langflow.graph.vertex.base import Vertex |
|
|
from langflow.schema import Data |
|
|
|
|
|
# Keyed by an input vertex's base name. For each kind of input component this
# holds the *source text* of the annotation ("type_hint") and of the default
# value ("default") that generate_function_for_flow splices into the signature
# of the function it generates.
INPUT_TYPE_MAP = {
    "ChatInput": {
        "type_hint": "Optional[str]",
        "default": '""',
    },
    "TextInput": {
        "type_hint": "Optional[str]",
        "default": '""',
    },
    "JSONInput": {
        "type_hint": "Optional[dict]",
        "default": "{}",
    },
}
|
|
|
|
|
|
|
|
def list_flows(*, user_id: str | None = None) -> list[Data]:
    """Return the non-component flows stored for ``user_id``.

    Args:
        user_id: Identifier the flows are filtered by. Must be truthy.

    Returns:
        The result of ``to_data()`` for every matching ``Flow`` row.

    Raises:
        ValueError: If ``user_id`` is missing ("Session is invalid"), or if
            anything fails while querying; the original exception is chained.
    """
    if not user_id:
        msg = "Session is invalid"
        raise ValueError(msg)

    try:
        with session_scope() as session:
            owned_by_user = Flow.user_id == user_id
            # `== False` is deliberate: the comparison is handed to `.where()`
            # as a filter rather than evaluated as a Python truth test.
            not_a_component = Flow.is_component == False  # noqa: E712
            statement = select(Flow).where(owned_by_user).where(not_a_component)
            return [record.to_data() for record in session.exec(statement).all()]
    except Exception as e:
        msg = f"Error listing flows: {e}"
        raise ValueError(msg) from e
|
|
|
|
|
|
|
|
async def load_flow(
    user_id: str, flow_id: str | None = None, flow_name: str | None = None, tweaks: dict | None = None
) -> Graph:
    """Load a stored flow and build a ``Graph`` from its data.

    The flow is looked up by ``flow_id`` when given; otherwise ``flow_name`` is
    resolved to an id for ``user_id`` through ``find_flow``.

    Args:
        user_id: Used for the name lookup and passed on to the graph.
        flow_id: Primary key of the flow. Takes precedence over ``flow_name``.
        flow_name: Name of the flow, used only when ``flow_id`` is not given.
        tweaks: Optional overrides applied to the flow data via
            ``process_tweaks`` before the graph is built.

    Returns:
        The graph produced by ``Graph.from_payload``.

    Raises:
        ValueError: If neither identifier is given, if the name cannot be
            resolved, or if the flow row is missing or has no data.
    """
    # Imported here rather than at module level (the module only imports
    # Graph under TYPE_CHECKING).
    from langflow.graph.graph.base import Graph
    from langflow.processing.process import process_tweaks

    if not flow_id:
        if not flow_name:
            msg = "Flow ID or Flow Name is required"
            raise ValueError(msg)
        flow_id = await find_flow(flow_name, user_id)
        if not flow_id:
            msg = f"Flow {flow_name} not found"
            raise ValueError(msg)

    async with async_session_scope() as session:
        stored_flow = await session.get(Flow, flow_id)
        graph_data = stored_flow.data if stored_flow else None

    # A missing row and a row without data are reported the same way.
    if not graph_data:
        msg = f"Flow {flow_id} not found"
        raise ValueError(msg)

    if tweaks:
        graph_data = process_tweaks(graph_data=graph_data, tweaks=tweaks)
    return Graph.from_payload(graph_data, flow_id=flow_id, user_id=user_id)
|
|
|
|
|
|
|
|
async def find_flow(flow_name: str, user_id: str) -> str | None:
    """Look up the id of the flow called ``flow_name`` that belongs to ``user_id``.

    Args:
        flow_name: Exact flow name to match.
        user_id: Owner the lookup is restricted to.

    Returns:
        The ``id`` of the first matching flow, or ``None`` when none matches.
    """
    async with async_session_scope() as session:
        query = select(Flow).where(Flow.name == flow_name).where(Flow.user_id == user_id)
        result = await session.exec(query)
        match = result.first()
        if not match:
            return None
        return match.id
|
|
|
|
|
|
|
|
async def run_flow(
    inputs: dict | list[dict] | None = None,
    tweaks: dict | None = None,
    flow_id: str | None = None,
    flow_name: str | None = None,
    output_type: str | None = "chat",
    user_id: str | None = None,
    run_id: str | None = None,
    session_id: str | None = None,
    graph: Graph | None = None,
) -> list[RunOutputs]:
    """Run a flow and return what ``graph.arun`` produces.

    Args:
        inputs: One input dict or a list of them. Each dict is read for the
            keys ``"input_value"``, ``"components"`` (default ``[]``) and
            ``"type"`` (default ``"chat"``). ``None`` means no inputs.
        tweaks: Forwarded to ``load_flow`` when the graph has to be loaded.
        flow_id: Forwarded to ``load_flow`` when ``graph`` is not supplied.
        flow_name: Forwarded to ``load_flow`` when ``graph`` is not supplied.
        output_type: Selects the vertices whose ids are requested as outputs:
            ``"debug"`` selects every vertex, ``"any"`` every output vertex,
            and any other value the output vertices whose lower-cased id
            contains it.
        user_id: Required; also assigned to the graph.
        run_id: If given, parsed as a UUID and set as the graph's run id.
        session_id: If given, assigned to the graph.
        graph: Graph to run. Loaded with ``load_flow`` when ``None``.

    Returns:
        The awaited result of ``graph.arun``.

    Raises:
        ValueError: If ``user_id`` is ``None`` ("Session is invalid").
    """
    if user_id is None:
        msg = "Session is invalid"
        raise ValueError(msg)

    if graph is None:
        graph = await load_flow(user_id, flow_id, flow_name, tweaks)

    if run_id:
        graph.set_run_id(UUID(run_id))
    if session_id:
        graph.session_id = session_id
    if user_id:
        graph.user_id = user_id

    # Normalise `inputs` to a list of dicts.
    if inputs is None:
        entries = []
    elif isinstance(inputs, dict):
        entries = [inputs]
    else:
        entries = inputs

    # Three parallel lists, one element per input dict.
    inputs_list = [{INPUT_FIELD_NAME: cast("str", entry.get("input_value"))} for entry in entries]
    inputs_components = [entry.get("components", []) for entry in entries]
    types = [entry.get("type", "chat") for entry in entries]

    def _is_requested(vertex) -> bool:
        if output_type == "debug":
            return True
        if not vertex.is_output:
            return False
        # NOTE(review): when output_type is None this membership test raises
        # TypeError for output vertices, although the signature allows None.
        return output_type == "any" or output_type in vertex.id.lower()

    outputs = [vertex.id for vertex in graph.vertices if _is_requested(vertex)]

    fallback_to_env_vars = get_settings_service().settings.fallback_to_env_var

    return await graph.arun(
        inputs_list,
        outputs=outputs,
        inputs_components=inputs_components,
        types=types,
        fallback_to_env_vars=fallback_to_env_vars,
    )
|
|
|
|
|
|
|
|
def generate_function_for_flow( |
|
|
inputs: list[Vertex], flow_id: str, user_id: str | UUID | None |
|
|
) -> Callable[..., Awaitable[Any]]: |
|
|
"""Generate a dynamic flow function based on the given inputs and flow ID. |
|
|
|
|
|
Args: |
|
|
inputs (List[Vertex]): The list of input vertices for the flow. |
|
|
flow_id (str): The ID of the flow. |
|
|
user_id (str | UUID | None): The user ID associated with the flow. |
|
|
|
|
|
Returns: |
|
|
Coroutine: The dynamic flow function. |
|
|
|
|
|
Raises: |
|
|
None |
|
|
|
|
|
Example: |
|
|
inputs = [vertex1, vertex2] |
|
|
flow_id = "my_flow" |
|
|
function = generate_function_for_flow(inputs, flow_id) |
|
|
result = function(input1, input2) |
|
|
""" |
|
|
|
|
|
args = [ |
|
|
( |
|
|
f"{input_.display_name.lower().replace(' ', '_')}: {INPUT_TYPE_MAP[input_.base_name]['type_hint']} = " |
|
|
f"{INPUT_TYPE_MAP[input_.base_name]['default']}" |
|
|
) |
|
|
for input_ in inputs |
|
|
] |
|
|
|
|
|
|
|
|
original_arg_names = [input_.display_name for input_ in inputs] |
|
|
|
|
|
|
|
|
func_args = ", ".join(args) |
|
|
|
|
|
|
|
|
arg_mappings = ", ".join( |
|
|
f'"{original_name}": {name}' |
|
|
for original_name, name in zip(original_arg_names, [arg.split(":")[0] for arg in args], strict=True) |
|
|
) |
|
|
|
|
|
func_body = f""" |
|
|
from typing import Optional |
|
|
async def flow_function({func_args}): |
|
|
tweaks = {{ {arg_mappings} }} |
|
|
from langflow.helpers.flow import run_flow |
|
|
from langchain_core.tools import ToolException |
|
|
from langflow.base.flow_processing.utils import build_data_from_result_data, format_flow_output_data |
|
|
try: |
|
|
run_outputs = await run_flow( |
|
|
tweaks={{key: {{'input_value': value}} for key, value in tweaks.items()}}, |
|
|
flow_id="{flow_id}", |
|
|
user_id="{user_id}" |
|
|
) |
|
|
if not run_outputs: |
|
|
return [] |
|
|
run_output = run_outputs[0] |
|
|
|
|
|
data = [] |
|
|
if run_output is not None: |
|
|
for output in run_output.outputs: |
|
|
if output: |
|
|
data.extend(build_data_from_result_data(output)) |
|
|
return format_flow_output_data(data) |
|
|
except Exception as e: |
|
|
raise ToolException(f'Error running flow: ' + e) |
|
|
""" |
|
|
|
|
|
compiled_func = compile(func_body, "<string>", "exec") |
|
|
local_scope: dict = {} |
|
|
exec(compiled_func, globals(), local_scope) |
|
|
return local_scope["flow_function"] |
|
|
|
|
|
|
|
|
def build_function_and_schema(
    flow_data: Data, graph: Graph, user_id: str | UUID | None
) -> tuple[Callable[..., Awaitable[Any]], type[BaseModel]]:
    """Create the callable and the argument schema for a flow.

    Args:
        flow_data: Flow record; its ``id`` and ``name`` attributes are used.
        graph: Graph of the flow, from which the input vertices are taken.
        user_id: User ID passed on to the generated function.

    Returns:
        A ``(function, schema)`` pair: the function produced by
        ``generate_function_for_flow`` and the model class produced by
        ``build_schema_from_inputs`` for the same input vertices.
    """
    flow_id, input_vertices = flow_data.id, get_flow_inputs(graph)
    flow_function = generate_function_for_flow(input_vertices, flow_id, user_id=user_id)
    args_schema = build_schema_from_inputs(flow_data.name, input_vertices)
    return flow_function, args_schema
|
|
|
|
|
|
|
|
def get_flow_inputs(graph: Graph) -> list[Vertex]:
    """Collect the input vertices of a graph.

    Args:
        graph: The graph whose ``vertices`` are inspected.

    Returns:
        The vertices whose ``is_input`` attribute is truthy, in graph order.
    """
    input_vertices = []
    for candidate in graph.vertices:
        if candidate.is_input:
            input_vertices.append(candidate)
    return input_vertices
|
|
|
|
|
|
|
|
def build_schema_from_inputs(name: str, inputs: list[Vertex]) -> type[BaseModel]:
    """Build a model class with one string field per input vertex.

    Args:
        name: Name given to the created model.
        inputs: Input vertices. Each contributes a field named after its
            display name (lower-cased, spaces replaced by underscores) and
            described by its ``description``.

    Returns:
        The model class created by ``create_model``; every field is a ``str``
        defaulting to the empty string.
    """
    field_definitions = {
        vertex.display_name.lower().replace(" ", "_"): (
            str,
            Field(default="", description=vertex.description),
        )
        for vertex in inputs
    }
    return create_model(name, **field_definitions)
|
|
|
|
|
|
|
|
def get_arg_names(inputs: list[Vertex]) -> list[dict[str, str]]:
    """Pair each input's display name with the argument name derived from it.

    Args:
        inputs: Input vertices to describe.

    Returns:
        One dict per vertex with ``"component_name"`` (the display name as is)
        and ``"arg_name"`` (the display name lower-cased, with spaces replaced
        by underscores).
    """
    pairs = []
    for vertex in inputs:
        component_name = vertex.display_name
        arg_name = vertex.display_name.lower().replace(" ", "_")
        pairs.append({"component_name": component_name, "arg_name": arg_name})
    return pairs
|
|
|
|
|
|
|
|
async def get_flow_by_id_or_endpoint_name(flow_id_or_name: str, user_id: UUID | None = None) -> FlowRead | None:
    """Fetch a flow by UUID or, failing that, by endpoint name.

    Args:
        flow_id_or_name: Either the string form of the flow's UUID or the
            flow's endpoint name.
        user_id: When given, restricts the endpoint-name lookup to this user.
            It is not applied when the flow is fetched by UUID.

    Returns:
        The flow validated into a ``FlowRead``.

    Raises:
        HTTPException: With status 404 when no flow is found.
    """
    async with async_session_scope() as session:
        try:
            flow = await session.get(Flow, UUID(flow_id_or_name))
        except ValueError:
            # Not a UUID, so treat the value as an endpoint name.
            query = select(Flow).where(Flow.endpoint_name == flow_id_or_name)
            if user_id:
                query = query.where(Flow.user_id == user_id)
            result = await session.exec(query)
            flow = result.first()

        if flow is None:
            raise HTTPException(status_code=404, detail=f"Flow identifier {flow_id_or_name} not found")
        return FlowRead.model_validate(flow, from_attributes=True)
|
|
|
|
|
|
|
|
async def generate_unique_flow_name(flow_name, user_id, session):
    """Return ``flow_name``, or the first ``"<flow_name> (n)"`` not used by the user.

    Args:
        flow_name: Desired flow name.
        user_id: Owner whose existing flow names are checked.
        session: Session whose awaited ``exec`` runs the lookup queries.

    Returns:
        ``flow_name`` itself when the user has no flow with that name;
        otherwise the name with the lowest suffix ``(1)``, ``(2)``, ... that
        is still free.
    """

    async def _lookup(candidate_name):
        # First flow of this user already using the candidate name, if any.
        query = select(Flow).where(
            Flow.name == candidate_name,
            Flow.user_id == user_id,
        )
        return (await session.exec(query)).first()

    candidate = flow_name
    suffix = 0
    while await _lookup(candidate):
        suffix += 1
        candidate = f"{flow_name} ({suffix})"
    return candidate
|
|
|