Flow
This package provides a small orchestrator to run lightweight, database-backed
flows on SurrealDB records. It exposes an Executor that registers flow
handlers, selects eligible records, and prevents repeat processing by requiring
handlers to mark their output field.
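The selection rule can be pictured with plain dictionaries. This is a minimal sketch, not the package's code: it assumes a record is eligible when every dependency field is set and the stamp (output) field is not, and it treats a missing key and `None` the same way. `is_eligible` and the sample records are hypothetical.

```python
from typing import Any

Record = dict[str, Any]  # hypothetical stand-in for flow.Record


def is_eligible(record: Record, stamp: str, dependencies: list[str]) -> bool:
    """Assumed rule: ready when all dependencies are set and the stamp is not."""
    if record.get(stamp) is not None:
        return False  # already processed: a handler has set the output field
    return all(record.get(dep) is not None for dep in dependencies)


documents: list[Record] = [
    {"id": "document:1", "text": "hello world"},
    {"id": "document:2", "text": None},  # dependency not yet available
    {"id": "document:3", "text": "done", "chunked": True},  # already stamped
]

ready = [r["id"] for r in documents if is_eligible(r, "chunked", ["text"])]
print(ready)  # ['document:1']

# Once the handler sets the stamp, the record drops out of the next selection.
documents[0]["chunked"] = True
print([r["id"] for r in documents if is_eligible(r, "chunked", ["text"])])  # []
```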
Core ideas
- Flows are declarative. Use `@executor.flow(table, stamp, dependencies, priority)` to describe which table to watch, which field signals completion, required dependencies, and optional priority ordering. Flow definitions are stored in the `flow` table for observability.
- Handlers do the work. Decorated functions receive a record dictionary and should perform side effects (e.g., creating related rows) and then update the configured output field so the record is not reprocessed.
- Execution loops are flexible. Call `executor.execute_flows_once()` to process any ready records one time, or `await executor.run()` to keep polling with exponential backoff until `executor.stop()` is called.
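To make the three ideas concrete, here is an in-memory toy that mirrors the described surface: a `flow` decorator, `execute_flows_once()`, an async `run()` with exponential backoff, and `stop()`. It is a sketch under stated assumptions, not the real `flow.Executor`: `ToyExecutor`, `FlowDef`, the list-of-dicts tables, the "higher priority runs first" ordering, and the `base_delay`/`max_delay` parameters and reset-on-work backoff policy are all hypothetical. Flow definitions live in a Python list here rather than in the `flow` table.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any, Callable

Record = dict[str, Any]
Handler = Callable[[Record], None]


@dataclass
class FlowDef:
    name: str
    table: str
    stamp: str
    dependencies: list[str]
    priority: int
    handler: Handler


@dataclass
class ToyExecutor:
    """Hypothetical in-memory stand-in; tables map names to lists of records."""
    tables: dict[str, list[Record]]
    flows: list[FlowDef] = field(default_factory=list)
    running: bool = field(default=False, init=False)

    def flow(self, table: str, stamp: str, dependencies: list[str], priority: int = 0):
        def register(fn: Handler) -> Handler:
            self.flows.append(FlowDef(fn.__name__, table, stamp, dependencies, priority, fn))
            return fn
        return register

    def execute_flows_once(self) -> dict[str, int]:
        results: dict[str, int] = {}
        # Assumption: higher priority values run first.
        for f in sorted(self.flows, key=lambda f: f.priority, reverse=True):
            ready = [
                r for r in self.tables.get(f.table, [])
                if r.get(f.stamp) is None
                and all(r.get(d) is not None for d in f.dependencies)
            ]
            for record in ready:
                f.handler(record)  # the handler is expected to set f.stamp
            results[f.name] = len(ready)
        return results

    async def run(self, base_delay: float = 0.01, max_delay: float = 1.0) -> None:
        self.running = True
        delay = base_delay
        while self.running:
            processed = sum(self.execute_flows_once().values())
            # Reset the delay after useful work; otherwise double it, capped at max_delay.
            delay = base_delay if processed else min(delay * 2, max_delay)
            await asyncio.sleep(delay)

    def stop(self) -> None:
        self.running = False


tables = {"document": [{"id": "document:1", "text": "hello"}]}
executor = ToyExecutor(tables)


@executor.flow(table="document", stamp="chunked", dependencies=["text"])
def chunk(record: Record) -> None:
    record["chunked"] = True


print(executor.execute_flows_once())  # {'chunk': 1}
print(executor.execute_flows_once())  # {'chunk': 0}


async def main() -> None:
    task = asyncio.create_task(executor.run())
    await asyncio.sleep(0.05)  # let it poll (and back off) a few times
    executor.stop()
    await task  # the loop exits after its current sleep


asyncio.run(main())
```

Resetting the delay whenever a pass does work keeps latency low while records are arriving, and the cap bounds how long `stop()` can take to be noticed.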
Example
from demo_unstruct_to_graph import flow
from kaig.db import DB

db = DB("mem://", "root", "root", "kaig", "demo")
executor = flow.Executor(db)


@executor.flow(table="document", stamp="chunked", dependencies=["text"])
def chunk(record: flow.Record):
    _ = db.sync_conn.query(
        "CREATE chunk SET text = $text, document = $document",
        {"text": record["text"], "document": record["id"]},
    )
    # set the output (stamp) field so the record is not processed again
    _ = db.sync_conn.query("UPDATE $rec SET chunked = true", {"rec": record["id"]})


results = executor.execute_flows_once()
# results => {"chunk": processed_count}
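The `UPDATE ... SET chunked = true` line is what keeps the example idempotent. The pure-Python simulation below shows what happens without it, assuming selection only checks whether the stamp is unset; `run_pass` and both handlers are hypothetical and merely record which document would have been chunked.

```python
from typing import Any, Callable

Record = dict[str, Any]  # hypothetical stand-in for flow.Record


def run_pass(records: list[Record], stamp: str, handler: Callable[[Record], None]) -> int:
    """Call handler on every record whose stamp is unset; return how many ran."""
    ready = [r for r in records if r.get(stamp) is None]
    for record in ready:
        handler(record)
    return len(ready)


created: list[str] = []  # ids of documents a chunk was created for


def chunk_without_stamp(record: Record) -> None:
    created.append(record["id"])  # side effect only: the stamp is never set


def chunk_with_stamp(record: Record) -> None:
    created.append(record["id"])
    record["chunked"] = True  # plays the role of UPDATE $rec SET chunked = true


docs_a = [{"id": "document:1", "text": "hello"}]
first = [run_pass(docs_a, "chunked", chunk_without_stamp) for _ in range(3)]
print(first)  # [1, 1, 1]: the same document is chunked on every pass

docs_b = [{"id": "document:2", "text": "hello"}]
second = [run_pass(docs_b, "chunked", chunk_with_stamp) for _ in range(3)]
print(second)  # [1, 0, 0]
```

Without the stamp, every poll of `run()` would create another duplicate chunk for the same document.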
See flow/tests/flow_test.py for a complete chained example that first chunks
new documents and then enriches the resulting chunks.
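A chained pipeline works because the rows one flow creates become eligible records for the next. The self-contained sketch below imitates that shape in memory; it is not the contents of `flow/tests/flow_test.py`. The word-per-chunk splitting, the `length` field, and the `enriched` stamp are hypothetical stand-ins for whatever the real test does, and flows simply run in list order in place of the package's priority mechanism.

```python
from typing import Any, Callable

Record = dict[str, Any]
tables: dict[str, list[Record]] = {
    "document": [{"id": "document:1", "text": "alpha beta gamma"}],
    "chunk": [],
}


def chunk(record: Record) -> None:
    # Hypothetical chunker: one chunk per whitespace-separated word.
    for i, word in enumerate(record["text"].split()):
        tables["chunk"].append(
            {"id": f"chunk:{record['id']}:{i}", "text": word, "document": record["id"]}
        )
    record["chunked"] = True  # stamp the document


def enrich(record: Record) -> None:
    # Hypothetical enrichment: store the chunk length, then stamp the chunk.
    record["length"] = len(record["text"])
    record["enriched"] = True


# (table, stamp, dependencies, handler), listed in the order they should run.
flows: list[tuple[str, str, list[str], Callable[[Record], None]]] = [
    ("document", "chunked", ["text"], chunk),
    ("chunk", "enriched", ["text"], enrich),
]


def execute_once() -> dict[str, int]:
    results: dict[str, int] = {}
    for table, stamp, deps, handler in flows:
        ready = [
            r for r in tables[table]
            if r.get(stamp) is None and all(r.get(d) is not None for d in deps)
        ]
        for r in ready:
            handler(r)
        results[handler.__name__] = len(ready)
    return results


print(execute_once())  # {'chunk': 1, 'enrich': 3}
print(execute_once())  # {'chunk': 0, 'enrich': 0}
```

Running the chunking flow before the enrichment flow lets a single pass carry a new document all the way through; in the reverse order the fresh chunks would wait for the next pass.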