|
|
import json |
|
|
from uuid import UUID |
|
|
|
|
|
import pytest |
|
|
from langflow.memory import aget_messages |
|
|
from langflow.services.database.models.flow import FlowCreate, FlowUpdate |
|
|
from orjson import orjson |
|
|
|
|
|
|
|
|
@pytest.mark.benchmark
async def test_build_flow(client, json_memory_chatbot_no_llm, logged_in_headers):
    """Build a newly created flow and verify the streamed events and stored messages.

    The build request is sent with an empty JSON body, i.e. no flow data is
    supplied in the request itself.
    """
    flow_id = await _create_flow(client, json_memory_chatbot_no_llm, logged_in_headers)
    build_url = f"api/v1/build/{flow_id}/flow"

    async with client.stream("POST", build_url, json={}, headers=logged_in_headers) as build_stream:
        await consume_and_assert_stream(build_stream)

    # Checked only after the stream has been fully consumed.
    await check_messages(flow_id)
|
|
|
|
|
|
|
|
@pytest.mark.benchmark
async def test_build_flow_from_request_data(client, json_memory_chatbot_no_llm, logged_in_headers):
    """Build a flow from data passed in the request body and verify the outcome.

    The flow is created, fetched back through the flows endpoint, and its
    ``data`` is posted to the build endpoint instead of an empty body.
    """
    flow_id = await _create_flow(client, json_memory_chatbot_no_llm, logged_in_headers)

    response = await client.get(f"api/v1/flows/{flow_id}", headers=logged_in_headers)
    # Fail with the HTTP status here instead of a KeyError on the payload below.
    response.raise_for_status()
    flow_data = response.json()

    async with client.stream(
        "POST", f"api/v1/build/{flow_id}/flow", json={"data": flow_data["data"]}, headers=logged_in_headers
    ) as r:
        await consume_and_assert_stream(r)

    await check_messages(flow_id)
|
|
|
|
|
|
|
|
async def test_build_flow_with_frozen_path(client, json_memory_chatbot_no_llm, logged_in_headers):
    """Build a flow whose first node is marked as frozen and verify the outcome.

    The flow is created, fetched, its first node gets ``frozen = True``, and
    the modified data is saved with a PATCH before the build is triggered.
    """
    flow_id = await _create_flow(client, json_memory_chatbot_no_llm, logged_in_headers)

    response = await client.get(f"api/v1/flows/{flow_id}", headers=logged_in_headers)
    # Fail with the HTTP status here instead of a KeyError while editing the payload.
    response.raise_for_status()
    flow_data = response.json()
    flow_data["data"]["nodes"][0]["data"]["node"]["frozen"] = True

    response = await client.patch(
        f"api/v1/flows/{flow_id}",
        json=FlowUpdate(name="Flow", description="description", data=flow_data["data"]).model_dump(),
        headers=logged_in_headers,
    )
    response.raise_for_status()

    async with client.stream("POST", f"api/v1/build/{flow_id}/flow", json={}, headers=logged_in_headers) as r:
        await consume_and_assert_stream(r)

    await check_messages(flow_id)
|
|
|
|
|
|
|
|
async def check_messages(flow_id):
    """Assert that exactly two messages are stored for ``flow_id``.

    Messages are fetched in ascending order; the first must come from the
    user with empty text, the second from the machine under the name "AI".
    Both must use ``flow_id`` as their session id.

    Args:
        flow_id: Flow id as a string; it is converted to ``UUID`` for the query
            and compared as-is against each message's ``session_id``.
    """
    messages = await aget_messages(flow_id=UUID(flow_id), order="ASC")
    assert len(messages) == 2

    user_message = messages[0]
    assert user_message.session_id == flow_id
    assert user_message.sender == "User"
    assert user_message.sender_name == "User"
    assert user_message.text == ""

    ai_message = messages[1]
    assert ai_message.session_id == flow_id
    assert ai_message.sender == "Machine"
    assert ai_message.sender_name == "AI"
|
|
|
|
|
|
|
|
async def consume_and_assert_stream(r):
    """Consume a streamed build response and assert its event sequence.

    The stream must contain exactly six non-empty JSON lines, in this order:
    one ``vertices_sorted`` event, four ``end_vertex`` events and a final
    ``end`` event. Empty lines are skipped and not counted.

    Args:
        r: Response-like object whose ``aiter_lines()`` asynchronously yields
            text lines.

    Raises:
        AssertionError: If an event is out of order, carries unexpected data,
            or the stream finishes before the ``end`` event was received.
        ValueError: If another non-empty line arrives after the ``end`` event.
    """
    expected_events = 6  # vertices_sorted + 4 * end_vertex + end
    count = 0
    async for line in r.aiter_lines():
        # Empty lines carry no event; skip them without advancing the counter.
        if not line:
            continue
        parsed = json.loads(line)
        if count == 0:
            assert parsed["event"] == "vertices_sorted"
            # Order of the ids is not asserted, so compare sorted copies.
            assert sorted(parsed["data"]["ids"]) == ["ChatInput-CIGht"]
            assert sorted(parsed["data"]["to_run"]) == [
                "ChatInput-CIGht",
                "ChatOutput-QA7ej",
                "Memory-amN4Z",
                "Prompt-iWbCC",
            ]
        elif count < 5:
            assert parsed["event"] == "end_vertex"
            assert parsed["data"]["build_data"] is not None
        elif count == 5:
            assert parsed["event"] == "end"
        else:
            msg = f"Unexpected line: {line}"
            raise ValueError(msg)
        count += 1

    # Without this check an empty or truncated stream would pass silently.
    assert count == expected_events, f"Stream ended after {count} events, expected {expected_events}"
|
|
|
|
|
|
|
|
async def _create_flow(client, json_memory_chatbot_no_llm, logged_in_headers):
    """Create a flow from the JSON fixture through the API and return its id.

    Args:
        client: Async HTTP client used to call the flows endpoint.
        json_memory_chatbot_no_llm: JSON document whose ``data`` entry is used
            as the flow data.
        logged_in_headers: Headers sent with the request.

    Returns:
        The ``id`` field of the creation response.
    """
    flow_graph = orjson.loads(json_memory_chatbot_no_llm)["data"]
    flow_payload = FlowCreate(name="Flow", description="description", data=flow_graph, endpoint_name="f")

    creation = await client.post("api/v1/flows/", json=flow_payload.model_dump(), headers=logged_in_headers)
    creation.raise_for_status()
    return creation.json()["id"]
|
|
|