# frida-cortex / app.py
# Source: Hugging Face Space by afr2903, commit 35e47c9 ("[fix]: Fixes for ZeroGPU deployment")
"""
FRIDA Command Interpreter - Hugging Face Space
Natural Language Robot Command Parser with Grounding Stage
"""
import os
import sys
import json
import re
import warnings
import random
from types import SimpleNamespace
import gradio as gr
from spaces import GPU
# Add paths for imports
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
# Import BAML client
from baml_client.sync_client import b
from baml_py import ClientRegistry
# Import command interpreter components
from command_interpreter.caller import execute_function, clear_command_history
from command_interpreter.tasks import Tasks
# Import command generator
from gpsr_commands_structured import CommandGenerator
# ============================================================================
# Configuration
# ============================================================================
# Fine-tuned model identifier
FINETUNED_MODEL_KEY = 'RBRGS_FINETUNED'

# Model keys accepted by interpret_command(); every key except the fine-tuned
# one is routed through a BAML client (API-backed, needs OPENROUTER_API_KEY).
AVAILABLE_MODELS = [
    FINETUNED_MODEL_KEY,
    'GEMINI_PRO_2_5',
    'GEMINI_FLASH_2_5',
    'OPENAI_GPT_4_1_MINI',
    'ANTHROPIC_CLAUDE_SONNET_4',
    'META_LLAMA_3_3_70B',
    'KIMI_K2_THINKING'
]

# Human-readable labels shown in the UI dropdown, keyed by model key.
# interpret_command() reverse-maps a selected label back to its key.
MODEL_DISPLAY_NAMES = {
    FINETUNED_MODEL_KEY: 'RoBorregos Fine-tuned (Local)',
    'GEMINI_PRO_2_5': 'Gemini Pro 2.5',
    'GEMINI_FLASH_2_5': 'Gemini Flash 2.5',
    'OPENAI_GPT_4_1_MINI': 'OpenAI GPT-4.1 Mini',
    'ANTHROPIC_CLAUDE_SONNET_4': 'Claude Sonnet 4',
    'META_LLAMA_3_3_70B': 'Meta Llama 3.3 70B',
    'KIMI_K2_THINKING': 'Kimi K2 Thinking'
}

# Initialize client registry (interpret_command() switches its primary client
# per request via set_primary)
client_registry = ClientRegistry()
# Initialize Tasks instance (includes embeddings)
tasks = Tasks()

# ============================================================================
# Fine-tuned Model (llama-cpp-python)
# ============================================================================
# Lazy-loaded fine-tuned model — populated on the first call to
# get_finetuned_model(), then reused for the life of the process.
_finetuned_llm = None
@GPU(duration=60)  # Reserve GPU for 60 seconds
def get_finetuned_model():
    """Return the fine-tuned GGUF model, loading it on first use.

    The loaded model is cached in the module-level ``_finetuned_llm`` so
    the expensive download/initialization happens at most once per process.

    Raises:
        Exception: re-raised from ``Llama.from_pretrained`` after logging
            when the model cannot be loaded.
    """
    global _finetuned_llm
    if _finetuned_llm is not None:
        return _finetuned_llm

    print("Loading fine-tuned model from HuggingFace...")
    try:
        from llama_cpp import Llama

        _finetuned_llm = Llama.from_pretrained(
            repo_id="diegohc/rbrgs-finetuning",
            filename="q4/unsloth.Q4_K_M.gguf",
            n_ctx=4096,
            n_gpu_layers=-1,  # offload every layer to the ZeroGPU device
            verbose=False,
        )
    except Exception as exc:
        print(f"Error loading fine-tuned model: {exc}")
        raise
    print("Fine-tuned model loaded successfully!")
    return _finetuned_llm
def _normalize_parsed_commands(parsed) -> list:
    """Coerce a decoded JSON payload into a list of command dicts.

    The model may answer with ``{"commands": [...]}``, a bare JSON list,
    or a single command object; all three shapes normalize to a list.
    """
    if isinstance(parsed, dict) and 'commands' in parsed:
        return parsed['commands']
    if isinstance(parsed, list):
        return parsed
    return [parsed]


@GPU(duration=30)  # Reserve GPU for 30 seconds per inference
def inference_finetuned(command: str) -> list:
    """
    Run inference on the fine-tuned model.

    Args:
        command: Natural-language robot command to interpret.

    Returns:
        A list of command dictionaries parsed from the model's JSON reply.

    Raises:
        ValueError: If no JSON payload can be recovered from the reply.
    """
    llm = get_finetuned_model()
    # Create chat completion with the fine-tuned model
    # Using the same prompt structure as BAML's GenerateCommandListFineTuned
    response = llm.create_chat_completion(
        messages=[
            {
                "role": "system",
                "content": "You are a command interpreter for a robot. Your task is to interpret the user's command and convert it into a structured format that the robot can understand."
            },
            {
                "role": "user",
                "content": command
            }
        ],
        max_tokens=2048,
        temperature=0.1,
        response_format={"type": "json_object"}
    )
    response_text = response['choices'][0]['message']['content']

    # First attempt: the whole reply is JSON (the usual case with
    # response_format="json_object").
    try:
        return _normalize_parsed_commands(json.loads(response_text))
    except json.JSONDecodeError:
        pass

    # Fallback: extract the first {...} or [...] span from chatty output.
    # Guard this parse too — previously a malformed span re-raised
    # JSONDecodeError instead of the intended ValueError below.
    json_match = re.search(r'\{[\s\S]*\}|\[[\s\S]*\]', response_text)
    if json_match:
        try:
            return _normalize_parsed_commands(json.loads(json_match.group()))
        except json.JSONDecodeError:
            pass

    raise ValueError(f"Could not parse JSON from response: {response_text}")
def create_command_object(cmd_dict: dict):
    """Wrap a raw command dict in a SimpleNamespace.

    Gives the fine-tuned model's plain-dict output the attribute-style
    access that ``execute_function`` expects from BAML command objects.
    """
    namespace = SimpleNamespace(**cmd_dict)
    return namespace
# ============================================================================
# Command Generator Setup
# ============================================================================
def parse_names(data):
    """Extract person names from markdown table content.

    Captures every alphabetic cell between pipes and drops the first one,
    which is the table header.
    """
    cells = [cell.strip() for cell in re.findall(r"\|\s*([A-Za-z]+)\s*\|", data, re.DOTALL)]
    if not cells:
        return []
    # The first captured cell is the column header, not a name.
    return cells[1:]
def parse_locations(data):
    """Extract location names from a numbered markdown table.

    Returns:
        (locations, placement_locations): every location name, plus the
        subset whose raw cell ends with the "(p)" placement marker. The
        marker is stripped from both lists.
    """
    rows = re.findall(
        r"\|\s*([0-9]+)\s*\|\s*([A-Za-z,\s, \(,\)]+)\|", data, re.DOTALL
    )
    raw_names = [name.strip() for _, name in rows]
    placement_locations = [
        name.replace("(p)", "").strip()
        for name in raw_names
        if name.endswith("(p)")
    ]
    locations = [name.replace("(p)", "").strip() for name in raw_names]
    return locations, placement_locations
def parse_rooms(data):
    """Extract room names from markdown table content.

    Matches cells of the form "word word" (second word optional) and
    drops the first match, which is the table header.
    """
    cells = [cell.strip() for cell in re.findall(r"\|\s*(\w+ \w*)\s*\|", data, re.DOTALL)]
    if not cells:
        return []
    return cells[1:]
def parse_objects(data):
    """Extract object names and category labels from markdown content.

    Object names come from table cells (underscores become spaces, the
    "Objectname" header cell is dropped). Each "# Class plural (singular)"
    heading contributes one entry to each category list.

    Returns:
        (object_names, categories_plural, categories_singular)
    """
    object_names = [
        cell.replace("_", " ").strip()
        for cell in re.findall(r"\|\s*(\w+)\s*\|", data, re.DOTALL)
        if cell != "Objectname"  # skip the table header cell
    ]
    headings = [
        heading.strip().replace("(", "").replace(")", "").split()
        for heading in re.findall(r"# Class \s*([\w,\s, \(,\)]+)\s*", data, re.DOTALL)
    ]
    categories_plural = [words[0].replace("_", " ") for words in headings]
    categories_singular = [words[1].replace("_", " ") for words in headings]
    return object_names, categories_plural, categories_singular
def setup_command_generator():
    """Build a CommandGenerator from the CompetitionTemplate data files.

    Reads the names/locations/rooms/objects markdown files shipped with the
    competition template and feeds the parsed lists to CommandGenerator.
    Returns None (after printing a warning) when any file is missing or
    unreadable, so the app can still run without random-command support.
    """
    try:
        base_path = os.path.join(os.path.dirname(__file__), 'CompetitionTemplate')

        def read_template(*parts):
            # Helper: load one template file as text.
            with open(os.path.join(base_path, *parts), 'r') as f:
                return f.read()

        names = parse_names(read_template('names', 'names.md'))
        location_names, placement_location_names = parse_locations(
            read_template('maps', 'location_names.md')
        )
        room_names = parse_rooms(read_template('maps', 'room_names.md'))
        (object_names, object_categories_plural,
         object_categories_singular) = parse_objects(
            read_template('objects', 'objects.md')
        )
        return CommandGenerator(
            names, location_names, placement_location_names, room_names,
            object_names, object_categories_plural, object_categories_singular
        )
    except Exception as e:
        print(f"Warning: Could not set up CommandGenerator: {e}")
        return None
# Initialize command generator
# Built once at import time; stays None when the CompetitionTemplate data
# files are unavailable (random-command generation is then disabled).
command_generator = setup_command_generator()
# ============================================================================
# Core Functions
# ============================================================================
def format_command(cmd):
    """Render a command object for display.

    Pydantic/BAML objects are serialized via ``model_dump``; other
    iterables (e.g. dicts) are passed to ``dict``; anything else is
    stringified as a last resort.
    """
    dumper = getattr(cmd, 'model_dump', None)
    if dumper is not None:
        return dumper()
    if hasattr(cmd, '__iter__'):
        return dict(cmd)
    return str(cmd)
def interpret_command(command: str, model: str, execute: bool):
    """Parse a natural-language command and optionally simulate execution.

    Args:
        command: The user's natural-language instruction.
        model: Display name (or raw model key) of the model to use.
        execute: When True, run every parsed command through the grounding
            executor and collect per-command results.

    Returns:
        A (status, parsed_commands_json, execution_results_json) tuple of
        strings; the JSON fields are empty on validation failure or error.
    """
    if not command or not command.strip():
        return "Please enter a command", "", ""

    # Reverse-map the dropdown's display name to a model key; accept a raw
    # key unchanged so programmatic callers can bypass the display names.
    model_key = next(
        (key for key, display in MODEL_DISPLAY_NAMES.items() if display == model),
        None,
    ) or model
    if model_key not in AVAILABLE_MODELS:
        return f"Invalid model: {model}", "", ""

    try:
        commands = []
        execution_results = []

        if model_key == FINETUNED_MODEL_KEY:
            # Local fine-tuned model path: inference returns plain dicts.
            for cmd_dict in inference_finetuned(command):
                commands.append(cmd_dict)
                if not execute:
                    continue
                try:
                    # execute_function expects attribute access, so wrap the dict.
                    cmd_obj = create_command_object(cmd_dict)
                    action, success, result = execute_function(cmd_obj, tasks, grounding=True)
                except Exception as e:
                    execution_results.append({
                        "action": cmd_dict.get('action', str(cmd_dict)),
                        "success": False,
                        "result": str(e)
                    })
                else:
                    execution_results.append({
                        "action": action,
                        "success": success,
                        "result": result
                    })
        else:
            # API-backed model path: route the request through BAML.
            client_registry.set_primary(model_key)
            command_list = b.GenerateCommandList(
                command,
                baml_options={"client_registry": client_registry}
            )
            for cmd in getattr(command_list, 'commands', None) or []:
                commands.append(format_command(cmd))
                if not execute:
                    continue
                try:
                    action, success, result = execute_function(cmd, tasks, grounding=True)
                except Exception as e:
                    execution_results.append({
                        "action": str(cmd),
                        "success": False,
                        "result": str(e)
                    })
                else:
                    execution_results.append({
                        "action": action,
                        "success": success,
                        "result": result
                    })

        # The executor accumulates per-run history; reset it between requests.
        if execute:
            clear_command_history(tasks)

        parsed_output = json.dumps(commands, indent=2)
        exec_output = json.dumps(execution_results, indent=2) if execute else "Execution disabled"
        return "Success", parsed_output, exec_output
    except Exception as e:
        return f"Error: {str(e)}", "", ""
def generate_random_command(model: str, execute: bool):
    """Generate one random GPSR command and run it through the interpreter.

    Returns:
        (command_text, status, parsed_json, execution_json). When the
        generator is unavailable the first field carries the message; when
        generation/interpretation raises, the status carries the error.
    """
    if not command_generator:
        return "Command generator not available", "", "", ""
    try:
        generated_text, _structured = command_generator.generate_full_command()
        status, parsed_json, exec_json = interpret_command(generated_text, model, execute)
    except Exception as e:
        return "", f"Error: {str(e)}", "", ""
    return generated_text, status, parsed_json, exec_json
# ============================================================================
# Gradio Interface
# ============================================================================
def create_interface():
    """Assemble and return the Gradio Blocks UI for the interpreter."""
    with gr.Blocks(
        title="FRIDA Command Interpreter",
        theme=gr.themes.Soft(),
        css="""
        .header { text-align: center; margin-bottom: 20px; }
        .footer { text-align: center; margin-top: 20px; font-size: 0.9em; }
        .result-box { font-family: monospace; }
        """
    ) as demo:
        # Page header.
        gr.Markdown("""
        # FRIDA Command Interpreter
        ### Natural Language Robot Command Parser with Grounding Stage
        Parse natural language commands into structured robot actions using LLMs and semantic embeddings for grounding.
        """)

        # Model + execution options.
        with gr.Row():
            with gr.Column(scale=1):
                model_selector = gr.Dropdown(
                    choices=list(MODEL_DISPLAY_NAMES.values()),
                    value="RoBorregos Fine-tuned (Local)",
                    label="Select Model",
                    info="Fine-tuned model runs locally (no API key needed). Other models require OPENROUTER_API_KEY."
                )
                execute_toggle = gr.Checkbox(
                    label="Execute commands (simulate with embeddings)",
                    value=False,
                    info="Enable grounding stage simulation"
                )

        # Command entry.
        with gr.Row():
            with gr.Column(scale=2):
                command_box = gr.Textbox(
                    label="Enter Command",
                    placeholder="Enter a natural language command... (e.g., 'go to the kitchen and pick up the apple')",
                    lines=3
                )

        with gr.Row():
            interpret_button = gr.Button("Interpret Command", variant="primary")
            generate_button = gr.Button("Generate Random Command", variant="secondary")

        status_box = gr.Textbox(label="Status", interactive=False)

        # Side-by-side JSON panes for parse and execution results.
        with gr.Row():
            with gr.Column():
                parsed_pane = gr.Code(
                    label="Parsed Commands (JSON)",
                    language="json",
                    lines=15
                )
            with gr.Column():
                exec_pane = gr.Code(
                    label="Execution Results (JSON)",
                    language="json",
                    lines=15
                )

        # Footer.
        gr.Markdown("""
        ---
        **Powered by BAML** | [GitHub](https://github.com/RoBorregos/frida-cortex) | [Paper](https://doi.org/10.1007/978-3-032-09037-9_24)
        *Taming the LLM: Reliable Task Planning for Robotics Using Parsing and Grounding*
        """)

        # Event wiring: the button and textbox submit share the interpret
        # handler; the generator fills the textbox and interprets in one step.
        interpret_inputs = [command_box, model_selector, execute_toggle]
        interpret_outputs = [status_box, parsed_pane, exec_pane]
        interpret_button.click(
            fn=interpret_command,
            inputs=interpret_inputs,
            outputs=interpret_outputs
        )
        generate_button.click(
            fn=generate_random_command,
            inputs=[model_selector, execute_toggle],
            outputs=[command_box, status_box, parsed_pane, exec_pane]
        )
        command_box.submit(
            fn=interpret_command,
            inputs=interpret_inputs,
            outputs=interpret_outputs
        )

    return demo
# ============================================================================
# Main
# ============================================================================
if __name__ == "__main__":
    # Build the Gradio UI and start the server (blocking call).
    demo = create_interface()
    demo.launch()