""" FRIDA Command Interpreter - Hugging Face Space Natural Language Robot Command Parser with Grounding Stage """ import os import sys import json import re import warnings import random from types import SimpleNamespace import gradio as gr from spaces import GPU # Add paths for imports sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) # Import BAML client from baml_client.sync_client import b from baml_py import ClientRegistry # Import command interpreter components from command_interpreter.caller import execute_function, clear_command_history from command_interpreter.tasks import Tasks # Import command generator from gpsr_commands_structured import CommandGenerator # ============================================================================ # Configuration # ============================================================================ # Fine-tuned model identifier FINETUNED_MODEL_KEY = 'RBRGS_FINETUNED' AVAILABLE_MODELS = [ FINETUNED_MODEL_KEY, 'GEMINI_PRO_2_5', 'GEMINI_FLASH_2_5', 'OPENAI_GPT_4_1_MINI', 'ANTHROPIC_CLAUDE_SONNET_4', 'META_LLAMA_3_3_70B', 'KIMI_K2_THINKING' ] MODEL_DISPLAY_NAMES = { FINETUNED_MODEL_KEY: 'RoBorregos Fine-tuned (Local)', 'GEMINI_PRO_2_5': 'Gemini Pro 2.5', 'GEMINI_FLASH_2_5': 'Gemini Flash 2.5', 'OPENAI_GPT_4_1_MINI': 'OpenAI GPT-4.1 Mini', 'ANTHROPIC_CLAUDE_SONNET_4': 'Claude Sonnet 4', 'META_LLAMA_3_3_70B': 'Meta Llama 3.3 70B', 'KIMI_K2_THINKING': 'Kimi K2 Thinking' } # Initialize client registry client_registry = ClientRegistry() # Initialize Tasks instance (includes embeddings) tasks = Tasks() # ============================================================================ # Fine-tuned Model (llama-cpp-python) # ============================================================================ # Lazy-loaded fine-tuned model _finetuned_llm = None @GPU(duration=60) # Reserve GPU for 60 seconds def get_finetuned_model(): """Lazy load the fine-tuned model using llama-cpp-python""" global _finetuned_llm if _finetuned_llm is 
None: print("Loading fine-tuned model from HuggingFace...") try: from llama_cpp import Llama _finetuned_llm = Llama.from_pretrained( repo_id="diegohc/rbrgs-finetuning", filename="q4/unsloth.Q4_K_M.gguf", n_ctx=4096, n_gpu_layers=-1, # Use all GPU layers with ZeroGPU verbose=False ) print("Fine-tuned model loaded successfully!") except Exception as e: print(f"Error loading fine-tuned model: {e}") raise return _finetuned_llm @GPU(duration=30) # Reserve GPU for 30 seconds per inference def inference_finetuned(command: str) -> list: """ Run inference on the fine-tuned model. Returns a list of command dictionaries. """ llm = get_finetuned_model() # Create chat completion with the fine-tuned model # Using the same prompt structure as BAML's GenerateCommandListFineTuned response = llm.create_chat_completion( messages=[ { "role": "system", "content": "You are a command interpreter for a robot. Your task is to interpret the user's command and convert it into a structured format that the robot can understand." }, { "role": "user", "content": command } ], max_tokens=2048, temperature=0.1, response_format={"type": "json_object"} ) # Extract the response text response_text = response['choices'][0]['message']['content'] # Parse JSON from response try: parsed = json.loads(response_text) if isinstance(parsed, dict) and 'commands' in parsed: return parsed['commands'] elif isinstance(parsed, list): return parsed else: return [parsed] except json.JSONDecodeError: # Try to extract JSON from the response json_match = re.search(r'\{[\s\S]*\}|\[[\s\S]*\]', response_text) if json_match: parsed = json.loads(json_match.group()) if isinstance(parsed, dict) and 'commands' in parsed: return parsed['commands'] elif isinstance(parsed, list): return parsed return [parsed] raise ValueError(f"Could not parse JSON from response: {response_text}") def create_command_object(cmd_dict: dict): """ Create a SimpleNamespace object from a command dictionary. 
This allows the execute_function to work with the fine-tuned model output. """ return SimpleNamespace(**cmd_dict) # ============================================================================ # Command Generator Setup # ============================================================================ def parse_names(data): """Parse names from markdown file content""" parsed_names = re.findall(r"\|\s*([A-Za-z]+)\s*\|", data, re.DOTALL) parsed_names = [name.strip() for name in parsed_names] return parsed_names[1:] if parsed_names else [] def parse_locations(data): """Parse locations from markdown file content""" parsed_locations = re.findall( r"\|\s*([0-9]+)\s*\|\s*([A-Za-z,\s, \(,\)]+)\|", data, re.DOTALL ) parsed_locations = [b.strip() for (a, b) in parsed_locations] parsed_placement_locations = [ location for location in parsed_locations if location.endswith("(p)") ] parsed_locations = [location.replace("(p)", "").strip() for location in parsed_locations] parsed_placement_locations = [ location.replace("(p)", "").strip() for location in parsed_placement_locations ] return parsed_locations, parsed_placement_locations def parse_rooms(data): """Parse rooms from markdown file content""" parsed_rooms = re.findall(r"\|\s*(\w+ \w*)\s*\|", data, re.DOTALL) parsed_rooms = [rooms.strip() for rooms in parsed_rooms] return parsed_rooms[1:] if parsed_rooms else [] def parse_objects(data): """Parse objects from markdown file content""" parsed_objects = re.findall(r"\|\s*(\w+)\s*\|", data, re.DOTALL) parsed_objects = [obj for obj in parsed_objects if obj != "Objectname"] parsed_objects = [obj.replace("_", " ").strip() for obj in parsed_objects] parsed_categories = re.findall(r"# Class \s*([\w,\s, \(,\)]+)\s*", data, re.DOTALL) parsed_categories = [category.strip() for category in parsed_categories] parsed_categories = [ category.replace("(", "").replace(")", "").split() for category in parsed_categories ] parsed_categories_plural = [cat[0].replace("_", " ") for cat in 
parsed_categories] parsed_categories_singular = [cat[1].replace("_", " ") for cat in parsed_categories] return parsed_objects, parsed_categories_plural, parsed_categories_singular def setup_command_generator(): """Set up the CommandGenerator with data from CompetitionTemplate""" try: base_path = os.path.join(os.path.dirname(__file__), 'CompetitionTemplate') names_file_path = os.path.join(base_path, 'names', 'names.md') locations_file_path = os.path.join(base_path, 'maps', 'location_names.md') rooms_file_path = os.path.join(base_path, 'maps', 'room_names.md') objects_file_path = os.path.join(base_path, 'objects', 'objects.md') with open(names_file_path, 'r') as f: names = parse_names(f.read()) with open(locations_file_path, 'r') as f: location_names, placement_location_names = parse_locations(f.read()) with open(rooms_file_path, 'r') as f: room_names = parse_rooms(f.read()) with open(objects_file_path, 'r') as f: object_names, object_categories_plural, object_categories_singular = parse_objects(f.read()) return CommandGenerator( names, location_names, placement_location_names, room_names, object_names, object_categories_plural, object_categories_singular ) except Exception as e: print(f"Warning: Could not set up CommandGenerator: {e}") return None # Initialize command generator command_generator = setup_command_generator() # ============================================================================ # Core Functions # ============================================================================ def format_command(cmd): """Format a command object for display""" if hasattr(cmd, 'model_dump'): return cmd.model_dump() return dict(cmd) if hasattr(cmd, '__iter__') else str(cmd) def interpret_command(command: str, model: str, execute: bool): """Interpret a natural language command using BAML or fine-tuned model""" if not command or not command.strip(): return "Please enter a command", "", "" # Get model key from display name model_key = None for key, display in 
MODEL_DISPLAY_NAMES.items(): if display == model: model_key = key break if not model_key: model_key = model # Fallback to using the value directly if model_key not in AVAILABLE_MODELS: return f"Invalid model: {model}", "", "" try: commands = [] execution_results = [] # Check if using fine-tuned model if model_key == FINETUNED_MODEL_KEY: # Use llama-cpp-python for fine-tuned model command_dicts = inference_finetuned(command) for cmd_dict in command_dicts: commands.append(cmd_dict) # Execute if requested if execute: try: # Create a SimpleNamespace object for execution cmd_obj = create_command_object(cmd_dict) action, success, result = execute_function(cmd_obj, tasks, grounding=True) execution_results.append({ "action": action, "success": success, "result": result }) except Exception as e: execution_results.append({ "action": cmd_dict.get('action', str(cmd_dict)), "success": False, "result": str(e) }) else: # Use BAML for API-based models client_registry.set_primary(model_key) # Generate command list using BAML command_list = b.GenerateCommandList( command, baml_options={"client_registry": client_registry} ) if hasattr(command_list, 'commands') and command_list.commands: for cmd in command_list.commands: cmd_dict = format_command(cmd) commands.append(cmd_dict) # Execute if requested if execute: try: action, success, result = execute_function(cmd, tasks, grounding=True) execution_results.append({ "action": action, "success": success, "result": result }) except Exception as e: execution_results.append({ "action": str(cmd), "success": False, "result": str(e) }) # Clear command history after execution if execute: clear_command_history(tasks) # Format output parsed_output = json.dumps(commands, indent=2) exec_output = json.dumps(execution_results, indent=2) if execute else "Execution disabled" return "Success", parsed_output, exec_output except Exception as e: return f"Error: {str(e)}", "", "" def generate_random_command(model: str, execute: bool): """Generate a random 
GPSR command and interpret it""" if not command_generator: return "Command generator not available", "", "", "" try: # Generate a random command string_cmd, structured_cmd = command_generator.generate_full_command() # Interpret the generated command status, parsed_output, exec_output = interpret_command(string_cmd, model, execute) return string_cmd, status, parsed_output, exec_output except Exception as e: return "", f"Error: {str(e)}", "", "" # ============================================================================ # Gradio Interface # ============================================================================ def create_interface(): """Create the Gradio interface""" with gr.Blocks( title="FRIDA Command Interpreter", theme=gr.themes.Soft(), css=""" .header { text-align: center; margin-bottom: 20px; } .footer { text-align: center; margin-top: 20px; font-size: 0.9em; } .result-box { font-family: monospace; } """ ) as demo: # Header gr.Markdown(""" # FRIDA Command Interpreter ### Natural Language Robot Command Parser with Grounding Stage Parse natural language commands into structured robot actions using LLMs and semantic embeddings for grounding. """) with gr.Row(): with gr.Column(scale=1): # Model selection model_dropdown = gr.Dropdown( choices=list(MODEL_DISPLAY_NAMES.values()), value="RoBorregos Fine-tuned (Local)", label="Select Model", info="Fine-tuned model runs locally (no API key needed). Other models require OPENROUTER_API_KEY." ) # Execute toggle execute_checkbox = gr.Checkbox( label="Execute commands (simulate with embeddings)", value=False, info="Enable grounding stage simulation" ) with gr.Row(): with gr.Column(scale=2): # Command input command_input = gr.Textbox( label="Enter Command", placeholder="Enter a natural language command... 
(e.g., 'go to the kitchen and pick up the apple')", lines=3 ) with gr.Row(): interpret_btn = gr.Button("Interpret Command", variant="primary") generate_btn = gr.Button("Generate Random Command", variant="secondary") # Status output status_output = gr.Textbox(label="Status", interactive=False) with gr.Row(): with gr.Column(): # Parsed commands output parsed_output = gr.Code( label="Parsed Commands (JSON)", language="json", lines=15 ) with gr.Column(): # Execution results output exec_output = gr.Code( label="Execution Results (JSON)", language="json", lines=15 ) # Footer gr.Markdown(""" --- **Powered by BAML** | [GitHub](https://github.com/RoBorregos/frida-cortex) | [Paper](https://doi.org/10.1007/978-3-032-09037-9_24) *Taming the LLM: Reliable Task Planning for Robotics Using Parsing and Grounding* """) # Event handlers interpret_btn.click( fn=interpret_command, inputs=[command_input, model_dropdown, execute_checkbox], outputs=[status_output, parsed_output, exec_output] ) def generate_and_update(model, execute): cmd, status, parsed, exec_result = generate_random_command(model, execute) return cmd, status, parsed, exec_result generate_btn.click( fn=generate_and_update, inputs=[model_dropdown, execute_checkbox], outputs=[command_input, status_output, parsed_output, exec_output] ) # Keyboard shortcut: Ctrl+Enter to interpret command_input.submit( fn=interpret_command, inputs=[command_input, model_dropdown, execute_checkbox], outputs=[status_output, parsed_output, exec_output] ) return demo # ============================================================================ # Main # ============================================================================ if __name__ == "__main__": demo = create_interface() demo.launch()
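The JSON-recovery logic inside `inference_finetuned` (accepting a `{"commands": [...]}` wrapper, a bare array, or a single object, with a regex fallback for responses that wrap JSON in prose) can be exercised in isolation. Below is a minimal standalone sketch of that normalization; the helper name `normalize_command_json` is hypothetical and not part of the app:

```python
import json
import re


def normalize_command_json(response_text: str) -> list:
    """Coerce a model response into a list of command dicts (sketch)."""
    try:
        parsed = json.loads(response_text)
    except json.JSONDecodeError:
        # Fall back to the first JSON object or array embedded in the text
        match = re.search(r'\{[\s\S]*\}|\[[\s\S]*\]', response_text)
        if not match:
            raise ValueError(f"Could not parse JSON from response: {response_text}")
        parsed = json.loads(match.group())
    if isinstance(parsed, dict) and 'commands' in parsed:
        return parsed['commands']
    if isinstance(parsed, list):
        return parsed
    return [parsed]
```

This mirrors the three shapes the fine-tuned model is expected to emit; keeping it as a pure function makes the fallback behavior easy to unit-test without loading the GGUF model.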
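The `parse_locations` helper assumes a two-column markdown table where placement-capable locations carry a "(p)" suffix. The sketch below illustrates that convention on a made-up sample table (the rows are illustrative, not from CompetitionTemplate), with the character class tidied to `[A-Za-z\s()]+`; the app's original pattern also tolerates commas:

```python
import re


def split_locations(data: str):
    """Return (all locations, placement locations) from a '|id|name|' table (sketch)."""
    rows = re.findall(r"\|\s*([0-9]+)\s*\|\s*([A-Za-z\s()]+)\|", data)
    locations = [name.strip() for (_, name) in rows]
    # Rows marked "(p)" are valid placement targets
    placement = [loc for loc in locations if loc.endswith("(p)")]
    locations = [loc.replace("(p)", "").strip() for loc in locations]
    placement = [loc.replace("(p)", "").strip() for loc in placement]
    return locations, placement


sample = (
    "| 1 | kitchen table (p) |\n"
    "| 2 | sofa |\n"
)
```

On this sample, all rows appear in the first list while only "kitchen table" appears in the placement list, which is the split `CommandGenerator` relies on.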