|
|
import httpx, uuid, os |
|
|
from typing import Optional, Union |
|
|
from dotenv import load_dotenv |
|
|
|
|
|
# Populate os.environ from a local .env file (if any) before any setting is read.
load_dotenv()

# Endpoint of the RPC server; None when CHATXBT_RPC_SERVER_URL is not set.
rpc_server_url = os.environ.get('CHATXBT_RPC_SERVER_URL')
|
|
|
|
|
async def rpc_call(
    method_name: str,
    params: Optional[Union[dict, list]] = None,
    url: str = rpc_server_url,
) -> Optional[dict]:
    """Send a JSON-RPC 2.0 request via HTTP POST and return the decoded reply.

    The request is authenticated with HTTP Basic credentials read from the
    CHATXBT_RPC_SERVER_BASIC_AUTH_USERNAME and
    CHATXBT_RPC_SERVER_BASIC_AUTH_PASSWORD environment variables, and is
    tagged with a freshly generated UUID4 string as its ``id``.

    Args:
        method_name: The name of the RPC method to be called.
        params: Positional (list) or named (dict) arguments for the method.
            When None, the ``params`` member is left out of the request.
        url: The URL of the RPC server. Defaults to the module-level
            ``rpc_server_url`` as it was when this function was defined.

    Returns:
        The JSON body of the response as decoded by ``response.json()``, or
        None if an ``httpx.RequestError`` occurred while performing the
        request (the error is printed, not re-raised).

    Raises:
        httpx.HTTPStatusError: If the server answers with a 4xx/5xx status.
            It is raised by ``raise_for_status()`` and is not a subclass of
            ``httpx.RequestError``, so it propagates to the caller.
    """
    headers = {
        'Content-Type': 'application/json',
    }

    auth = httpx.BasicAuth(
        username=os.getenv('CHATXBT_RPC_SERVER_BASIC_AUTH_USERNAME'),
        password=os.getenv('CHATXBT_RPC_SERVER_BASIC_AUTH_PASSWORD'),
    )

    payload = {
        'jsonrpc': '2.0',
        'method': method_name,
        'id': str(uuid.uuid4()),
    }
    # JSON-RPC 2.0 requires "params", when present, to be an array or an
    # object; sending `"params": null` is invalid, so omit the member instead.
    if params is not None:
        payload['params'] = params

    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(
                url, json=payload, headers=headers, auth=auth
            )
            response.raise_for_status()
            return response.json()
    except httpx.RequestError as e:
        # Deliberate best-effort: report the failure and signal it with None.
        print(f"Error making RPC call: {e}")
        return None