| """API contract tests — run with CEPHEUS_CLOUD=1 to avoid heavy ML imports.""" |
| import os |
| import sys |
| from unittest.mock import MagicMock |
|
|
| os.environ.setdefault("CEPHEUS_CLOUD", "1") |
| os.environ.setdefault("CEPHEUS_API_KEY", "test-key") |
| os.environ.pop("CEPHEUS_AUTH_DEV_MODE", None) |
| os.environ.pop("CEPHEUS_JWT_SECRET", None) |
|
|
| |
| for mod in ( |
| "google.adk", |
| "google.adk.agents", |
| "google.adk.runners", |
| "google.adk.sessions", |
| ): |
| sys.modules.setdefault(mod, MagicMock()) |
|
|
| BACKEND_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) |
| if BACKEND_DIR not in sys.path: |
| sys.path.insert(0, BACKEND_DIR) |
|
|
| from fastapi.testclient import TestClient |
|
|
| import main |
|
|
| API_HEADERS = {"X-API-Key": "test-key"} |
| client = TestClient(main.app) |
|
|
|
|
| def test_health(): |
| r = client.get("/health") |
| assert r.status_code == 200 |
| assert r.json()["status"] == "ok" |
|
|
|
|
| def test_health_live_wake(): |
| r = client.get("/health/live") |
| assert r.status_code == 200 |
| body = r.json() |
| assert body["status"] in ("warming", "ready") |
| assert "message" in body |
|
|
|
|
| def test_gossip_start_accepts_payload(): |
| r = client.post( |
| "/gossip/start", |
| headers=API_HEADERS, |
| json={"staffId": "STAFF-42", "personName": "Alex", "cause": "Suspicious contact"}, |
| ) |
| assert r.status_code == 200 |
| body = r.json() |
| assert body["status"] == "started" |
| assert body["tracking"]["staffId"] == "STAFF-42" |
| assert body["tracking"]["personName"] == "Alex" |
| assert body["tracking"]["cause"] == "Suspicious contact" |
| assert body["root_person"] == "Alex" |
|
|
| gossip = client.get("/gossip", headers=API_HEADERS).json() |
| assert gossip["is_tracking"] is True |
| assert gossip["tracking"]["staffId"] == "STAFF-42" |
| assert gossip["root_person"] == "Alex" |
|
|
|
|
| def test_gossip_stop_clears_tracking_meta(): |
| client.post("/gossip/start", headers=API_HEADERS, json={"personName": "Pat"}) |
| r = client.post("/gossip/stop", headers=API_HEADERS) |
| assert r.status_code == 200 |
| gossip = client.get("/gossip", headers=API_HEADERS).json() |
| assert gossip["is_tracking"] is False |
| assert gossip["tracking"] == {} |
|
|
|
|
| def test_sos_persists_to_get(): |
| before = len(client.get("/sos", headers=API_HEADERS).json()) |
| r = client.post( |
| "/sos", |
| headers=API_HEADERS, |
| json={ |
| "guest_id": "guest-test-1", |
| "lat": 12.97, |
| "lng": 77.59, |
| "location_label": "Gate A", |
| "message": "Need help", |
| }, |
| ) |
| assert r.status_code == 200 |
| assert r.json()["status"] == "received" |
| after = client.get("/sos", headers=API_HEADERS).json() |
| assert len(after) == before + 1 |
| assert after[-1]["guest_id"] == "guest-test-1" |
|
|
|
|
| def test_create_issue_defaults_title_and_broadcast_shape(): |
| r = client.post( |
| "/issues", |
| headers=API_HEADERS, |
| json={"desc": "Smoke in corridor B", "type": "fire"}, |
| ) |
| assert r.status_code == 200 |
| issue = r.json() |
| assert issue["title"] == "Smoke in corridor B" |
| assert issue["status"] == "ONGOING" |
|
|
| listed = client.get("/issues", headers=API_HEADERS).json() |
| assert any(i["id"] == issue["id"] for i in listed) |
|
|
|
|
| def test_unauthorized_without_api_key(): |
| r = client.post("/issues", json={"title": "x", "desc": "y"}) |
| assert r.status_code == 401 |
|
|
|
|
| def test_signage_toggle_sets_explicit_active_state(): |
| r = client.post( |
| "/signage/s1/toggle", |
| headers=API_HEADERS, |
| json={"active": True}, |
| ) |
| assert r.status_code == 200 |
| body = r.json() |
| assert body["id"] == "s1" |
| assert body["active"] is True |
|
|
| state = client.get("/signage", headers=API_HEADERS).json() |
| assert state["s1"] is True |
|
|
|
|
| def test_files_upload_returns_normalized_path(): |
| png = ( |
| b"\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR\x00\x00\x00\x01" |
| b"\x00\x00\x00\x01\x08\x02\x00\x00\x00\x90wS\xde\x00\x00\x00\x0cIDATx\x9cc\x00\x01" |
| b"\x00\x00\x05\x00\x01\x0d\n-\xb4\x00\x00\x00\x00IEND\xaeB`\x82" |
| ) |
| r = client.post( |
| "/files/upload", |
| headers=API_HEADERS, |
| files={"file": ("tiny.png", png, "image/png")}, |
| ) |
| assert r.status_code == 200 |
| body = r.json() |
| assert body["url"].startswith("/files/uploads/") |
| assert body["path"] == body["url"] |
|
|
|
|
| def test_emergency_nearby_returns_services(monkeypatch): |
| import types |
|
|
| fake_payload = { |
| "elements": [ |
| {"id": 1, "lat": 12.98, "lon": 77.60, "tags": {"amenity": "hospital", "name": "Test Hospital"}}, |
| {"id": 2, "lat": 12.96, "lon": 77.58, "tags": {"amenity": "police", "name": "Test PS"}}, |
| {"id": 3, "lat": 12.97, "lon": 77.59, "tags": {"amenity": "cafe", "name": "Ignore Me"}}, |
| ] |
| } |
|
|
| class FakeResp: |
| def raise_for_status(self): |
| return None |
|
|
| def json(self): |
| return fake_payload |
|
|
| class FakeClient: |
| def __init__(self, *a, **k): |
| pass |
|
|
| async def __aenter__(self): |
| return self |
|
|
| async def __aexit__(self, *a): |
| return False |
|
|
| async def post(self, *a, **k): |
| return FakeResp() |
|
|
| monkeypatch.setattr(main, "httpx", types.SimpleNamespace(AsyncClient=FakeClient)) |
| main._nearby_cache.clear() |
|
|
| r = client.get("/emergency/nearby?lat=12.9716&lng=77.5946", headers=API_HEADERS) |
| assert r.status_code == 200 |
| body = r.json() |
| assert body["source"] == "overpass" |
| assert body["services"]["hospital"][0]["name"] == "Test Hospital" |
| assert body["services"]["police"][0]["name"] == "Test PS" |
| assert "cafe" not in body["services"] |
| assert isinstance(body["services"]["hospital"][0]["distKm"], (int, float)) |
|
|
|
|
| def test_gossip_ingest_frame_co_presence_only(monkeypatch): |
| """Single-person frames must NOT create interaction edges; pairs in one frame do.""" |
| class FakeEngine: |
| def reload_db(self): |
| return None |
|
|
| def match_all_faces(self, frame, threshold=None): |
| return [ |
| {"name": "ContactA", "confidence": 0.9, "bbox": [0, 0, 1, 1], "found": True}, |
| {"name": "ContactB", "confidence": 0.85, "bbox": [2, 2, 3, 3], "found": True}, |
| ] |
|
|
| monkeypatch.setattr(main.vision_engine, "face_engine", FakeEngine()) |
| client.post("/gossip/start", headers=API_HEADERS, json={"personName": "RootGuy"}) |
|
|
| import cv2 |
| import numpy as np |
|
|
| frame = np.zeros((16, 16, 3), dtype=np.uint8) |
| ok, buf = cv2.imencode(".jpg", frame) |
| assert ok |
| jpeg_bytes = buf.tobytes() |
|
|
| r = client.post( |
| "/gossip/ingest_frame", |
| headers=API_HEADERS, |
| data={"cam_id": "cam-01"}, |
| files={"file": ("frame.jpg", jpeg_bytes, "image/jpeg")}, |
| ) |
| assert r.status_code == 200 |
| body = r.json() |
| assert "ContactA" in body["names"] |
| assert "ContactB" in body["names"] |
| pairs = {(l["source"], l["target"]) for l in body["graph"]["links"]} |
| assert ("ContactA", "ContactB") in pairs or ("ContactB", "ContactA") in pairs |
|
|
|
|
| def test_tracking_session_reset_clears_history(): |
| client.post("/gossip/start", headers=API_HEADERS, json={"personName": "ResetTest", "staffId": "S1"}) |
| r = client.post("/tracking/session/reset", headers=API_HEADERS, json={"broadcast": False}) |
| assert r.status_code == 200 |
| body = r.json() |
| assert body["status"] == "reset" |
| assert body.get("broadcast") is False |
| gossip = client.get("/gossip", headers=API_HEADERS).json() |
| assert gossip.get("total_interactions", len(gossip.get("links", []))) == 0 or len(gossip.get("links", [])) == 0 |
| assert gossip.get("tracking") == {} |
|
|