files
Browse files
app.py
CHANGED
|
@@ -9,13 +9,9 @@ import zipfile
|
|
| 9 |
import tempfile
|
| 10 |
import uuid
|
| 11 |
|
| 12 |
-
# ---
|
| 13 |
-
# (Reuse the class code from previous turns or the game_env.py above)
|
| 14 |
-
# Ensure the .render() method is the full HTML version from your FIRST prompt.
|
| 15 |
-
# ----------------------------------------------
|
| 16 |
-
|
| 17 |
-
# --- COPY OF MegaWorldEnv CLASS FOR CONTEXT ---
|
| 18 |
RADAR_ENCODING = {"EMPTY": 0,"WALL": 1,"GOAL": 2,"ICE": 3,"MUD": 4,"DANGER": 5,"CHARGER": 6,"ENEMY": 7}
|
|
|
|
| 19 |
class MegaWorldEnv:
|
| 20 |
def __init__(self):
|
| 21 |
self.start = (1, 1); self.goal = (18, 18)
|
|
@@ -26,6 +22,7 @@ class MegaWorldEnv:
|
|
| 26 |
self.chargers = [(18,2),(10,10)]
|
| 27 |
self.enemies = [{"pos":[5,5],"type":"patrol","axis":"x","range":(5,10),"dir":1},{"pos":[15,5],"type":"patrol","axis":"x","range":(12,17),"dir":1},{"pos":[12,12],"type":"hunter", "step": 0}, {"pos":[16,16],"type":"hunter", "step": 0}]
|
| 28 |
random.shuffle(self.enemies)
|
|
|
|
| 29 |
def _generate_walls(self):
|
| 30 |
walls = []
|
| 31 |
for y in range(20):
|
|
@@ -36,8 +33,13 @@ class MegaWorldEnv:
|
|
| 36 |
for i in range(15, 19): walls.append((i, 15))
|
| 37 |
walls.extend([(14,14), (13,13)])
|
| 38 |
return walls
|
|
|
|
|
|
|
| 39 |
def shaped_reward(self, old_pos, new_pos):
|
| 40 |
-
|
|
|
|
|
|
|
|
|
|
| 41 |
def get_radar(self, pos):
|
| 42 |
x,y=pos; radar={}
|
| 43 |
dirs={"up":(x,y+1),"down":(x,y-1),"left":(x-1,y),"right":(x+1,y)}
|
|
@@ -54,6 +56,7 @@ class MegaWorldEnv:
|
|
| 54 |
if tuple(e["pos"])==(nx,ny): info="ENEMY"
|
| 55 |
radar[d]=RADAR_ENCODING[info]
|
| 56 |
return radar
|
|
|
|
| 57 |
def update_enemies(self, player_pos):
|
| 58 |
for e in self.enemies:
|
| 59 |
if e["type"]=="patrol":
|
|
@@ -66,9 +69,11 @@ class MegaWorldEnv:
|
|
| 66 |
nx, ny = e["pos"][0] + move[0], e["pos"][1] + move[1]
|
| 67 |
if (nx, ny) not in self.walls and 0<=nx<20 and 0<=ny<20: e["pos"]=[nx,ny]
|
| 68 |
e["step"] += 1
|
|
|
|
| 69 |
def render(self, player_pos, history, battery, score):
|
|
|
|
| 70 |
html="<div style='background:#000;padding:10px;border-radius:12px;font-family:monospace'>"
|
| 71 |
-
html+=f"<div style='color:white;margin-bottom:5px'>π {battery} | π {score}</div>"
|
| 72 |
html+="<div style='display:grid;grid-template-columns:repeat(20,20px);gap:1px;width:fit-content;margin:auto'>"
|
| 73 |
enemy_pos=[tuple(e["pos"]) for e in self.enemies]
|
| 74 |
for y in range(19,-1,-1):
|
|
@@ -89,7 +94,7 @@ class MegaWorldEnv:
|
|
| 89 |
# ---------------------------------------------------------
|
| 90 |
# SERVER CONFIG
|
| 91 |
# ---------------------------------------------------------
|
| 92 |
-
FLAG = "CTF{
|
| 93 |
|
| 94 |
def run_mega_simulation(zip_file):
|
| 95 |
env = MegaWorldEnv()
|
|
@@ -98,28 +103,26 @@ def run_mega_simulation(zip_file):
|
|
| 98 |
yield env.render(env.start, [], 100, 0), {"status": "Waiting for upload..."}
|
| 99 |
return
|
| 100 |
|
| 101 |
-
#
|
| 102 |
run_id = str(uuid.uuid4())
|
| 103 |
temp_dir = os.path.join(tempfile.gettempdir(), "ctf_run_" + run_id)
|
| 104 |
os.makedirs(temp_dir, exist_ok=True)
|
| 105 |
|
| 106 |
try:
|
| 107 |
-
#
|
| 108 |
try:
|
| 109 |
with zipfile.ZipFile(zip_file.name, 'r') as zip_ref:
|
| 110 |
zip_ref.extractall(temp_dir)
|
| 111 |
except Exception as e:
|
| 112 |
-
yield env.render(env.start, [], 0, 0), {"error": f"Invalid Zip
|
| 113 |
return
|
| 114 |
|
| 115 |
-
#
|
| 116 |
agent_path = os.path.join(temp_dir, "agent.py")
|
| 117 |
if not os.path.exists(agent_path):
|
| 118 |
-
yield env.render(env.start, [], 0, 0), {"error": "agent.py not found
|
| 119 |
return
|
| 120 |
|
| 121 |
-
# 4. Dynamic Import with Path Handling
|
| 122 |
-
# We append temp_dir to sys.path so the agent can 'import brain.pkl' from its own folder
|
| 123 |
sys.path.append(temp_dir)
|
| 124 |
|
| 125 |
try:
|
|
@@ -127,10 +130,10 @@ def run_mega_simulation(zip_file):
|
|
| 127 |
agent = importlib.util.module_from_spec(spec)
|
| 128 |
spec.loader.exec_module(agent)
|
| 129 |
except Exception as e:
|
| 130 |
-
yield env.render(env.start, [], 0, 0), {"error": f"Code Error
|
| 131 |
return
|
| 132 |
|
| 133 |
-
#
|
| 134 |
pos = list(env.start)
|
| 135 |
battery = 2500
|
| 136 |
score = 0
|
|
@@ -140,7 +143,6 @@ def run_mega_simulation(zip_file):
|
|
| 140 |
radar = env.get_radar(pos)
|
| 141 |
|
| 142 |
try:
|
| 143 |
-
# Capture STDOUT/STDERR could be added here to prevent log spam
|
| 144 |
action = agent.get_action(tuple(pos), radar, battery)
|
| 145 |
if action not in [0, 1, 2, 3]: action = 0
|
| 146 |
except Exception as e:
|
|
@@ -151,20 +153,28 @@ def run_mega_simulation(zip_file):
|
|
| 151 |
prev_pos = pos[:]
|
| 152 |
nx, ny = pos[0]+dx, pos[1]+dy
|
| 153 |
|
|
|
|
| 154 |
if not (0 <= nx < 20 and 0 <= ny < 20) or (nx, ny) in env.walls:
|
| 155 |
nx, ny = pos
|
| 156 |
pos = [nx, ny]
|
| 157 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 158 |
env.update_enemies(pos)
|
| 159 |
history.append(tuple(pos))
|
| 160 |
battery -= 1
|
| 161 |
if tuple(pos) in env.mud: battery -= 5
|
| 162 |
|
| 163 |
-
#
|
| 164 |
if tuple(pos) == env.goal:
|
| 165 |
-
|
|
|
|
| 166 |
break
|
| 167 |
|
|
|
|
| 168 |
enemy_pos = [tuple(e["pos"]) for e in env.enemies]
|
| 169 |
if battery <= 0:
|
| 170 |
yield env.render(tuple(pos), history, 0, score), {"RESULT": "DIED: Battery Empty"}
|
|
@@ -173,30 +183,26 @@ def run_mega_simulation(zip_file):
|
|
| 173 |
yield env.render(tuple(pos), history, 0, score), {"RESULT": "DIED: Caught by Enemy"}
|
| 174 |
break
|
| 175 |
if tuple(pos) in env.traps:
|
| 176 |
-
battery -= 10
|
| 177 |
|
|
|
|
| 178 |
yield env.render(tuple(pos), history, battery, score), {"step": step}
|
| 179 |
-
time.sleep(0.01)
|
| 180 |
|
| 181 |
finally:
|
| 182 |
-
# 6. Cleanup
|
| 183 |
-
# Remove temp dir from path and filesystem
|
| 184 |
if temp_dir in sys.path:
|
| 185 |
sys.path.remove(temp_dir)
|
| 186 |
shutil.rmtree(temp_dir, ignore_errors=True)
|
| 187 |
|
| 188 |
-
# --- GRADIO INTERFACE ---
|
| 189 |
with gr.Blocks(theme=gr.themes.Monochrome()) as demo:
|
| 190 |
-
gr.Markdown("# π© CTF: The
|
| 191 |
-
gr.Markdown("Upload
|
| 192 |
-
|
| 193 |
with gr.Row():
|
| 194 |
game = gr.HTML(MegaWorldEnv().render((1,1), [], 100, 0))
|
| 195 |
with gr.Column():
|
| 196 |
file_input = gr.File(label="Upload Submission (.zip)", file_types=[".zip"])
|
| 197 |
run_btn = gr.Button("Deploy Agent", variant="primary")
|
| 198 |
-
logs = gr.JSON(label="
|
| 199 |
-
|
| 200 |
run_btn.click(run_mega_simulation, file_input, [game, logs])
|
| 201 |
|
| 202 |
if __name__ == "__main__":
|
|
|
|
| 9 |
import tempfile
|
| 10 |
import uuid
|
| 11 |
|
| 12 |
+
# --- MEGA WORLD CLASS (SERVER VERSION) ---
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 13 |
RADAR_ENCODING = {"EMPTY": 0,"WALL": 1,"GOAL": 2,"ICE": 3,"MUD": 4,"DANGER": 5,"CHARGER": 6,"ENEMY": 7}
|
| 14 |
+
|
| 15 |
class MegaWorldEnv:
|
| 16 |
def __init__(self):
|
| 17 |
self.start = (1, 1); self.goal = (18, 18)
|
|
|
|
| 22 |
self.chargers = [(18,2),(10,10)]
|
| 23 |
self.enemies = [{"pos":[5,5],"type":"patrol","axis":"x","range":(5,10),"dir":1},{"pos":[15,5],"type":"patrol","axis":"x","range":(12,17),"dir":1},{"pos":[12,12],"type":"hunter", "step": 0}, {"pos":[16,16],"type":"hunter", "step": 0}]
|
| 24 |
random.shuffle(self.enemies)
|
| 25 |
+
|
| 26 |
def _generate_walls(self):
|
| 27 |
walls = []
|
| 28 |
for y in range(20):
|
|
|
|
| 33 |
for i in range(15, 19): walls.append((i, 15))
|
| 34 |
walls.extend([(14,14), (13,13)])
|
| 35 |
return walls
|
| 36 |
+
|
| 37 |
+
# *** ENABLED SHAPED REWARD HERE ***
|
| 38 |
def shaped_reward(self, old_pos, new_pos):
|
| 39 |
+
old_dist = abs(old_pos[0]-self.goal[0]) + abs(old_pos[1]-self.goal[1])
|
| 40 |
+
new_dist = abs(new_pos[0]-self.goal[0]) + abs(new_pos[1]-self.goal[1])
|
| 41 |
+
return 3.0 * (old_dist - new_dist)
|
| 42 |
+
|
| 43 |
def get_radar(self, pos):
|
| 44 |
x,y=pos; radar={}
|
| 45 |
dirs={"up":(x,y+1),"down":(x,y-1),"left":(x-1,y),"right":(x+1,y)}
|
|
|
|
| 56 |
if tuple(e["pos"])==(nx,ny): info="ENEMY"
|
| 57 |
radar[d]=RADAR_ENCODING[info]
|
| 58 |
return radar
|
| 59 |
+
|
| 60 |
def update_enemies(self, player_pos):
|
| 61 |
for e in self.enemies:
|
| 62 |
if e["type"]=="patrol":
|
|
|
|
| 69 |
nx, ny = e["pos"][0] + move[0], e["pos"][1] + move[1]
|
| 70 |
if (nx, ny) not in self.walls and 0<=nx<20 and 0<=ny<20: e["pos"]=[nx,ny]
|
| 71 |
e["step"] += 1
|
| 72 |
+
|
| 73 |
def render(self, player_pos, history, battery, score):
|
| 74 |
+
# HTML Visualizer
|
| 75 |
html="<div style='background:#000;padding:10px;border-radius:12px;font-family:monospace'>"
|
| 76 |
+
html+=f"<div style='color:white;margin-bottom:5px'>π {battery} | π {score:.1f}</div>" # Added .1f for float formatting
|
| 77 |
html+="<div style='display:grid;grid-template-columns:repeat(20,20px);gap:1px;width:fit-content;margin:auto'>"
|
| 78 |
enemy_pos=[tuple(e["pos"]) for e in self.enemies]
|
| 79 |
for y in range(19,-1,-1):
|
|
|
|
| 94 |
# ---------------------------------------------------------
|
| 95 |
# SERVER CONFIG
|
| 96 |
# ---------------------------------------------------------
|
| 97 |
+
FLAG = "CTF{r3w4rd_sh4p1ng_1s_th3_k3y}"
|
| 98 |
|
| 99 |
def run_mega_simulation(zip_file):
|
| 100 |
env = MegaWorldEnv()
|
|
|
|
| 103 |
yield env.render(env.start, [], 100, 0), {"status": "Waiting for upload..."}
|
| 104 |
return
|
| 105 |
|
| 106 |
+
# Setup Temp Directory
|
| 107 |
run_id = str(uuid.uuid4())
|
| 108 |
temp_dir = os.path.join(tempfile.gettempdir(), "ctf_run_" + run_id)
|
| 109 |
os.makedirs(temp_dir, exist_ok=True)
|
| 110 |
|
| 111 |
try:
|
| 112 |
+
# Extract Zip
|
| 113 |
try:
|
| 114 |
with zipfile.ZipFile(zip_file.name, 'r') as zip_ref:
|
| 115 |
zip_ref.extractall(temp_dir)
|
| 116 |
except Exception as e:
|
| 117 |
+
yield env.render(env.start, [], 0, 0), {"error": f"Invalid Zip: {e}"}
|
| 118 |
return
|
| 119 |
|
| 120 |
+
# Load Agent
|
| 121 |
agent_path = os.path.join(temp_dir, "agent.py")
|
| 122 |
if not os.path.exists(agent_path):
|
| 123 |
+
yield env.render(env.start, [], 0, 0), {"error": "agent.py not found!"}
|
| 124 |
return
|
| 125 |
|
|
|
|
|
|
|
| 126 |
sys.path.append(temp_dir)
|
| 127 |
|
| 128 |
try:
|
|
|
|
| 130 |
agent = importlib.util.module_from_spec(spec)
|
| 131 |
spec.loader.exec_module(agent)
|
| 132 |
except Exception as e:
|
| 133 |
+
yield env.render(env.start, [], 0, 0), {"error": f"Code Error: {e}"}
|
| 134 |
return
|
| 135 |
|
| 136 |
+
# Run Simulation
|
| 137 |
pos = list(env.start)
|
| 138 |
battery = 2500
|
| 139 |
score = 0
|
|
|
|
| 143 |
radar = env.get_radar(pos)
|
| 144 |
|
| 145 |
try:
|
|
|
|
| 146 |
action = agent.get_action(tuple(pos), radar, battery)
|
| 147 |
if action not in [0, 1, 2, 3]: action = 0
|
| 148 |
except Exception as e:
|
|
|
|
| 153 |
prev_pos = pos[:]
|
| 154 |
nx, ny = pos[0]+dx, pos[1]+dy
|
| 155 |
|
| 156 |
+
# Wall Collision
|
| 157 |
if not (0 <= nx < 20 and 0 <= ny < 20) or (nx, ny) in env.walls:
|
| 158 |
nx, ny = pos
|
| 159 |
pos = [nx, ny]
|
| 160 |
|
| 161 |
+
# *** APPLY SHAPED REWARD ***
|
| 162 |
+
# This updates the score visible on screen
|
| 163 |
+
step_reward = env.shaped_reward(tuple(prev_pos), tuple(pos))
|
| 164 |
+
score += step_reward
|
| 165 |
+
|
| 166 |
env.update_enemies(pos)
|
| 167 |
history.append(tuple(pos))
|
| 168 |
battery -= 1
|
| 169 |
if tuple(pos) in env.mud: battery -= 5
|
| 170 |
|
| 171 |
+
# Win
|
| 172 |
if tuple(pos) == env.goal:
|
| 173 |
+
score += 1000
|
| 174 |
+
yield env.render(tuple(pos), history, battery, score), {"RESULT": f"VICTORY! {FLAG}"}
|
| 175 |
break
|
| 176 |
|
| 177 |
+
# Loss
|
| 178 |
enemy_pos = [tuple(e["pos"]) for e in env.enemies]
|
| 179 |
if battery <= 0:
|
| 180 |
yield env.render(tuple(pos), history, 0, score), {"RESULT": "DIED: Battery Empty"}
|
|
|
|
| 183 |
yield env.render(tuple(pos), history, 0, score), {"RESULT": "DIED: Caught by Enemy"}
|
| 184 |
break
|
| 185 |
if tuple(pos) in env.traps:
|
| 186 |
+
battery -= 10
|
| 187 |
|
| 188 |
+
# Render
|
| 189 |
yield env.render(tuple(pos), history, battery, score), {"step": step}
|
| 190 |
+
time.sleep(0.01)
|
| 191 |
|
| 192 |
finally:
|
|
|
|
|
|
|
| 193 |
if temp_dir in sys.path:
|
| 194 |
sys.path.remove(temp_dir)
|
| 195 |
shutil.rmtree(temp_dir, ignore_errors=True)
|
| 196 |
|
|
|
|
| 197 |
with gr.Blocks(theme=gr.themes.Monochrome()) as demo:
|
| 198 |
+
gr.Markdown("# π© CTF: The Fortress Run")
|
| 199 |
+
gr.Markdown("Upload `solution.zip` to run your agent.")
|
|
|
|
| 200 |
with gr.Row():
|
| 201 |
game = gr.HTML(MegaWorldEnv().render((1,1), [], 100, 0))
|
| 202 |
with gr.Column():
|
| 203 |
file_input = gr.File(label="Upload Submission (.zip)", file_types=[".zip"])
|
| 204 |
run_btn = gr.Button("Deploy Agent", variant="primary")
|
| 205 |
+
logs = gr.JSON(label="Status")
|
|
|
|
| 206 |
run_btn.click(run_mega_simulation, file_input, [game, logs])
|
| 207 |
|
| 208 |
if __name__ == "__main__":
|