File size: 3,042 Bytes
9c6fc7f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
#!/usr/bin/env python3
"""Reset a failed document so it can be re-ingested.

This is for documents that ended up with e.g.:
  chunked = "error:<hash>"

It will:
- Find the document record by filename (basename match)
- Set document.chunked = NONE and clear document.processing_error
- Delete existing chunks for that document

Defaults use the same env vars as the rest of the tooling:
- KG_DB_URL (default ws://localhost:8000/rpc)
- DB_NS (default kaig)
- DB_NAME (default test_db)
- DB_USER/DB_PASS (default root/root)
"""

from __future__ import annotations

import argparse
import os
from pathlib import Path
from typing import Any

from surrealdb import Surreal


def _env(name: str, default: str) -> str:
    """Return environment variable *name* with surrounding whitespace removed.

    Falls back to *default* when the variable is unset, empty, or consists
    only of whitespace.
    """
    raw = os.environ.get(name)
    if raw is None:
        return default
    cleaned = raw.strip()
    # An empty string after stripping counts as "not configured".
    return cleaned or default


def _query_rows(
    conn: Surreal, surql: str, vars: dict[str, Any] | None = None
) -> list[dict[str, Any]]:
    """Run *surql* on *conn* and return only the dict entries of the result.

    ``vars`` holds the query bindings; ``None`` (or an empty mapping) is sent
    as an empty dict. A result that is not a list yields ``[]``, and entries
    of a list result that are not dicts are dropped.
    """
    bindings = vars if vars else {}
    result = conn.query(surql, bindings)
    if isinstance(result, list):
        return [row for row in result if isinstance(row, dict)]
    return []


def main() -> int:
    """Reset one failed document and delete its chunks so it can be re-ingested.

    Parses the command line, connects to SurrealDB using the ``KG_DB_URL``,
    ``DB_NS``, ``DB_NAME``, ``DB_USER`` and ``DB_PASS`` environment variables
    (falling back to the defaults below), and looks the document up by the
    basename of the given filename.

    Returns:
        Process exit code: ``0`` when the document was reset, ``2`` when no
        document with that filename exists, ``3`` when the document's
        ``chunked`` value is not an ``error:`` stamp and ``--force`` was not
        given.
    """
    parser = argparse.ArgumentParser(
        description="Reset a failed document for re-ingestion"
    )
    parser.add_argument(
        "filename", help="Filename as stored in SurrealDB (or a path; basename is used)"
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Reset even if chunked does not start with 'error:'",
    )
    args = parser.parse_args()

    # Only the basename is matched, so a full path may be passed on the CLI.
    filename = Path(args.filename).name

    db_url = _env("KG_DB_URL", "ws://localhost:8000/rpc")
    db_ns = _env("DB_NS", "kaig")
    db_name = _env("DB_NAME", "test_db")
    db_user = _env("DB_USER", "root")
    db_pass = _env("DB_PASS", "root")

    # Use the connection as a context manager so it is closed on every exit
    # path, including the early "not found" / "refusing" returns and errors.
    with Surreal(db_url) as conn:
        conn.signin({"username": db_user, "password": db_pass})
        conn.use(db_ns, db_name)

        rows = _query_rows(
            conn,
            "SELECT id, filename, chunked, processing_error FROM document WHERE filename = $name LIMIT 1",
            {"name": filename},
        )
        if not rows:
            print(f"Not found: {filename}")
            return 2

        doc = rows[0]
        doc_id = doc.get("id")
        chunked = doc.get("chunked")

        # Without --force, only documents carrying an "error:<hash>" stamp
        # are eligible for a reset.
        is_error_stamp = isinstance(chunked, str) and chunked.startswith("error:")
        if not args.force and not is_error_stamp:
            print(
                "Refusing to reset because chunked is not an error stamp. "
                "Use --force to override.\n"
                f"filename={filename} chunked={chunked!r}"
            )
            return 3

        print(f"Resetting: {filename}")
        print(f"  id={doc_id}")
        print(f"  chunked={chunked!r}")
        processing_error = doc.get("processing_error")
        if processing_error is not None:
            print(f"  processing_error={processing_error!r}")

        # Delete the stale chunks BEFORE clearing the error stamp. If the
        # delete fails, the document is still marked as failed and this
        # script can simply be re-run. It also avoids wiping chunks that a
        # concurrent ingester might write once the stamp is cleared
        # (NOTE(review): assumes ingestion picks up documents whose
        # `chunked` is NONE -- confirm against the ingester).
        conn.query("DELETE chunk WHERE doc = $id", {"id": doc_id})
        conn.query(
            "UPDATE ONLY $id SET chunked = NONE, processing_error = NONE",
            {"id": doc_id},
        )

    print("Done. Document is ready for re-ingestion.")
    return 0


if __name__ == "__main__":
    # Propagate main()'s return value as the process exit status.
    exit_code = main()
    raise SystemExit(exit_code)