File size: 7,998 Bytes
d47e7bb 533b8cb d47e7bb f8d91ab 533b8cb d47e7bb f8d91ab d47e7bb f8d91ab 533b8cb f8d91ab 533b8cb f8d91ab 533b8cb f8d91ab d47e7bb 6950d8a f8d91ab 533b8cb d47e7bb f8d91ab 533b8cb d47e7bb f8d91ab 533b8cb d47e7bb 533b8cb d47e7bb 533b8cb d47e7bb |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 |
import threading
from threading import Thread
import base64
from datetime import datetime
from reachy_mini import ReachyMini, ReachyMiniApp
from reachy_mini.utils import create_head_pose
import numpy as np
import time
import cv2
import os
from openai import OpenAI
from pathlib import Path
from fastapi import Request, Depends, Body
from enum import Enum
class AmazonThisObject(ReachyMiniApp):
custom_app_url: str | None = "http://0.0.0.0:8042"
amaz_token: str = "" # ou la valeur que tu veux
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.amaz_token = ""
def _get_self(self):
return self
def run(self, reachy_mini: ReachyMini, stop_event: threading.Event):
last_photo = None
last_photo_timestamp = None
image_url = f""
exec_dir = os.path.abspath(__file__)
exec_dir = exec_dir.replace("main.py","")
file_path = Path(exec_dir) / ".env"
@self.settings_app.post("/set_token")
async def set_hf_token(request: Request):
try:
data = await request.json()
token = data.get("token", "").strip()
if not token:
return {
"success": False,
"message": "No token provided."
}
# Erase or add new file with HF_TOKEN=ton_token
with open(file_path, "w") as f:
f.write(f"HF_TOKEN={token}\n")
print(f"Token Hugging Face successfully saved in {file_path}")
return {
"success": True,
"message": "Token saved."
}
except Exception as e:
return {
"success": False,
"message": "Error on server side"
}
@self.settings_app.post("/check_token")
async def hf_token_available(self_instance: AmazonThisObject = Depends(self._get_self)):
file_path = Path(exec_dir) / ".env"
if not file_path.exists():
return {
"success": False,
"token": None
}
try:
# Read all the file
with open(file_path, "r", encoding="utf-8") as f:
lines = f.readlines()
# Find line HF_TOKEN=
for line in lines:
line = line.strip()
if line.startswith("HF_TOKEN="):
# On prend tout après le =
self_instance.amaz_token = line.split("=", 1)[1].strip().strip('"').strip("'")
break
if self_instance.amaz_token:
return {
"success": True,
"token": self_instance.amaz_token
}
else:
print("No HF_TOKEN found.")
return {
"success": False,
"token": None,
"message": "HF_TOKEN missing"
}
except Exception as e:
print(f"Errot while readind .env : {e}")
return {
"success": False,
"token": None,
"message": "Error whild reading file"
}
#Take photo REST POST request
@self.settings_app.post("/take_photo")
def take_photo(self_instance: AmazonThisObject = Depends(self._get_self)):
nonlocal last_photo, last_photo_timestamp
# Part for the Simulator
# Initialize webcam (0 = default camera)
#cam = cv2.VideoCapture(0)
# Capture one frame with opencv on simulator [ need a webcam ] -> later replace by reachy robot camera
#ret, frame = cam.read()
#if not ret:
# return {"success": False, "error": "Camera not available"}
#Code for reachy camera
frame = reachy_mini.media.get_frame()
if frame is None:
return {"success": False, "error": "Camera not available"}
exec_dir = os.path.abspath(__file__)
exec_dir = exec_dir.replace("main.py","")
file_path = Path(exec_dir) / "sound/shutter.mp3"
reachy_mini.media.play_sound(file_path)
# Encode as JPEG
success, encoded = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 90])
if not success:
return {"success": False, "error": "Failed to encode image"}
#Save last photo
last_photo = encoded.tobytes()
last_photo_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
# Return base64 for preview
b64_image = base64.b64encode(last_photo).decode('utf-8')
image_url = f"data:image/jpeg;base64,{b64_image}"
return {
"success": True,
"preview": f"data:image/jpeg;base64,{b64_image}",
"filename": f"reachy_photo_{last_photo_timestamp}.jpg",
}
#Get link REST POST request
@self.settings_app.post("/get_link")
def get_link(self_instance: AmazonThisObject = Depends(self._get_self)):
#Restore image from save variable last_photo
b64_image = base64.b64encode(last_photo).decode('utf-8')
image_url = f"data:image/jpeg;base64,{b64_image}"
#Use model zai-org/GLM-4.5V with inference novita with huggingface client.
#Analyse image and get URL only -> bypass blabla from LLM by usin "no blabla".
hf_client = OpenAI(
base_url="https://router.huggingface.co/v1",
#define token in environment variable
api_key=self_instance.amaz_token
)
completion = hf_client.chat.completions.create(
model="zai-org/GLM-4.5V:novita",
messages=[
{
"role": "user",
"content":
[
{
"type": "text",
"text": "Generate only a valid link to search object on amazon, no blabla."
},
{
"type": "image_url",
"image_url":
{
"url": image_url
}
}
]
}
],
)
#Get link from result
url_link = completion.choices[0].message.content
#Clear content and keep only URL
url_link = url_link.replace("<|end_of_box|>", "")
url_link = url_link.replace("<|begin_of_box|>", "")
#Return success + url link
return {
"success": True,
"url": url_link
}
# Keep the app running
while not stop_event.is_set():
time.sleep(0.1)
if __name__ == "__main__":
app = AmazonThisObject()
app.wrapped_run()
|