RobotCleanPupusas / backend /record_controller.py
Trujasx's picture
backend done
5ed0597
# backend/record_controller.py (o record_controller_gradio.py)
import subprocess
import os
import gradio as gr
import serial.tools.list_ports # Nuevo: para listar puertos seriales
import json # Necesario para la función load_config si la copiamos aquí
# IMPORTANTE: Si robot_detector.py está en el mismo directorio 'backend',
# puedes importar su función de carga de configuración.
# Asegúrate de que el archivo robot_detector.py exista en la misma carpeta.
try:
from robot_detector import load_config as load_robot_config, CONFIG_FILE as ROBOT_CONFIG_FILE
except ImportError:
print("Advertencia: No se pudo importar 'load_config' de 'robot_detector.py'.")
print("Asegúrate de que 'robot_detector.py' esté en el mismo directorio 'backend'.")
print("Se usará una implementación local de load_config.")
# Implementación fallback de load_config si no se puede importar
ROBOT_CONFIG_FILE = "robot_config.json"
def load_robot_config():
if os.path.exists(ROBOT_CONFIG_FILE):
try:
with open(ROBOT_CONFIG_FILE, 'r') as f:
config = json.load(f)
return (
config.get("robot_follower", {}).get("port", ""),
config.get("robot_follower", {}).get("id", ""),
config.get("teleop_leader", {}).get("port", ""),
config.get("teleop_leader", {}).get("id", "")
)
except Exception as e:
print(f"Error al cargar la configuración existente desde '{ROBOT_CONFIG_FILE}': {e}. Se usarán valores por defecto.")
return "", "", "", ""
return "", "", "", ""
def run_command(command: str, description: str):
"""
Ejecuta un comando de shell y captura su salida, manejando errores.
"""
print(f"\n--- {description} ---")
process_output = [] # Use a list to collect output for better streaming if needed later
try:
# Use Popen to stream output in real-time, important for long running processes
process = subprocess.Popen(
command,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, # Redirect stderr to stdout
text=True
)
for line in iter(process.stdout.readline, ''):
print(line, end='') # Print to console for server-side logging
process_output.append(line)
# You could add a gr.Progress() here for real-time update in Gradio if needed
process.wait() # Wait for the process to complete
if process.returncode == 0:
print(f"Éxito: {description}")
return True, "".join(process_output)
else:
error_message = f"Error durante '{description}': El comando devolvió el código de salida {process.returncode}"
print(error_message)
return False, "".join(process_output) # Return all collected output, including errors
except Exception as e:
error_message = f"Ocurrió un error inesperado durante '{description}': {e}"
print(error_message)
return False, error_message
def list_serial_device_paths():
"""
Lista los puertos seriales disponibles y devuelve solo las rutas de los dispositivos.
"""
ports = serial.tools.list_ports.comports()
device_paths = []
if not ports:
return [] # Return empty list if no ports found
for port in ports:
device_paths.append(port.device)
return device_paths
def login_to_huggingface(token: str):
"""
Inicia sesión en Hugging Face CLI con el token proporcionado.
"""
if not token or token == "hf_YOUR_ACTUAL_WRITE_TOKEN_HERE":
return False, "Error: Por favor, proporciona un token de Hugging Face válido."
success, output = run_command(
f"huggingface-cli login --token {token}", # <--- ¡Argumento eliminado!
"Iniciando sesión en Hugging Face CLI"
)
if success:
return True, "¡Inicio de sesión en Hugging Face exitoso!"
else:
return False, output
def get_huggingface_user():
"""
Obtiene el nombre de usuario de Hugging Face.
"""
success, output = run_command(
"huggingface-cli whoami | head -n 1",
"Obteniendo nombre de usuario de Hugging Face"
)
if success:
# The output might contain warnings before the actual username.
# Try to find the username, which should be the first non-empty, non-warning line.
lines = output.splitlines()
for line in lines:
if line.strip() and not (line.strip().startswith("warnings.") or "deprecated" in line.lower()):
return True, line.strip()
return False, "No se pudo extraer el nombre de usuario de la salida de 'whoami'."
else:
return False, output
def record_dataset_core(hf_user: str,
robot_port: str,
teleop_port: str,
num_episodes: int = 2,
single_task: str = "Grab the black cube",
push_to_hub: bool = True,
resume: bool = False,
episode_time_s: int = 60,
reset_time_s: int = 60):
"""
Función central para grabar un dataset usando LeRobot.
"""
if not hf_user:
return False, "Error: Nombre de usuario de Hugging Face no disponible. ¿Falló el inicio de sesión?"
# Use the IDs from the loaded config for consistency
initial_follower_port, initial_follower_id, initial_leader_port, initial_leader_id = load_robot_config()
robot_id = initial_follower_id if initial_follower_id else "my_awesome_follower_arm"
teleop_id = initial_leader_id if initial_leader_id else "my_awesome_leader_arm"
dataset_repo_id = f"{hf_user}/record-test"
command = [
"python", "-m", "lerobot.record",
"--robot.type=so101_follower",
f"--robot.port={robot_port}",
f"--robot.id={robot_id}", # Use ID from config/default
"--robot.cameras=\"{{ front: {{type: opencv, index_or_path: 0, width: 1920, height: 1080, fps: 30}}}}\"", # Escaping for shell command
"--teleop.type=so101_leader",
f"--teleop.port={teleop_port}",
f"--teleop.id={teleop_id}", # Use ID from config/default
"--display_data=true",
f"--dataset.repo_id={dataset_repo_id}",
f"--dataset.num_episodes={num_episodes}",
f"--dataset.single_task=\"{single_task}\"",
f"--dataset.episode_time_s={episode_time_s}",
f"--dataset.reset_time_s={reset_time_s}"
]
if not push_to_hub:
command.append("--dataset.push_to_hub=False")
if resume:
command.append("--resume=true")
full_command = " ".join(command)
success, output = run_command(full_command, "Grabación del dataset")
if success:
final_message = f"¡Grabación del dataset completada exitosamente!\n"
final_message += f"El dataset se almacenó localmente en: ~/.cache/huggingface/lerobot/{dataset_repo_id}\n"
if push_to_hub:
final_message += f"El dataset ha sido subido a: https://huggingface.co/datasets/{dataset_repo_id}\n"
final_message += "Puedes añadir etiquetas personalizadas (ej. 'tutorial') en la página del dataset en Hugging Face."
else:
final_message += "La subida al Hugging Face Hub fue deshabilitada."
return True, final_message
else:
return False, output
# --- Gradio Interface Logic ---
# Variable global para almacenar el usuario de Hugging Face
current_hf_user = None
def gradio_login(hf_token_input: str):
"""Interfaz Gradio para iniciar sesión en Hugging Face."""
global current_hf_user
success, message = login_to_huggingface(hf_token_input)
if success:
gr.Info(message)
success_user, user_name = get_huggingface_user()
if success_user:
current_hf_user = user_name
return gr.update(value=user_name, interactive=False), gr.update(visible=True, value=message) # Update username field and status box
else:
return gr.update(value="", interactive=True), gr.update(visible=True, value=user_name) # Show error if user not retrieved
else:
current_hf_user = None
return gr.update(value="", interactive=True), gr.update(visible=True, value=message) # Clear username and show error
def gradio_record(robot_port_input: str,
teleop_port_input: str,
num_episodes_input: int,
single_task_input: str,
push_to_hub_input: bool,
resume_input: bool,
episode_time_s_input: int,
reset_time_s_input: int):
"""Interfaz Gradio para iniciar la grabación del dataset."""
global current_hf_user
if not current_hf_user:
return gr.update(visible=True, value="Error: No se ha iniciado sesión en Hugging Face o no se pudo obtener el usuario. Por favor, inicia sesión primero."), gr.update(interactive=False)
gr.Info("Iniciando grabación del dataset. Esto puede tardar unos segundos...")
success, message = record_dataset_core(
hf_user=current_hf_user,
robot_port=robot_port_input,
teleop_port=teleop_port_input,
num_episodes=num_episodes_input,
single_task=single_task_input,
push_to_hub=push_to_hub_input,
resume=resume_input,
episode_time_s=episode_time_s_input,
reset_time_s=reset_time_s_input
)
if success:
gr.Info("Grabación completada.")
return gr.update(visible=True, value=message), gr.update(interactive=True)
else:
gr.Info("Grabación fallida. Revisa el log para más detalles.")
return gr.update(visible=True, value=message), gr.update(interactive=True)
# Define la interfaz de Gradio
with gr.Blocks(title="Controlador de Grabación LeRobot") as demo:
gr.Markdown("# <center>Controlador de Grabación de Datasets LeRobot</center>")
gr.Markdown("Esta interfaz te ayuda a gestionar la grabación de datasets para LeRobot.")
with gr.Tab("1. Configuración de Hugging Face"):
gr.Markdown("## Configuración de Hugging Face")
gr.Markdown(
"Introduce tu **token de Hugging Face con permisos de escritura**. "
"Puedes generarlo en [huggingface.co/settings/tokens](https://huggingface.co/settings/tokens)."
)
hf_token_input = gr.Textbox(
label="Token de Hugging Face",
type="password",
placeholder="hf_YOUR_ACTUAL_WRITE_TOKEN_HERE",
info="El token debe tener permisos de escritura (write)."
)
login_btn = gr.Button("Iniciar Sesión / Verificar Token")
hf_user_output = gr.Textbox(label="Usuario de Hugging Face Actual", interactive=False, placeholder="No autenticado", show_copy_button=True)
login_status_output = gr.Textbox(label="Estado de Autenticación", interactive=False, visible=False)
login_btn.click(
fn=gradio_login,
inputs=hf_token_input,
outputs=[hf_user_output, login_status_output]
)
with gr.Tab("2. Parámetros de Grabación"):
gr.Markdown("## Parámetros del Robot y la Grabación")
gr.Markdown(
"Asegúrate de que los **puertos seriales** de tus robots sean correctos. "
"Puedes detectarlos automáticamente o introducirlos manualmente."
)
# Load initial config from robot_config.json
initial_follower_port, initial_follower_id, initial_leader_port, initial_leader_id = load_robot_config()
with gr.Group(): # Group for auto-detection and manual input
gr.Markdown("### Puerto del Robot Follower")
robot_port_dropdown = gr.Dropdown(
label="Seleccionar Puerto",
choices=[], # Populated on load/refresh
value=initial_follower_port, # Pre-fill from config
interactive=True,
allow_custom_value=True # Allow manual input
)
robot_port_manual = gr.Textbox(
label="O introducir Puerto Manualmente",
value=initial_follower_port, # Pre-fill from config
placeholder="ej. /dev/tty.usbmodemXXXXX",
interactive=True
)
# Synchronize dropdown and manual textbox
robot_port_dropdown.change(lambda x: x, inputs=robot_port_dropdown, outputs=robot_port_manual)
robot_port_manual.change(lambda x: x, inputs=robot_port_manual, outputs=robot_port_dropdown)
with gr.Group(): # Group for auto-detection and manual input
gr.Markdown("### Puerto del Teleoperador Leader")
teleop_port_dropdown = gr.Dropdown(
label="Seleccionar Puerto",
choices=[], # Populated on load/refresh
value=initial_leader_port, # Pre-fill from config
interactive=True,
allow_custom_value=True # Allow manual input
)
teleop_port_manual = gr.Textbox(
label="O introducir Puerto Manualmente",
value=initial_leader_port, # Pre-fill from config
placeholder="ej. /dev/tty.usbmodemXXXXX",
interactive=True
)
# Synchronize dropdown and manual textbox
teleop_port_dropdown.change(lambda x: x, inputs=teleop_port_dropdown, outputs=teleop_port_manual)
teleop_port_manual.change(lambda x: x, inputs=teleop_port_manual, outputs=teleop_port_dropdown)
refresh_ports_btn = gr.Button("🔄 Refrescar Puertos Detectados")
num_episodes_input = gr.Slider(
minimum=1, maximum=100, step=1, value=2, label="Número de Episodios a Grabar",
info="Define cuántos episodios se grabarán."
)
single_task_input = gr.Textbox(
label="Descripción de la Tarea (Single Task)",
value="Grab the black cube",
placeholder="Describe la tarea que realizará el robot"
)
episode_time_s_input = gr.Slider(
minimum=10, maximum=300, step=10, value=60, label="Duración del Episodio (segundos)",
info="Tiempo máximo de grabación para cada episodio."
)
reset_time_s_input = gr.Slider(
minimum=5, maximum=120, step=5, value=60, label="Tiempo de Reseteo (segundos)",
info="Tiempo para preparar el entorno entre episodios."
)
push_to_hub_input = gr.Checkbox(
label="Subir Dataset a Hugging Face Hub",
value=True,
info="Marca esta casilla para subir el dataset a tu perfil de Hugging Face."
)
resume_input = gr.Checkbox(
label="Reanudar Grabación Existente",
value=False,
info="Si hay una grabación previa, continúa desde donde se detuvo."
)
# Function to update dropdown choices on button click or page load
def update_port_dropdowns():
ports = list_serial_device_paths()
# The dropdowns allow custom values, so setting choices updates the list for selection
return gr.update(choices=ports), gr.update(choices=ports)
# Attach update function to refresh button and initial load
refresh_ports_btn.click(
fn=update_port_dropdowns,
inputs=None,
outputs=[robot_port_dropdown, teleop_port_dropdown]
)
demo.load(
fn=update_port_dropdowns,
inputs=None,
outputs=[robot_port_dropdown, teleop_port_dropdown]
)
with gr.Tab("3. Iniciar Grabación"):
gr.Markdown("## Ejecutar Grabación del Dataset")
gr.Markdown(
"Haz clic en el botón para iniciar la grabación. "
"Una nueva ventana de LeRobot se abrirá para la teleoperación y visualización."
)
record_btn = gr.Button("🚀 Iniciar Grabación 🚀", variant="primary")
# Output para el estado y mensajes de la grabación
record_status_output = gr.Textbox(
label="Estado de la Grabación",
interactive=False,
visible=False,
lines=10
)
record_btn.click(
fn=gradio_record,
inputs=[
# Use the current value from either dropdown or textbox (they are synchronized)
robot_port_manual, # We'll use the textbox value as it's the ultimate source of truth
teleop_port_manual, # Same for teleop port
num_episodes_input,
single_task_input,
push_to_hub_input,
resume_input,
episode_time_s_input,
reset_time_s_input
],
outputs=[record_status_output, record_btn] # Update status output and re-enable button
)
gr.Markdown("### Controles de Teclado Durante la Grabación (en la ventana de LeRobot):")
gr.Markdown(
"- **Flecha Derecha (→):** Detener el episodio actual y pasar al siguiente.\n"
"- **Flecha Izquierda (←):** Cancelar el episodio actual y volver a grabar.\n"
"- **Escape (ESC):** Detener la sesión, codificar videos y subir el dataset (si está activado)."
)
gr.Markdown("---")
gr.Markdown("Hecho con ❤️ para RobotCleanPupusas503")
# Lanza la interfaz de Gradio
if __name__ == "__main__":
try:
import serial # Test if pyserial is installed
except ImportError:
print("Error: La librería 'pyserial' no está instalada.")
print("Por favor, instala 'pyserial' ejecutando: pip install pyserial")
exit(1)
demo.launch(share=False)