Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
"""Reset a failed document so it can be re-ingested.

Intended for documents whose ``chunked`` field holds an error stamp, e.g.:

    chunked = "error:<hash>"

(Pass ``--force`` to reset a document whose ``chunked`` value is anything else.)

It will:
  - Find the document record by filename (basename match)
  - Set document.chunked = NONE and clear document.processing_error
  - Delete existing chunks for that document

Defaults use the same env vars as the rest of the tooling:
  - KG_DB_URL (default ws://localhost:8000/rpc)
  - DB_NS (default kaig)
  - DB_NAME (default test_db)
  - DB_USER/DB_PASS (default root/root)
"""
from __future__ import annotations

import argparse
import os
import sys
from pathlib import Path
from typing import Any

from surrealdb import Surreal
def _env(name: str, default: str) -> str:
    """Return environment variable *name* with surrounding whitespace removed.

    *default* is returned unchanged when the variable is unset, empty, or
    consists only of whitespace.
    """
    raw = os.environ.get(name)
    if raw is None:
        return default
    cleaned = raw.strip()
    if not cleaned:
        return default
    return cleaned
def _query_rows(
    conn: Surreal, surql: str, vars: dict[str, Any] | None = None
) -> list[dict[str, Any]]:
    """Execute *surql* on *conn* and keep only the dict-shaped result items.

    ``vars`` supplies the query parameters; a missing or empty mapping is
    sent as a fresh ``{}``. A non-list result from the driver yields no
    rows, and list items that are not dicts are skipped.
    """
    params = vars if vars else {}
    result = conn.query(surql, params)
    if isinstance(result, list):
        return list(filter(lambda item: isinstance(item, dict), result))
    return []
def _reset_document(conn: Surreal, filename: str, *, force: bool) -> int:
    """Look up *filename* in ``document``, validate it, and reset it; return an exit status.

    Exit statuses: 0 success, 2 not found, 3 not an error stamp (and not
    *force*), 4 matched row has no ``id``.
    """
    # NOTE(review): LIMIT 1 without ORDER BY picks an arbitrary row if several
    # documents share this basename.
    rows = _query_rows(
        conn,
        "SELECT id, filename, chunked, processing_error FROM document WHERE filename = $name LIMIT 1",
        {"name": filename},
    )
    if not rows:
        print(f"Not found: {filename}", file=sys.stderr)
        return 2

    doc = rows[0]
    doc_id = doc.get("id")
    chunked = doc.get("chunked")

    if doc_id is None:
        # Never run the destructive statements below with an empty id: the
        # chunk DELETE could then match chunks that have no document at all.
        print(
            f"Matched row has no id; refusing to modify it. filename={filename}",
            file=sys.stderr,
        )
        return 4

    is_error_stamp = isinstance(chunked, str) and chunked.startswith("error:")
    if not force and not is_error_stamp:
        print(
            "Refusing to reset because chunked is not an error stamp. "
            "Use --force to override.\n"
            f"filename={filename} chunked={chunked!r}",
            file=sys.stderr,
        )
        return 3

    print(f"Resetting: {filename}")
    print(f" id={doc_id}")
    print(f" chunked={chunked!r}")
    processing_error = doc.get("processing_error")
    if processing_error is not None:
        print(f" processing_error={processing_error!r}")

    # Delete chunks BEFORE clearing the error stamp. If the UPDATE then fails,
    # the document keeps its error stamp and the script can simply be re-run.
    # The reverse order could leave a document marked ready for re-ingestion
    # while its stale chunks are still attached.
    conn.query("DELETE chunk WHERE doc = $id", {"id": doc_id})
    conn.query(
        "UPDATE ONLY $id SET chunked = NONE, processing_error = NONE",
        {"id": doc_id},
    )
    print("Done. Document is ready for re-ingestion.")
    return 0


def main() -> int:
    """Reset one failed document so it can be re-ingested.

    Parses the CLI arguments, connects to SurrealDB using KG_DB_URL, DB_NS,
    DB_NAME, DB_USER and DB_PASS, and resets the ``document`` row whose
    ``filename`` equals the basename of the given argument. Its ``chunk``
    rows are deleted, and ``chunked`` / ``processing_error`` are cleared.

    Returns:
        Process exit status: 0 on success, 2 if no document matches,
        3 if ``chunked`` is not an ``error:`` stamp and ``--force`` was not
        given, 4 if the matched row carries no ``id``.
    """
    parser = argparse.ArgumentParser(
        description="Reset a failed document for re-ingestion"
    )
    parser.add_argument(
        "filename", help="Filename as stored in SurrealDB (or a path; basename is used)"
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Reset even if chunked does not start with 'error:'",
    )
    args = parser.parse_args()
    # Documents are matched by basename, so a full path is accepted for convenience.
    filename = Path(args.filename).name

    db_url = _env("KG_DB_URL", "ws://localhost:8000/rpc")
    db_ns = _env("DB_NS", "kaig")
    db_name = _env("DB_NAME", "test_db")
    db_user = _env("DB_USER", "root")
    db_pass = _env("DB_PASS", "root")

    # The context manager guarantees the connection is closed on every exit
    # path, including early returns and exceptions.
    with Surreal(db_url) as conn:
        conn.signin({"username": db_user, "password": db_pass})
        conn.use(db_ns, db_name)
        return _reset_document(conn, filename, force=args.force)
if __name__ == "__main__":
    # Hand main()'s integer result to the interpreter as the process exit status.
    exit_status = main()
    raise SystemExit(exit_status)