"""Gradio front end that maps a Second Life position to a nearest known position.

At import time this module loads the GPT-2 tokenizer/model and the block-towers
dataset, flattens every ``final_positions`` entry of the dataset into a list,
and builds a KD-tree over the (x, y, z) coordinates.  ``create_interface``
then exposes a text box: the entered state is parsed, the nearest stored
position is looked up in the KD-tree, and a "Move to position ..." string is
returned.
"""

import ast
import json
import os
import time
import uuid

import gradio as gr
import numpy as np
import requests
from datasets import load_dataset
from requests.exceptions import RequestException, HTTPError, Timeout, ConnectionError
from scipy.spatial import KDTree
from transformers import GPT2LMHeadModel, GPT2Tokenizer

# Global initialization
# NOTE(review): the tokenizer and model are not used anywhere in this file;
# they are kept because other code may rely on these module-level names.
print("Initializing GPT-2 Tokenizer and Model...")
tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
model = GPT2LMHeadModel.from_pretrained('gpt2')

# Load the dataset
print("Loading dataset...")
dataset = load_dataset("visionlab/block-towers-10k-3s-trajectory-scale1", split='train')
print("Dataset loaded successfully.")

# Flat list of every position dict found in the dataset.
final_positions = []

# Counter used only for progress reporting.
counter = 0

print ("Counting...")
# Total number of positions, so that progress can be reported as "n / total".
total_positions = sum(len(item['data']['final_positions']) for item in dataset)
print ("Done counting...")

for item in dataset:
    for position in item['data']['final_positions']:
        final_positions.append(position)
        counter += 1

        # Print progress every 100 steps and once more at the very end.
        if counter % 100 == 0 or counter == total_positions:
            print(f"Processed {counter} / {total_positions} positions...")

# One row per position, columns are x, y, z (same order as final_positions,
# so a KD-tree index can be used directly to index final_positions).
positions_array = np.array([[p['x'], p['y'], p['z']] for p in final_positions])
print("Completed processing positions into an array.")


def estimate_time_and_build_kdtree(data):
    """Build a KD-tree over *data*, printing an estimated and the actual build time.

    Args:
        data: Array-like of points accepted by ``scipy.spatial.KDTree``.

    Returns:
        The constructed ``KDTree``.
    """
    start_time = time.perf_counter()
    print("Estimating time based on dataset size...")

    # Rough linear guess only; this is not a measured model of KD-tree cost.
    estimated_time_seconds = len(data) * 0.0001
    print(f"Estimated time to build KD-tree: {estimated_time_seconds:.2f} seconds")

    # Actual KD-tree construction
    kd_tree = KDTree(data)

    actual_time_taken = time.perf_counter() - start_time
    print(f"KD-tree built in {actual_time_taken:.2f} seconds.")
    return kd_tree


# Now, with positions_array defined, build the KD-tree and estimate time
tree = estimate_time_and_build_kdtree(positions_array)


def safe_convert_single_to_double_quotes(s):
    """Parse a JSON or JSON-like (single-quoted) string into a Python object.

    Three strategies are tried in order:

    1. strict JSON (``json.loads``);
    2. a Python literal (``ast.literal_eval``), which covers dict/list text
       written with single quotes;
    3. swapping single quotes for double quotes and parsing as JSON, which
       covers single-quoted text that uses JSON literals such as ``true``.

    Args:
        s: The text to parse.

    Returns:
        The parsed object, or ``None`` if *s* is not a string or cannot be
        parsed by any of the strategies (an error is printed in that case).
    """
    if not isinstance(s, str):
        print(
            "Error converting or parsing JSON-like string: "
            f"expected str, got {type(s).__name__}"
        )
        return None

    try:
        return json.loads(s)
    except ValueError:
        pass

    try:
        # literal_eval only evaluates literals, so it is safe on user input.
        return ast.literal_eval(s)
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
        pass

    # NUL can never appear raw inside valid JSON text, so it cannot collide
    # with real content.  (An empty placeholder would make the final replace
    # insert text between every character of the string.)
    placeholder = "\x00"
    swapped = s.replace("\\'", placeholder)
    swapped = swapped.replace("'", '"')
    # Inside a double-quoted JSON string an apostrophe needs no escaping.
    swapped = swapped.replace(placeholder, "'")
    try:
        return json.loads(swapped)
    except ValueError as e:
        print(f"Error converting or parsing JSON-like string: {e}")
        return None


def _to_query_point(current_xyz):
    """Convert a position into a length-3 float array usable as a KD-tree query.

    Accepts either a dict with ``'x'``, ``'y'`` and ``'z'`` keys (the same
    layout as the entries of ``final_positions``) or a sequence of three
    numbers.  Raises ``ValueError`` for anything else.
    """
    if isinstance(current_xyz, dict):
        try:
            coords = [current_xyz[axis] for axis in ("x", "y", "z")]
        except KeyError as err:
            raise ValueError(f"position is missing coordinate {err}") from err
    else:
        coords = current_xyz

    try:
        point = np.asarray(coords, dtype=float)
    except (TypeError, ValueError) as err:
        raise ValueError(f"position is not numeric: {current_xyz!r}") from err

    if point.shape != (3,):
        raise ValueError(
            f"position must have exactly three coordinates, got {current_xyz!r}"
        )
    return point


class SecondLifeNavigator:
    """Looks up nearest known positions and sends commands to a Corrade endpoint."""

    # NOTE(security): credentials are hard-coded in source.  They remain the
    # defaults for backward compatibility but can be overridden through the
    # CORRADE_GROUP / CORRADE_PASSWORD environment variables.
    _DEFAULT_GROUP = "e269893f-a570-0087-930e-6ba2a0b77f9c"
    _DEFAULT_PASSWORD = "nucleus"

    def __init__(self):
        self.dataset = dataset
        print("SecondLifeNavigator initialized with the dataset.")

    def determine_action_sequence(self, current_xyz):
        """Return a "Move to position ..." string for the stored position nearest to *current_xyz*.

        Args:
            current_xyz: A dict with ``'x'``, ``'y'``, ``'z'`` keys or a
                sequence of three numbers.

        Returns:
            The action sequence string.

        Raises:
            ValueError: If *current_xyz* cannot be interpreted as a 3-D point.
        """
        print(f"Determining action sequence for position: {current_xyz}")

        # Query KD-tree for the nearest position; the returned index lines up
        # with final_positions because positions_array was built from it.
        query_point = _to_query_point(current_xyz)
        _distance, index = tree.query(query_point)
        nearest_position = final_positions[index]

        # Construct the action sequence
        action_sequence = (
            f"Move to position {nearest_position['x']}, "
            f"{nearest_position['y']}, {nearest_position['z']}"
        )
        print(f"Action sequence determined: {action_sequence}")
        return action_sequence

    def send_command_to_corrade(self, corrade_endpoint, command, parameters):
        """POST *command* with *parameters* to *corrade_endpoint* as JSON.

        Args:
            corrade_endpoint: URL to post to.
            command: Value of the ``"command"`` field.
            parameters: Extra fields merged into the payload (may override
                the defaults); ``None`` is treated as no extra fields.

        Returns:
            The decoded JSON response, or ``None`` if the request failed or
            the response body was not valid JSON (an error is printed).
        """
        print(f"Sending command to Corrade: {command}")
        command_data = {
            "command": command,
            "group": os.environ.get("CORRADE_GROUP", self._DEFAULT_GROUP),
            "password": os.environ.get("CORRADE_PASSWORD", self._DEFAULT_PASSWORD),
        }
        command_data.update(parameters or {})

        try:
            response = requests.post(corrade_endpoint, json=command_data, timeout=10)
            response.raise_for_status()
        except Timeout:
            print("Error: The request timed out.")
            return None
        except HTTPError as http_err:
            print(f"HTTP error occurred: {http_err}, Status Code: {response.status_code}")
            return None
        except ConnectionError:
            print("Error: A connection error occurred.")
            return None
        except RequestException as req_err:
            print(f"Error sending command to Corrade: {req_err}")
            return None

        print(f"Command {command} executed successfully.")
        try:
            return response.json()
        except ValueError as json_err:
            # Response.json() raises a ValueError subclass on a non-JSON body.
            print(f"Error sending command to Corrade: {json_err}")
            return None


def create_interface():
    """Build and launch the Gradio interface for the navigator."""
    sl_navigator = SecondLifeNavigator()

    def navigate(current_state):
        """Parse the entered state and return ``"<uuid>: <action sequence>"``."""
        # Accept both strict JSON and single-quoted dict/list text.
        state_dict = safe_convert_single_to_double_quotes(current_state)
        if state_dict is None:
            return "Error: could not parse the current state as JSON."

        try:
            action_sequence = sl_navigator.determine_action_sequence(state_dict)
        except ValueError as err:
            return f"Error: {err}"

        random_id = str(uuid.uuid4())
        return random_id + ": " + action_sequence

    interface = gr.Interface(
        fn=navigate,
        inputs=gr.Textbox(lines=2, placeholder="Enter current state..."),
        outputs=gr.Textbox(label="Generated Action Sequence"),
        title="Second Life Navigator",
        description="Generates an action sequence based on the current state for navigating in Second Life.",
    )
    interface.launch()


if __name__ == "__main__":
    create_interface()