Santiago Casas
running HF streamlit with files
cf450f7
from kaig.db import DB
from ..definitions import Record
from ..executor import Executor
def test_flow():
    """Run two chained flows against an in-memory DB, one pass at a time.

    ``chunk_flow`` turns a document into a chunk and stamps the document;
    ``metadata_flow`` stamps chunks. The test asserts the per-flow counts
    returned by ``execute_flows_once`` over three consecutive passes:
    (1, 0), then (0, 1), then (0, 0).
    """
    db = DB("mem://", "root", "root", "kaig", "test-flow")
    executor = Executor(db)

    # Seed two documents: one carrying a `text` field and one without it.
    # NOTE(review): the first pass is asserted to process exactly one record,
    # presumably because the text-less document fails the `text` dependency.
    _ = db.sync_conn.query("CREATE document SET text = 'hello'")
    _ = db.sync_conn.query("CREATE document")

    chunked_documents = "SELECT * FROM document WHERE chunked = true"
    chunks_with_meta = "SELECT * FROM chunk WHERE meta IS NOT NONE"

    def count_rows(query: str) -> int:
        """Return how many rows `query` yields."""
        rows = db.query(query, {}, dict)
        return len(rows)  # pyright: ignore[reportUnknownArgumentType]

    # Forced priorities just to test that on the first run metadata_flow does
    # not process any records
    @executor.flow(
        table="document",
        stamp="chunked",
        dependencies=["text"],
        priority=1,
    )
    def chunk_flow(record: Record, _hash: str):  # pyright: ignore[reportUnusedFunction]
        # Create one chunk that points back at its source document.
        chunk_fields = {"text": record["text"], "document": record["id"]}
        _ = db.sync_conn.query(
            "CREATE chunk SET text = $text, document = $document",
            chunk_fields,
        )
        # Write the output field so the document is not picked up again.
        stamp_target = {"rec_id": record["id"]}
        _ = db.sync_conn.query("UPDATE $rec_id SET chunked = true", stamp_target)

    @executor.flow(table="chunk", stamp="meta", priority=2)
    def metadata_flow(record: Record, _hash: str):  # pyright: ignore[reportUnusedFunction]
        # Write the output field so the chunk is not picked up again.
        stamp_target = {"rec_id": record["id"]}
        _ = db.sync_conn.query(
            "UPDATE $rec_id SET meta = {time: time::now()}",
            stamp_target,
        )

    # Before any pass: no document has been stamped.
    assert count_rows(chunked_documents) == 0

    # Pass 1: only chunk_flow has work to do.
    counts = executor.execute_flows_once()
    assert counts["chunk_flow"] == 1
    assert counts["metadata_flow"] == 0
    assert count_rows(chunked_documents) == 1
    assert count_rows(chunks_with_meta) == 0

    # Pass 2: the chunk created in pass 1 is now handled by metadata_flow.
    counts = executor.execute_flows_once()
    assert counts["chunk_flow"] == 0
    assert counts["metadata_flow"] == 1
    assert count_rows(chunks_with_meta) == 1

    # Pass 3: everything is stamped, so neither flow processes a record.
    counts = executor.execute_flows_once()
    assert counts["chunk_flow"] == 0
    assert counts["metadata_flow"] == 0