File size: 6,400 Bytes
36ba3ef |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 |
import os.path
import time
import warnings
from typing import SupportsFloat, Any, Tuple, Dict
import requests
import json
import gymnasium as gym
from gymnasium.core import ObsType
import utils as U
from .minecraft_launcher import MinecraftInstance
from .process_monitor import SubprocessMonitor
class VoyagerEnv(gym.Env):
def __init__(
self,
mc_port=None,
azure_login=None,
server_host="http://127.0.0.1",
server_port=3000,
request_timeout=600,
log_path="./logs",
visual_server_port=-1
):
if not mc_port and not azure_login:
raise ValueError("Either mc_port or azure_login must be specified")
if mc_port and azure_login:
warnings.warn(
"Both mc_port and mc_login are specified, mc_port will be ignored"
)
self.mc_port = mc_port
self.visual_server_port = visual_server_port
self.azure_login = azure_login
self.server = f"{server_host}:{server_port}"
self.server_port = server_port
self.request_timeout = request_timeout
self.log_path = log_path
self.mineflayer = self.get_mineflayer_process(server_port)
if azure_login:
self.mc_instance = self.get_mc_instance()
else:
self.mc_instance = None
self.has_reset = False
self.reset_options = None
self.connected = False
self.server_paused = False
def get_mineflayer_process(self, server_port):
U.f_mkdir(self.log_path, "mineflayer")
file_path = os.path.abspath(os.path.dirname(__file__))
return SubprocessMonitor(
commands=[
"node",
U.f_join(file_path, "mineflayer/index.js"),
str(server_port),
str(self.visual_server_port)
],
name="mineflayer",
ready_match=r"Server started on port (\d+)",
log_path=U.f_join(self.log_path, "mineflayer"),
)
def get_mc_instance(self):
print("Creating Minecraft server")
U.f_mkdir(self.log_path, "minecraft")
return MinecraftInstance(
**self.azure_login,
mineflayer=self.mineflayer,
log_path=U.f_join(self.log_path, "minecraft"),
)
def check_process(self):
if self.mc_instance and not self.mc_instance.is_running:
print("Starting Minecraft server")
self.mc_instance.run()
self.mc_port = self.mc_instance.port
self.reset_options["port"] = self.mc_instance.port
print(f"Server started on port {self.reset_options['port']}")
retry = 0
while not self.mineflayer.is_running:
print("Mineflayer process has exited, restarting")
self.mineflayer.run()
if not self.mineflayer.is_running:
if retry > 3:
raise RuntimeError("Mineflayer process failed to start")
else:
continue
print(self.mineflayer.ready_line)
res = requests.post(
f"{self.server}/start",
json=self.reset_options,
timeout=self.request_timeout,
)
if res.status_code != 200:
self.mineflayer.stop()
raise RuntimeError(
f"Minecraft server reply with code {res.status_code}"
)
return res.json()
def step(
self,
code: str,
programs: str = "",
) -> Tuple[ObsType, SupportsFloat, bool, bool, Dict[str, Any]]:
if not self.has_reset:
raise RuntimeError("Environment has not been reset yet")
self.check_process()
self.unpause()
data = {
"code": code,
"programs": programs,
}
res = requests.post(
f"{self.server}/step", json=data, timeout=self.request_timeout
)
if res.status_code != 200:
raise RuntimeError("Failed to step Minecraft server")
returned_data = res.json()
self.pause()
return json.loads(returned_data)
def render(self):
raise NotImplementedError("render is not implemented")
def reset(
self,
*,
seed=None,
options=None,
) -> Tuple[ObsType, Dict[str, Any]]:
if options is None:
options = {}
if options.get("inventory", {}) and options.get("mode", "hard") != "hard":
raise RuntimeError("inventory can only be set when options is hard")
self.reset_options = {
"port": self.mc_port,
"reset": options.get("mode", "peaceful"),
"inventory": options.get("inventory", {}),
"equipment": options.get("equipment", []),
"spread": options.get("spread", False),
"waitTicks": options.get("wait_ticks", 5),
"position": options.get("position", None),
}
self.unpause()
self.mineflayer.stop()
time.sleep(1) # wait for mineflayer to exit
returned_data = self.check_process()
self.has_reset = True
self.connected = True
# All the reset in step will be soft
self.reset_options["reset"] = "soft"
self.pause()
return json.loads(returned_data)
def close(self):
self.unpause()
if self.connected:
res = requests.post(f"{self.server}/stop")
if res.status_code == 200:
self.connected = False
if self.mc_instance:
self.mc_instance.stop()
self.mineflayer.stop()
return not self.connected
def pause(self):
# if self.mineflayer.is_running and not self.server_paused:
# res = requests.post(f"{self.server}/pause")
# if res.status_code == 200:
# self.server_paused = True
return True # self.server_paused
def unpause(self):
# if self.mineflayer.is_running and self.server_paused:
# res = requests.post(f"{self.server}/pause")
# if res.status_code == 200:
# self.server_paused = False
# else:
# print(res.json())
return False # self.server_paused
|