File size: 17,978 Bytes
5ed0597
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
# backend/record_controller.py (o record_controller_gradio.py)
import subprocess
import os
import gradio as gr
import serial.tools.list_ports # Nuevo: para listar puertos seriales
import json # Necesario para la función load_config si la copiamos aquí

# IMPORTANTE: Si robot_detector.py está en el mismo directorio 'backend',
# puedes importar su función de carga de configuración.
# Asegúrate de que el archivo robot_detector.py exista en la misma carpeta.
try:
    from robot_detector import load_config as load_robot_config, CONFIG_FILE as ROBOT_CONFIG_FILE
except ImportError:
    print("Advertencia: No se pudo importar 'load_config' de 'robot_detector.py'.")
    print("Asegúrate de que 'robot_detector.py' esté en el mismo directorio 'backend'.")
    print("Se usará una implementación local de load_config.")
    
    # Implementación fallback de load_config si no se puede importar
    ROBOT_CONFIG_FILE = "robot_config.json"
    def load_robot_config():
        if os.path.exists(ROBOT_CONFIG_FILE):
            try:
                with open(ROBOT_CONFIG_FILE, 'r') as f:
                    config = json.load(f)
                return (
                    config.get("robot_follower", {}).get("port", ""),
                    config.get("robot_follower", {}).get("id", ""),
                    config.get("teleop_leader", {}).get("port", ""),
                    config.get("teleop_leader", {}).get("id", "")
                )
            except Exception as e:
                print(f"Error al cargar la configuración existente desde '{ROBOT_CONFIG_FILE}': {e}. Se usarán valores por defecto.")
                return "", "", "", ""
        return "", "", "", ""


def run_command(command: str, description: str):
    """
    Ejecuta un comando de shell y captura su salida, manejando errores.
    """
    print(f"\n--- {description} ---")
    process_output = [] # Use a list to collect output for better streaming if needed later
    try:
        # Use Popen to stream output in real-time, important for long running processes
        process = subprocess.Popen(
            command,
            shell=True,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT, # Redirect stderr to stdout
            text=True
        )

        for line in iter(process.stdout.readline, ''):
            print(line, end='') # Print to console for server-side logging
            process_output.append(line)
            # You could add a gr.Progress() here for real-time update in Gradio if needed

        process.wait() # Wait for the process to complete

        if process.returncode == 0:
            print(f"Éxito: {description}")
            return True, "".join(process_output)
        else:
            error_message = f"Error durante '{description}': El comando devolvió el código de salida {process.returncode}"
            print(error_message)
            return False, "".join(process_output) # Return all collected output, including errors
    except Exception as e:
        error_message = f"Ocurrió un error inesperado durante '{description}': {e}"
        print(error_message)
        return False, error_message

def list_serial_device_paths():
    """
    Lista los puertos seriales disponibles y devuelve solo las rutas de los dispositivos.
    """
    ports = serial.tools.list_ports.comports()
    device_paths = []
    if not ports:
        return [] # Return empty list if no ports found
    
    for port in ports:
        device_paths.append(port.device)
    
    return device_paths

def login_to_huggingface(token: str):
    """
    Inicia sesión en Hugging Face CLI con el token proporcionado.
    """
    if not token or token == "hf_YOUR_ACTUAL_WRITE_TOKEN_HERE":
        return False, "Error: Por favor, proporciona un token de Hugging Face válido."
    
    success, output = run_command(
        f"huggingface-cli login --token {token}", # <--- ¡Argumento eliminado!
        "Iniciando sesión en Hugging Face CLI"
    )
    if success:
        return True, "¡Inicio de sesión en Hugging Face exitoso!"
    else:
        return False, output

def get_huggingface_user():
    """
    Obtiene el nombre de usuario de Hugging Face.
    """
    success, output = run_command(
        "huggingface-cli whoami | head -n 1",
        "Obteniendo nombre de usuario de Hugging Face"
    )
    if success:
        # The output might contain warnings before the actual username.
        # Try to find the username, which should be the first non-empty, non-warning line.
        lines = output.splitlines()
        for line in lines:
            if line.strip() and not (line.strip().startswith("warnings.") or "deprecated" in line.lower()):
                return True, line.strip()
        return False, "No se pudo extraer el nombre de usuario de la salida de 'whoami'."
    else:
        return False, output

def record_dataset_core(hf_user: str,
                        robot_port: str,
                        teleop_port: str,
                        num_episodes: int = 2,
                        single_task: str = "Grab the black cube",
                        push_to_hub: bool = True,
                        resume: bool = False,
                        episode_time_s: int = 60,
                        reset_time_s: int = 60):
    """
    Función central para grabar un dataset usando LeRobot.
    """
    if not hf_user:
        return False, "Error: Nombre de usuario de Hugging Face no disponible. ¿Falló el inicio de sesión?"

    # Use the IDs from the loaded config for consistency
    initial_follower_port, initial_follower_id, initial_leader_port, initial_leader_id = load_robot_config()
    robot_id = initial_follower_id if initial_follower_id else "my_awesome_follower_arm"
    teleop_id = initial_leader_id if initial_leader_id else "my_awesome_leader_arm"


    dataset_repo_id = f"{hf_user}/record-test"
    
    command = [
        "python", "-m", "lerobot.record",
        "--robot.type=so101_follower",
        f"--robot.port={robot_port}",
        f"--robot.id={robot_id}", # Use ID from config/default
        "--robot.cameras=\"{{ front: {{type: opencv, index_or_path: 0, width: 1920, height: 1080, fps: 30}}}}\"", # Escaping for shell command
        "--teleop.type=so101_leader",
        f"--teleop.port={teleop_port}",
        f"--teleop.id={teleop_id}", # Use ID from config/default
        "--display_data=true",
        f"--dataset.repo_id={dataset_repo_id}",
        f"--dataset.num_episodes={num_episodes}",
        f"--dataset.single_task=\"{single_task}\"",
        f"--dataset.episode_time_s={episode_time_s}",
        f"--dataset.reset_time_s={reset_time_s}"
    ]

    if not push_to_hub:
        command.append("--dataset.push_to_hub=False")
    if resume:
        command.append("--resume=true")

    full_command = " ".join(command)

    success, output = run_command(full_command, "Grabación del dataset")
    if success:
        final_message = f"¡Grabación del dataset completada exitosamente!\n"
        final_message += f"El dataset se almacenó localmente en: ~/.cache/huggingface/lerobot/{dataset_repo_id}\n"
        if push_to_hub:
            final_message += f"El dataset ha sido subido a: https://huggingface.co/datasets/{dataset_repo_id}\n"
            final_message += "Puedes añadir etiquetas personalizadas (ej. 'tutorial') en la página del dataset en Hugging Face."
        else:
            final_message += "La subida al Hugging Face Hub fue deshabilitada."
        return True, final_message
    else:
        return False, output

# --- Gradio Interface Logic ---

# Variable global para almacenar el usuario de Hugging Face
current_hf_user = None

def gradio_login(hf_token_input: str):
    """Interfaz Gradio para iniciar sesión en Hugging Face."""
    global current_hf_user
    success, message = login_to_huggingface(hf_token_input)
    if success:
        gr.Info(message)
        success_user, user_name = get_huggingface_user()
        if success_user:
            current_hf_user = user_name
            return gr.update(value=user_name, interactive=False), gr.update(visible=True, value=message) # Update username field and status box
        else:
            return gr.update(value="", interactive=True), gr.update(visible=True, value=user_name) # Show error if user not retrieved
    else:
        current_hf_user = None
        return gr.update(value="", interactive=True), gr.update(visible=True, value=message) # Clear username and show error

def gradio_record(robot_port_input: str,
                  teleop_port_input: str,
                  num_episodes_input: int,
                  single_task_input: str,
                  push_to_hub_input: bool,
                  resume_input: bool,
                  episode_time_s_input: int,
                  reset_time_s_input: int):
    """Interfaz Gradio para iniciar la grabación del dataset."""
    global current_hf_user
    if not current_hf_user:
        return gr.update(visible=True, value="Error: No se ha iniciado sesión en Hugging Face o no se pudo obtener el usuario. Por favor, inicia sesión primero."), gr.update(interactive=False)

    gr.Info("Iniciando grabación del dataset. Esto puede tardar unos segundos...")
    success, message = record_dataset_core(
        hf_user=current_hf_user,
        robot_port=robot_port_input,
        teleop_port=teleop_port_input,
        num_episodes=num_episodes_input,
        single_task=single_task_input,
        push_to_hub=push_to_hub_input,
        resume=resume_input,
        episode_time_s=episode_time_s_input,
        reset_time_s=reset_time_s_input
    )
    if success:
        gr.Info("Grabación completada.")
        return gr.update(visible=True, value=message), gr.update(interactive=True)
    else:
        gr.Info("Grabación fallida. Revisa el log para más detalles.")
        return gr.update(visible=True, value=message), gr.update(interactive=True)

# Define la interfaz de Gradio
with gr.Blocks(title="Controlador de Grabación LeRobot") as demo:
    gr.Markdown("# <center>Controlador de Grabación de Datasets LeRobot</center>")
    gr.Markdown("Esta interfaz te ayuda a gestionar la grabación de datasets para LeRobot.")

    with gr.Tab("1. Configuración de Hugging Face"):
        gr.Markdown("## Configuración de Hugging Face")
        gr.Markdown(
            "Introduce tu **token de Hugging Face con permisos de escritura**. "
            "Puedes generarlo en [huggingface.co/settings/tokens](https://huggingface.co/settings/tokens)."
        )
        hf_token_input = gr.Textbox(
            label="Token de Hugging Face",
            type="password",
            placeholder="hf_YOUR_ACTUAL_WRITE_TOKEN_HERE",
            info="El token debe tener permisos de escritura (write)."
        )
        login_btn = gr.Button("Iniciar Sesión / Verificar Token")
        hf_user_output = gr.Textbox(label="Usuario de Hugging Face Actual", interactive=False, placeholder="No autenticado", show_copy_button=True)
        login_status_output = gr.Textbox(label="Estado de Autenticación", interactive=False, visible=False)
        
        login_btn.click(
            fn=gradio_login,
            inputs=hf_token_input,
            outputs=[hf_user_output, login_status_output]
        )

    with gr.Tab("2. Parámetros de Grabación"):
        gr.Markdown("## Parámetros del Robot y la Grabación")
        gr.Markdown(
            "Asegúrate de que los **puertos seriales** de tus robots sean correctos. "
            "Puedes detectarlos automáticamente o introducirlos manualmente."
        )

        # Load initial config from robot_config.json
        initial_follower_port, initial_follower_id, initial_leader_port, initial_leader_id = load_robot_config()

        with gr.Group(): # Group for auto-detection and manual input
            gr.Markdown("### Puerto del Robot Follower")
            robot_port_dropdown = gr.Dropdown(
                label="Seleccionar Puerto",
                choices=[], # Populated on load/refresh
                value=initial_follower_port, # Pre-fill from config
                interactive=True,
                allow_custom_value=True # Allow manual input
            )
            robot_port_manual = gr.Textbox(
                label="O introducir Puerto Manualmente",
                value=initial_follower_port, # Pre-fill from config
                placeholder="ej. /dev/tty.usbmodemXXXXX",
                interactive=True
            )
            # Synchronize dropdown and manual textbox
            robot_port_dropdown.change(lambda x: x, inputs=robot_port_dropdown, outputs=robot_port_manual)
            robot_port_manual.change(lambda x: x, inputs=robot_port_manual, outputs=robot_port_dropdown)

        with gr.Group(): # Group for auto-detection and manual input
            gr.Markdown("### Puerto del Teleoperador Leader")
            teleop_port_dropdown = gr.Dropdown(
                label="Seleccionar Puerto",
                choices=[], # Populated on load/refresh
                value=initial_leader_port, # Pre-fill from config
                interactive=True,
                allow_custom_value=True # Allow manual input
            )
            teleop_port_manual = gr.Textbox(
                label="O introducir Puerto Manualmente",
                value=initial_leader_port, # Pre-fill from config
                placeholder="ej. /dev/tty.usbmodemXXXXX",
                interactive=True
            )
            # Synchronize dropdown and manual textbox
            teleop_port_dropdown.change(lambda x: x, inputs=teleop_port_dropdown, outputs=teleop_port_manual)
            teleop_port_manual.change(lambda x: x, inputs=teleop_port_manual, outputs=teleop_port_dropdown)

        refresh_ports_btn = gr.Button("🔄 Refrescar Puertos Detectados")

        num_episodes_input = gr.Slider(
            minimum=1, maximum=100, step=1, value=2, label="Número de Episodios a Grabar",
            info="Define cuántos episodios se grabarán."
        )
        single_task_input = gr.Textbox(
            label="Descripción de la Tarea (Single Task)",
            value="Grab the black cube",
            placeholder="Describe la tarea que realizará el robot"
        )
        episode_time_s_input = gr.Slider(
            minimum=10, maximum=300, step=10, value=60, label="Duración del Episodio (segundos)",
            info="Tiempo máximo de grabación para cada episodio."
        )
        reset_time_s_input = gr.Slider(
            minimum=5, maximum=120, step=5, value=60, label="Tiempo de Reseteo (segundos)",
            info="Tiempo para preparar el entorno entre episodios."
        )
        push_to_hub_input = gr.Checkbox(
            label="Subir Dataset a Hugging Face Hub",
            value=True,
            info="Marca esta casilla para subir el dataset a tu perfil de Hugging Face."
        )
        resume_input = gr.Checkbox(
            label="Reanudar Grabación Existente",
            value=False,
            info="Si hay una grabación previa, continúa desde donde se detuvo."
        )

        # Function to update dropdown choices on button click or page load
        def update_port_dropdowns():
            ports = list_serial_device_paths()
            # The dropdowns allow custom values, so setting choices updates the list for selection
            return gr.update(choices=ports), gr.update(choices=ports)

        # Attach update function to refresh button and initial load
        refresh_ports_btn.click(
            fn=update_port_dropdowns,
            inputs=None,
            outputs=[robot_port_dropdown, teleop_port_dropdown]
        )
        demo.load(
            fn=update_port_dropdowns,
            inputs=None,
            outputs=[robot_port_dropdown, teleop_port_dropdown]
        )

    with gr.Tab("3. Iniciar Grabación"):
        gr.Markdown("## Ejecutar Grabación del Dataset")
        gr.Markdown(
            "Haz clic en el botón para iniciar la grabación. "
            "Una nueva ventana de LeRobot se abrirá para la teleoperación y visualización."
        )
        record_btn = gr.Button("🚀 Iniciar Grabación 🚀", variant="primary")
        
        # Output para el estado y mensajes de la grabación
        record_status_output = gr.Textbox(
            label="Estado de la Grabación",
            interactive=False,
            visible=False,
            lines=10
        )

        record_btn.click(
            fn=gradio_record,
            inputs=[
                # Use the current value from either dropdown or textbox (they are synchronized)
                robot_port_manual, # We'll use the textbox value as it's the ultimate source of truth
                teleop_port_manual, # Same for teleop port
                num_episodes_input,
                single_task_input,
                push_to_hub_input,
                resume_input,
                episode_time_s_input,
                reset_time_s_input
            ],
            outputs=[record_status_output, record_btn] # Update status output and re-enable button
        )
        gr.Markdown("### Controles de Teclado Durante la Grabación (en la ventana de LeRobot):")
        gr.Markdown(
            "- **Flecha Derecha (→):** Detener el episodio actual y pasar al siguiente.\n"
            "- **Flecha Izquierda (←):** Cancelar el episodio actual y volver a grabar.\n"
            "- **Escape (ESC):** Detener la sesión, codificar videos y subir el dataset (si está activado)."
        )

    gr.Markdown("---")
    gr.Markdown("Hecho con ❤️ para RobotCleanPupusas503")

# Lanza la interfaz de Gradio
if __name__ == "__main__":
    try:
        import serial # Test if pyserial is installed
    except ImportError:
        print("Error: La librería 'pyserial' no está instalada.")
        print("Por favor, instala 'pyserial' ejecutando: pip install pyserial")
        exit(1)
    
    demo.launch(share=False)