|
|
import threading |
|
|
from threading import Thread |
|
|
import base64 |
|
|
from datetime import datetime |
|
|
from reachy_mini import ReachyMini, ReachyMiniApp |
|
|
from reachy_mini.utils import create_head_pose |
|
|
import numpy as np |
|
|
import time |
|
|
import cv2 |
|
|
import os |
|
|
from openai import OpenAI |
|
|
from pathlib import Path |
|
|
from fastapi import Request, Depends, Body |
|
|
from enum import Enum |
|
|
|
|
|
class AmazonThisObject(ReachyMiniApp):
    """Reachy Mini app that photographs an object and asks a vision model for an Amazon link.

    The app registers four POST endpoints on ``self.settings_app``
    (presumably the FastAPI application provided by ``ReachyMiniApp`` --
    confirm against the base class):

    * ``/set_token``   -- store a Hugging Face token in a ``.env`` file next to this module.
    * ``/check_token`` -- load the token back from that ``.env`` file.
    * ``/take_photo``  -- grab a camera frame, keep it in memory and return a JPEG preview.
    * ``/get_link``    -- send the last photo to the vision model and return its answer.
    """

    # NOTE(review): presumably the address of the settings UI consumed by
    # ReachyMiniApp -- confirm against the base class.
    custom_app_url: str | None = "http://0.0.0.0:8042"

    # Hugging Face token used as the API key for the inference router.
    amaz_token: str = ""

    # Markers the model wraps around its answer; removed before returning the link.
    _BOX_MARKERS = ("<|end_of_box|>", "<|begin_of_box|>")

    def __init__(self, *args, **kwargs):
        """Forward all arguments to ``ReachyMiniApp`` and start without a token."""
        super().__init__(*args, **kwargs)
        self.amaz_token = ""

    def _get_self(self):
        """Return this instance.

        Kept for backward compatibility: the endpoints used to receive the
        instance through ``Depends(self._get_self)``; they now simply close
        over ``self``.
        """
        return self

    @staticmethod
    def _normalize_token(raw: str) -> str:
        """Strip surrounding whitespace and quote characters from a token value."""
        return raw.strip().strip('"').strip("'")

    @staticmethod
    def _read_env_token(env_path: Path) -> str | None:
        """Return the value of the first ``HF_TOKEN=`` line in *env_path*.

        Returns ``None`` when the file has no such line. The returned string
        may be empty when the line carries no value. I/O errors propagate to
        the caller.
        """
        with open(env_path, "r", encoding="utf-8") as f:
            for raw_line in f:
                line = raw_line.strip()
                if line.startswith("HF_TOKEN="):
                    return AmazonThisObject._normalize_token(line.split("=", 1)[1])
        return None

    @classmethod
    def _strip_box_markers(cls, text: str) -> str:
        """Remove the model's begin/end-of-box markers and surrounding whitespace."""
        for marker in cls._BOX_MARKERS:
            text = text.replace(marker, "")
        return text.strip()

    def run(self, reachy_mini: ReachyMini, stop_event: threading.Event):
        """Register the HTTP endpoints, then idle until *stop_event* is set.

        Args:
            reachy_mini: Robot handle; its ``media`` attribute is used to grab
                camera frames and to play the shutter sound.
            stop_event: Event that ends the idle loop once it is set.
        """
        # Most recent JPEG-encoded photo, shared between /take_photo and /get_link.
        last_photo: bytes | None = None

        # Resolve resources relative to this module's directory. Taking the
        # parent (instead of string-replacing "main.py") keeps the path right
        # even if the file is renamed or a parent directory contains "main.py".
        app_dir = Path(os.path.abspath(__file__)).parent
        env_path = app_dir / ".env"
        shutter_path = app_dir / "sound" / "shutter.mp3"

        @self.settings_app.post("/set_token")
        async def set_hf_token(request: Request):
            """Persist the token from the JSON body (key ``"token"``) to ``.env``."""
            try:
                data = await request.json()
                token = data.get("token", "").strip()

                if not token:
                    return {
                        "success": False,
                        "message": "No token provided.",
                    }

                # The token is written as a single "KEY=value" line, so an
                # embedded line break would inject extra entries into .env.
                if "\n" in token or "\r" in token:
                    return {
                        "success": False,
                        "message": "Invalid token.",
                    }

                # NOTE: this overwrites the whole .env file with a single line.
                with open(env_path, "w", encoding="utf-8") as f:
                    f.write(f"HF_TOKEN={token}\n")

                # Keep the in-memory token in sync so /get_link works without
                # a prior /check_token call; normalized the same way as a
                # token read back from the file.
                self.amaz_token = self._normalize_token(token)

                print(f"Token Hugging Face successfully saved in {env_path}")
                return {
                    "success": True,
                    "message": "Token saved.",
                }
            except Exception as e:
                # Top-level boundary of the endpoint: report the failure to
                # the client, but leave a trace on the server side.
                print(f"Error while saving token: {e}")
                return {
                    "success": False,
                    "message": "Error on server side",
                }

        @self.settings_app.post("/check_token")
        async def hf_token_available():
            """Load the token from ``.env`` and report whether one is available."""
            if not env_path.exists():
                return {
                    "success": False,
                    "token": None,
                }

            try:
                token = self._read_env_token(env_path)
            except Exception as e:
                print(f"Errot while readind .env : {e}")
                return {
                    "success": False,
                    "token": None,
                    "message": "Error whild reading file",
                }

            # Only overwrite the stored token when the file actually has an
            # HF_TOKEN line (possibly with an empty value).
            if token is not None:
                self.amaz_token = token

            if self.amaz_token:
                return {
                    "success": True,
                    "token": self.amaz_token,
                }

            print("No HF_TOKEN found.")
            return {
                "success": False,
                "token": None,
                "message": "HF_TOKEN missing",
            }

        @self.settings_app.post("/take_photo")
        def take_photo():
            """Capture a frame, store it as JPEG bytes and return a base64 preview."""
            nonlocal last_photo

            frame = reachy_mini.media.get_frame()
            if frame is None:
                return {"success": False, "error": "Camera not available"}

            reachy_mini.media.play_sound(shutter_path)

            success, encoded = cv2.imencode(
                ".jpg", frame, [cv2.IMWRITE_JPEG_QUALITY, 90]
            )
            if not success:
                return {"success": False, "error": "Failed to encode image"}

            last_photo = encoded.tobytes()
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            b64_image = base64.b64encode(last_photo).decode("utf-8")

            return {
                "success": True,
                "preview": f"data:image/jpeg;base64,{b64_image}",
                "filename": f"reachy_photo_{timestamp}.jpg",
            }

        @self.settings_app.post("/get_link")
        def get_link():
            """Ask the vision model for an Amazon search link for the last photo."""
            # Without a photo there is nothing to encode; b64encode(None)
            # would raise a TypeError.
            if last_photo is None:
                return {
                    "success": False,
                    "error": "No photo available. Take a photo first.",
                }

            # Fall back to the .env file when no token has been loaded yet
            # (e.g. /check_token was never called in this session).
            if not self.amaz_token and env_path.exists():
                try:
                    self.amaz_token = self._read_env_token(env_path) or ""
                except OSError as e:
                    print(f"Error while reading token from {env_path}: {e}")

            if not self.amaz_token:
                return {"success": False, "error": "HF_TOKEN missing"}

            b64_image = base64.b64encode(last_photo).decode("utf-8")
            image_url = f"data:image/jpeg;base64,{b64_image}"

            try:
                hf_client = OpenAI(
                    base_url="https://router.huggingface.co/v1",
                    api_key=self.amaz_token,
                )
                completion = hf_client.chat.completions.create(
                    model="zai-org/GLM-4.5V:novita",
                    messages=[
                        {
                            "role": "user",
                            "content": [
                                {
                                    "type": "text",
                                    "text": "Generate only a valid link to search object on amazon, no blabla.",
                                },
                                {
                                    "type": "image_url",
                                    "image_url": {"url": image_url},
                                },
                            ],
                        }
                    ],
                )
                # content may be None; an empty choices list raises IndexError,
                # which is handled below like any other request failure.
                raw_answer = completion.choices[0].message.content or ""
            except Exception as e:
                # Network, authentication and API errors all end up here;
                # report them instead of letting the endpoint crash.
                print(f"Error while requesting link from model: {e}")
                return {
                    "success": False,
                    "error": "Request to the vision model failed",
                }

            url_link = self._strip_box_markers(raw_answer)
            if not url_link:
                return {
                    "success": False,
                    "error": "Model returned an empty answer",
                }

            return {
                "success": True,
                "url": url_link,
            }

        # Idle until asked to stop; wait() returns as soon as the event is set.
        while not stop_event.wait(0.1):
            pass
|
|
|
|
|
# Script entry point: build the app and hand control to its wrapped runner.
if __name__ == "__main__":
    AmazonThisObject().wrapped_run()
|
|
|