# SL-NAV2 / app.py
# AEUPH's picture
# Update app.py
# bfe0eca verified
import gradio as gr
import os
import numpy as np
from transformers import GPT2LMHeadModel, GPT2Tokenizer
import requests
import json
import uuid
from requests.exceptions import RequestException, HTTPError, Timeout, ConnectionError
from datasets import load_dataset
# Global initialization (everything below runs at import time).
# The GPT-2 tokenizer and model are loaded here and exposed as module-level
# names; nothing in the visible part of this file reads them afterwards.
print("Initializing GPT-2 Tokenizer and Model...")
tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
model = GPT2LMHeadModel.from_pretrained('gpt2')

# These imports sit here (rather than at the top of the file) because that is
# where the original file placed them; the duplicate KDTree import was removed.
from scipy.spatial import KDTree
import time

# Load your dataset
print("Loading dataset...")
dataset = load_dataset("visionlab/block-towers-10k-3s-trajectory-scale1", split='train')
print("Dataset loaded successfully.")

# Flatten every item's 'final_positions' into one list in a SINGLE pass over
# the dataset. The previous version iterated the dataset twice (once only to
# compute the total for progress messages, once to collect); the total is now
# simply the length of the collected list.
print("Counting...")
final_positions = [
    position
    for item in dataset
    for position in item['data']['final_positions']
]
total_positions = len(final_positions)
print("Done counting...")

# `counter` is kept as a module-level name for compatibility; after the single
# pass it always equals the total. Only the final progress line is printed
# instead of one line per 100 positions.
counter = total_positions
print(f"Processed {counter} / {total_positions} positions...")

# Convert the list of positions to a NumPy array with one [x, y, z] row per
# position (each position is indexed by the keys 'x', 'y' and 'z').
positions_array = np.array([[p['x'], p['y'], p['z']] for p in final_positions])
print("Completed processing positions into an array.")
def estimate_time_and_build_kdtree(data):
    """Build a SciPy ``KDTree`` over *data* and report how long it took.

    Prints a rough, size-based estimate first, then the measured build time.

    Args:
        data: Sized array-like of points handed unchanged to ``KDTree``.

    Returns:
        The constructed ``KDTree``.
    """
    print("Estimating time based on dataset size...")
    # Mock estimation logic for demonstration: a fixed cost per point.
    estimated_time_seconds = len(data) * 0.0001  # Adjust based on your context
    print(f"Estimated time to build KD-tree: {estimated_time_seconds:.2f} seconds")

    # perf_counter() is monotonic, so the measured interval cannot be skewed
    # by wall-clock adjustments; only the construction itself is timed.
    start_time = time.perf_counter()
    tree = KDTree(data)
    actual_time_taken = time.perf_counter() - start_time

    print(f"KD-tree built in {actual_time_taken:.2f} seconds.")
    return tree
# Now, with positions_array defined, build the KD-tree and estimate time.
# The resulting module-level `tree` is what determine_action_sequence queries
# to find the stored position nearest to a given point.
tree = estimate_time_and_build_kdtree(positions_array)
def safe_convert_single_to_double_quotes(s):
    """Parse a JSON or JSON-like (single-quoted) string into a Python object.

    Three strategies are tried in order, and the first that succeeds wins:

    1. Strict JSON (``json.loads``), so valid JSON containing apostrophes
       such as ``{"name": "O'Brien"}`` is never mangled.
    2. A Python literal (``ast.literal_eval``), which handles single-quoted
       strings, escaped quotes and ``None``/``True``/``False`` without
       executing any code.
    3. The legacy blind swap of ``'`` for ``"`` followed by ``json.loads``,
       kept so inputs like ``{'a': true}`` keep parsing as before.

    Args:
        s: The text to parse.

    Returns:
        The parsed object, or ``None`` if *s* is not a string or no strategy
        can parse it (an error message is printed in that case). This
        function never raises.
    """
    import ast  # local import: only this function needs it

    if not isinstance(s, str):
        print(
            "Error converting or parsing JSON-like string: "
            f"expected str, got {type(s).__name__}"
        )
        return None

    # Legacy conversion: protect already-escaped single quotes, swap the
    # remaining ones for double quotes, then restore the escaped ones.
    temp_placeholder = "<TEMP_ESCAPED_SINGLE_QUOTE>"
    swapped = (
        s.replace("\\'", temp_placeholder)
        .replace("'", '"')
        .replace(temp_placeholder, "\\'")
    )

    attempts = (
        (json.loads, s),
        # strip(): leading whitespace would otherwise be read as indentation
        # by the Python parser on some interpreter versions.
        (ast.literal_eval, s.strip()),
        (json.loads, swapped),
    )

    last_error = None
    for parse, text in attempts:
        try:
            return parse(text)
        except Exception as e:  # boundary helper: report and fall through
            last_error = e

    print(f"Error converting or parsing JSON-like string: {last_error}")
    return None
class SecondLifeNavigator:
    """Looks up the nearest stored position and sends commands to Corrade.

    Position lookups use the module-level ``tree`` (KD-tree) and
    ``final_positions`` list; the instance itself only holds a reference to
    the module-level ``dataset`` plus the Corrade group/password.
    """

    # NOTE(review): these credentials are hard-coded in source. They remain
    # the defaults for backward compatibility, but should be supplied via the
    # constructor (from configuration or secrets) rather than committed.
    group = "e269893f-a570-0087-930e-6ba2a0b77f9c"
    password = "nucleus"

    def __init__(self, group=None, password=None):
        """Create a navigator bound to the module-level dataset.

        Args:
            group: Optional Corrade group identifier overriding the default.
            password: Optional Corrade group password overriding the default.
        """
        self.dataset = dataset
        if group is not None:
            self.group = group
        if password is not None:
            self.password = password
        print("SecondLifeNavigator initialized with the dataset.")

    @staticmethod
    def determine_action_sequence(current_xyz):
        """Return a "Move to position ..." instruction for the nearest stored position.

        Declared static because it reads only the module-level ``tree`` and
        ``final_positions``. The original definition had no ``self``
        parameter, so calling it on an instance passed the navigator itself
        as the position; as a static method it works both on the class and
        on an instance.

        Args:
            current_xyz: The query point, either a sequence accepted by
                ``tree.query`` or a mapping with ``'x'``, ``'y'`` and ``'z'``
                keys (the same shape as the entries of ``final_positions``).

        Returns:
            The action sequence string.
        """
        print(f"Determining action sequence for position: {current_xyz}")
        if isinstance(current_xyz, dict):
            current_xyz = [current_xyz['x'], current_xyz['y'], current_xyz['z']]

        # Query KD-tree for the nearest position; the distance is not needed.
        _distance, index = tree.query(current_xyz)
        nearest_position = final_positions[index]

        # Construct the action sequence
        action_sequence = (
            f"Move to position {nearest_position['x']}, "
            f"{nearest_position['y']}, {nearest_position['z']}"
        )
        print(f"Action sequence determined: {action_sequence}")
        return action_sequence

    def send_command_to_corrade(self, corrade_endpoint, command, parameters):
        """POST a command to a Corrade endpoint and return the decoded JSON reply.

        Args:
            corrade_endpoint: URL the command is posted to.
            command: Value sent under the ``"command"`` key.
            parameters: Extra key/value pairs merged into the payload; may be
                ``None`` or empty. They are applied last, so they override
                the base keys on a name clash.

        Returns:
            The parsed JSON response, or ``None`` if the request timed out,
            failed, returned an HTTP error status, or the body was not valid
            JSON (a message is printed in each case).
        """
        print(f"Sending command to Corrade: {command}")
        command_data = {
            "command": command,
            "group": self.group,
            "password": self.password,
        }
        # Tolerate parameters=None instead of raising TypeError.
        command_data.update(parameters or {})
        try:
            response = requests.post(corrade_endpoint, json=command_data, timeout=10)
            response.raise_for_status()
            print(f"Command {command} executed successfully.")
            return response.json()
        except Timeout:
            print("Error: The request timed out.")
        except HTTPError as http_err:
            # `response` is bound here: raise_for_status() is what raised.
            print(f"HTTP error occurred: {http_err}, Status Code: {response.status_code}")
        except ConnectionError:
            print("Error: A connection error occurred.")
        except RequestException as req_err:
            print(f"Error sending command to Corrade: {req_err}")
        except ValueError as json_err:
            # response.json() raises a ValueError subclass on a non-JSON body.
            print(f"Error: Corrade response was not valid JSON: {json_err}")
        return None
def create_interface():
    """Build the Gradio text-in/text-out interface and launch it.

    The interface takes a textual state, parses it into a position, looks up
    the nearest stored position and shows the resulting action sequence
    prefixed with a random UUID.
    """

    def _extract_xyz(state):
        """Return ``[x, y, z]`` as finite floats, or ``None`` if *state* is unusable."""
        import math  # local import: only needed for the finiteness check

        if isinstance(state, dict):
            try:
                values = [state['x'], state['y'], state['z']]
            except KeyError:
                return None
        elif isinstance(state, (list, tuple)) and len(state) == 3:
            values = list(state)
        else:
            return None

        try:
            coords = [float(v) for v in values]
        except (TypeError, ValueError):
            return None
        return coords if all(math.isfinite(c) for c in coords) else None

    def navigate(current_state):
        """Turn the textbox content into an action sequence (or an error message)."""
        # Parse the JSON / JSON-like text entered by the user.
        state_dict = safe_convert_single_to_double_quotes(current_state)
        coords = _extract_xyz(state_dict)
        if coords is None:
            return (
                "Error: could not read a position from the input. "
                "Expected e.g. {'x': 1.0, 'y': 2.0, 'z': 3.0} or [1.0, 2.0, 3.0]."
            )

        sl_navigator = SecondLifeNavigator()
        # Previously this was called with no argument at all, so the parsed
        # state never reached the lookup. The method takes no `self`, so it is
        # looked up on the class: an instance-bound call could hand the
        # navigator object itself over as the position.
        action_sequence = type(sl_navigator).determine_action_sequence(coords)
        random_id = str(uuid.uuid4())
        return random_id + ": " + action_sequence

    interface = gr.Interface(
        fn=navigate,
        inputs=gr.Textbox(lines=2, placeholder="Enter current state..."),
        outputs=gr.Textbox(label="Generated Action Sequence"),
        title="Second Life Navigator",
        description="Generates an action sequence based on the current state for navigating in Second Life.",
    )
    interface.launch()
# Script entry point: build and launch the interface only when this file is
# executed directly, not when it is imported.
if __name__ == "__main__":
    create_interface()