File size: 5,313 Bytes
ac4aa15
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c2f48f9
9230f20
 
 
 
 
 
 
de864d9
9230f20
 
 
de864d9
9230f20
 
 
 
 
 
 
 
 
 
ac4aa15
 
9230f20
 
c2f48f9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
ac4aa15
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
bfe0eca
ac4aa15
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
import gradio as gr
import os
import numpy as np
from transformers import GPT2LMHeadModel, GPT2Tokenizer
import requests
import json
import uuid
from requests.exceptions import RequestException, HTTPError, Timeout, ConnectionError
from datasets import load_dataset

# Global initialization: load the pretrained 'gpt2' tokenizer and language
# model once, at module import time.
# NOTE(review): neither `tokenizer` nor `model` is referenced anywhere else in
# the visible part of this file - confirm they are still needed before paying
# the load cost on every start-up.
print("Initializing GPT-2 Tokenizer and Model...")
tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
model = GPT2LMHeadModel.from_pretrained('gpt2')

from scipy.spatial import KDTree


# Load the 'train' split of the block-towers dataset at module import time.
# Code further down reads item['data']['final_positions'] from every record
# of this dataset.
print("Loading dataset...")
dataset = load_dataset("visionlab/block-towers-10k-3s-trajectory-scale1", split='train')
print("Dataset loaded successfully.")

from scipy.spatial import KDTree

import time

# Every final-position record found in the dataset, flattened into one list.
# The list index of a record is also its row index in positions_array below.
final_positions = []

# Number of positions gathered so far; used only for progress reporting
counter = 0

print("Counting...")
# First pass: total up how many positions exist so that progress can be
# reported as "done / total" (assumes each item['data']['final_positions']
# is a list of positions)
total_positions = 0
for item in dataset:
    total_positions += len(item['data']['final_positions'])

print("Done counting...")
# Second pass: collect the position records themselves
for item in dataset:
    for position in item['data']['final_positions']:
        final_positions.append(position)
        counter = len(final_positions)

        # Stay quiet unless this is a multiple of 100 or the very last position
        if counter % 100 != 0 and counter != total_positions:
            continue
        print(f"Processed {counter} / {total_positions} positions...")

# One row per position, columns ordered x, y, z
positions_array = np.array(
    [[entry[axis] for axis in ('x', 'y', 'z')] for entry in final_positions]
)

print("Completed processing positions into an array.")

def estimate_time_and_build_kdtree(data, seconds_per_point=0.0001):
    """Build a SciPy KD-tree over *data*, reporting estimated and actual build time.

    Args:
        data: Collection of points accepted by ``scipy.spatial.KDTree``.
            ``len(data)`` is used as the point count for the estimate.
        seconds_per_point: Flat per-point cost used for the rough estimate.
            This is a placeholder heuristic, not a measured figure.

    Returns:
        The constructed ``KDTree``.
    """
    print("Estimating time based on dataset size...")

    # Mock estimation logic for demonstration: a flat cost per point
    estimated_time_seconds = len(data) * seconds_per_point
    print(f"Estimated time to build KD-tree: {estimated_time_seconds:.2f} seconds")

    # Time only the construction itself. perf_counter() is monotonic and
    # high-resolution, so the interval cannot be skewed by system clock
    # adjustments the way a time.time() difference can.
    start_time = time.perf_counter()
    tree = KDTree(data)
    actual_time_taken = time.perf_counter() - start_time

    print(f"KD-tree built in {actual_time_taken:.2f} seconds.")
    return tree

# Now, with positions_array defined, build the KD-tree and estimate time.
# `tree` is a module-level global that SecondLifeNavigator.determine_action_sequence
# queries; the index it returns is used to look up final_positions.
tree = estimate_time_and_build_kdtree(positions_array)





def safe_convert_single_to_double_quotes(s):
    """Parse a JSON or JSON-like (single-quoted) string into Python data.

    Three strategies are tried in order, and the first that succeeds wins:

    1. Parse *s* unchanged as JSON, so valid JSON that merely contains an
       apostrophe (``{"name": "O'Brien"}``) is never damaged by quote swapping.
    2. Swap single quotes for double quotes and parse as JSON. A
       backslash-escaped ``\\'`` is kept as a literal apostrophe; it is
       restored unescaped because ``\\'`` is not a valid JSON escape.
    3. Evaluate *s* as a Python literal, which covers dict reprs that use
       ``True`` / ``False`` / ``None`` or mix quote styles. This step may
       return Python-only types such as tuples.

    Args:
        s: The text to parse.

    Returns:
        The parsed value, or ``None`` (after printing an error) when *s* is
        not a string or none of the strategies can parse it.
    """
    import ast  # function-scope import: only this helper needs it

    if not isinstance(s, str):
        print(f"Error converting or parsing JSON-like string: expected str, got {type(s).__name__}")
        return None

    # 1. Already valid JSON - leave it alone.
    try:
        return json.loads(s)
    except (ValueError, RecursionError):
        pass

    # 2. Single-quoted JSON. Escaped quotes are protected by a placeholder
    #    while the delimiters are swapped, then restored as plain apostrophes.
    temp_placeholder = "<TEMP_ESCAPED_SINGLE_QUOTE>"
    swapped = (
        s.replace("\\'", temp_placeholder)
        .replace("'", '"')
        .replace(temp_placeholder, "'")
    )
    try:
        return json.loads(swapped)
    except (ValueError, RecursionError) as e:
        # Keep the error: the name bound by `except` is cleared on block exit.
        parse_error = e

    # 3. Python literal syntax. literal_eval only accepts literals, so it does
    #    not execute arbitrary code the way eval() would.
    try:
        return ast.literal_eval(s)
    except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
        print(f"Error converting or parsing JSON-like string: {parse_error}")
        return None
        
class SecondLifeNavigator:
    """Nearest-position lookup plus a small HTTP client for a Corrade endpoint.

    The lookup relies on the module-level ``tree`` (KD-tree) and
    ``final_positions`` (list of ``{'x', 'y', 'z'}`` records) globals.
    """

    # NOTE(security): these credentials are hard-coded in source. They are
    # class attributes so that a subclass or instance can override them;
    # prefer supplying real values from configuration outside version control.
    corrade_group = "e269893f-a570-0087-930e-6ba2a0b77f9c"
    corrade_password = "nucleus"

    def __init__(self):
        """Keep a reference to the module-level dataset."""
        self.dataset = dataset
        print("SecondLifeNavigator initialized with the dataset.")

    @staticmethod
    def determine_action_sequence(current_xyz):
        """Describe a move to the known position nearest to *current_xyz*.

        The method uses no instance state, so it is a static method: it can
        be called on the class or on an instance with the position as its
        only argument.

        Args:
            current_xyz: Either a sequence ``[x, y, z]`` or a dict with
                ``'x'``, ``'y'`` and ``'z'`` keys.

        Returns:
            A string of the form ``"Move to position X, Y, Z"``.

        Raises:
            KeyError: If a dict is given without all of 'x', 'y', 'z'.
        """
        print(f"Determining action sequence for position: {current_xyz}")

        # Accept the same {'x', 'y', 'z'} record shape that final_positions uses
        if isinstance(current_xyz, dict):
            query_point = [current_xyz['x'], current_xyz['y'], current_xyz['z']]
        else:
            query_point = current_xyz

        # Query KD-tree for the nearest position; only the index is needed
        _, index = tree.query(query_point)
        nearest_position = final_positions[index]

        # Construct the action sequence
        action_sequence = f"Move to position {nearest_position['x']}, {nearest_position['y']}, {nearest_position['z']}"
        print(f"Action sequence determined: {action_sequence}")

        return action_sequence

    def send_command_to_corrade(self, corrade_endpoint, command, parameters):
        """POST *command* to a Corrade endpoint and return the decoded reply.

        Args:
            corrade_endpoint: URL to post to.
            command: Value sent under the ``"command"`` key.
            parameters: Optional dict of extra fields merged into the payload
                (may be ``None``). Keys here override the built-in
                command/group/password fields.

        Returns:
            The JSON-decoded response body, or ``None`` if the request failed
            or the body was not valid JSON. Failures are printed, not raised.
        """
        print(f"Sending command to Corrade: {command}")
        command_data = {
            "command": command,
            "group": self.corrade_group,
            "password": self.corrade_password,
        }
        # Tolerate None / empty parameters instead of crashing in dict.update
        if parameters:
            command_data.update(parameters)

        try:
            response = requests.post(corrade_endpoint, json=command_data, timeout=10)
            response.raise_for_status()
        except Timeout:
            print("Error: The request timed out.")
            return None
        except HTTPError as http_err:
            # Read the status from the exception so this handler never
            # depends on the local `response` having been bound.
            status_code = getattr(http_err.response, "status_code", "unknown")
            print(f"HTTP error occurred: {http_err}, Status Code: {status_code}")
            return None
        except ConnectionError:
            print("Error: A connection error occurred.")
            return None
        except RequestException as req_err:
            print(f"Error sending command to Corrade: {req_err}")
            return None

        print(f"Command {command} executed successfully.")

        # Decoding is handled separately so that a non-JSON body is reported
        # as a decoding problem rather than as a failure to send the command.
        try:
            return response.json()
        except ValueError as json_err:
            print(f"Error decoding Corrade response as JSON: {json_err}")
            return None

def create_interface():
    """Build the Gradio text-in/text-out UI and launch it."""

    def navigate(current_state):
        """Turn the user's state text into an action-sequence message.

        Args:
            current_state: Text from the input box. Expected to be JSON or a
                single-quoted JSON-like string holding either a dict with
                'x', 'y', 'z' keys or a three-element list.

        Returns:
            ``"<uuid>: <action sequence>"`` on success, or
            ``"<uuid>: <error description>"`` when no position can be read.
        """
        random_id = str(uuid.uuid4())

        # Convert single quotes to double quotes and decode JSON
        state = safe_convert_single_to_double_quotes(current_state)

        # Pull an [x, y, z] triple out of whatever shape was supplied
        if isinstance(state, dict):
            raw_xyz = [state.get(axis) for axis in ("x", "y", "z")]
        elif isinstance(state, (list, tuple)) and len(state) == 3:
            raw_xyz = list(state)
        else:
            raw_xyz = None

        try:
            current_xyz = [float(value) for value in raw_xyz]
        except (TypeError, ValueError):
            # raw_xyz is None, or a coordinate is missing or non-numeric
            return random_id + ": " + (
                "Error: could not read a numeric x/y/z position from the current state."
            )

        # determine_action_sequence uses no instance state, so it is invoked
        # on the class with the position as its only argument.
        action_sequence = SecondLifeNavigator.determine_action_sequence(current_xyz)
        return random_id + ": " + action_sequence

    interface = gr.Interface(
        fn=navigate,
        inputs=gr.Textbox(lines=2, placeholder="Enter current state..."),
        outputs=gr.Textbox(label="Generated Action Sequence"),
        title="Second Life Navigator",
        description="Generates an action sequence based on the current state for navigating in Second Life.",
    )

    interface.launch()

# Script entry point: build and launch the Gradio UI only when this file is
# executed directly. NOTE: the model, dataset and KD-tree set-up earlier in
# this file is module-level code, so it runs on import as well.
if __name__ == "__main__":
    create_interface()