Spaces:
Running
Running
| from __future__ import annotations | |
| import uuid | |
| from datetime import timedelta | |
| from typing import TYPE_CHECKING, Annotated, Any | |
| from fastapi import Depends, HTTPException, Query | |
| from fastapi_pagination import Params | |
| from loguru import logger | |
| from sqlalchemy import delete | |
| from sqlmodel.ext.asyncio.session import AsyncSession | |
| from langflow.graph.graph.base import Graph | |
| from langflow.services.auth.utils import get_current_active_user | |
| from langflow.services.database.models import User | |
| from langflow.services.database.models.flow import Flow | |
| from langflow.services.database.models.transactions.model import TransactionTable | |
| from langflow.services.database.models.vertex_builds.model import VertexBuildTable | |
| from langflow.services.deps import async_session_scope, get_session | |
| from langflow.services.store.utils import get_lf_version_from_pypi | |
| if TYPE_CHECKING: | |
| from langflow.services.chat.service import ChatService | |
| from langflow.services.store.schema import StoreComponentCreate | |
| API_WORDS = ["api", "key", "token"] | |
| MAX_PAGE_SIZE = 50 | |
| MIN_PAGE_SIZE = 1 | |
| CurrentActiveUser = Annotated[User, Depends(get_current_active_user)] | |
| DbSession = Annotated[AsyncSession, Depends(get_session)] | |
| def has_api_terms(word: str): | |
| return "api" in word and ("key" in word or ("token" in word and "tokens" not in word)) | |
| def remove_api_keys(flow: dict): | |
| """Remove api keys from flow data.""" | |
| if flow.get("data") and flow["data"].get("nodes"): | |
| for node in flow["data"]["nodes"]: | |
| node_data = node.get("data").get("node") | |
| template = node_data.get("template") | |
| for value in template.values(): | |
| if isinstance(value, dict) and has_api_terms(value["name"]) and value.get("password"): | |
| value["value"] = None | |
| return flow | |
| def build_input_keys_response(langchain_object, artifacts): | |
| """Build the input keys response.""" | |
| input_keys_response = { | |
| "input_keys": dict.fromkeys(langchain_object.input_keys, ""), | |
| "memory_keys": [], | |
| "handle_keys": artifacts.get("handle_keys", []), | |
| } | |
| # Set the input keys values from artifacts | |
| for key, value in artifacts.items(): | |
| if key in input_keys_response["input_keys"]: | |
| input_keys_response["input_keys"][key] = value | |
| # If the object has memory, that memory will have a memory_variables attribute | |
| # memory variables should be removed from the input keys | |
| if hasattr(langchain_object, "memory") and hasattr(langchain_object.memory, "memory_variables"): | |
| # Remove memory variables from input keys | |
| input_keys_response["input_keys"] = { | |
| key: value | |
| for key, value in input_keys_response["input_keys"].items() | |
| if key not in langchain_object.memory.memory_variables | |
| } | |
| # Add memory variables to memory_keys | |
| input_keys_response["memory_keys"] = langchain_object.memory.memory_variables | |
| if hasattr(langchain_object, "prompt") and hasattr(langchain_object.prompt, "template"): | |
| input_keys_response["template"] = langchain_object.prompt.template | |
| return input_keys_response | |
| def validate_is_component(flows: list[Flow]): | |
| for flow in flows: | |
| if not flow.data or flow.is_component is not None: | |
| continue | |
| is_component = get_is_component_from_data(flow.data) | |
| if is_component is not None: | |
| flow.is_component = is_component | |
| else: | |
| flow.is_component = len(flow.data.get("nodes", [])) == 1 | |
| return flows | |
| def get_is_component_from_data(data: dict): | |
| """Returns True if the data is a component.""" | |
| return data.get("is_component") | |
| async def check_langflow_version(component: StoreComponentCreate) -> None: | |
| from langflow.utils.version import get_version_info | |
| __version__ = get_version_info()["version"] | |
| if not component.last_tested_version: | |
| component.last_tested_version = __version__ | |
| langflow_version = await get_lf_version_from_pypi() | |
| if langflow_version is None: | |
| raise HTTPException(status_code=500, detail="Unable to verify the latest version of Langflow") | |
| if langflow_version != component.last_tested_version: | |
| logger.warning( | |
| f"Your version of Langflow ({component.last_tested_version}) is outdated. " | |
| f"Please update to the latest version ({langflow_version}) and try again." | |
| ) | |
| def format_elapsed_time(elapsed_time: float) -> str: | |
| """Format elapsed time to a human-readable format coming from perf_counter(). | |
| - Less than 1 second: returns milliseconds | |
| - Less than 1 minute: returns seconds rounded to 2 decimals | |
| - 1 minute or more: returns minutes and seconds | |
| """ | |
| delta = timedelta(seconds=elapsed_time) | |
| if delta < timedelta(seconds=1): | |
| milliseconds = round(delta / timedelta(milliseconds=1)) | |
| return f"{milliseconds} ms" | |
| if delta < timedelta(minutes=1): | |
| seconds = round(elapsed_time, 2) | |
| unit = "second" if seconds == 1 else "seconds" | |
| return f"{seconds} {unit}" | |
| minutes = delta // timedelta(minutes=1) | |
| seconds = round((delta - timedelta(minutes=minutes)).total_seconds(), 2) | |
| minutes_unit = "minute" if minutes == 1 else "minutes" | |
| seconds_unit = "second" if seconds == 1 else "seconds" | |
| return f"{minutes} {minutes_unit}, {seconds} {seconds_unit}" | |
| async def _get_flow_name(flow_id: str) -> str: | |
| async with async_session_scope() as session: | |
| flow = await session.get(Flow, flow_id) | |
| if flow is None: | |
| msg = f"Flow {flow_id} not found" | |
| raise ValueError(msg) | |
| return flow.name | |
| async def build_graph_from_data(flow_id: str, payload: dict, **kwargs): | |
| """Build and cache the graph.""" | |
| # Get flow name | |
| if "flow_name" not in kwargs: | |
| flow_name = await _get_flow_name(flow_id) | |
| kwargs["flow_name"] = flow_name | |
| graph = Graph.from_payload(payload, flow_id, **kwargs) | |
| for vertex_id in graph.has_session_id_vertices: | |
| vertex = graph.get_vertex(vertex_id) | |
| if vertex is None: | |
| msg = f"Vertex {vertex_id} not found" | |
| raise ValueError(msg) | |
| if not vertex.raw_params.get("session_id"): | |
| vertex.update_raw_params({"session_id": flow_id}, overwrite=True) | |
| run_id = uuid.uuid4() | |
| graph.set_run_id(run_id) | |
| graph.set_run_name() | |
| await graph.initialize_run() | |
| return graph | |
| async def build_graph_from_db_no_cache(flow_id: str, session: AsyncSession): | |
| """Build and cache the graph.""" | |
| flow: Flow | None = await session.get(Flow, flow_id) | |
| if not flow or not flow.data: | |
| msg = "Invalid flow ID" | |
| raise ValueError(msg) | |
| return await build_graph_from_data(flow_id, flow.data, flow_name=flow.name, user_id=str(flow.user_id)) | |
| async def build_graph_from_db(flow_id: str, session: AsyncSession, chat_service: ChatService): | |
| graph = await build_graph_from_db_no_cache(flow_id, session) | |
| await chat_service.set_cache(flow_id, graph) | |
| return graph | |
| async def build_and_cache_graph_from_data( | |
| flow_id: str, | |
| chat_service: ChatService, | |
| graph_data: dict, | |
| ): # -> Graph | Any: | |
| """Build and cache the graph.""" | |
| graph = Graph.from_payload(graph_data, flow_id) | |
| await chat_service.set_cache(flow_id, graph) | |
| return graph | |
| def format_syntax_error_message(exc: SyntaxError) -> str: | |
| """Format a SyntaxError message for returning to the frontend.""" | |
| if exc.text is None: | |
| return f"Syntax error in code. Error on line {exc.lineno}" | |
| return f"Syntax error in code. Error on line {exc.lineno}: {exc.text.strip()}" | |
| def get_causing_exception(exc: BaseException) -> BaseException: | |
| """Get the causing exception from an exception.""" | |
| if hasattr(exc, "__cause__") and exc.__cause__: | |
| return get_causing_exception(exc.__cause__) | |
| return exc | |
| def format_exception_message(exc: Exception) -> str: | |
| """Format an exception message for returning to the frontend.""" | |
| # We need to check if the __cause__ is a SyntaxError | |
| # If it is, we need to return the message of the SyntaxError | |
| causing_exception = get_causing_exception(exc) | |
| if isinstance(causing_exception, SyntaxError): | |
| return format_syntax_error_message(causing_exception) | |
| return str(exc) | |
| def get_top_level_vertices(graph, vertices_ids): | |
| """Retrieves the top-level vertices from the given graph based on the provided vertex IDs. | |
| Args: | |
| graph (Graph): The graph object containing the vertices. | |
| vertices_ids (list): A list of vertex IDs. | |
| Returns: | |
| list: A list of top-level vertex IDs. | |
| """ | |
| top_level_vertices = [] | |
| for vertex_id in vertices_ids: | |
| vertex = graph.get_vertex(vertex_id) | |
| if vertex.parent_is_top_level: | |
| top_level_vertices.append(vertex.parent_node_id) | |
| else: | |
| top_level_vertices.append(vertex_id) | |
| return top_level_vertices | |
| def parse_exception(exc): | |
| """Parse the exception message.""" | |
| if hasattr(exc, "body"): | |
| return exc.body["message"] | |
| return str(exc) | |
| def get_suggestion_message(outdated_components: list[str]) -> str: | |
| """Get the suggestion message for the outdated components.""" | |
| count = len(outdated_components) | |
| if count == 0: | |
| return "The flow contains no outdated components." | |
| if count == 1: | |
| return ( | |
| "The flow contains 1 outdated component. " | |
| f"We recommend updating the following component: {outdated_components[0]}." | |
| ) | |
| components = ", ".join(outdated_components) | |
| return ( | |
| f"The flow contains {count} outdated components. " | |
| f"We recommend updating the following components: {components}." | |
| ) | |
| def parse_value(value: Any, input_type: str) -> Any: | |
| """Helper function to parse the value based on input type.""" | |
| if value == "": | |
| return value | |
| if input_type == "IntInput": | |
| return int(value) if value is not None else None | |
| if input_type == "FloatInput": | |
| return float(value) if value is not None else None | |
| return value | |
| async def cascade_delete_flow(session: AsyncSession, flow_id: uuid.UUID) -> None: | |
| try: | |
| await session.exec(delete(TransactionTable).where(TransactionTable.flow_id == flow_id)) | |
| await session.exec(delete(VertexBuildTable).where(VertexBuildTable.flow_id == flow_id)) | |
| await session.exec(delete(Flow).where(Flow.id == flow_id)) | |
| except Exception as e: | |
| msg = f"Unable to cascade delete flow: ${flow_id}" | |
| raise RuntimeError(msg, e) from e | |
| def custom_params( | |
| page: int | None = Query(None), | |
| size: int | None = Query(None), | |
| ): | |
| if page is None and size is None: | |
| return None | |
| return Params(page=page or MIN_PAGE_SIZE, size=size or MAX_PAGE_SIZE) | |