# NOTE: removed Hugging Face Spaces page chrome ("Spaces / Sleeping" status
# badges) captured by the scraper; it is not part of the application source.
| """ | |
| FRIDA Command Interpreter - Hugging Face Space | |
| Natural Language Robot Command Parser with Grounding Stage | |
| """ | |
| import os | |
| import sys | |
| import json | |
| import re | |
| import warnings | |
| import random | |
| from types import SimpleNamespace | |
| import gradio as gr | |
| from spaces import GPU | |
| # Add paths for imports | |
| sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) | |
| # Import BAML client | |
| from baml_client.sync_client import b | |
| from baml_py import ClientRegistry | |
| # Import command interpreter components | |
| from command_interpreter.caller import execute_function, clear_command_history | |
| from command_interpreter.tasks import Tasks | |
| # Import command generator | |
| from gpsr_commands_structured import CommandGenerator | |
# ============================================================================
# Configuration
# ============================================================================
# Fine-tuned model identifier (local llama-cpp model, not a BAML API client)
FINETUNED_MODEL_KEY = 'RBRGS_FINETUNED'
# Model keys selectable in the UI. The first entry runs locally via
# llama-cpp-python; the others are dispatched through BAML's ClientRegistry
# (see interpret_command).
AVAILABLE_MODELS = [
    FINETUNED_MODEL_KEY,
    'GEMINI_PRO_2_5',
    'GEMINI_FLASH_2_5',
    'OPENAI_GPT_4_1_MINI',
    'ANTHROPIC_CLAUDE_SONNET_4',
    'META_LLAMA_3_3_70B',
    'KIMI_K2_THINKING'
]
# Maps internal model keys to the human-readable labels shown in the dropdown;
# interpret_command reverses this mapping to recover the key.
MODEL_DISPLAY_NAMES = {
    FINETUNED_MODEL_KEY: 'RoBorregos Fine-tuned (Local)',
    'GEMINI_PRO_2_5': 'Gemini Pro 2.5',
    'GEMINI_FLASH_2_5': 'Gemini Flash 2.5',
    'OPENAI_GPT_4_1_MINI': 'OpenAI GPT-4.1 Mini',
    'ANTHROPIC_CLAUDE_SONNET_4': 'Claude Sonnet 4',
    'META_LLAMA_3_3_70B': 'Meta Llama 3.3 70B',
    'KIMI_K2_THINKING': 'Kimi K2 Thinking'
}
# Initialize client registry (shared; the primary client is switched per request)
client_registry = ClientRegistry()
# Initialize Tasks instance (includes embeddings used by the grounding stage)
tasks = Tasks()
# ============================================================================
# Fine-tuned Model (llama-cpp-python)
# ============================================================================
# Lazy-loaded fine-tuned model (module-level cache, populated on first use)
_finetuned_llm = None
# Reserve GPU for 60 seconds
# NOTE(review): the comment above and the unused `from spaces import GPU`
# import suggest a ZeroGPU decorator (e.g. @GPU(duration=60)) was intended
# here — confirm against the deployed Space before adding it.
def get_finetuned_model():
    """Lazy load the fine-tuned model using llama-cpp-python.

    Downloads the GGUF checkpoint from the HuggingFace Hub on the first call
    and caches the resulting Llama instance in the module-level
    ``_finetuned_llm`` global; subsequent calls return the cached instance.

    Returns:
        The loaded ``llama_cpp.Llama`` instance.

    Raises:
        Exception: re-raises any import/download/load failure after printing it.
    """
    global _finetuned_llm
    if _finetuned_llm is None:
        print("Loading fine-tuned model from HuggingFace...")
        try:
            # Function-scoped import: llama_cpp is only required once the
            # local model is actually selected.
            from llama_cpp import Llama
            _finetuned_llm = Llama.from_pretrained(
                repo_id="diegohc/rbrgs-finetuning",
                filename="q4/unsloth.Q4_K_M.gguf",
                n_ctx=4096,
                n_gpu_layers=-1,  # Use all GPU layers with ZeroGPU
                verbose=False
            )
            print("Fine-tuned model loaded successfully!")
        except Exception as e:
            print(f"Error loading fine-tuned model: {e}")
            raise
    return _finetuned_llm
# Reserve GPU for 30 seconds per inference
# NOTE(review): as with get_finetuned_model, this comment suggests a ZeroGPU
# decorator (e.g. @GPU(duration=30)) was intended here; `GPU` is imported but
# never applied — confirm against the deployed Space.
def inference_finetuned(command: str) -> list:
    """Run inference on the fine-tuned model.

    Args:
        command: The natural-language instruction to interpret.

    Returns:
        A list of command dictionaries parsed from the model's JSON output.

    Raises:
        ValueError: If no JSON structure can be recovered from the response.
    """
    llm = get_finetuned_model()
    # Create chat completion with the fine-tuned model.
    # Using the same prompt structure as BAML's GenerateCommandListFineTuned.
    response = llm.create_chat_completion(
        messages=[
            {
                "role": "system",
                "content": "You are a command interpreter for a robot. Your task is to interpret the user's command and convert it into a structured format that the robot can understand."
            },
            {
                "role": "user",
                "content": command
            }
        ],
        max_tokens=2048,
        temperature=0.1,  # near-deterministic decoding for structured output
        response_format={"type": "json_object"}
    )
    # Extract the response text
    response_text = response['choices'][0]['message']['content']
    # Parse JSON from the response; fall back to extracting the first JSON-ish
    # span if the model wrapped it in extra text.
    try:
        return _coerce_commands(json.loads(response_text))
    except json.JSONDecodeError as err:
        json_match = re.search(r'\{[\s\S]*\}|\[[\s\S]*\]', response_text)
        if json_match:
            try:
                return _coerce_commands(json.loads(json_match.group()))
            except json.JSONDecodeError:
                # Extracted span still was not valid JSON; fall through to
                # the ValueError below instead of leaking a JSONDecodeError.
                pass
        raise ValueError(f"Could not parse JSON from response: {response_text}") from err


def _coerce_commands(parsed) -> list:
    """Normalize a decoded JSON payload into a list of command dicts.

    Accepts {"commands": [...]}, a bare list, or a single command object.
    """
    if isinstance(parsed, dict) and 'commands' in parsed:
        return parsed['commands']
    if isinstance(parsed, list):
        return parsed
    return [parsed]
def create_command_object(cmd_dict: dict):
    """Wrap a raw command dict in a SimpleNamespace.

    Gives the fine-tuned model's dict output attribute-style access so that
    execute_function can treat it like a parsed BAML command object.
    """
    namespace = SimpleNamespace()
    namespace.__dict__.update(cmd_dict)
    return namespace
# ============================================================================
# Command Generator Setup
# ============================================================================
def parse_names(data):
    """Extract person names from a markdown table.

    The first matched cell is assumed to be the table header and is dropped.
    """
    cells = re.findall(r"\|\s*([A-Za-z]+)\s*\|", data, re.DOTALL)
    if not cells:
        return []
    return [cell.strip() for cell in cells][1:]
def parse_locations(data):
    """Extract (all locations, placement locations) from a markdown table.

    Rows look like ``| 3 | side table (p) |``; a trailing ``(p)`` marks a
    placement-capable location. The ``(p)`` tag is stripped from both lists.
    """
    rows = re.findall(
        r"\|\s*([0-9]+)\s*\|\s*([A-Za-z,\s, \(,\)]+)\|", data, re.DOTALL
    )
    all_locations = []
    placement_locations = []
    for _number, raw_name in rows:
        name = raw_name.strip()
        if name.endswith("(p)"):
            placement_locations.append(name.replace("(p)", "").strip())
        all_locations.append(name.replace("(p)", "").strip())
    return all_locations, placement_locations
def parse_rooms(data):
    """Extract room names from a markdown table, dropping the header cell."""
    rooms = []
    for cell in re.findall(r"\|\s*(\w+ \w*)\s*\|", data, re.DOTALL):
        rooms.append(cell.strip())
    return rooms[1:] if rooms else []
def parse_objects(data):
    """Extract object names and category labels from the objects file.

    Returns:
        A tuple (object_names, category_plural_forms, category_singular_forms).
    """
    # Object names: every table cell except the literal header "Objectname";
    # underscores become spaces.
    names = []
    for cell in re.findall(r"\|\s*(\w+)\s*\|", data, re.DOTALL):
        if cell != "Objectname":
            names.append(cell.replace("_", " ").strip())
    # Category headings look like "# Class drinks (drink)": first word is the
    # plural form, second (parenthesized) word the singular form.
    plural_forms = []
    singular_forms = []
    for heading in re.findall(r"# Class \s*([\w,\s, \(,\)]+)\s*", data, re.DOTALL):
        words = heading.strip().replace("(", "").replace(")", "").split()
        plural_forms.append(words[0].replace("_", " "))
        singular_forms.append(words[1].replace("_", " "))
    return names, plural_forms, singular_forms
def setup_command_generator():
    """Build a CommandGenerator from the CompetitionTemplate data files.

    Reads the names, locations, rooms, and objects markdown files and feeds
    the parsed lists to CommandGenerator.

    Returns:
        A CommandGenerator instance, or None (with a printed warning) when any
        file is missing or unparsable.
    """
    try:
        root = os.path.join(os.path.dirname(__file__), 'CompetitionTemplate')

        def read_template(*parts):
            # Read one data file relative to the template root.
            with open(os.path.join(root, *parts), 'r') as fh:
                return fh.read()

        names = parse_names(read_template('names', 'names.md'))
        location_names, placement_location_names = parse_locations(
            read_template('maps', 'location_names.md')
        )
        room_names = parse_rooms(read_template('maps', 'room_names.md'))
        object_names, object_categories_plural, object_categories_singular = parse_objects(
            read_template('objects', 'objects.md')
        )
        return CommandGenerator(
            names, location_names, placement_location_names, room_names,
            object_names, object_categories_plural, object_categories_singular
        )
    except Exception as e:
        print(f"Warning: Could not set up CommandGenerator: {e}")
        return None
# Module-level CommandGenerator instance; None when the CompetitionTemplate
# data files are unavailable (generate_random_command checks for this).
command_generator = setup_command_generator()
# ============================================================================
# Core Functions
# ============================================================================
def format_command(cmd):
    """Convert a parsed command into a JSON-serializable value for display.

    Handles, in order: Pydantic-style objects (via ``model_dump``),
    mappings / iterables of key-value pairs (via ``dict``), and anything
    else via ``str``.
    """
    if hasattr(cmd, 'model_dump'):
        return cmd.model_dump()
    try:
        # dict() accepts mappings and iterables of key/value pairs.
        return dict(cmd)
    except (TypeError, ValueError):
        # Bug fix: the old `hasattr(cmd, '__iter__')` guard let strings and
        # plain lists through to dict(), which raises; fall back to str().
        return str(cmd)
def interpret_command(command: str, model: str, execute: bool) -> tuple:
    """Interpret a natural language command using BAML or the fine-tuned model.

    Args:
        command: Natural-language instruction from the UI textbox.
        model: Display name (or raw key) of the model to use.
        execute: When True, each parsed command is run through the
            embeddings-based grounding simulation via execute_function.

    Returns:
        A (status, parsed_commands_json, execution_results_json) tuple of
        strings; on error the last two elements are empty.
    """
    if not command or not command.strip():
        return "Please enter a command", "", ""
    # Recover the internal model key from the dropdown's display name.
    model_key = None
    for key, display in MODEL_DISPLAY_NAMES.items():
        if display == model:
            model_key = key
            break
    if not model_key:
        model_key = model  # Fallback to using the value directly
    if model_key not in AVAILABLE_MODELS:
        return f"Invalid model: {model}", "", ""
    try:
        commands = []
        execution_results = []
        # Check if using fine-tuned model
        if model_key == FINETUNED_MODEL_KEY:
            # Use llama-cpp-python for the local fine-tuned model.
            command_dicts = inference_finetuned(command)
            for cmd_dict in command_dicts:
                commands.append(cmd_dict)
                # Execute if requested
                if execute:
                    try:
                        # Wrap the dict so execute_function gets attribute access.
                        cmd_obj = create_command_object(cmd_dict)
                        action, success, result = execute_function(cmd_obj, tasks, grounding=True)
                        execution_results.append({
                            "action": action,
                            "success": success,
                            "result": result
                        })
                    except Exception as e:
                        # Record the per-command failure instead of aborting the batch.
                        execution_results.append({
                            "action": cmd_dict.get('action', str(cmd_dict)),
                            "success": False,
                            "result": str(e)
                        })
        else:
            # Use BAML for API-based models: switch the shared registry's
            # primary client to the selected model.
            client_registry.set_primary(model_key)
            # Generate command list using BAML
            command_list = b.GenerateCommandList(
                command,
                baml_options={"client_registry": client_registry}
            )
            if hasattr(command_list, 'commands') and command_list.commands:
                for cmd in command_list.commands:
                    cmd_dict = format_command(cmd)
                    commands.append(cmd_dict)
                    # Execute if requested
                    if execute:
                        try:
                            action, success, result = execute_function(cmd, tasks, grounding=True)
                            execution_results.append({
                                "action": action,
                                "success": success,
                                "result": result
                            })
                        except Exception as e:
                            execution_results.append({
                                "action": str(cmd),
                                "success": False,
                                "result": str(e)
                            })
        # Clear command history after execution so runs don't leak state.
        if execute:
            clear_command_history(tasks)
        # Format output as pretty-printed JSON for the UI panes.
        parsed_output = json.dumps(commands, indent=2)
        exec_output = json.dumps(execution_results, indent=2) if execute else "Execution disabled"
        return "Success", parsed_output, exec_output
    except Exception as e:
        # Catch-all boundary: surface the error in the Status textbox.
        return f"Error: {str(e)}", "", ""
def generate_random_command(model: str, execute: bool):
    """Draw a random GPSR command and run it through the interpreter.

    Returns:
        A (command_string, status, parsed_json, execution_json) tuple; the
        first element populates the command textbox in the UI.
    """
    if not command_generator:
        return "Command generator not available", "", "", ""
    try:
        text_command, _structured = command_generator.generate_full_command()
        status, parsed_json, execution_json = interpret_command(text_command, model, execute)
    except Exception as e:
        return "", f"Error: {str(e)}", "", ""
    else:
        return text_command, status, parsed_json, execution_json
# ============================================================================
# Gradio Interface
# ============================================================================
def create_interface():
    """Create the Gradio interface.

    Builds a Blocks app with a model selector, an execute toggle, a command
    textbox, and two JSON panes (parsed commands / execution results), wired
    to interpret_command and generate_random_command.

    Returns:
        The constructed gr.Blocks app, ready for .launch().
    """
    with gr.Blocks(
        title="FRIDA Command Interpreter",
        theme=gr.themes.Soft(),
        css="""
        .header { text-align: center; margin-bottom: 20px; }
        .footer { text-align: center; margin-top: 20px; font-size: 0.9em; }
        .result-box { font-family: monospace; }
        """
    ) as demo:
        # Header
        gr.Markdown("""
        # FRIDA Command Interpreter
        ### Natural Language Robot Command Parser with Grounding Stage
        Parse natural language commands into structured robot actions using LLMs and semantic embeddings for grounding.
        """)
        with gr.Row():
            with gr.Column(scale=1):
                # Model selection
                model_dropdown = gr.Dropdown(
                    choices=list(MODEL_DISPLAY_NAMES.values()),
                    value="RoBorregos Fine-tuned (Local)",
                    label="Select Model",
                    info="Fine-tuned model runs locally (no API key needed). Other models require OPENROUTER_API_KEY."
                )
                # Execute toggle
                execute_checkbox = gr.Checkbox(
                    label="Execute commands (simulate with embeddings)",
                    value=False,
                    info="Enable grounding stage simulation"
                )
        with gr.Row():
            with gr.Column(scale=2):
                # Command input
                command_input = gr.Textbox(
                    label="Enter Command",
                    placeholder="Enter a natural language command... (e.g., 'go to the kitchen and pick up the apple')",
                    lines=3
                )
                with gr.Row():
                    interpret_btn = gr.Button("Interpret Command", variant="primary")
                    generate_btn = gr.Button("Generate Random Command", variant="secondary")
                # Status output
                status_output = gr.Textbox(label="Status", interactive=False)
        with gr.Row():
            with gr.Column():
                # Parsed commands output
                parsed_output = gr.Code(
                    label="Parsed Commands (JSON)",
                    language="json",
                    lines=15
                )
            with gr.Column():
                # Execution results output
                exec_output = gr.Code(
                    label="Execution Results (JSON)",
                    language="json",
                    lines=15
                )
        # Footer
        gr.Markdown("""
        ---
        **Powered by BAML** | [GitHub](https://github.com/RoBorregos/frida-cortex) | [Paper](https://doi.org/10.1007/978-3-032-09037-9_24)
        *Taming the LLM: Reliable Task Planning for Robotics Using Parsing and Grounding*
        """)
        # Event handlers
        interpret_btn.click(
            fn=interpret_command,
            inputs=[command_input, model_dropdown, execute_checkbox],
            outputs=[status_output, parsed_output, exec_output]
        )
        def generate_and_update(model, execute):
            # Thin wrapper so the generated command string also populates
            # the command textbox alongside the interpretation results.
            cmd, status, parsed, exec_result = generate_random_command(model, execute)
            return cmd, status, parsed, exec_result
        generate_btn.click(
            fn=generate_and_update,
            inputs=[model_dropdown, execute_checkbox],
            outputs=[command_input, status_output, parsed_output, exec_output]
        )
        # Keyboard shortcut: Ctrl+Enter to interpret
        command_input.submit(
            fn=interpret_command,
            inputs=[command_input, model_dropdown, execute_checkbox],
            outputs=[status_output, parsed_output, exec_output]
        )
    return demo
# ============================================================================
# Main
# ============================================================================
if __name__ == "__main__":
    # Build and serve the Gradio app (launch() blocks until shutdown).
    demo = create_interface()
    demo.launch()