Spaces:
Running
Running
| from __future__ import annotations | |
| import json | |
| from collections.abc import Generator | |
| from enum import Enum | |
| from typing import TYPE_CHECKING, Any | |
| from uuid import UUID | |
| from langchain_core.documents import Document | |
| from loguru import logger | |
| from pydantic import BaseModel | |
| from pydantic.v1 import BaseModel as V1BaseModel | |
| from langflow.interface.utils import extract_input_variables_from_prompt | |
| from langflow.schema.data import Data | |
| from langflow.schema.message import Message | |
| from langflow.services.database.models.transactions.crud import log_transaction as crud_log_transaction | |
| from langflow.services.database.models.transactions.model import TransactionBase | |
| from langflow.services.database.models.vertex_builds.crud import log_vertex_build as crud_log_vertex_build | |
| from langflow.services.database.models.vertex_builds.model import VertexBuildBase | |
| from langflow.services.database.utils import async_session_getter | |
| from langflow.services.deps import get_db_service, get_settings_service | |
| if TYPE_CHECKING: | |
| from langflow.api.v1.schemas import ResultDataResponse | |
| from langflow.graph.vertex.base import Vertex | |
| class UnbuiltObject: | |
| pass | |
| class UnbuiltResult: | |
| pass | |
| class ArtifactType(str, Enum): | |
| TEXT = "text" | |
| RECORD = "record" | |
| OBJECT = "object" | |
| ARRAY = "array" | |
| STREAM = "stream" | |
| UNKNOWN = "unknown" | |
| MESSAGE = "message" | |
| def validate_prompt(prompt: str): | |
| """Validate prompt.""" | |
| if extract_input_variables_from_prompt(prompt): | |
| return prompt | |
| return fix_prompt(prompt) | |
| def fix_prompt(prompt: str): | |
| """Fix prompt.""" | |
| return prompt + " {input}" | |
| def flatten_list(list_of_lists: list[list | Any]) -> list: | |
| """Flatten list of lists.""" | |
| new_list = [] | |
| for item in list_of_lists: | |
| if isinstance(item, list): | |
| new_list.extend(item) | |
| else: | |
| new_list.append(item) | |
| return new_list | |
| def serialize_field(value): | |
| """Serialize field. | |
| Unified serialization function for handling both BaseModel and Document types, | |
| including handling lists of these types. | |
| """ | |
| if isinstance(value, list | tuple): | |
| return [serialize_field(v) for v in value] | |
| if isinstance(value, Document): | |
| return value.to_json() | |
| if isinstance(value, BaseModel): | |
| return serialize_field(value.model_dump()) | |
| if isinstance(value, dict): | |
| return {k: serialize_field(v) for k, v in value.items()} | |
| if isinstance(value, V1BaseModel): | |
| if hasattr(value, "to_json"): | |
| return value.to_json() | |
| return value.dict() | |
| return str(value) | |
| def get_artifact_type(value, build_result) -> str: | |
| result = ArtifactType.UNKNOWN | |
| match value: | |
| case Data(): | |
| result = ArtifactType.RECORD | |
| case str(): | |
| result = ArtifactType.TEXT | |
| case dict(): | |
| result = ArtifactType.OBJECT | |
| case list(): | |
| result = ArtifactType.ARRAY | |
| case Message(): | |
| result = ArtifactType.MESSAGE | |
| if result == ArtifactType.UNKNOWN and ( | |
| isinstance(build_result, Generator) or (isinstance(value, Message) and isinstance(value.text, Generator)) | |
| ): | |
| result = ArtifactType.STREAM | |
| return result.value | |
| def post_process_raw(raw, artifact_type: str): | |
| if artifact_type == ArtifactType.STREAM.value: | |
| raw = "" | |
| return raw | |
| def _vertex_to_primitive_dict(target: Vertex) -> dict: | |
| """Cleans the parameters of the target vertex.""" | |
| # Removes all keys that the values aren't python types like str, int, bool, etc. | |
| params = { | |
| key: value for key, value in target.params.items() if isinstance(value, str | int | bool | float | list | dict) | |
| } | |
| # if it is a list we need to check if the contents are python types | |
| for key, value in params.items(): | |
| if isinstance(value, list): | |
| params[key] = [item for item in value if isinstance(item, str | int | bool | float | list | dict)] | |
| return params | |
| async def log_transaction( | |
| flow_id: str | UUID, source: Vertex, status, target: Vertex | None = None, error=None | |
| ) -> None: | |
| try: | |
| if not get_settings_service().settings.transactions_storage_enabled: | |
| return | |
| if not flow_id: | |
| if source.graph.flow_id: | |
| flow_id = source.graph.flow_id | |
| else: | |
| return | |
| inputs = _vertex_to_primitive_dict(source) | |
| transaction = TransactionBase( | |
| vertex_id=source.id, | |
| target_id=target.id if target else None, | |
| inputs=inputs, | |
| # ugly hack to get the model dump with weird datatypes | |
| outputs=json.loads(source.result.model_dump_json()) if source.result else None, | |
| status=status, | |
| error=error, | |
| flow_id=flow_id if isinstance(flow_id, UUID) else UUID(flow_id), | |
| ) | |
| async with async_session_getter(get_db_service()) as session: | |
| inserted = await crud_log_transaction(session, transaction) | |
| logger.debug(f"Logged transaction: {inserted.id}") | |
| except Exception: # noqa: BLE001 | |
| logger.exception("Error logging transaction") | |
| async def log_vertex_build( | |
| *, | |
| flow_id: str, | |
| vertex_id: str, | |
| valid: bool, | |
| params: Any, | |
| data: ResultDataResponse, | |
| artifacts: dict | None = None, | |
| ) -> None: | |
| try: | |
| if not get_settings_service().settings.vertex_builds_storage_enabled: | |
| return | |
| vertex_build = VertexBuildBase( | |
| flow_id=flow_id, | |
| id=vertex_id, | |
| valid=valid, | |
| params=str(params) if params else None, | |
| # ugly hack to get the model dump with weird datatypes | |
| data=json.loads(data.model_dump_json()), | |
| # ugly hack to get the model dump with weird datatypes | |
| artifacts=json.loads(json.dumps(artifacts, default=str)), | |
| ) | |
| async with async_session_getter(get_db_service()) as session: | |
| inserted = await crud_log_vertex_build(session, vertex_build) | |
| logger.debug(f"Logged vertex build: {inserted.build_id}") | |
| except Exception: # noqa: BLE001 | |
| logger.exception("Error logging vertex build") | |
| def rewrite_file_path(file_path: str): | |
| file_path = file_path.replace("\\", "/") | |
| if ":" in file_path: | |
| file_path = file_path.split(":", 1)[-1] | |
| file_path_split = [part for part in file_path.split("/") if part] | |
| if len(file_path_split) > 1: | |
| consistent_file_path = f"{file_path_split[-2]}/{file_path_split[-1]}" | |
| else: | |
| consistent_file_path = "/".join(file_path_split) | |
| return [consistent_file_path] | |