Spaces:
Sleeping
Sleeping
#!/usr/bin/env python3
"""Ingest a single file into the KG DB (without the flow executor).

This is useful for debugging/reprocessing a single problematic PDF (e.g. FA.pdf)
without running the full ingestion runner loop.

It will:
- Ensure the document exists in DB (store_original_document)
- Run chunking_handler on that document
- Mark document.chunked
- Run inference for all chunks with concepts_inferred IS NONE
- Mark chunk.concepts_inferred

Defaults:
- Uses KG_DB_URL / DB_NAME / DB_NS / DB_USER / DB_PASS
- Uses KG_PDF_CONVERTER / KG_PDF_FALLBACK from environment
"""
from __future__ import annotations

import argparse
import mimetypes
import os
from pathlib import Path

from knowledge_graph.db import init_db
from knowledge_graph.definitions import Chunk
from knowledge_graph.handlers.chunk import chunking_handler
from knowledge_graph.handlers.inference import inferrence_handler
def _guess_content_type(file_path: Path) -> str:
    """Best-effort MIME type for *file_path*.

    Uses the stdlib guess first, then falls back on the file suffix:
    ``.pdf`` -> ``application/pdf``, anything else -> ``text/plain``.
    """
    guessed = mimetypes.guess_type(file_path.name)[0]
    if guessed:
        return guessed
    return "application/pdf" if file_path.suffix.lower() == ".pdf" else "text/plain"


def _chunk_document(db, doc, stamp: str) -> None:
    """Run chunking on *doc* and stamp ``document.chunked``.

    On failure the error is persisted on the document record (so the
    failure is visible/debuggable) and the exception is re-raised.
    """
    try:
        chunking_handler(db, doc)
        db.sync_conn.query(
            "UPDATE $rec SET chunked = $hash, processing_error = NONE",
            {"rec": doc.id, "hash": stamp},
        )
    except Exception as exc:
        # Record the failure before propagating so a crashed run leaves a trace.
        db.sync_conn.query(
            "UPDATE $rec SET chunked = $hash, processing_error = $err",
            {"rec": doc.id, "hash": f"error:{stamp}", "err": str(exc)},
        )
        raise


def _infer_pending_chunks(db, doc, stamp: str) -> None:
    """Run inference for every chunk of *doc* with ``concepts_inferred IS NONE``.

    Stamps each chunk as inferred and prints progress every 10 chunks.
    """
    chunks = db.query(
        """SELECT * FROM chunk
        WHERE doc = $doc AND concepts_inferred IS NONE
        ORDER BY index ASC
        """,
        {"doc": doc.id},
        dict,
    )
    print(f"Chunks pending inference: {len(chunks)}")
    for i, chunk_data in enumerate(chunks, start=1):
        chunk = Chunk.model_validate(chunk_data)
        _ = inferrence_handler(db, chunk)
        db.sync_conn.query(
            "UPDATE $rec SET concepts_inferred = $hash, processing_error = NONE",
            {"rec": chunk.id, "hash": stamp},
        )
        if i % 10 == 0:
            print(f"Inferred {i}/{len(chunks)}")


def main() -> int:
    """Ingest one PDF/MD file: store the document, chunk it, infer concepts.

    Returns:
        0 on success.

    Raises:
        SystemExit: if the input file does not exist.
        Exception: re-raised from chunking after the error is recorded
            on the document record.
    """
    parser = argparse.ArgumentParser(description="Ingest a single PDF/MD file")
    parser.add_argument("path", help="Path to the file (pdf/md)")
    parser.add_argument(
        "--stamp",
        default="manual",
        help="Stamp value written to document.chunked and chunk.concepts_inferred",
    )
    args = parser.parse_args()

    file_path = Path(args.path)
    if not file_path.exists():
        raise SystemExit(f"File not found: {file_path}")

    ctype = _guess_content_type(file_path)

    # init_indexes=False: a one-off reprocessing run does not need index setup.
    db_name = os.getenv("DB_NAME", "test_db")
    db = init_db(init_llm=True, db_name=db_name, init_indexes=False)

    doc, cached = db.store_original_document(str(file_path), ctype)
    print(f"Document: {doc.id} filename={doc.filename} cached={cached}")
    print(
        f"Converter: {os.getenv('KG_PDF_CONVERTER', 'docling')} fallback={os.getenv('KG_PDF_FALLBACK', 'true')}"
    )

    _chunk_document(db, doc, args.stamp)
    _infer_pending_chunks(db, doc, args.stamp)

    print("Done")
    return 0
| if __name__ == "__main__": | |
| raise SystemExit(main()) | |