| | import httpx, uuid, os |
| | from typing import Optional, Union |
| | from dotenv import load_dotenv |
| |
|
| | load_dotenv() |
| |
|
| | rpc_server_url = os.getenv('GIINO_RPC_SERVER_URL') |
| |
|
| | async def rpc_call( |
| | method_name: str, |
| | params: Optional[Union[dict, list]] = None, |
| | url: str = rpc_server_url |
| | ) -> dict: |
| | """ |
| | This function makes an RPC call to the specified URL with the given method name and parameters. |
| | |
| | Args: |
| | method_name (str): The name of the RPC method to be called. |
| | params (Optional[Union[dict, list]], optional): Optional parameters for the RPC method. Defaults to None. |
| | url (str, optional): The URL of the RPC server. Defaults to rpc_server_url. |
| | |
| | Returns: |
| | dict: The JSON response from the RPC server. |
| | |
| | Raises: |
| | httpx.RequestError: If an error occurs while making the RPC call. |
| | |
| | """ |
| | headers = { |
| | 'Content-Type': 'application/json', |
| | } |
| |
|
| | auth = httpx.BasicAuth( |
| | username=os.getenv('GIINO_RPC_SERVER_BASIC_AUTH_USERNAME'), |
| | password=os.getenv('GIINO_RPC_SERVER_BASIC_AUTH_PASSWORD') |
| | ) |
| |
|
| | payload = { |
| | 'method': method_name, |
| | 'params': params, |
| | 'jsonrpc': '2.0', |
| | 'id': str(uuid.uuid4()), |
| | } |
| |
|
| | try: |
| | async with httpx.AsyncClient() as client: |
| | response = await client.post(url, json=payload, headers=headers, auth=auth) |
| | response.raise_for_status() |
| | return response.json() |
| | except httpx.RequestError as e: |
| | print(f"Error making RPC call: {e}") |
| | return None |