# Knowledge Graph Example
This example builds a simple knowledge graph on top of SurrealDB from uploaded documents, using a data-flow pattern to drive ingestion.
At a high level:

- You upload a document to the FastAPI server (`/upload`).
- The server stores it in SurrealDB as a `document` record.
- A background ingestion worker runs flows that:
  - chunk documents into embedded `chunk` records
  - infer `concept` nodes from each chunk and create `MENTIONS_CONCEPT` edges
- A separate chat agent can retrieve relevant chunks (vector + graph context) and answer using the ingested documents.
## Code layout

The package lives under `examples/knowledge-graph/src/knowledge_graph/`:

- `server.py` - FastAPI application and lifecycle management.
  - Starts the background ingestion loop on startup and stops it on shutdown.
  - Creates a `flow.Executor` bound to the SurrealDB connection.
- `ingestion.py` - Defines the ingestion pipeline by registering flow handlers with `@exe.flow(...)`. Current flows:
  - `chunk`: processes `document` records that have not been chunked yet
  - `infer_concepts`: processes `chunk` records that have not had concepts inferred yet
- `flow/`
  - `executor.py`: generic runtime for the data-flow pattern (polls the DB for work, executes handlers, applies backoff when idle).
  - `definitions.py`: `Flow` (Pydantic model) and shared types.
- `handlers/`
  - `upload.py`: receives a file and stores it as an original document in the DB.
  - `chunk.py`: converts and chunks documents, embeds the chunks, and inserts them.
  - `inference.py`: uses the configured LLM to extract concepts and write `concept` nodes + `MENTIONS_CONCEPT` edges.
- `agent.py` - PydanticAI agent with a `retrieve` tool.
  - Runs SurrealQL from `surql/search_chunks.surql` to fetch relevant chunks and supplies them as context to the model.
- `surql/` - SurrealQL files for schema definitions and retrieval queries.
## Data-flow pattern (flow/stamp) used here
This example uses a database-driven data flow:
- Each step (“flow”) queries a DB table for records that are eligible for processing.
- The step writes results back to the DB.
- The step marks completion by setting a stamp field on the record.
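The three steps above can be sketched in a few lines of Python. This is an illustrative stand-in that uses an in-memory list for the DB table; `process_once`, `handler`, and the record fields are hypothetical, not the example's actual API:

```python
# In-memory stand-in for a SurrealDB table of document records.
records = [
    {"id": "document:1", "text": "hello", "chunked": None},
    {"id": "document:2", "text": "world", "chunked": "abc123"},  # already stamped
]

FLOW_HASH = "abc123"  # stable hash of the handler's compiled code

def handler(record):
    # Do the actual work (chunking, inference, ...) and write results back.
    record["n_chunks"] = len(record["text"].split())

def process_once(table, stamp_field, flow_hash):
    """One polling pass: find eligible records, run the handler, stamp them."""
    eligible = [r for r in table if r[stamp_field] is None]
    for record in eligible:
        handler(record)
        record[stamp_field] = flow_hash  # mark completion
    return len(eligible)

processed = process_once(records, "chunked", FLOW_HASH)
```

The real executor runs this loop against SurrealDB and backs off when a pass finds no eligible records.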
### Flow definition
Flows are registered via the `@exe.flow(table=..., stamp=..., dependencies=..., priority=...)` decorator. Under the hood:

- The executor stores flow metadata in the `flow` table.
- Each handler is assigned a stable hash derived from its compiled code. This hash is written into the record's stamp field after processing.
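A hypothetical reduction of the registration mechanics (the real `Executor` stores this metadata in SurrealDB and has its own hashing scheme; the `flow_table` dict and decorator here are stand-ins):

```python
import hashlib

flow_table = {}  # stand-in for the `flow` table in the DB

def flow(table, stamp, dependencies=(), priority=0):
    """Register a handler and derive a stable hash from its compiled code."""
    def register(func):
        code_hash = hashlib.md5(func.__code__.co_code).hexdigest()
        flow_table[func.__name__] = {
            "table": table,
            "stamp": stamp,
            "dependencies": list(dependencies),
            "priority": priority,
            "hash": code_hash,
        }
        return func
    return register

@flow(table="document", stamp="chunked")
def chunk(record):
    return record["text"].split()
```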
### Eligibility and dependencies
A record becomes a candidate when:

- its stamp field is `NONE` (meaning "not yet processed by this flow"), and
- all dependency fields (if any) are present (not `NONE`).
This makes the pipeline restart-safe and incremental: if the server stops, it resumes based on DB state rather than in-memory state.
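The eligibility rule can be expressed as a small predicate. This is a sketch: the real check runs as a SurrealQL query, and the `embedding` dependency shown here is an illustrative assumption:

```python
def is_eligible(record, stamp_field, dependency_fields=()):
    """A record is a candidate iff its stamp is unset and all dependencies are set."""
    if record.get(stamp_field) is not None:
        return False  # already processed by this flow
    return all(record.get(dep) is not None for dep in dependency_fields)

# Hypothetical: infer_concepts requires chunking to have produced an embedding.
fresh = {"id": "chunk:1", "embedding": [0.1, 0.2], "concepts_inferred": None}
done = {"id": "chunk:2", "embedding": [0.3, 0.4], "concepts_inferred": "75f9..."}
blocked = {"id": "chunk:3", "embedding": None, "concepts_inferred": None}
```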
### Stamping and idempotency
Each handler must set its stamp field, e.g.:

```
document.chunked = <flow_hash>
chunk.concepts_inferred = <flow_hash>
```
This prevents reprocessing the same record and also makes changes traceable: if you update a flow function, its hash changes and you can see which records were processed by which version of the flow.
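To illustrate the traceability point: hashing a function's compiled bytecode yields a different stamp whenever the function body changes (the executor's exact hashing scheme may differ):

```python
import hashlib

def code_hash(func):
    # Hash the function's compiled bytecode; any change to its logic changes the hash.
    return hashlib.md5(func.__code__.co_code).hexdigest()

def chunk_v1(record):
    return record["text"].split()

def chunk_v2(record):
    # Updated logic: lower-casing first compiles to different bytecode, hence a new hash.
    return record["text"].lower().split()

h1, h2 = code_hash(chunk_v1), code_hash(chunk_v2)
```

Records stamped with `h1` were processed by the old version; records stamped with `h2` by the new one.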
## Run

DB:

```shell
surreal start -u root -p root rocksdb:dbs/knowledge-graph
```

or use the helper script:

```shell
./scripts/run_surrealdb.sh
```

or run `just knowledge-graph-db` from the repo base directory.
## LLM + embeddings (Blablador)

This example uses OpenAI-compatible APIs. For Blablador, set:

```shell
export OPENAI_API_KEY="$BLABLADOR_API_KEY"
export OPENAI_BASE_URL="${BLABLADOR_BASE_URL:-https://api.helmholtz-blablador.fz-juelich.de/v1/}"
```

Defaults are `alias-fast` for chat and local sentence-transformers embeddings.

Override the chat model and fallbacks if needed:

```shell
export KG_LLM_MODEL=alias-fast
export KG_LLM_FALLBACK_MODELS=alias-large,alias-code
export KG_CHAT_MODEL=alias-fast
```

To use local embeddings explicitly:

```shell
export KG_EMBEDDINGS_PROVIDER=sentence-transformers
export KG_LOCAL_EMBEDDINGS_MODEL=sentence-transformers/all-MiniLM-L6-v2
```

To try Blablador embeddings (may be unstable):

```shell
export KG_EMBEDDINGS_PROVIDER=openai
export KG_EMBEDDINGS_MODEL=alias-embeddings
```
## Server and ingestion worker

```shell
DB_NAME=test_db uv run --env-file .env -- fastapi run examples/knowledge-graph/src/knowledge_graph/server.py --port 8080
```

By default, ingestion is disabled so the server can start quickly. To enable ingestion at startup:

```shell
export KG_ENABLE_INGESTION=true
```

Recommended flow for large uploads:

- Upload documents.
- Run ingestion separately.

To run ingestion separately (recommended for large backlogs):

```shell
./scripts/start_ingestion.sh
```

If you see WebSocket disconnects, switch the DB client to HTTP:

```shell
export KG_DB_URL=http://localhost:8000
```
## PDF converter selection

By default, the ingestion flow prefers Docling with no fallback. You can override the converter order with:

```shell
export KG_PDF_CONVERTER=docling
```

Other values: `kreuzberg` (prefer Kreuzberg) and `auto` (try Kreuzberg first).

To enable fallback converters:

```shell
export KG_PDF_FALLBACK=true
```

Docling tokenizer configuration:

```shell
export KG_DOCLING_TOKENIZER=cl100k_base
```
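The selection logic implied by these variables can be pictured roughly as follows. This is a hypothetical helper mirroring the documented behaviour; the real code in the ingestion flow may differ in detail:

```python
import os

def converter_order(preferred=None, allow_fallback=None):
    """Resolve which PDF converters to try, in order."""
    preferred = preferred or os.environ.get("KG_PDF_CONVERTER", "docling")
    if allow_fallback is None:
        allow_fallback = os.environ.get("KG_PDF_FALLBACK", "").lower() == "true"
    # "auto" and "kreuzberg" both try Kreuzberg first; the default prefers Docling.
    if preferred in ("auto", "kreuzberg"):
        order = ["kreuzberg", "docling"]
    else:
        order = ["docling", "kreuzberg"]
    return order if allow_fallback else order[:1]
```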
## Markdown ingestion

You can upload `.md` files directly; they are chunked locally without PDF conversion. The uploader also guesses content types from the filename when they are missing. If a file arrives as `application/octet-stream`, the ingestion pipeline attempts to guess the type from the filename before converting.
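A minimal sketch of that fallback using the standard library (`resolve_content_type` is a hypothetical name; the uploader's actual logic may differ):

```python
import mimetypes

# Some platforms lack a mapping for Markdown; register it explicitly.
mimetypes.add_type("text/markdown", ".md")

def resolve_content_type(filename, declared=None):
    # Trust the declared type unless it is missing or the generic octet-stream.
    if declared and declared != "application/octet-stream":
        return declared
    guessed, _ = mimetypes.guess_type(filename)
    return guessed or "application/octet-stream"
```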
## Party plan metadata

The knowledge-graph example includes a metadata file for the 2026 party plan PDFs. It is used to expand acronyms and to surface plan URLs in answers:

```
examples/knowledge-graph/data/party_plan_metadata.json
```
(You can also start the server with `just knowledge-graph test_db` from the repo base directory.)
## Chat agent

```shell
DB_NAME=test_db uv run --env-file .env uvicorn knowledge_graph.agent:app --host 127.0.0.1 --port 7932
```
## Status check

```shell
./scripts/status_check.sh
```

This script acts as a status checker and log-tail helper. Logs are written to `logs/server.log` and `logs/ui.log`.
## Quickstart scripts

Start SurrealDB:

```shell
./scripts/run_surrealdb.sh
```

Start the server (foreground):

```shell
./scripts/start_server.sh
```

Start the server in the background:

```shell
./scripts/start_server.sh -b
```

Upload PDFs/Markdown files:

```shell
./scripts/upload_pdfs.sh /path/to/folder
```

Run ingestion (process the backlog):

```shell
./scripts/start_ingestion.sh
```

Start the UI:

```shell
source scripts/start_ui.sh
```
## Streamlit app (query-first)

Run locally (requires SurrealDB to be running):

```shell
export DB_NAME=test_db
streamlit run examples/knowledge-graph/streamlit_app.py
```

Uploads are limited to one PDF/Markdown file at a time (default max 50 MB). Ingestion runs in a background thread and writes logs to `logs/ingestion.log`. The Streamlit UI can display party banner images using `images/metadata.json`.

Set the upload limit explicitly:

```shell
export KG_MAX_UPLOAD_MB=50
export STREAMLIT_SERVER_MAX_UPLOAD_SIZE=50
```
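Server-side, such a limit can be enforced with a check along these lines (a hypothetical helper; the app's actual validation may differ):

```python
import os

def upload_allowed(num_bytes, max_mb=None):
    # Reject uploads larger than KG_MAX_UPLOAD_MB (50 MB if unset).
    if max_mb is None:
        max_mb = int(os.environ.get("KG_MAX_UPLOAD_MB", "50"))
    return num_bytes <= max_mb * 1024 * 1024
```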
Docker (single container with SurrealDB inside):

```shell
docker build -f Dockerfile.streamlit -t kaig-streamlit .
docker run -p 8501:8501 \
  -e BLABLADOR_API_KEY=... \
  -e BLABLADOR_BASE_URL=https://api.helmholtz-blablador.fz-juelich.de/v1/ \
  kaig-streamlit
```

The build expects `dbs/knowledge-graph/` to be present in the build context.
SurrealDB retry settings (optional):

```shell
export KG_DB_RETRY_ATTEMPTS=3
export KG_DB_RETRY_DELAY=1.0
```
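These settings map naturally onto a retry wrapper like the following (a sketch; the real client code and the exception types it retries may differ):

```python
import time

def with_retries(op, attempts=3, delay=1.0):
    # Retry a flaky DB call, sleeping `delay` seconds between attempts.
    last_exc = None
    for _ in range(attempts):
        try:
            return op()
        except ConnectionError as exc:
            last_exc = exc
            time.sleep(delay)
    raise last_exc
```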
Check status:

```shell
./scripts/status_check.sh
```
Limit retrieval tool calls per question (default: 10):

```shell
export KG_MAX_RETRIEVE_CALLS=1
```

Tune the search threshold and fallback:

```shell
export KG_SEARCH_THRESHOLD=0.15
export KG_SEARCH_FALLBACK=true
```
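The threshold/fallback behaviour can be pictured like this (illustrative only: the actual retrieval applies the threshold inside the SurrealQL query, and `top_k` here is an assumed parameter):

```python
def select_chunks(scored, threshold=0.15, fallback=True, top_k=3):
    # Keep chunks whose similarity clears the threshold; if none do and
    # fallback is enabled, return the best top_k instead of nothing.
    hits = [c for c in scored if c[1] >= threshold]
    if not hits and fallback:
        hits = sorted(scored, key=lambda c: c[1], reverse=True)[:top_k]
    return hits
```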
(You can also start the agent with `just knowledge-graph-agent test_db` from the repo base directory.)
## SurrealQL queries

Visualise the graph:

```surql
SELECT *,
    ->MENTIONS_CONCEPT->concept as concepts
FROM chunk;
```
### Flow status

This shows how many records have been processed by, and are still pending for, each flow:

```surql
LET $flows = SELECT * FROM flow;
RETURN $flows.fold([], |$a, $flow| {
    LET $b = SELECT
        $flow.id as flow,
        type::field($flow.stamp) as flow_hash,
        count() as count,
        $flow.table as table
    FROM type::table($flow.table)
    GROUP BY flow_hash;
    RETURN $a.concat($b)
});
```
Output example:

```
[
    {
        count: 1,
        flow: flow:chunk,
        flow_hash: NONE,
        table: 'document'
    },
    {
        count: 2,
        flow: flow:chunk,
        flow_hash: 'bbb6fe4b55cce1b3c8af0e7713a33d75',
        table: 'document'
    },
    {
        count: 4,
        flow: flow:infer_concepts,
        flow_hash: NONE,
        table: 'chunk'
    },
    {
        count: 27,
        flow: flow:infer_concepts,
        flow_hash: '75f90c71db9aeb2cf6f871ba1f75828c',
        table: 'chunk'
    }
]
```
Different hashes mean the records have been processed by different versions of the flow function. This can happen if the flow function has been updated.