|
|
import gradio as gr |
|
|
import os |
|
|
import numpy as np |
|
|
from transformers import GPT2LMHeadModel, GPT2Tokenizer |
|
|
import requests |
|
|
import json |
|
|
import uuid |
|
|
from requests.exceptions import RequestException, HTTPError, Timeout, ConnectionError |
|
|
from datasets import load_dataset |
|
|
|
|
|
|
|
|
# Load the pretrained GPT-2 tokenizer and language-model head once, at import time.
# NOTE(review): neither object is referenced elsewhere in the visible part of this
# file — confirm they are needed before paying the download/start-up cost.
print("Initializing GPT-2 Tokenizer and Model...")
tokenizer, model = (
    GPT2Tokenizer.from_pretrained('gpt2'),
    GPT2LMHeadModel.from_pretrained('gpt2'),
)
|
|
|
|
|
from scipy.spatial import KDTree |
|
|
|
|
|
|
|
|
|
|
|
# Fetch the training split of the block-towers trajectory dataset. Everything
# below (position list, KD-tree, navigator) is derived from this object.
print("Loading dataset...")
dataset = load_dataset(
    "visionlab/block-towers-10k-3s-trajectory-scale1",
    split='train',
)
print("Dataset loaded successfully.")
|
|
|
|
|
from scipy.spatial import KDTree |
|
|
|
|
|
import time |
|
|
|
|
|
|
|
|
# Flat list of every final-position record found across all dataset items.
# Index i in this list corresponds to row i of `positions_array` below.
final_positions = []
counter = 0

# First pass: only tally the positions so progress can be reported against a total.
print("Counting...")
total_positions = 0
for item in dataset:
    total_positions += len(item['data']['final_positions'])
print("Done counting...")

# Second pass: collect the records, logging every 100th one and the very last one.
for item in dataset:
    for position in item['data']['final_positions']:
        final_positions.append(position)
        counter = len(final_positions)
        if counter == total_positions or counter % 100 == 0:
            print(f"Processed {counter} / {total_positions} positions...")

# One row per record, columns ordered x, y, z.
positions_array = np.array(
    [[entry[axis] for axis in ('x', 'y', 'z')] for entry in final_positions]
)
print("Completed processing positions into an array.")
|
|
|
|
|
def estimate_time_and_build_kdtree(data, *, seconds_per_point=0.0001):
    """Build a ``KDTree`` over *data*, printing an estimate and the measured build time.

    Args:
        data: Point collection handed unchanged to ``scipy.spatial.KDTree``;
            ``len(data)`` is used as the number of points for the estimate.
        seconds_per_point: Rough per-point cost used only for the printed
            estimate. The default reproduces the previously hard-coded factor.

    Returns:
        The constructed ``KDTree``.
    """
    # perf_counter is monotonic, so the reported duration cannot be skewed
    # (or go negative) if the system clock is adjusted while the tree is built.
    start_time = time.perf_counter()
    print("Estimating time based on dataset size...")

    estimated_time_seconds = len(data) * seconds_per_point
    print(f"Estimated time to build KD-tree: {estimated_time_seconds:.2f} seconds")

    tree = KDTree(data)

    actual_time_taken = time.perf_counter() - start_time
    print(f"KD-tree built in {actual_time_taken:.2f} seconds.")
    return tree
|
|
|
|
|
|
|
|
# Module-wide spatial index over `positions_array`; built once at import time
# and queried by SecondLifeNavigator.determine_action_sequence.
tree = estimate_time_and_build_kdtree(data=positions_array)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def safe_convert_single_to_double_quotes(s):
    """Parse a JSON-like string that may use single quotes as delimiters.

    The input is first tried as ordinary JSON. Only if that fails are single
    quotes rewritten to double quotes; an escaped single quote (``\\'``) is
    kept as a literal apostrophe inside the resulting string value.

    Args:
        s: Text such as ``"{'x': 1, 'y': 2}"`` or regular JSON.

    Returns:
        The decoded Python object, or ``None`` (after printing the reason)
        when *s* is not a string or cannot be parsed.
    """
    if not isinstance(s, str):
        print(
            "Error converting or parsing JSON-like string: "
            f"expected str, got {type(s).__name__}"
        )
        return None

    # Already-valid JSON must not go through the quote rewrite: an apostrophe
    # inside a double-quoted value would be turned into a stray delimiter.
    try:
        return json.loads(s)
    except (ValueError, RecursionError):
        pass  # Not plain JSON; fall through to the single-quote conversion.

    temp_placeholder = "<TEMP_ESCAPED_SINGLE_QUOTE>"
    converted = (
        s.replace("\\'", temp_placeholder)
        .replace("'", '"')
        # JSON has no \' escape, so the protected quote is restored as a bare
        # apostrophe, which is legal inside a double-quoted JSON string.
        .replace(temp_placeholder, "'")
    )
    try:
        return json.loads(converted)
    except (ValueError, RecursionError) as e:
        print(f"Error converting or parsing JSON-like string: {e}")
        return None
|
|
|
|
|
class SecondLifeNavigator:
    """Nearest-position lookup plus a small HTTP client for a Corrade endpoint.

    The lookup relies on the module-level ``tree`` (a KD-tree) and
    ``final_positions`` (the records the tree rows were built from).
    """

    # NOTE(security): these credentials are committed in source. They are kept
    # as defaults so existing behaviour is unchanged, but they should be moved
    # to configuration; override the attributes on a subclass or instance.
    CORRADE_GROUP = "e269893f-a570-0087-930e-6ba2a0b77f9c"
    CORRADE_PASSWORD = "nucleus"

    def __init__(self):
        """Keep a reference to the module-level ``dataset`` on the instance."""
        self.dataset = dataset
        print("SecondLifeNavigator initialized with the dataset.")

    @staticmethod
    def determine_action_sequence(current_xyz):
        """Return a "Move to position ..." instruction for the nearest known position.

        Declared static because it never touched instance state and was
        written without ``self``; this keeps the ``(current_xyz)`` signature
        while making calls on an instance work as well as calls on the class.

        Args:
            current_xyz: Query point passed to ``tree.query``. Presumably an
                ``[x, y, z]`` sequence matching the columns the tree was
                built from — confirm against callers.

        Returns:
            The instruction string naming the nearest record's x, y and z.
        """
        print(f"Determining action sequence for position: {current_xyz}")

        # Only the index is needed; the distance is deliberately ignored.
        _distance, index = tree.query(current_xyz)
        nearest_position = final_positions[index]

        action_sequence = (
            f"Move to position {nearest_position['x']}, "
            f"{nearest_position['y']}, {nearest_position['z']}"
        )
        print(f"Action sequence determined: {action_sequence}")
        return action_sequence

    def send_command_to_corrade(self, corrade_endpoint, command, parameters):
        """POST *command* as JSON to *corrade_endpoint* and return the decoded reply.

        Args:
            corrade_endpoint: URL the request is posted to.
            command: Value stored under the ``"command"`` key of the payload.
            parameters: Extra key/value pairs merged into the payload after
                the defaults, so they can override them. ``None`` is treated
                as "no extra parameters".

        Returns:
            The JSON-decoded response body, or ``None`` if the request failed,
            returned an error status, or the body was not valid JSON. Failures
            are printed rather than raised.
        """
        print(f"Sending command to Corrade: {command}")
        command_data = {
            "command": command,
            "group": self.CORRADE_GROUP,
            "password": self.CORRADE_PASSWORD,
        }
        command_data.update(parameters or {})

        try:
            response = requests.post(corrade_endpoint, json=command_data, timeout=10)
            response.raise_for_status()
            print(f"Command {command} executed successfully.")
            return response.json()
        except Timeout:
            print("Error: The request timed out.")
        except HTTPError as http_err:
            print(f"HTTP error occurred: {http_err}, Status Code: {response.status_code}")
        except ConnectionError:
            print("Error: A connection error occurred.")
        except RequestException as req_err:
            print(f"Error sending command to Corrade: {req_err}")
        except ValueError as json_err:
            # Older Requests versions raise a plain ValueError from .json();
            # without this clause it would escape the best-effort handling.
            print(f"Error decoding Corrade response as JSON: {json_err}")
        return None
|
|
|
|
|
def create_interface():
    """Build the Gradio text-in/text-out UI for the navigator and launch it."""

    def navigate(current_state):
        """Turn the textbox contents into ``"<uuid>: <action sequence>"``.

        Args:
            current_state: Raw text from the input textbox.

        Returns:
            A fresh UUID followed by the action sequence, or a plain error
            message when the state cannot be interpreted as a position.
        """
        state_dict = safe_convert_single_to_double_quotes(current_state)

        # Assumes the state is an object with numeric 'x', 'y' and 'z' keys,
        # mirroring the dataset's position records — TODO confirm the schema.
        # A failed parse (None) or a non-mapping value surfaces as TypeError.
        try:
            current_xyz = [float(state_dict[axis]) for axis in ("x", "y", "z")]
        except (KeyError, TypeError, ValueError):
            return (
                "Invalid state: expected a JSON-like object with numeric "
                "'x', 'y' and 'z' values."
            )

        # determine_action_sequence takes no `self`, so it is invoked on the
        # class rather than on an instance.
        action_sequence = SecondLifeNavigator.determine_action_sequence(current_xyz)
        random_id = str(uuid.uuid4())
        return random_id + ": " + action_sequence

    interface = gr.Interface(
        fn=navigate,
        inputs=gr.Textbox(lines=2, placeholder="Enter current state..."),
        outputs=gr.Textbox(label="Generated Action Sequence"),
        title="Second Life Navigator",
        description="Generates an action sequence based on the current state for navigating in Second Life.",
    )

    interface.launch()


if __name__ == "__main__":
    create_interface()
|
|
|