rc_inference / robot /interface_client.py
rxcui's picture
Upload folder using huggingface_hub
cb65626 verified
import pickle
import time
import uuid
import numpy as np
import requests
from utils.enums import ReturnCode
from utils.log import setup_logger
from utils.util import retry_request
from utils.util import timeout
logger = setup_logger()
base_url = "http://api.robochallenge.cn"
mock_url = "http://127.0.0.1:9095"
MAX_RETRY = 3
RETRY_DELAY = 1
class InterfaceClient:
def __init__(self, user_id,mock=False):
self.user_id = user_id
self.session = requests.Session()
self.job_id = None
self.robot_id = None
self.robot_url = None
self.clock_offset = None
self.mock = mock
def _get(self, url, **kwargs):
@retry_request(retries=MAX_RETRY, delay=RETRY_DELAY)
def inner():
return self.session.get(url, **kwargs)
return inner()
def _post(self, url, **kwargs):
@retry_request(retries=MAX_RETRY, delay=RETRY_DELAY)
def inner():
return self.session.post(url, **kwargs)
return inner()
def _put(self, url, **kwargs):
@retry_request(retries=MAX_RETRY, delay=RETRY_DELAY)
def inner():
return self.session.put(url, **kwargs)
return inner()
@staticmethod
def _print_response_error(prefix: str, error: requests.exceptions.RequestException):
response = getattr(error, "response", None)
if response is None:
print(prefix)
return
print(f"{prefix} status={response.status_code} body={response.text}")
def update_job_info(self, job_id, robot_id):
self.job_id = job_id
self.robot_id = robot_id
self.robot_url = base_url + f"/robots/{robot_id}/direct"
if self.mock:
self.robot_url = mock_url + "/"
self.clock_offset = self.cal_clockoffset()
print(f"clock jitter:{self.clock_offset}s")
def reset_job_info(self):
self.job_id = None
self.robot_id = None
self.robot_url = None
self.clock_offset = None
self.mock = False
def cal_clockoffset(self):
offsets = []
while True:
try:
for _ in range(10):
t1 = time.time()
response = self._get(f"{self.robot_url}/clock-sync", headers={"x-user-id": self.user_id})
response.raise_for_status()
t2 = float(response.json()['timestamp'])
t3 = time.time()
offset = ((t2 - t1) + (t2 - t3)) / 2
offsets.append(offset)
time.sleep(0.5)
break
except requests.exceptions.RequestException as e:
print(f"Error getting clock: {e}")
time.sleep(0.5)
continue
return float(np.array(offsets).mean())
def get_state(self, image_size, image_type, action_type, resize_name=None):
try:
url = f"{self.robot_url}/state.pkl"
params ={'width':image_size[0],'height': image_size[1],
'image_type':image_type,'action_type': action_type,
}
if resize_name:
params['resize_name'] = resize_name
response = self._get(url,
params=params,
headers={"x-user-id": self.user_id}
)
response.raise_for_status()
data = pickle.loads(response.content)
if isinstance(data, dict) and data.get("status") == "size_none":
print("Warning: Robot state not ready (size is None)!")
print("test state:", data)
return data
except requests.exceptions.RequestException as e:
self._print_response_error(f"Error getting state: {e}", e)
return None
def start_motion(self):
url = f"{self.robot_url}/start_motion"
response = self._get(url)
return response
def end_motion(self):
url = f"{self.robot_url}/stop_motion"
response = self._get(url)
return response
def post_actions(self, actions, duration, action_type):
i = 0
while i < 5:
try:
req_hash = f"gpu-server-{uuid.uuid4()}"
url = f"{self.robot_url}/action?hash={req_hash}"
send_data = {"actions": actions, "duration": duration}
response = self._post(url, params={'action_type':action_type}, json=send_data,
headers={"x-user-id":self.user_id})
response.raise_for_status()
if response.json().get("result") == "success":
break
else:
print(f"Robot failed to process actions: {response.json().get('message')}")
print(f"Robot action response body: {response.text}")
except requests.exceptions.RequestException as e:
i += 1
self._print_response_error(f"Error posting actions: {e}", e)
def start_robot(self, job_id):
url = f"{base_url}/jobs/update"
response = self._post(url, json={"job_id": job_id, "action": "start"}, headers={"x-user-id": self.user_id})
return response
def _get_job_status(self, job_id):
response = self._get(f"{base_url}/jobs/{job_id}", headers={"x-user-id": self.user_id})
try:
return response.json()
except Exception as e:
logger.warning(f'get error in _get_job_status: {e}')
return {}
def wait_for_robot_ready(self, job_id, poll_interval=2):
while True:
res = self._get_job_status(job_id)
if "device" in res and "robot_id" in res:
robot_id = res["device"]["robot_id"]
return robot_id,job_id
time.sleep(poll_interval)
@timeout(600)
def wait_for_robot_running(self, job_id, poll_interval=2):
while True:
res = self._get_job_status(job_id)
print(res)
if res and "status" in res:
if res['status'] == "running":
return ReturnCode.SUCCESS
elif res['status'] == "prepare":
pass
else:
return ReturnCode.FAILURE
time.sleep(poll_interval)
def get_job_status(self, job_id):
response = self._get_job_status(job_id)
print(job_id, response)
return response['device'], response['status']
def get_all_jobs(self,job_collection_id):
response = self._get(f"{base_url}/job_collections/{job_collection_id}", headers={"x-user-id": self.user_id})
try:
return response.json()
except Exception as e:
logger.warning(f'get error in get_all_jobs: {e}')
return {}
def get_all_runs(self, submission_id):
"""
Get all runs for a submission.
Args:
submission_id (str): The unique identifier for the submission to monitor.
Returns:
list: A list of runs.
"""
response = self._get(f"{base_url}/v2/job_collections/submission/{submission_id}/runs", headers={"x-user-id": self.user_id})
try:
return response.json()
except Exception as e:
logger.warning(f'get error in get_all_runs: {e}')
return []