# Spaces: Sleeping
from kaig.db import DB

from ..definitions import Record
from ..executor import Executor
def test_flow():
    """Check that two dependent flows do their work in successive passes.

    Seeds a ``mem://`` database with two ``document`` records (one with
    ``text``, one without) and defines two flows:

    * ``chunk_flow`` creates a ``chunk`` record from a document's text and
      sets ``chunked = true`` on that document.
    * ``metadata_flow`` sets ``meta = {time: time::now()}`` on its record.

    ``exe.execute_flows_once()`` returns a result indexed by flow name; the
    assertions treat each value as the number of records that flow handled
    in that pass. They pin that:

    1. pass 1 runs ``chunk_flow`` once and ``metadata_flow`` zero times;
    2. pass 2 runs only ``metadata_flow``, once, on the new chunk;
    3. pass 3 finds nothing left to process.
    """
    db = DB("mem://", "root", "root", "kaig", "test-flow")
    exe = Executor(db)

    _ = db.sync_conn.query("CREATE document SET text = 'hello'")
    # A document with no ``text``. Pass 1 expects chunk_flow to handle exactly
    # one record, so this one must be skipped. That is presumably because
    # ``text`` is a declared input of chunk_flow; TODO confirm.
    _ = db.sync_conn.query("CREATE document")

    # Forced priorities just to test that on the first run metadata_flow does
    # not process any records.
    # NOTE(review): neither flow below is visibly registered with ``exe``, yet
    # the assertions expect ``execute_flows_once`` to run both by name. The
    # ``reportUnusedFunction`` suppressions and the "forced priorities" remark
    # suggest registration decorators (carrying priorities and input/output
    # fields) were lost from this copy. Restore them against the Executor API.

    def chunk_flow(record: Record, _hash: str):  # pyright: ignore[reportUnusedFunction]
        """Create a chunk from ``record``'s text and mark ``record`` chunked."""
        # Create the chunk, linked back to its source document.
        _ = db.sync_conn.query(
            "CREATE chunk SET text = $text, document = $document",
            {"text": record["text"], "document": record["id"]},
        )
        # Set the output field so the document is not reprocessed.
        _ = db.sync_conn.query(
            "UPDATE $rec_id SET chunked = true", {"rec_id": record["id"]}
        )

    def metadata_flow(record: Record, _hash: str):  # pyright: ignore[reportUnusedFunction]
        """Set a ``meta`` object holding the current time on ``record``."""
        # Set the output field so the record is not reprocessed.
        _ = db.sync_conn.query(
            "UPDATE $rec_id SET meta = {time: time::now()}",
            {"rec_id": record["id"]},
        )

    def count_rows(sql: str) -> int:
        """Return how many rows ``sql`` yields (rows parsed as ``dict``)."""
        return len(db.query(sql, {}, dict))  # pyright: ignore[reportUnknownArgumentType]

    chunked_docs = "SELECT * FROM document WHERE chunked = true"
    chunks_with_meta = "SELECT * FROM chunk WHERE meta IS NOT NONE"

    # Nothing has run yet.
    assert count_rows(chunked_docs) == 0

    # Pass 1: only chunk_flow does work; no chunk has metadata yet.
    results = exe.execute_flows_once()
    assert results["chunk_flow"] == 1
    assert results["metadata_flow"] == 0
    assert count_rows(chunked_docs) == 1
    assert count_rows(chunks_with_meta) == 0

    # Pass 2: metadata_flow handles the chunk created in pass 1.
    results = exe.execute_flows_once()
    assert results["chunk_flow"] == 0
    assert results["metadata_flow"] == 1
    assert count_rows(chunks_with_meta) == 1

    # Pass 3: every output field is set, so neither flow has work left.
    results = exe.execute_flows_once()
    assert results["chunk_flow"] == 0
    assert results["metadata_flow"] == 0