bioflow / test_ingestion_api.py
ramiiiiiiiiiiiiiiiiiiiiiiiiiiiiii's picture
Fix explorer/ingestion UI and 3D endpoints
673a52e
#!/usr/bin/env python3
"""Test ingestion endpoints (Phase 3)."""
import os
import time
import requests
BASE_URL = os.getenv("BIOFLOW_API_URL", "http://localhost:8000")
def _post(path: str, payload: dict):
r = requests.post(f"{BASE_URL}{path}", json=payload, timeout=30)
try:
return r.status_code, r.json()
except Exception:
return r.status_code, {}
def test_pubmed():
status, data = _post("/api/ingest/pubmed", {"query": "BRCA1", "limit": 5, "sync": False})
if status == 404:
print("pubmed: SKIP (server not restarted with new route)")
return True
print(f"pubmed: {status} -> {data.get('status', 'n/a')}")
return status == 200 and "job_id" in data
def test_uniprot():
status, data = _post("/api/ingest/uniprot", {"query": "EGFR", "limit": 5, "sync": False})
if status == 404:
print("uniprot: SKIP (server not restarted with new route)")
return True
print(f"uniprot: {status} -> {data.get('status', 'n/a')}")
return status == 200 and "job_id" in data
def test_chembl():
status, data = _post("/api/ingest/chembl", {"query": "EGFR", "limit": 5, "sync": False})
if status == 404:
print("chembl: SKIP (server not restarted with new route)")
return True
print(f"chembl: {status} -> {data.get('status', 'n/a')}")
return status == 200 and "job_id" in data
if __name__ == "__main__":
print("BioFlow Ingestion API Test")
ok = test_pubmed() and test_uniprot() and test_chembl()
print("OK" if ok else "FAIL")
raise SystemExit(0 if ok else 1)