"""Client that requests a graph from the WebSocket service at ``GGRAPHER_URI``.

A "build-graph" request is sent as JSON; the service answers with a stream of
JSON messages, each carrying a ``kind`` and (optionally) a ``payload``.
"""

import asyncio
import json
from typing import Any, Callable, Dict, List, Literal, Optional, TypedDict, cast

import websockets

GGRAPHER_URI = "wss://ggrphr.davidalber.net"


class StartNodeRequest(TypedDict):
    """One entry of the ``startNodes`` list in a build-graph request."""

    # Identifier of the record the graph is built from.
    recordId: int
    # Flags forwarded verbatim to the service; presumably they select which
    # directions are traversed from the record -- confirm against the service.
    getAdvisors: bool
    getDescendants: bool


class RequestPayload(TypedDict):
    """JSON body sent to the service to request a graph."""

    kind: Literal["build-graph"]
    # The only option key used by this module is "reportingCallback".
    options: Dict[Literal["reportingCallback"], bool]
    startNodes: List[StartNodeRequest]


class ProgressCallback(TypedDict):
    """Payload of a "progress" message, as handed to ``progress_cb``.

    NOTE(review): the fields look like per-state record counts; confirm
    against the service documentation.
    """

    queued: int
    fetching: int
    done: int


def make_payload(record_id: int) -> RequestPayload:
    """Return a build-graph request starting at ``record_id``.

    The request enables ``reportingCallback`` and contains a single start
    node with ``getAdvisors=True`` and ``getDescendants=False``.
    """
    return {
        "kind": "build-graph",
        "options": {"reportingCallback": True},
        "startNodes": [
            {
                "recordId": record_id,
                "getAdvisors": True,
                "getDescendants": False,
            }
        ],
    }


def _intify_record_keys(d: Dict[Any, Any]) -> Dict[Any, Any]:
    """Return ``d`` with the keys of its ``"nodes"`` mapping converted to int.

    Used as a ``json.loads`` object hook, so it is applied to every decoded
    JSON object; objects without a ``"nodes"`` key are returned unchanged.
    JSON object keys are always strings, hence the conversion.
    """
    if "nodes" not in d:
        return d
    ret = {k: v for k, v in d.items() if k != "nodes"}
    ret["nodes"] = {int(k): v for k, v in d["nodes"].items()}
    return ret


async def get_graph(
    payload: RequestPayload,
    progress_cb: Optional[Callable[[ProgressCallback], None]] = None,
) -> Dict[str, Any]:
    """Send ``payload`` to the service and return the resulting graph.

    Args:
        payload: The build-graph request, e.g. from ``make_payload``.
        progress_cb: Optional callable invoked with the payload of every
            "progress" message received before the graph arrives.

    Returns:
        The ``payload`` of the first message whose ``kind`` is "graph", with
        the keys of any ``"nodes"`` mapping converted to ``int``.

    Raises:
        KeyError: If a received message has no ``"kind"`` entry.

    Messages of any other kind are ignored. Connection errors raised by
    ``websockets`` propagate to the caller.
    """
    async with websockets.connect(GGRAPHER_URI) as ws:
        await ws.send(json.dumps(payload))
        while True:
            response_json = await ws.recv()
            response = json.loads(response_json, object_hook=_intify_record_keys)
            response_payload = response.get("payload")
            kind = response["kind"]

            if kind == "graph":
                return cast(Dict[str, Any], response_payload)

            # Compare against None rather than relying on truthiness, so a
            # callable object that happens to be falsy is still invoked.
            if kind == "progress" and progress_cb is not None:
                progress_cb(cast(ProgressCallback, response_payload))