| import paho.mqtt.client as paho | |
| import json, io, base64 | |
| from queue import Queue | |
| from PIL import Image | |
| class CobotController: | |
| def __init__( | |
| self, | |
| hive_mq_username: str, | |
| hive_mq_password: str, | |
| hive_mq_cloud: str, | |
| port: int, | |
| cobot_id: str, | |
| ): | |
| self.publish_endpoint = cobot_id | |
| self.response_endpoint = cobot_id + "/response" | |
| self.client = paho.Client(client_id="", userdata=None, protocol=paho.MQTTv5) | |
| self.client.tls_set() | |
| self.client.username_pw_set(hive_mq_username, hive_mq_password) | |
| self.client.connect(hive_mq_cloud, port) | |
| response_queue = Queue() | |
| def on_message(client, userdata, msg): | |
| payload_dict = json.loads(msg.payload) | |
| response_queue.put(payload_dict) | |
| self.response_queue = response_queue | |
| def on_connect(client, userdata, flags, rc, properties=None): | |
| print("Connection recieved") | |
| self.client.on_connect = on_connect | |
| self.client.on_message = on_message | |
| self.client.subscribe(self.response_endpoint, qos=2) | |
| self.client.loop_start() | |
| def handle_publish_and_response(self, payload): | |
| self.client.publish(self.publish_endpoint, payload=payload, qos=2) | |
| return self.response_queue.get(block=True) | |
| def send_angles( | |
| self, | |
| angle_list: list[float] = [0.0] * 6, | |
| speed: int = 50 | |
| ): | |
| payload = json.dumps({"command": "control/angles", | |
| "args": {"angles": angle_list, "speed": speed}}) | |
| return self.handle_publish_and_response(payload) | |
| def send_coords( | |
| self, | |
| coord_list: list[float] = [0.0] * 6, | |
| speed: int = 50 | |
| ): | |
| payload = json.dumps({"command": "control/coords", | |
| "args": {"coords": coord_list, "speed": speed}}) | |
| return self.handle_publish_and_response(payload) | |
| def send_gripper_value( | |
| self, | |
| value: int = 100, | |
| speed: int = 50 | |
| ): | |
| payload = json.dumps({"command": "control/gripper", | |
| "args": {"gripper_value": value, "speed": speed}}) | |
| return self.handle_publish_and_response(payload) | |
| def get_angles(self): | |
| payload = json.dumps({"command": "query/angles", "args": {}}) | |
| return self.handle_publish_and_response(payload) | |
| def get_coords(self): | |
| payload = json.dumps({"command": "query/coords", "args": {}}) | |
| return self.handle_publish_and_response(payload) | |
| def get_gripper_value(self): | |
| payload = json.dumps({"command": "query/gripper", "args": {}}) | |
| return self.handle_publish_and_response(payload) | |
| def get_camera(self, quality=100, save_path=None): | |
| payload = json.dumps({"command": "query/camera", "args": {"quality": quality}}) | |
| response = self.handle_publish_and_response(payload) | |
| if not response["success"]: | |
| return response | |
| b64_bytes = base64.b64decode(response["image"]) | |
| img_bytes = io.BytesIO(b64_bytes) | |
| img = Image.open(img_bytes) | |
| response["image"] = img | |
| if save_path is not None: | |
| img.save(save_path) | |
| return response |