|
|
from __future__ import annotations |
|
|
|
|
|
import uuid |
|
|
from datetime import timedelta |
|
|
from typing import TYPE_CHECKING, Annotated, Any |
|
|
|
|
|
from fastapi import Depends, HTTPException, Query |
|
|
from fastapi_pagination import Params |
|
|
from loguru import logger |
|
|
from sqlalchemy import delete |
|
|
from sqlmodel.ext.asyncio.session import AsyncSession |
|
|
|
|
|
from langflow.graph.graph.base import Graph |
|
|
from langflow.services.auth.utils import get_current_active_user |
|
|
from langflow.services.database.models import User |
|
|
from langflow.services.database.models.flow import Flow |
|
|
from langflow.services.database.models.transactions.model import TransactionTable |
|
|
from langflow.services.database.models.vertex_builds.model import VertexBuildTable |
|
|
from langflow.services.deps import async_session_scope, get_session |
|
|
from langflow.services.store.utils import get_lf_version_from_pypi |
|
|
|
|
|
if TYPE_CHECKING: |
|
|
from langflow.services.chat.service import ChatService |
|
|
from langflow.services.store.schema import StoreComponentCreate |
|
|
|
|
|
|
|
|
API_WORDS = ["api", "key", "token"] |
|
|
|
|
|
MAX_PAGE_SIZE = 50 |
|
|
MIN_PAGE_SIZE = 1 |
|
|
|
|
|
CurrentActiveUser = Annotated[User, Depends(get_current_active_user)] |
|
|
DbSession = Annotated[AsyncSession, Depends(get_session)] |
|
|
|
|
|
|
|
|
def has_api_terms(word: str): |
|
|
return "api" in word and ("key" in word or ("token" in word and "tokens" not in word)) |
|
|
|
|
|
|
|
|
def remove_api_keys(flow: dict): |
|
|
"""Remove api keys from flow data.""" |
|
|
if flow.get("data") and flow["data"].get("nodes"): |
|
|
for node in flow["data"]["nodes"]: |
|
|
node_data = node.get("data").get("node") |
|
|
template = node_data.get("template") |
|
|
for value in template.values(): |
|
|
if isinstance(value, dict) and has_api_terms(value["name"]) and value.get("password"): |
|
|
value["value"] = None |
|
|
|
|
|
return flow |
|
|
|
|
|
|
|
|
def build_input_keys_response(langchain_object, artifacts): |
|
|
"""Build the input keys response.""" |
|
|
input_keys_response = { |
|
|
"input_keys": dict.fromkeys(langchain_object.input_keys, ""), |
|
|
"memory_keys": [], |
|
|
"handle_keys": artifacts.get("handle_keys", []), |
|
|
} |
|
|
|
|
|
|
|
|
for key, value in artifacts.items(): |
|
|
if key in input_keys_response["input_keys"]: |
|
|
input_keys_response["input_keys"][key] = value |
|
|
|
|
|
|
|
|
if hasattr(langchain_object, "memory") and hasattr(langchain_object.memory, "memory_variables"): |
|
|
|
|
|
input_keys_response["input_keys"] = { |
|
|
key: value |
|
|
for key, value in input_keys_response["input_keys"].items() |
|
|
if key not in langchain_object.memory.memory_variables |
|
|
} |
|
|
|
|
|
input_keys_response["memory_keys"] = langchain_object.memory.memory_variables |
|
|
|
|
|
if hasattr(langchain_object, "prompt") and hasattr(langchain_object.prompt, "template"): |
|
|
input_keys_response["template"] = langchain_object.prompt.template |
|
|
|
|
|
return input_keys_response |
|
|
|
|
|
|
|
|
def validate_is_component(flows: list[Flow]): |
|
|
for flow in flows: |
|
|
if not flow.data or flow.is_component is not None: |
|
|
continue |
|
|
|
|
|
is_component = get_is_component_from_data(flow.data) |
|
|
if is_component is not None: |
|
|
flow.is_component = is_component |
|
|
else: |
|
|
flow.is_component = len(flow.data.get("nodes", [])) == 1 |
|
|
return flows |
|
|
|
|
|
|
|
|
def get_is_component_from_data(data: dict): |
|
|
"""Returns True if the data is a component.""" |
|
|
return data.get("is_component") |
|
|
|
|
|
|
|
|
async def check_langflow_version(component: StoreComponentCreate) -> None: |
|
|
from langflow.utils.version import get_version_info |
|
|
|
|
|
__version__ = get_version_info()["version"] |
|
|
|
|
|
if not component.last_tested_version: |
|
|
component.last_tested_version = __version__ |
|
|
|
|
|
langflow_version = await get_lf_version_from_pypi() |
|
|
if langflow_version is None: |
|
|
raise HTTPException(status_code=500, detail="Unable to verify the latest version of Langflow") |
|
|
if langflow_version != component.last_tested_version: |
|
|
logger.warning( |
|
|
f"Your version of Langflow ({component.last_tested_version}) is outdated. " |
|
|
f"Please update to the latest version ({langflow_version}) and try again." |
|
|
) |
|
|
|
|
|
|
|
|
def format_elapsed_time(elapsed_time: float) -> str: |
|
|
"""Format elapsed time to a human-readable format coming from perf_counter(). |
|
|
|
|
|
- Less than 1 second: returns milliseconds |
|
|
- Less than 1 minute: returns seconds rounded to 2 decimals |
|
|
- 1 minute or more: returns minutes and seconds |
|
|
""" |
|
|
delta = timedelta(seconds=elapsed_time) |
|
|
if delta < timedelta(seconds=1): |
|
|
milliseconds = round(delta / timedelta(milliseconds=1)) |
|
|
return f"{milliseconds} ms" |
|
|
|
|
|
if delta < timedelta(minutes=1): |
|
|
seconds = round(elapsed_time, 2) |
|
|
unit = "second" if seconds == 1 else "seconds" |
|
|
return f"{seconds} {unit}" |
|
|
|
|
|
minutes = delta // timedelta(minutes=1) |
|
|
seconds = round((delta - timedelta(minutes=minutes)).total_seconds(), 2) |
|
|
minutes_unit = "minute" if minutes == 1 else "minutes" |
|
|
seconds_unit = "second" if seconds == 1 else "seconds" |
|
|
return f"{minutes} {minutes_unit}, {seconds} {seconds_unit}" |
|
|
|
|
|
|
|
|
async def _get_flow_name(flow_id: str) -> str: |
|
|
async with async_session_scope() as session: |
|
|
flow = await session.get(Flow, flow_id) |
|
|
if flow is None: |
|
|
msg = f"Flow {flow_id} not found" |
|
|
raise ValueError(msg) |
|
|
return flow.name |
|
|
|
|
|
|
|
|
async def build_graph_from_data(flow_id: str, payload: dict, **kwargs): |
|
|
"""Build and cache the graph.""" |
|
|
|
|
|
if "flow_name" not in kwargs: |
|
|
flow_name = await _get_flow_name(flow_id) |
|
|
kwargs["flow_name"] = flow_name |
|
|
graph = Graph.from_payload(payload, flow_id, **kwargs) |
|
|
for vertex_id in graph.has_session_id_vertices: |
|
|
vertex = graph.get_vertex(vertex_id) |
|
|
if vertex is None: |
|
|
msg = f"Vertex {vertex_id} not found" |
|
|
raise ValueError(msg) |
|
|
if not vertex.raw_params.get("session_id"): |
|
|
vertex.update_raw_params({"session_id": flow_id}, overwrite=True) |
|
|
|
|
|
run_id = uuid.uuid4() |
|
|
graph.set_run_id(run_id) |
|
|
graph.set_run_name() |
|
|
await graph.initialize_run() |
|
|
return graph |
|
|
|
|
|
|
|
|
async def build_graph_from_db_no_cache(flow_id: str, session: AsyncSession): |
|
|
"""Build and cache the graph.""" |
|
|
flow: Flow | None = await session.get(Flow, flow_id) |
|
|
if not flow or not flow.data: |
|
|
msg = "Invalid flow ID" |
|
|
raise ValueError(msg) |
|
|
return await build_graph_from_data(flow_id, flow.data, flow_name=flow.name, user_id=str(flow.user_id)) |
|
|
|
|
|
|
|
|
async def build_graph_from_db(flow_id: str, session: AsyncSession, chat_service: ChatService): |
|
|
graph = await build_graph_from_db_no_cache(flow_id, session) |
|
|
await chat_service.set_cache(flow_id, graph) |
|
|
return graph |
|
|
|
|
|
|
|
|
async def build_and_cache_graph_from_data( |
|
|
flow_id: str, |
|
|
chat_service: ChatService, |
|
|
graph_data: dict, |
|
|
): |
|
|
"""Build and cache the graph.""" |
|
|
graph = Graph.from_payload(graph_data, flow_id) |
|
|
await chat_service.set_cache(flow_id, graph) |
|
|
return graph |
|
|
|
|
|
|
|
|
def format_syntax_error_message(exc: SyntaxError) -> str: |
|
|
"""Format a SyntaxError message for returning to the frontend.""" |
|
|
if exc.text is None: |
|
|
return f"Syntax error in code. Error on line {exc.lineno}" |
|
|
return f"Syntax error in code. Error on line {exc.lineno}: {exc.text.strip()}" |
|
|
|
|
|
|
|
|
def get_causing_exception(exc: BaseException) -> BaseException: |
|
|
"""Get the causing exception from an exception.""" |
|
|
if hasattr(exc, "__cause__") and exc.__cause__: |
|
|
return get_causing_exception(exc.__cause__) |
|
|
return exc |
|
|
|
|
|
|
|
|
def format_exception_message(exc: Exception) -> str: |
|
|
"""Format an exception message for returning to the frontend.""" |
|
|
|
|
|
|
|
|
causing_exception = get_causing_exception(exc) |
|
|
if isinstance(causing_exception, SyntaxError): |
|
|
return format_syntax_error_message(causing_exception) |
|
|
return str(exc) |
|
|
|
|
|
|
|
|
def get_top_level_vertices(graph, vertices_ids): |
|
|
"""Retrieves the top-level vertices from the given graph based on the provided vertex IDs. |
|
|
|
|
|
Args: |
|
|
graph (Graph): The graph object containing the vertices. |
|
|
vertices_ids (list): A list of vertex IDs. |
|
|
|
|
|
Returns: |
|
|
list: A list of top-level vertex IDs. |
|
|
|
|
|
""" |
|
|
top_level_vertices = [] |
|
|
for vertex_id in vertices_ids: |
|
|
vertex = graph.get_vertex(vertex_id) |
|
|
if vertex.parent_is_top_level: |
|
|
top_level_vertices.append(vertex.parent_node_id) |
|
|
else: |
|
|
top_level_vertices.append(vertex_id) |
|
|
return top_level_vertices |
|
|
|
|
|
|
|
|
def parse_exception(exc): |
|
|
"""Parse the exception message.""" |
|
|
if hasattr(exc, "body"): |
|
|
return exc.body["message"] |
|
|
return str(exc) |
|
|
|
|
|
|
|
|
def get_suggestion_message(outdated_components: list[str]) -> str: |
|
|
"""Get the suggestion message for the outdated components.""" |
|
|
count = len(outdated_components) |
|
|
if count == 0: |
|
|
return "The flow contains no outdated components." |
|
|
if count == 1: |
|
|
return ( |
|
|
"The flow contains 1 outdated component. " |
|
|
f"We recommend updating the following component: {outdated_components[0]}." |
|
|
) |
|
|
components = ", ".join(outdated_components) |
|
|
return ( |
|
|
f"The flow contains {count} outdated components. " |
|
|
f"We recommend updating the following components: {components}." |
|
|
) |
|
|
|
|
|
|
|
|
def parse_value(value: Any, input_type: str) -> Any: |
|
|
"""Helper function to parse the value based on input type.""" |
|
|
if value == "": |
|
|
return value |
|
|
if input_type == "IntInput": |
|
|
return int(value) if value is not None else None |
|
|
if input_type == "FloatInput": |
|
|
return float(value) if value is not None else None |
|
|
return value |
|
|
|
|
|
|
|
|
async def cascade_delete_flow(session: AsyncSession, flow_id: uuid.UUID) -> None: |
|
|
try: |
|
|
await session.exec(delete(TransactionTable).where(TransactionTable.flow_id == flow_id)) |
|
|
await session.exec(delete(VertexBuildTable).where(VertexBuildTable.flow_id == flow_id)) |
|
|
await session.exec(delete(Flow).where(Flow.id == flow_id)) |
|
|
except Exception as e: |
|
|
msg = f"Unable to cascade delete flow: ${flow_id}" |
|
|
raise RuntimeError(msg, e) from e |
|
|
|
|
|
|
|
|
def custom_params( |
|
|
page: int | None = Query(None), |
|
|
size: int | None = Query(None), |
|
|
): |
|
|
if page is None and size is None: |
|
|
return None |
|
|
return Params(page=page or MIN_PAGE_SIZE, size=size or MAX_PAGE_SIZE) |
|
|
|