Spaces:
Sleeping
Sleeping
| # installed packages | |
| from PIL import Image | |
| import paho.mqtt.client as paho | |
| # base python packages | |
| import json, io, base64 | |
| from queue import Queue | |
| from datetime import datetime | |
| import uuid | |
def print_with_timestamp(message):
    """Print ``message`` to stdout, prefixed with the current local time.

    The prefix has the form ``[Mon DD, HH:MM:SS]`` (``strftime`` pattern
    ``%b %d, %H:%M:%S``), followed by a single space and the message.
    """
    stamp = datetime.now().strftime("%b %d, %H:%M:%S")
    print(f"[{stamp}] {message}")
class CobotController:
    """MQTT client that sends commands to a remote cobot via a HiveMQ broker.

    Commands are published as JSON to ``<device_endpoint>/<user_id>`` and the
    replies are read from ``<device_endpoint>/<user_id>/response``. Incoming
    replies are decoded on paho's background network thread and handed to the
    calling thread through ``response_queue``.
    """

    # Generated once when the class is created and reused by every instance
    # that is not given an explicit ``user_id``. Passing ``user_id`` to
    # ``__init__`` overwrites this class-level value as well.
    user_id = str(uuid.uuid4())

    def __init__(
        self,
        hive_mq_username: str,
        hive_mq_password: str,
        hive_mq_cloud: str,
        port: int,
        device_endpoint: str,
        user_id: "str | None" = None,
    ):
        """Connect to the broker and block until the server grants access.

        Args:
            hive_mq_username: Broker username.
            hive_mq_password: Broker password.
            hive_mq_cloud: Broker host name.
            port: Broker port (the connection uses TLS).
            device_endpoint: Base MQTT topic of the target device.
            user_id: Optional identifier to use instead of the class-level
                one. When given, it also replaces ``CobotController.user_id``.
        """
        # setup client and response queues
        self.client = paho.Client(client_id="", userdata=None, protocol=paho.MQTTv5)
        self.client.tls_set()
        self.client.username_pw_set(hive_mq_username, hive_mq_password)
        self.client.connect(hive_mq_cloud, port)

        self.response_queue = Queue()

        def on_message(client, userdata, msg):
            # This callback runs on paho's network thread (see loop_start
            # below); a decode error must not escape it, so undecodable
            # payloads are reported and dropped.
            try:
                payload_dict = json.loads(msg.payload)
            except ValueError:
                print_with_timestamp("Ignoring a response that is not valid JSON.")
                return
            self.response_queue.put(payload_dict)

        def on_connect(client, userdata, flags, rc, properties=None):
            print_with_timestamp("Connected to HiveMQ broker...")

        self.client.on_connect = on_connect
        self.client.on_message = on_message
        self.client.loop_start()

        # initialize user id and endpoints
        if user_id is not None:
            CobotController.user_id = user_id
        self.user_id = CobotController.user_id

        self.device_endpoint = device_endpoint
        self.init_endpoint = self.device_endpoint + "/init"
        self.publish_endpoint = self.device_endpoint + "/" + self.user_id
        self.incoming_endpoint = self.publish_endpoint + "/response"
        self.client.subscribe(self.incoming_endpoint, qos=2)

        # If the server already answers our commands there is nothing to set up.
        if self.check_connection_status():
            return

        # send an init request
        print_with_timestamp("Sending a connection request...")
        pub_handle = self.client.publish(
            self.init_endpoint,
            payload=json.dumps({"id": self.user_id}),
            qos=2,
        )
        pub_handle.wait_for_publish()

        # get a response for the init message, if no response, have to wait
        # for current users time to end
        print_with_timestamp("Waiting for cobot access...")
        self._wait_for_access()

    def _wait_for_access(self, poll_timeout=10):
        """Block until the server reports ``ready`` or stops reporting a queue position.

        Each iteration waits up to ``poll_timeout`` seconds for a message.
        A message whose ``status`` is ``"ready"`` is acknowledged and ends the
        wait. If nothing arrives in time, or the message has no ``status``
        key, the queue position is requested on ``<device_endpoint>/queuequery``
        and printed whenever it changes. A queue reply without ``queue_pos``
        also ends the wait.
        """
        from queue import Empty

        prev_pos = None
        while True:
            try:
                payload = self.response_queue.get(timeout=poll_timeout)
            except Empty:
                payload = None

            if isinstance(payload, dict) and "status" in payload:
                if payload["status"] != "ready":
                    # Some other status: keep waiting for the next message.
                    continue
                self.client.publish(
                    self.publish_endpoint,
                    payload=json.dumps({"yeehaw": []}),
                    qos=2,
                )
                print_with_timestamp("Connected to server successfully.")
                return

            # Timed out, or received something without a status: ask where
            # we are in the queue.
            resp = self.handle_publish_and_response(
                payload=json.dumps({"id": self.user_id}),
                custom_endpoint=self.device_endpoint + "/queuequery",
            )
            if "queue_pos" not in resp:
                return

            pos = resp["queue_pos"]
            if prev_pos is not None and prev_pos == pos:
                # Position unchanged since the last report; stay quiet.
                continue
            prev_pos = pos
            print_with_timestamp(f"Waiting for cobot access. There are {pos - 1} users ahead of you...")

    def check_connection_status(self, timeout=5) -> bool:
        """Return True if the server answers a ``query/angles`` probe in time.

        Args:
            timeout: Seconds to wait for any message on the response queue.

        Returns:
            True if any response arrived within ``timeout``, else False.
        """
        from queue import Empty

        self.client.publish(
            self.publish_endpoint,
            payload=json.dumps({"command": "query/angles", "args": {}}),
            qos=2,
        )
        # if we receive any response, it means the server is currently
        # servicing our requests
        try:
            self.response_queue.get(timeout=timeout)
        except Empty:
            return False
        return True

    def handle_publish_and_response(self, payload, custom_endpoint=None, timeout=None):
        """Publish ``payload`` and return the next message from the response queue.

        Args:
            payload: Already-serialized message body.
            custom_endpoint: Topic to publish to; defaults to
                ``self.publish_endpoint`` when None.
            timeout: Seconds to wait for a response. None (the default)
                waits indefinitely.

        Returns:
            The next decoded response taken from ``response_queue``.

        Raises:
            queue.Empty: If ``timeout`` is given and no response arrives in time.
        """
        endpoint = self.publish_endpoint if custom_endpoint is None else custom_endpoint
        self.client.publish(endpoint, payload=payload, qos=2)
        return self.response_queue.get(block=True, timeout=timeout)

    def _send_command(self, command, args):
        """Serialize a ``{"command", "args"}`` message, publish it and return the reply."""
        payload = json.dumps({"command": command, "args": args})
        return self.handle_publish_and_response(payload)

    def send_angles(
        self,
        angle_list: "list[float] | None" = None,
        speed: int = 50,
    ):
        """Send a ``control/angles`` command and return the server's reply.

        Args:
            angle_list: Angles to send; defaults to six zeros when None.
            speed: Value sent as the command's ``speed`` argument.
        """
        if angle_list is None:
            # Fresh list per call rather than one default shared by all calls.
            angle_list = [0.0] * 6
        return self._send_command("control/angles", {"angles": angle_list, "speed": speed})

    def send_coords(
        self,
        coord_list: "list[float] | None" = None,
        speed: int = 50,
    ):
        """Send a ``control/coords`` command and return the server's reply.

        Args:
            coord_list: Coordinates to send; defaults to six zeros when None.
            speed: Value sent as the command's ``speed`` argument.
        """
        if coord_list is None:
            # Fresh list per call rather than one default shared by all calls.
            coord_list = [0.0] * 6
        return self._send_command("control/coords", {"coords": coord_list, "speed": speed})

    def send_gripper_value(
        self,
        value: int = 100,
        speed: int = 50,
    ):
        """Send a ``control/gripper`` command and return the server's reply.

        Args:
            value: Value sent as the command's ``gripper_value`` argument.
            speed: Value sent as the command's ``speed`` argument.
        """
        return self._send_command("control/gripper", {"gripper_value": value, "speed": speed})

    def get_angles(self):
        """Send a ``query/angles`` command and return the server's reply."""
        return self._send_command("query/angles", {})

    def get_coords(self):
        """Send a ``query/coords`` command and return the server's reply."""
        return self._send_command("query/coords", {})

    def get_gripper_value(self):
        """Send a ``query/gripper`` command and return the server's reply."""
        return self._send_command("query/gripper", {})

    def get_camera(self, quality=100, save_path=None):
        """Send a ``query/camera`` command and decode the returned image.

        Args:
            quality: Value sent as the command's ``quality`` argument.
            save_path: If not None, the decoded image is also saved here.

        Returns:
            The server's reply. When its ``success`` entry is truthy, the
            base64 string under ``image`` is replaced by a PIL image;
            otherwise the reply is returned unchanged.
        """
        response = self._send_command("query/camera", {"quality": quality})
        if not response["success"]:
            return response

        raw_bytes = base64.b64decode(response["image"])
        img = Image.open(io.BytesIO(raw_bytes))
        response["image"] = img

        if save_path is not None:
            img.save(save_path)
        return response