File size: 1,703 Bytes
da5e53f
 
9b6aa92
 
 
 
da5e53f
 
6e04ea8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
da5e53f
 
 
 
58a8345
 
 
 
 
da5e53f
 
 
 
 
 
 
 
 
58a8345
da5e53f
 
 
 
6e04ea8
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
import httpx, uuid, os
from typing import Optional, Union
from dotenv import load_dotenv

load_dotenv()

rpc_server_url = os.getenv('CHATXBT_RPC_SERVER_URL')

async def rpc_call(
    method_name: str,  # The name of the RPC method to be called
    params: Optional[Union[dict, list]] = None,  # Optional parameters for the RPC method
    url: str = rpc_server_url  # The URL of the RPC server
) -> dict:  # Returns the JSON response from the RPC server
    """
    This function makes an RPC call to the specified URL with the given method name and parameters.

    Args:
        method_name (str): The name of the RPC method to be called.
        params (Optional[Union[dict, list]], optional): Optional parameters for the RPC method. Defaults to None.
        url (str, optional): The URL of the RPC server. Defaults to rpc_server_url.

    Returns:
        dict: The JSON response from the RPC server.

    Raises:
        httpx.RequestError: If an error occurs while making the RPC call.

    """
    headers = {
        'Content-Type': 'application/json',
    }

    auth = httpx.BasicAuth(
        username=os.getenv('CHATXBT_RPC_SERVER_BASIC_AUTH_USERNAME'),
        password=os.getenv('CHATXBT_RPC_SERVER_BASIC_AUTH_PASSWORD')
    )

    payload = {
        'method': method_name,
        'params': params,
        'jsonrpc': '2.0',
        'id': str(uuid.uuid4()),
    }

    try:
        async with httpx.AsyncClient() as client:
            response = await client.post(url, json=payload, headers=headers, auth=auth)
            response.raise_for_status()
            return response.json()
    except httpx.RequestError as e:
        print(f"Error making RPC call: {e}")
        return None