File size: 7,998 Bytes
d47e7bb
533b8cb
d47e7bb
 
 
 
 
 
 
 
 
f8d91ab
 
533b8cb
d47e7bb
 
 
 
f8d91ab
 
 
 
 
 
 
 
 
d47e7bb
 
 
 
f8d91ab
 
 
 
 
 
533b8cb
 
 
f8d91ab
533b8cb
 
 
 
 
 
 
 
 
 
f8d91ab
533b8cb
 
 
 
 
 
 
f8d91ab
d47e7bb
6950d8a
f8d91ab
533b8cb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d47e7bb
 
 
f8d91ab
533b8cb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d47e7bb
 
 
f8d91ab
533b8cb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d47e7bb
533b8cb
 
 
 
 
 
 
 
 
d47e7bb
533b8cb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d47e7bb
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
import threading
from threading import Thread
import base64
from datetime import datetime
from reachy_mini import ReachyMini, ReachyMiniApp
from reachy_mini.utils import create_head_pose
import numpy as np
import time
import cv2
import os
from openai import OpenAI
from pathlib import Path
from fastapi import Request, Depends, Body
from enum import Enum

class AmazonThisObject(ReachyMiniApp):
    custom_app_url: str | None = "http://0.0.0.0:8042"

    amaz_token: str = ""  # ou la valeur que tu veux

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.amaz_token = ""

    def _get_self(self):
        return self

    def run(self, reachy_mini: ReachyMini, stop_event: threading.Event):
        last_photo = None
        last_photo_timestamp = None
        image_url = f""
        exec_dir = os.path.abspath(__file__)
        exec_dir = exec_dir.replace("main.py","")
        file_path = Path(exec_dir) / ".env"

        @self.settings_app.post("/set_token")
        async def set_hf_token(request: Request):
                try:
                    data = await request.json()
                    token = data.get("token", "").strip()

                    if not token:
                        return {
                            "success": False,
                            "message": "No token provided."
                        }

                    # Erase or add new file with HF_TOKEN=ton_token
                    with open(file_path, "w") as f:
                        f.write(f"HF_TOKEN={token}\n")
                        print(f"Token Hugging Face successfully saved in {file_path}")
                    return {
                        "success": True,
                        "message": "Token saved."
                    }
                except Exception as e:
                    return {
                    "success": False,
                    "message": "Error on server side"
                    }

        @self.settings_app.post("/check_token")
        async def hf_token_available(self_instance: AmazonThisObject = Depends(self._get_self)):
                file_path = Path(exec_dir) / ".env"
                if not file_path.exists():
                    return {
                        "success": False,
                        "token": None
                    }
                try:
                    # Read all the file
                    with open(file_path, "r", encoding="utf-8") as f:
                            lines = f.readlines()
                    # Find line HF_TOKEN=
                    for line in lines:
                            line = line.strip()
                            if line.startswith("HF_TOKEN="):
                                # On prend tout après le =
                                self_instance.amaz_token = line.split("=", 1)[1].strip().strip('"').strip("'")
                                break
                    if self_instance.amaz_token:
                            return {
                                "success": True,
                                "token": self_instance.amaz_token
                            }
                    else:
                            print("No HF_TOKEN found.")
                            return {
                                "success": False,
                                "token": None,
                                "message": "HF_TOKEN missing"
                            }
                except Exception as e:
                    print(f"Errot while readind .env : {e}")
                    return {
                        "success": False,
                        "token": None,
                        "message": "Error whild reading file"
                    }

        #Take photo REST POST request
        @self.settings_app.post("/take_photo")
        def take_photo(self_instance: AmazonThisObject = Depends(self._get_self)):
                nonlocal last_photo, last_photo_timestamp

                # Part for the Simulator
                # Initialize webcam (0 = default camera)
                #cam = cv2.VideoCapture(0)
                # Capture one frame with opencv on simulator [ need a webcam ] -> later replace by reachy robot camera
                #ret, frame = cam.read()
                #if not ret:
                #    return {"success": False, "error": "Camera not available"}

                #Code for reachy camera
                frame = reachy_mini.media.get_frame()
                if frame is None:
                    return {"success": False, "error": "Camera not available"}
                
                exec_dir = os.path.abspath(__file__)
                exec_dir = exec_dir.replace("main.py","")

                file_path = Path(exec_dir) / "sound/shutter.mp3"

                reachy_mini.media.play_sound(file_path)

                # Encode as JPEG
                success, encoded = cv2.imencode('.jpg', frame, [cv2.IMWRITE_JPEG_QUALITY, 90])
                if not success:
                    return {"success": False, "error": "Failed to encode image"}

                #Save last photo
                last_photo = encoded.tobytes()
                last_photo_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

                # Return base64 for preview
                b64_image = base64.b64encode(last_photo).decode('utf-8')
                image_url = f"data:image/jpeg;base64,{b64_image}"

                return {
                    "success": True,
                    "preview": f"data:image/jpeg;base64,{b64_image}",
                    "filename": f"reachy_photo_{last_photo_timestamp}.jpg",
                }

        #Get link REST POST request
        @self.settings_app.post("/get_link")
        def get_link(self_instance: AmazonThisObject = Depends(self._get_self)):
                #Restore image from save variable last_photo
                b64_image = base64.b64encode(last_photo).decode('utf-8')
                image_url = f"data:image/jpeg;base64,{b64_image}"

                #Use model zai-org/GLM-4.5V with inference novita with huggingface client.
                #Analyse image and get URL only -> bypass blabla from LLM by usin "no blabla".
                hf_client = OpenAI(
                    base_url="https://router.huggingface.co/v1",
                    #define token in environment variable
                    api_key=self_instance.amaz_token
                )
                completion = hf_client.chat.completions.create(
                    model="zai-org/GLM-4.5V:novita",
                    messages=[
                        {
                            "role": "user",
                            "content":
                            [
                                {
                                    "type": "text",
                                    "text": "Generate only a valid link to search object on amazon, no blabla."
                                },
                                {
                                    "type": "image_url",
                                    "image_url":
                                    {
                                        "url": image_url
                                    }
                                }
                            ]
                        }
                    ],
                )
                #Get link from result
                url_link = completion.choices[0].message.content

                #Clear content and keep only URL
                url_link = url_link.replace("<|end_of_box|>", "")
                url_link = url_link.replace("<|begin_of_box|>", "")

                #Return success + url link
                return {
                    "success": True,
                    "url": url_link
                }

        # Keep the app running
        while not stop_event.is_set():
            time.sleep(0.1)

if __name__ == "__main__":
    app = AmazonThisObject()
    app.wrapped_run()