Spaces:
Runtime error
Runtime error
| # backend/record_controller.py (o record_controller_gradio.py) | |
| import subprocess | |
| import os | |
| import gradio as gr | |
| import serial.tools.list_ports # Nuevo: para listar puertos seriales | |
| import json # Necesario para la función load_config si la copiamos aquí | |
| # IMPORTANTE: Si robot_detector.py está en el mismo directorio 'backend', | |
| # puedes importar su función de carga de configuración. | |
| # Asegúrate de que el archivo robot_detector.py exista en la misma carpeta. | |
| try: | |
| from robot_detector import load_config as load_robot_config, CONFIG_FILE as ROBOT_CONFIG_FILE | |
| except ImportError: | |
| print("Advertencia: No se pudo importar 'load_config' de 'robot_detector.py'.") | |
| print("Asegúrate de que 'robot_detector.py' esté en el mismo directorio 'backend'.") | |
| print("Se usará una implementación local de load_config.") | |
| # Implementación fallback de load_config si no se puede importar | |
| ROBOT_CONFIG_FILE = "robot_config.json" | |
| def load_robot_config(): | |
| if os.path.exists(ROBOT_CONFIG_FILE): | |
| try: | |
| with open(ROBOT_CONFIG_FILE, 'r') as f: | |
| config = json.load(f) | |
| return ( | |
| config.get("robot_follower", {}).get("port", ""), | |
| config.get("robot_follower", {}).get("id", ""), | |
| config.get("teleop_leader", {}).get("port", ""), | |
| config.get("teleop_leader", {}).get("id", "") | |
| ) | |
| except Exception as e: | |
| print(f"Error al cargar la configuración existente desde '{ROBOT_CONFIG_FILE}': {e}. Se usarán valores por defecto.") | |
| return "", "", "", "" | |
| return "", "", "", "" | |
| def run_command(command: str, description: str): | |
| """ | |
| Ejecuta un comando de shell y captura su salida, manejando errores. | |
| """ | |
| print(f"\n--- {description} ---") | |
| process_output = [] # Use a list to collect output for better streaming if needed later | |
| try: | |
| # Use Popen to stream output in real-time, important for long running processes | |
| process = subprocess.Popen( | |
| command, | |
| shell=True, | |
| stdout=subprocess.PIPE, | |
| stderr=subprocess.STDOUT, # Redirect stderr to stdout | |
| text=True | |
| ) | |
| for line in iter(process.stdout.readline, ''): | |
| print(line, end='') # Print to console for server-side logging | |
| process_output.append(line) | |
| # You could add a gr.Progress() here for real-time update in Gradio if needed | |
| process.wait() # Wait for the process to complete | |
| if process.returncode == 0: | |
| print(f"Éxito: {description}") | |
| return True, "".join(process_output) | |
| else: | |
| error_message = f"Error durante '{description}': El comando devolvió el código de salida {process.returncode}" | |
| print(error_message) | |
| return False, "".join(process_output) # Return all collected output, including errors | |
| except Exception as e: | |
| error_message = f"Ocurrió un error inesperado durante '{description}': {e}" | |
| print(error_message) | |
| return False, error_message | |
| def list_serial_device_paths(): | |
| """ | |
| Lista los puertos seriales disponibles y devuelve solo las rutas de los dispositivos. | |
| """ | |
| ports = serial.tools.list_ports.comports() | |
| device_paths = [] | |
| if not ports: | |
| return [] # Return empty list if no ports found | |
| for port in ports: | |
| device_paths.append(port.device) | |
| return device_paths | |
| def login_to_huggingface(token: str): | |
| """ | |
| Inicia sesión en Hugging Face CLI con el token proporcionado. | |
| """ | |
| if not token or token == "hf_YOUR_ACTUAL_WRITE_TOKEN_HERE": | |
| return False, "Error: Por favor, proporciona un token de Hugging Face válido." | |
| success, output = run_command( | |
| f"huggingface-cli login --token {token}", # <--- ¡Argumento eliminado! | |
| "Iniciando sesión en Hugging Face CLI" | |
| ) | |
| if success: | |
| return True, "¡Inicio de sesión en Hugging Face exitoso!" | |
| else: | |
| return False, output | |
| def get_huggingface_user(): | |
| """ | |
| Obtiene el nombre de usuario de Hugging Face. | |
| """ | |
| success, output = run_command( | |
| "huggingface-cli whoami | head -n 1", | |
| "Obteniendo nombre de usuario de Hugging Face" | |
| ) | |
| if success: | |
| # The output might contain warnings before the actual username. | |
| # Try to find the username, which should be the first non-empty, non-warning line. | |
| lines = output.splitlines() | |
| for line in lines: | |
| if line.strip() and not (line.strip().startswith("warnings.") or "deprecated" in line.lower()): | |
| return True, line.strip() | |
| return False, "No se pudo extraer el nombre de usuario de la salida de 'whoami'." | |
| else: | |
| return False, output | |
| def record_dataset_core(hf_user: str, | |
| robot_port: str, | |
| teleop_port: str, | |
| num_episodes: int = 2, | |
| single_task: str = "Grab the black cube", | |
| push_to_hub: bool = True, | |
| resume: bool = False, | |
| episode_time_s: int = 60, | |
| reset_time_s: int = 60): | |
| """ | |
| Función central para grabar un dataset usando LeRobot. | |
| """ | |
| if not hf_user: | |
| return False, "Error: Nombre de usuario de Hugging Face no disponible. ¿Falló el inicio de sesión?" | |
| # Use the IDs from the loaded config for consistency | |
| initial_follower_port, initial_follower_id, initial_leader_port, initial_leader_id = load_robot_config() | |
| robot_id = initial_follower_id if initial_follower_id else "my_awesome_follower_arm" | |
| teleop_id = initial_leader_id if initial_leader_id else "my_awesome_leader_arm" | |
| dataset_repo_id = f"{hf_user}/record-test" | |
| command = [ | |
| "python", "-m", "lerobot.record", | |
| "--robot.type=so101_follower", | |
| f"--robot.port={robot_port}", | |
| f"--robot.id={robot_id}", # Use ID from config/default | |
| "--robot.cameras=\"{{ front: {{type: opencv, index_or_path: 0, width: 1920, height: 1080, fps: 30}}}}\"", # Escaping for shell command | |
| "--teleop.type=so101_leader", | |
| f"--teleop.port={teleop_port}", | |
| f"--teleop.id={teleop_id}", # Use ID from config/default | |
| "--display_data=true", | |
| f"--dataset.repo_id={dataset_repo_id}", | |
| f"--dataset.num_episodes={num_episodes}", | |
| f"--dataset.single_task=\"{single_task}\"", | |
| f"--dataset.episode_time_s={episode_time_s}", | |
| f"--dataset.reset_time_s={reset_time_s}" | |
| ] | |
| if not push_to_hub: | |
| command.append("--dataset.push_to_hub=False") | |
| if resume: | |
| command.append("--resume=true") | |
| full_command = " ".join(command) | |
| success, output = run_command(full_command, "Grabación del dataset") | |
| if success: | |
| final_message = f"¡Grabación del dataset completada exitosamente!\n" | |
| final_message += f"El dataset se almacenó localmente en: ~/.cache/huggingface/lerobot/{dataset_repo_id}\n" | |
| if push_to_hub: | |
| final_message += f"El dataset ha sido subido a: https://huggingface.co/datasets/{dataset_repo_id}\n" | |
| final_message += "Puedes añadir etiquetas personalizadas (ej. 'tutorial') en la página del dataset en Hugging Face." | |
| else: | |
| final_message += "La subida al Hugging Face Hub fue deshabilitada." | |
| return True, final_message | |
| else: | |
| return False, output | |
| # --- Gradio Interface Logic --- | |
| # Variable global para almacenar el usuario de Hugging Face | |
| current_hf_user = None | |
| def gradio_login(hf_token_input: str): | |
| """Interfaz Gradio para iniciar sesión en Hugging Face.""" | |
| global current_hf_user | |
| success, message = login_to_huggingface(hf_token_input) | |
| if success: | |
| gr.Info(message) | |
| success_user, user_name = get_huggingface_user() | |
| if success_user: | |
| current_hf_user = user_name | |
| return gr.update(value=user_name, interactive=False), gr.update(visible=True, value=message) # Update username field and status box | |
| else: | |
| return gr.update(value="", interactive=True), gr.update(visible=True, value=user_name) # Show error if user not retrieved | |
| else: | |
| current_hf_user = None | |
| return gr.update(value="", interactive=True), gr.update(visible=True, value=message) # Clear username and show error | |
| def gradio_record(robot_port_input: str, | |
| teleop_port_input: str, | |
| num_episodes_input: int, | |
| single_task_input: str, | |
| push_to_hub_input: bool, | |
| resume_input: bool, | |
| episode_time_s_input: int, | |
| reset_time_s_input: int): | |
| """Interfaz Gradio para iniciar la grabación del dataset.""" | |
| global current_hf_user | |
| if not current_hf_user: | |
| return gr.update(visible=True, value="Error: No se ha iniciado sesión en Hugging Face o no se pudo obtener el usuario. Por favor, inicia sesión primero."), gr.update(interactive=False) | |
| gr.Info("Iniciando grabación del dataset. Esto puede tardar unos segundos...") | |
| success, message = record_dataset_core( | |
| hf_user=current_hf_user, | |
| robot_port=robot_port_input, | |
| teleop_port=teleop_port_input, | |
| num_episodes=num_episodes_input, | |
| single_task=single_task_input, | |
| push_to_hub=push_to_hub_input, | |
| resume=resume_input, | |
| episode_time_s=episode_time_s_input, | |
| reset_time_s=reset_time_s_input | |
| ) | |
| if success: | |
| gr.Info("Grabación completada.") | |
| return gr.update(visible=True, value=message), gr.update(interactive=True) | |
| else: | |
| gr.Info("Grabación fallida. Revisa el log para más detalles.") | |
| return gr.update(visible=True, value=message), gr.update(interactive=True) | |
| # Define la interfaz de Gradio | |
| with gr.Blocks(title="Controlador de Grabación LeRobot") as demo: | |
| gr.Markdown("# <center>Controlador de Grabación de Datasets LeRobot</center>") | |
| gr.Markdown("Esta interfaz te ayuda a gestionar la grabación de datasets para LeRobot.") | |
| with gr.Tab("1. Configuración de Hugging Face"): | |
| gr.Markdown("## Configuración de Hugging Face") | |
| gr.Markdown( | |
| "Introduce tu **token de Hugging Face con permisos de escritura**. " | |
| "Puedes generarlo en [huggingface.co/settings/tokens](https://huggingface.co/settings/tokens)." | |
| ) | |
| hf_token_input = gr.Textbox( | |
| label="Token de Hugging Face", | |
| type="password", | |
| placeholder="hf_YOUR_ACTUAL_WRITE_TOKEN_HERE", | |
| info="El token debe tener permisos de escritura (write)." | |
| ) | |
| login_btn = gr.Button("Iniciar Sesión / Verificar Token") | |
| hf_user_output = gr.Textbox(label="Usuario de Hugging Face Actual", interactive=False, placeholder="No autenticado", show_copy_button=True) | |
| login_status_output = gr.Textbox(label="Estado de Autenticación", interactive=False, visible=False) | |
| login_btn.click( | |
| fn=gradio_login, | |
| inputs=hf_token_input, | |
| outputs=[hf_user_output, login_status_output] | |
| ) | |
| with gr.Tab("2. Parámetros de Grabación"): | |
| gr.Markdown("## Parámetros del Robot y la Grabación") | |
| gr.Markdown( | |
| "Asegúrate de que los **puertos seriales** de tus robots sean correctos. " | |
| "Puedes detectarlos automáticamente o introducirlos manualmente." | |
| ) | |
| # Load initial config from robot_config.json | |
| initial_follower_port, initial_follower_id, initial_leader_port, initial_leader_id = load_robot_config() | |
| with gr.Group(): # Group for auto-detection and manual input | |
| gr.Markdown("### Puerto del Robot Follower") | |
| robot_port_dropdown = gr.Dropdown( | |
| label="Seleccionar Puerto", | |
| choices=[], # Populated on load/refresh | |
| value=initial_follower_port, # Pre-fill from config | |
| interactive=True, | |
| allow_custom_value=True # Allow manual input | |
| ) | |
| robot_port_manual = gr.Textbox( | |
| label="O introducir Puerto Manualmente", | |
| value=initial_follower_port, # Pre-fill from config | |
| placeholder="ej. /dev/tty.usbmodemXXXXX", | |
| interactive=True | |
| ) | |
| # Synchronize dropdown and manual textbox | |
| robot_port_dropdown.change(lambda x: x, inputs=robot_port_dropdown, outputs=robot_port_manual) | |
| robot_port_manual.change(lambda x: x, inputs=robot_port_manual, outputs=robot_port_dropdown) | |
| with gr.Group(): # Group for auto-detection and manual input | |
| gr.Markdown("### Puerto del Teleoperador Leader") | |
| teleop_port_dropdown = gr.Dropdown( | |
| label="Seleccionar Puerto", | |
| choices=[], # Populated on load/refresh | |
| value=initial_leader_port, # Pre-fill from config | |
| interactive=True, | |
| allow_custom_value=True # Allow manual input | |
| ) | |
| teleop_port_manual = gr.Textbox( | |
| label="O introducir Puerto Manualmente", | |
| value=initial_leader_port, # Pre-fill from config | |
| placeholder="ej. /dev/tty.usbmodemXXXXX", | |
| interactive=True | |
| ) | |
| # Synchronize dropdown and manual textbox | |
| teleop_port_dropdown.change(lambda x: x, inputs=teleop_port_dropdown, outputs=teleop_port_manual) | |
| teleop_port_manual.change(lambda x: x, inputs=teleop_port_manual, outputs=teleop_port_dropdown) | |
| refresh_ports_btn = gr.Button("🔄 Refrescar Puertos Detectados") | |
| num_episodes_input = gr.Slider( | |
| minimum=1, maximum=100, step=1, value=2, label="Número de Episodios a Grabar", | |
| info="Define cuántos episodios se grabarán." | |
| ) | |
| single_task_input = gr.Textbox( | |
| label="Descripción de la Tarea (Single Task)", | |
| value="Grab the black cube", | |
| placeholder="Describe la tarea que realizará el robot" | |
| ) | |
| episode_time_s_input = gr.Slider( | |
| minimum=10, maximum=300, step=10, value=60, label="Duración del Episodio (segundos)", | |
| info="Tiempo máximo de grabación para cada episodio." | |
| ) | |
| reset_time_s_input = gr.Slider( | |
| minimum=5, maximum=120, step=5, value=60, label="Tiempo de Reseteo (segundos)", | |
| info="Tiempo para preparar el entorno entre episodios." | |
| ) | |
| push_to_hub_input = gr.Checkbox( | |
| label="Subir Dataset a Hugging Face Hub", | |
| value=True, | |
| info="Marca esta casilla para subir el dataset a tu perfil de Hugging Face." | |
| ) | |
| resume_input = gr.Checkbox( | |
| label="Reanudar Grabación Existente", | |
| value=False, | |
| info="Si hay una grabación previa, continúa desde donde se detuvo." | |
| ) | |
| # Function to update dropdown choices on button click or page load | |
| def update_port_dropdowns(): | |
| ports = list_serial_device_paths() | |
| # The dropdowns allow custom values, so setting choices updates the list for selection | |
| return gr.update(choices=ports), gr.update(choices=ports) | |
| # Attach update function to refresh button and initial load | |
| refresh_ports_btn.click( | |
| fn=update_port_dropdowns, | |
| inputs=None, | |
| outputs=[robot_port_dropdown, teleop_port_dropdown] | |
| ) | |
| demo.load( | |
| fn=update_port_dropdowns, | |
| inputs=None, | |
| outputs=[robot_port_dropdown, teleop_port_dropdown] | |
| ) | |
| with gr.Tab("3. Iniciar Grabación"): | |
| gr.Markdown("## Ejecutar Grabación del Dataset") | |
| gr.Markdown( | |
| "Haz clic en el botón para iniciar la grabación. " | |
| "Una nueva ventana de LeRobot se abrirá para la teleoperación y visualización." | |
| ) | |
| record_btn = gr.Button("🚀 Iniciar Grabación 🚀", variant="primary") | |
| # Output para el estado y mensajes de la grabación | |
| record_status_output = gr.Textbox( | |
| label="Estado de la Grabación", | |
| interactive=False, | |
| visible=False, | |
| lines=10 | |
| ) | |
| record_btn.click( | |
| fn=gradio_record, | |
| inputs=[ | |
| # Use the current value from either dropdown or textbox (they are synchronized) | |
| robot_port_manual, # We'll use the textbox value as it's the ultimate source of truth | |
| teleop_port_manual, # Same for teleop port | |
| num_episodes_input, | |
| single_task_input, | |
| push_to_hub_input, | |
| resume_input, | |
| episode_time_s_input, | |
| reset_time_s_input | |
| ], | |
| outputs=[record_status_output, record_btn] # Update status output and re-enable button | |
| ) | |
| gr.Markdown("### Controles de Teclado Durante la Grabación (en la ventana de LeRobot):") | |
| gr.Markdown( | |
| "- **Flecha Derecha (→):** Detener el episodio actual y pasar al siguiente.\n" | |
| "- **Flecha Izquierda (←):** Cancelar el episodio actual y volver a grabar.\n" | |
| "- **Escape (ESC):** Detener la sesión, codificar videos y subir el dataset (si está activado)." | |
| ) | |
| gr.Markdown("---") | |
| gr.Markdown("Hecho con ❤️ para RobotCleanPupusas503") | |
| # Lanza la interfaz de Gradio | |
| if __name__ == "__main__": | |
| try: | |
| import serial # Test if pyserial is installed | |
| except ImportError: | |
| print("Error: La librería 'pyserial' no está instalada.") | |
| print("Por favor, instala 'pyserial' ejecutando: pip install pyserial") | |
| exit(1) | |
| demo.launch(share=False) |