# Author: jonathan@tuxmani.fr
# Fix indent error, add shutter sound for camera (commit 533b8cb)
import threading
from threading import Thread
import base64
from datetime import datetime
from reachy_mini import ReachyMini, ReachyMiniApp
from reachy_mini.utils import create_head_pose
import numpy as np
import time
import cv2
import os
from openai import OpenAI
from pathlib import Path
from fastapi import Request, Depends, Body
from enum import Enum
class AmazonThisObject(ReachyMiniApp):
    """Reachy Mini app: photograph an object with the robot camera and ask a
    vision LLM (GLM-4.5V via the Hugging Face inference router) for an Amazon
    search link matching that object.

    REST endpoints (registered on ``self.settings_app`` inside :meth:`run`):
      POST /set_token   -- persist a Hugging Face token to a local ``.env`` file
      POST /check_token -- read the token back from ``.env`` and cache it
      POST /take_photo  -- grab a camera frame, play a shutter sound, return a preview
      POST /get_link    -- send the last photo to the LLM, return an Amazon URL
    """

    # URL where the custom settings UI is served.
    custom_app_url: str | None = "http://0.0.0.0:8042"
    # Hugging Face API token; filled from .env by /set_token or /check_token.
    amaz_token: str = ""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.amaz_token = ""

    def _get_self(self):
        # FastAPI dependency hook: lets endpoint handlers receive this instance.
        return self

    def run(self, reachy_mini: ReachyMini, stop_event: threading.Event):
        """Register the REST endpoints and idle until *stop_event* is set.

        The endpoint handlers are closures over this frame: ``last_photo`` and
        ``last_photo_timestamp`` hold the most recent camera capture.
        """
        last_photo: bytes | None = None
        last_photo_timestamp: str | None = None
        # Directory containing this script; .env and the sound file live next to it.
        exec_dir = Path(os.path.abspath(__file__)).parent
        file_path = exec_dir / ".env"

        @self.settings_app.post("/set_token")
        async def set_hf_token(request: Request):
            """Persist the Hugging Face token sent as JSON ``{"token": ...}``."""
            try:
                data = await request.json()
                token = data.get("token", "").strip()
                if not token:
                    return {
                        "success": False,
                        "message": "No token provided."
                    }
                # Overwrite (or create) the .env file with HF_TOKEN=<token>.
                with open(file_path, "w") as f:
                    f.write(f"HF_TOKEN={token}\n")
                # Keep the in-memory copy in sync so /get_link works without
                # requiring a /check_token round-trip first.
                self.amaz_token = token
                print(f"Token Hugging Face successfully saved in {file_path}")
                return {
                    "success": True,
                    "message": "Token saved."
                }
            except Exception as e:
                # Log the cause server-side; keep the client message generic.
                print(f"Error while saving token: {e}")
                return {
                    "success": False,
                    "message": "Error on server side"
                }

        @self.settings_app.post("/check_token")
        async def hf_token_available(self_instance: AmazonThisObject = Depends(self._get_self)):
            """Read HF_TOKEN back from .env and cache it on the instance."""
            env_path = exec_dir / ".env"
            if not env_path.exists():
                return {
                    "success": False,
                    "token": None
                }
            try:
                # Read the whole file.
                with open(env_path, "r", encoding="utf-8") as f:
                    lines = f.readlines()
                # Find the HF_TOKEN= line; take everything after the first '='
                # and strip surrounding quotes, if any.
                for line in lines:
                    line = line.strip()
                    if line.startswith("HF_TOKEN="):
                        self_instance.amaz_token = line.split("=", 1)[1].strip().strip('"').strip("'")
                        break
                if self_instance.amaz_token:
                    return {
                        "success": True,
                        "token": self_instance.amaz_token
                    }
                print("No HF_TOKEN found.")
                return {
                    "success": False,
                    "token": None,
                    "message": "HF_TOKEN missing"
                }
            except Exception as e:
                print(f"Error while reading .env : {e}")
                return {
                    "success": False,
                    "token": None,
                    "message": "Error while reading file"
                }

        # Take photo -- REST POST request.
        @self.settings_app.post("/take_photo")
        def take_photo(self_instance: AmazonThisObject = Depends(self._get_self)):
            """Capture a frame from the robot camera and return a JPEG preview."""
            nonlocal last_photo, last_photo_timestamp
            # Frame comes from the Reachy robot camera (a webcam/cv2.VideoCapture
            # fallback can be substituted here when running in the simulator).
            frame = reachy_mini.media.get_frame()
            if frame is None:
                return {"success": False, "error": "Camera not available"}
            # Shutter feedback sound, shipped next to this script.
            reachy_mini.media.play_sound(exec_dir / "sound/shutter.mp3")
            # Encode as JPEG.
            success, encoded = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 90])
            if not success:
                return {"success": False, "error": "Failed to encode image"}
            # Save the last photo for /get_link.
            last_photo = encoded.tobytes()
            last_photo_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            # Return base64 for preview.
            b64_image = base64.b64encode(last_photo).decode('utf-8')
            return {
                "success": True,
                "preview": f"data:image/jpeg;base64,{b64_image}",
                "filename": f"reachy_photo_{last_photo_timestamp}.jpg",
            }

        # Get link -- REST POST request.
        @self.settings_app.post("/get_link")
        def get_link(self_instance: AmazonThisObject = Depends(self._get_self)):
            """Ask GLM-4.5V for an Amazon search URL matching the last photo."""
            # Guard: /take_photo must have succeeded at least once, otherwise
            # base64.b64encode(None) below would raise a TypeError.
            if last_photo is None:
                return {"success": False, "error": "No photo taken yet"}
            # Rebuild a data URL from the saved photo bytes.
            b64_image = base64.b64encode(last_photo).decode('utf-8')
            image_url = f"data:image/jpeg;base64,{b64_image}"
            # GLM-4.5V on the novita provider, through the HF inference router;
            # "no blabla" in the prompt suppresses conversational filler.
            hf_client = OpenAI(
                base_url="https://router.huggingface.co/v1",
                api_key=self_instance.amaz_token
            )
            completion = hf_client.chat.completions.create(
                model="zai-org/GLM-4.5V:novita",
                messages=[
                    {
                        "role": "user",
                        "content":
                        [
                            {
                                "type": "text",
                                "text": "Generate only a valid link to search object on amazon, no blabla."
                            },
                            {
                                "type": "image_url",
                                "image_url":
                                {
                                    "url": image_url
                                }
                            }
                        ]
                    }
                ],
            )
            # Get the link from the result.
            url_link = completion.choices[0].message.content
            # Strip the GLM box markers so only the bare URL remains.
            url_link = url_link.replace("<|end_of_box|>", "")
            url_link = url_link.replace("<|begin_of_box|>", "")
            # Return success + url link.
            return {
                "success": True,
                "url": url_link
            }

        # Idle loop: keep the app alive until the framework asks us to stop.
        while not stop_event.is_set():
            time.sleep(0.1)
if __name__ == "__main__":
    # Script entry point: build the app and hand control to the framework loop.
    AmazonThisObject().wrapped_run()