Spaces:
Runtime error
Runtime error
File size: 17,978 Bytes
5ed0597 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 |
# backend/record_controller.py (o record_controller_gradio.py)
import subprocess
import os
import gradio as gr
import serial.tools.list_ports # Nuevo: para listar puertos seriales
import json # Necesario para la función load_config si la copiamos aquí
# IMPORTANTE: Si robot_detector.py está en el mismo directorio 'backend',
# puedes importar su función de carga de configuración.
# Asegúrate de que el archivo robot_detector.py exista en la misma carpeta.
try:
from robot_detector import load_config as load_robot_config, CONFIG_FILE as ROBOT_CONFIG_FILE
except ImportError:
print("Advertencia: No se pudo importar 'load_config' de 'robot_detector.py'.")
print("Asegúrate de que 'robot_detector.py' esté en el mismo directorio 'backend'.")
print("Se usará una implementación local de load_config.")
# Implementación fallback de load_config si no se puede importar
ROBOT_CONFIG_FILE = "robot_config.json"
def load_robot_config():
if os.path.exists(ROBOT_CONFIG_FILE):
try:
with open(ROBOT_CONFIG_FILE, 'r') as f:
config = json.load(f)
return (
config.get("robot_follower", {}).get("port", ""),
config.get("robot_follower", {}).get("id", ""),
config.get("teleop_leader", {}).get("port", ""),
config.get("teleop_leader", {}).get("id", "")
)
except Exception as e:
print(f"Error al cargar la configuración existente desde '{ROBOT_CONFIG_FILE}': {e}. Se usarán valores por defecto.")
return "", "", "", ""
return "", "", "", ""
def run_command(command: str, description: str):
"""
Ejecuta un comando de shell y captura su salida, manejando errores.
"""
print(f"\n--- {description} ---")
process_output = [] # Use a list to collect output for better streaming if needed later
try:
# Use Popen to stream output in real-time, important for long running processes
process = subprocess.Popen(
command,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, # Redirect stderr to stdout
text=True
)
for line in iter(process.stdout.readline, ''):
print(line, end='') # Print to console for server-side logging
process_output.append(line)
# You could add a gr.Progress() here for real-time update in Gradio if needed
process.wait() # Wait for the process to complete
if process.returncode == 0:
print(f"Éxito: {description}")
return True, "".join(process_output)
else:
error_message = f"Error durante '{description}': El comando devolvió el código de salida {process.returncode}"
print(error_message)
return False, "".join(process_output) # Return all collected output, including errors
except Exception as e:
error_message = f"Ocurrió un error inesperado durante '{description}': {e}"
print(error_message)
return False, error_message
def list_serial_device_paths():
"""
Lista los puertos seriales disponibles y devuelve solo las rutas de los dispositivos.
"""
ports = serial.tools.list_ports.comports()
device_paths = []
if not ports:
return [] # Return empty list if no ports found
for port in ports:
device_paths.append(port.device)
return device_paths
def login_to_huggingface(token: str):
"""
Inicia sesión en Hugging Face CLI con el token proporcionado.
"""
if not token or token == "hf_YOUR_ACTUAL_WRITE_TOKEN_HERE":
return False, "Error: Por favor, proporciona un token de Hugging Face válido."
success, output = run_command(
f"huggingface-cli login --token {token}", # <--- ¡Argumento eliminado!
"Iniciando sesión en Hugging Face CLI"
)
if success:
return True, "¡Inicio de sesión en Hugging Face exitoso!"
else:
return False, output
def get_huggingface_user():
"""
Obtiene el nombre de usuario de Hugging Face.
"""
success, output = run_command(
"huggingface-cli whoami | head -n 1",
"Obteniendo nombre de usuario de Hugging Face"
)
if success:
# The output might contain warnings before the actual username.
# Try to find the username, which should be the first non-empty, non-warning line.
lines = output.splitlines()
for line in lines:
if line.strip() and not (line.strip().startswith("warnings.") or "deprecated" in line.lower()):
return True, line.strip()
return False, "No se pudo extraer el nombre de usuario de la salida de 'whoami'."
else:
return False, output
def record_dataset_core(hf_user: str,
robot_port: str,
teleop_port: str,
num_episodes: int = 2,
single_task: str = "Grab the black cube",
push_to_hub: bool = True,
resume: bool = False,
episode_time_s: int = 60,
reset_time_s: int = 60):
"""
Función central para grabar un dataset usando LeRobot.
"""
if not hf_user:
return False, "Error: Nombre de usuario de Hugging Face no disponible. ¿Falló el inicio de sesión?"
# Use the IDs from the loaded config for consistency
initial_follower_port, initial_follower_id, initial_leader_port, initial_leader_id = load_robot_config()
robot_id = initial_follower_id if initial_follower_id else "my_awesome_follower_arm"
teleop_id = initial_leader_id if initial_leader_id else "my_awesome_leader_arm"
dataset_repo_id = f"{hf_user}/record-test"
command = [
"python", "-m", "lerobot.record",
"--robot.type=so101_follower",
f"--robot.port={robot_port}",
f"--robot.id={robot_id}", # Use ID from config/default
"--robot.cameras=\"{{ front: {{type: opencv, index_or_path: 0, width: 1920, height: 1080, fps: 30}}}}\"", # Escaping for shell command
"--teleop.type=so101_leader",
f"--teleop.port={teleop_port}",
f"--teleop.id={teleop_id}", # Use ID from config/default
"--display_data=true",
f"--dataset.repo_id={dataset_repo_id}",
f"--dataset.num_episodes={num_episodes}",
f"--dataset.single_task=\"{single_task}\"",
f"--dataset.episode_time_s={episode_time_s}",
f"--dataset.reset_time_s={reset_time_s}"
]
if not push_to_hub:
command.append("--dataset.push_to_hub=False")
if resume:
command.append("--resume=true")
full_command = " ".join(command)
success, output = run_command(full_command, "Grabación del dataset")
if success:
final_message = f"¡Grabación del dataset completada exitosamente!\n"
final_message += f"El dataset se almacenó localmente en: ~/.cache/huggingface/lerobot/{dataset_repo_id}\n"
if push_to_hub:
final_message += f"El dataset ha sido subido a: https://huggingface.co/datasets/{dataset_repo_id}\n"
final_message += "Puedes añadir etiquetas personalizadas (ej. 'tutorial') en la página del dataset en Hugging Face."
else:
final_message += "La subida al Hugging Face Hub fue deshabilitada."
return True, final_message
else:
return False, output
# --- Gradio Interface Logic ---
# Variable global para almacenar el usuario de Hugging Face
current_hf_user = None
def gradio_login(hf_token_input: str):
"""Interfaz Gradio para iniciar sesión en Hugging Face."""
global current_hf_user
success, message = login_to_huggingface(hf_token_input)
if success:
gr.Info(message)
success_user, user_name = get_huggingface_user()
if success_user:
current_hf_user = user_name
return gr.update(value=user_name, interactive=False), gr.update(visible=True, value=message) # Update username field and status box
else:
return gr.update(value="", interactive=True), gr.update(visible=True, value=user_name) # Show error if user not retrieved
else:
current_hf_user = None
return gr.update(value="", interactive=True), gr.update(visible=True, value=message) # Clear username and show error
def gradio_record(robot_port_input: str,
teleop_port_input: str,
num_episodes_input: int,
single_task_input: str,
push_to_hub_input: bool,
resume_input: bool,
episode_time_s_input: int,
reset_time_s_input: int):
"""Interfaz Gradio para iniciar la grabación del dataset."""
global current_hf_user
if not current_hf_user:
return gr.update(visible=True, value="Error: No se ha iniciado sesión en Hugging Face o no se pudo obtener el usuario. Por favor, inicia sesión primero."), gr.update(interactive=False)
gr.Info("Iniciando grabación del dataset. Esto puede tardar unos segundos...")
success, message = record_dataset_core(
hf_user=current_hf_user,
robot_port=robot_port_input,
teleop_port=teleop_port_input,
num_episodes=num_episodes_input,
single_task=single_task_input,
push_to_hub=push_to_hub_input,
resume=resume_input,
episode_time_s=episode_time_s_input,
reset_time_s=reset_time_s_input
)
if success:
gr.Info("Grabación completada.")
return gr.update(visible=True, value=message), gr.update(interactive=True)
else:
gr.Info("Grabación fallida. Revisa el log para más detalles.")
return gr.update(visible=True, value=message), gr.update(interactive=True)
# Define la interfaz de Gradio
with gr.Blocks(title="Controlador de Grabación LeRobot") as demo:
gr.Markdown("# <center>Controlador de Grabación de Datasets LeRobot</center>")
gr.Markdown("Esta interfaz te ayuda a gestionar la grabación de datasets para LeRobot.")
with gr.Tab("1. Configuración de Hugging Face"):
gr.Markdown("## Configuración de Hugging Face")
gr.Markdown(
"Introduce tu **token de Hugging Face con permisos de escritura**. "
"Puedes generarlo en [huggingface.co/settings/tokens](https://huggingface.co/settings/tokens)."
)
hf_token_input = gr.Textbox(
label="Token de Hugging Face",
type="password",
placeholder="hf_YOUR_ACTUAL_WRITE_TOKEN_HERE",
info="El token debe tener permisos de escritura (write)."
)
login_btn = gr.Button("Iniciar Sesión / Verificar Token")
hf_user_output = gr.Textbox(label="Usuario de Hugging Face Actual", interactive=False, placeholder="No autenticado", show_copy_button=True)
login_status_output = gr.Textbox(label="Estado de Autenticación", interactive=False, visible=False)
login_btn.click(
fn=gradio_login,
inputs=hf_token_input,
outputs=[hf_user_output, login_status_output]
)
with gr.Tab("2. Parámetros de Grabación"):
gr.Markdown("## Parámetros del Robot y la Grabación")
gr.Markdown(
"Asegúrate de que los **puertos seriales** de tus robots sean correctos. "
"Puedes detectarlos automáticamente o introducirlos manualmente."
)
# Load initial config from robot_config.json
initial_follower_port, initial_follower_id, initial_leader_port, initial_leader_id = load_robot_config()
with gr.Group(): # Group for auto-detection and manual input
gr.Markdown("### Puerto del Robot Follower")
robot_port_dropdown = gr.Dropdown(
label="Seleccionar Puerto",
choices=[], # Populated on load/refresh
value=initial_follower_port, # Pre-fill from config
interactive=True,
allow_custom_value=True # Allow manual input
)
robot_port_manual = gr.Textbox(
label="O introducir Puerto Manualmente",
value=initial_follower_port, # Pre-fill from config
placeholder="ej. /dev/tty.usbmodemXXXXX",
interactive=True
)
# Synchronize dropdown and manual textbox
robot_port_dropdown.change(lambda x: x, inputs=robot_port_dropdown, outputs=robot_port_manual)
robot_port_manual.change(lambda x: x, inputs=robot_port_manual, outputs=robot_port_dropdown)
with gr.Group(): # Group for auto-detection and manual input
gr.Markdown("### Puerto del Teleoperador Leader")
teleop_port_dropdown = gr.Dropdown(
label="Seleccionar Puerto",
choices=[], # Populated on load/refresh
value=initial_leader_port, # Pre-fill from config
interactive=True,
allow_custom_value=True # Allow manual input
)
teleop_port_manual = gr.Textbox(
label="O introducir Puerto Manualmente",
value=initial_leader_port, # Pre-fill from config
placeholder="ej. /dev/tty.usbmodemXXXXX",
interactive=True
)
# Synchronize dropdown and manual textbox
teleop_port_dropdown.change(lambda x: x, inputs=teleop_port_dropdown, outputs=teleop_port_manual)
teleop_port_manual.change(lambda x: x, inputs=teleop_port_manual, outputs=teleop_port_dropdown)
refresh_ports_btn = gr.Button("🔄 Refrescar Puertos Detectados")
num_episodes_input = gr.Slider(
minimum=1, maximum=100, step=1, value=2, label="Número de Episodios a Grabar",
info="Define cuántos episodios se grabarán."
)
single_task_input = gr.Textbox(
label="Descripción de la Tarea (Single Task)",
value="Grab the black cube",
placeholder="Describe la tarea que realizará el robot"
)
episode_time_s_input = gr.Slider(
minimum=10, maximum=300, step=10, value=60, label="Duración del Episodio (segundos)",
info="Tiempo máximo de grabación para cada episodio."
)
reset_time_s_input = gr.Slider(
minimum=5, maximum=120, step=5, value=60, label="Tiempo de Reseteo (segundos)",
info="Tiempo para preparar el entorno entre episodios."
)
push_to_hub_input = gr.Checkbox(
label="Subir Dataset a Hugging Face Hub",
value=True,
info="Marca esta casilla para subir el dataset a tu perfil de Hugging Face."
)
resume_input = gr.Checkbox(
label="Reanudar Grabación Existente",
value=False,
info="Si hay una grabación previa, continúa desde donde se detuvo."
)
# Function to update dropdown choices on button click or page load
def update_port_dropdowns():
ports = list_serial_device_paths()
# The dropdowns allow custom values, so setting choices updates the list for selection
return gr.update(choices=ports), gr.update(choices=ports)
# Attach update function to refresh button and initial load
refresh_ports_btn.click(
fn=update_port_dropdowns,
inputs=None,
outputs=[robot_port_dropdown, teleop_port_dropdown]
)
demo.load(
fn=update_port_dropdowns,
inputs=None,
outputs=[robot_port_dropdown, teleop_port_dropdown]
)
with gr.Tab("3. Iniciar Grabación"):
gr.Markdown("## Ejecutar Grabación del Dataset")
gr.Markdown(
"Haz clic en el botón para iniciar la grabación. "
"Una nueva ventana de LeRobot se abrirá para la teleoperación y visualización."
)
record_btn = gr.Button("🚀 Iniciar Grabación 🚀", variant="primary")
# Output para el estado y mensajes de la grabación
record_status_output = gr.Textbox(
label="Estado de la Grabación",
interactive=False,
visible=False,
lines=10
)
record_btn.click(
fn=gradio_record,
inputs=[
# Use the current value from either dropdown or textbox (they are synchronized)
robot_port_manual, # We'll use the textbox value as it's the ultimate source of truth
teleop_port_manual, # Same for teleop port
num_episodes_input,
single_task_input,
push_to_hub_input,
resume_input,
episode_time_s_input,
reset_time_s_input
],
outputs=[record_status_output, record_btn] # Update status output and re-enable button
)
gr.Markdown("### Controles de Teclado Durante la Grabación (en la ventana de LeRobot):")
gr.Markdown(
"- **Flecha Derecha (→):** Detener el episodio actual y pasar al siguiente.\n"
"- **Flecha Izquierda (←):** Cancelar el episodio actual y volver a grabar.\n"
"- **Escape (ESC):** Detener la sesión, codificar videos y subir el dataset (si está activado)."
)
gr.Markdown("---")
gr.Markdown("Hecho con ❤️ para RobotCleanPupusas503")
# Lanza la interfaz de Gradio
if __name__ == "__main__":
try:
import serial # Test if pyserial is installed
except ImportError:
print("Error: La librería 'pyserial' no está instalada.")
print("Por favor, instala 'pyserial' ejecutando: pip install pyserial")
exit(1)
demo.launch(share=False) |