# backend/record_controller.py (o record_controller_gradio.py) import subprocess import os import gradio as gr import serial.tools.list_ports # Nuevo: para listar puertos seriales import json # Necesario para la función load_config si la copiamos aquí # IMPORTANTE: Si robot_detector.py está en el mismo directorio 'backend', # puedes importar su función de carga de configuración. # Asegúrate de que el archivo robot_detector.py exista en la misma carpeta. try: from robot_detector import load_config as load_robot_config, CONFIG_FILE as ROBOT_CONFIG_FILE except ImportError: print("Advertencia: No se pudo importar 'load_config' de 'robot_detector.py'.") print("Asegúrate de que 'robot_detector.py' esté en el mismo directorio 'backend'.") print("Se usará una implementación local de load_config.") # Implementación fallback de load_config si no se puede importar ROBOT_CONFIG_FILE = "robot_config.json" def load_robot_config(): if os.path.exists(ROBOT_CONFIG_FILE): try: with open(ROBOT_CONFIG_FILE, 'r') as f: config = json.load(f) return ( config.get("robot_follower", {}).get("port", ""), config.get("robot_follower", {}).get("id", ""), config.get("teleop_leader", {}).get("port", ""), config.get("teleop_leader", {}).get("id", "") ) except Exception as e: print(f"Error al cargar la configuración existente desde '{ROBOT_CONFIG_FILE}': {e}. Se usarán valores por defecto.") return "", "", "", "" return "", "", "", "" def run_command(command: str, description: str): """ Ejecuta un comando de shell y captura su salida, manejando errores. """ print(f"\n--- {description} ---") process_output = [] # Use a list to collect output for better streaming if needed later try: # Use Popen to stream output in real-time, important for long running processes process = subprocess.Popen( command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, # Redirect stderr to stdout text=True ) for line in iter(process.stdout.readline, ''): print(line, end='') # Print to console for server-side logging process_output.append(line) # You could add a gr.Progress() here for real-time update in Gradio if needed process.wait() # Wait for the process to complete if process.returncode == 0: print(f"Éxito: {description}") return True, "".join(process_output) else: error_message = f"Error durante '{description}': El comando devolvió el código de salida {process.returncode}" print(error_message) return False, "".join(process_output) # Return all collected output, including errors except Exception as e: error_message = f"Ocurrió un error inesperado durante '{description}': {e}" print(error_message) return False, error_message def list_serial_device_paths(): """ Lista los puertos seriales disponibles y devuelve solo las rutas de los dispositivos. """ ports = serial.tools.list_ports.comports() device_paths = [] if not ports: return [] # Return empty list if no ports found for port in ports: device_paths.append(port.device) return device_paths def login_to_huggingface(token: str): """ Inicia sesión en Hugging Face CLI con el token proporcionado. """ if not token or token == "hf_YOUR_ACTUAL_WRITE_TOKEN_HERE": return False, "Error: Por favor, proporciona un token de Hugging Face válido." success, output = run_command( f"huggingface-cli login --token {token}", # <--- ¡Argumento eliminado! "Iniciando sesión en Hugging Face CLI" ) if success: return True, "¡Inicio de sesión en Hugging Face exitoso!" else: return False, output def get_huggingface_user(): """ Obtiene el nombre de usuario de Hugging Face. """ success, output = run_command( "huggingface-cli whoami | head -n 1", "Obteniendo nombre de usuario de Hugging Face" ) if success: # The output might contain warnings before the actual username. # Try to find the username, which should be the first non-empty, non-warning line. lines = output.splitlines() for line in lines: if line.strip() and not (line.strip().startswith("warnings.") or "deprecated" in line.lower()): return True, line.strip() return False, "No se pudo extraer el nombre de usuario de la salida de 'whoami'." else: return False, output def record_dataset_core(hf_user: str, robot_port: str, teleop_port: str, num_episodes: int = 2, single_task: str = "Grab the black cube", push_to_hub: bool = True, resume: bool = False, episode_time_s: int = 60, reset_time_s: int = 60): """ Función central para grabar un dataset usando LeRobot. """ if not hf_user: return False, "Error: Nombre de usuario de Hugging Face no disponible. ¿Falló el inicio de sesión?" # Use the IDs from the loaded config for consistency initial_follower_port, initial_follower_id, initial_leader_port, initial_leader_id = load_robot_config() robot_id = initial_follower_id if initial_follower_id else "my_awesome_follower_arm" teleop_id = initial_leader_id if initial_leader_id else "my_awesome_leader_arm" dataset_repo_id = f"{hf_user}/record-test" command = [ "python", "-m", "lerobot.record", "--robot.type=so101_follower", f"--robot.port={robot_port}", f"--robot.id={robot_id}", # Use ID from config/default "--robot.cameras=\"{{ front: {{type: opencv, index_or_path: 0, width: 1920, height: 1080, fps: 30}}}}\"", # Escaping for shell command "--teleop.type=so101_leader", f"--teleop.port={teleop_port}", f"--teleop.id={teleop_id}", # Use ID from config/default "--display_data=true", f"--dataset.repo_id={dataset_repo_id}", f"--dataset.num_episodes={num_episodes}", f"--dataset.single_task=\"{single_task}\"", f"--dataset.episode_time_s={episode_time_s}", f"--dataset.reset_time_s={reset_time_s}" ] if not push_to_hub: command.append("--dataset.push_to_hub=False") if resume: command.append("--resume=true") full_command = " ".join(command) success, output = run_command(full_command, "Grabación del dataset") if success: final_message = f"¡Grabación del dataset completada exitosamente!\n" final_message += f"El dataset se almacenó localmente en: ~/.cache/huggingface/lerobot/{dataset_repo_id}\n" if push_to_hub: final_message += f"El dataset ha sido subido a: https://huggingface.co/datasets/{dataset_repo_id}\n" final_message += "Puedes añadir etiquetas personalizadas (ej. 'tutorial') en la página del dataset en Hugging Face." else: final_message += "La subida al Hugging Face Hub fue deshabilitada." return True, final_message else: return False, output # --- Gradio Interface Logic --- # Variable global para almacenar el usuario de Hugging Face current_hf_user = None def gradio_login(hf_token_input: str): """Interfaz Gradio para iniciar sesión en Hugging Face.""" global current_hf_user success, message = login_to_huggingface(hf_token_input) if success: gr.Info(message) success_user, user_name = get_huggingface_user() if success_user: current_hf_user = user_name return gr.update(value=user_name, interactive=False), gr.update(visible=True, value=message) # Update username field and status box else: return gr.update(value="", interactive=True), gr.update(visible=True, value=user_name) # Show error if user not retrieved else: current_hf_user = None return gr.update(value="", interactive=True), gr.update(visible=True, value=message) # Clear username and show error def gradio_record(robot_port_input: str, teleop_port_input: str, num_episodes_input: int, single_task_input: str, push_to_hub_input: bool, resume_input: bool, episode_time_s_input: int, reset_time_s_input: int): """Interfaz Gradio para iniciar la grabación del dataset.""" global current_hf_user if not current_hf_user: return gr.update(visible=True, value="Error: No se ha iniciado sesión en Hugging Face o no se pudo obtener el usuario. Por favor, inicia sesión primero."), gr.update(interactive=False) gr.Info("Iniciando grabación del dataset. Esto puede tardar unos segundos...") success, message = record_dataset_core( hf_user=current_hf_user, robot_port=robot_port_input, teleop_port=teleop_port_input, num_episodes=num_episodes_input, single_task=single_task_input, push_to_hub=push_to_hub_input, resume=resume_input, episode_time_s=episode_time_s_input, reset_time_s=reset_time_s_input ) if success: gr.Info("Grabación completada.") return gr.update(visible=True, value=message), gr.update(interactive=True) else: gr.Info("Grabación fallida. Revisa el log para más detalles.") return gr.update(visible=True, value=message), gr.update(interactive=True) # Define la interfaz de Gradio with gr.Blocks(title="Controlador de Grabación LeRobot") as demo: gr.Markdown("#