ragtiquicIA / scripts /ingest.sh
Santiago Casas
ingestion scripts
a830894
#!/usr/bin/env bash
set -euo pipefail

# Feed PDF/Markdown documents into the local SurrealDB-backed knowledge graph,
# then shut the ingestion runner down once its queue has drained. Meant for
# local workflows in which the database lives inside the HF dataset clone at
# /home/casas/AI/politicaldatabase.

# Repository root: the parent of the directory that holds this script.
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
EXAMPLES_DIR="${ROOT_DIR}/examples/knowledge-graph"

# Every setting below keeps a non-empty value supplied by the caller's
# environment and otherwise falls back to the default shown.
: "${DB_NAME:=test_db}"
: "${KG_DB_URL:=ws://localhost:8000/rpc}"
: "${KG_DOCLING_TOKENIZER:=cl100k_base}"

# PDF conversion: the fast path (no OCR) is preferred by default.
: "${KG_PDF_CONVERTER:=kreuzberg}"
: "${KG_PDF_FALLBACK:=false}"

# LLM selection: one primary model plus a comma-separated fallback list.
KG_LLM_MODEL_DEFAULT="7 - Qwen3-Coder-30B-A3B-Instruct - A code model from August 2025"
KG_LLM_FALLBACK_MODELS_DEFAULT="0 - Ministral-3-14B-Instruct-2512 - The latest Ministral from Dec.2.2025,15 - Apertus-8B-Instruct-2509 - A new swiss model from September 2025,8 GLM-4.7-Flash,Phi-4-multimodal-instruct"
: "${KG_LLM_MODEL:=${KG_LLM_MODEL_DEFAULT}}"
: "${KG_LLM_FALLBACK_MODELS:=${KG_LLM_FALLBACK_MODELS_DEFAULT}}"

# Queue polling: seconds between checks, and how many consecutive empty
# checks are needed before the runner is stopped.
: "${CHECK_INTERVAL_S:=10}"
: "${EMPTY_CHECKS_REQUIRED:=3}"

# TOML file from which the API credentials are read.
: "${SECRETS_PATH:=${ROOT_DIR}/.streamlit/secrets.toml}"
# Print the command synopsis and the supported environment overrides to stdout.
# Takes no arguments.
usage() {
  printf '%s\n' \
    'Usage:' \
    'scripts/ingest.sh <file-or-dir> [<file-or-dir> ...]' \
    'Accepts PDF/MD files and/or directories (recursively ingests *.pdf, *.md, *.markdown).' \
    'Environment:' \
    'DB_NAME (default: test_db)' \
    'KG_DB_URL (default: ws://localhost:8000/rpc)' \
    'KG_LLM_MODEL (default: Qwen3-Coder...)' \
    'KG_LLM_FALLBACK_MODELS (default: Ministral,Apertus,GLM,Phi-4-mm)' \
    'KG_PDF_CONVERTER (default: kreuzberg)' \
    'KG_PDF_FALLBACK (default: false)' \
    'CHECK_INTERVAL_S (default: 10)' \
    'EMPTY_CHECKS_REQUIRED (default: 3)' \
    'SECRETS_PATH (default: .streamlit/secrets.toml)'
}
# Argument gate.
#   -h / --help as the first argument: print the usage text to stdout, exit 0
#     (previously the flag was treated as a path to ingest).
#   no arguments at all: this is a usage error, so the text goes to stderr
#     and the exit status is 2.
case "${1:-}" in
  -h | --help)
    usage
    exit 0
    ;;
esac
if [ "$#" -lt 1 ]; then
  usage >&2
  exit 2
fi
# Emit shell `export` statements for the API credentials stored in the TOML
# file at SECRETS_PATH, intended to be consumed with `eval`.
# Globals:   SECRETS_PATH (read; must also be exported, because the embedded
#            Python reads it from its environment)
# Outputs:   one `export KEY='value'` line on stdout for each of
#            BLABLADOR_API_KEY and BLABLADOR_BASE_URL that is present as a
#            non-empty string; nothing when the file is missing, cannot be
#            parsed, or does not hold a table
# Returns:   0 when the file is absent, otherwise the status of python3
load_secrets() {
  if [ ! -f "${SECRETS_PATH}" ]; then
    return 0
  fi
  # The heredoc delimiter is quoted on purpose: with an unquoted delimiter the
  # shell collapses the "\\" in the escape sequence below to "\", Python then
  # replaces ' with ''' instead of '\'' and a secret containing a single quote
  # breaks (or injects commands into) the caller's eval.
  python3 - <<'PY'
import os
import tomllib
from pathlib import Path

path = Path(os.environ.get("SECRETS_PATH", ""))
if not path.exists():
    raise SystemExit(0)
try:
    with path.open("rb") as f:
        data = tomllib.load(f)
except Exception:
    # Best effort: an unreadable or malformed file just yields no exports.
    raise SystemExit(0)
if not isinstance(data, dict):
    raise SystemExit(0)


def emit(key: str) -> None:
    val = data.get(key)
    if isinstance(val, str) and val:
        # shell-safe single-quoted export (escape single quotes)
        v = val.replace("'", "'\\''")
        print(f"export {key}='{v}'")


emit("BLABLADOR_API_KEY")
emit("BLABLADOR_BASE_URL")
PY
}
# Import the API credentials (best effort: a failing load_secrets contributes
# nothing to the eval). SECRETS_PATH is exported first because load_secrets
# hands it to Python through the environment.
echo "Loading secrets from: ${SECRETS_PATH}"
export SECRETS_PATH
eval "$(load_secrets || true)"

# Expose the settings to the child processes started further down.
export DB_NAME KG_DB_URL KG_DOCLING_TOKENIZER
export KG_PDF_CONVERTER KG_PDF_FALLBACK
export KG_LLM_MODEL KG_LLM_FALLBACK_MODELS

# Launch the database through the companion script; its stdout is discarded.
echo "Starting SurrealDB (docker)..."
"${ROOT_DIR}/scripts/run_surrealdb_local.sh" >/dev/null
# List the ingestable files denoted by one command-line argument.
# Arguments: $1 - path to a file or a directory
# Outputs:   for a directory, every regular file below it whose name ends in
#            .pdf, .md or .markdown (case-insensitive), one per line; for a
#            regular file, the path itself
# Returns:   0 for a file or directory, 1 for anything else
collect_files() {
  local target="$1"
  if [[ -f "${target}" ]]; then
    printf '%s\n' "${target}"
    return 0
  elif [[ -d "${target}" ]]; then
    # PDFs + markdown
    find "${target}" -type f \
      \( -iname "*.pdf" -o -iname "*.md" -o -iname "*.markdown" \) \
      -print
    return 0
  fi
  return 1
}
# Expand every command-line argument into the list of files to ingest.
# Arguments that are neither a file nor a directory are reported and skipped;
# the script stops with status 2 when nothing is left to ingest.
files=()
while [ "$#" -gt 0 ]; do
  p="$1"
  shift
  if ! out="$(collect_files "${p}" 2>/dev/null)"; then
    echo "Skipping (not found): ${p}" >&2
    continue
  fi
  while IFS= read -r f; do
    # A directory without matching documents produces empty output, which the
    # here-string still delivers as a single blank line. Queuing that blank
    # entry would defeat the "No files to ingest." guard below.
    [ -n "${f}" ] || continue
    files+=("$f")
  done <<<"$out"
done
if [ "${#files[@]}" -eq 0 ]; then
  echo "No files to ingest." >&2
  exit 2
fi
echo "Will ingest ${#files[@]} file(s)."
# Store one source document in the knowledge-graph database.
# Globals:   EXAMPLES_DIR (read) - directory from which `uv run python` is
#            started; DB_NAME is read by the embedded Python (default test_db)
# Arguments: $1 - path to a .pdf, .md or .markdown file (relative or absolute)
# Outputs:   stdout: "cached|inserted <document id> <file name>" as printed by
#            the embedded Python; stderr: a "Skipping ..." note for a missing
#            or unsupported file
# Returns:   0 when the file was skipped, otherwise the status of the
#            Python step
insert_one() {
  local file_path="$1"
  local filename
  # Kept local so the content type does not leak into the global scope.
  local ctype
  # Resolve to absolute path because we run Python from a different working dir.
  file_path="$(python3 -c 'import os,sys; print(os.path.abspath(sys.argv[1]))' "$file_path")"
  if [ ! -f "${file_path}" ]; then
    echo "Skipping (missing): ${file_path}" >&2
    return 0
  fi
  filename="$(basename "${file_path}")"
  # Match on the lower-cased name so the extension test is case-insensitive.
  case "${filename,,}" in
    *.pdf) ctype="application/pdf" ;;
    *.md | *.markdown) ctype="text/markdown" ;;
    *)
      echo "Skipping unsupported file: ${file_path}" >&2
      return 0
      ;;
  esac
  (cd "${EXAMPLES_DIR}" && uv run python - "$file_path" "$ctype" <<'PY'
import os
import sys
from pathlib import Path

from knowledge_graph.db import init_db

file_path = Path(sys.argv[1])
content_type = sys.argv[2]
db = init_db(
    init_llm=True,
    db_name=os.environ.get("DB_NAME", "test_db"),
    init_indexes=False,
)
doc, cached = db.store_original_document(str(file_path), content_type)
print(("cached" if cached else "inserted"), str(doc.id), file_path.name)
PY
  )
}
# Register every collected file with the database, one at a time.
for f in "${files[@]}"; do
  insert_one "${f}"
done

echo "Starting ingestion runner..."
# `exec` replaces the background subshell with uv itself, so the PID captured
# from $! is the runner process rather than a wrapper shell. Signals sent to
# runner_pid then reach the runner directly instead of a shell that could die
# and leave the runner orphaned.
(cd "${EXAMPLES_DIR}" && exec uv run python -m knowledge_graph.ingestion_runner) &
runner_pid=$!
# Stop the background ingestion runner and reap it. Safe to call repeatedly.
# Globals:   runner_pid (read) - PID of the background runner
#            RUNNER_STOP_TIMEOUT_S (read, optional) - whole seconds to wait
#            after SIGINT before escalating to SIGTERM; default 30
# Returns:   always 0
cleanup() {
  local grace="${RUNNER_STOP_TIMEOUT_S:-30}"
  local waited=0
  # Fall back to the default when the override is not a plain integer.
  case "${grace}" in
    '' | *[!0-9]*) grace=30 ;;
  esac
  if ! kill -0 "${runner_pid}" 2>/dev/null; then
    return 0
  fi
  # Ask for a graceful stop first.
  kill -INT "${runner_pid}" 2>/dev/null || true
  # Commands started with `&` from a non-interactive shell begin with SIGINT
  # ignored, so the request above may have no effect. Waiting unconditionally
  # would then block forever; poll for a bounded time and escalate instead.
  while kill -0 "${runner_pid}" 2>/dev/null && [ "${waited}" -lt "${grace}" ]; do
    sleep 1
    waited=$((waited + 1))
  done
  if kill -0 "${runner_pid}" 2>/dev/null; then
    kill -TERM "${runner_pid}" 2>/dev/null || true
  fi
  wait "${runner_pid}" 2>/dev/null || true
}
# Make sure the runner is stopped on every exit path of the script.
trap cleanup EXIT

# Poll the database every CHECK_INTERVAL_S seconds. The runner is stopped once
# both pending counters have been zero for EMPTY_CHECKS_REQUIRED consecutive
# successful checks; the loop also ends as soon as the runner is found dead.
empty_hits=0
while true; do
  sleep "${CHECK_INTERVAL_S}"
  # Prints "<pending documents> <pending chunks>", or "-1 -1" when the
  # database could not be queried after three attempts.
  counts=$(cd "${EXAMPLES_DIR}" && uv run python - <<'PY'
import os
import sys
import time

from surrealdb import Surreal

db_url = os.getenv("KG_DB_URL", "ws://localhost:8000/rpc")
db_ns = os.getenv("DB_NS", "kaig")
db_name = os.getenv("DB_NAME", "test_db")
db_user = os.getenv("DB_USER", "root")
db_pass = os.getenv("DB_PASS", "root")


def count(conn: Surreal, q: str) -> int:
    """Return the "count" field of the first result row, or 0 if absent."""
    res = conn.query(q)
    if not isinstance(res, list) or not res:
        return 0
    row = res[0]
    if isinstance(row, dict) and "count" in row:
        try:
            return int(row["count"])
        except Exception:
            return 0
    return 0


last_exc: Exception | None = None
for _ in range(3):
    try:
        conn = Surreal(db_url)
        conn.signin({"username": db_user, "password": db_pass})
        conn.use(db_ns, db_name)
        pending_docs = count(
            conn,
            "SELECT count() AS count FROM document WHERE chunked IS NONE GROUP ALL",
        )
        pending_chunks = count(
            conn,
            "SELECT count() AS count FROM chunk WHERE concepts_inferred IS NONE GROUP ALL",
        )
        print(f"{pending_docs} {pending_chunks}")
        # SystemExit is not an Exception subclass, so the handler below
        # does not swallow this successful exit.
        raise SystemExit(0)
    except Exception as exc:
        last_exc = exc
        time.sleep(2)
# Report why the check failed on stderr (stdout carries only the counters).
print(f"Queue check failed: {last_exc!r}", file=sys.stderr)
# Signal failure to the bash loop; it will retry next iteration.
print("-1 -1")
PY
  )
  pending_docs="${counts%% *}"
  pending_chunks="${counts##* }"
  if [ "${pending_docs}" = "-1" ] || [ "${pending_chunks}" = "-1" ]; then
    # Unknown state: leave the empty-check streak untouched and retry later.
    echo "Pending: documents=? chunks=? (SurrealDB not responding yet)"
  else
    echo "Pending: documents=${pending_docs} chunks=${pending_chunks}"
    if [ "${pending_docs}" = "0" ] && [ "${pending_chunks}" = "0" ]; then
      empty_hits=$((empty_hits + 1))
      echo "Empty check ${empty_hits}/${EMPTY_CHECKS_REQUIRED}"
    else
      empty_hits=0
    fi
    if [ "${empty_hits}" -ge "${EMPTY_CHECKS_REQUIRED}" ]; then
      echo "Queue is empty. Stopping ingestion runner..."
      cleanup
      break
    fi
  fi
  # Checked on every iteration, including those where the database was
  # unreachable; otherwise a dead runner plus a dead database loops forever.
  if ! kill -0 "${runner_pid}" 2>/dev/null; then
    echo "Ingestion runner exited."
    break
  fi
done
# Closing hints for the operator: stop the database container, then publish
# the updated database from the dataset clone.
printf '%s\n' \
  "Done. Next steps:" \
  " 1) Stop SurrealDB: docker stop ragtiquicia-surrealdb" \
  " 2) Commit+push dataset DB: cd /home/casas/AI/politicaldatabase && git add dbs/knowledge-graph && git commit -m 'Ingest documents' && git push"