File size: 1,581 Bytes
673a52e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
#!/usr/bin/env python3
"""Test ingestion endpoints (Phase 3)."""
import os
import time
import requests

BASE_URL = os.getenv("BIOFLOW_API_URL", "http://localhost:8000")


def _post(path: str, payload: dict):
    """POST ``payload`` as JSON to ``BASE_URL + path`` and decode the reply.

    Args:
        path: URL path appended verbatim to ``BASE_URL``.
        payload: Object sent as the JSON request body.

    Returns:
        A ``(status_code, body)`` tuple. ``body`` is the decoded JSON object,
        or ``{}`` when the response body is not valid JSON or is valid JSON
        that is not an object (list, string, number, ``null``).

    Raises:
        Whatever ``requests.post`` raises (e.g. on connection failure or when
        the 30-second timeout expires); those errors are not caught here.
    """
    r = requests.post(f"{BASE_URL}{path}", json=payload, timeout=30)
    try:
        body = r.json()
    except ValueError:
        # requests reports an undecodable body with a ValueError subclass;
        # catching only that keeps unrelated errors visible.
        body = {}
    if not isinstance(body, dict):
        # Callers use ``.get`` and ``in`` on the body, so anything other than
        # a JSON object is normalized to an empty dict.
        body = {}
    return r.status_code, body


def test_pubmed():
    """Check the PubMed ingestion endpoint.

    Posts a ``BRCA1`` query (limit 5, ``sync`` false) to
    ``/api/ingest/pubmed`` and prints a one-line result.

    Returns:
        True when the response is 200 and contains ``job_id``, or when the
        response is 404 (reported as a skip); False otherwise.
    """
    request_body = {"query": "BRCA1", "limit": 5, "sync": False}
    code, body = _post("/api/ingest/pubmed", request_body)

    if code != 404:
        print(f"pubmed: {code} -> {body.get('status', 'n/a')}")
        return code == 200 and "job_id" in body

    # 404 is reported as a skip rather than a failure.
    print("pubmed: SKIP (server not restarted with new route)")
    return True


def test_uniprot():
    """Check the UniProt ingestion endpoint.

    Posts an ``EGFR`` query (limit 5, ``sync`` false) to
    ``/api/ingest/uniprot`` and prints a one-line result.

    Returns:
        True when the response is 200 and contains ``job_id``, or when the
        response is 404 (reported as a skip); False otherwise.
    """
    request_body = {"query": "EGFR", "limit": 5, "sync": False}
    code, body = _post("/api/ingest/uniprot", request_body)

    if code != 404:
        print(f"uniprot: {code} -> {body.get('status', 'n/a')}")
        return code == 200 and "job_id" in body

    # 404 is reported as a skip rather than a failure.
    print("uniprot: SKIP (server not restarted with new route)")
    return True


def test_chembl():
    """Check the ChEMBL ingestion endpoint.

    Posts an ``EGFR`` query (limit 5, ``sync`` false) to
    ``/api/ingest/chembl`` and prints a one-line result.

    Returns:
        True when the response is 200 and contains ``job_id``, or when the
        response is 404 (reported as a skip); False otherwise.
    """
    request_body = {"query": "EGFR", "limit": 5, "sync": False}
    code, body = _post("/api/ingest/chembl", request_body)

    if code != 404:
        print(f"chembl: {code} -> {body.get('status', 'n/a')}")
        return code == 200 and "job_id" in body

    # 404 is reported as a skip rather than a failure.
    print("chembl: SKIP (server not restarted with new route)")
    return True


if __name__ == "__main__":
    # Script entry point: run every ingestion check, print an overall verdict,
    # and exit with status 0 only when all checks returned True.
    print("BioFlow Ingestion API Test")
    # Evaluate all checks eagerly. Chaining them with `and` would short-circuit
    # on the first failure, so later endpoints would never be exercised or
    # reported.
    results = [test_pubmed(), test_uniprot(), test_chembl()]
    ok = all(results)
    print("OK" if ok else "FAIL")
    raise SystemExit(0 if ok else 1)