# ragtiquicIA/scripts/ingest_one.py
# Author: Santiago Casas
# Ingestion scripts (commit a830894)
#!/usr/bin/env python3
"""Ingest a single file into the KG DB (without the flow executor).

This is useful for debugging/reprocessing a single problematic PDF
(e.g. FA.pdf) without running the full ingestion runner loop.

It will:

- Ensure the document exists in the DB (``store_original_document``)
- Run ``chunking_handler`` on that document
- Mark ``document.chunked``
- Run inference for all chunks with ``concepts_inferred IS NONE``
- Mark ``chunk.concepts_inferred``

Defaults:

- Uses ``KG_DB_URL`` / ``DB_NAME`` / ``DB_NS`` / ``DB_USER`` / ``DB_PASS``
- Uses ``KG_PDF_CONVERTER`` / ``KG_PDF_FALLBACK`` from the environment
"""
from __future__ import annotations
import argparse
import mimetypes
import os
from pathlib import Path
from knowledge_graph.db import init_db
from knowledge_graph.definitions import Chunk
from knowledge_graph.handlers.chunk import chunking_handler
from knowledge_graph.handlers.inference import inferrence_handler
def main() -> int:
parser = argparse.ArgumentParser(description="Ingest a single PDF/MD file")
parser.add_argument("path", help="Path to the file (pdf/md)")
parser.add_argument(
"--stamp",
default="manual",
help="Stamp value written to document.chunked and chunk.concepts_inferred",
)
args = parser.parse_args()
file_path = Path(args.path)
if not file_path.exists():
raise SystemExit(f"File not found: {file_path}")
ctype = mimetypes.guess_type(file_path.name)[0]
if not ctype:
if file_path.suffix.lower() == ".pdf":
ctype = "application/pdf"
else:
ctype = "text/plain"
db_name = os.getenv("DB_NAME", "test_db")
db = init_db(init_llm=True, db_name=db_name, init_indexes=False)
doc, cached = db.store_original_document(str(file_path), ctype)
print(f"Document: {doc.id} filename={doc.filename} cached={cached}")
print(
f"Converter: {os.getenv('KG_PDF_CONVERTER', 'docling')} fallback={os.getenv('KG_PDF_FALLBACK', 'true')}"
)
try:
chunking_handler(db, doc)
db.sync_conn.query(
"UPDATE $rec SET chunked = $hash, processing_error = NONE",
{"rec": doc.id, "hash": args.stamp},
)
except Exception as exc:
db.sync_conn.query(
"UPDATE $rec SET chunked = $hash, processing_error = $err",
{"rec": doc.id, "hash": f"error:{args.stamp}", "err": str(exc)},
)
raise
chunks = db.query(
"""SELECT * FROM chunk
WHERE doc = $doc AND concepts_inferred IS NONE
ORDER BY index ASC
""",
{"doc": doc.id},
dict,
)
print(f"Chunks pending inference: {len(chunks)}")
for i, chunk_data in enumerate(chunks, start=1):
chunk = Chunk.model_validate(chunk_data)
_ = inferrence_handler(db, chunk)
db.sync_conn.query(
"UPDATE $rec SET concepts_inferred = $hash, processing_error = NONE",
{"rec": chunk.id, "hash": args.stamp},
)
if i % 10 == 0:
print(f"Inferred {i}/{len(chunks)}")
print("Done")
return 0
# Script entry point: exit with main()'s return code as the process status.
if __name__ == "__main__":
    raise SystemExit(main())