File size: 1,336 Bytes
cf450f7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
import logging

from pydantic import TypeAdapter

# from demo_unstruct_to_graph.queue import process_task, take_task
from kaig.definitions import OriginalDocument

from . import flow
from .definitions import Chunk
from .handlers.chunk import chunking_handler
from .handlers.inference import inferrence_handler

OriginalDocumentTA = TypeAdapter(OriginalDocument)

logger = logging.getLogger(__name__)


async def ingestion_loop(exe: flow.Executor):
    """Register the ingestion flow handlers on *exe*, then run the executor.

    Two stages are wired up:

    - ``chunk``: validates a ``document`` record as an ``OriginalDocument``,
      hands it to ``chunking_handler``, then stamps ``chunked`` with the
      pipeline hash so the record is not picked up again.
    - ``infer_concepts``: validates a ``chunk`` record as a ``Chunk``, hands
      it to ``inferrence_handler``, then stamps ``concepts_inferred``
      likewise.
    """
    db = exe.db

    @exe.flow("document", stamp="chunked", priority=2)
    def chunk(record: flow.Record, hash: str):  # pyright: ignore[reportUnusedFunction]
        # Parse the raw flow record into a typed document before chunking.
        document = OriginalDocumentTA.validate_python(record)
        chunking_handler(db, document)
        # Mark the record as processed so this stage skips it next pass.
        _ = db.sync_conn.query(
            "UPDATE $rec SET chunked = $hash", {"rec": document.id, "hash": hash}
        )

    @exe.flow("chunk", stamp="concepts_inferred")
    def infer_concepts(record: flow.Record, hash: str):  # pyright: ignore[reportUnusedFunction]
        # Parse the raw flow record into a typed chunk before inference.
        parsed = Chunk.model_validate(record)
        _ = inferrence_handler(db, parsed)
        # Mark the record as processed so this stage skips it next pass.
        _ = db.sync_conn.query(
            "UPDATE $rec SET concepts_inferred = $hash",
            {"rec": parsed.id, "hash": hash},
        )

    await exe.run()