Spaces:
Running
Running
| #!/usr/bin/env python3 | |
| """Test ingestion endpoints (Phase 3).""" | |
| import os | |
| import time | |
| import requests | |
| BASE_URL = os.getenv("BIOFLOW_API_URL", "http://localhost:8000") | |
| def _post(path: str, payload: dict): | |
| r = requests.post(f"{BASE_URL}{path}", json=payload, timeout=30) | |
| try: | |
| return r.status_code, r.json() | |
| except Exception: | |
| return r.status_code, {} | |
| def test_pubmed(): | |
| status, data = _post("/api/ingest/pubmed", {"query": "BRCA1", "limit": 5, "sync": False}) | |
| if status == 404: | |
| print("pubmed: SKIP (server not restarted with new route)") | |
| return True | |
| print(f"pubmed: {status} -> {data.get('status', 'n/a')}") | |
| return status == 200 and "job_id" in data | |
| def test_uniprot(): | |
| status, data = _post("/api/ingest/uniprot", {"query": "EGFR", "limit": 5, "sync": False}) | |
| if status == 404: | |
| print("uniprot: SKIP (server not restarted with new route)") | |
| return True | |
| print(f"uniprot: {status} -> {data.get('status', 'n/a')}") | |
| return status == 200 and "job_id" in data | |
| def test_chembl(): | |
| status, data = _post("/api/ingest/chembl", {"query": "EGFR", "limit": 5, "sync": False}) | |
| if status == 404: | |
| print("chembl: SKIP (server not restarted with new route)") | |
| return True | |
| print(f"chembl: {status} -> {data.get('status', 'n/a')}") | |
| return status == 200 and "job_id" in data | |
| if __name__ == "__main__": | |
| print("BioFlow Ingestion API Test") | |
| ok = test_pubmed() and test_uniprot() and test_chembl() | |
| print("OK" if ok else "FAIL") | |
| raise SystemExit(0 if ok else 1) | |