# backend/train_robot.py import subprocess import os import gradio as gr import json import torch # --- Helper Functions (reused from record_controller for consistency) --- def run_command(command: str, description: str): """ Ejecuta un comando de shell y captura su salida, manejando errores. """ print(f"\n--- {description} ---") process_output = [] try: # Use Popen to stream output in real-time process = subprocess.Popen( command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, # Redirect stderr to stdout text=True ) for line in iter(process.stdout.readline, ''): print(line, end='') # Print to console process_output.append(line) # You might want to yield here for Gradio's gr.Progress, # but for simplicity, we'll collect all output and return at the end. process.wait() # Wait for the process to complete if process.returncode == 0: print(f"Éxito: {description}") return True, "".join(process_output) else: error_message = f"Error durante '{description}': El comando devolvió el código de salida {process.returncode}" print(error_message) return False, "".join(process_output) # Return all collected output, including errors except Exception as e: error_message = f"Ocurrió un error inesperado durante '{description}': {e}" print(error_message) return False, error_message def login_to_huggingface(token: str): """ Inicia sesión en Hugging Face CLI con el token proporcionado. """ if not token or token == "hf_YOUR_ACTUAL_WRITE_TOKEN_HERE": return False, "Error: Por favor, proporciona un token de Hugging Face válido." success, output = run_command( f"huggingface-cli login --token {token} --add-to-git-credential", "Iniciando sesión en Hugging Face CLI" ) if success: return True, "¡Inicio de sesión en Hugging Face exitoso!" else: return False, output def get_huggingface_user(): """ Obtiene el nombre de usuario de Hugging Face. """ success, output = run_command( "huggingface-cli whoami | head -n 1", "Obteniendo nombre de usuario de Hugging Face" ) if success: # The output might contain warnings before the actual username. # Try to find the username, which should be the first non-empty, non-warning line. lines = output.splitlines() for line in lines: if line.strip() and not (line.strip().startswith("warnings.") or "deprecated" in line.lower()): return True, line.strip() return False, "No se pudo extraer el nombre de usuario de la salida de 'whoami'." else: return False, output # --- Core Training and Upload Logic --- def train_policy_core(hf_user: str, dataset_repo_id: str, policy_type: str, output_dir: str, job_name: str, policy_device: str, wandb_enable: bool, resume: bool, resume_config_path: str): """ Entrena una política de robot usando el script `lerobot/scripts/train.py`. """ if not hf_user: return False, "Error: Nombre de usuario de Hugging Face no disponible. Por favor, inicia sesión primero." if not dataset_repo_id.startswith(f"{hf_user}/"): dataset_repo_id = f"{hf_user}/{dataset_repo_id.split('/')[-1]}" # Ensure correct repo_id format print(f"\nPreparando para entrenar la política '{policy_type}' con el dataset '{dataset_repo_id}'...") command = [ "python", "-m", "lerobot.scripts.train" # Changed to -m lerobot.scripts.train ] if resume and resume_config_path: command.extend([ f"--config_path={resume_config_path}", "--resume=true" ]) else: command.extend([ f"--dataset.repo_id={dataset_repo_id}", f"--policy.type={policy_type}", f"--output_dir={output_dir}", f"--job_name={job_name}", f"--policy.device={policy_device}" ]) if wandb_enable: command.append("--wandb.enable=true") full_command = " ".join(command) success, output = run_command(full_command, "Entrenamiento de la Política") if success: final_message = f"¡Entrenamiento de la política completado exitosamente!\n" final_message += f"Los checkpoints se guardaron en: {output_dir}/checkpoints\n" if wandb_enable: final_message += "Revisa Weights & Biases para los gráficos de entrenamiento.\n" return True, final_message + "\n" + output # Add full output for visibility else: return False, f"Error durante el entrenamiento: {output}" def upload_policy_core(hf_user: str, policy_repo_name: str, checkpoint_path: str, is_intermediate: bool = False): """ Sube un checkpoint de política a Hugging Face Hub. """ if not hf_user: return False, "Error: Nombre de usuario de Hugging Face no disponible. Por favor, inicia sesión primero." if not policy_repo_name: return False, "Error: El nombre del repositorio de la política no puede estar vacío." if not checkpoint_path: return False, "Error: La ruta al checkpoint no puede estar vacía." if not os.path.exists(checkpoint_path): return False, f"Error: La ruta del checkpoint '{checkpoint_path}' no existe." full_repo_id = f"{hf_user}/{policy_repo_name}" # Hugging Face CLI upload expects the local path to be the second argument command = [ "huggingface-cli", "upload", full_repo_id, checkpoint_path, "--repo-type=model" # Specify repo type as model for policies ] if is_intermediate: # For intermediate, we might want to append CKPT to the repo name or use a specific folder within the repo # The provided doc uses policy_nameCKPT. Let's adapt to that if the user provides just base name # However, huggingface-cli upload expects a repo_id, which is HF_USER/REPO_NAME # The common practice is to upload to the same repo but into a different subfolder. # For simplicity, we'll stick to uploading the specified path to the given repo_id. pass # The logic for is_intermediate might depend on how the user names their repos/checkpoints full_command = " ".join(command) success, output = run_command(full_command, f"Subiendo política a {full_repo_id}") if success: return True, f"¡Política subida exitosamente a https://huggingface.co/{full_repo_id}!" else: return False, f"Error al subir política: {output}" def evaluate_policy_core(hf_user: str, robot_type: str, robot_port: str, robot_cameras: str, # Raw string for cameras robot_id: str, display_data: bool, dataset_repo_id_eval: str, single_task: str, policy_path: str, teleop_enable: bool = False, # Optional teleop for evaluation teleop_type: str = "", teleop_port: str = "", teleop_id: str = ""): """ Evalúa una política utilizando el script `lerobot.record` modificado. """ if not hf_user: return False, "Error: Nombre de usuario de Hugging Face no disponible. Por favor, inicia sesión primero." if not policy_path: return False, "Error: La ruta a la política para evaluar no puede estar vacía." # Ensure eval dataset repo ID starts with user if not dataset_repo_id_eval.startswith(f"{hf_user}/"): dataset_repo_id_eval = f"{hf_user}/{dataset_repo_id_eval.split('/')[-1]}" print(f"\nPreparando para evaluar la política '{policy_path}'...") command = [ "python", "-m", "lerobot.record", f"--robot.type={robot_type}", f"--robot.port={robot_port}", f"--robot.cameras=\"{robot_cameras}\"", # Use the raw string provided by user f"--robot.id={robot_id}", f"--display_data={str(display_data).lower()}", f"--dataset.repo_id={dataset_repo_id_eval}", f"--dataset.single_task=\"{single_task}\"", f"--policy.path={policy_path}" ] if teleop_enable: command.extend([ f"--teleop.type={teleop_type}", f"--teleop.port={teleop_port}", f"--teleop.id={teleop_id}" ]) full_command = " ".join(command) success, output = run_command(full_command, "Evaluación de la Política") if success: final_message = f"¡Evaluación de la política completada exitosamente!\n" final_message += f"Los datos de evaluación se guardaron en: ~/.cache/huggingface/lerobot/{dataset_repo_id_eval}\n" return True, final_message + "\n" + output else: return False, f"Error durante la evaluación: {output}" # --- Gradio Interface Logic --- # Variable global para almacenar el usuario de Hugging Face current_hf_user = None def gradio_login(hf_token_input: str): """Interfaz Gradio para iniciar sesión en Hugging Face.""" global current_hf_user success, message = login_to_huggingface(hf_token_input) if success: gr.Info(message) success_user, user_name = get_huggingface_user() if success_user: current_hf_user = user_name return gr.update(value=user_name, interactive=False), gr.update(visible=True, value=message) else: return gr.update(value="", interactive=True), gr.update(visible=True, value=user_name) else: current_hf_user = None return gr.update(value="", interactive=True), gr.update(visible=True, value=message) def gradio_train(dataset_repo_id_input: str, policy_type_input: str, output_dir_input: str, job_name_input: str, policy_device_input: str, wandb_enable_input: bool, resume_input: bool, resume_config_path_input: str): """Interfaz Gradio para iniciar el entrenamiento.""" global current_hf_user if not current_hf_user: return gr.update(visible=True, value="Error: No se ha iniciado sesión en Hugging Face o no se pudo obtener el usuario. Por favor, inicia sesión primero.") gr.Info("Iniciando entrenamiento del modelo. Esto puede tardar mucho tiempo...") success, message = train_policy_core( hf_user=current_hf_user, dataset_repo_id=dataset_repo_id_input, policy_type=policy_type_input, output_dir=output_dir_input, job_name=job_name_input, policy_device=policy_device_input, wandb_enable=wandb_enable_input, resume=resume_input, resume_config_path=resume_config_path_input ) if success: gr.Info("Entrenamiento completado. Revisa la salida para los detalles.") else: gr.Info("Entrenamiento fallido. Revisa la salida para los errores.") return gr.update(visible=True, value=message) def gradio_upload(policy_repo_name_input: str, checkpoint_path_input: str): """Interfaz Gradio para subir un checkpoint.""" global current_hf_user if not current_hf_user: return gr.update(visible=True, value="Error: No se ha iniciado sesión en Hugging Face. Por favor, inicia sesión primero.") gr.Info(f"Subiendo checkpoint '{checkpoint_path_input}' a '{policy_repo_name_input}'...") success, message = upload_policy_core( hf_user=current_hf_user, policy_repo_name=policy_repo_name_input, checkpoint_path=checkpoint_path_input ) if success: gr.Info("Subida completada.") else: gr.Info("Subida fallida. Revisa la salida.") return gr.update(visible=True, value=message) def gradio_evaluate(robot_type_input: str, robot_port_input: str, robot_cameras_input: str, robot_id_input: str, display_data_input: bool, dataset_repo_id_eval_input: str, single_task_eval_input: str, policy_path_input: str, teleop_enable_input: bool, teleop_type_input: str, teleop_port_input: str, teleop_id_input: str): """Interfaz Gradio para evaluar una política.""" global current_hf_user if not current_hf_user: return gr.update(visible=True, value="Error: No se ha iniciado sesión en Hugging Face. Por favor, inicia sesión primero.") gr.Info("Iniciando evaluación de la política...") success, message = evaluate_policy_core( hf_user=current_hf_user, robot_type=robot_type_input, robot_port=robot_port_input, robot_cameras=robot_cameras_input, robot_id=robot_id_input, display_data=display_data_input, dataset_repo_id_eval=dataset_repo_id_eval_input, single_task=single_task_eval_input, policy_path=policy_path_input, teleop_enable=teleop_enable_input, teleop_type=teleop_type_input, teleop_port=teleop_port_input, teleop_id=teleop_id_input ) if success: gr.Info("Evaluación completada.") else: gr.Info("Evaluación fallida. Revisa la salida.") return gr.update(visible=True, value=message) # --- Gradio Interface Definition --- with gr.Blocks(title="Controlador de Entrenamiento y Evaluación LeRobot") as demo: gr.Markdown("#