File size: 2,284 Bytes
cf450f7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
from kaig.db import DB

from ..definitions import Record
from ..executor import Executor


def test_flow():
    """Check that two chained flows advance one stage per executor pass.

    Two ``document`` records are seeded; only one of them has ``text``.
    ``chunk_flow`` (priority 1, depends on ``text``) creates a ``chunk`` for a
    document and stamps the document ``chunked``. ``metadata_flow``
    (priority 2) stamps each chunk with ``meta``. The test expects:

    * pass 1 -> chunk_flow handles 1 record, metadata_flow handles 0;
    * pass 2 -> chunk_flow handles 0, metadata_flow handles 1;
    * pass 3 -> neither flow has anything left to do.
    """
    db = DB("mem://", "root", "root", "kaig", "test-flow")
    exe = Executor(db)

    chunked_documents = "SELECT * FROM document WHERE chunked = true"
    stamped_chunks = "SELECT * FROM chunk WHERE meta IS NOT NONE"

    def count_rows(surql: str) -> int:
        # Row count of a SELECT, with rows decoded as plain dicts.
        rows = db.query(surql, {}, dict)
        return len(rows)  # pyright: ignore[reportUnknownArgumentType]

    def expect_processed(chunk: int, meta: int) -> None:
        # Run a single executor pass and check how many records each flow took.
        outcome = exe.execute_flows_once()
        assert outcome["chunk_flow"] == chunk
        assert outcome["metadata_flow"] == meta

    # One document with text and one without. chunk_flow declares a
    # dependency on `text`, and the first pass is expected to handle exactly
    # one document.
    for seed in ("CREATE document SET text = 'hello'", "CREATE document"):
        _ = db.sync_conn.query(seed)

    # Priorities are pinned explicitly. Although chunk_flow creates a chunk
    # during the first pass, metadata_flow is expected to process nothing
    # until the following pass.
    @exe.flow(table="document", stamp="chunked", dependencies=["text"], priority=1)
    def chunk_flow(record: Record, _hash: str):  # pyright: ignore[reportUnusedFunction]
        doc_id = record["id"]
        # Materialise one chunk pointing back at its source document.
        _ = db.sync_conn.query(
            "CREATE chunk SET text = $text, document = $document",
            {"text": record["text"], "document": doc_id},
        )
        # Stamp the document so later passes skip it.
        _ = db.sync_conn.query("UPDATE $rec_id SET chunked = true", {"rec_id": doc_id})

    @exe.flow(table="chunk", stamp="meta", priority=2)
    def metadata_flow(record: Record, _hash: str):  # pyright: ignore[reportUnusedFunction]
        # Stamping `meta` is what marks this chunk as done.
        _ = db.sync_conn.query(
            "UPDATE $rec_id SET meta = {time: time::now()}", {"rec_id": record["id"]}
        )

    # Before any pass: nothing has been chunked.
    assert count_rows(chunked_documents) == 0

    # Pass 1: only the document stage has work.
    expect_processed(chunk=1, meta=0)
    assert count_rows(chunked_documents) == 1
    assert count_rows(stamped_chunks) == 0

    # Pass 2: the freshly created chunk now gets its metadata.
    expect_processed(chunk=0, meta=1)
    assert count_rows(stamped_chunks) == 1

    # Pass 3: the pipeline is drained.
    expect_processed(chunk=0, meta=0)