# nerserver / tests / test_reextract.py
# Last commit: bfeb704 (Robin) — "refactor: remove httpx proxy, call GLiNER model directly"
# File size: 5.59 kB
"""Integration test: login → trigger re-extraction → poll until done.
Usage:
python tests/test_reextract.py
Requires:
- AI-KB backend running at http://localhost:8000
"""
from __future__ import annotations

import os
import sys
import time

import httpx
# Connection settings. Each one can be overridden through an environment
# variable so the script can target another deployment without editing the
# file. The defaults reproduce the previously hard-coded values.
# NOTE(review): consider dropping the default password so real credentials
# do not have to live in source control.
BASE = os.environ.get("AIKB_BASE_URL", "http://localhost:8000/v1")
EMAIL = os.environ.get("AIKB_EMAIL", "admin@neargo.ai")
PASSWORD = os.environ.get("AIKB_PASSWORD", "NearGo2026!")
ORG_ID = os.environ.get("AIKB_ORG_ID", "36f377e9-4e49-4b83-b7b0-681b973aa734")  # NearGo Admin org
POLL_INTERVAL = int(os.environ.get("AIKB_POLL_INTERVAL", "3"))  # seconds between trace polls
TIMEOUT = int(os.environ.get("AIKB_TIMEOUT", "600"))  # seconds total
def step(msg: str) -> None:
    """Print *msg* as a section banner framed above and below by a 60-character '=' rule."""
    rule = "=" * 60
    print(f"\n{rule}\n{msg}\n{rule}")
def login(client: httpx.Client) -> str:
    """Log in with EMAIL/PASSWORD and return the access token.

    POSTs the credentials to ``{BASE}/auth/login``. A non-2xx reply raises via
    ``raise_for_status()``. The ``access_token`` field of the JSON reply is
    returned, and its first 40 characters are echoed.
    """
    step("1. Login")
    credentials = {"email": EMAIL, "password": PASSWORD}
    response = client.post(f"{BASE}/auth/login", json=credentials)
    response.raise_for_status()
    access_token = response.json()["access_token"]
    print(f" OK token: {access_token[:40]}...")
    return access_token
def pick_doc(client: httpx.Client) -> str:
    """Return the id of the first document in ORG_ID, exiting the script if none is usable.

    The list endpoint's body is accepted either as a bare JSON array or as an
    object with an ``items`` array. The script exits with status 1 in two cases:
    the org has no documents, or the first document has neither an ``id`` nor a
    ``doc_id`` field.
    """
    step(f"2. List docs in org {ORG_ID}")
    r = client.get(f"{BASE}/orgs/{ORG_ID}/docs")
    r.raise_for_status()
    payload = r.json()  # decode once rather than re-parsing the body per access
    items = payload if isinstance(payload, list) else payload.get("items", [])
    if not items:
        print(" No docs found — aborting.")
        sys.exit(1)
    doc = items[0]
    doc_id = doc.get("id") or doc.get("doc_id")
    if not doc_id:
        # Without an id every later URL would be built as .../docs/None/...
        print(f" First doc has no 'id'/'doc_id' field — aborting. doc={doc!r}")
        sys.exit(1)
    title = doc.get("title") or doc.get("name") or "(untitled)"
    print(f" Using doc: {doc_id} title={title!r}")
    return doc_id
def trigger_reparse(client: httpx.Client, doc_id: str) -> float:
    """Request re-extraction of *doc_id* and return the trigger timestamp.

    The return value is the ``time.monotonic()`` reading taken just before the
    POST. ``main`` passes it to ``poll_trace``, which uses it to judge whether a
    terminal trace is fresh or stale.

    Status handling:
      * 200 or 202: the body is echoed. JSON is shown decoded; an empty or
        non-JSON body is shown as raw text.
      * Any other status: the raw body is echoed and ``raise_for_status()`` is
        called, so 4xx/5xx replies raise ``httpx.HTTPStatusError``.
    """
    step(f"3. Trigger reparse doc={doc_id}")
    t_trigger = time.monotonic()
    r = client.post(f"{BASE}/docs/{doc_id}/reparse")
    print(f" status: {r.status_code}")
    if r.status_code not in (200, 202):
        print(f" body: {r.text}")
        r.raise_for_status()
    else:
        # A 202 Accepted may carry an empty or non-JSON body. A decode error must
        # not abort the run after the reparse was already accepted.
        try:
            body = r.json()
        except ValueError:  # json.JSONDecodeError subclasses ValueError
            body = r.text
        print(f" body: {body}")
    return t_trigger
def poll_trace(client: httpx.Client, doc_id: str, t_trigger: float) -> None:
    """Poll the document's latest trace until it is terminal or TIMEOUT elapses.

    Args:
        client: Client whose Authorization header is already set.
        doc_id: Document whose ``/traces/latest`` endpoint is polled.
        t_trigger: ``time.monotonic()`` reading taken when the reparse was triggered.

    When a terminal status is accepted:
      * immediately, if an active (running/processing/queued) status was seen
        earlier;
      * otherwise only once more than 15 s have passed since *t_trigger*,
        because before that it may belong to the previous run.

    Error handling:
      * Request timeouts and 404s (trace not created yet) are retried.
      * Any other HTTP error raises ``httpx.HTTPStatusError``.

    A summary is printed on completion and on timeout.
    """
    step(f"4. Polling traces doc={doc_id}")
    deadline = time.monotonic() + TIMEOUT
    terminal = {"completed", "failed", "error", "done", "success", "cancelled"}
    active = {"running", "processing", "queued"}
    stale_grace_s = 15  # after this long, trust a terminal status even without an active one
    last_seen = None  # (status, step) pair last printed, so we only log on change
    status = "unknown"
    data: dict = {}
    seen_running = False  # must see an active status before accepting terminal
    while time.monotonic() < deadline:
        try:
            r = client.get(f"{BASE}/docs/{doc_id}/traces/latest", timeout=10)
        except httpx.TimeoutException:
            print(f" [{time.strftime('%H:%M:%S')}] (request timed out, backend may be busy)")
            time.sleep(POLL_INTERVAL)
            continue
        if r.status_code == 404:
            print(f" [{time.strftime('%H:%M:%S')}] (trace not yet created, waiting...)")
            time.sleep(POLL_INTERVAL)
            continue
        r.raise_for_status()
        data = r.json()
        # str() keeps .lower() safe if the backend sends a non-string status.
        status = str(
            data.get("status")
            or data.get("state")
            or data.get("pipeline_status")
            or "unknown"
        )
        status_key = status.lower()
        current_step = data.get("current_step") or data.get("stage") or data.get("step") or ""
        action = (data.get("current_action") or "")[:70]
        # Once an active status is seen, a later terminal status belongs to this run.
        if status_key in active:
            seen_running = True
        # Log when either the status or the step changes. The step alone can stay
        # constant across a status transition such as queued -> running.
        if (status, current_step) != last_seen:
            print(f" [{time.strftime('%H:%M:%S')}] status={status} step={current_step} action={action}")
            last_seen = (status, current_step)
        if status_key in terminal:
            elapsed_since_trigger = time.monotonic() - t_trigger
            if seen_running or elapsed_since_trigger > stale_grace_s:
                print(f"\n FINAL STATUS: {status} (elapsed_since_trigger={elapsed_since_trigger:.0f}s)")
                _print_summary(data)
                return
            # No active status yet: this is probably the pre-reparse trace, so
            # keep waiting for the new pipeline to write its own.
            print(f" [{time.strftime('%H:%M:%S')}] status={status} but waiting for new pipeline to start...")
        time.sleep(POLL_INTERVAL)
    print(f"\n TIMEOUT after {TIMEOUT}s — last status: {status}")
    _print_summary(data)
def _print_summary(data: dict) -> None:
    """Print the notable fields of a decoded trace payload.

    Top-level fields are printed only when present and not one of ``None``,
    ``""``, ``0`` or ``{}``. The nested ``graph``, ``chunk`` and ``embed``
    sections are summarised when they carry data. A section sent as JSON
    ``null``, or an ``embed.duration_s`` of ``null``, is treated as
    empty/zero instead of crashing the summary.

    Args:
        data: Decoded JSON body of the latest-trace endpoint; may be ``{}``.
    """
    keys = [
        "trace_id", "file_name", "duration_s",
        "entities_count", "triples_count", "chunks_count",
        "error",
    ]
    for k in keys:
        if k in data and data[k] not in (None, "", 0, {}):
            print(f" {k}: {data[k]}")
    # `or {}` instead of a .get() default so an explicit null section is tolerated too.
    g = data.get("graph") or {}
    if g:
        print(f" graph.status: {g.get('status')} triples_written: {g.get('triples_written', 0)}"
              f" chunks_ok: {g.get('chunks_ok', 0)}/{g.get('chunks_total', 0)}")
    ch = data.get("chunk") or {}
    if ch.get("total"):
        print(f" chunk.total: {ch['total']} by_level: {ch.get('by_level', {})}")
    em = data.get("embed") or {}
    if em.get("vectors_indexed"):
        # duration_s may be missing or null; fall back to 0 so the :.1f format works.
        duration = em.get("duration_s") or 0
        print(f" embed.vectors_indexed: {em['vectors_indexed']} dur: {duration:.1f}s")
def main() -> None:
    """Run the end-to-end flow: log in, pick a document, trigger a reparse, poll its trace."""
    print(f"Target: {BASE}")
    print(f"Org: {ORG_ID}")
    with httpx.Client(timeout=30) as client:
        # Every request after login carries the bearer token.
        client.headers["Authorization"] = f"Bearer {login(client)}"
        doc_id = pick_doc(client)
        started_at = trigger_reparse(client, doc_id)
        poll_trace(client, doc_id, started_at)
    print("\nDone.")
# Run the integration flow only when executed as a script, not when imported.
if __name__ == "__main__":
    main()