app / src /libs /rpc_client.py
lemdaddy's picture
Minor change
0043098
raw
history blame
1.7 kB
import httpx, uuid, os
from typing import Optional, Union
from dotenv import load_dotenv
load_dotenv()
rpc_server_url = os.getenv('CHATXBT_RPC_SERVER_URL')
async def rpc_call(
method_name: str, # The name of the RPC method to be called
params: Optional[Union[dict, list]] = None, # Optional parameters for the RPC method
url: str = rpc_server_url # The URL of the RPC server
) -> dict: # Returns the JSON response from the RPC server
"""
This function makes an RPC call to the specified URL with the given method name and parameters.
Args:
method_name (str): The name of the RPC method to be called.
params (Optional[Union[dict, list]], optional): Optional parameters for the RPC method. Defaults to None.
url (str, optional): The URL of the RPC server. Defaults to rpc_server_url.
Returns:
dict: The JSON response from the RPC server.
Raises:
httpx.RequestError: If an error occurs while making the RPC call.
"""
headers = {
'Content-Type': 'application/json',
}
auth = httpx.BasicAuth(
username=os.getenv('CHATXBT_RPC_SERVER_BASIC_AUTH_USERNAME'),
password=os.getenv('CHATXBT_RPC_SERVER_BASIC_AUTH_PASSWORD')
)
payload = {
'method': method_name,
'params': params,
'jsonrpc': '2.0',
'id': str(uuid.uuid4()),
}
try:
async with httpx.AsyncClient() as client:
response = await client.post(url, json=payload, headers=headers, auth=auth)
response.raise_for_status()
return response.json()
except httpx.RequestError as e:
print(f"Error making RPC call: {e}")
return None