import threading
from threading import Thread
import base64
from datetime import datetime
import time
import os
from pathlib import Path
from enum import Enum

from reachy_mini import ReachyMini, ReachyMiniApp
from reachy_mini.utils import create_head_pose
import numpy as np
import cv2
from openai import OpenAI
from fastapi import Request, Depends, Body


class AmazonThisObject(ReachyMiniApp):
    """Reachy Mini app that photographs an object and asks a vision model for an Amazon link.

    The app registers four POST endpoints on ``self.settings_app``:

    * ``/set_token``   - store a Hugging Face token in a ``.env`` file next to this module.
    * ``/check_token`` - load the token from that ``.env`` file into ``amaz_token``.
    * ``/take_photo``  - grab a frame from the robot camera and return a JPEG preview.
    * ``/get_link``    - send the last photo to the vision model and return its answer.
    """

    custom_app_url: str | None = "http://0.0.0.0:8042"
    # Hugging Face token used by /get_link (or whatever value you want as a default).
    amaz_token: str = ""

    def __init__(self, *args, **kwargs):
        """Forward all arguments to ``ReachyMiniApp`` and start with no token loaded."""
        super().__init__(*args, **kwargs)
        self.amaz_token = ""

    def _get_self(self):
        """Return this instance; used as a FastAPI dependency by the endpoints."""
        return self

    def run(self, reachy_mini: ReachyMini, stop_event: threading.Event) -> None:
        """Register the HTTP endpoints, then block until ``stop_event`` is set.

        Args:
            reachy_mini: Robot handle; its ``media`` object provides the camera
                frame and plays the shutter sound.
            stop_event: Event that ends the keep-alive loop when set.
        """
        # Last captured photo (JPEG bytes) and its timestamp, shared by the endpoints.
        last_photo = None
        last_photo_timestamp = None

        # Directory containing this module. Taking the parent of the path (rather
        # than string-replacing "main.py") stays correct if the file is renamed or
        # a parent directory name happens to contain "main.py".
        app_dir = Path(os.path.abspath(__file__)).parent
        env_path = app_dir / ".env"
        shutter_path = app_dir / "sound/shutter.mp3"

        @self.settings_app.post("/set_token")
        async def set_hf_token(request: Request):
            """Save the token from the JSON body (key ``token``) to the ``.env`` file."""
            try:
                data = await request.json()
                token = data.get("token", "").strip()
                if not token:
                    return {"success": False, "message": "No token provided."}

                # Overwrite (or create) the file with a single HF_TOKEN=<token> line.
                with open(env_path, "w", encoding="utf-8") as f:
                    f.write(f"HF_TOKEN={token}\n")

                # Make the new token usable immediately, without a /check_token call.
                self.amaz_token = token

                print(f"Token Hugging Face successfully saved in {env_path}")
                return {"success": True, "message": "Token saved."}
            except Exception as e:
                # Endpoint boundary: log the cause instead of hiding it, and keep
                # the generic message for the client.
                print(f"Error while saving token: {e}")
                return {"success": False, "message": "Error on server side"}

        @self.settings_app.post("/check_token")
        async def hf_token_available(
            self_instance: AmazonThisObject = Depends(self._get_self),
        ):
            """Load ``HF_TOKEN`` from the ``.env`` file and report whether a token is set."""
            if not env_path.exists():
                return {"success": False, "token": None}

            try:
                with open(env_path, "r", encoding="utf-8") as f:
                    lines = f.readlines()

                # Use the first HF_TOKEN= line.
                for line in lines:
                    line = line.strip()
                    if line.startswith("HF_TOKEN="):
                        # Keep everything after the first "=", minus surrounding quotes.
                        value = line.split("=", 1)[1]
                        self_instance.amaz_token = value.strip().strip('"').strip("'")
                        break

                if self_instance.amaz_token:
                    return {"success": True, "token": self_instance.amaz_token}

                print("No HF_TOKEN found.")
                return {"success": False, "token": None, "message": "HF_TOKEN missing"}
            except Exception as e:
                print(f"Errot while readind .env : {e}")
                return {
                    "success": False,
                    "token": None,
                    "message": "Error whild reading file",
                }

        @self.settings_app.post("/take_photo")
        def take_photo(self_instance: AmazonThisObject = Depends(self._get_self)):
            """Capture a frame from the robot camera and return it as a base64 JPEG preview."""
            nonlocal last_photo, last_photo_timestamp

            # NOTE: on the simulator a local webcam can be used instead, via
            # cv2.VideoCapture(0).read(); here the Reachy camera is used.
            frame = reachy_mini.media.get_frame()
            if frame is None:
                return {"success": False, "error": "Camera not available"}

            reachy_mini.media.play_sound(shutter_path)

            # Encode as JPEG.
            success, encoded = cv2.imencode(
                ".jpg", frame, [cv2.IMWRITE_JPEG_QUALITY, 90]
            )
            if not success:
                return {"success": False, "error": "Failed to encode image"}

            # Remember the photo so /get_link can send it to the model later.
            last_photo = encoded.tobytes()
            last_photo_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

            # Return base64 for preview.
            b64_image = base64.b64encode(last_photo).decode("utf-8")
            return {
                "success": True,
                "preview": f"data:image/jpeg;base64,{b64_image}",
                "filename": f"reachy_photo_{last_photo_timestamp}.jpg",
            }

        @self.settings_app.post("/get_link")
        def get_link(self_instance: AmazonThisObject = Depends(self._get_self)):
            """Ask the vision model for an Amazon search link for the last photo.

            Returns ``{"success": True, "url": ...}`` on success, otherwise
            ``{"success": False, "error": ...}``.
            """
            # Without a photo, b64encode(None) would raise and produce an HTTP 500.
            if last_photo is None:
                return {
                    "success": False,
                    "error": "No photo available. Take a photo first.",
                }
            if not self_instance.amaz_token:
                return {"success": False, "error": "HF_TOKEN missing"}

            # Rebuild the data URL from the stored JPEG bytes.
            b64_image = base64.b64encode(last_photo).decode("utf-8")
            image_url = f"data:image/jpeg;base64,{b64_image}"

            try:
                # Model zai-org/GLM-4.5V served by novita through the Hugging Face
                # router, accessed with the OpenAI-compatible client.
                hf_client = OpenAI(
                    base_url="https://router.huggingface.co/v1",
                    api_key=self_instance.amaz_token,
                )
                # The prompt asks for the link only, to avoid extra text from the model.
                completion = hf_client.chat.completions.create(
                    model="zai-org/GLM-4.5V:novita",
                    messages=[
                        {
                            "role": "user",
                            "content": [
                                {
                                    "type": "text",
                                    "text": "Generate only a valid link to search object on amazon, no blabla.",
                                },
                                {
                                    "type": "image_url",
                                    "image_url": {"url": image_url},
                                },
                            ],
                        }
                    ],
                )
                # The content may be None; treat that like an empty answer.
                url_link = completion.choices[0].message.content or ""
            except Exception as e:
                # Endpoint boundary: network, auth or response-shape failures become
                # an error payload instead of an unhandled exception.
                print(f"Error while requesting link: {e}")
                return {"success": False, "error": "Failed to get link from model"}

            # Remove the model's box markers and keep only the URL.
            url_link = url_link.replace("<|end_of_box|>", "")
            url_link = url_link.replace("<|begin_of_box|>", "")
            url_link = url_link.strip()
            if not url_link:
                return {"success": False, "error": "Model returned no link"}

            return {"success": True, "url": url_link}

        # Keep the app running until asked to stop.
        while not stop_event.is_set():
            time.sleep(0.1)


if __name__ == "__main__":
    app = AmazonThisObject()
    app.wrapped_run()